ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.24
Committed: Tue Nov 8 14:34:35 2005 UTC (19 years, 5 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.23: +2 -1 lines
Log Message:
removed self.configOpt from logging_info command

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6    
7 slacapra 1.18 import os, sys, time
8 nsmirnov 1.1
9     class SchedulerEdg(Scheduler):
def __init__(self):
    """
    Initialise the EDG scheduler.

    Registers this scheduler with the base class under the name "EDG" and
    stores in self.states the ordered list of job-status attribute names.
    getStatusAttribute_ uses the position of a name in this list as the
    index into the status vector it reads, so the order must not change.
    """
    Scheduler.__init__(self, "EDG")
    attribute_names = (
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states",
        "condorId", "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location",
        "matched_jdl", "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    )
    self.states = list(attribute_names)
    return
21    
def configure(self, cfg_params):
    """
    Read the EDG/USER settings from cfg_params and prepare the environment.

    cfg_params is indexed with "SECTION.key" strings; when a key is
    missing (KeyError) the default shown below is stored instead:

      EDG.config, EDG.config_vo, EDG.requirements, EDG.retry_count,
      EDG.max_wall_clock_time, EDG.max_cpu_time, USER.return_data,
      USER.copy_data, USER.register_data      -> ''
      EDG.lcg_version                         -> '2'
      EDG.virtual_organization                -> 'cms'

    When USER.copy_data is present, USER.storage_element and
    USER.storage_path are mandatory; when USER.register_data is present,
    USER.lfn_dir is mandatory.

    Also appends $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python to
    sys.path and resets the cached proxy-validity flag.

    Raises CrabException if a mandatory entry is missing or if the
    EDG_WL_LOCATION environment variable is not set.
    """

    def option(key, default=''):
        # Only item access + KeyError is assumed of cfg_params.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    self.edg_config = option("EDG.config")
    self.edg_config_vo = option("EDG.config_vo")
    self.LCG_version = option("EDG.lcg_version", '2')
    self.EDG_requirements = option('EDG.requirements')
    self.EDG_retry_count = option('EDG.retry_count')
    self.VO = option('EDG.virtual_organization', 'cms')
    self.return_data = option('USER.return_data')
    self.EDG_clock_time = option('EDG.max_wall_clock_time')
    self.EDG_cpu_time = option('EDG.max_cpu_time')

    # Copying the output needs to know where to copy it.
    try:
        self.copy_data = cfg_params["USER.copy_data"]
    except KeyError:
        self.copy_data = ''
    else:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    # Registering the output needs the logical file name directory.
    try:
        self.register_data = cfg_params["USER.register_data"]
    except KeyError:
        self.register_data = ''
    else:
        try:
            self.LFN = cfg_params['USER.lfn_dir']
        except KeyError:
            msg = "Error. The [USER] section does not have 'lfn_dir' value"
            msg = msg + " it's necessary for RLS registration"
            common.logger.message(msg)
            raise CrabException(msg)

    # Add EDG_WL_LOCATION to the python path.
    # Only a missing variable is an error here; anything else (e.g. a
    # KeyboardInterrupt) must not be reported as "variable is not set".
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    for libPath in (os.path.join(path, "lib"),
                    os.path.join(path, "lib", "python")):
        # configure() may run more than once; do not pile up duplicates.
        if libPath not in sys.path:
            sys.path.append(libPath)

    self.proxyValid = 0
    return
95    
96 fanzago 1.10
def sched_parameter(self):
    """
    Write the file with scheduler-specific parameters, if one is needed.

    When self.edg_config is set and self.edg_config_vo is not the empty
    string, creates 'sched_param.clad' in the shared work-space directory
    containing the RBconfig / RBconfigVO entries, remembers the file name
    in self.param and returns 1.  Otherwise nothing is written and 0 is
    returned.
    """
    if not (self.edg_config and self.edg_config_vo != ''):
        return 0

    self.param = 'sched_param.clad'
    param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
    try:
        param_file.write('RBconfig = "' + self.edg_config + '";\n')
        param_file.write('RBconfigVO = "' + self.edg_config_vo + '";')
    finally:
        # Release the handle even if a write fails.
        param_file.close()
    return 1
111 fanzago 1.13
def wsSetupEnvironment(self):
    """
    Build the scheduler-specific environment section of the job script.

    Depending on the copy/register settings the returned shell text
    exports SE and SE_PATH (copy_data) and VO and LFN (register_data);
    it always ends with the lines that query edg-brokerinfo for the CE.

    Side effect: self.SE_PATH gets a trailing '/' appended if it lacks one.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation; the placement of the blank line after the LFN export in
    # particular should be confirmed against the repository copy.
    pieces = []

    if self.copy_data:
        if self.SE:
            pieces.append('export SE=' + self.SE + '\n')
            pieces.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if not self.SE_PATH.endswith('/'):
                self.SE_PATH = self.SE_PATH + '/'
            pieces.append('export SE_PATH=' + self.SE_PATH + '\n')
            pieces.append('echo "SE_PATH = $SE_PATH"\n')

    if self.register_data:
        if self.VO:
            pieces.append('export VO=' + self.VO + '\n')
        if self.LFN:
            pieces.append('export LFN=' + self.LFN + '\n')
            pieces.append('\n')

    # Always emitted: find out which CE the job landed on.
    pieces.extend([
        'CloseCEs=`edg-brokerinfo getCE`\n',
        'echo "CloseCEs = $CloseCEs"\n',
        'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        'echo "CE = $CE"\n',
    ])
    return ''.join(pieces)
138 fanzago 1.15
def wsCopyOutput(self):
    """
    Build the CopyResults section of the job script.

    Returns an empty string unless self.copy_data is set.  Otherwise
    returns shell text that, when $exe_result is 0, runs globus-url-copy
    for every file in $file_list towards gsiftp://${SE}${SE_PATH} and
    echoes the exit status of each copy.
    """
    if not self.copy_data:
        return ''

    copy = 'globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file'

    # $executable_exit_status is not reliable for ORCA, so the generated
    # script tests $exe_result instead.
    script = (
        '#\n',
        '# Copy output to SE = $SE\n',
        '#\n',
        'if [ $exe_result -eq 0 ]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE "\n',
        ' echo "' + copy + '"\n',
        ' ' + copy + ' 2>&1\n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then \n',
        ' echo "Problems with SE= $SE" \n',
        ' else \n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' fi \n',
        ' done\n',
        'fi \n',
    )
    return ''.join(script)
168    
def wsRegisterOutput(self):
    """
    Build the output-registration section of the job script.

    Returns an empty string unless self.register_data is set.  Otherwise
    returns shell text with three branches:
      * executable and copy both succeeded: register every output file
        with lcg-rf using an sfn:// URL, retrying with srm:// on failure;
      * executable succeeded but the copy failed: run lcg-cr for every
        output file;
      * otherwise: report a problem with the executable.
    """
    if not self.register_data:
        return ''

    sfn_cmd = 'lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file'
    srm_cmd = 'lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file'
    # NOTE(review): this fallback queries $CLOSE_SE and reports it on
    # success, yet the command targets "-d $SE" -- confirm which one is
    # intended before changing it.
    cr_cmd = 'lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file'

    # Identical status-reporting lines follow each registration attempt.
    report = (
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
    )

    # As in the copy step, $exe_result is tested rather than
    # $executable_exit_status.
    script = [
        '#\n',
        '# Register output to RLS\n',
        '#\n',
        'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to register the output file into RLS"\n',
        ' echo "' + sfn_cmd + '"\n',
        ' ' + sfn_cmd + ' 2>&1 \n',
        report,
        ' echo "Problems with the registration to RLS" \n',
        ' echo "Try with srm protocol" \n',
        ' echo "' + srm_cmd + '"\n',
        ' ' + srm_cmd + ' 2>&1 \n',
        report,
        ' echo "Problems with the registration into RLS" \n',
        ' fi \n',
        ' else \n',
        ' echo "output registered to RLS"\n',
        ' fi \n',
        ' done\n',
        'elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n',
        ' echo "Trying to copy output file to CloseSE"\n',
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n',
        ' for out_file in $file_list ; do\n',
        ' echo "' + cr_cmd + '" \n',
        ' ' + cr_cmd + ' 2>&1 \n',
        report,
        ' echo "Problems with CloseSE" \n',
        ' else \n',
        ' echo "The program was successfully executed"\n',
        ' echo "SE = $CLOSE_SE"\n',
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n',
        ' fi \n',
        ' done\n',
        'else\n',
        ' echo "Problem with the executable"\n',
        'fi \n',
    ]
    return ''.join(script)
225     #####################
226 nsmirnov 1.1
def loggingInfo(self, id):
    """
    Fetch the logging-and-bookkeeping information of a job.

    Runs 'edg-job-get-logging-info -v 2' on the given job id (after the
    proxy check), prints the command output and returns it.
    """
    self.checkProxy()
    command = 'edg-job-get-logging-info -v 2 ' + id
    info = runCommand(command)
    print(info)
    return info
237    
def listMatch(self, nj):
    """
    Check which resources are compatible with job number nj.

    Runs edg-job-list-match on the job's JDL file (after the proxy check)
    and returns whatever parseListMatch_ makes of the command output.
    """
    self.checkProxy()
    jdl_file = common.job_list[nj].jdlFilename()
    command = 'edg-job-list-match ' + self.configOpt_() + jdl_file
    output = runCommand(command)
    return self.parseListMatch_(output, jdl_file)
247    
def parseListMatch_(self, out, jdl):
    """
    Parse the output of edg-job-list-match.

    out is the complete (multi-line) text printed by the command and jdl
    the name of the JDL file that was matched.

    Returns the number of Computing Elements listed after the 'CEId'
    header (0 if there is none or if out is empty).  If the output says
    that no Computing Element matches, noMatchFound_ is called, which
    logs the JDL requirements and raises CrabException.
    """
    if not out:
        # Nothing to parse; presumably the command failed -- verify what
        # runCommand returns in that case.
        return 0

    reVO = re.compile(r'Selected Virtual Organisation name.*')
    reNO = re.compile(r'No Computing Element matching')
    reRB = re.compile(r'Connecting to host')
    # 'CEId' header line followed by every consecutive line containing
    # ':'.  The repetition is non-capturing: a capturing group inside a
    # repetition would only remember its last iteration.
    reCE = re.compile(r'CEId.*\n(?:.*:.*(?:\n|$))*')

    # search(), not match(): these markers sit in the middle of the
    # output, while match() only looks at the very start of the string.
    if reNO.search(out):
        common.logger.debug(5, out)
        self.noMatchFound_(jdl)

    found = reVO.search(out)
    if found:
        common.logger.debug(5, 'VO :' + found.group())

    found = reRB.search(out)
    if found:
        common.logger.debug(5, 'Using RB :' + found.group())

    CEs = []
    found = reCE.search(out)
    if found:
        # Skip the header line; every remaining line names one CE.
        for line in found.group().splitlines()[1:]:
            ce = line.strip()
            if not ce:
                continue
            CEs.append(ce)
            common.logger.debug(5, 'Matched CE :' + ce)

    return len(CEs)
284    
def noMatchFound_(self, jdl):
    """
    Report the requirements of a JDL for which no resource was found.

    Reads the JDL file up to the first line starting with 'Requirements'
    and logs each double-quoted string on that line, classified as
      * "SW required"  if it contains 'VO',
      * "Other req"    if it starts with a digit after the quote,
      * "CE required"  otherwise.

    Always raises CrabException("No compatible resources found!").
    """
    reReq = re.compile(r'Requirements')
    reString = re.compile(r'"\S*"')
    reNumber = re.compile(r'"\d+')

    f = open(jdl, 'r')
    try:
        for line in f:
            line = line.strip()
            if not reReq.match(line):
                continue
            for req in reString.findall(line):
                if 'VO' in req:
                    common.logger.message("SW required: " + req)
                elif reNumber.search(req):
                    common.logger.message("Other req : " + req)
                else:
                    common.logger.message("CE required: " + req)
            # Only the first Requirements line is reported.
            break
    finally:
        f.close()

    raise CrabException("No compatible resources found!")
303    
def submit(self, nj):
    """
    Submit one EDG job.

    Runs edg-job-submit on the JDL file of job number nj (after the proxy
    check) and returns the job identifier, i.e. the first 'https...'
    string found in the command output.  Returns None when the command
    produced no output or the output contains no such identifier.
    """
    self.checkProxy()
    jdl = common.job_list[nj].jdlFilename()

    cmd = 'edg-job-submit ' + self.configOpt_() + jdl
    cmd_out = runCommand(cmd)
    if cmd_out is None:
        return None

    # Output without a job URL must not crash with AttributeError on
    # None.group(); report "no job id" to the caller instead.
    found = re.search(r'https.+', cmd_out)
    if found is None:
        return None
    return found.group()
320    
def getExitStatus(self, id):
    """Return the 'exit_code' status attribute of the job with this id."""
    exit_code = self.getStatusAttribute_(id, 'exit_code')
    return exit_code
323 fanzago 1.10
def queryStatus(self, id):
    """Return the 'status' status attribute of the job with this id."""
    status = self.getStatusAttribute_(id, 'status')
    return status
326 fanzago 1.10
def queryDest(self, id):
    """Return the 'destination' status attribute of the job with this id."""
    destination = self.getStatusAttribute_(id, 'destination')
    return destination
329    
330 slacapra 1.8
def getStatusAttribute_(self, id, attr):
    """
    Query one status attribute of the job with the given id.

    Bypasses the edg-job-status command and talks to the LB API through
    the Status class of edg_wl_userinterface_common_LbWrapper.

    attr must be one of the names in self.states; its position in that
    list selects the entry of the status vector that is returned.
    Returns None (after printing and logging the API message) when the
    API reports an error.
    """
    self.checkProxy()
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')

    level = 0
    st = 0
    # Instance of the Status class provided by the LB API.
    jobStat = Status()
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        print('Error caught %s' % (apiMsg,))
        # The logger object is common.logger, as used everywhere else in
        # this class.
        common.logger.message(apiMsg)
        return None

    # One call is enough: the vector is indexed by position in self.states.
    # NOTE(review): assumes loadStatus() is a side-effect-free accessor.
    status_vector = jobStat.loadStatus(st)
    return status_vector[self.states.index(attr)]
355 nsmirnov 1.1
def queryDetailedStatus(self, id):
    """
    Query the detailed status of the job with the given id.

    Returns the output of 'edg-job-status' for that id.
    """
    # NOTE(review): unlike the other commands of this class, no proxy
    # check is done first -- confirm that this is intentional.
    return runCommand('edg-job-status ' + id)
361    
def getOutput(self, id):
    """
    Retrieve the output of a finished job with the given id.

    Runs edg-job-get-output into the work-space result directory (after
    the proxy check) and returns the expected name of the directory
    holding the results: <resDir><login name>_<last component of id>.
    """
    self.checkProxy()
    command = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
    runCommand(command)

    # The directory name is derived, not parsed from the command output.
    # NOTE(review): os.getlogin() needs a controlling terminal.
    name_parts = [
        common.work_space.resDir(),
        os.getlogin(),
        '_' + os.path.basename(id),
    ]
    return ''.join(name_parts)
377 nsmirnov 1.1
def cancel(self, id):
    """
    Cancel the EDG job with the given id.

    Runs 'edg-job-cancel --noint' (after the proxy check) and returns the
    command output.
    """
    self.checkProxy()
    return runCommand('edg-job-cancel --noint ' + id)
384    
def createSchScript(self, nj):
    """
    Create the JDL file for job number nj.

    The file named by the job's jdlFilename() receives, in this order:
    a title comment, Executable, Arguments (job number, first event,
    max events), InputSandbox, StdOutput, StdError, OutputSandbox,
    Requirements (only when at least one site is known),
    VirtualOrganisation and, if configured, RetryCount.
    """
    job = common.job_list[nj]
    jbt = job.type()
    inp_sandbox = jbt.inputSandbox(nj)
    out_sandbox = jbt.outputSandbox(nj)
    script = job.scriptFilename()

    def quoted_list(names):
        # JDL list body: "a", "b", "c"
        return ', '.join(['"' + name + '"' for name in names])

    lines = []
    lines.append('# This JDL was generated by ' +
                 common.prog_name + ' (version ' + common.prog_version_str + ')\n')
    lines.append('Executable = "' + os.path.basename(script) + '";\n')

    # Only one .sh; the JDL carries the per-job arguments.
    firstEvent = common.jobDB.firstEvent(nj)
    maxEvents = common.jobDB.maxEvents(nj)
    lines.append('Arguments = "' + str(nj + 1) + ' ' + str(firstEvent) +
                 ' ' + str(maxEvents) + '";\n')

    # Input sandbox: the job script, the job-type files, extra files.
    in_files = [script]
    if inp_sandbox is not None:
        in_files.extend(inp_sandbox)
    for addFile in jbt.additional_inbox_files:
        in_files.append(os.path.abspath(addFile))
    lines.append('InputSandbox = { ' + quoted_list(in_files) + ' };\n')

    lines.append('StdOutput = "' + job.stdout() + '";\n')
    lines.append('StdError = "' + job.stderr() + '";\n')

    # Output sandbox: stdout (and stderr when it is a different file),
    # the broker info and, if requested, the job-type output files.
    out_files = [job.stdout()]
    if job.stdout() != job.stderr():
        out_files.append(job.stderr())
    out_files.append('.BrokerInfo')
    if self.return_data and out_sandbox is not None:
        out_files.extend(out_sandbox)
    lines.append('OutputSandbox = { ' + quoted_list(out_files) + ' };\n')

    # Requirements are written only if at least a CE exists.
    # NOTE(review): nesting of this section was reconstructed from a
    # listing without indentation -- confirm that the user requirements
    # are meant to be skipped when no site is known.
    sites = common.analisys_common_info['sites']
    if sites:
        # Collect the clauses and join them, so that a missing software
        # version no longer leaves the expression undefined.
        clauses = []
        sw_version = common.analisys_common_info['sw_version']
        if sw_version:
            clauses.append('Member("VO-cms-' + sw_version +
                           '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
        hosts = ['other.GlueCEInfoHostName == "' + site + '"' for site in sites]
        clauses.append('(' + ' || '.join(hosts) + ')')

        # ... and the user requirements.
        if self.EDG_requirements:
            clauses.append(self.EDG_requirements)
        if self.EDG_clock_time:
            clauses.append('other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time)
        if self.EDG_cpu_time:
            clauses.append('other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time)
        lines.append('Requirements = ' + ' && '.join(clauses) + ';\n')

    lines.append('VirtualOrganisation = "' + self.VO + '";\n')

    if self.EDG_retry_count:
        lines.append('RetryCount = ' + self.EDG_retry_count + ';\n')

    jdl = open(job.jdlFilename(), 'w')
    try:
        jdl.writelines(lines)
    finally:
        # Close the JDL even if writing fails.
        jdl.close()
    return
497 slacapra 1.18
def checkProxy(self):
    """
    Make sure a Globus proxy with enough time left exists.

    Does nothing if a previous call already validated the proxy
    (self.proxyValid).  Otherwise asks grid-proxy-info whether the proxy
    is valid for at least 10 more hours; if the command gives no output
    or outputs '1', a new proxy valid for 100 hours is created with
    grid-proxy-init.  On success self.proxyValid is set to 1.

    Raises CrabException if grid-proxy-init fails.
    """
    if self.proxyValid:
        return

    minTimeLeft = 10  # in hours
    cmd = 'grid-proxy-info -e -v ' + str(minTimeLeft) + ':00'
    try:
        cmd_out = runCommand(cmd, 0)
    except Exception:
        # The check itself failed: treat it like "no valid proxy" rather
        # than tripping over an unbound cmd_out.
        cmd_out = None

    if cmd_out is None or cmd_out == '1':
        common.logger.message("No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 100h\n")
        # Any non-zero status (including -1 when the shell cannot be
        # started) means that no proxy was created.
        if os.system('grid-proxy-init -valid 100:00') != 0:
            raise CrabException("Unable to create a valid proxy!\n")
        # Kept from the original flow; the result is not used.
        runCommand('grid-proxy-info -timeleft', 0)

    self.proxyValid = 1
    return
524    
def configOpt_(self):
    """
    Build the configuration options passed to the edg-job-* commands.

    Returns ' ' when nothing is configured, ' -c <edg_config> ' when the
    RB configuration file is set, with ' --config-vo <edg_config_vo> '
    appended when the VO configuration file is set.
    """
    # NOTE(review): the two tests are taken to be independent of each
    # other; the listing this was reconstructed from had no indentation.
    if self.edg_config:
        parts = [' -c ', self.edg_config, ' ']
    else:
        parts = [' ']
    if self.edg_config_vo:
        parts.extend([' --config-vo ', self.edg_config_vo, ' '])
    return ''.join(parts)