ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.60
Committed: Fri Mar 24 16:59:46 2006 UTC (19 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_7
Changes since 1.59: +20 -17 lines
Log Message:
Bug fix for resubmit

File Contents

# User Rev Content
1 slacapra 1.27 #!/usr/bin/env python2.2
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 nsmirnov 1.3 from JobList import JobList
9 nsmirnov 1.1 from Creator import Creator
10     from Submitter import Submitter
11 slacapra 1.8 from Checker import Checker
12 slacapra 1.9 from PostMortem import PostMortem
13     from Status import Status
14 spiga 1.16 from StatusBoss import StatusBoss
15 corvo 1.42 from ApmonIf import ApmonIf
16 slacapra 1.36 from Cleaner import Cleaner
17 nsmirnov 1.1 import common
18 spiga 1.13 import Statistic
19 nsmirnov 1.1
20     import sys, os, time, string
21    
22     ###########################################################################
23     class Crab:
24 nsmirnov 1.3 def __init__(self, opts):
25 nsmirnov 1.1
26     # The order of main_actions is important !
27 slacapra 1.28 self.main_actions = [ '-create', '-submit' ]
28 slacapra 1.8 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput',
29 slacapra 1.28 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean']
30 nsmirnov 1.1
31     # Dictionary of actions, e.g. '-create' -> object of class Creator
32     self.actions = {}
33    
34     # Configuration file
35     self.cfg_fname = None
36     # Dictionary with configuration parameters
37     self.cfg_params = {}
38    
39     # Current working directory
40     self.cwd = os.getcwd()+'/'
41     # Current time in format 'yymmdd_hhmmss'
42     self.current_time = time.strftime('%y%m%d_%H%M%S',
43     time.localtime(time.time()))
44    
45 nsmirnov 1.3 # Session name (?) Do we need this ?
46 nsmirnov 1.1 self.name = '0'
47    
48     # Job type
49     self.job_type_name = None
50    
51     # Continuation flag
52     self.flag_continue = 0
53    
54     # quiet mode, i.e. no output on screen
55     self.flag_quiet = 0
56     # produce more output
57     self.debug_level = 0
58    
59 nsmirnov 1.3 # Scheduler name, e.g. 'edg', 'lsf'
60 fanzago 1.53 # self.scheduler_name = ''
61 nsmirnov 1.1
62 nsmirnov 1.3 self.initialize_(opts)
63 nsmirnov 1.1
64     return
65    
66 nsmirnov 1.7 def version():
67 nsmirnov 1.1 return common.prog_version_str
68    
69 nsmirnov 1.7 version = staticmethod(version)
70    
71 nsmirnov 1.3 def initialize_(self, opts):
72 nsmirnov 1.1
73     # Process the '-continue' option first because
74     # in the case of continuation the CRAB configuration
75     # parameters are loaded from already existing Working Space.
76 nsmirnov 1.3 self.processContinueOption_(opts)
77 nsmirnov 1.1
78     # Process ini-file first, then command line options
79     # because they override ini-file settings.
80    
81 nsmirnov 1.3 self.processIniFile_(opts)
82 nsmirnov 1.1
83 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
84 nsmirnov 1.1
85 nsmirnov 1.3 self.processOptions_(opts)
86 nsmirnov 1.1
87     if not self.flag_continue:
88 nsmirnov 1.3 self.createWorkingSpace_()
89 slacapra 1.9 optsToBeSaved={}
90     for it in opts.keys():
91     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
92     pass
93     else:
94     optsToBeSaved[it]=opts[it]
95     common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
96 nsmirnov 1.1 pass
97    
98     # At this point all configuration options have been read.
99    
100     args = string.join(sys.argv,' ')
101 slacapra 1.11
102 nsmirnov 1.3 self.updateHistory_(args)
103 slacapra 1.11
104 nsmirnov 1.3 self.createLogger_(args)
105 slacapra 1.11
106 nsmirnov 1.1 common.jobDB = JobDB()
107 corvo 1.51
108 nsmirnov 1.3 if self.flag_continue:
109 slacapra 1.12 try:
110     common.jobDB.load()
111 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
112 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
113     except DBException,e:
114     pass
115 nsmirnov 1.3 pass
116 slacapra 1.11
117 nsmirnov 1.3 self.createScheduler_()
118 slacapra 1.11
119 nsmirnov 1.3 if common.logger.debugLevel() >= 6:
120     common.logger.debug(6, 'Used properties:')
121     keys = self.cfg_params.keys()
122     keys.sort()
123     for k in keys:
124     if self.cfg_params[k]:
125 corvo 1.42 common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
126 nsmirnov 1.3 pass
127     else:
128     common.logger.debug(6, ' '+k+' : ')
129     pass
130     pass
131     common.logger.debug(6, 'End of used properties.\n')
132     pass
133     self.initializeActions_(opts)
134 nsmirnov 1.1 return
135    
136 nsmirnov 1.3 def processContinueOption_(self,opts):
137 nsmirnov 1.1
138     continue_dir = None
139 nsmirnov 1.4
140     # Look for the '-continue' option.
141    
142 nsmirnov 1.1 for opt in opts.keys():
143     if ( opt in ('-continue','-c') ):
144     self.flag_continue = 1
145     val = opts[opt]
146     if val:
147     if val[0] == '/': continue_dir = val # abs path
148     else: continue_dir = self.cwd + val # rel path
149     pass
150 nsmirnov 1.4 break
151     pass
152    
153     # Look for actions which has sense only with '-continue'
154    
155     if not self.flag_continue:
156     for opt in opts.keys():
157 slacapra 1.6 if ( opt in (self.aux_actions) ):
158 nsmirnov 1.4 self.flag_continue = 1
159     break
160 nsmirnov 1.1 pass
161     pass
162 slacapra 1.6 submit_flag=0
163     create_flag=0
164     for opt in opts.keys():
165     if opt == "-submit": submit_flag=1
166     if opt == "-create": create_flag=1
167     pass
168     if (submit_flag and not create_flag):
169 nsmirnov 1.7 msg = "'-submit' must be used with either '-create' or '-continue'."
170     raise CrabException(msg)
171 slacapra 1.6 pass
172 nsmirnov 1.1
173     if not self.flag_continue: return
174    
175     if not continue_dir:
176     prefix = common.prog_name + '_' + self.name + '_'
177     continue_dir = findLastWorkDir(prefix)
178     pass
179    
180     if not continue_dir:
181     raise CrabException('Cannot find last working directory.')
182    
183     if not os.path.exists(continue_dir):
184     msg = 'Cannot continue because the working directory <'
185     msg += continue_dir
186     msg += '> does not exist.'
187     raise CrabException(msg)
188    
189     # Instantiate WorkSpace
190 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
191 nsmirnov 1.1
192     return
193    
194 nsmirnov 1.3 def processIniFile_(self, opts):
195 nsmirnov 1.1 """
196     Processes a configuration INI-file.
197     """
198    
199     # Extract cfg-file name from the cmd-line options.
200    
201     for opt in opts.keys():
202     if ( opt == '-cfg' ):
203     if self.flag_continue:
204     raise CrabException('-continue and -cfg cannot coexist.')
205     if opts[opt] : self.cfg_fname = opts[opt]
206     else : usage()
207     pass
208    
209     elif ( opt == '-name' ):
210     self.name = opts[opt]
211     pass
212    
213     pass
214    
215     # Set default cfg-fname
216    
217     if self.cfg_fname == None:
218     if self.flag_continue:
219     self.cfg_fname = common.work_space.cfgFileName()
220     else:
221     self.cfg_fname = common.prog_name+'.cfg'
222     pass
223     pass
224    
225     # Load cfg-file
226    
227     if string.lower(self.cfg_fname) != 'none':
228     if os.path.exists(self.cfg_fname):
229     self.cfg_params = loadConfig(self.cfg_fname)
230 corvo 1.42 # Better idea on where to put user info for ML?
231 corvo 1.52 self.cfg_params['user'] = os.getlogin()
232 corvo 1.42 # Better idea on where to put ML???? Looks crap here, but had no better thought!
233     if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
234 nsmirnov 1.1 pass
235     else:
236     msg = 'cfg-file '+self.cfg_fname+' not found.'
237     raise CrabException(msg)
238     pass
239     pass
240    
241     # process the [CRAB] section
242    
243     lhp = len('CRAB.')
244     for k in self.cfg_params.keys():
245     if len(k) >= lhp and k[:lhp] == 'CRAB.':
246     opt = '-'+k[lhp:]
247     if len(opt) >= 3 and opt[:3] == '-__': continue
248     if opt not in opts.keys():
249     opts[opt] = self.cfg_params[k]
250     pass
251     pass
252     pass
253    
254     return
255    
256 nsmirnov 1.3 def processOptions_(self, opts):
257 nsmirnov 1.1 """
258     Processes the command-line options.
259     """
260    
261     for opt in opts.keys():
262     val = opts[opt]
263    
264 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
265     if opt in self.main_actions:
266     self.cfg_params['CRAB.'+opt[1:]] = val
267     continue
268     if opt in self.aux_actions:
269     self.cfg_params['CRAB.'+opt[1:]] = val
270     continue
271 nsmirnov 1.1
272    
273     elif ( opt == '-cfg' ):
274     pass
275    
276     elif ( opt in ('-continue', '-c') ):
277 nsmirnov 1.4 # Already processed in processContinueOption_()
278 nsmirnov 1.1 pass
279    
280     elif ( opt == '-jobtype' ):
281     if val : self.job_type_name = string.upper(val)
282     else : usage()
283     pass
284    
285     elif ( opt == '-Q' ):
286     self.flag_quiet = 1
287     pass
288    
289     elif ( opt == '-debug' ):
290 slacapra 1.6 if val: self.debug_level = int(val)
291     else: self.debug_level = 1
292 nsmirnov 1.1 pass
293    
294     elif ( opt == '-scheduler' ):
295 fanzago 1.53 if val:
296     self.scheduler_name = 'boss'
297     self.flag_useboss = 1
298 nsmirnov 1.1 else:
299     print common.prog_name+". No value for '-scheduler'."
300     usage()
301     pass
302     pass
303 slacapra 1.22
304 fanzago 1.53 # elif ( opt in ('-use_boss', '-useboss') ):
305     # if ( val == '1' ):
306     # self.scheduler_name = 'boss'
307     # pass
308     # elif ( val == '0' ):
309     # pass
310     # else:
311     # print common.prog_name+'. Bad flag for -use_boss option:',\
312     # val,'Possible values are 0(=No) or 1(=Yes)'
313     # usage()
314     # pass
315     # pass
316 fanzago 1.14
317 nsmirnov 1.3 elif string.find(opt,'.') == -1:
318     print common.prog_name+'. Unrecognized option '+opt
319     usage()
320     pass
321 nsmirnov 1.1
322 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
323     if string.find(opt,'.') == -1 :
324     self.cfg_params['CRAB.'+opt[1:]] = val
325 nsmirnov 1.1 pass
326 nsmirnov 1.3 else:
327 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
328     self.cfg_params[opt[1:]] = val
329     pass
330     pass
331     return
332    
333 slacapra 1.8 def parseRange_(self, aRange):
334 nsmirnov 1.4 """
335 slacapra 1.8 Takes as the input a string with a range defined in any of the following
336     way, including combination, and return a tuple with the ints defined
337     according to following table. A consistency check is done.
338     NB: the first job is "1", not "0".
339     'all' -> [1,2,..., NJobs]
340     '' -> [1,2,..., NJobs]
341     'n1' -> [n1]
342     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
343     'n1,n2' -> [n1, n2]
344     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
345     """
346     result = []
347    
348 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
349     if aRange=='all' or aRange==None or aRange=='':
350 slacapra 1.8 result=range(0,common.jobDB.nJobs())
351     return result
352 slacapra 1.9 elif aRange=='0':
353     return result
354 slacapra 1.8
355     subRanges = string.split(aRange, ',')
356     for subRange in subRanges:
357     result = result+self.parseSimpleRange_(subRange)
358    
359     if self.checkUniqueness_(result):
360     return result
361     else:
362 slacapra 1.33 common.logger.message("Error "+result)
363 slacapra 1.8 return []
364    
365     def checkUniqueness_(self, list):
366     """
367 slacapra 1.9 check if a list contains only unique elements
368 slacapra 1.8 """
369    
370     uniqueList = []
371     # use a list comprehension statement (takes a while to understand)
372    
373     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
374    
375     return (len(list)==len(uniqueList))
376    
377     def parseSimpleRange_(self, aRange):
378     """
379     Takes as the input a string with two integers separated by
380     the minus sign and returns the tuple with these numbers:
381     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
382     'n1' -> [n1]
383     """
384     (start, end) = (None, None)
385    
386     result = []
387     minus = string.find(aRange, '-')
388     if ( minus < 0 ):
389     if isInt(aRange) and int(aRange)>0:
390 fanzago 1.37 # FEDE
391     #result.append(int(aRange)-1)
392     ###
393     result.append(aRange)
394 slacapra 1.8 else:
395 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
396     usage()
397     pass
398    
399 nsmirnov 1.4 pass
400     else:
401 slacapra 1.8 (start, end) = string.split(aRange, '-')
402     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
403 spiga 1.38 #result=range(int(start)-1, int(end))
404     result=range(int(start), int(end)+1) #Daniele
405 slacapra 1.8 else:
406 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
407 slacapra 1.8
408     return result
409 nsmirnov 1.4
410 nsmirnov 1.3 def initializeActions_(self, opts):
411 nsmirnov 1.1 """
412     For each user action instantiate a corresponding
413     object and put it in the action dictionary.
414     """
415 fanzago 1.53 # for opt in opts.keys():
416     # self.flag_useboss = 0
417     # if ( opt == '-use_boss'):
418     # val = opts[opt]
419     # if ( val == '1' ):
420     # self.flag_useboss = 1
421     # common.logger.message('Using BOSS')
422     # pass
423     # else:
424     # self.flag_useboss = 0
425     # pass
426 spiga 1.15
427     for opt in opts.keys():
428    
429 nsmirnov 1.1 val = opts[opt]
430 spiga 1.15
431    
432     if ( opt == '-create' ):
433 slacapra 1.22 ncjobs = 0
434 nsmirnov 1.1 if val:
435     if ( isInt(val) ):
436     ncjobs = int(val)
437     elif ( val == 'all'):
438     ncjobs = val
439     else:
440 nsmirnov 1.5 msg = 'Bad creation bunch size <'+str(val)+'>\n'
441     msg += ' Must be an integer or "all"'
442 slacapra 1.8 msg += ' Generic range is not allowed"'
443 nsmirnov 1.5 raise CrabException(msg)
444 nsmirnov 1.1 pass
445     else: ncjobs = 'all'
446    
447     if ncjobs != 0:
448 corvo 1.57
449 nsmirnov 1.3 # Instantiate Creator object
450 fanzago 1.56 self.creator = Creator(self.job_type_name,
451 nsmirnov 1.1 self.cfg_params,
452 corvo 1.55 ncjobs)
453 fanzago 1.56 self.actions[opt] = self.creator
454 nsmirnov 1.3
455     # Initialize the JobDB object if needed
456 nsmirnov 1.1 if not self.flag_continue:
457 fanzago 1.56 common.jobDB.create(self.creator.nJobs())
458 nsmirnov 1.1 pass
459 nsmirnov 1.3
460     # Create and initialize JobList
461    
462     common.job_list = JobList(common.jobDB.nJobs(),
463 fanzago 1.56 self.creator.jobType())
464 nsmirnov 1.3
465     common.job_list.setScriptNames(self.job_type_name+'.sh')
466     common.job_list.setJDLNames(self.job_type_name+'.jdl')
467 slacapra 1.12 common.job_list.setCfgNames(self.job_type_name+'.orcarc')
468 slacapra 1.9
469 fanzago 1.56 self.creator.writeJobsSpecsToDB()
470 nsmirnov 1.1 pass
471 nsmirnov 1.3 pass
472 nsmirnov 1.1
473     elif ( opt == '-submit' ):
474 slacapra 1.23
475     # total jobs
476     # get the first not already submitted
477 slacapra 1.24 common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
478     lastSubmittedJob=0
479     for nj in range(common.jobDB.nJobs()):
480 fanzago 1.45 if (common.jobDB.status(nj) in ['S','K','RC','Y','A','D']):
481 slacapra 1.24 lastSubmittedJob +=1
482 slacapra 1.30 else: break
483 slacapra 1.24 # count job from 1
484 slacapra 1.30 totalJobsSubmittable = common.jobDB.nJobs()-lastSubmittedJob
485 slacapra 1.24 common.logger.debug(5,'lastSubmittedJob '+str(lastSubmittedJob))
486     common.logger.debug(5,'totalJobsSubmittable '+str(totalJobsSubmittable))
487 slacapra 1.23
488 slacapra 1.24 nsjobs = lastSubmittedJob+totalJobsSubmittable
489 slacapra 1.23 # get user request
490 slacapra 1.22 if val:
491     if ( isInt(val) ):
492 slacapra 1.24 tmp = int(val)
493     if (tmp >= totalJobsSubmittable):
494     common.logger.message('asking to submit '+str(tmp)+' jobs, but only '+str(totalJobsSubmittable)+' left: submitting those')
495     pass
496     else:
497     nsjobs=lastSubmittedJob+int(val)
498 slacapra 1.23 elif (val=='all'):
499     pass
500 slacapra 1.22 else:
501     msg = 'Bad submission option <'+str(val)+'>\n'
502     msg += ' Must be an integer or "all"'
503     msg += ' Generic range is not allowed"'
504     raise CrabException(msg)
505     pass
506 slacapra 1.24 common.logger.debug(5,'nsjobs '+str(nsjobs))
507 slacapra 1.23
508     # submit N from last submitted job
509 slacapra 1.24 nj_list = range(lastSubmittedJob, nsjobs)
510     common.logger.debug(5,'nj_list '+str(nj_list))
511 nsmirnov 1.5
512     if len(nj_list) != 0:
513 nsmirnov 1.3 # Instantiate Submitter object
514 corvo 1.57 # self.actions[opt] = Submitter(self.cfg_params, nj_list, self.creator.jobType())
515     self.actions[opt] = Submitter(self.cfg_params, nj_list)
516 nsmirnov 1.3
517     # Create and initialize JobList
518     if len(common.job_list) == 0 :
519     common.job_list = JobList(common.jobDB.nJobs(),
520     None)
521     common.job_list.setJDLNames(self.job_type_name+'.jdl')
522     pass
523 nsmirnov 1.1 pass
524 slacapra 1.30 pass
525 nsmirnov 1.1
526 nsmirnov 1.4 elif ( opt == '-list' ):
527 slacapra 1.8 jobs = self.parseRange_(val)
528    
529     common.jobDB.dump(jobs)
530 nsmirnov 1.4 pass
531    
532     elif ( opt == '-status' ):
533 slacapra 1.8 jobs = self.parseRange_(val)
534    
535 slacapra 1.9 if len(jobs) != 0:
536 spiga 1.16 if ( self.flag_useboss == 1 ):
537 corvo 1.42 self.actions[opt] = StatusBoss(self.cfg_params)
538 spiga 1.15 else:
539 corvo 1.46 self.actions[opt] = Status(self.cfg_params, jobs)
540 spiga 1.15 pass
541 nsmirnov 1.1 pass
542     pass
543 slacapra 1.22
544 nsmirnov 1.4 elif ( opt == '-kill' ):
545 slacapra 1.8
546 spiga 1.31 if ( self.flag_useboss == 1 ):
547     if val:
548     if val =='all':
549 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
550     jobs = allBoss_id.keys()
551 spiga 1.31 else:
552     jobs = self.parseRange_(val)
553     common.scheduler.cancel(jobs)
554     else:
555     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
556     else:
557     if val:
558     jobs = self.parseRange_(val)
559    
560     for nj in jobs:
561     st = common.jobDB.status(nj)
562     if st == 'S':
563     jid = common.jobDB.jobId(nj)
564     common.logger.message("Killing job # "+`(nj+1)`)
565     common.scheduler.cancel(jid)
566     common.jobDB.setStatus(nj, 'K')
567     pass
568 slacapra 1.9 pass
569 spiga 1.31
570     common.jobDB.save()
571 nsmirnov 1.4 pass
572 spiga 1.31 else:
573     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
574 nsmirnov 1.1
575 slacapra 1.8 elif ( opt == '-getoutput' ):
576    
577 spiga 1.20 if ( self.flag_useboss == 1 ):
578     if val=='all' or val==None or val=='':
579 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
580     jobs = allBoss_id.keys()
581 spiga 1.20 else:
582     jobs = self.parseRange_(val)
583     common.scheduler.getOutput(jobs)
584     else:
585     jobs = self.parseRange_(val)
586 spiga 1.13
587 slacapra 1.22 ## also this: create a ActorClass (GetOutput)
588 spiga 1.20 jobs_done = []
589     for nj in jobs:
590     st = common.jobDB.status(nj)
591     if st == 'D':
592 slacapra 1.12 jobs_done.append(nj)
593 spiga 1.20 pass
594     elif st == 'S':
595     jid = common.jobDB.jobId(nj)
596     currStatus = common.scheduler.queryStatus(jid)
597     if currStatus=="Done":
598     jobs_done.append(nj)
599     else:
600     msg = 'Job # '+`(nj+1)`+' submitted but still status '+currStatus+' not possible to get output'
601     common.logger.message(msg)
602     pass
603 slacapra 1.9 else:
604 spiga 1.20 # common.logger.message('Jobs #'+`(nj+1)`+' has status '+st+' not possible to get output')
605     pass
606 slacapra 1.12 pass
607    
608 spiga 1.20 for nj in jobs_done:
609 spiga 1.15 jid = common.jobDB.jobId(nj)
610     dir = common.scheduler.getOutput(jid)
611     common.jobDB.setStatus(nj, 'Y')
612 slacapra 1.12
613     # Rename the directory with results to smth readable
614     new_dir = common.work_space.resDir()
615 spiga 1.20 try:
616     files = os.listdir(dir)
617     for file in files:
618     os.rename(dir+'/'+file, new_dir+'/'+file)
619     os.rmdir(dir)
620     except OSError, e:
621     msg = 'rename files from '+dir+' to '+new_dir+' error: '
622     msg += str(e)
623     common.logger.message(msg)
624     # ignore error
625 slacapra 1.12 pass
626 spiga 1.20 pass
627 slacapra 1.22 ###
628    
629 spiga 1.13 resFlag = 0
630     exCode = common.scheduler.getExitStatus(jid)
631 spiga 1.34 Statistic.Monitor('retrieved',resFlag,jid,exCode)
632 slacapra 1.12
633     msg = 'Results of Job # '+`(nj+1)`+' are in '+new_dir
634     common.logger.message(msg)
635 nsmirnov 1.4 pass
636    
637     common.jobDB.save()
638     pass
639    
640     elif ( opt == '-resubmit' ):
641 fanzago 1.37 if ( self.flag_useboss == 1 ):
642     if val=='all' or val==None or val=='':
643     allBoss_id = common.scheduler.listBoss()
644     jobs = allBoss_id.keys()
645     else:
646     jobs = self.parseRange_(val)
647     else:
648     if val:
649     jobs = self.parseRange_(val)
650    
651 slacapra 1.11 if val:
652     # create a list of jobs to be resubmitted.
653 slacapra 1.8
654 slacapra 1.22 ### as before, create a Resubmittter Class
655 spiga 1.60 allBoss_id = common.scheduler.listBoss()
656     maxIndex = allBoss_id.keys()
657 slacapra 1.11 nj_list = []
658     for nj in jobs:
659 spiga 1.60 if int(nj) <= int(len(maxIndex)) :
660     st = common.jobDB.status(int(nj)-1)
661     if st in ['K','A']:
662     nj_list.append(int(nj)-1)
663     common.jobDB.setStatus(int(nj)-1,'C')
664     elif st == 'Y':
665     common.scheduler.moveOutput(nj)
666     nj_list.append(int(nj)-1)
667     st = common.jobDB.setStatus(int(nj)-1,'RC')
668     elif st in ['C','X']:
669     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
670     pass
671     elif st == 'D':
672     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
673     else:
674     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
675 slacapra 1.11 else:
676 spiga 1.60 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
677 fanzago 1.37 if len(common.job_list) == 0 :
678     common.job_list = JobList(common.jobDB.nJobs(),None)
679     common.job_list.setJDLNames(self.job_type_name+'.jdl')
680     pass
681    
682 slacapra 1.11 if len(nj_list) != 0:
683 fanzago 1.37 common.scheduler.resubmit(nj_list)
684 slacapra 1.11 # Instantiate Submitter object
685 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
686 slacapra 1.11
687 slacapra 1.8 pass
688     pass
689 slacapra 1.11 else:
690     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
691 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
692 slacapra 1.11 pass
693 fanzago 1.37 common.jobDB.save()
694 slacapra 1.8 pass
695    
696     elif ( opt == '-cancelAndResubmit' ):
697 nsmirnov 1.5
698 spiga 1.47 if ( self.flag_useboss == 1 ):
699     if val:
700     if val =='all':
701     allBoss_id = common.scheduler.listBoss()
702     jobs = allBoss_id.keys()
703     else:
704     jobs = self.parseRange_(val)
705     # kill submitted jobs
706     common.scheduler.cancel(jobs)
707     else:
708     common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
709     else:
710     if val:
711     jobs = self.parseRange_(val)
712     else:
713     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
714 nsmirnov 1.5 pass
715    
716 spiga 1.47 # resubmit cancelled jobs.
717     if val:
718     nj_list = []
719     for nj in jobs:
720     if ( self.flag_useboss != 1 ):
721     st = common.jobDB.status(nj)
722     if st == 'S':
723     jid = common.jobDB.jobId(nj)
724     common.scheduler.cancel(jid)
725     st = 'K'
726     common.jobDB.setStatus(nj, st)
727     pass
728     common.jobDB.save()
729     pass
730     st = common.jobDB.status(int(nj)-1)
731     if st in ['K','A']:
732     nj_list.append(int(nj)-1)
733     common.jobDB.setStatus(int(nj)-1,'C')
734     elif st == 'Y':
735     common.scheduler.moveOutput(nj)
736     nj_list.append(int(nj)-1)
737     st = common.jobDB.setStatus(int(nj)-1,'RC')
738     elif st in ['C','X']:
739     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
740     pass
741     elif st == 'D':
742     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
743     else:
744     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
745     pass
746    
747 nsmirnov 1.5 if len(common.job_list) == 0 :
748 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
749 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
750     pass
751 spiga 1.47
752     if len(nj_list) != 0:
753     common.scheduler.resubmit(nj_list)
754 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
755 spiga 1.47 pass
756     pass
757     else:
758     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
759 nsmirnov 1.5 pass
760 spiga 1.47 common.jobDB.save()
761 nsmirnov 1.4 pass
762 spiga 1.47
763 slacapra 1.28 elif ( opt == '-testJdl' ):
764 slacapra 1.8 jobs = self.parseRange_(val)
765     nj_list = []
766     for nj in jobs:
767     st = common.jobDB.status(nj)
768 slacapra 1.12 if st == 'C': nj_list.append(nj)
769 slacapra 1.8 pass
770    
771     if len(nj_list) != 0:
772     # Instantiate Submitter object
773     self.actions[opt] = Checker(self.cfg_params, nj_list)
774    
775     # Create and initialize JobList
776    
777     if len(common.job_list) == 0 :
778     common.job_list = JobList(common.jobDB.nJobs(), None)
779     common.job_list.setJDLNames(self.job_type_name+'.jdl')
780     pass
781     pass
782    
783 slacapra 1.9 elif ( opt == '-postMortem' ):
784     jobs = self.parseRange_(val)
785     nj_list = []
786     for nj in jobs:
787 fanzago 1.45 # fede: nj scala di uno perche' e' l'internal id di boss
788     # ed il jobDB parte da zero ...
789     st = common.jobDB.status(int(nj)-1)
790     if st not in ['X', 'C']: nj_list.append(int(nj))
791 slacapra 1.9 pass
792    
793     if len(nj_list) != 0:
794     # Instantiate Submitter object
795 spiga 1.35 self.actions[opt] = PostMortem(self.cfg_params, nj_list,self.flag_useboss)
796 slacapra 1.9
797     # Create and initialize JobList
798    
799     if len(common.job_list) == 0 :
800     common.job_list = JobList(common.jobDB.nJobs(), None)
801     common.job_list.setJDLNames(self.job_type_name+'.jdl')
802     pass
803     pass
804 slacapra 1.33 else:
805     common.logger.message("No jobs to analyze")
806 slacapra 1.9
807     elif ( opt == '-clean' ):
808     if val != None:
809     raise CrabException("No range allowed for '-clean'")
810    
811 corvo 1.46 theCleaner = Cleaner(self.scheduler_name == 'boss', self.cfg_params)
812 slacapra 1.36 theCleaner.clean()
813 slacapra 1.9
814 nsmirnov 1.1 pass
815     return
816    
817 nsmirnov 1.3 def createWorkingSpace_(self):
818 slacapra 1.9 new_dir = ''
819    
820     try:
821     new_dir = self.cfg_params['USER.ui_working_dir']
822 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
823 fanzago 1.48 if os.path.exists(new_dir):
824     if os.listdir(new_dir):
825     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
826     raise CrabException(msg)
827 slacapra 1.9 except KeyError:
828     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
829 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
830 slacapra 1.9 new_dir = self.cwd + new_dir
831     pass
832 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
833 nsmirnov 1.1 common.work_space.create()
834     return
835    
836 nsmirnov 1.3 def loadConfiguration_(self, opts):
837 nsmirnov 1.1
838     save_opts = common.work_space.loadSavedOptions()
839    
840     # Override saved options with new command-line options
841    
842     for k in opts.keys():
843     save_opts[k] = opts[k]
844     pass
845    
846     # Return updated options
847     return save_opts
848    
849 nsmirnov 1.3 def createLogger_(self, args):
850 nsmirnov 1.1
851     log = Logger()
852     log.quiet(self.flag_quiet)
853     log.setDebugLevel(self.debug_level)
854     log.write(args+'\n')
855 nsmirnov 1.3 log.message(self.headerString_())
856 nsmirnov 1.1 log.flush()
857     common.logger = log
858     return
859    
860 nsmirnov 1.3 def updateHistory_(self, args):
861 nsmirnov 1.1 history_fname = common.prog_name+'.history'
862     history_file = open(history_fname, 'a')
863     history_file.write(self.current_time+': '+args+'\n')
864     history_file.close()
865     return
866    
867 nsmirnov 1.3 def headerString_(self):
868 nsmirnov 1.1 """
869     Creates a string describing program options either given in
870     the command line or their default values.
871     """
872     header = common.prog_name + ' (version ' + common.prog_version_str + \
873     ') running on ' + \
874     time.ctime(time.time())+'\n\n' + \
875     common.prog_name+'. Working options:\n'
876 fanzago 1.59 #print self.job_type_name
877 nsmirnov 1.1 header = header +\
878     ' scheduler ' + self.scheduler_name + '\n'+\
879     ' job type ' + self.job_type_name + '\n'+\
880     ' working directory ' + common.work_space.topDir()\
881     + '\n'
882     return header
883    
884 nsmirnov 1.3 def createScheduler_(self):
885 nsmirnov 1.1 """
886     Creates a scheduler object instantiated by its name.
887     """
888     klass_name = 'Scheduler' + string.capitalize(self.scheduler_name)
889     file_name = klass_name
890     try:
891     klass = importName(file_name, klass_name)
892     except KeyError:
893     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
894     raise CrabException(msg)
895     except ImportError, e:
896     msg = 'Cannot create scheduler '+self.scheduler_name
897     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
898     msg += str(e)
899     raise CrabException(msg)
900    
901     common.scheduler = klass()
902     common.scheduler.configure(self.cfg_params)
903     return
904    
905 corvo 1.55 # def createJobtype_(self):
906     # """
907     # Create the jobtype specified in the crab.cfg file
908     # """
909     # file_name = 'cms_'+ string.lower(self.job_type_name)
910     # klass_name = string.capitalize(self.job_type_name)
911     #
912     # try:
913     # klass = importName(file_name, klass_name)
914     # except KeyError:
915     # msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
916     # raise CrabException(msg)
917     # except ImportError, e:
918     # msg = 'Cannot create job type '+self.job_type_name
919     # msg += ' (file: '+file_name+', class '+klass_name+'):\n'
920     # msg += str(e)
921     # raise CrabException(msg)
922     # job_type = klass(self.cfg_params)
923     # return job_type
924 corvo 1.51
925 nsmirnov 1.1 def run(self):
926     """
927     For each
928     """
929    
930     for act in self.main_actions:
931     if act in self.actions.keys(): self.actions[act].run()
932     pass
933    
934     for act in self.aux_actions:
935     if act in self.actions.keys(): self.actions[act].run()
936     pass
937     return
938    
939     ###########################################################################
940     def processHelpOptions(opts):
941    
942 slacapra 1.11 if len(opts):
943     for opt in opts.keys():
944     if opt in ('-v', '-version', '--version') :
945     print Crab.version()
946     return 1
947     if opt in ('-h','-help','--help') :
948     if opts[opt] : help(opts[opt])
949     else: help()
950     return 1
951     else:
952     usage()
953 nsmirnov 1.1
954     return 0
955    
956     ###########################################################################
957     if __name__ == '__main__':
958    
959 fanzago 1.18
960 fanzago 1.19 # # Initial settings for Python modules. Avoid appending manually lib paths.
961     # try:
962     # path=os.environ['EDG_WL_LOCATION']
963     # except:
964     # print "Error: Please set the EDG_WL_LOCATION environment variable pointing to the userinterface installation path"
965     # sys.exit(1)
966     #
967     # libPath=os.path.join(path, "lib")
968     # sys.path.append(libPath)
969     # libPath=os.path.join(path, "lib", "python")
970     # sys.path.append(libPath)
971 fanzago 1.18
972    
973 nsmirnov 1.1 # Parse command-line options and create a dictionary with
974     # key-value pairs.
975    
976     options = parseOptions(sys.argv[1:])
977    
978     # Process "help" options, such as '-help', '-version'
979    
980 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
981 nsmirnov 1.1
982     # Create, initialize, and run a Crab object
983    
984     try:
985 nsmirnov 1.3 crab = Crab(options)
986 nsmirnov 1.1 crab.run()
987 corvo 1.54 crab.cfg_params['apmon'].free()
988 nsmirnov 1.1 except CrabException, e:
989     print '\n' + common.prog_name + ': ' + str(e) + '\n'
990     if common.logger:
991     common.logger.write('ERROR: '+str(e)+'\n')
992     pass
993     pass
994    
995     pass