ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.5
Committed: Tue Mar 24 11:52:12 2009 UTC (16 years, 1 month ago) by edelmann
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +0 -18 lines
Log Message:
python/SchedulerArc.py: Removed check on sandbox size, unnecessary for ARC.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible to provide
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
class SchedulerArc(SchedulerGrid):
    """
    CRAB scheduler back-end for ARC.

    Configuration is read from the EDG.*, USER.* and CRAB.* keys of the
    cfg_params mapping (see configure()); submission, status queries,
    cancellation and JDL generation are delegated to the BOSS object
    returned by self.boss().
    """

    def __init__(self, name='ARC'):
        """
        Initialise the parent SchedulerGrid with the scheduler name.
        """
        sys.stderr.write("python/SchedulerArc.__init__\n")
        SchedulerGrid.__init__(self, name)
        return

    def envUniqueID(self):
        """
        Returns the job identifier used for ML monitoring:
        'https://<scheduler name>/<SHA-1 hex digest of the task name>/${NJob}'.
        '${NJob}' is left as a literal placeholder in the returned string.
        """
        taskHash = sha.new(common._db.queryTask('name')).hexdigest()
        # Named 'uniqueId' rather than 'id' so the builtin is not shadowed.
        uniqueId = 'https://' + self.name() + '/' + taskHash + '/${NJob}'
        msg = 'JobID for ML monitoring is created for ARC scheduler: %s' % uniqueId
        common.logger.debug(5, msg)
        return uniqueId

    def realSchedParams(self, cfg_params):
        """
        Returns an empty dictionary; cfg_params is not inspected.
        """
        sys.stderr.write("python/SchedulerArc.realSchedParams\n")
        return {}

    def configure(self, cfg_params):
        """
        Reads the scheduler settings from cfg_params, stores them as
        attributes on this object and finally calls self.checkProxy().

        Raises CrabException when:
          - copy_data is 1 but USER.storage_element is missing;
          - return_data and copy_data are both 0, or both 1;
          - publish_data is 1 while copy_data is 0.
        """
        # Debug traces go to stderr, like every other trace in this class.
        sys.stderr.write("python/SchedulerArc.configure - in\n")
        sys.stderr.write("cfg_params: %s\n" % (cfg_params,))

        self.cfg_params = cfg_params
        Scheduler.configure(self, cfg_params)

        self.environment_unique_identifier = None

        # init BlackWhiteListParser
        seWhiteList = cfg_params.get('EDG.se_white_list', [])
        seBlackList = cfg_params.get('EDG.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("EDG.group", None)
        self.role = cfg_params.get("EDG.role", None)

        # Default CE black list; dropped entirely when
        # EDG.remove_default_blacklist is 1.
        removeT1bL = cfg_params.get("EDG.remove_default_blacklist", 0)
        T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw", "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
                 "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr", "cclcgceli04.in2p3.fr", "pic.es", "cnaf"]
        if int(removeT1bL) == 1:
            T1_BL = []

        # A user supplied black list is extended with the default one; with
        # no user list the default is used on its own (or the attribute stays
        # None when the default list has been removed).
        self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
        if self.EDG_ce_black_list:
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
        elif int(removeT1bL) == 0:
            self.EDG_ce_black_list = T1_BL

        self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

        self.return_data = cfg_params.get('USER.return_data', 0)
        self.publish_data = cfg_params.get("USER.publish_data", 0)
        self.copy_data = cfg_params.get("USER.copy_data", 0)

        if int(self.copy_data) == 1:
            self.SE = cfg_params.get('USER.storage_element', None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'"
                common.logger.message(msg)
                raise CrabException(msg)

        if int(self.return_data) == 0 and int(self.copy_data) == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.return_data) == 1 and int(self.copy_data) == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.copy_data) == 0 and int(self.publish_data) == 1:
            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        self.EDG_requirements = cfg_params.get('EDG.requirements', None)

        # Additional JDL parameters are given as one ';'-separated string.
        self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
        if self.EDG_addJdlParam:
            self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

        self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)
        self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)
        self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

        self.debug_wrapper = int(cfg_params.get('USER.debug_wrapper', 0))
        self.debugWrap = ''
        if self.debug_wrapper == 1:
            self.debugWrap = '--debug'

        self.check_RemoteDir = int(cfg_params.get('USER.check_user_remote_dir', 0))

        # The lookup of EDG_WL_LOCATION in the environment is disabled, so
        # the base path is empty and the two entries appended to sys.path
        # below are the relative paths "lib" and "lib/python".
        path = ''
        sys.path.append(os.path.join(path, "lib"))
        sys.path.append(os.path.join(path, "lib", "python"))

        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')

        self.checkProxy()
        return

    #def rb_configure(self, RB):
    # Since ARC doesn't have RB:s, parent's 'None'
    # returning implementation ought to be fine for us.

    def ce_list(self):
        """
        Returns string with requirement CE related

        The result is the tuple (req, retWL, retBL): req is the requirement
        expression built from the CE white and black lists, retWL / retBL
        are the comma-joined white / black lists, or None when empty.
        """
        sys.stderr.write("python/SchedulerArc.ce_list\n")

        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list, common.logger)
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            allowed = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                       for ce in ce_white_list]
            if len(allowed) == 1:
                req += " && ( " + allowed[0] + " ) "
            elif len(allowed) > 1:
                # White-listed CEs are alternatives, hence OR:ed together.
                req += " && ( (" + ") || (".join(allowed) + ") )"

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            # Every black-listed CE must be excluded, hence AND:ed together.
            banned = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                      for ce in ce_black_list]
            if banned:
                req += " && (" + '&&'.join(banned) + ") "

        # An empty joined list is reported as None rather than ''.
        retWL = ','.join(ce_white_list) or None
        retBL = ','.join(ce_black_list) or None

        sys.stderr.write("ce_list: %s, %s, %s\n" % (req, str(retWL), str(retBL)))

        return req, retWL, retBL

    def se_list(self, id, dest):
        """
        Returns ('', SE white list, SE black list) as reported by the
        SEBlackWhiteListParser created in configure(). The id and dest
        arguments are not used.
        """
        sys.stderr.write("python/SchedulerArc.se_list\n")
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Returns parameter scheduler-specific, to use with BOSS .

        The result is '' when no CE matched, '(cluster=<ce>)' for a single
        CE, or '(|(cluster=<ce1>)(cluster=<ce2>)...)' for several.
        """
        se_dls = task.jobs[i - 1]['dlsDestination']
        unused, se_white, se_black = self.se_list(i, se_dls)

        # A white list takes precedence; the black list is consulted only
        # when there is no white list.
        se_list = []
        for se in se_dls:
            if se_white:
                if se in se_white:
                    se_list.append(se)
            elif se_black:
                if se not in se_black:
                    se_list.append(se)
            else:
                se_list.append(se)
        # FIXME: Check that se_list contains at least one SE!

        matched_ces = self.listMatch(se_list, 'False')

        # FIXME: If len(matched_ces) == 0 ==> s = "" ==> we'll submit
        # "anywhere", which is completely contrary behaviour to what we want!
        # An empty list means there were _no_ CE in ce_infoSys that
        # survived the white- and black-list filter, so we shouldn't submit
        # at all!
        s = ''.join(['(cluster=%s)' % ce for ce in matched_ces])

        # A ce-list with more than one element must be an OR:ed
        # list: (|(cluster=ce1)(cluster=ce2)...)
        if len(matched_ces) > 1:
            s = '(|' + s + ')'

        return s

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management.
        """
        parts = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',
            'func_exit() { \n',
            self.wsExitFunc_common(),
            # Remove ".BrokerInfo" that the code generated by
            # self.wsExitFunc_common() adds to $final_list. (This is an ugly
            # hack -- the "good" solution would be to add ARC-knowledge to
            # self.wsExitFunc_common())
            " final_list=${final_list%.BrokerInfo}\n",
            ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
            ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' if [ $exceed -ne 1 ]; then\n',
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' else\n',
            ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
            ' fi\n',
            ' exit $job_exit_code\n',
            '}\n',
        ]
        return ''.join(parts)

    def tags(self):
        """
        Returns an empty string.
        """
        sys.stderr.write("python/SchedulerArc.tags\n")
        return ''

    def submit(self, list, task):
        """ submit to scheduler a list of jobs """
        sys.stderr.write("python/SchedulerArc.submit\n")
        if not len(list):
            common.logger.message("No sites where to submit jobs")
            # Nothing to submit; going on would fail on list[0] below.
            return
        # The scheduler parameters of the first job are used for the
        # whole list.
        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        for sub_list in bulkControl(self, list):
            self.boss().submit(task['id'], sub_list, req)
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's)
        """
        # Go through the boss() accessor like the other methods of this
        # class instead of touching the private attribute.
        self.boss().cancel(ids)
        return

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info

        Currently does nothing and returns None.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs

        Returns a list with one BOSS writeJDL() result per sub-list produced
        by bulkControl(). An empty job list raises IndexError, since the
        scheduler parameters are taken from list[0].
        """
        req = str(self.sched_parameter(list[0], task))
        jdl = []
        for sub_list in bulkControl(self, list):
            jdl.append(self.boss().writeJDL(task['id'], sub_list, req))
        return jdl