ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.56
Committed: Thu Mar 23 12:25:48 2006 UTC (19 years, 1 month ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.55: +8 -8 lines
Log Message:
fixed bug with creator.jobType()

File Contents

# User Rev Content
1 slacapra 1.27 #!/usr/bin/env python2.2
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 nsmirnov 1.3 from JobList import JobList
9 nsmirnov 1.1 from Creator import Creator
10     from Submitter import Submitter
11 slacapra 1.8 from Checker import Checker
12 slacapra 1.9 from PostMortem import PostMortem
13     from Status import Status
14 spiga 1.16 from StatusBoss import StatusBoss
15 corvo 1.42 from ApmonIf import ApmonIf
16 slacapra 1.36 from Cleaner import Cleaner
17 nsmirnov 1.1 import common
18 spiga 1.13 import Statistic
19 nsmirnov 1.1
20     import sys, os, time, string
21    
22     ###########################################################################
23     class Crab:
24 nsmirnov 1.3 def __init__(self, opts):
25 nsmirnov 1.1
26     # The order of main_actions is important !
27 slacapra 1.28 self.main_actions = [ '-create', '-submit' ]
28 slacapra 1.8 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput',
29 slacapra 1.28 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean']
30 nsmirnov 1.1
31     # Dictionary of actions, e.g. '-create' -> object of class Creator
32     self.actions = {}
33    
34     # Configuration file
35     self.cfg_fname = None
36     # Dictionary with configuration parameters
37     self.cfg_params = {}
38    
39     # Current working directory
40     self.cwd = os.getcwd()+'/'
41     # Current time in format 'yymmdd_hhmmss'
42     self.current_time = time.strftime('%y%m%d_%H%M%S',
43     time.localtime(time.time()))
44    
45 nsmirnov 1.3 # Session name (?) Do we need this ?
46 nsmirnov 1.1 self.name = '0'
47    
48     # Job type
49     self.job_type_name = None
50    
51     # Continuation flag
52     self.flag_continue = 0
53    
54     # quiet mode, i.e. no output on screen
55     self.flag_quiet = 0
56     # produce more output
57     self.debug_level = 0
58    
59 nsmirnov 1.3 # Scheduler name, e.g. 'edg', 'lsf'
60 fanzago 1.53 # self.scheduler_name = ''
61 nsmirnov 1.1
62 nsmirnov 1.3 self.initialize_(opts)
63 nsmirnov 1.1
64     return
65    
66 nsmirnov 1.7 def version():
67 nsmirnov 1.1 return common.prog_version_str
68    
69 nsmirnov 1.7 version = staticmethod(version)
70    
71 nsmirnov 1.3 def initialize_(self, opts):
72 nsmirnov 1.1
73     # Process the '-continue' option first because
74     # in the case of continuation the CRAB configuration
75     # parameters are loaded from already existing Working Space.
76 nsmirnov 1.3 self.processContinueOption_(opts)
77 nsmirnov 1.1
78     # Process ini-file first, then command line options
79     # because they override ini-file settings.
80    
81 nsmirnov 1.3 self.processIniFile_(opts)
82 nsmirnov 1.1
83 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
84 nsmirnov 1.1
85 nsmirnov 1.3 self.processOptions_(opts)
86 nsmirnov 1.1
87     if not self.flag_continue:
88 nsmirnov 1.3 self.createWorkingSpace_()
89 slacapra 1.9 optsToBeSaved={}
90     for it in opts.keys():
91     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
92     pass
93     else:
94     optsToBeSaved[it]=opts[it]
95     common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
96 nsmirnov 1.1 pass
97    
98     # At this point all configuration options have been read.
99    
100     args = string.join(sys.argv,' ')
101 slacapra 1.11
102 nsmirnov 1.3 self.updateHistory_(args)
103 slacapra 1.11
104 nsmirnov 1.3 self.createLogger_(args)
105 slacapra 1.11
106 nsmirnov 1.1 common.jobDB = JobDB()
107 corvo 1.51
108 nsmirnov 1.3 if self.flag_continue:
109 slacapra 1.12 try:
110     common.jobDB.load()
111 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
112 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
113     except DBException,e:
114     pass
115 nsmirnov 1.3 pass
116 slacapra 1.11
117 nsmirnov 1.3 self.createScheduler_()
118 slacapra 1.11
119 nsmirnov 1.3 if common.logger.debugLevel() >= 6:
120     common.logger.debug(6, 'Used properties:')
121     keys = self.cfg_params.keys()
122     keys.sort()
123     for k in keys:
124     if self.cfg_params[k]:
125 corvo 1.42 common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
126 nsmirnov 1.3 pass
127     else:
128     common.logger.debug(6, ' '+k+' : ')
129     pass
130     pass
131     common.logger.debug(6, 'End of used properties.\n')
132     pass
133     self.initializeActions_(opts)
134 nsmirnov 1.1 return
135    
136 nsmirnov 1.3 def processContinueOption_(self,opts):
137 nsmirnov 1.1
138     continue_dir = None
139 nsmirnov 1.4
140     # Look for the '-continue' option.
141    
142 nsmirnov 1.1 for opt in opts.keys():
143     if ( opt in ('-continue','-c') ):
144     self.flag_continue = 1
145     val = opts[opt]
146     if val:
147     if val[0] == '/': continue_dir = val # abs path
148     else: continue_dir = self.cwd + val # rel path
149     pass
150 nsmirnov 1.4 break
151     pass
152    
153     # Look for actions which has sense only with '-continue'
154    
155     if not self.flag_continue:
156     for opt in opts.keys():
157 slacapra 1.6 if ( opt in (self.aux_actions) ):
158 nsmirnov 1.4 self.flag_continue = 1
159     break
160 nsmirnov 1.1 pass
161     pass
162 slacapra 1.6 submit_flag=0
163     create_flag=0
164     for opt in opts.keys():
165     if opt == "-submit": submit_flag=1
166     if opt == "-create": create_flag=1
167     pass
168     if (submit_flag and not create_flag):
169 nsmirnov 1.7 msg = "'-submit' must be used with either '-create' or '-continue'."
170     raise CrabException(msg)
171 slacapra 1.6 pass
172 nsmirnov 1.1
173     if not self.flag_continue: return
174    
175     if not continue_dir:
176     prefix = common.prog_name + '_' + self.name + '_'
177     continue_dir = findLastWorkDir(prefix)
178     pass
179    
180     if not continue_dir:
181     raise CrabException('Cannot find last working directory.')
182    
183     if not os.path.exists(continue_dir):
184     msg = 'Cannot continue because the working directory <'
185     msg += continue_dir
186     msg += '> does not exist.'
187     raise CrabException(msg)
188    
189     # Instantiate WorkSpace
190 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
191 nsmirnov 1.1
192     return
193    
194 nsmirnov 1.3 def processIniFile_(self, opts):
195 nsmirnov 1.1 """
196     Processes a configuration INI-file.
197     """
198    
199     # Extract cfg-file name from the cmd-line options.
200    
201     for opt in opts.keys():
202     if ( opt == '-cfg' ):
203     if self.flag_continue:
204     raise CrabException('-continue and -cfg cannot coexist.')
205     if opts[opt] : self.cfg_fname = opts[opt]
206     else : usage()
207     pass
208    
209     elif ( opt == '-name' ):
210     self.name = opts[opt]
211     pass
212    
213     pass
214    
215     # Set default cfg-fname
216    
217     if self.cfg_fname == None:
218     if self.flag_continue:
219     self.cfg_fname = common.work_space.cfgFileName()
220     else:
221     self.cfg_fname = common.prog_name+'.cfg'
222     pass
223     pass
224    
225     # Load cfg-file
226    
227     if string.lower(self.cfg_fname) != 'none':
228     if os.path.exists(self.cfg_fname):
229     self.cfg_params = loadConfig(self.cfg_fname)
230 corvo 1.42 # Better idea on where to put user info for ML?
231 corvo 1.52 self.cfg_params['user'] = os.getlogin()
232 corvo 1.42 # Better idea on where to put ML???? Looks crap here, but had no better thought!
233     if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
234 nsmirnov 1.1 pass
235     else:
236     msg = 'cfg-file '+self.cfg_fname+' not found.'
237     raise CrabException(msg)
238     pass
239     pass
240    
241     # process the [CRAB] section
242    
243     lhp = len('CRAB.')
244     for k in self.cfg_params.keys():
245     if len(k) >= lhp and k[:lhp] == 'CRAB.':
246     opt = '-'+k[lhp:]
247     if len(opt) >= 3 and opt[:3] == '-__': continue
248     if opt not in opts.keys():
249     opts[opt] = self.cfg_params[k]
250     pass
251     pass
252     pass
253    
254     return
255    
256 nsmirnov 1.3 def processOptions_(self, opts):
257 nsmirnov 1.1 """
258     Processes the command-line options.
259     """
260    
261     for opt in opts.keys():
262     val = opts[opt]
263    
264 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
265     if opt in self.main_actions:
266     self.cfg_params['CRAB.'+opt[1:]] = val
267     continue
268     if opt in self.aux_actions:
269     self.cfg_params['CRAB.'+opt[1:]] = val
270     continue
271 nsmirnov 1.1
272    
273     elif ( opt == '-cfg' ):
274     pass
275    
276     elif ( opt in ('-continue', '-c') ):
277 nsmirnov 1.4 # Already processed in processContinueOption_()
278 nsmirnov 1.1 pass
279    
280     elif ( opt == '-jobtype' ):
281     if val : self.job_type_name = string.upper(val)
282     else : usage()
283     pass
284    
285     elif ( opt == '-Q' ):
286     self.flag_quiet = 1
287     pass
288    
289     elif ( opt == '-debug' ):
290 slacapra 1.6 if val: self.debug_level = int(val)
291     else: self.debug_level = 1
292 nsmirnov 1.1 pass
293    
294     elif ( opt == '-scheduler' ):
295 fanzago 1.53 if val:
296     self.scheduler_name = 'boss'
297     self.flag_useboss = 1
298 nsmirnov 1.1 else:
299     print common.prog_name+". No value for '-scheduler'."
300     usage()
301     pass
302     pass
303 slacapra 1.22
304 fanzago 1.53 # elif ( opt in ('-use_boss', '-useboss') ):
305     # if ( val == '1' ):
306     # self.scheduler_name = 'boss'
307     # pass
308     # elif ( val == '0' ):
309     # pass
310     # else:
311     # print common.prog_name+'. Bad flag for -use_boss option:',\
312     # val,'Possible values are 0(=No) or 1(=Yes)'
313     # usage()
314     # pass
315     # pass
316 fanzago 1.14
317 nsmirnov 1.3 elif string.find(opt,'.') == -1:
318     print common.prog_name+'. Unrecognized option '+opt
319     usage()
320     pass
321 nsmirnov 1.1
322 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
323     if string.find(opt,'.') == -1 :
324     self.cfg_params['CRAB.'+opt[1:]] = val
325 nsmirnov 1.1 pass
326 nsmirnov 1.3 else:
327 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
328     self.cfg_params[opt[1:]] = val
329     pass
330     pass
331     return
332    
333 slacapra 1.8 def parseRange_(self, aRange):
334 nsmirnov 1.4 """
335 slacapra 1.8 Takes as the input a string with a range defined in any of the following
336     way, including combination, and return a tuple with the ints defined
337     according to following table. A consistency check is done.
338     NB: the first job is "1", not "0".
339     'all' -> [1,2,..., NJobs]
340     '' -> [1,2,..., NJobs]
341     'n1' -> [n1]
342     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
343     'n1,n2' -> [n1, n2]
344     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
345     """
346     result = []
347    
348 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
349     if aRange=='all' or aRange==None or aRange=='':
350 slacapra 1.8 result=range(0,common.jobDB.nJobs())
351     return result
352 slacapra 1.9 elif aRange=='0':
353     return result
354 slacapra 1.8
355     subRanges = string.split(aRange, ',')
356     for subRange in subRanges:
357     result = result+self.parseSimpleRange_(subRange)
358    
359     if self.checkUniqueness_(result):
360     return result
361     else:
362 slacapra 1.33 common.logger.message("Error "+result)
363 slacapra 1.8 return []
364    
365     def checkUniqueness_(self, list):
366     """
367 slacapra 1.9 check if a list contains only unique elements
368 slacapra 1.8 """
369    
370     uniqueList = []
371     # use a list comprehension statement (takes a while to understand)
372    
373     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
374    
375     return (len(list)==len(uniqueList))
376    
377     def parseSimpleRange_(self, aRange):
378     """
379     Takes as the input a string with two integers separated by
380     the minus sign and returns the tuple with these numbers:
381     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
382     'n1' -> [n1]
383     """
384     (start, end) = (None, None)
385    
386     result = []
387     minus = string.find(aRange, '-')
388     if ( minus < 0 ):
389     if isInt(aRange) and int(aRange)>0:
390 fanzago 1.37 # FEDE
391     #result.append(int(aRange)-1)
392     ###
393     result.append(aRange)
394 slacapra 1.8 else:
395 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
396     usage()
397     pass
398    
399 nsmirnov 1.4 pass
400     else:
401 slacapra 1.8 (start, end) = string.split(aRange, '-')
402     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
403 spiga 1.38 #result=range(int(start)-1, int(end))
404     result=range(int(start), int(end)+1) #Daniele
405 slacapra 1.8 else:
406 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
407 slacapra 1.8
408     return result
409 nsmirnov 1.4
410 nsmirnov 1.3 def initializeActions_(self, opts):
411 nsmirnov 1.1 """
412     For each user action instantiate a corresponding
413     object and put it in the action dictionary.
414     """
415 fanzago 1.53 # for opt in opts.keys():
416     # self.flag_useboss = 0
417     # if ( opt == '-use_boss'):
418     # val = opts[opt]
419     # if ( val == '1' ):
420     # self.flag_useboss = 1
421     # common.logger.message('Using BOSS')
422     # pass
423     # else:
424     # self.flag_useboss = 0
425     # pass
426 spiga 1.15
427     for opt in opts.keys():
428    
429 nsmirnov 1.1 val = opts[opt]
430 spiga 1.15
431    
432     if ( opt == '-create' ):
433 slacapra 1.22 ncjobs = 0
434 nsmirnov 1.1 if val:
435     if ( isInt(val) ):
436     ncjobs = int(val)
437     elif ( val == 'all'):
438     ncjobs = val
439     else:
440 nsmirnov 1.5 msg = 'Bad creation bunch size <'+str(val)+'>\n'
441     msg += ' Must be an integer or "all"'
442 slacapra 1.8 msg += ' Generic range is not allowed"'
443 nsmirnov 1.5 raise CrabException(msg)
444 nsmirnov 1.1 pass
445     else: ncjobs = 'all'
446    
447     if ncjobs != 0:
448 nsmirnov 1.3 # Instantiate Creator object
449 fanzago 1.56 self.creator = Creator(self.job_type_name,
450 nsmirnov 1.1 self.cfg_params,
451 corvo 1.55 ncjobs)
452 fanzago 1.56 self.actions[opt] = self.creator
453 nsmirnov 1.3
454     # Initialize the JobDB object if needed
455 nsmirnov 1.1 if not self.flag_continue:
456 fanzago 1.56 common.jobDB.create(self.creator.nJobs())
457 nsmirnov 1.1 pass
458 nsmirnov 1.3
459     # Create and initialize JobList
460    
461     common.job_list = JobList(common.jobDB.nJobs(),
462 fanzago 1.56 self.creator.jobType())
463 nsmirnov 1.3
464     common.job_list.setScriptNames(self.job_type_name+'.sh')
465     common.job_list.setJDLNames(self.job_type_name+'.jdl')
466 slacapra 1.12 common.job_list.setCfgNames(self.job_type_name+'.orcarc')
467 slacapra 1.9
468 fanzago 1.56 self.creator.writeJobsSpecsToDB()
469 nsmirnov 1.1 pass
470 nsmirnov 1.3 pass
471 nsmirnov 1.1
472     elif ( opt == '-submit' ):
473 slacapra 1.23
474     # total jobs
475     # get the first not already submitted
476 slacapra 1.24 common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
477     lastSubmittedJob=0
478     for nj in range(common.jobDB.nJobs()):
479 fanzago 1.45 if (common.jobDB.status(nj) in ['S','K','RC','Y','A','D']):
480 slacapra 1.24 lastSubmittedJob +=1
481 slacapra 1.30 else: break
482 slacapra 1.24 # count job from 1
483 slacapra 1.30 totalJobsSubmittable = common.jobDB.nJobs()-lastSubmittedJob
484 slacapra 1.24 common.logger.debug(5,'lastSubmittedJob '+str(lastSubmittedJob))
485     common.logger.debug(5,'totalJobsSubmittable '+str(totalJobsSubmittable))
486 slacapra 1.23
487 slacapra 1.24 nsjobs = lastSubmittedJob+totalJobsSubmittable
488 slacapra 1.23 # get user request
489 slacapra 1.22 if val:
490     if ( isInt(val) ):
491 slacapra 1.24 tmp = int(val)
492     if (tmp >= totalJobsSubmittable):
493     common.logger.message('asking to submit '+str(tmp)+' jobs, but only '+str(totalJobsSubmittable)+' left: submitting those')
494     pass
495     else:
496     nsjobs=lastSubmittedJob+int(val)
497 slacapra 1.23 elif (val=='all'):
498     pass
499 slacapra 1.22 else:
500     msg = 'Bad submission option <'+str(val)+'>\n'
501     msg += ' Must be an integer or "all"'
502     msg += ' Generic range is not allowed"'
503     raise CrabException(msg)
504     pass
505 slacapra 1.24 common.logger.debug(5,'nsjobs '+str(nsjobs))
506 slacapra 1.23
507     # submit N from last submitted job
508 slacapra 1.24 nj_list = range(lastSubmittedJob, nsjobs)
509     common.logger.debug(5,'nj_list '+str(nj_list))
510 nsmirnov 1.5
511     if len(nj_list) != 0:
512 nsmirnov 1.3 # Instantiate Submitter object
513 fanzago 1.56 self.actions[opt] = Submitter(self.cfg_params, nj_list, self.creator.jobType())
514 nsmirnov 1.3
515     # Create and initialize JobList
516     if len(common.job_list) == 0 :
517     common.job_list = JobList(common.jobDB.nJobs(),
518     None)
519     common.job_list.setJDLNames(self.job_type_name+'.jdl')
520     pass
521 nsmirnov 1.1 pass
522 slacapra 1.30 pass
523 nsmirnov 1.1
524 nsmirnov 1.4 elif ( opt == '-list' ):
525 slacapra 1.8 jobs = self.parseRange_(val)
526    
527     common.jobDB.dump(jobs)
528 nsmirnov 1.4 pass
529    
530     elif ( opt == '-status' ):
531 slacapra 1.8 jobs = self.parseRange_(val)
532    
533 slacapra 1.9 if len(jobs) != 0:
534 spiga 1.16 if ( self.flag_useboss == 1 ):
535 corvo 1.42 self.actions[opt] = StatusBoss(self.cfg_params)
536 spiga 1.15 else:
537 corvo 1.46 self.actions[opt] = Status(self.cfg_params, jobs)
538 spiga 1.15 pass
539 nsmirnov 1.1 pass
540     pass
541 slacapra 1.22
542 nsmirnov 1.4 elif ( opt == '-kill' ):
543 slacapra 1.8
544 spiga 1.31 if ( self.flag_useboss == 1 ):
545     if val:
546     if val =='all':
547 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
548     jobs = allBoss_id.keys()
549 spiga 1.31 else:
550     jobs = self.parseRange_(val)
551     common.scheduler.cancel(jobs)
552     else:
553     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
554     else:
555     if val:
556     jobs = self.parseRange_(val)
557    
558     for nj in jobs:
559     st = common.jobDB.status(nj)
560     if st == 'S':
561     jid = common.jobDB.jobId(nj)
562     common.logger.message("Killing job # "+`(nj+1)`)
563     common.scheduler.cancel(jid)
564     common.jobDB.setStatus(nj, 'K')
565     pass
566 slacapra 1.9 pass
567 spiga 1.31
568     common.jobDB.save()
569 nsmirnov 1.4 pass
570 spiga 1.31 else:
571     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
572 nsmirnov 1.1
573 slacapra 1.8 elif ( opt == '-getoutput' ):
574    
575 spiga 1.20 if ( self.flag_useboss == 1 ):
576     if val=='all' or val==None or val=='':
577 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
578     jobs = allBoss_id.keys()
579 spiga 1.20 else:
580     jobs = self.parseRange_(val)
581     common.scheduler.getOutput(jobs)
582     else:
583     jobs = self.parseRange_(val)
584 spiga 1.13
585 slacapra 1.22 ## also this: create a ActorClass (GetOutput)
586 spiga 1.20 jobs_done = []
587     for nj in jobs:
588     st = common.jobDB.status(nj)
589     if st == 'D':
590 slacapra 1.12 jobs_done.append(nj)
591 spiga 1.20 pass
592     elif st == 'S':
593     jid = common.jobDB.jobId(nj)
594     currStatus = common.scheduler.queryStatus(jid)
595     if currStatus=="Done":
596     jobs_done.append(nj)
597     else:
598     msg = 'Job # '+`(nj+1)`+' submitted but still status '+currStatus+' not possible to get output'
599     common.logger.message(msg)
600     pass
601 slacapra 1.9 else:
602 spiga 1.20 # common.logger.message('Jobs #'+`(nj+1)`+' has status '+st+' not possible to get output')
603     pass
604 slacapra 1.12 pass
605    
606 spiga 1.20 for nj in jobs_done:
607 spiga 1.15 jid = common.jobDB.jobId(nj)
608     dir = common.scheduler.getOutput(jid)
609     common.jobDB.setStatus(nj, 'Y')
610 slacapra 1.12
611     # Rename the directory with results to smth readable
612     new_dir = common.work_space.resDir()
613 spiga 1.20 try:
614     files = os.listdir(dir)
615     for file in files:
616     os.rename(dir+'/'+file, new_dir+'/'+file)
617     os.rmdir(dir)
618     except OSError, e:
619     msg = 'rename files from '+dir+' to '+new_dir+' error: '
620     msg += str(e)
621     common.logger.message(msg)
622     # ignore error
623 slacapra 1.12 pass
624 spiga 1.20 pass
625 slacapra 1.22 ###
626    
627 spiga 1.13 resFlag = 0
628     exCode = common.scheduler.getExitStatus(jid)
629 spiga 1.34 Statistic.Monitor('retrieved',resFlag,jid,exCode)
630 slacapra 1.12
631     msg = 'Results of Job # '+`(nj+1)`+' are in '+new_dir
632     common.logger.message(msg)
633 nsmirnov 1.4 pass
634    
635     common.jobDB.save()
636     pass
637    
638     elif ( opt == '-resubmit' ):
639 fanzago 1.37 if ( self.flag_useboss == 1 ):
640     if val=='all' or val==None or val=='':
641     allBoss_id = common.scheduler.listBoss()
642     jobs = allBoss_id.keys()
643     else:
644     jobs = self.parseRange_(val)
645     else:
646     if val:
647     jobs = self.parseRange_(val)
648    
649 slacapra 1.11 if val:
650     # create a list of jobs to be resubmitted.
651 slacapra 1.8
652 slacapra 1.22 ### as before, create a Resubmittter Class
653 slacapra 1.11 nj_list = []
654     for nj in jobs:
655 fanzago 1.37 st = common.jobDB.status(int(nj)-1)
656 slacapra 1.11 if st in ['K','A']:
657 fanzago 1.37 nj_list.append(int(nj)-1)
658     common.jobDB.setStatus(int(nj)-1,'C')
659 slacapra 1.11 elif st == 'Y':
660 fanzago 1.39 common.scheduler.moveOutput(nj)
661 fanzago 1.37 nj_list.append(int(nj)-1)
662     st = common.jobDB.setStatus(int(nj)-1,'RC')
663 spiga 1.44 elif st in ['C','X']:
664     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
665 slacapra 1.11 pass
666 spiga 1.44 elif st == 'D':
667     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
668 slacapra 1.11 else:
669 spiga 1.44 common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
670 slacapra 1.9 pass
671 slacapra 1.8
672 fanzago 1.37 if len(common.job_list) == 0 :
673     common.job_list = JobList(common.jobDB.nJobs(),None)
674     common.job_list.setJDLNames(self.job_type_name+'.jdl')
675     pass
676    
677 slacapra 1.11 if len(nj_list) != 0:
678 fanzago 1.37 common.scheduler.resubmit(nj_list)
679 slacapra 1.11 # Instantiate Submitter object
680 fanzago 1.56 self.actions[opt] = Submitter(self.cfg_params, nj_list, self.creator.jobType())
681 slacapra 1.11
682 slacapra 1.8 pass
683     pass
684 slacapra 1.11 else:
685     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
686     common.logger.message("WARNING: _all_ job specified in the rage will be resubmitted!!!")
687     pass
688 fanzago 1.37 common.jobDB.save()
689 slacapra 1.8 pass
690    
691     elif ( opt == '-cancelAndResubmit' ):
692 nsmirnov 1.5
693 spiga 1.47 if ( self.flag_useboss == 1 ):
694     if val:
695     if val =='all':
696     allBoss_id = common.scheduler.listBoss()
697     jobs = allBoss_id.keys()
698     else:
699     jobs = self.parseRange_(val)
700     # kill submitted jobs
701     common.scheduler.cancel(jobs)
702     else:
703     common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
704     else:
705     if val:
706     jobs = self.parseRange_(val)
707     else:
708     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
709 nsmirnov 1.5 pass
710    
711 spiga 1.47 # resubmit cancelled jobs.
712     if val:
713     nj_list = []
714     for nj in jobs:
715     if ( self.flag_useboss != 1 ):
716     st = common.jobDB.status(nj)
717     if st == 'S':
718     jid = common.jobDB.jobId(nj)
719     common.scheduler.cancel(jid)
720     st = 'K'
721     common.jobDB.setStatus(nj, st)
722     pass
723     common.jobDB.save()
724     pass
725     st = common.jobDB.status(int(nj)-1)
726     if st in ['K','A']:
727     nj_list.append(int(nj)-1)
728     common.jobDB.setStatus(int(nj)-1,'C')
729     elif st == 'Y':
730     common.scheduler.moveOutput(nj)
731     nj_list.append(int(nj)-1)
732     st = common.jobDB.setStatus(int(nj)-1,'RC')
733     elif st in ['C','X']:
734     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
735     pass
736     elif st == 'D':
737     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
738     else:
739     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
740     pass
741    
742 nsmirnov 1.5 if len(common.job_list) == 0 :
743 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
744 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
745     pass
746 spiga 1.47
747     if len(nj_list) != 0:
748     common.scheduler.resubmit(nj_list)
749 fanzago 1.56 self.actions[opt] = Submitter(self.cfg_params, nj_list, self.creator.jobtype())
750 spiga 1.47 pass
751     pass
752     else:
753     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
754 nsmirnov 1.5 pass
755 spiga 1.47 common.jobDB.save()
756 nsmirnov 1.4 pass
757 spiga 1.47
758 slacapra 1.28 elif ( opt == '-testJdl' ):
759 slacapra 1.8 jobs = self.parseRange_(val)
760     nj_list = []
761     for nj in jobs:
762     st = common.jobDB.status(nj)
763 slacapra 1.12 if st == 'C': nj_list.append(nj)
764 slacapra 1.8 pass
765    
766     if len(nj_list) != 0:
767     # Instantiate Submitter object
768     self.actions[opt] = Checker(self.cfg_params, nj_list)
769    
770     # Create and initialize JobList
771    
772     if len(common.job_list) == 0 :
773     common.job_list = JobList(common.jobDB.nJobs(), None)
774     common.job_list.setJDLNames(self.job_type_name+'.jdl')
775     pass
776     pass
777    
778 slacapra 1.9 elif ( opt == '-postMortem' ):
779     jobs = self.parseRange_(val)
780     nj_list = []
781     for nj in jobs:
782 fanzago 1.45 # fede: nj scala di uno perche' e' l'internal id di boss
783     # ed il jobDB parte da zero ...
784     st = common.jobDB.status(int(nj)-1)
785     if st not in ['X', 'C']: nj_list.append(int(nj))
786 slacapra 1.9 pass
787    
788     if len(nj_list) != 0:
789     # Instantiate Submitter object
790 spiga 1.35 self.actions[opt] = PostMortem(self.cfg_params, nj_list,self.flag_useboss)
791 slacapra 1.9
792     # Create and initialize JobList
793    
794     if len(common.job_list) == 0 :
795     common.job_list = JobList(common.jobDB.nJobs(), None)
796     common.job_list.setJDLNames(self.job_type_name+'.jdl')
797     pass
798     pass
799 slacapra 1.33 else:
800     common.logger.message("No jobs to analyze")
801 slacapra 1.9
802     elif ( opt == '-clean' ):
803     if val != None:
804     raise CrabException("No range allowed for '-clean'")
805    
806 corvo 1.46 theCleaner = Cleaner(self.scheduler_name == 'boss', self.cfg_params)
807 slacapra 1.36 theCleaner.clean()
808 slacapra 1.9
809 nsmirnov 1.1 pass
810     return
811    
812 nsmirnov 1.3 def createWorkingSpace_(self):
813 slacapra 1.9 new_dir = ''
814    
815     try:
816     new_dir = self.cfg_params['USER.ui_working_dir']
817 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
818 fanzago 1.48 if os.path.exists(new_dir):
819     if os.listdir(new_dir):
820     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
821     raise CrabException(msg)
822 slacapra 1.9 except KeyError:
823     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
824 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
825 slacapra 1.9 new_dir = self.cwd + new_dir
826     pass
827 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
828 nsmirnov 1.1 common.work_space.create()
829     return
830    
831 nsmirnov 1.3 def loadConfiguration_(self, opts):
832 nsmirnov 1.1
833     save_opts = common.work_space.loadSavedOptions()
834    
835     # Override saved options with new command-line options
836    
837     for k in opts.keys():
838     save_opts[k] = opts[k]
839     pass
840    
841     # Return updated options
842     return save_opts
843    
844 nsmirnov 1.3 def createLogger_(self, args):
845 nsmirnov 1.1
846     log = Logger()
847     log.quiet(self.flag_quiet)
848     log.setDebugLevel(self.debug_level)
849     log.write(args+'\n')
850 nsmirnov 1.3 log.message(self.headerString_())
851 nsmirnov 1.1 log.flush()
852     common.logger = log
853     return
854    
855 nsmirnov 1.3 def updateHistory_(self, args):
856 nsmirnov 1.1 history_fname = common.prog_name+'.history'
857     history_file = open(history_fname, 'a')
858     history_file.write(self.current_time+': '+args+'\n')
859     history_file.close()
860     return
861    
862 nsmirnov 1.3 def headerString_(self):
863 nsmirnov 1.1 """
864     Creates a string describing program options either given in
865     the command line or their default values.
866     """
867     header = common.prog_name + ' (version ' + common.prog_version_str + \
868     ') running on ' + \
869     time.ctime(time.time())+'\n\n' + \
870     common.prog_name+'. Working options:\n'
871 corvo 1.51 print self.job_type_name
872 nsmirnov 1.1 header = header +\
873     ' scheduler ' + self.scheduler_name + '\n'+\
874     ' job type ' + self.job_type_name + '\n'+\
875     ' working directory ' + common.work_space.topDir()\
876     + '\n'
877     return header
878    
879 nsmirnov 1.3 def createScheduler_(self):
880 nsmirnov 1.1 """
881     Creates a scheduler object instantiated by its name.
882     """
883     klass_name = 'Scheduler' + string.capitalize(self.scheduler_name)
884     file_name = klass_name
885     try:
886     klass = importName(file_name, klass_name)
887     except KeyError:
888     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
889     raise CrabException(msg)
890     except ImportError, e:
891     msg = 'Cannot create scheduler '+self.scheduler_name
892     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
893     msg += str(e)
894     raise CrabException(msg)
895    
896     common.scheduler = klass()
897     common.scheduler.configure(self.cfg_params)
898     return
899    
900 corvo 1.55 # def createJobtype_(self):
901     # """
902     # Create the jobtype specified in the crab.cfg file
903     # """
904     # file_name = 'cms_'+ string.lower(self.job_type_name)
905     # klass_name = string.capitalize(self.job_type_name)
906     #
907     # try:
908     # klass = importName(file_name, klass_name)
909     # except KeyError:
910     # msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
911     # raise CrabException(msg)
912     # except ImportError, e:
913     # msg = 'Cannot create job type '+self.job_type_name
914     # msg += ' (file: '+file_name+', class '+klass_name+'):\n'
915     # msg += str(e)
916     # raise CrabException(msg)
917     # job_type = klass(self.cfg_params)
918     # return job_type
919 corvo 1.51
920 nsmirnov 1.1 def run(self):
921     """
922     For each
923     """
924    
925     for act in self.main_actions:
926     if act in self.actions.keys(): self.actions[act].run()
927     pass
928    
929     for act in self.aux_actions:
930     if act in self.actions.keys(): self.actions[act].run()
931     pass
932     return
933    
934     ###########################################################################
935     def processHelpOptions(opts):
936    
937 slacapra 1.11 if len(opts):
938     for opt in opts.keys():
939     if opt in ('-v', '-version', '--version') :
940     print Crab.version()
941     return 1
942     if opt in ('-h','-help','--help') :
943     if opts[opt] : help(opts[opt])
944     else: help()
945     return 1
946     else:
947     usage()
948 nsmirnov 1.1
949     return 0
950    
951     ###########################################################################
952     if __name__ == '__main__':
953    
954 fanzago 1.18
955 fanzago 1.19 # # Initial settings for Python modules. Avoid appending manually lib paths.
956     # try:
957     # path=os.environ['EDG_WL_LOCATION']
958     # except:
959     # print "Error: Please set the EDG_WL_LOCATION environment variable pointing to the userinterface installation path"
960     # sys.exit(1)
961     #
962     # libPath=os.path.join(path, "lib")
963     # sys.path.append(libPath)
964     # libPath=os.path.join(path, "lib", "python")
965     # sys.path.append(libPath)
966 fanzago 1.18
967    
968 nsmirnov 1.1 # Parse command-line options and create a dictionary with
969     # key-value pairs.
970    
971     options = parseOptions(sys.argv[1:])
972    
973     # Process "help" options, such as '-help', '-version'
974    
975 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
976 nsmirnov 1.1
977     # Create, initialize, and run a Crab object
978    
979     try:
980 nsmirnov 1.3 crab = Crab(options)
981 nsmirnov 1.1 crab.run()
982 corvo 1.54 crab.cfg_params['apmon'].free()
983 nsmirnov 1.1 except CrabException, e:
984     print '\n' + common.prog_name + ': ' + str(e) + '\n'
985     if common.logger:
986     common.logger.write('ERROR: '+str(e)+'\n')
987     pass
988     pass
989    
990     pass