ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.55
Committed: Wed May 3 08:44:30 2006 UTC (19 years ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: pre_cmssw_integration_20060527
Changes since 1.54: +1 -1 lines
Log Message:
Substitute os.getlogin with os.environ['USER']

File Contents

# Content
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
from EdgConfig import *
import common

import os
import re
import sys
import time
9
class SchedulerEdg(Scheduler):
    """
    CRAB scheduler plugin for the EDG/LCG workload management system.

    It reads the [EDG]/[USER] configuration, writes one JDL file per job,
    produces the scheduler-specific fragments of the job wrapper script and
    drives the edg-job-* command line tools (status queries go through the
    LB Python wrapper instead).
    """

    def __init__(self):
        Scheduler.__init__(self, "EDG")
        # Attribute names in positional order: getStatusAttribute_() indexes
        # the vector returned by Status.loadStatus() with self.states.index().
        self.states = ["Acl", "cancelReason", "cancelling", "ce_node", "children",
                       "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
                       "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
                       "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
                       "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
                       "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
                       "stateEnterTime", "stateEnterTimes", "subjob_failed",
                       "user tags", "status", "status_code", "hierarchy"]
        # No proxy verified yet (configure() resets this as well), so that
        # checkProxy() never reads an unset attribute.
        self.proxyValid = 0
        return

    def getParam_(self, cfg_params, key, default):
        """
        Return cfg_params[key], or default when the key is not present.
        """
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def requiredParam_(self, cfg_params, key, msg):
        """
        Return cfg_params[key]; when the key is missing, log msg and raise
        CrabException(msg).
        """
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    def configure(self, cfg_params):
        """
        Read the scheduler configuration.

        cfg_params maps 'SECTION.key' names (e.g. 'EDG.rb', 'USER.copy_data')
        to their values.

        Raises CrabException when: a mandatory entry is missing (the LFC
        settings, the storage element/path when copy_data = 1, lfn_dir when
        register_data = 1); both return_data and copy_data are 0; register_data
        = 1 is requested without copy_data = 1; or EDG_WL_LOCATION is not set.
        """
        try:
            RB = cfg_params["EDG.rb"]
            edgConfig = EdgConfig(RB)
            self.edg_config = edgConfig.config()
            self.edg_config_vo = edgConfig.configVO()
        except KeyError:
            self.edg_config = ''
            self.edg_config_vo = ''

        self.proxyServer = self.getParam_(cfg_params, "EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)

        self.LCG_version = self.getParam_(cfg_params, "EDG.lcg_version", '2')
        self.EDG_requirements = self.getParam_(cfg_params, 'EDG.requirements', '')
        self.EDG_retry_count = self.getParam_(cfg_params, 'EDG.retry_count', '')
        self.EDG_clock_time = self.getParam_(cfg_params, 'EDG.max_wall_clock_time', '')
        self.EDG_cpu_time = self.getParam_(cfg_params, 'EDG.max_cpu_time', '')
        self.EDG_ce_black_list = self.getParam_(cfg_params, 'EDG.ce_black_list', '')
        self.EDG_ce_white_list = self.getParam_(cfg_params, 'EDG.ce_white_list', '')
        self.VO = self.getParam_(cfg_params, 'EDG.virtual_organization', 'cms')
        self.return_data = self.getParam_(cfg_params, 'USER.return_data', 1)

        try:
            self.copy_input_data = common.analisys_common_info['copy_input_data']
        except KeyError:
            self.copy_input_data = 0

        # The ws*() script generators read these attributes; give them empty
        # defaults so they exist even when copy/registration is disabled.
        self.SE = ''
        self.SE_PATH = ''
        self.LFN = ''

        self.copy_data = self.getParam_(cfg_params, "USER.copy_data", 0)
        if int(self.copy_data) == 1:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            self.SE = self.requiredParam_(cfg_params, 'USER.storage_element', msg)
            self.SE_PATH = self.requiredParam_(cfg_params, 'USER.storage_path', msg)

        if int(self.return_data) == 0 and int(self.copy_data) == 0:
            msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        self.lfc_host = self.requiredParam_(cfg_params, 'EDG.lfc_host',
            "Error. The [EDG] section does not have 'lfc_host' value"
            " it's necessary to know the LFC host name")
        self.lcg_catalog_type = self.requiredParam_(cfg_params, 'EDG.lcg_catalog_type',
            "Error. The [EDG] section does not have 'lcg_catalog_type' value"
            " it's necessary to know the catalog type")
        self.lfc_home = self.requiredParam_(cfg_params, 'EDG.lfc_home',
            "Error. The [EDG] section does not have 'lfc_home' value"
            " it's necessary to know the home catalog dir")

        self.register_data = self.getParam_(cfg_params, "USER.register_data", 0)
        if int(self.register_data) == 1:
            msg = "Error. The [USER] section does not have 'lfn_dir' value"
            msg = msg + " it's necessary for LFC registration"
            self.LFN = self.requiredParam_(cfg_params, 'USER.lfn_dir', msg)

        if int(self.copy_data) == 0 and int(self.register_data) == 1:
            msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        # Add EDG_WL_LOCATION/lib and lib/python to the python path (once,
        # even if configure() is called repeatedly).
        try:
            path = os.environ['EDG_WL_LOCATION']
        except KeyError:
            raise CrabException("Error: the EDG_WL_LOCATION variable is not set.")
        for libPath in [os.path.join(path, "lib"), os.path.join(path, "lib", "python")]:
            if libPath not in sys.path:
                sys.path.append(libPath)

        self.proxyValid = 0
        return

    def sched_parameter(self):
        """
        Write the scheduler-specific parameter file (sched_param.clad) into
        the share directory when both RB config files are known.

        Returns 1 if the file was written (its name is kept in self.param),
        0 otherwise.
        """
        if not (self.edg_config and self.edg_config_vo != ''):
            return 0
        self.param = 'sched_param.clad'
        param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
        try:
            param_file.write('RBconfig = "'+self.edg_config+'";\n')
            param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
        finally:
            param_file.close()
        return 1

    def wsSetupEnvironment(self):
        """
        Return the part of the job script that does scheduler-specific setup.

        The generated shell code detects the middleware (LCG or OSG; it exits
        with status 1 if neither is recognised), exports SE/SE_PATH (when
        copy_data = 1), VO and the LFC catalog variables, exports LFN and
        creates that LFC directory if lfc-ls fails (when register_data = 1),
        and finally determines the CE name.

        Side effect: a trailing '/' is appended to self.SE_PATH if missing.
        """
        txt = ''
        txt += 'echo "middleware discovery " \n'
        txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
        txt += '    middleware=LCG \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $GRID3_APP_DIR ]; then\n'
        txt += '    middleware=OSG \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $OSG_APP ]; then \n'
        txt += '    middleware=OSG \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'else \n'
        txt += '    echo "SET_CMS_ENV 1 ==> middleware not identified" \n'
        txt += '    echo "JOB_EXIT_STATUS = 1"\n'
        txt += '    exit 1\n'
        txt += 'fi\n'
        txt += '\n\n'

        txt += 'if [ $middleware == LCG ]; then \n'
        txt += '    echo "SyncGridJobId=`echo $EDG_WL_JOBID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += 'fi\n'

        if int(self.copy_data) == 1:
            if self.SE:
                txt += 'export SE='+self.SE+'\n'
                txt += 'echo "SE = $SE"\n'
            if self.SE_PATH:
                if not self.SE_PATH.endswith('/'):
                    self.SE_PATH = self.SE_PATH + '/'
                txt += 'export SE_PATH='+self.SE_PATH+'\n'
                txt += 'echo "SE_PATH = $SE_PATH"\n'

        # Exported once; lcg-* commands generated elsewhere use --vo $VO.
        txt += 'export VO='+self.VO+'\n'

        # LFC catalog settings (LCG only).
        txt += 'if [ $middleware == LCG ]; then \n'
        txt += '    if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
        txt += '        export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
        txt += '    fi\n'
        txt += '    if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
        txt += '        export LFC_HOST='+self.lfc_host+'\n'
        txt += '    fi\n'
        txt += '    if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
        txt += '        export LFC_HOME='+self.lfc_home+'\n'
        txt += '    fi\n'
        txt += 'elif [ $middleware == OSG ]; then\n'
        txt += '    echo "LFC catalog setting to be implemented for OSG"\n'
        txt += 'fi\n'

        if int(self.register_data) == 1:
            txt += 'if [ $middleware == LCG ]; then \n'
            txt += '    export LFN='+self.LFN+'\n'
            txt += '    lfc-ls $LFN\n'
            txt += '    result=$?\n'
            txt += '    echo $result\n'
            # Create the LFN directory in the LFC catalog when it is missing.
            txt += '    if [ $result != 0 ]; then\n'
            txt += '        lfc-mkdir $LFN\n'
            txt += '        result=$?\n'
            txt += '        echo $result\n'
            txt += '    fi\n'
            txt += 'elif [ $middleware == OSG ]; then\n'
            txt += '    echo " Files registration to be implemented for OSG"\n'
            txt += 'fi\n'
            txt += '\n'

        txt += 'if [ $middleware == LCG ]; then\n'
        txt += '    CloseCEs=`edg-brokerinfo getCE`\n'
        txt += '    echo "CloseCEs = $CloseCEs"\n'
        txt += '    CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += '    echo "CE = $CE"\n'
        txt += 'elif [ $middleware == OSG ]; then \n'
        txt += '    if [ $OSG_JOB_CONTACT ]; then \n'
        txt += '        CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n'
        txt += '    else \n'
        txt += '        echo "SET_ENV 1 ==> ERROR in setting CE name - OSG mode -" \n'
        txt += '        exit 1 \n'
        txt += '    fi \n'
        txt += 'fi \n'

        return txt

    def wsCopyInput(self):
        """
        Return the part of the job script that copies input data (and the
        pile-up files) from the SE to the worker node.

        Empty unless copy_input_data == 1 in common.analisys_common_info; the
        generated copy runs only under LCG (disabled for OSG).
        """
        txt = ''
        try:
            self.copy_input_data = common.analisys_common_info['copy_input_data']
        except KeyError:
            self.copy_input_data = 0
        if int(self.copy_input_data) != 1:
            return txt

        txt += 'if [ $middleware == OSG ]; then\n'
        txt += '    #\n'
        txt += '    # Copy Input Data from SE to this WN deactivated in OSG mode\n'
        txt += '    #\n'
        txt += '    echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
        txt += 'elif [ $middleware == LCG ]; then \n'
        txt += '    #\n'
        txt += '    # Copy Input Data from SE to this WN\n'
        txt += '    #\n'
        # One lcg-cp per input file of this job.
        txt += '    for input_file in $cur_file_list \n'
        txt += '    do \n'
        txt += '        lcg-cp --vo $VO lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
        txt += '        copy_input_exit_status=$?\n'
        txt += '        echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
        txt += '        if [ $copy_input_exit_status -ne 0 ]; then \n'
        txt += '            echo "Problems with copying to WN" \n'
        txt += '        else \n'
        txt += '            echo "input copied into WN" \n'
        txt += '        fi \n'
        txt += '    done \n'
        # Same PU file set for every job; the exit status must be stored in
        # the variable that is echoed and tested below.
        txt += '    for file in $cur_pu_list \n'
        txt += '    do \n'
        txt += '        lcg-cp --vo $VO lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
        txt += '        copy_input_pu_exit_status=$?\n'
        txt += '        echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
        txt += '        if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
        txt += '            echo "Problems with copying pu to WN" \n'
        txt += '        else \n'
        txt += '            echo "input pu files copied into WN" \n'
        txt += '        fi \n'
        txt += '    done \n'
        txt += '    \n'
        txt += '    ### Check SCRATCH space available on WN : \n'
        txt += '    df -h \n'
        txt += 'fi \n'
        return txt

    def wsCopyOutput(self):
        """
        Return the part of the job script that copies every file in
        $file_list to the storage element ${SE}${SE_PATH}.

        Empty unless copy_data == 1. The copy is attempted only when
        $exe_result is 0; globus-url-copy is used under OSG, lcg-cp under LCG.
        Each file's result is appended to $RUNTIME_AREA/$repo.
        """
        txt = ''
        if int(self.copy_data) != 1:
            return txt

        txt += '#\n'
        txt += '# Copy output to SE = $SE\n'
        txt += '#\n'
        txt += 'if [ $exe_result -eq 0 ]; then\n'
        txt += '    for out_file in $file_list ; do\n'
        txt += '        echo "Trying to copy output file to $SE "\n'
        # The tool's messages go to $exitstring (reported as the failure
        # reason) and its real exit code to $copy_exit_status; storing the
        # messages in $copy_exit_status made the numeric test meaningless.
        txt += '        exitstring=""\n'
        txt += '        copy_exit_status=1\n'
        txt += '        if [ $middleware == OSG ]; then\n'
        txt += '            echo "globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
        txt += '            exitstring=$(globus-url-copy file://$(pwd)/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1)\n'
        txt += '            copy_exit_status=$?\n'
        txt += '        elif [ $middleware == LCG ]; then \n'
        txt += '            echo "lcg-cp --vo $VO -t 1200 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
        txt += '            exitstring=$(lcg-cp --vo $VO -t 1200 file://$(pwd)/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1)\n'
        txt += '            copy_exit_status=$?\n'
        txt += '        fi \n'
        txt += '        echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
        txt += '        echo "STAGE_OUT = $copy_exit_status"\n'
        txt += '        if [ $copy_exit_status -ne 0 ]; then\n'
        txt += '            echo "Problems with SE = $SE"\n'
        txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '        else\n'
        txt += '            echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '            echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
        txt += '            echo "output copied into $SE/$SE_PATH directory"\n'
        txt += '            echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '        fi\n'
        txt += '    done\n'
        txt += 'fi\n'
        return txt

    def wsRegisterOutput(self):
        """
        Return the part of the job script that registers the output files
        in the LFC catalog (LCG only; disabled for OSG).

        Empty unless register_data == 1. If the SE copy succeeded, each file
        is registered with lcg-rf (sfn:// first, then srm://). If the copy
        failed, each file is copied and registered on the close SE with
        lcg-cr instead.
        """
        txt = ''
        if int(self.register_data) != 1:
            return txt

        txt += 'if [ $middleware == OSG ]; then\n'
        txt += '    #\n'
        txt += '    # Register output to LFC deactivated in OSG mode\n'
        txt += '    #\n'
        txt += '    echo "Register output to LFC deactivated in OSG mode"\n'
        txt += 'elif [ $middleware == LCG ]; then \n'
        txt += '#\n'
        txt += '# Register output to LFC\n'
        txt += '#\n'
        txt += '    if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
        txt += '        for out_file in $file_list ; do\n'
        txt += '            echo "Trying to register the output file into LFC"\n'
        txt += '            echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
        txt += '            lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
        txt += '            register_exit_status=$?\n'
        txt += '            echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        txt += '            echo "STAGE_OUT = $register_exit_status"\n'
        txt += '            if [ $register_exit_status -ne 0 ]; then \n'
        txt += '                echo "Problems with the registration to LFC" \n'
        txt += '                echo "Try with srm protocol" \n'
        txt += '                echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
        txt += '                lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
        txt += '                register_exit_status=$?\n'
        txt += '                echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        txt += '                echo "STAGE_OUT = $register_exit_status"\n'
        txt += '                if [ $register_exit_status -ne 0 ]; then \n'
        txt += '                    echo "Problems with the registration into LFC" \n'
        txt += '                fi \n'
        txt += '            else \n'
        txt += '                echo "output registered to LFC"\n'
        txt += '            fi \n'
        txt += '            echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '        done\n'
        txt += '    elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
        txt += '        echo "Trying to copy output file to CloseSE"\n'
        txt += '        CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
        txt += '        for out_file in $file_list ; do\n'
        txt += '            echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
        txt += '            lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
        txt += '            register_exit_status=$?\n'
        txt += '            echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        txt += '            echo "STAGE_OUT = $register_exit_status"\n'
        txt += '            if [ $register_exit_status -ne 0 ]; then \n'
        txt += '                echo "Problems with CloseSE" \n'
        txt += '            else \n'
        txt += '                echo "The program was successfully executed"\n'
        txt += '                echo "SE = $CLOSE_SE"\n'
        txt += '                echo "LFN for the file is LFN=${LFN}/$out_file"\n'
        txt += '            fi \n'
        txt += '            echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '        done\n'
        txt += '    else\n'
        txt += '        echo "Problem with the executable"\n'
        txt += '    fi \n'
        txt += 'fi \n'
        return txt

    def loggingInfo(self, id):
        """
        Return the output of 'edg-job-get-logging-info -v 2' for job id.
        """
        self.checkProxy()
        cmd = 'edg-job-get-logging-info -v 2 ' + id
        return runCommand(cmd)

    def listMatch(self, nj):
        """
        Run edg-job-list-match on job nj's JDL and return the number of
        distinct matching CE hosts (0 if none match).

        Raises CrabException if the command produced no output.
        """
        self.checkProxy()
        jdl = common.job_list[nj].jdlFilename()
        cmd = 'edg-job-list-match ' + self.configOpt_() + jdl
        cmd_out = runCommand(cmd,0,10)
        if not cmd_out:
            raise CrabException("ERROR: "+cmd+" failed!")
        return self.parseListMatch_(cmd_out, jdl)

    def parseListMatch_(self, out, jdl):
        """
        Parse the output of edg-job-list-match.

        Returns 0 when a line starts with 'No Computing Element matching';
        otherwise the number of distinct host names among the lines
        containing ':' from the first 'CEId' line onwards. jdl is unused.
        """
        reNO = re.compile(r'No Computing Element matching')
        reVO = re.compile(r'Selected Virtual Organisation name.*')
        reRB = re.compile(r'Connecting to host')
        reCEId = re.compile(r'CEId.*')
        reCE = re.compile(r'(.*:.*)')

        lines = re.findall(r'.*', out)
        CEs = []
        ceListScanned = 0
        for i, line in enumerate(lines):
            if reNO.match(line):
                common.logger.debug(5,line)
                return 0
            matchVO = reVO.match(line)
            if matchVO:
                common.logger.debug(5,"VO "+matchVO.group())
            matchRB = reRB.match(line)
            if matchRB:
                common.logger.debug(5,"RB "+matchRB.group())
            # Every later CEId line would rescan a subset of the same lines,
            # so scanning from the first one is enough.
            if not ceListScanned and reCEId.search(line):
                # lines[-1] is the empty match findall() yields at the end.
                for lineCE in lines[i:-1]:
                    matchCE = reCE.match(lineCE)
                    if matchCE:
                        CE = matchCE.group(1).strip()
                        CEs.append(CE.split(':')[0])
                ceListScanned = 1

        common.logger.debug(5,"All CE :"+str(CEs))

        # Order-preserving de-duplication.
        sites = []
        seen = {}
        for ce in CEs:
            if ce not in seen:
                seen[ce] = 1
                sites.append(ce)

        common.logger.debug(5,"All Sites :"+str(sites))
        return len(sites)

    def noMatchFound_(self, jdl):
        """
        Log the quoted values of the first 'Requirements' line of the JDL
        file (software tags, numeric requirements, CE names) and raise
        CrabException("No compatible resources found!").
        """
        reReq = re.compile(r'Requirements')
        reString = re.compile(r'"\S*"')
        reNumber = re.compile(r'"\d+')
        f = open(jdl, 'r')
        try:
            lines = f.readlines()
        finally:
            f.close()
        for line in lines:
            line = line.strip()
            if not reReq.match(line):
                continue
            for req in reString.findall(line):
                if re.search("VO", req):
                    common.logger.message( "SW required: "+req)
                elif reNumber.search(req):
                    common.logger.message("Other req : "+req)
                else:
                    common.logger.message( "CE required: "+req)
            break
        raise CrabException("No compatible resources found!")

    def submit(self, nj):
        """
        Submit job nj with edg-job-submit.

        Returns the job id (the first 'https...' token of the command
        output), or None if the command failed or printed no job id.
        """
        self.checkProxy()
        jid = None
        jdl = common.job_list[nj].jdlFilename()

        cmd = 'edg-job-submit ' + self.configOpt_() + jdl
        cmd_out = runCommand(cmd)
        if cmd_out is not None:
            matchSid = re.search(r'https.+', cmd_out)
            if matchSid:
                jid = matchSid.group()
            else:
                common.logger.debug(5,'No job id found in output of '+cmd)
        return jid

    def resubmit(self, nj_list):
        """
        Prepare jobs to be resubmitted (nothing to do for EDG).
        """
        return

    def getExitStatus(self, id):
        """Return the LB 'exit_code' attribute of job id (None on LB error)."""
        return self.getStatusAttribute_(id, 'exit_code')

    def queryStatus(self, id):
        """Return the LB 'status' attribute of job id (None on LB error)."""
        return self.getStatusAttribute_(id, 'status')

    def queryDest(self, id):
        """Return the LB 'destination' attribute of job id (None on LB error)."""
        return self.getStatusAttribute_(id, 'destination')

    def getStatusAttribute_(self, id, attr):
        """
        Query one status attribute of job id directly through the LB API
        (edg_wl_userinterface_common_LbWrapper.Status), bypassing
        edg-job-status.

        attr must be one of self.states (ValueError otherwise). Returns None
        if the API reports an error.
        """
        self.checkProxy()
        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
        level = 0
        jobStat = Status()
        st = 0
        jobStat.getStatus(id, level)
        err, apiMsg = jobStat.get_error()
        if err:
            common.logger.debug(5,'Error caught' + apiMsg)
            return None
        # Fetch the status vector once and pick the requested position.
        statusVector = jobStat.loadStatus(st)
        return statusVector[self.states.index(attr)]

    def queryDetailedStatus(self, id):
        """
        Return the output of 'edg-job-status' for job id.
        """
        self.checkProxy()
        cmd = 'edg-job-status '+id
        return runCommand(cmd)

    def getOutput(self, id):
        """
        Retrieve the output sandbox of finished job id with
        edg-job-get-output into the work-space result directory.

        Returns the directory name built as <resDir><$USER>_<basename(id)>.
        Raises KeyError if USER is not set in the environment.
        """
        self.checkProxy()
        resDir = common.work_space.resDir()
        cmd = 'edg-job-get-output --dir ' + resDir + ' ' + id
        runCommand(cmd)
        return resDir + os.environ['USER'] + '_' + os.path.basename(id)

    def cancel(self, id):
        """
        Cancel job id with 'edg-job-cancel --noint'; returns the command output.
        """
        self.checkProxy()
        cmd = 'edg-job-cancel --noint ' + id
        return runCommand(cmd)

    def splitList_(self, value):
        """
        Split a comma-separated configuration value into its stripped,
        non-empty items ('' gives []).
        """
        items = []
        for item in value.split(','):
            item = item.strip()
            if item:
                items.append(item)
        return items

    def jdlList_(self, items):
        """
        Format items as a JDL list of quoted strings: { "a", "b" }.
        """
        quoted = ['"' + item + '"' for item in items]
        return '{ ' + ', '.join(quoted) + ' }'

    def createSchScript(self, nj):
        """
        Write the JDL file for job nj.

        It contains Executable, Arguments (job number, first event, max
        events), input/output sandboxes, Requirements (job-type requirements
        AND user requirements AND CE white list (any) AND CE black list AND
        wall-clock/CPU limits; omitted when all are empty), VirtualOrganisation,
        RetryCount (if set) and MyProxyServer.
        """
        job = common.job_list[nj]
        jbt = job.type()
        inp_sandbox = jbt.inputSandbox(nj)
        out_sandbox = jbt.outputSandbox(nj)
        script = job.scriptFilename()

        title = '# This JDL was generated by '+\
                common.prog_name+' (version '+common.prog_version_str+')\n'

        # Input sandbox: wrapper script, job-type files, the monitoring
        # helpers shipped with CRAB, then any additional files.
        inputFiles = [script]
        if inp_sandbox != None:
            inputFiles.extend(inp_sandbox)
        crabPythonDir = os.environ['CRABDIR']+'/python/'
        for helper in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']:
            inputFiles.append(os.path.abspath(crabPythonDir + helper))
        for addFile in jbt.additional_inbox_files:
            inputFiles.append(os.path.abspath(addFile))

        # Output sandbox: stdout (+ stderr if distinct), .BrokerInfo and,
        # when return_data == 1, the job-type output files.
        outputFiles = [job.stdout()]
        if job.stderr() != job.stdout():
            outputFiles.append(job.stderr())
        outputFiles.append('.BrokerInfo')
        if int(self.return_data) == 1 and out_sandbox != None:
            outputFiles.extend(out_sandbox)

        # Requirement clauses, AND-ed together.
        clauses = []
        baseReq = jbt.getRequirements()
        if baseReq and baseReq.strip():
            clauses.append(baseReq)
        if self.EDG_requirements:
            clauses.append(self.EDG_requirements)
        whiteList = self.splitList_(self.EDG_ce_white_list)
        if whiteList:
            anyOf = ['(RegExp("' + ce + '", other.GlueCEUniqueId))' for ce in whiteList]
            clauses.append('(' + ' || '.join(anyOf) + ')')
        for ce in self.splitList_(self.EDG_ce_black_list):
            clauses.append('(!RegExp("' + ce + '", other.GlueCEUniqueId))')
        if self.EDG_clock_time:
            clauses.append('other.GlueCEPolicyMaxWallClockTime>='+str(self.EDG_clock_time))
        if self.EDG_cpu_time:
            clauses.append('other.GlueCEPolicyMaxCPUTime>='+str(self.EDG_cpu_time))

        firstEvent = common.jobDB.firstEvent(nj)
        maxEvents = common.jobDB.maxEvents(nj)

        jdl = open(job.jdlFilename(), 'w')
        try:
            jdl.write(title)
            jdl.write('Executable = "' + os.path.basename(script) +'";\n')
            jdl.write('Arguments = "' + str(nj+1)+' '+str(firstEvent)+' '+str(maxEvents)+'";\n')
            jdl.write('InputSandbox = ' + self.jdlList_(inputFiles) + ';\n')
            jdl.write('StdOutput = "' + job.stdout() + '";\n')
            jdl.write('StdError = "' + job.stderr() + '";\n')
            jdl.write('OutputSandbox = ' + self.jdlList_(outputFiles) + ';\n')
            if clauses:
                jdl.write('Requirements = ' + ' && '.join(clauses) + ';\n')
            jdl.write('VirtualOrganisation = "' + self.VO + '";\n')
            if self.EDG_retry_count:
                jdl.write('RetryCount = '+str(self.EDG_retry_count)+';\n')
            jdl.write('MyProxyServer = "' + self.proxyServer + '";\n')
        finally:
            jdl.close()
        return

    def checkProxy(self):
        """
        Make sure a usable VOMS proxy exists and a credential is delegated to
        the myproxy server; the result is cached in self.proxyValid.

        A new 24h proxy is created (voms-proxy-init) when the local proxy or
        its attribute certificate is missing or has less than 10 hours left.
        The credential is (re)delegated (myproxy-init) when myproxy-info
        reports none or less than 100 hours left.

        Raises CrabException if proxy creation or delegation fails.
        """
        if self.proxyValid:
            return
        minTimeLeft = 10*3600        # seconds, local proxy and its VOMS AC
        minTimeLeftServer = 100      # hours, myproxy credential

        mustRenew = 0
        timeLeftLocal = runCommand('voms-proxy-info -timeleft')
        # Validate before converting: int() on garbage would raise.
        if not timeLeftLocal or not isInt(timeLeftLocal) or int(timeLeftLocal) <= 0:
            mustRenew = 1
        else:
            timeLeftServer = runCommand('voms-proxy-info -actimeleft | head -1')
            if not timeLeftServer or not isInt(timeLeftServer):
                mustRenew = 1
            elif int(timeLeftLocal) < minTimeLeft or int(timeLeftServer) < minTimeLeft:
                mustRenew = 1

        if mustRenew:
            common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 24h\n")
            cmd = 'voms-proxy-init -voms cms -valid 24:00'
            if os.system(cmd) != 0:
                raise CrabException("Unable to create a valid proxy!\n")

        # A valid voms proxy exists now; check the myproxy server.
        renewProxy = 0
        cmd = 'myproxy-info -d -s '+self.proxyServer
        cmd_out = runCommand(cmd,0,20)
        if not cmd_out:
            common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
            renewProxy = 1
        else:
            # 'timeleft: H...' may be on any line, so search, not match.
            matchTime = re.search(r'timeleft: (\d+)', cmd_out)
            if matchTime:
                hoursLeft = int(matchTime.group(1))
                if hoursLeft < minTimeLeftServer:
                    renewProxy = 1
                    common.logger.message('Credential delegation will expire in '+str(hoursLeft)+' hours: renew it')

        if renewProxy:
            cmd = 'myproxy-init -d -n -s '+self.proxyServer
            if os.system(cmd) != 0:
                raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

        # Cache proxy validity.
        self.proxyValid = 1
        return

    def configOpt_(self):
        """
        Return the edg-job-* command line options selecting the RB config
        files (' -c <cfg> ' and ' --config-vo <cfg> ' when set), padded with
        spaces so the result can be concatenated directly.
        """
        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
        return edg_ui_cfg_opt