ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.6
Committed: Tue Mar 24 14:47:40 2009 UTC (16 years, 1 month ago) by edelmann
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1
Changes since 1.5: +7 -108 lines
Log Message:
python/SchedulerArc.configure(): Rely on parent's implementation, with just
a few bits of our own added.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible to provide
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
class SchedulerArc(SchedulerGrid):
    """
    CRAB scheduler plugin for ARC.

    Most behaviour is inherited from SchedulerGrid. This class overrides:
    - CE/SE white/black-list handling;
    - the scheduler requirement string handed to BOSS (ARC 'cluster=' syntax);
    - the job-script exit function;
    - submission, cancellation and JDL writing through BOSS.
    """

    def __init__(self, name='ARC'):
        sys.stderr.write("python/SchedulerArc.__init__\n")
        SchedulerGrid.__init__(self, name)
        return

    def envUniqueID(self):
        """
        Return the job identifier used for ML monitoring.

        The identifier is 'https://<self.name()>/<sha1 hex of task name>/${NJob}'.
        '${NJob}' is emitted literally; presumably the job script's shell
        expands it later -- confirm against the script generator.
        """
        taskHash = sha.new(common._db.queryTask('name')).hexdigest()
        # Named jobId (not 'id') to avoid shadowing the builtin.
        jobId = 'https://' + self.name() + '/' + taskHash + '/${NJob}'
        msg = 'JobID for ML monitoring is created for ARC scheduler: %s' % jobId
        common.logger.debug(5, msg)
        return jobId

    def realSchedParams(self, cfg_params):
        """
        Return scheduler-specific parameters; ARC defines none (empty dict).
        """
        sys.stderr.write("python/SchedulerArc.realSchedParams\n")
        return {}

    def configure(self, cfg_params):
        """
        Configure the scheduler via SchedulerGrid.configure(), then clear
        self.environment_unique_identifier.
        """
        if 'EDG_WL_LOCATION' not in os.environ:
            # This is an ugly hack needed for SchedulerGrid.configure() to
            # work!
            os.environ['EDG_WL_LOCATION'] = ''

        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = None

    def ce_list(self):
        """
        Returns string with requirement CE related.

        Returns a tuple (req, retWL, retBL):
          req   -- requirement fragment built from RegExp matches on
                   other.GlueCEUniqueId; each part starts with ' && '.
                   It is '' when neither CE list is configured.
          retWL -- comma-separated CE white list, or None if empty.
          retBL -- comma-separated CE black list, or None if empty.
        """
        sys.stderr.write("python/SchedulerArc.ce_list\n")

        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list, common.logger)
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            tmpCe = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                     for ce in ce_white_list]
            if len(tmpCe) == 1:
                req += " && ( " + tmpCe[0] + " ) "
            elif len(tmpCe) > 1:
                # White-listed CEs are alternatives: OR them together.
                req += " && ( (" + ") || (".join(tmpCe) + ") )"

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            tmpCe = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                     for ce in ce_black_list]
            if tmpCe:
                # Every black-listed CE must be excluded: AND the negations.
                req += " && (" + '&&'.join(tmpCe) + ") "

        # An empty join ('') is reported as None.
        retWL = ','.join(ce_white_list) or None
        retBL = ','.join(ce_black_list) or None

        sys.stderr.write("ce_list: %s, %s, %s\n" % (req, str(retWL), str(retBL)))

        return req, retWL, retBL

    def se_list(self, id, dest):
        """
        Return ('', se_white, se_black) from self.blackWhiteListParser.

        The id and dest arguments are not used by this implementation.
        """
        sys.stderr.write("python/SchedulerArc.se_list\n")
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Returns parameter scheduler-specific, to use with BOSS.

        Filters job i's (1-based) 'dlsDestination' SEs through the SE lists.
        If a white list is set, only white-listed SEs are kept. Otherwise,
        if a black list is set, black-listed SEs are dropped.

        The surviving SEs are passed to self.listMatch() to get CEs. The
        result is an ARC cluster requirement:
        - '(cluster=ce)' for one CE;
        - '(|(cluster=ce1)(cluster=ce2)...)' for several;
        - '' when no CE matched.
        """
        se_dls = task.jobs[i-1]['dlsDestination']
        _unused, se_white, se_black = self.se_list(i, se_dls)

        # The white list takes precedence over the black list.
        if se_white:
            se_list = [se for se in se_dls if se in se_white]
        elif se_black:
            se_list = [se for se in se_dls if se not in se_black]
        else:
            se_list = [se for se in se_dls]
        # FIXME: Check that se_list contains at least one SE!

        ce_list = self.listMatch(se_list, 'False')

        if not ce_list:
            # FIXME: An empty requirement means "submit anywhere", which is
            # the opposite of what we want when no CE survived the white-
            # and black-list filters. Keep the historical return value, but
            # make the situation visible to the user.
            common.logger.message("Warning: no CE matched the SE/CE "
                                  "white/black lists; ARC requirement is empty")
            return ""

        s = ''.join(['(cluster=%s)' % ce for ce in ce_list])
        if len(ce_list) > 1:
            # A ce-list with more than one element must be an OR:ed
            # list: (|(cluster=ce1)(cluster=ce2)...)
            s = '(|' + s + ')'
        return s

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management.

        The generated func_exit() does the following:
        - reuses the common exit code;
        - strips '.BrokerInfo' from $final_list;
        - records the exit code;
        - tars the output, or only the stdout/stderr files when
          $exceed is 1;
        - exits with $job_exit_code.
        """
        txt = '\n'

        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # Remove ".BrokerInfo" that the code generated by
        # self.wsExitFunc_common() adds to $final_list. (This is an ugly
        # hack -- the "good" solution would be to add ARC-knowledge to
        # self.wsExitFunc_common())
        txt += " final_list=${final_list%.BrokerInfo}\n"

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' exit $job_exit_code\n'
        txt += '}\n'
        return txt

    def tags(self):
        """
        Return the scheduler tags; ARC uses none (empty string).
        """
        sys.stderr.write("python/SchedulerArc.tags\n")
        return ''

    def submit(self, list, task):
        """
        Submit to scheduler a list of jobs.

        The requirement string is computed from the first job of the list.
        The list is split by bulkControl() and each chunk is submitted
        through BOSS. An empty list is reported and nothing is submitted.
        """
        sys.stderr.write("python/SchedulerArc.submit\n")
        if not list:
            common.logger.message("No sites where to submit jobs")
            # Previously execution continued to list[0] and raised IndexError.
            return
        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        new_list = bulkControl(self, list)

        for sub_list in new_list:
            self.boss().submit(task['id'], sub_list, req)
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid.
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's).
        """
        # Go through the boss() accessor like every other method here.
        self.boss().cancel(ids)
        return

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info.

        Not implemented for ARC: always returns None.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs.

        Returns one BOSS-generated JDL per bulkControl() chunk of the list.
        Returns an empty list for an empty job list; previously this raised
        IndexError on list[0].
        """
        if not list:
            return []
        req = str(self.sched_parameter(list[0], task))
        new_list = bulkControl(self, list)
        jdl = []
        for sub_list in new_list:
            tmp_jdl = self.boss().writeJDL(task['id'], sub_list, req)
            jdl.append(tmp_jdl)
        return jdl