ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.108
Committed: Sun Jun 3 17:53:42 2007 UTC (17 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.107: +32 -30 lines
Log Message:
add support for PostMortem fuctionality also while using server

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 nsmirnov 1.1 import common
19 spiga 1.13 import Statistic
20 spiga 1.107 import commands
21 nsmirnov 1.1
22 spiga 1.98 from BossSession import *
23 corvo 1.93
24 spiga 1.107 #modified to support server mode
25     from SubmitterServer import SubmitterServer
26     from GetOutputServer import GetOutputServer
27     from StatusServer import StatusServer
28 spiga 1.108 from PostMortemServer import PostMortemServer
29 spiga 1.107
30 nsmirnov 1.1 import sys, os, time, string
31    
32     ###########################################################################
33     class Crab:
34 nsmirnov 1.3 def __init__(self, opts):
35 fanzago 1.76 ## test_tag
36 nsmirnov 1.1 # The order of main_actions is important !
37 slacapra 1.28 self.main_actions = [ '-create', '-submit' ]
38 slacapra 1.70 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput','-get',
39 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
40     '-printId' ]
41 nsmirnov 1.1
42     # Dictionary of actions, e.g. '-create' -> object of class Creator
43     self.actions = {}
44 fanzago 1.76
45 nsmirnov 1.1 # Configuration file
46     self.cfg_fname = None
47     # Dictionary with configuration parameters
48     self.cfg_params = {}
49    
50     # Current working directory
51     self.cwd = os.getcwd()+'/'
52     # Current time in format 'yymmdd_hhmmss'
53     self.current_time = time.strftime('%y%m%d_%H%M%S',
54     time.localtime(time.time()))
55    
56 nsmirnov 1.3 # Session name (?) Do we need this ?
57 nsmirnov 1.1 self.name = '0'
58    
59     # Job type
60     self.job_type_name = None
61    
62     # Continuation flag
63     self.flag_continue = 0
64    
65     # quiet mode, i.e. no output on screen
66     self.flag_quiet = 0
67     # produce more output
68     self.debug_level = 0
69    
70    
71 nsmirnov 1.3 self.initialize_(opts)
72 nsmirnov 1.1
73     return
74    
75 nsmirnov 1.7 def version():
76 nsmirnov 1.1 return common.prog_version_str
77    
78 nsmirnov 1.7 version = staticmethod(version)
79    
80 nsmirnov 1.3 def initialize_(self, opts):
81 nsmirnov 1.1
82     # Process the '-continue' option first because
83     # in the case of continuation the CRAB configuration
84     # parameters are loaded from already existing Working Space.
85 nsmirnov 1.3 self.processContinueOption_(opts)
86 nsmirnov 1.1
87     # Process ini-file first, then command line options
88     # because they override ini-file settings.
89    
90 nsmirnov 1.3 self.processIniFile_(opts)
91 nsmirnov 1.1
92 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
93 nsmirnov 1.1
94 nsmirnov 1.3 self.processOptions_(opts)
95 nsmirnov 1.1
96 slacapra 1.77 common.taskDB = TaskDB()
97    
98 slacapra 1.101
99 nsmirnov 1.1 if not self.flag_continue:
100 nsmirnov 1.3 self.createWorkingSpace_()
101 slacapra 1.9 optsToBeSaved={}
102     for it in opts.keys():
103     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
104     pass
105     else:
106     optsToBeSaved[it]=opts[it]
107 slacapra 1.101 # store in taskDB the opts
108     common.taskDB.setDict(it[1:], optsToBeSaved[it])
109 slacapra 1.9 common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
110 nsmirnov 1.1 pass
111 slacapra 1.77 else:
112     common.taskDB.load()
113 nsmirnov 1.1
114     # At this point all configuration options have been read.
115    
116     args = string.join(sys.argv,' ')
117 slacapra 1.11
118 nsmirnov 1.3 self.updateHistory_(args)
119 slacapra 1.11
120 nsmirnov 1.3 self.createLogger_(args)
121 slacapra 1.11
122 nsmirnov 1.1 common.jobDB = JobDB()
123 corvo 1.51
124 spiga 1.107 self.UseServer=0
125     try:
126     self.UseServer=int(self.cfg_params['CRAB.server_mode'])
127     except KeyError:
128     pass
129    
130    
131 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
132    
133 nsmirnov 1.3 if self.flag_continue:
134 slacapra 1.12 try:
135     common.jobDB.load()
136 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
137 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
138     except DBException,e:
139     pass
140 nsmirnov 1.3 pass
141 slacapra 1.11
142 nsmirnov 1.3 self.createScheduler_()
143 slacapra 1.11
144 slacapra 1.106 common.logger.debug(6, 'Used properties:')
145     if (common.logger.debugLevel()<6 ):
146     common.logger.write('Used properties:')
147     keys = self.cfg_params.keys()
148     keys.sort()
149     for k in keys:
150     if self.cfg_params[k]:
151     common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
152     if (common.logger.debugLevel()<6 ):
153     common.logger.write(' '+k+' : '+str(self.cfg_params[k]))
154     pass
155     else:
156     common.logger.debug(6, ' '+k+' : ')
157     if (common.logger.debugLevel()<6 ):
158     common.logger.write(' '+k+' : ')
159 nsmirnov 1.3 pass
160     pass
161 slacapra 1.106 common.logger.debug(6, 'End of used properties.\n')
162     if (common.logger.debugLevel()<6 ):
163     common.logger.write('End of used properties.\n')
164    
165 nsmirnov 1.3 self.initializeActions_(opts)
166 nsmirnov 1.1 return
167    
168 nsmirnov 1.3 def processContinueOption_(self,opts):
169 nsmirnov 1.1
170     continue_dir = None
171 nsmirnov 1.4
172     # Look for the '-continue' option.
173    
174 nsmirnov 1.1 for opt in opts.keys():
175     if ( opt in ('-continue','-c') ):
176     self.flag_continue = 1
177     val = opts[opt]
178     if val:
179     if val[0] == '/': continue_dir = val # abs path
180     else: continue_dir = self.cwd + val # rel path
181     pass
182 nsmirnov 1.4 break
183     pass
184    
185     # Look for actions which has sense only with '-continue'
186    
187     if not self.flag_continue:
188     for opt in opts.keys():
189 slacapra 1.102 if ( opt in (self.aux_actions)):
190 nsmirnov 1.4 self.flag_continue = 1
191     break
192 nsmirnov 1.1 pass
193     pass
194 slacapra 1.102 if ("-submit" in opts.keys() and "-create" not in opts.keys() ):
195     self.flag_continue = 1
196 nsmirnov 1.1
197     if not self.flag_continue: return
198    
199     if not continue_dir:
200     prefix = common.prog_name + '_' + self.name + '_'
201     continue_dir = findLastWorkDir(prefix)
202     pass
203    
204     if not continue_dir:
205     raise CrabException('Cannot find last working directory.')
206    
207     if not os.path.exists(continue_dir):
208     msg = 'Cannot continue because the working directory <'
209     msg += continue_dir
210     msg += '> does not exist.'
211     raise CrabException(msg)
212    
213     # Instantiate WorkSpace
214 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
215 nsmirnov 1.1
216     return
217    
218 nsmirnov 1.3 def processIniFile_(self, opts):
219 nsmirnov 1.1 """
220     Processes a configuration INI-file.
221     """
222    
223     # Extract cfg-file name from the cmd-line options.
224    
225     for opt in opts.keys():
226     if ( opt == '-cfg' ):
227     if self.flag_continue:
228     raise CrabException('-continue and -cfg cannot coexist.')
229     if opts[opt] : self.cfg_fname = opts[opt]
230     else : usage()
231     pass
232    
233     elif ( opt == '-name' ):
234     self.name = opts[opt]
235     pass
236    
237     pass
238    
239     # Set default cfg-fname
240    
241     if self.cfg_fname == None:
242     if self.flag_continue:
243     self.cfg_fname = common.work_space.cfgFileName()
244     else:
245     self.cfg_fname = common.prog_name+'.cfg'
246     pass
247     pass
248    
249     # Load cfg-file
250    
251     if string.lower(self.cfg_fname) != 'none':
252     if os.path.exists(self.cfg_fname):
253     self.cfg_params = loadConfig(self.cfg_fname)
254 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
255 nsmirnov 1.1 pass
256     else:
257     msg = 'cfg-file '+self.cfg_fname+' not found.'
258     raise CrabException(msg)
259     pass
260     pass
261    
262     # process the [CRAB] section
263    
264     lhp = len('CRAB.')
265     for k in self.cfg_params.keys():
266     if len(k) >= lhp and k[:lhp] == 'CRAB.':
267     opt = '-'+k[lhp:]
268     if len(opt) >= 3 and opt[:3] == '-__': continue
269     if opt not in opts.keys():
270     opts[opt] = self.cfg_params[k]
271     pass
272     pass
273     pass
274    
275     return
276    
277 nsmirnov 1.3 def processOptions_(self, opts):
278 nsmirnov 1.1 """
279     Processes the command-line options.
280     """
281    
282     for opt in opts.keys():
283     val = opts[opt]
284    
285 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
286     if opt in self.main_actions:
287     self.cfg_params['CRAB.'+opt[1:]] = val
288     continue
289     if opt in self.aux_actions:
290     self.cfg_params['CRAB.'+opt[1:]] = val
291     continue
292 nsmirnov 1.1
293 spiga 1.107 elif ( opt == '-server_mode' ): #Add for server mode usage
294     pass
295     elif ( opt == '-server_name' ):
296     pass
297 nsmirnov 1.1
298     elif ( opt == '-cfg' ):
299     pass
300    
301     elif ( opt in ('-continue', '-c') ):
302 nsmirnov 1.4 # Already processed in processContinueOption_()
303 nsmirnov 1.1 pass
304    
305     elif ( opt == '-jobtype' ):
306     if val : self.job_type_name = string.upper(val)
307     else : usage()
308     pass
309    
310     elif ( opt == '-Q' ):
311     self.flag_quiet = 1
312     pass
313    
314     elif ( opt == '-debug' ):
315 slacapra 1.6 if val: self.debug_level = int(val)
316     else: self.debug_level = 1
317 nsmirnov 1.1 pass
318    
319     elif ( opt == '-scheduler' ):
320     pass
321 slacapra 1.22
322 nsmirnov 1.3 elif string.find(opt,'.') == -1:
323     print common.prog_name+'. Unrecognized option '+opt
324     usage()
325     pass
326 nsmirnov 1.1
327 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
328     if string.find(opt,'.') == -1 :
329     self.cfg_params['CRAB.'+opt[1:]] = val
330 nsmirnov 1.1 pass
331 nsmirnov 1.3 else:
332 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
333     self.cfg_params[opt[1:]] = val
334     pass
335     pass
336     return
337    
338 slacapra 1.8 def parseRange_(self, aRange):
339 nsmirnov 1.4 """
340 slacapra 1.8 Takes as the input a string with a range defined in any of the following
341     way, including combination, and return a tuple with the ints defined
342     according to following table. A consistency check is done.
343     NB: the first job is "1", not "0".
344     'all' -> [1,2,..., NJobs]
345     '' -> [1,2,..., NJobs]
346     'n1' -> [n1]
347     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
348     'n1,n2' -> [n1, n2]
349     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
350     """
351     result = []
352    
353 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
354     if aRange=='all' or aRange==None or aRange=='':
355 slacapra 1.73 result=range(1,common.jobDB.nJobs()+1)
356 slacapra 1.8 return result
357 slacapra 1.9 elif aRange=='0':
358     return result
359 slacapra 1.8
360     subRanges = string.split(aRange, ',')
361     for subRange in subRanges:
362     result = result+self.parseSimpleRange_(subRange)
363    
364     if self.checkUniqueness_(result):
365     return result
366     else:
367 slacapra 1.33 common.logger.message("Error "+result)
368 slacapra 1.8 return []
369    
370     def checkUniqueness_(self, list):
371     """
372 slacapra 1.9 check if a list contains only unique elements
373 slacapra 1.8 """
374    
375     uniqueList = []
376     # use a list comprehension statement (takes a while to understand)
377    
378     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
379    
380     return (len(list)==len(uniqueList))
381    
382 spiga 1.107 def uuidgen(self):
383     """Generate a UUID"""
384     return commands.getoutput('uuidgen')
385    
386    
387 slacapra 1.8 def parseSimpleRange_(self, aRange):
388     """
389     Takes as the input a string with two integers separated by
390     the minus sign and returns the tuple with these numbers:
391     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
392     'n1' -> [n1]
393     """
394     (start, end) = (None, None)
395    
396     result = []
397     minus = string.find(aRange, '-')
398     if ( minus < 0 ):
399     if isInt(aRange) and int(aRange)>0:
400 corvo 1.97 # FEDE
401     #result.append(int(aRange)-1)
402     ###
403 slacapra 1.62 result.append(int(aRange))
404 slacapra 1.8 else:
405 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
406     usage()
407     pass
408    
409 nsmirnov 1.4 pass
410     else:
411 slacapra 1.8 (start, end) = string.split(aRange, '-')
412     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
413 corvo 1.97 #result=range(int(start)-1, int(end))
414     result=range(int(start), int(end)+1) #Daniele
415 slacapra 1.8 else:
416 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
417 slacapra 1.8
418     return result
419 nsmirnov 1.4
420 nsmirnov 1.3 def initializeActions_(self, opts):
421 nsmirnov 1.1 """
422     For each user action instantiate a corresponding
423     object and put it in the action dictionary.
424     """
425 spiga 1.15
426     for opt in opts.keys():
427    
428 nsmirnov 1.1 val = opts[opt]
429 spiga 1.15
430    
431     if ( opt == '-create' ):
432 gutsche 1.83 if val and val != 'all':
433     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
434     common.logger.message(msg)
435     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
436     common.logger.message(msg)
437 slacapra 1.82 ncjobs = 'all'
438    
439     # Instantiate Creator object
440     self.creator = Creator(self.job_type_name,
441     self.cfg_params,
442     ncjobs)
443     self.actions[opt] = self.creator
444    
445     # Initialize the JobDB object if needed
446     if not self.flag_continue:
447     common.jobDB.create(self.creator.nJobs())
448 nsmirnov 1.1 pass
449 nsmirnov 1.3
450 slacapra 1.82 # Create and initialize JobList
451 nsmirnov 1.3
452 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
453     self.creator.jobType())
454 nsmirnov 1.3
455 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
456     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
457     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
458     common.job_list.setScriptNames(self.job_type_name+'.sh')
459     common.job_list.setJDLNames(self.job_type_name+'.jdl')
460     common.job_list.setCfgNames(self.creator.jobType().configFilename())
461 slacapra 1.9
462 slacapra 1.82 self.creator.writeJobsSpecsToDB()
463 spiga 1.107 taskUnicId= self.uuidgen()
464     common.taskDB.setDict('TasKUUID',taskUnicId)
465 slacapra 1.77
466 slacapra 1.82 common.taskDB.save()
467 nsmirnov 1.3 pass
468 nsmirnov 1.1
469     elif ( opt == '-submit' ):
470 spiga 1.107 # modified to support server mode
471     if (self.UseServer== 1):
472     self.actions[opt] = SubmitterServer(self.cfg_params)
473     else:
474     # modified to support server mode
475     # get user request
476     nsjobs = -1
477     if val:
478     if ( isInt(val) ):
479     nsjobs = int(val)
480     elif (val=='all'):
481     pass
482     else:
483     msg = 'Bad submission option <'+str(val)+'>\n'
484     msg += ' Must be an integer or "all"'
485     msg += ' Generic range is not allowed"'
486     raise CrabException(msg)
487     pass
488    
489     common.logger.debug(5,'nsjobs '+str(nsjobs))
490     # total jobs
491     nj_list = []
492     # get the first not already submitted
493     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
494     jobSetForSubmission = 0
495     for nj in range(common.jobDB.nJobs()):
496     if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
497     jobSetForSubmission +=1
498     nj_list.append(nj)
499     else: continue
500     if nsjobs >0 and nsjobs == jobSetForSubmission:
501     break
502     pass
503     if nsjobs>jobSetForSubmission:
504     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
505    
506     # submit N from last submitted job
507     common.logger.debug(5,'nj_list '+str(nj_list))
508    
509     if len(nj_list) != 0:
510     # Instantiate Submitter object
511     self.actions[opt] = Submitter(self.cfg_params, nj_list)
512     # Create and initialize JobList
513     if len(common.job_list) == 0 :
514     common.job_list = JobList(common.jobDB.nJobs(),
515     None)
516     common.job_list.setJDLNames(self.job_type_name+'.jdl')
517     pass
518 slacapra 1.23 pass
519 slacapra 1.22 else:
520 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
521 nsmirnov 1.1 pass
522    
523 nsmirnov 1.4 elif ( opt == '-list' ):
524 slacapra 1.8 jobs = self.parseRange_(val)
525    
526     common.jobDB.dump(jobs)
527 nsmirnov 1.4 pass
528    
529 slacapra 1.67 elif ( opt == '-printId' ):
530 corvo 1.93
531 corvo 1.97 try:
532     common.scheduler.bossTask.query(ALL)
533     except RuntimeError,e:
534     common.logger.message( e.__str__() )
535     except ValueError,e:
536 slacapra 1.103 common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
537     common.logger.message(e.what())
538 corvo 1.97 pass
539 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
540    
541 corvo 1.97 for c, v in task.iteritems():
542     k = int(c)
543     nj=k
544 corvo 1.93 id = v['CHAIN_ID']
545     jid = v['SCHED_ID']
546     if jid:
547     print "Job: %-5s Id = %-40s " %(id,jid)
548     #else:
549     # print "Job: ",id," No ID yet"
550 slacapra 1.67 pass
551    
552 nsmirnov 1.4 elif ( opt == '-status' ):
553 spiga 1.107 # modified to support server mode
554     if (self.UseServer== 1):
555     self.actions[opt] = StatusServer(self.cfg_params)
556     else:
557     jobs = self.parseRange_(val)
558    
559     if len(jobs) != 0:
560     self.actions[opt] = StatusBoss(self.cfg_params)
561     pass
562 slacapra 1.22
563 nsmirnov 1.4 elif ( opt == '-kill' ):
564 slacapra 1.8
565 slacapra 1.84 if val:
566     if val =='all':
567     jobs = common.scheduler.listBoss()
568 spiga 1.31 else:
569 slacapra 1.84 jobs = self.parseRange_(val)
570     common.scheduler.cancel(jobs)
571 spiga 1.31 else:
572 slacapra 1.84 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
573 nsmirnov 1.1
574 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
575 spiga 1.107 # modified to support server mode
576     if (self.UseServer== 1):
577     self.actions[opt] = GetOutputServer(self.cfg_params)
578 spiga 1.20 else:
579 spiga 1.13
580 spiga 1.107 if val=='all' or val==None or val=='':
581     jobs = common.scheduler.listBoss()
582     else:
583     jobs = self.parseRange_(val)
584    
585     jobs_done = []
586     for nj in jobs:
587     jobs_done.append(nj)
588     common.scheduler.getOutput(jobs_done)
589     pass
590 nsmirnov 1.4
591     elif ( opt == '-resubmit' ):
592 slacapra 1.78 if val=='all' or val==None or val=='':
593     jobs = common.scheduler.listBoss()
594 fanzago 1.37 else:
595 slacapra 1.78 jobs = self.parseRange_(val)
596 fanzago 1.37
597 slacapra 1.11 if val:
598     # create a list of jobs to be resubmitted.
599 corvo 1.93 val = string.replace(val,'-',':')
600 slacapra 1.8
601 slacapra 1.22 ### as before, create a Resubmittter Class
602 slacapra 1.78 maxIndex = common.scheduler.listBoss()
603 corvo 1.93 ##
604     # Marco. Vediamo se va meglio cosi'...
605     ##
606 corvo 1.97 try:
607     common.scheduler.bossTask.query(ALL, val)
608     except RuntimeError,e:
609     common.logger.message( e.__str__() )
610     except ValueError,e:
611 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
612     common.logger.message(e.what())
613 corvo 1.97 pass
614 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
615    
616 slacapra 1.11 nj_list = []
617 corvo 1.93 for c, v in task.iteritems():
618 corvo 1.97 k = int(c)
619     nj=k
620 corvo 1.93 st = v['STATUS']
621    
622 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
623 spiga 1.104 if st in ['K','SA','Z','DA']:
624 corvo 1.97 nj_list.append(int(nj)-1)
625     common.jobDB.setStatus(int(nj)-1,'C')
626 spiga 1.99 elif st in ['E','SE']:
627 spiga 1.60 common.scheduler.moveOutput(nj)
628 corvo 1.97 nj_list.append(int(nj)-1)
629     st = common.jobDB.setStatus(int(nj)-1,'RC')
630 corvo 1.93 elif st in ['W']:
631 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
632 spiga 1.60 pass
633 corvo 1.93 elif st in ['SD', 'OR']:
634 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
635 spiga 1.60 else:
636     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
637 slacapra 1.11 else:
638 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
639 fanzago 1.37 if len(common.job_list) == 0 :
640     common.job_list = JobList(common.jobDB.nJobs(),None)
641     common.job_list.setJDLNames(self.job_type_name+'.jdl')
642     pass
643    
644 slacapra 1.11 if len(nj_list) != 0:
645 corvo 1.93 nj_list.sort()
646 slacapra 1.11 # Instantiate Submitter object
647 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
648 slacapra 1.11
649 slacapra 1.8 pass
650     pass
651 slacapra 1.11 else:
652     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
653 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
654 slacapra 1.11 pass
655 fanzago 1.37 common.jobDB.save()
656 slacapra 1.8 pass
657 gutsche 1.61
658 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
659 nsmirnov 1.5
660 slacapra 1.78 if val:
661     if val =='all':
662     jobs = common.scheduler.listBoss()
663 spiga 1.47 else:
664 slacapra 1.78 jobs = self.parseRange_(val)
665     # kill submitted jobs
666     common.scheduler.cancel(jobs)
667 spiga 1.47 else:
668 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
669 nsmirnov 1.5
670 spiga 1.47 # resubmit cancelled jobs.
671     if val:
672     nj_list = []
673     for nj in jobs:
674     st = common.jobDB.status(int(nj)-1)
675     if st in ['K','A']:
676 corvo 1.97 nj_list.append(int(nj)-1)
677     common.jobDB.setStatus(int(nj)-1,'C')
678 spiga 1.47 elif st == 'Y':
679     common.scheduler.moveOutput(nj)
680 corvo 1.97 nj_list.append(int(nj)-1)
681     st = common.jobDB.setStatus(int(nj)-1,'RC')
682 spiga 1.47 elif st in ['C','X']:
683 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
684 spiga 1.47 pass
685     elif st == 'D':
686 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
687 spiga 1.47 else:
688     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
689     pass
690    
691 nsmirnov 1.5 if len(common.job_list) == 0 :
692 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
693 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
694     pass
695 spiga 1.47
696     if len(nj_list) != 0:
697 spiga 1.86 # common.scheduler.resubmit(nj_list)
698 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
699 spiga 1.47 pass
700     pass
701     else:
702     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
703 nsmirnov 1.5 pass
704 spiga 1.47 common.jobDB.save()
705 nsmirnov 1.4 pass
706 spiga 1.47
707 slacapra 1.28 elif ( opt == '-testJdl' ):
708 slacapra 1.8 jobs = self.parseRange_(val)
709     nj_list = []
710     for nj in jobs:
711 slacapra 1.62 st = common.jobDB.status(nj-1)
712     if st == 'C': nj_list.append(nj-1)
713 slacapra 1.8 pass
714    
715     if len(nj_list) != 0:
716 slacapra 1.77 # Instantiate Checker object
717 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
718    
719     # Create and initialize JobList
720    
721     if len(common.job_list) == 0 :
722     common.job_list = JobList(common.jobDB.nJobs(), None)
723     common.job_list.setJDLNames(self.job_type_name+'.jdl')
724     pass
725     pass
726    
727 slacapra 1.9 elif ( opt == '-postMortem' ):
728 corvo 1.95
729 spiga 1.108 # modified to support server mode
730     if (self.UseServer== 1):
731     self.actions[opt] = PostMortemServer(self.cfg_params)
732     else:
733     if val:
734     val = string.replace(val,'-',':')
735     else: val=''
736     nj_list = {}
737 corvo 1.97
738 spiga 1.108 try:
739     common.scheduler.bossTask.query(ALL, val)
740     except RuntimeError,e:
741     common.logger.message( e.__str__() )
742     except ValueError,e:
743     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
744     common.logger.message( e.what())
745     pass
746 corvo 1.93
747 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
748    
749     for c, v in task.iteritems():
750     k = int(c)
751     nj=k
752     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
753     pass
754 slacapra 1.9
755 spiga 1.108 if len(nj_list) != 0:
756 corvo 1.93 # Instantiate PostMortem object
757 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
758 slacapra 1.9 # Create and initialize JobList
759 spiga 1.108 if len(common.job_list) == 0 :
760     common.job_list = JobList(common.jobDB.nJobs(), None)
761     common.job_list.setJDLNames(self.job_type_name+'.jdl')
762     pass
763 slacapra 1.9 pass
764 spiga 1.108 else:
765     common.logger.message("No jobs to analyze")
766 slacapra 1.9
767     elif ( opt == '-clean' ):
768     if val != None:
769     raise CrabException("No range allowed for '-clean'")
770    
771 slacapra 1.105 self.actions[opt] = Cleaner(self.cfg_params)
772 slacapra 1.9
773 nsmirnov 1.1 pass
774     return
775    
776 nsmirnov 1.3 def createWorkingSpace_(self):
777 slacapra 1.9 new_dir = ''
778    
779     try:
780     new_dir = self.cfg_params['USER.ui_working_dir']
781 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
782 fanzago 1.48 if os.path.exists(new_dir):
783     if os.listdir(new_dir):
784     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
785     raise CrabException(msg)
786 spiga 1.89 if len(string.split(new_dir, '/')) == 1:
787     new_dir = self.cwd + new_dir
788     else:
789     new_dir = new_dir
790 slacapra 1.9 except KeyError:
791     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
792 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
793 slacapra 1.9 new_dir = self.cwd + new_dir
794     pass
795 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
796 nsmirnov 1.1 common.work_space.create()
797     return
798    
799 nsmirnov 1.3 def loadConfiguration_(self, opts):
800 nsmirnov 1.1
801     save_opts = common.work_space.loadSavedOptions()
802    
803     # Override saved options with new command-line options
804    
805     for k in opts.keys():
806     save_opts[k] = opts[k]
807     pass
808    
809     # Return updated options
810     return save_opts
811    
812 nsmirnov 1.3 def createLogger_(self, args):
813 nsmirnov 1.1
814     log = Logger()
815     log.quiet(self.flag_quiet)
816     log.setDebugLevel(self.debug_level)
817     log.write(args+'\n')
818 nsmirnov 1.3 log.message(self.headerString_())
819 nsmirnov 1.1 log.flush()
820     common.logger = log
821     return
822    
823 nsmirnov 1.3 def updateHistory_(self, args):
824 nsmirnov 1.1 history_fname = common.prog_name+'.history'
825     history_file = open(history_fname, 'a')
826     history_file.write(self.current_time+': '+args+'\n')
827     history_file.close()
828     return
829    
830 nsmirnov 1.3 def headerString_(self):
831 nsmirnov 1.1 """
832     Creates a string describing program options either given in
833     the command line or their default values.
834     """
835     header = common.prog_name + ' (version ' + common.prog_version_str + \
836     ') running on ' + \
837     time.ctime(time.time())+'\n\n' + \
838     common.prog_name+'. Working options:\n'
839 fanzago 1.59 #print self.job_type_name
840 nsmirnov 1.1 header = header +\
841 slacapra 1.85 ' scheduler ' + self.cfg_params['CRAB.scheduler'] + '\n'+\
842 nsmirnov 1.1 ' job type ' + self.job_type_name + '\n'+\
843     ' working directory ' + common.work_space.topDir()\
844     + '\n'
845     return header
846    
847 nsmirnov 1.3 def createScheduler_(self):
848 nsmirnov 1.1 """
849     Creates a scheduler object instantiated by its name.
850     """
851 slacapra 1.101 klass_name = 'SchedulerBoss'
852 nsmirnov 1.1 file_name = klass_name
853     try:
854     klass = importName(file_name, klass_name)
855     except KeyError:
856     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
857     raise CrabException(msg)
858     except ImportError, e:
859 slacapra 1.101 msg = 'Cannot create scheduler Boss'
860 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
861     msg += str(e)
862     raise CrabException(msg)
863    
864     common.scheduler = klass()
865     common.scheduler.configure(self.cfg_params)
866     return
867    
868     def run(self):
869     """
870     For each
871     """
872    
873     for act in self.main_actions:
874     if act in self.actions.keys(): self.actions[act].run()
875     pass
876    
877     for act in self.aux_actions:
878     if act in self.actions.keys(): self.actions[act].run()
879     pass
880     return
881    
882     ###########################################################################
883     def processHelpOptions(opts):
884    
885 slacapra 1.11 if len(opts):
886     for opt in opts.keys():
887     if opt in ('-v', '-version', '--version') :
888     print Crab.version()
889     return 1
890     if opt in ('-h','-help','--help') :
891     if opts[opt] : help(opts[opt])
892     else: help()
893     return 1
894     else:
895     usage()
896 corvo 1.93
897 nsmirnov 1.1 return 0
898    
899 corvo 1.93 ###########################################################################
900 nsmirnov 1.1 if __name__ == '__main__':
901 slacapra 1.92 ## Get rid of some useless warning
902 slacapra 1.94 try:
903     import warnings
904     warnings.simplefilter("ignore", RuntimeWarning)
905     except:
906     pass # too bad, you'll get the warning
907 corvo 1.93
908 nsmirnov 1.1 # Parse command-line options and create a dictionary with
909     # key-value pairs.
910    
911     options = parseOptions(sys.argv[1:])
912    
913     # Process "help" options, such as '-help', '-version'
914    
915 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
916 nsmirnov 1.1
917     # Create, initialize, and run a Crab object
918    
919     try:
920 nsmirnov 1.3 crab = Crab(options)
921 nsmirnov 1.1 crab.run()
922 corvo 1.54 crab.cfg_params['apmon'].free()
923 nsmirnov 1.1 except CrabException, e:
924     print '\n' + common.prog_name + ': ' + str(e) + '\n'
925     if common.logger:
926     common.logger.write('ERROR: '+str(e)+'\n')
927     pass
928     pass
929    
930     pass