ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.30
Committed: Tue Apr 8 09:31:36 2008 UTC (17 years ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre5
Changes since 1.29: +3 -25 lines
Log Message:
fix for data publication

File Contents

# User Rev Content
1 gutsche 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.8 from BlackWhiteListParser import BlackWhiteListParser
6 gutsche 1.1 import common
7 fanzago 1.30 from LFNBaseName import *
8 spiga 1.28 from JobList import JobList
9 gutsche 1.1
10     import os, sys, time
11    
12 slacapra 1.8 #
13     # Base class for all grid schedulers
14     #
15    
16 gutsche 1.1 class SchedulerGrid(Scheduler):
17 slacapra 1.8
def __init__(self, name):
    """
    Set up the grid scheduler base object.

    Forwards *name* to ``Scheduler.__init__`` and stores in
    ``self.states`` a fixed list of attribute-name strings
    (presumably the fields of a grid job status record -- confirm
    against the code that consumes ``self.states``).
    """
    Scheduler.__init__(self, name)
    # The enclosing brackets already allow the literal to span lines,
    # so no explicit line-continuation characters are needed.
    self.states = [
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    ]
29 slacapra 1.12
def configure(self, cfg_params):
    """
    Read the grid scheduler settings from *cfg_params* and validate them.

    *cfg_params* is only accessed through ``.get(key, default)`` with
    keys of the ``EDG.*``, ``USER.*`` and ``CRAB.*`` families; the
    values are stored as attributes on ``self``.  The method also
    appends ``$EDG_WL_LOCATION/lib`` and ``$EDG_WL_LOCATION/lib/python``
    to ``sys.path`` and stores the task name returned by
    ``common._db.queryTask('name')`` in ``self._taskId``.

    Raises CrabException when:
      * copy_data is 1 but storage_element and/or storage_path is missing;
      * return_data and copy_data are both 0 or both 1;
      * publish_data is 1 but publish_data_name is missing, copy_data
        is 0, or the user name cannot be obtained from SiteDB;
      * the EDG_WL_LOCATION environment variable is not set.

    NOTE(review): the indentation of the source was not available; the
    voms-proxy/SiteDB lookup is assumed to run only when publish_data
    is 1 -- confirm against the repository copy.
    """
    Scheduler.configure(self, cfg_params)

    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.proxyValid = 0
    self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

    # NOTE: selecting a specific RB/WMS through 'EDG.rb' (see
    # rb_configure) is currently disabled.

    self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.group = cfg_params.get("EDG.group", None)

    self.role = cfg_params.get("EDG.role", None)

    # Black/white lists are given as comma-separated strings.
    self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
    if self.EDG_ce_black_list:
        self.EDG_ce_black_list = self.EDG_ce_black_list.split(',')

    self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
    if self.EDG_ce_white_list:
        self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

    self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

    self.copy_input_data = cfg_params.get("USER.copy_input_data", 0)

    self.return_data = cfg_params.get('USER.return_data', 0)

    self.copy_data = cfg_params.get("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        self.SE = cfg_params.get('USER.storage_element', None)
        self.SE_PATH = cfg_params.get('USER.storage_path', None)
        self.srm_ver = cfg_params.get('USER.srm_version', 0)  ## DS for srmv2
        if not self.SE or not self.SE_PATH:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if int(self.return_data) == 0 and int(self.copy_data) == 0:
        msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    if int(self.return_data) == 1 and int(self.copy_data) == 1:
        msg = 'Error: return_data and copy_data cannot be set both to 1\n'
        msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    self.publish_data = cfg_params.get("USER.publish_data", 0)
    publishing = int(self.publish_data) == 1
    if publishing:
        self.publish_data_name = cfg_params.get('USER.publish_data_name', None)
        if not self.publish_data_name:
            msg = "Error. The [USER] section does not have 'publish_data_name'"
            raise CrabException(msg)

    # Pure configuration error: report it before running any external
    # command or contacting SiteDB.
    if int(self.copy_data) == 0 and publishing:
        msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    if publishing:
        ## SL I don't like a direct call to voms-proxy here
        import urllib
        try:
            userdn = runCommand("voms-proxy-info -identity")
            self.userdn = userdn.strip()
        except Exception:
            msg = "Error. Problem with voms-proxy-info -identity command"
            raise CrabException(msg)

        sitedburl = "https://cmsweb.cern.ch/sitedb/sitedb/json/index/dnUserName"
        try:
            params = urllib.urlencode({'dn': self.userdn})
            f = urllib.urlopen(sitedburl, params)
            try:
                udata = f.read()
            finally:
                # always release the connection, even if read() fails
                f.close()
            # SECURITY NOTE(review): eval() executes whatever the remote
            # server returns.  Kept for compatibility; should be replaced
            # by a real JSON parser once one is available to this code.
            userinfo = eval(udata)
            self.hnUserName = userinfo['user']
        except Exception:
            msg = "Error. Problem extracting user name from %s" % sitedburl
            raise CrabException(msg)

        if not self.hnUserName:
            msg = "Error. There is no user name associated to DN %s in %s.You need to register in SiteDB" % (self.userdn, sitedburl)
            raise CrabException(msg)

    self.EDG_requirements = cfg_params.get('EDG.requirements', None)

    # Additional JDL parameters are given as a ';'-separated string.
    self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
    if self.EDG_addJdlParam:
        self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

    self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)

    self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)

    self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

    # Default minimum CPU time to >= 130 minutes
    self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

    # Add EDG_WL_LOCATION to the python path
    if 'EDG_WL_LOCATION' not in os.environ:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)
    path = os.environ['EDG_WL_LOCATION']

    for libPath in (os.path.join(path, "lib"),
                    os.path.join(path, "lib", "python")):
        sys.path.append(libPath)

    self._taskId = common._db.queryTask('name')  ## DS--BL
    self.jobtypeName = cfg_params.get('CRAB.jobtype', '')

    self.schedulerName = cfg_params.get('CRAB.scheduler', '')
