ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.51
Committed: Tue Mar 14 15:52:05 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.50: +36 -4 lines
Log Message:
Fix Submitter parameters

File Contents

# User Rev Content
1 slacapra 1.27 #!/usr/bin/env python2.2
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 nsmirnov 1.3 from JobList import JobList
9 nsmirnov 1.1 from Creator import Creator
10     from Submitter import Submitter
11 slacapra 1.8 from Checker import Checker
12 slacapra 1.9 from PostMortem import PostMortem
13     from Status import Status
14 spiga 1.16 from StatusBoss import StatusBoss
15 corvo 1.42 from ApmonIf import ApmonIf
16 slacapra 1.36 from Cleaner import Cleaner
17 nsmirnov 1.1 import common
18 spiga 1.13 import Statistic
19 nsmirnov 1.1
20     import sys, os, time, string
21    
22     ###########################################################################
23     class Crab:
24 nsmirnov 1.3 def __init__(self, opts):
25 nsmirnov 1.1
26     # The order of main_actions is important !
27 slacapra 1.28 self.main_actions = [ '-create', '-submit' ]
28 slacapra 1.8 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput',
29 slacapra 1.28 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean']
30 nsmirnov 1.1
31     # Dictionary of actions, e.g. '-create' -> object of class Creator
32     self.actions = {}
33    
34     # Configuration file
35     self.cfg_fname = None
36     # Dictionary with configuration parameters
37     self.cfg_params = {}
38    
39     # Current working directory
40     self.cwd = os.getcwd()+'/'
41     # Current time in format 'yymmdd_hhmmss'
42     self.current_time = time.strftime('%y%m%d_%H%M%S',
43     time.localtime(time.time()))
44    
45 nsmirnov 1.3 # Session name (?) Do we need this ?
46 nsmirnov 1.1 self.name = '0'
47    
48     # Job type
49     self.job_type_name = None
50 corvo 1.51 # marco
51     self.job_type = None
52 nsmirnov 1.1
53     # Continuation flag
54     self.flag_continue = 0
55    
56     # quiet mode, i.e. no output on screen
57     self.flag_quiet = 0
58     # produce more output
59     self.debug_level = 0
60    
61 nsmirnov 1.3 # Scheduler name, e.g. 'edg', 'lsf'
62 fanzago 1.14 self.scheduler_name = ''
63 nsmirnov 1.1
64 nsmirnov 1.3 self.initialize_(opts)
65 nsmirnov 1.1
66     return
67    
68 nsmirnov 1.7 def version():
69 nsmirnov 1.1 return common.prog_version_str
70    
71 nsmirnov 1.7 version = staticmethod(version)
72    
73 nsmirnov 1.3 def initialize_(self, opts):
74 nsmirnov 1.1
75     # Process the '-continue' option first because
76     # in the case of continuation the CRAB configuration
77     # parameters are loaded from already existing Working Space.
78 nsmirnov 1.3 self.processContinueOption_(opts)
79 nsmirnov 1.1
80     # Process ini-file first, then command line options
81     # because they override ini-file settings.
82    
83 nsmirnov 1.3 self.processIniFile_(opts)
84 nsmirnov 1.1
85 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
86 nsmirnov 1.1
87 nsmirnov 1.3 self.processOptions_(opts)
88 nsmirnov 1.1
89     if not self.flag_continue:
90 nsmirnov 1.3 self.createWorkingSpace_()
91 slacapra 1.9 optsToBeSaved={}
92     for it in opts.keys():
93     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
94     pass
95     else:
96     optsToBeSaved[it]=opts[it]
97     common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
98 nsmirnov 1.1 pass
99    
100     # At this point all configuration options have been read.
101    
102     args = string.join(sys.argv,' ')
103 slacapra 1.11
104 nsmirnov 1.3 self.updateHistory_(args)
105 slacapra 1.11
106 nsmirnov 1.3 self.createLogger_(args)
107 slacapra 1.11
108 nsmirnov 1.1 common.jobDB = JobDB()
109 corvo 1.51
110     # marco
111    
112     self.job_type = self.createJobtype_()
113    
114     # marco
115 slacapra 1.11
116 nsmirnov 1.3 if self.flag_continue:
117 slacapra 1.12 try:
118     common.jobDB.load()
119 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
120 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
121     except DBException,e:
122     pass
123 nsmirnov 1.3 pass
124 slacapra 1.11
125 nsmirnov 1.3 self.createScheduler_()
126 slacapra 1.11
127 nsmirnov 1.3 if common.logger.debugLevel() >= 6:
128     common.logger.debug(6, 'Used properties:')
129     keys = self.cfg_params.keys()
130     keys.sort()
131     for k in keys:
132     if self.cfg_params[k]:
133 corvo 1.42 common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
134 nsmirnov 1.3 pass
135     else:
136     common.logger.debug(6, ' '+k+' : ')
137     pass
138     pass
139     common.logger.debug(6, 'End of used properties.\n')
140     pass
141     self.initializeActions_(opts)
142 nsmirnov 1.1 return
143    
144 nsmirnov 1.3 def processContinueOption_(self,opts):
145 nsmirnov 1.1
146     continue_dir = None
147 nsmirnov 1.4
148     # Look for the '-continue' option.
149    
150 nsmirnov 1.1 for opt in opts.keys():
151     if ( opt in ('-continue','-c') ):
152     self.flag_continue = 1
153     val = opts[opt]
154     if val:
155     if val[0] == '/': continue_dir = val # abs path
156     else: continue_dir = self.cwd + val # rel path
157     pass
158 nsmirnov 1.4 break
159     pass
160    
161     # Look for actions which has sense only with '-continue'
162    
163     if not self.flag_continue:
164     for opt in opts.keys():
165 slacapra 1.6 if ( opt in (self.aux_actions) ):
166 nsmirnov 1.4 self.flag_continue = 1
167     break
168 nsmirnov 1.1 pass
169     pass
170 slacapra 1.6 submit_flag=0
171     create_flag=0
172     for opt in opts.keys():
173     if opt == "-submit": submit_flag=1
174     if opt == "-create": create_flag=1
175     pass
176     if (submit_flag and not create_flag):
177 nsmirnov 1.7 msg = "'-submit' must be used with either '-create' or '-continue'."
178     raise CrabException(msg)
179 slacapra 1.6 pass
180 nsmirnov 1.1
181     if not self.flag_continue: return
182    
183     if not continue_dir:
184     prefix = common.prog_name + '_' + self.name + '_'
185     continue_dir = findLastWorkDir(prefix)
186     pass
187    
188     if not continue_dir:
189     raise CrabException('Cannot find last working directory.')
190    
191     if not os.path.exists(continue_dir):
192     msg = 'Cannot continue because the working directory <'
193     msg += continue_dir
194     msg += '> does not exist.'
195     raise CrabException(msg)
196    
197     # Instantiate WorkSpace
198 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
199 nsmirnov 1.1
200     return
201    
202 nsmirnov 1.3 def processIniFile_(self, opts):
203 nsmirnov 1.1 """
204     Processes a configuration INI-file.
205     """
206    
207     # Extract cfg-file name from the cmd-line options.
208    
209     for opt in opts.keys():
210     if ( opt == '-cfg' ):
211     if self.flag_continue:
212     raise CrabException('-continue and -cfg cannot coexist.')
213     if opts[opt] : self.cfg_fname = opts[opt]
214     else : usage()
215     pass
216    
217     elif ( opt == '-name' ):
218     self.name = opts[opt]
219     pass
220    
221     pass
222    
223     # Set default cfg-fname
224    
225     if self.cfg_fname == None:
226     if self.flag_continue:
227     self.cfg_fname = common.work_space.cfgFileName()
228     else:
229     self.cfg_fname = common.prog_name+'.cfg'
230     pass
231     pass
232    
233     # Load cfg-file
234    
235     if string.lower(self.cfg_fname) != 'none':
236     if os.path.exists(self.cfg_fname):
237     self.cfg_params = loadConfig(self.cfg_fname)
238 corvo 1.42 # Better idea on where to put user info for ML?
239 corvo 1.51 # self.cfg_params['user'] = os.getlogin()
240     self.cfg_params['user'] = "corvo"
241 corvo 1.42 # Better idea on where to put ML???? Looks crap here, but had no better thought!
242     if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
243 nsmirnov 1.1 pass
244     else:
245     msg = 'cfg-file '+self.cfg_fname+' not found.'
246     raise CrabException(msg)
247     pass
248     pass
249    
250     # process the [CRAB] section
251    
252     lhp = len('CRAB.')
253     for k in self.cfg_params.keys():
254     if len(k) >= lhp and k[:lhp] == 'CRAB.':
255     opt = '-'+k[lhp:]
256     if len(opt) >= 3 and opt[:3] == '-__': continue
257     if opt not in opts.keys():
258     opts[opt] = self.cfg_params[k]
259     pass
260     pass
261     pass
262    
263     return
264    
265 nsmirnov 1.3 def processOptions_(self, opts):
266 nsmirnov 1.1 """
267     Processes the command-line options.
268     """
269    
270     for opt in opts.keys():
271     val = opts[opt]
272    
273 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
274     if opt in self.main_actions:
275     self.cfg_params['CRAB.'+opt[1:]] = val
276     continue
277     if opt in self.aux_actions:
278     self.cfg_params['CRAB.'+opt[1:]] = val
279     continue
280 nsmirnov 1.1
281    
282     elif ( opt == '-cfg' ):
283     pass
284    
285     elif ( opt in ('-continue', '-c') ):
286 nsmirnov 1.4 # Already processed in processContinueOption_()
287 nsmirnov 1.1 pass
288    
289     elif ( opt == '-jobtype' ):
290     if val : self.job_type_name = string.upper(val)
291     else : usage()
292     pass
293    
294     elif ( opt == '-Q' ):
295     self.flag_quiet = 1
296     pass
297    
298     elif ( opt == '-debug' ):
299 slacapra 1.6 if val: self.debug_level = int(val)
300     else: self.debug_level = 1
301 nsmirnov 1.1 pass
302    
303     elif ( opt == '-scheduler' ):
304     if val: self.scheduler_name = val
305     else:
306     print common.prog_name+". No value for '-scheduler'."
307     usage()
308     pass
309     pass
310 slacapra 1.22
311 fanzago 1.18 elif ( opt in ('-use_boss', '-useboss') ):
312     if ( val == '1' ):
313     self.scheduler_name = 'boss'
314     pass
315     elif ( val == '0' ):
316     pass
317     else:
318     print common.prog_name+'. Bad flag for -use_boss option:',\
319     val,'Possible values are 0(=No) or 1(=Yes)'
320     usage()
321     pass
322 fanzago 1.14 pass
323    
324 nsmirnov 1.3 elif string.find(opt,'.') == -1:
325     print common.prog_name+'. Unrecognized option '+opt
326     usage()
327     pass
328 nsmirnov 1.1
329 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
330     if string.find(opt,'.') == -1 :
331     self.cfg_params['CRAB.'+opt[1:]] = val
332 nsmirnov 1.1 pass
333 nsmirnov 1.3 else:
334 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
335     self.cfg_params[opt[1:]] = val
336     pass
337     pass
338     return
339    
340 slacapra 1.8 def parseRange_(self, aRange):
341 nsmirnov 1.4 """
342 slacapra 1.8 Takes as the input a string with a range defined in any of the following
343     way, including combination, and return a tuple with the ints defined
344     according to following table. A consistency check is done.
345     NB: the first job is "1", not "0".
346     'all' -> [1,2,..., NJobs]
347     '' -> [1,2,..., NJobs]
348     'n1' -> [n1]
349     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
350     'n1,n2' -> [n1, n2]
351     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
352     """
353     result = []
354    
355 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
356     if aRange=='all' or aRange==None or aRange=='':
357 slacapra 1.8 result=range(0,common.jobDB.nJobs())
358     return result
359 slacapra 1.9 elif aRange=='0':
360     return result
361 slacapra 1.8
362     subRanges = string.split(aRange, ',')
363     for subRange in subRanges:
364     result = result+self.parseSimpleRange_(subRange)
365    
366     if self.checkUniqueness_(result):
367     return result
368     else:
369 slacapra 1.33 common.logger.message("Error "+result)
370 slacapra 1.8 return []
371    
372     def checkUniqueness_(self, list):
373     """
374 slacapra 1.9 check if a list contains only unique elements
375 slacapra 1.8 """
376    
377     uniqueList = []
378     # use a list comprehension statement (takes a while to understand)
379    
380     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
381    
382     return (len(list)==len(uniqueList))
383    
384     def parseSimpleRange_(self, aRange):
385     """
386     Takes as the input a string with two integers separated by
387     the minus sign and returns the tuple with these numbers:
388     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
389     'n1' -> [n1]
390     """
391     (start, end) = (None, None)
392    
393     result = []
394     minus = string.find(aRange, '-')
395     if ( minus < 0 ):
396     if isInt(aRange) and int(aRange)>0:
397 fanzago 1.37 # FEDE
398     #result.append(int(aRange)-1)
399     ###
400     result.append(aRange)
401 slacapra 1.8 else:
402 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
403     usage()
404     pass
405    
406 nsmirnov 1.4 pass
407     else:
408 slacapra 1.8 (start, end) = string.split(aRange, '-')
409     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
410 spiga 1.38 #result=range(int(start)-1, int(end))
411     result=range(int(start), int(end)+1) #Daniele
412 slacapra 1.8 else:
413 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
414 slacapra 1.8
415     return result
416 nsmirnov 1.4
417 nsmirnov 1.3 def initializeActions_(self, opts):
418 nsmirnov 1.1 """
419     For each user action instantiate a corresponding
420     object and put it in the action dictionary.
421     """
422     for opt in opts.keys():
423 spiga 1.32 self.flag_useboss = 0
424 spiga 1.16 if ( opt == '-use_boss'):
425 slacapra 1.25 val = opts[opt]
426     if ( val == '1' ):
427     self.flag_useboss = 1
428     common.logger.message('Using BOSS')
429     pass
430     else:
431     self.flag_useboss = 0
432     pass
433 spiga 1.15
434     for opt in opts.keys():
435    
436 nsmirnov 1.1 val = opts[opt]
437 spiga 1.15
438    
439     if ( opt == '-create' ):
440 slacapra 1.22 ncjobs = 0
441 nsmirnov 1.1 if val:
442     if ( isInt(val) ):
443     ncjobs = int(val)
444     elif ( val == 'all'):
445     ncjobs = val
446     else:
447 nsmirnov 1.5 msg = 'Bad creation bunch size <'+str(val)+'>\n'
448     msg += ' Must be an integer or "all"'
449 slacapra 1.8 msg += ' Generic range is not allowed"'
450 nsmirnov 1.5 raise CrabException(msg)
451 nsmirnov 1.1 pass
452     else: ncjobs = 'all'
453    
454     if ncjobs != 0:
455 nsmirnov 1.3 # Instantiate Creator object
456 nsmirnov 1.1 creator = Creator(self.job_type_name,
457     self.cfg_params,
458 corvo 1.51 ncjobs, self.job_type)
459 nsmirnov 1.1 self.actions[opt] = creator
460 nsmirnov 1.3
461     # Initialize the JobDB object if needed
462 nsmirnov 1.1 if not self.flag_continue:
463     common.jobDB.create(creator.nJobs())
464     pass
465 nsmirnov 1.3
466     # Create and initialize JobList
467    
468     common.job_list = JobList(common.jobDB.nJobs(),
469 corvo 1.51 self.job_type)
470     # marco
471     # creator.jobType())
472 nsmirnov 1.3
473     common.job_list.setScriptNames(self.job_type_name+'.sh')
474     common.job_list.setJDLNames(self.job_type_name+'.jdl')
475 slacapra 1.12 common.job_list.setCfgNames(self.job_type_name+'.orcarc')
476 slacapra 1.9
477     creator.writeJobsSpecsToDB()
478 nsmirnov 1.1 pass
479 nsmirnov 1.3 pass
480 nsmirnov 1.1
481     elif ( opt == '-submit' ):
482 slacapra 1.23
483     # total jobs
484     # get the first not already submitted
485 slacapra 1.24 common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
486     lastSubmittedJob=0
487     for nj in range(common.jobDB.nJobs()):
488 fanzago 1.45 if (common.jobDB.status(nj) in ['S','K','RC','Y','A','D']):
489 slacapra 1.24 lastSubmittedJob +=1
490 slacapra 1.30 else: break
491 slacapra 1.24 # count job from 1
492 slacapra 1.30 totalJobsSubmittable = common.jobDB.nJobs()-lastSubmittedJob
493 slacapra 1.24 common.logger.debug(5,'lastSubmittedJob '+str(lastSubmittedJob))
494     common.logger.debug(5,'totalJobsSubmittable '+str(totalJobsSubmittable))
495 slacapra 1.23
496 slacapra 1.24 nsjobs = lastSubmittedJob+totalJobsSubmittable
497 slacapra 1.23 # get user request
498 slacapra 1.22 if val:
499     if ( isInt(val) ):
500 slacapra 1.24 tmp = int(val)
501     if (tmp >= totalJobsSubmittable):
502     common.logger.message('asking to submit '+str(tmp)+' jobs, but only '+str(totalJobsSubmittable)+' left: submitting those')
503     pass
504     else:
505     nsjobs=lastSubmittedJob+int(val)
506 slacapra 1.23 elif (val=='all'):
507     pass
508 slacapra 1.22 else:
509     msg = 'Bad submission option <'+str(val)+'>\n'
510     msg += ' Must be an integer or "all"'
511     msg += ' Generic range is not allowed"'
512     raise CrabException(msg)
513     pass
514 slacapra 1.24 common.logger.debug(5,'nsjobs '+str(nsjobs))
515 slacapra 1.23
516     # submit N from last submitted job
517 slacapra 1.24 nj_list = range(lastSubmittedJob, nsjobs)
518     common.logger.debug(5,'nj_list '+str(nj_list))
519 nsmirnov 1.5
520     if len(nj_list) != 0:
521 nsmirnov 1.3 # Instantiate Submitter object
522 corvo 1.51 self.actions[opt] = Submitter(self.cfg_params, nj_list, self.job_type)
523 nsmirnov 1.3
524     # Create and initialize JobList
525     if len(common.job_list) == 0 :
526     common.job_list = JobList(common.jobDB.nJobs(),
527     None)
528     common.job_list.setJDLNames(self.job_type_name+'.jdl')
529     pass
530 nsmirnov 1.1 pass
531 slacapra 1.30 pass
532 nsmirnov 1.1
533 nsmirnov 1.4 elif ( opt == '-list' ):
534 slacapra 1.8 jobs = self.parseRange_(val)
535    
536     common.jobDB.dump(jobs)
537 nsmirnov 1.4 pass
538    
539     elif ( opt == '-status' ):
540 slacapra 1.8 jobs = self.parseRange_(val)
541    
542 slacapra 1.9 if len(jobs) != 0:
543 spiga 1.16 if ( self.flag_useboss == 1 ):
544 corvo 1.42 self.actions[opt] = StatusBoss(self.cfg_params)
545 spiga 1.15 else:
546 corvo 1.46 self.actions[opt] = Status(self.cfg_params, jobs)
547 spiga 1.15 pass
548 nsmirnov 1.1 pass
549     pass
550 slacapra 1.22
551 nsmirnov 1.4 elif ( opt == '-kill' ):
552 slacapra 1.8
553 spiga 1.31 if ( self.flag_useboss == 1 ):
554     if val:
555     if val =='all':
556 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
557     jobs = allBoss_id.keys()
558 spiga 1.31 else:
559     jobs = self.parseRange_(val)
560     common.scheduler.cancel(jobs)
561     else:
562     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
563     else:
564     if val:
565     jobs = self.parseRange_(val)
566    
567     for nj in jobs:
568     st = common.jobDB.status(nj)
569     if st == 'S':
570     jid = common.jobDB.jobId(nj)
571     common.logger.message("Killing job # "+`(nj+1)`)
572     common.scheduler.cancel(jid)
573     common.jobDB.setStatus(nj, 'K')
574     pass
575 slacapra 1.9 pass
576 spiga 1.31
577     common.jobDB.save()
578 nsmirnov 1.4 pass
579 spiga 1.31 else:
580     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
581 nsmirnov 1.1
582 slacapra 1.8 elif ( opt == '-getoutput' ):
583    
584 spiga 1.20 if ( self.flag_useboss == 1 ):
585     if val=='all' or val==None or val=='':
586 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
587     jobs = allBoss_id.keys()
588 spiga 1.20 else:
589     jobs = self.parseRange_(val)
590     common.scheduler.getOutput(jobs)
591     else:
592     jobs = self.parseRange_(val)
593 spiga 1.13
594 slacapra 1.22 ## also this: create a ActorClass (GetOutput)
595 spiga 1.20 jobs_done = []
596     for nj in jobs:
597     st = common.jobDB.status(nj)
598     if st == 'D':
599 slacapra 1.12 jobs_done.append(nj)
600 spiga 1.20 pass
601     elif st == 'S':
602     jid = common.jobDB.jobId(nj)
603     currStatus = common.scheduler.queryStatus(jid)
604     if currStatus=="Done":
605     jobs_done.append(nj)
606     else:
607     msg = 'Job # '+`(nj+1)`+' submitted but still status '+currStatus+' not possible to get output'
608     common.logger.message(msg)
609     pass
610 slacapra 1.9 else:
611 spiga 1.20 # common.logger.message('Jobs #'+`(nj+1)`+' has status '+st+' not possible to get output')
612     pass
613 slacapra 1.12 pass
614    
615 spiga 1.20 for nj in jobs_done:
616 spiga 1.15 jid = common.jobDB.jobId(nj)
617     dir = common.scheduler.getOutput(jid)
618     common.jobDB.setStatus(nj, 'Y')
619 slacapra 1.12
620     # Rename the directory with results to smth readable
621     new_dir = common.work_space.resDir()
622 spiga 1.20 try:
623     files = os.listdir(dir)
624     for file in files:
625     os.rename(dir+'/'+file, new_dir+'/'+file)
626     os.rmdir(dir)
627     except OSError, e:
628     msg = 'rename files from '+dir+' to '+new_dir+' error: '
629     msg += str(e)
630     common.logger.message(msg)
631     # ignore error
632 slacapra 1.12 pass
633 spiga 1.20 pass
634 slacapra 1.22 ###
635    
636 spiga 1.13 resFlag = 0
637     exCode = common.scheduler.getExitStatus(jid)
638 spiga 1.34 Statistic.Monitor('retrieved',resFlag,jid,exCode)
639 slacapra 1.12
640     msg = 'Results of Job # '+`(nj+1)`+' are in '+new_dir
641     common.logger.message(msg)
642 nsmirnov 1.4 pass
643    
644     common.jobDB.save()
645     pass
646    
647     elif ( opt == '-resubmit' ):
648 fanzago 1.37 if ( self.flag_useboss == 1 ):
649     if val=='all' or val==None or val=='':
650     allBoss_id = common.scheduler.listBoss()
651     jobs = allBoss_id.keys()
652     else:
653     jobs = self.parseRange_(val)
654     else:
655     if val:
656     jobs = self.parseRange_(val)
657    
658 slacapra 1.11 if val:
659     # create a list of jobs to be resubmitted.
660 slacapra 1.8
661 slacapra 1.22 ### as before, create a Resubmittter Class
662 slacapra 1.11 nj_list = []
663     for nj in jobs:
664 fanzago 1.37 st = common.jobDB.status(int(nj)-1)
665 slacapra 1.11 if st in ['K','A']:
666 fanzago 1.37 nj_list.append(int(nj)-1)
667     common.jobDB.setStatus(int(nj)-1,'C')
668 slacapra 1.11 elif st == 'Y':
669 fanzago 1.39 common.scheduler.moveOutput(nj)
670 fanzago 1.37 nj_list.append(int(nj)-1)
671     st = common.jobDB.setStatus(int(nj)-1,'RC')
672 spiga 1.44 elif st in ['C','X']:
673     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
674 slacapra 1.11 pass
675 spiga 1.44 elif st == 'D':
676     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
677 slacapra 1.11 else:
678 spiga 1.44 common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
679 slacapra 1.9 pass
680 slacapra 1.8
681 fanzago 1.37 if len(common.job_list) == 0 :
682     common.job_list = JobList(common.jobDB.nJobs(),None)
683     common.job_list.setJDLNames(self.job_type_name+'.jdl')
684     pass
685    
686 slacapra 1.11 if len(nj_list) != 0:
687 fanzago 1.37 common.scheduler.resubmit(nj_list)
688 slacapra 1.11 # Instantiate Submitter object
689     self.actions[opt] = Submitter(self.cfg_params, nj_list)
690    
691 slacapra 1.8 pass
692     pass
693 slacapra 1.11 else:
694     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
695     common.logger.message("WARNING: _all_ job specified in the rage will be resubmitted!!!")
696     pass
697 fanzago 1.37 common.jobDB.save()
698 slacapra 1.8 pass
699    
700     elif ( opt == '-cancelAndResubmit' ):
701 nsmirnov 1.5
702 spiga 1.47 if ( self.flag_useboss == 1 ):
703     if val:
704     if val =='all':
705     allBoss_id = common.scheduler.listBoss()
706     jobs = allBoss_id.keys()
707     else:
708     jobs = self.parseRange_(val)
709     # kill submitted jobs
710     common.scheduler.cancel(jobs)
711     else:
712     common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
713     else:
714     if val:
715     jobs = self.parseRange_(val)
716     else:
717     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
718 nsmirnov 1.5 pass
719    
720 spiga 1.47 # resubmit cancelled jobs.
721     if val:
722     nj_list = []
723     for nj in jobs:
724     if ( self.flag_useboss != 1 ):
725     st = common.jobDB.status(nj)
726     if st == 'S':
727     jid = common.jobDB.jobId(nj)
728     common.scheduler.cancel(jid)
729     st = 'K'
730     common.jobDB.setStatus(nj, st)
731     pass
732     common.jobDB.save()
733     pass
734     st = common.jobDB.status(int(nj)-1)
735     if st in ['K','A']:
736     nj_list.append(int(nj)-1)
737     common.jobDB.setStatus(int(nj)-1,'C')
738     elif st == 'Y':
739     common.scheduler.moveOutput(nj)
740     nj_list.append(int(nj)-1)
741     st = common.jobDB.setStatus(int(nj)-1,'RC')
742     elif st in ['C','X']:
743     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
744     pass
745     elif st == 'D':
746     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
747     else:
748     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
749     pass
750    
751 nsmirnov 1.5 if len(common.job_list) == 0 :
752 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
753 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
754     pass
755 spiga 1.47
756     if len(nj_list) != 0:
757     common.scheduler.resubmit(nj_list)
758     self.actions[opt] = Submitter(self.cfg_params, nj_list)
759     pass
760     pass
761     else:
762     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
763 nsmirnov 1.5 pass
764 spiga 1.47 common.jobDB.save()
765 nsmirnov 1.4 pass
766 spiga 1.47
767 slacapra 1.28 elif ( opt == '-testJdl' ):
768 slacapra 1.8 jobs = self.parseRange_(val)
769     nj_list = []
770     for nj in jobs:
771     st = common.jobDB.status(nj)
772 slacapra 1.12 if st == 'C': nj_list.append(nj)
773 slacapra 1.8 pass
774    
775     if len(nj_list) != 0:
776     # Instantiate Submitter object
777     self.actions[opt] = Checker(self.cfg_params, nj_list)
778    
779     # Create and initialize JobList
780    
781     if len(common.job_list) == 0 :
782     common.job_list = JobList(common.jobDB.nJobs(), None)
783     common.job_list.setJDLNames(self.job_type_name+'.jdl')
784     pass
785     pass
786    
787 slacapra 1.9 elif ( opt == '-postMortem' ):
788     jobs = self.parseRange_(val)
789     nj_list = []
790     for nj in jobs:
791 fanzago 1.45 # fede: nj scala di uno perche' e' l'internal id di boss
792     # ed il jobDB parte da zero ...
793     st = common.jobDB.status(int(nj)-1)
794     if st not in ['X', 'C']: nj_list.append(int(nj))
795 slacapra 1.9 pass
796    
797     if len(nj_list) != 0:
798     # Instantiate Submitter object
799 spiga 1.35 self.actions[opt] = PostMortem(self.cfg_params, nj_list,self.flag_useboss)
800 slacapra 1.9
801     # Create and initialize JobList
802    
803     if len(common.job_list) == 0 :
804     common.job_list = JobList(common.jobDB.nJobs(), None)
805     common.job_list.setJDLNames(self.job_type_name+'.jdl')
806     pass
807     pass
808 slacapra 1.33 else:
809     common.logger.message("No jobs to analyze")
810 slacapra 1.9
811     elif ( opt == '-clean' ):
812     if val != None:
813     raise CrabException("No range allowed for '-clean'")
814    
815 corvo 1.46 theCleaner = Cleaner(self.scheduler_name == 'boss', self.cfg_params)
816 slacapra 1.36 theCleaner.clean()
817 slacapra 1.9
818 nsmirnov 1.1 pass
819     return
820    
821 nsmirnov 1.3 def createWorkingSpace_(self):
822 slacapra 1.9 new_dir = ''
823    
824     try:
825     new_dir = self.cfg_params['USER.ui_working_dir']
826 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
827 fanzago 1.48 if os.path.exists(new_dir):
828     if os.listdir(new_dir):
829     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
830     raise CrabException(msg)
831 slacapra 1.9 except KeyError:
832     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
833 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
834 slacapra 1.9 new_dir = self.cwd + new_dir
835     pass
836 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
837 nsmirnov 1.1 common.work_space.create()
838     return
839    
840 nsmirnov 1.3 def loadConfiguration_(self, opts):
841 nsmirnov 1.1
842     save_opts = common.work_space.loadSavedOptions()
843    
844     # Override saved options with new command-line options
845    
846     for k in opts.keys():
847     save_opts[k] = opts[k]
848     pass
849    
850     # Return updated options
851     return save_opts
852    
853 nsmirnov 1.3 def createLogger_(self, args):
854 nsmirnov 1.1
855     log = Logger()
856     log.quiet(self.flag_quiet)
857     log.setDebugLevel(self.debug_level)
858     log.write(args+'\n')
859 nsmirnov 1.3 log.message(self.headerString_())
860 nsmirnov 1.1 log.flush()
861     common.logger = log
862     return
863    
864 nsmirnov 1.3 def updateHistory_(self, args):
865 nsmirnov 1.1 history_fname = common.prog_name+'.history'
866     history_file = open(history_fname, 'a')
867     history_file.write(self.current_time+': '+args+'\n')
868     history_file.close()
869     return
870    
871 nsmirnov 1.3 def headerString_(self):
872 nsmirnov 1.1 """
873     Creates a string describing program options either given in
874     the command line or their default values.
875     """
876     header = common.prog_name + ' (version ' + common.prog_version_str + \
877     ') running on ' + \
878     time.ctime(time.time())+'\n\n' + \
879     common.prog_name+'. Working options:\n'
880 corvo 1.51 print self.job_type_name
881 nsmirnov 1.1 header = header +\
882     ' scheduler ' + self.scheduler_name + '\n'+\
883     ' job type ' + self.job_type_name + '\n'+\
884     ' working directory ' + common.work_space.topDir()\
885     + '\n'
886     return header
887    
888 nsmirnov 1.3 def createScheduler_(self):
889 nsmirnov 1.1 """
890     Creates a scheduler object instantiated by its name.
891     """
892     klass_name = 'Scheduler' + string.capitalize(self.scheduler_name)
893     file_name = klass_name
894     try:
895     klass = importName(file_name, klass_name)
896     except KeyError:
897     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
898     raise CrabException(msg)
899     except ImportError, e:
900     msg = 'Cannot create scheduler '+self.scheduler_name
901     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
902     msg += str(e)
903     raise CrabException(msg)
904    
905     common.scheduler = klass()
906     common.scheduler.configure(self.cfg_params)
907     return
908    
909 corvo 1.51 def createJobtype_(self):
910     """
911     Create the jobtype specified in the crab.cfg file
912     """
913     file_name = 'cms_'+ string.lower(self.job_type_name)
914     klass_name = string.capitalize(self.job_type_name)
915    
916     try:
917     klass = importName(file_name, klass_name)
918     except KeyError:
919     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
920     raise CrabException(msg)
921     except ImportError, e:
922     msg = 'Cannot create job type '+self.job_type_name
923     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
924     msg += str(e)
925     raise CrabException(msg)
926     job_type = klass(self.cfg_params)
927     return job_type
928    
929 nsmirnov 1.1 def run(self):
930     """
931     For each
932     """
933    
934     for act in self.main_actions:
935     if act in self.actions.keys(): self.actions[act].run()
936     pass
937    
938     for act in self.aux_actions:
939     if act in self.actions.keys(): self.actions[act].run()
940     pass
941     return
942    
943     ###########################################################################
944     def processHelpOptions(opts):
945    
946 slacapra 1.11 if len(opts):
947     for opt in opts.keys():
948     if opt in ('-v', '-version', '--version') :
949     print Crab.version()
950     return 1
951     if opt in ('-h','-help','--help') :
952     if opts[opt] : help(opts[opt])
953     else: help()
954     return 1
955     else:
956     usage()
957 nsmirnov 1.1
958     return 0
959    
960     ###########################################################################
961     if __name__ == '__main__':
962    
963 fanzago 1.18
964 fanzago 1.19 # # Initial settings for Python modules. Avoid appending manually lib paths.
965     # try:
966     # path=os.environ['EDG_WL_LOCATION']
967     # except:
968     # print "Error: Please set the EDG_WL_LOCATION environment variable pointing to the userinterface installation path"
969     # sys.exit(1)
970     #
971     # libPath=os.path.join(path, "lib")
972     # sys.path.append(libPath)
973     # libPath=os.path.join(path, "lib", "python")
974     # sys.path.append(libPath)
975 fanzago 1.18
976    
977 nsmirnov 1.1 # Parse command-line options and create a dictionary with
978     # key-value pairs.
979    
980     options = parseOptions(sys.argv[1:])
981    
982     # Process "help" options, such as '-help', '-version'
983    
984 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
985 nsmirnov 1.1
986     # Create, initialize, and run a Crab object
987    
988     try:
989 nsmirnov 1.3 crab = Crab(options)
990 nsmirnov 1.1 crab.run()
991     except CrabException, e:
992     print '\n' + common.prog_name + ': ' + str(e) + '\n'
993     if common.logger:
994     common.logger.write('ERROR: '+str(e)+'\n')
995     pass
996     pass
997    
998     pass