ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.5
Committed: Mon Jun 27 07:57:23 2005 UTC (19 years, 10 months ago) by nsmirnov
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +61 -12 lines
Log Message:
'-resubmit' implemented

File Contents

# User Rev Content
1 nsmirnov 1.1 #!/usr/bin/env python
2     from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 nsmirnov 1.3 from JobList import JobList
9 nsmirnov 1.1 from Creator import Creator
10     from Submitter import Submitter
11     import common
12    
13     import sys, os, time, string
14    
15     ###########################################################################
16     class Crab:
17 nsmirnov 1.3 def __init__(self, opts):
18 nsmirnov 1.1
19     # The order of main_actions is important !
20 nsmirnov 1.5 self.main_actions = [ '-create', '-submit', '-monitor' ]
21     self.aux_actions = [ '-list', '-kill', '-status', '-retrieve',
22     '-resubmit' ]
23 nsmirnov 1.1
24     # Dictionary of actions, e.g. '-create' -> object of class Creator
25     self.actions = {}
26    
27     # Configuration file
28     self.cfg_fname = None
29     # Dictionary with configuration parameters
30     self.cfg_params = {}
31    
32     # Current working directory
33     self.cwd = os.getcwd()+'/'
34     # Current time in format 'yymmdd_hhmmss'
35     self.current_time = time.strftime('%y%m%d_%H%M%S',
36     time.localtime(time.time()))
37    
38 nsmirnov 1.3 # Session name (?) Do we need this ?
39 nsmirnov 1.1 self.name = '0'
40    
41     # Job type
42     self.job_type_name = None
43    
44     # Continuation flag
45     self.flag_continue = 0
46    
47     # quiet mode, i.e. no output on screen
48     self.flag_quiet = 0
49     # produce more output
50     self.debug_level = 0
51    
52 nsmirnov 1.3 # Scheduler name, e.g. 'edg', 'lsf'
53 nsmirnov 1.1 self.scheduler_name = 'edg'
54    
55 nsmirnov 1.3 self.initialize_(opts)
56 nsmirnov 1.1
57     return
58    
59     def version(self):
60     return common.prog_version_str
61    
62 nsmirnov 1.3 def initialize_(self, opts):
63 nsmirnov 1.1
64     # Process the '-continue' option first because
65     # in the case of continuation the CRAB configuration
66     # parameters are loaded from already existing Working Space.
67 nsmirnov 1.3 self.processContinueOption_(opts)
68 nsmirnov 1.1
69     # Process ini-file first, then command line options
70     # because they override ini-file settings.
71    
72 nsmirnov 1.3 self.processIniFile_(opts)
73 nsmirnov 1.1
74 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
75 nsmirnov 1.1
76 nsmirnov 1.3 self.processOptions_(opts)
77 nsmirnov 1.1
78     if not self.flag_continue:
79 nsmirnov 1.3 self.createWorkingSpace_()
80 nsmirnov 1.1 common.work_space.saveConfiguration(opts, self.cfg_fname)
81     pass
82    
83     # At this point all configuration options have been read.
84    
85     args = string.join(sys.argv,' ')
86 nsmirnov 1.3 self.updateHistory_(args)
87     self.createLogger_(args)
88 nsmirnov 1.1 common.jobDB = JobDB()
89 nsmirnov 1.3 if self.flag_continue:
90     common.jobDB.load()
91     common.logger.debug(6, str(common.jobDB))
92     pass
93     self.createScheduler_()
94     if common.logger.debugLevel() >= 6:
95     common.logger.debug(6, 'Used properties:')
96     keys = self.cfg_params.keys()
97     keys.sort()
98     for k in keys:
99     if self.cfg_params[k]:
100     common.logger.debug(6, ' '+k+' : '+self.cfg_params[k])
101     pass
102     else:
103     common.logger.debug(6, ' '+k+' : ')
104     pass
105     pass
106     common.logger.debug(6, 'End of used properties.\n')
107     pass
108     self.initializeActions_(opts)
109 nsmirnov 1.1 return
110    
111 nsmirnov 1.3 def processContinueOption_(self,opts):
112 nsmirnov 1.1
113     continue_dir = None
114 nsmirnov 1.4
115     # Look for the '-continue' option.
116    
117 nsmirnov 1.1 for opt in opts.keys():
118     if ( opt in ('-continue','-c') ):
119     self.flag_continue = 1
120     val = opts[opt]
121     if val:
122     if val[0] == '/': continue_dir = val # abs path
123     else: continue_dir = self.cwd + val # rel path
124     pass
125 nsmirnov 1.4 break
126     pass
127    
128     # Look for actions which has sense only with '-continue'
129    
130     if not self.flag_continue:
131     for opt in opts.keys():
132     if ( opt in self.aux_actions ):
133     self.flag_continue = 1
134     break
135 nsmirnov 1.1 pass
136     pass
137    
138     if not self.flag_continue: return
139    
140    
141     if not continue_dir:
142     prefix = common.prog_name + '_' + self.name + '_'
143     continue_dir = findLastWorkDir(prefix)
144     pass
145    
146     if not continue_dir:
147     raise CrabException('Cannot find last working directory.')
148    
149     if not os.path.exists(continue_dir):
150     msg = 'Cannot continue because the working directory <'
151     msg += continue_dir
152     msg += '> does not exist.'
153     raise CrabException(msg)
154    
155     # Instantiate WorkSpace
156     common.work_space = WorkSpace(continue_dir)
157    
158     return
159    
160 nsmirnov 1.3 def processIniFile_(self, opts):
161 nsmirnov 1.1 """
162     Processes a configuration INI-file.
163     """
164    
165     # Extract cfg-file name from the cmd-line options.
166    
167     for opt in opts.keys():
168     if ( opt == '-cfg' ):
169     if self.flag_continue:
170     raise CrabException('-continue and -cfg cannot coexist.')
171     if opts[opt] : self.cfg_fname = opts[opt]
172     else : usage()
173     pass
174    
175     elif ( opt == '-name' ):
176     self.name = opts[opt]
177     pass
178    
179     pass
180    
181     # Set default cfg-fname
182    
183     if self.cfg_fname == None:
184     if self.flag_continue:
185     self.cfg_fname = common.work_space.cfgFileName()
186     else:
187     self.cfg_fname = common.prog_name+'.cfg'
188     pass
189     pass
190    
191     # Load cfg-file
192    
193     if string.lower(self.cfg_fname) != 'none':
194     if os.path.exists(self.cfg_fname):
195     self.cfg_params = loadConfig(self.cfg_fname)
196     pass
197     else:
198     msg = 'cfg-file '+self.cfg_fname+' not found.'
199     raise CrabException(msg)
200     pass
201     pass
202    
203     # process the [CRAB] section
204    
205     lhp = len('CRAB.')
206     for k in self.cfg_params.keys():
207     if len(k) >= lhp and k[:lhp] == 'CRAB.':
208     opt = '-'+k[lhp:]
209     if len(opt) >= 3 and opt[:3] == '-__': continue
210     if opt not in opts.keys():
211     opts[opt] = self.cfg_params[k]
212     pass
213     pass
214     pass
215    
216     return
217    
218 nsmirnov 1.3 def processOptions_(self, opts):
219 nsmirnov 1.1 """
220     Processes the command-line options.
221     """
222    
223     for opt in opts.keys():
224     val = opts[opt]
225    
226 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
227     if opt in self.main_actions:
228     self.cfg_params['CRAB.'+opt[1:]] = val
229     continue
230     if opt in self.aux_actions:
231     self.cfg_params['CRAB.'+opt[1:]] = val
232     continue
233 nsmirnov 1.1
234    
235     elif ( opt == '-cfg' ):
236     pass
237    
238     elif ( opt in ('-continue', '-c') ):
239 nsmirnov 1.4 # Already processed in processContinueOption_()
240 nsmirnov 1.1 pass
241    
242     elif ( opt == '-jobtype' ):
243     if val : self.job_type_name = string.upper(val)
244     else : usage()
245     pass
246    
247     elif ( opt == '-Q' ):
248     self.flag_quiet = 1
249     pass
250    
251     elif ( opt == '-debug' ):
252 nsmirnov 1.2 self.debug_level = int(val)
253 nsmirnov 1.1 pass
254    
255     elif ( opt == '-scheduler' ):
256     if val: self.scheduler_name = val
257     else:
258     print common.prog_name+". No value for '-scheduler'."
259     usage()
260     pass
261     pass
262    
263 nsmirnov 1.3 elif string.find(opt,'.') == -1:
264     print common.prog_name+'. Unrecognized option '+opt
265     usage()
266     pass
267 nsmirnov 1.1
268 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
269     if string.find(opt,'.') == -1 :
270     self.cfg_params['CRAB.'+opt[1:]] = val
271 nsmirnov 1.1 pass
272 nsmirnov 1.3 else:
273 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
274     self.cfg_params[opt[1:]] = val
275     pass
276     pass
277     return
278    
279 nsmirnov 1.4 def parseRange_(self, val):
280     """
281     Parses 'val':
282     ----------+------------
283     val | returns
284     ----------+------------
285     'n1-n2' (n1, n2)
286     'n' (n, n)
287     'all' (1, njobs)
288     None (1, njobs)
289     ----------+------------
290     """
291     if val == 'all': val = None
292     if val:
293     (n1, n2) = parseRange(val)
294     if n1 < 1 : n1 = 1
295     if n2 > common.jobDB.nJobs() : n2 = common.jobDB.nJobs()
296     pass
297     else:
298     n1 = 1
299     n2 = common.jobDB.nJobs()
300     pass
301     return (n1,n2)
302    
303 nsmirnov 1.3 def initializeActions_(self, opts):
304 nsmirnov 1.1 """
305     For each user action instantiate a corresponding
306     object and put it in the action dictionary.
307     """
308    
309     for opt in opts.keys():
310     val = opts[opt]
311    
312     if ( opt == '-create' ):
313     if val:
314     if ( isInt(val) ):
315     ncjobs = int(val)
316     elif ( val == 'all'):
317     ncjobs = val
318     else:
319 nsmirnov 1.5 msg = 'Bad creation bunch size <'+str(val)+'>\n'
320     msg += ' Must be an integer or "all"'
321     raise CrabException(msg)
322 nsmirnov 1.1 pass
323     else: ncjobs = 'all'
324    
325     if ncjobs != 0:
326 nsmirnov 1.3 # Instantiate Creator object
327 nsmirnov 1.1 creator = Creator(self.job_type_name,
328     self.cfg_params,
329     ncjobs)
330     self.actions[opt] = creator
331 nsmirnov 1.3
332     # Initialize the JobDB object if needed
333 nsmirnov 1.1 if not self.flag_continue:
334     common.jobDB.create(creator.nJobs())
335     pass
336 nsmirnov 1.3
337     # Create and initialize JobList
338    
339     common.job_list = JobList(common.jobDB.nJobs(),
340     creator.jobType())
341    
342     common.job_list.setScriptNames(self.job_type_name+'.sh')
343     common.job_list.setJDLNames(self.job_type_name+'.jdl')
344     creator.jobType().setSteeringCardsNames()
345 nsmirnov 1.1 pass
346 nsmirnov 1.3 pass
347 nsmirnov 1.1
348     elif ( opt == '-submit' ):
349     if val:
350     if ( isInt(val) ):
351     nsjobs = int(val)
352     elif ( val == 'all'):
353     nsjobs = val
354     else:
355 nsmirnov 1.5 msg = 'Bad submission bunch size <'+str(val)+'>\n'
356     msg += ' Must be an integer or "all"'
357     raise CrabException(msg)
358 nsmirnov 1.1 pass
359     else: nsjobs = 'all'
360    
361 nsmirnov 1.5 # Create a list with numbers of jobs to be submitted
362    
363     total_njobs = common.jobDB.nJobs()
364     if total_njobs == 0 :
365     msg = '\nNo created jobs found.\n'
366     msg += "Maybe you forgot '-create' or '-continue' ?\n"
367     raise CrabException(msg)
368    
369     if nsjobs == 'all': nsjobs = total_njobs
370     if nsjobs > total_njobs : nsjobs = total_njobs
371    
372     nj_list = []
373     for nj in range(total_njobs):
374     if len(nj_list) >= nsjobs : break
375     st = common.jobDB.status(nj)
376     if st == 'C': nj_list.append(nj)
377     pass
378    
379     if len(nj_list) != 0:
380 nsmirnov 1.3 # Instantiate Submitter object
381 nsmirnov 1.5 self.actions[opt] = Submitter(self.cfg_params, nj_list)
382 nsmirnov 1.3
383     # Create and initialize JobList
384    
385     if len(common.job_list) == 0 :
386     common.job_list = JobList(common.jobDB.nJobs(),
387     None)
388     common.job_list.setJDLNames(self.job_type_name+'.jdl')
389     pass
390 nsmirnov 1.1 pass
391 nsmirnov 1.3 pass
392 nsmirnov 1.1
393 nsmirnov 1.4 elif ( opt == '-list' ):
394     print common.jobDB
395     pass
396    
397     elif ( opt == '-status' ):
398     (n1, n2) = self.parseRange_(val)
399    
400     nj = n1 - 1
401     while ( nj < n2 ):
402     st = common.jobDB.status(nj)
403     if st == 'S':
404     jid = common.jobDB.jobId(nj)
405     st = common.scheduler.queryStatus(jid)
406     print 'Job %03d:'%(nj+1),st
407     pass
408     else:
409     print 'Job %03d:'%(nj+1),crabJobStatusToString(st)
410 nsmirnov 1.1 pass
411 nsmirnov 1.4 nj += 1
412 nsmirnov 1.1 pass
413     pass
414    
415 nsmirnov 1.4 elif ( opt == '-kill' ):
416     (n1, n2) = self.parseRange_(val)
417    
418     nj = n1 - 1
419     while ( nj < n2 ):
420     st = common.jobDB.status(nj)
421     if st == 'S':
422     jid = common.jobDB.jobId(nj)
423     common.scheduler.cancel(jid)
424     common.jobDB.setStatus(nj, 'K')
425     pass
426     nj += 1
427 nsmirnov 1.1 pass
428 nsmirnov 1.4
429     common.jobDB.save()
430 nsmirnov 1.1 pass
431    
432     elif ( opt == '-retrieve' ):
433 nsmirnov 1.4 (n1, n2) = self.parseRange_(val)
434    
435     nj = n1 - 1
436     while ( nj < n2 ):
437     st = common.jobDB.status(nj)
438     if st == 'S':
439     jid = common.jobDB.jobId(nj)
440     dir = common.scheduler.getOutput(jid)
441     common.jobDB.setStatus(nj, 'Y')
442    
443     # Rename the directory with results to smth readable
444     new_dir = common.work_space.resDir()+'%06d'%(nj+1)
445     try:
446     os.rename(dir, new_dir)
447     except OSError, e:
448     msg = 'rename('+dir+', '+new_dir+') error: '
449     msg += str(e)
450     common.logger.message(msg)
451     # ignore error
452     pass
453    
454     msg = 'Results of Job # '+`(nj+1)`+' are in '+new_dir
455     common.logger.message(msg)
456     pass
457     nj += 1
458     pass
459    
460     common.jobDB.save()
461     pass
462    
463     elif ( opt == '-resubmit' ):
464 nsmirnov 1.5 (n1, n2) = self.parseRange_(val)
465    
466     # Cancel submitted jobs from the range (n1, n2)
467     # and create a list of jobs to be resubmitted.
468    
469     nj_list = []
470     nj = n1 - 1
471     while ( nj < n2 ):
472     st = common.jobDB.status(nj)
473     if st == 'S':
474     jid = common.jobDB.jobId(nj)
475     common.scheduler.cancel(jid)
476     st = 'K'
477     common.jobDB.setStatus(nj, st)
478     pass
479    
480     if st != 'X': nj_list.append(nj)
481     nj += 1
482     pass
483    
484     if len(nj_list) != 0:
485     # Instantiate Submitter object
486     self.actions[opt] = Submitter(self.cfg_params, nj_list)
487    
488     # Create and initialize JobList
489    
490     if len(common.job_list) == 0 :
491     common.job_list = JobList(common.jobDB.nJobs(),
492     None)
493     common.job_list.setJDLNames(self.job_type_name+'.jdl')
494     pass
495     pass
496 nsmirnov 1.4 pass
497    
498     elif ( opt == '-monitor' ):
499 nsmirnov 1.1 # TODO
500     if val and ( isInt(val) ):
501     common.delay = val
502     else:
503     common.delay = 60
504     pass
505     common.autoretrieve = 1
506     pass
507    
508     pass
509     return
510    
511 nsmirnov 1.3 def createWorkingSpace_(self):
512 nsmirnov 1.1 new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
513     new_dir = self.cwd + new_dir
514     common.work_space = WorkSpace(new_dir)
515     common.work_space.create()
516     return
517    
518 nsmirnov 1.3 def loadConfiguration_(self, opts):
519 nsmirnov 1.1
520     save_opts = common.work_space.loadSavedOptions()
521    
522     # Override saved options with new command-line options
523    
524     for k in opts.keys():
525     save_opts[k] = opts[k]
526     pass
527    
528     # Return updated options
529     return save_opts
530    
531 nsmirnov 1.3 def createLogger_(self, args):
532 nsmirnov 1.1
533     log = Logger()
534     log.quiet(self.flag_quiet)
535     log.setDebugLevel(self.debug_level)
536     log.write(args+'\n')
537 nsmirnov 1.3 log.message(self.headerString_())
538 nsmirnov 1.1 log.flush()
539     common.logger = log
540     return
541    
542 nsmirnov 1.3 def updateHistory_(self, args):
543 nsmirnov 1.1 history_fname = common.prog_name+'.history'
544     history_file = open(history_fname, 'a')
545     history_file.write(self.current_time+': '+args+'\n')
546     history_file.close()
547     return
548    
549 nsmirnov 1.3 def headerString_(self):
550 nsmirnov 1.1 """
551     Creates a string describing program options either given in
552     the command line or their default values.
553     """
554     header = common.prog_name + ' (version ' + common.prog_version_str + \
555     ') running on ' + \
556     time.ctime(time.time())+'\n\n' + \
557     common.prog_name+'. Working options:\n'
558     header = header +\
559     ' scheduler ' + self.scheduler_name + '\n'+\
560     ' job type ' + self.job_type_name + '\n'+\
561     ' working directory ' + common.work_space.topDir()\
562     + '\n'
563     return header
564    
565 nsmirnov 1.3 def createScheduler_(self):
566 nsmirnov 1.1 """
567     Creates a scheduler object instantiated by its name.
568     """
569     klass_name = 'Scheduler' + string.capitalize(self.scheduler_name)
570     file_name = klass_name
571     try:
572     klass = importName(file_name, klass_name)
573     except KeyError:
574     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
575     raise CrabException(msg)
576     except ImportError, e:
577     msg = 'Cannot create scheduler '+self.scheduler_name
578     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
579     msg += str(e)
580     raise CrabException(msg)
581    
582     common.scheduler = klass()
583     common.scheduler.configure(self.cfg_params)
584     return
585    
586     def run(self):
587     """
588     For each
589     """
590    
591     for act in self.main_actions:
592     if act in self.actions.keys(): self.actions[act].run()
593     pass
594    
595     for act in self.aux_actions:
596     if act in self.actions.keys(): self.actions[act].run()
597     pass
598     return
599    
600     ###########################################################################
601     def processHelpOptions(opts):
602    
603     for opt in opts.keys():
604     if opt == '-v':
605     print Crab().version()
606     return 1
607     if opt in ('-h','-help','--help') :
608     if opts[opt] : help(opts[opt])
609     else: help()
610     return 1
611    
612     return 0
613    
614     ###########################################################################
615     if __name__ == '__main__':
616    
617     # Parse command-line options and create a dictionary with
618     # key-value pairs.
619    
620     options = parseOptions(sys.argv[1:])
621    
622     # Process "help" options, such as '-help', '-version'
623    
624     if processHelpOptions(options): sys.exit(0)
625    
626     # Create, initialize, and run a Crab object
627    
628     try:
629 nsmirnov 1.3 crab = Crab(options)
630 nsmirnov 1.1 crab.run()
631     except CrabException, e:
632     print '\n' + common.prog_name + ': ' + str(e) + '\n'
633     if common.logger:
634     common.logger.write('ERROR: '+str(e)+'\n')
635     pass
636     pass
637    
638     pass