ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.3
Committed: Thu Mar 19 15:43:33 2009 UTC (16 years, 1 month ago) by edelmann
Content type: text/x-python
Branch: MAIN
Changes since 1.2: +0 -6 lines
Log Message:
python/SchedulerArc.py: removed loggingInfo(); rely on parent's
implementation instead.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13    
14     #
15     # Naming convention:
16     # methods whose names start with 'ws' are responsible for providing the
17     # corresponding part of the job script ('ws' stands for 'write script').
18     #
19    
class SchedulerArc(SchedulerGrid):
    """
    CRAB scheduler plug-in registered under the name 'ARC'.

    Most behaviour is inherited from SchedulerGrid; this class overrides
    configuration, the CE/SE requirement helpers, the scheduler-specific
    part of the job wrapper script, and the submit/query/cancel entry
    points, which are delegated to BOSS through self.boss().

    Every method writes a short trace line to stderr (or stdout, for
    configure) on entry; these are debugging aids.
    """

    def __init__(self, name='ARC'):
        """
        Initialise the parent scheduler and set the output size limit.

        name -- scheduler name handed on to SchedulerGrid.__init__.
        """
        sys.stderr.write("python/SchedulerArc.__init__\n")
        SchedulerGrid.__init__(self, name)
        # Limit that wsExitFunc() writes into the job script and compares
        # with the size of the packed output files.
        self.OSBsize = 55000000

    def realSchedParams(self, cfg_params):
        """
        Return the scheduler parameters derived from cfg_params.

        Nothing is derived for this scheduler: the result is always an
        empty dictionary and cfg_params is not inspected.
        """
        sys.stderr.write("python/SchedulerArc.realSchedParams\n")
        return {}

    def configure(self, cfg_params):
        """
        Read the scheduler related settings from cfg_params.

        cfg_params -- mapping of 'SECTION.key' names to values; it is
                      only accessed through .get().

        Stores the settings as attributes on self, builds the SE
        black/white list parser and finally calls self.checkProxy().

        Raises CrabException when
          * USER.copy_data is 1 but USER.storage_element is not set,
          * USER.return_data and USER.copy_data are both 0 or both 1,
          * USER.publish_data is 1 while USER.copy_data is 0.
        """
        # Written so that the output is identical to the former print
        # statements under Python 2 while remaining valid Python 3 syntax.
        print("python/SchedulerArc.configure - in")
        print("cfg_params: %s" % (cfg_params,))

        self.cfg_params = cfg_params
        # Note: Scheduler.configure is called directly here, not the
        # configure of the immediate parent class SchedulerGrid.
        Scheduler.configure(self, cfg_params)

        # environment_unique_identifier will be used to set $MonitorJobID
        # in the job.
        #
        # FIXME: I'm not sure if this will work. Maybe the identifier
        # should adhere to some special format? Is it OK to rely on SGE
        # specific things like $SGE_O_HOST ?
        self.environment_unique_identifier = '${SGE_O_HOST}-${HOME##*/}'

        # init BlackWhiteListParser
        seWhiteList = cfg_params.get('EDG.se_white_list', [])
        seBlackList = cfg_params.get('EDG.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(
            seWhiteList, seBlackList, common.logger)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("EDG.group", None)

        self.role = cfg_params.get("EDG.role", None)

        removeT1bL = cfg_params.get("EDG.remove_default_blacklist", 0)

        # Default CE black list; dropped when the user asks for it with
        # EDG.remove_default_blacklist = 1.
        T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw",
                 "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
                 "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr",
                 "cclcgceli04.in2p3.fr", "pic.es", "cnaf"]
        if int(removeT1bL) == 1:
            T1_BL = []
        self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
        if self.EDG_ce_black_list:
            # User supplied entries (comma separated) come first, the
            # default list (possibly emptied above) is appended.
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
        else:
            # Without user entries the default list is used only when
            # EDG.remove_default_blacklist is exactly 0; otherwise the
            # attribute keeps the (false) value read from cfg_params.
            if int(removeT1bL) == 0:
                self.EDG_ce_black_list = T1_BL
        self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

        self.return_data = cfg_params.get('USER.return_data', 0)

        self.publish_data = cfg_params.get("USER.publish_data", 0)

        self.copy_data = cfg_params.get("USER.copy_data", 0)
        if int(self.copy_data) == 1:
            # self.SE is only defined when copy_data is switched on.
            self.SE = cfg_params.get('USER.storage_element', None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'"
                common.logger.message(msg)
                raise CrabException(msg)

        if int(self.return_data) == 0 and int(self.copy_data) == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.return_data) == 1 and int(self.copy_data) == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.copy_data) == 0 and int(self.publish_data) == 1:
            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        self.EDG_requirements = cfg_params.get('EDG.requirements', None)

        self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
        if self.EDG_addJdlParam:
            # Several additional parameters are separated by ';'.
            self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

        self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)

        self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)

        self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

        self.debug_wrapper = int(cfg_params.get('USER.debug_wrapper', 0))
        self.debugWrap = ''
        if self.debug_wrapper == 1:
            self.debugWrap = '--debug'

        self.check_RemoteDir = int(cfg_params.get('USER.check_user_remote_dir', 0))

        ## Add EDG_WL_LOCATION to the python path
        #
        #if not os.environ.has_key('EDG_WL_LOCATION'):
        #    msg = "Error: the EDG_WL_LOCATION variable is not set."
        #    raise CrabException(msg)
        #path = os.environ['EDG_WL_LOCATION']
        #
        # With the lookup above disabled, the empty base path makes the
        # two entries appended to sys.path the relative directories
        # "lib" and "lib/python".
        path = ''

        libPath = os.path.join(path, "lib")
        sys.path.append(libPath)
        libPath = os.path.join(path, "lib", "python")
        sys.path.append(libPath)

        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')

        self.checkProxy()

    #def rb_configure(self, RB):
    #    Since ARC doesn't have RB:s, parent's 'None'
    #    returning implementation ought to be fine for us.

    def ce_list(self):
        """
        Returns string with requirement CE related

        The result is a tuple (req, retWL, retBL):
          req   -- requirement expression built from the white and black
                   lists; every clause is prefixed with ' && '.
          retWL -- comma separated white list as returned by the parser,
                   or None when it is empty.
          retBL -- comma separated black list as returned by the parser,
                   or None when it is empty.
        """
        sys.stderr.write("python/SchedulerArc.ce_list\n")

        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list,
                                          common.logger)
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            wanted = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                      for ce in ce_white_list]
            if len(wanted) == 1:
                req += " && ( " + wanted[0] + " ) "
            elif len(wanted) > 1:
                # White list entries are alternatives, hence OR:ed.
                req += " && ( (" + ") || (".join(wanted) + ") )"
                # Do we need all those parentesis above? Or could we do:
                #concString = " || "
                #req += " && ( " + concString.join(tmpCe) +" )"

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            # Every black listed CE must be excluded, hence AND:ed.
            unwanted = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                        for ce in ce_black_list]
            if len(unwanted):
                req += " && (" + '&&'.join(unwanted) + ") "

        ## requirement added to skip gliteCE
        #req += '&& (!RegExp("blah", other.GlueCEUniqueId))'

        retWL = ','.join(ce_white_list)
        retBL = ','.join(ce_black_list)
        if not retWL:
            retWL = None
        if not retBL:
            retBL = None

        sys.stderr.write("ce_list: %s, %s, %s\n" % (req, str(retWL), str(retBL)))

        return req, retWL, retBL

    def se_list(self, id, dest):
        """
        Return the SE related requirement and the SE white/black lists.

        id, dest -- accepted for interface compatibility; not used here.

        Returns ('', white_list, black_list) where the lists come from
        the parser created in configure(). The requirement string is
        always empty.
        """
        sys.stderr.write("python/SchedulerArc.se_list\n")
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Returns parameter scheduler-specific, to use with BOSS .

        i    -- 1-based job number; task.jobs[i-1] is inspected.
        task -- task object whose jobs carry a 'dlsDestination' list.

        The job's destination SEs are filtered with the SE white list
        (if non-empty), else with the SE black list (if non-empty), and
        the survivors are passed to self.listMatch(). The returned string
        is '' for no CE, '(cluster=ce)' for one CE and
        '(|(cluster=ce1)(cluster=ce2)...)' for several.
        """
        se_dls = task.jobs[i-1]['dlsDestination']
        unused_req, se_white, se_black = self.se_list(i, se_dls)

        se_list = []
        for se in se_dls:
            if se_white:
                if se in se_white:
                    se_list.append(se)
            elif se_black:
                if se not in se_black:
                    se_list.append(se)
            else:
                se_list.append(se)
        # FIXME: Check that se_list contains at least one SE!

        ce_list = self.listMatch(se_list, 'False')

        clusters = ''.join(['(cluster=%s)' % ce for ce in ce_list])
        if len(ce_list) > 1:
            # A ce-list with more than one element must be an OR:ed
            # list: (|(cluster=ce1)(cluster=ce2)...)
            clusters = '(|' + clusters + ')'

        # FIXME: If len(ce_list) == 0 ==> s = "" ==> we'll submit
        # "anywhere", which is completely contrary behaviour to what we want!
        # len(ce_list) == 0 means there were _no_ CE in ce_infoSys that
        # survived the white- and black-list filter, so we shouldn't submit
        # at all!

        return clusters

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management.

        The generated shell function func_exit() packs the output files,
        compares the archive size with self.OSBsize, sets job_exit_code
        to 70000 when the limit is exceeded (in which case only the
        CMSSW stdout/stderr files are packed), records the exit code and
        exits with it.
        """
        # NOTE(review): the shell fragments below are reproduced exactly
        # as found, including their leading whitespace -- confirm against
        # the repository copy if the script indentation matters.
        txt = '\n'

        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # Remove ".BrokerInfo" that the code generated by
        # self.wsExitFunc_common() adds to $final_list. (This is an ugly
        # hack -- the "good" solution would be to add ARC-knowledge to
        # self.wsExitFunc_common())
        txt += " final_list=${final_list%.BrokerInfo}\n"

        ### specific Glite check for OSB
        # A trial archive is created only to measure its size and is
        # removed again straight away.
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit=' + str(self.OSBsize) + ' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' exit $job_exit_code\n'
        txt += '}\n'
        return txt

    def tags(self):
        """
        Return the scheduler tags; always the empty string here.
        """
        sys.stderr.write("python/SchedulerArc.tags\n")
        return ''

    def submit(self, list, task):
        """
        submit to scheduler a list of jobs

        list -- job numbers to submit; the requirement string is derived
                from the first one and reused for all of them.
        task -- task object; task['id'] identifies it towards BOSS.

        An empty list is reported through common.logger and nothing is
        submitted.
        """
        sys.stderr.write("python/SchedulerArc.submit\n")
        if not len(list):
            common.logger.message("No sites where to submit jobs")
            # Nothing to do; going on would fail on list[0] below.
            return
        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        new_list = bulkControl(self, list)

        for sub_list in new_list:
            self.boss().submit(task['id'], sub_list, req)

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's)
        """
        # Use the same accessor as submit/queryEverything/writeJDL rather
        # than touching the private attribute directly.
        self.boss().cancel(ids)

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info

        Not implemented for this scheduler: the file is ignored and
        None is returned.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs

        list -- job numbers; the requirement string is derived from the
                first one and reused for all of them.
        task -- task object; task['id'] identifies it towards BOSS.

        Returns a list with one entry per sub-list produced by
        bulkControl(), each being the result of boss().writeJDL().
        An empty job list yields an empty result.
        """
        if not len(list):
            # No job to derive the requirements from, hence no JDL.
            return []
        req = str(self.sched_parameter(list[0], task))
        new_list = bulkControl(self, list)
        return [self.boss().writeJDL(task['id'], sub_list, req)
                for sub_list in new_list]