152 slacapra 1.8
153    
def rb_configure(self, RB):
    """
    Hook for selecting a specific RB/WMS.

    A concrete scheduler is expected to re-implement this so that it
    returns a requirement to be added to the JDL (and None when RB is
    None).  This base implementation ignores *RB* and always yields
    None.
    """
    requirement = None
    return requirement
161 ewv 1.24
def sched_fix_parameter(self):
    """
    No-op in the base class: does nothing and returns None.
    """
    return None
164 gutsche 1.1
def sched_parameter(self):
    """
    Placeholder for the requirements / scheduler-specific parameters.

    The base implementation produces nothing and returns None.
    """
    return None
170 gutsche 1.1
def wsSetupEnvironment(self):
    """
    Build the scheduler-specific part of the job wrapper script.

    Returns a string of shell code that strips the wrapper arguments,
    exports NJob and out_files, inserts the text returned by the job
    type's ``outList()``, writes the monitoring identifiers to
    ``$RUNTIME_AREA/$repo``, detects the grid flavour (OSG or LCG) and
    sets VO and CE.

    Raises CrabException if ``self.environment_unique_identifier`` is
    not set.
    """
    # Job type of entry number nJobs (1-based) of common.job_list.
    njobs = int(common._db.nJobs())
    jobType = common.job_list[njobs - 1].type()
    if not self.environment_unique_identifier:
        raise CrabException('environment_unique_identifier not set')

    # Fragments are concatenated verbatim, in order, at the end.
    script = [
        '# ' + self.name() + ' specific stuff\n',
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        "# job number (first parameter for job wrapper)\n",
        "NJob=${args[0]}; export NJob\n",

        "out_files=out_files_${NJob}; export out_files\n",
        "echo $out_files\n",
        jobType.outList(),

        # identifiers written to the report file
        'MonitorJobID=${NJob}_$' + self.environment_unique_identifier + '\n',
        'SyncGridJobId=$' + self.environment_unique_identifier + '\n',
        'MonitorID=' + self._taskId + '\n',
        'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n',
        'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n',

        # grid flavour detection
        'echo ">>> GridFlavour discovery: " \n',
        'if [ $OSG_APP ]; then \n',
        ' middleware=OSG \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        ' SyncCE="$OSG_JOB_CONTACT"; \n',
        ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n',
        ' else\n',
        ' echo "not reporting SyncCE";\n',
        ' fi\n',
        ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
        'elif [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
        'else \n',
        ' echo "ERROR ==> GridFlavour not identified" \n',
        ' job_exit_code=10030 \n',
        ' func_exit \n',
        'fi \n',

        'dumpStatus $RUNTIME_AREA/$repo \n',

        '\n\n',

        # VO and CE name
        'export VO=' + self.VO + '\n',
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`glite-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        # the doubled backslash yields the two characters \/ for awk
        " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
        ' else \n',
        ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n',
        ' job_exit_code=10099\n',
        ' func_exit\n',
        ' fi \n',
        'fi \n',
    ]
    return ''.join(script)
242    
def wsCopyInput(self):
    """
    Build the job-script fragment that copies input data from SE to WN.

    Returns an empty string when ``self.copy_input_data`` is disabled.
    The flag is interpreted numerically when possible, so the strings
    "0" and "1" behave like the integers 0 and 1 (the same way the
    other USER.* flags are handled with ``int()`` in this class);
    values that cannot be converted fall back to plain truthiness.

    Otherwise returns shell code which, in LCG mode, copies every file
    in $cur_file_list and $cur_pu_list with lcg-cp and then prints the
    free disk space; in OSG mode it only prints a message.
    """
    flag = self.copy_input_data
    try:
        enabled = int(flag) != 0
    except (TypeError, ValueError):
        enabled = bool(flag)
    if not enabled:
        return ''

    script = [
        ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n',
        ' #\n',
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        ' #\n',
        ' # Copy Input Data from SE to this WN\n',
        ' #\n',
        ### changed by georgia (put a loop copying more than one input files per jobs)
        ' for input_file in $cur_file_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n',
        ' copy_input_exit_status=$?\n',
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n',
        ' if [ $copy_input_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying to WN" \n',
        ' else \n',
        ' echo "input copied into WN" \n',
        ' fi \n',
        ' done \n',
        ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
        ' for file in $cur_pu_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n',
        ' copy_input_pu_exit_status=$?\n',
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n',
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying pu to WN" \n',
        ' else \n',
        ' echo "input pu files copied into WN" \n',
        ' fi \n',
        ' done \n',
        ' \n',
        ' ### Check SCRATCH space available on WN : \n',
        ' df -h \n',
        'fi \n',
    ]
    return ''.join(script)
290    
def wsCopyOutput(self):
    """
    Build the job-script fragment that copies the output to a storage
    element.

    A short comment header is always produced.  When ``self.copy_data``
    is 1 the fragment additionally exports SE, SE_PATH and SRM_VER and
    loops over $file_list calling ``cmscp`` for every output file.
    ``self.SE_PATH`` is normalised in place so that it ends with '/'.
    With ``self.publish_data`` equal to 1 the exported path gets
    ``PFNportion(self.publish_data_name) + '_${PSETHASH}/'`` appended.
    """
    pieces = ['\n', '#\n', '# COPY OUTPUT FILE TO SE\n', '#\n\n']

    if int(self.copy_data) != 1:
        # nothing to copy: only the header is returned
        return ''.join(pieces)

    if self.SE:
        pieces.append('export SE=' + self.SE + '\n')
        pieces.append('echo "SE = $SE"\n')

    targetPath = ''
    if self.SE_PATH:
        if not self.SE_PATH.endswith('/'):
            self.SE_PATH = self.SE_PATH + '/'
        targetPath = self.SE_PATH
    if int(self.publish_data) == 1:
        pieces.append('### publish_data = 1 so the SE path where to copy the output is: \n')
        targetPath = targetPath + PFNportion(self.publish_data_name) + '_${PSETHASH}/'
    pieces.append('export SE_PATH=' + targetPath + '\n')
    pieces.append('echo "SE_PATH = $SE_PATH"\n')

    ## DS for srmVer
    pieces.append('export SRM_VER=' + str(self.srm_ver) + '\n')
    pieces.append('echo "SRM_VER = $SRM_VER"\n')

    pieces.extend([
        'echo ">>> Copy output files from WN = `hostname` to SE = $SE :"\n',
        'copy_exit_status=0\n',
        'for out_file in $file_list ; do\n',
        ' if [ -e $SOFTWARE_DIR/$out_file ] ; then\n',
        ' echo "Trying to copy output file $SOFTWARE_DIR/$out_file to $SE"\n',
        ' cmscp $SOFTWARE_DIR/$out_file ${SE} ${SE_PATH} $out_file ${SRM_VER} $middleware\n',
        ' if [ $cmscp_exit_status -ne 0 ]; then\n',
        ' echo "Problem copying $out_file to $SE $SE_PATH"\n',
        ' copy_exit_status=$cmscp_exit_status\n',
        ' else\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' fi\n',
        ' else\n',
        ' copy_exit_status=60302\n',
        ' echo "StageOutExitStatus = $copy_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutExitStatusReason = file to copy not found" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'done\n',
        'if [ $copy_exit_status -ne 0 ]; then\n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        ' job_exit_code=$copy_exit_status\n',
        'fi\n',
    ])
    return ''.join(pieces)
346    
347    
def checkProxy(self):
    """
    Check the VOMS proxy and the MyProxy delegation, renewing if needed.

    Returns immediately when a previous call already validated the
    proxy (``self.proxyValid``) or when ``self.dontCheckProxy`` is 1.
    Otherwise:

      1. ``voms-proxy-info`` is queried for the remaining proxy time
         (-timeleft) and the remaining VOMS attribute time
         (-actimeleft).  If either value is missing, not a number, or
         below 10 hours, ``voms-proxy-init`` is run for 192 hours.
      2. ``myproxy-info`` is queried on ``self.proxyServer``; if no
         credential is delegated, or it expires in less than 4 days,
         ``myproxy-init`` is run.

    On success ``self.proxyValid`` is set to 1.

    Raises CrabException if the proxy cannot be created or delegated.
    """
    if self.proxyValid:
        return

    ### Just return if asked to do so
    if self.dontCheckProxy == 1:
        self.proxyValid = 1
        return

    # Imported locally so the method does not depend on a star-import
    # elsewhere providing the name.
    import re

    def asInt(text):
        """Return *text* converted to int, or None if that is impossible."""
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    minTimeLeft = 10 * 3600  # in seconds

    mustRenew = 0
    # The command output is text: convert it before any numeric
    # comparison, otherwise the thresholds are never applied.
    timeLeftLocal = asInt(runCommand('voms-proxy-info -timeleft 2>/dev/null'))
    if timeLeftLocal is None or timeLeftLocal <= 0:
        mustRenew = 1
    else:
        timeLeftServer = asInt(runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1'))
        if timeLeftServer is None:
            mustRenew = 1
        elif timeLeftLocal < minTimeLeft or timeLeftServer < minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
        cmd = 'voms-proxy-init -voms ' + self.VO
        # NOTE(review): group and role are treated as independent
        # options; confirm the intended command when only a role is set.
        if self.group:
            cmd += ':/' + self.VO + '/' + self.group
        if self.role:
            cmd += '/role=' + self.role
        cmd += ' -valid 192:00'
        try:
            # SL as above: damn it!
            common.logger.debug(10, cmd)
            out = os.system(cmd)
            if out > 0:
                raise CrabException("Unable to create a valid proxy!\n")
        except Exception:
            # any failure here is reported uniformly to the caller
            msg = "Unable to create a valid proxy!\n"
            raise CrabException(msg)

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s ' + self.proxyServer
    cmd_out = runCommand(cmd, 0, 20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        ## minimum time: 4 days
        minTime = 4 * 24 * 3600
        ## regex to extract the right information; every field needs at
        ## least one digit so that int() below cannot see an empty string
        myproxyRE = re.compile(r"timeleft: (?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)")
        for row in cmd_out.split("\n"):
            g = myproxyRE.search(row)
            if not g:
                continue
            hours = g.group("hours")
            minutes = g.group("minutes")
            seconds = g.group("seconds")
            timeleft = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
            if timeleft < minTime:
                renewProxy = 1
                common.logger.message('Your proxy will expire in:\n\t'+hours+' hours '+minutes+' minutes '+seconds+' seconds\n')
                common.logger.message('Need to renew it:')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s ' + self.proxyServer
        out = os.system(cmd)
        if out > 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
433    
def userName(self):
    """
    Return the output of ``voms-proxy-info -identity`` with surrounding
    whitespace removed; ``checkProxy`` is called first.
    """
    self.checkProxy()
    identity = runCommand("voms-proxy-info -identity")
    return identity.strip()
439    
def configOpt_(self):
    """
    Build the configuration-file options for the UI command line.

    Returns ' -c <edg_config> ' when ``self.edg_config`` is set (a
    single blank otherwise), followed by
    ' --config-vo <edg_config_vo> ' when ``self.edg_config_vo`` is set.
    """
    if self.edg_config:
        options = ' -c ' + self.edg_config + ' '
    else:
        options = ' '
    if self.edg_config_vo:
        options = options + ' --config-vo ' + self.edg_config_vo + ' '
    return options
447 slacapra 1.8
def tOut(self, list):
    """
    Return the time-out value, a constant 120 regardless of *list*
    (unit not stated here -- presumably seconds, confirm with callers).
    """
    timeout = 120
    return timeout
450    
451