ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.114
Committed: Thu Jun 28 15:53:37 2007 UTC (17 years, 10 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.113: +2 -1 lines
Log Message:
if datasetpath is None, perform submission even if no destination is selected

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 nsmirnov 1.1 import common
19 spiga 1.13 import Statistic
20 spiga 1.107 import commands
21 nsmirnov 1.1
22 spiga 1.98 from BossSession import *
23 corvo 1.93
24 spiga 1.107 #modified to support server mode
25     from SubmitterServer import SubmitterServer
26     from GetOutputServer import GetOutputServer
27     from StatusServer import StatusServer
28 spiga 1.108 from PostMortemServer import PostMortemServer
29 corvo 1.109 from KillerServer import KillerServer
30 spiga 1.107
31 nsmirnov 1.1 import sys, os, time, string
32    
33     ###########################################################################
34     class Crab:
35 nsmirnov 1.3 def __init__(self, opts):
36 fanzago 1.76 ## test_tag
37 nsmirnov 1.1 # The order of main_actions is important !
38 fanzago 1.111 self.main_actions = [ '-create', '-submit' ]
39     ### FEDE new option "-publish" FOR DBS OUTPUT PUBLICATION
40 slacapra 1.70 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput','-get',
41 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
42 fanzago 1.111 '-printId', '-publish' ]
43 nsmirnov 1.1
44     # Dictionary of actions, e.g. '-create' -> object of class Creator
45     self.actions = {}
46 fanzago 1.76
47 nsmirnov 1.1 # Configuration file
48     self.cfg_fname = None
49     # Dictionary with configuration parameters
50     self.cfg_params = {}
51    
52     # Current working directory
53     self.cwd = os.getcwd()+'/'
54     # Current time in format 'yymmdd_hhmmss'
55     self.current_time = time.strftime('%y%m%d_%H%M%S',
56     time.localtime(time.time()))
57    
58 nsmirnov 1.3 # Session name (?) Do we need this ?
59 nsmirnov 1.1 self.name = '0'
60    
61     # Job type
62     self.job_type_name = None
63    
64     # Continuation flag
65     self.flag_continue = 0
66    
67     # quiet mode, i.e. no output on screen
68     self.flag_quiet = 0
69     # produce more output
70     self.debug_level = 0
71    
72    
73 nsmirnov 1.3 self.initialize_(opts)
74 nsmirnov 1.1
75     return
76    
77 nsmirnov 1.7 def version():
78 nsmirnov 1.1 return common.prog_version_str
79    
80 nsmirnov 1.7 version = staticmethod(version)
81    
82 nsmirnov 1.3 def initialize_(self, opts):
83 nsmirnov 1.1
84     # Process the '-continue' option first because
85     # in the case of continuation the CRAB configuration
86     # parameters are loaded from already existing Working Space.
87 nsmirnov 1.3 self.processContinueOption_(opts)
88 nsmirnov 1.1
89     # Process ini-file first, then command line options
90     # because they override ini-file settings.
91    
92 nsmirnov 1.3 self.processIniFile_(opts)
93 nsmirnov 1.1
94 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
95 nsmirnov 1.1
96 nsmirnov 1.3 self.processOptions_(opts)
97 nsmirnov 1.1
98 slacapra 1.77 common.taskDB = TaskDB()
99    
100 slacapra 1.101
101 nsmirnov 1.1 if not self.flag_continue:
102 nsmirnov 1.3 self.createWorkingSpace_()
103 slacapra 1.9 optsToBeSaved={}
104     for it in opts.keys():
105     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
106     pass
107     else:
108     optsToBeSaved[it]=opts[it]
109 slacapra 1.101 # store in taskDB the opts
110     common.taskDB.setDict(it[1:], optsToBeSaved[it])
111 slacapra 1.9 common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
112 nsmirnov 1.1 pass
113 slacapra 1.77 else:
114     common.taskDB.load()
115 nsmirnov 1.1
116     # At this point all configuration options have been read.
117    
118     args = string.join(sys.argv,' ')
119 slacapra 1.11
120 nsmirnov 1.3 self.updateHistory_(args)
121 slacapra 1.11
122 nsmirnov 1.3 self.createLogger_(args)
123 slacapra 1.11
124 nsmirnov 1.1 common.jobDB = JobDB()
125 corvo 1.51
126 spiga 1.107 self.UseServer=0
127     try:
128     self.UseServer=int(self.cfg_params['CRAB.server_mode'])
129     except KeyError:
130     pass
131    
132    
133 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
134    
135 nsmirnov 1.3 if self.flag_continue:
136 slacapra 1.12 try:
137     common.jobDB.load()
138 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
139 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
140     except DBException,e:
141     pass
142 nsmirnov 1.3 pass
143 slacapra 1.11
144 nsmirnov 1.3 self.createScheduler_()
145 slacapra 1.11
146 slacapra 1.106 common.logger.debug(6, 'Used properties:')
147     if (common.logger.debugLevel()<6 ):
148     common.logger.write('Used properties:')
149     keys = self.cfg_params.keys()
150     keys.sort()
151     for k in keys:
152     if self.cfg_params[k]:
153     common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
154     if (common.logger.debugLevel()<6 ):
155     common.logger.write(' '+k+' : '+str(self.cfg_params[k]))
156     pass
157     else:
158     common.logger.debug(6, ' '+k+' : ')
159     if (common.logger.debugLevel()<6 ):
160     common.logger.write(' '+k+' : ')
161 nsmirnov 1.3 pass
162     pass
163 slacapra 1.106 common.logger.debug(6, 'End of used properties.\n')
164     if (common.logger.debugLevel()<6 ):
165     common.logger.write('End of used properties.\n')
166    
167 nsmirnov 1.3 self.initializeActions_(opts)
168 nsmirnov 1.1 return
169    
170 nsmirnov 1.3 def processContinueOption_(self,opts):
171 nsmirnov 1.1
172     continue_dir = None
173 nsmirnov 1.4
174     # Look for the '-continue' option.
175    
176 nsmirnov 1.1 for opt in opts.keys():
177     if ( opt in ('-continue','-c') ):
178     self.flag_continue = 1
179     val = opts[opt]
180     if val:
181     if val[0] == '/': continue_dir = val # abs path
182     else: continue_dir = self.cwd + val # rel path
183     pass
184 nsmirnov 1.4 break
185     pass
186    
187     # Look for actions which has sense only with '-continue'
188    
189     if not self.flag_continue:
190     for opt in opts.keys():
191 slacapra 1.102 if ( opt in (self.aux_actions)):
192 nsmirnov 1.4 self.flag_continue = 1
193     break
194 nsmirnov 1.1 pass
195     pass
196 slacapra 1.102 if ("-submit" in opts.keys() and "-create" not in opts.keys() ):
197     self.flag_continue = 1
198 nsmirnov 1.1
199     if not self.flag_continue: return
200    
201     if not continue_dir:
202     prefix = common.prog_name + '_' + self.name + '_'
203     continue_dir = findLastWorkDir(prefix)
204     pass
205    
206     if not continue_dir:
207     raise CrabException('Cannot find last working directory.')
208    
209     if not os.path.exists(continue_dir):
210     msg = 'Cannot continue because the working directory <'
211     msg += continue_dir
212     msg += '> does not exist.'
213     raise CrabException(msg)
214    
215     # Instantiate WorkSpace
216 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
217 nsmirnov 1.1
218     return
219    
220 nsmirnov 1.3 def processIniFile_(self, opts):
221 nsmirnov 1.1 """
222     Processes a configuration INI-file.
223     """
224    
225     # Extract cfg-file name from the cmd-line options.
226    
227     for opt in opts.keys():
228     if ( opt == '-cfg' ):
229     if self.flag_continue:
230     raise CrabException('-continue and -cfg cannot coexist.')
231     if opts[opt] : self.cfg_fname = opts[opt]
232     else : usage()
233     pass
234    
235     elif ( opt == '-name' ):
236     self.name = opts[opt]
237     pass
238    
239     pass
240    
241     # Set default cfg-fname
242    
243     if self.cfg_fname == None:
244     if self.flag_continue:
245     self.cfg_fname = common.work_space.cfgFileName()
246     else:
247     self.cfg_fname = common.prog_name+'.cfg'
248     pass
249     pass
250    
251     # Load cfg-file
252    
253     if string.lower(self.cfg_fname) != 'none':
254     if os.path.exists(self.cfg_fname):
255     self.cfg_params = loadConfig(self.cfg_fname)
256 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
257 nsmirnov 1.1 pass
258     else:
259     msg = 'cfg-file '+self.cfg_fname+' not found.'
260     raise CrabException(msg)
261     pass
262     pass
263    
264     # process the [CRAB] section
265    
266     lhp = len('CRAB.')
267     for k in self.cfg_params.keys():
268     if len(k) >= lhp and k[:lhp] == 'CRAB.':
269     opt = '-'+k[lhp:]
270     if len(opt) >= 3 and opt[:3] == '-__': continue
271     if opt not in opts.keys():
272     opts[opt] = self.cfg_params[k]
273     pass
274     pass
275     pass
276    
277     return
278    
279 nsmirnov 1.3 def processOptions_(self, opts):
280 nsmirnov 1.1 """
281     Processes the command-line options.
282     """
283    
284     for opt in opts.keys():
285     val = opts[opt]
286    
287 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
288     if opt in self.main_actions:
289     self.cfg_params['CRAB.'+opt[1:]] = val
290     continue
291     if opt in self.aux_actions:
292     self.cfg_params['CRAB.'+opt[1:]] = val
293     continue
294 nsmirnov 1.1
295 spiga 1.107 elif ( opt == '-server_mode' ): #Add for server mode usage
296     pass
297     elif ( opt == '-server_name' ):
298     pass
299 nsmirnov 1.1
300     elif ( opt == '-cfg' ):
301     pass
302    
303     elif ( opt in ('-continue', '-c') ):
304 nsmirnov 1.4 # Already processed in processContinueOption_()
305 nsmirnov 1.1 pass
306    
307     elif ( opt == '-jobtype' ):
308     if val : self.job_type_name = string.upper(val)
309     else : usage()
310     pass
311    
312     elif ( opt == '-Q' ):
313     self.flag_quiet = 1
314     pass
315    
316     elif ( opt == '-debug' ):
317 slacapra 1.6 if val: self.debug_level = int(val)
318     else: self.debug_level = 1
319 nsmirnov 1.1 pass
320    
321     elif ( opt == '-scheduler' ):
322     pass
323 slacapra 1.22
324 nsmirnov 1.3 elif string.find(opt,'.') == -1:
325     print common.prog_name+'. Unrecognized option '+opt
326     usage()
327     pass
328 nsmirnov 1.1
329 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
330     if string.find(opt,'.') == -1 :
331     self.cfg_params['CRAB.'+opt[1:]] = val
332 nsmirnov 1.1 pass
333 nsmirnov 1.3 else:
334 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
335     self.cfg_params[opt[1:]] = val
336     pass
337     pass
338     return
339    
340 slacapra 1.8 def parseRange_(self, aRange):
341 nsmirnov 1.4 """
342 slacapra 1.8 Takes as the input a string with a range defined in any of the following
343     way, including combination, and return a tuple with the ints defined
344     according to following table. A consistency check is done.
345     NB: the first job is "1", not "0".
346     'all' -> [1,2,..., NJobs]
347     '' -> [1,2,..., NJobs]
348     'n1' -> [n1]
349     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
350     'n1,n2' -> [n1, n2]
351     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
352     """
353     result = []
354    
355 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
356     if aRange=='all' or aRange==None or aRange=='':
357 slacapra 1.73 result=range(1,common.jobDB.nJobs()+1)
358 slacapra 1.8 return result
359 slacapra 1.9 elif aRange=='0':
360     return result
361 slacapra 1.8
362     subRanges = string.split(aRange, ',')
363     for subRange in subRanges:
364     result = result+self.parseSimpleRange_(subRange)
365    
366     if self.checkUniqueness_(result):
367     return result
368     else:
369 slacapra 1.33 common.logger.message("Error "+result)
370 slacapra 1.8 return []
371    
372     def checkUniqueness_(self, list):
373     """
374 slacapra 1.9 check if a list contains only unique elements
375 slacapra 1.8 """
376    
377     uniqueList = []
378     # use a list comprehension statement (takes a while to understand)
379    
380     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
381    
382     return (len(list)==len(uniqueList))
383    
384 spiga 1.107 def uuidgen(self):
385     """Generate a UUID"""
386     return commands.getoutput('uuidgen')
387    
388    
389 slacapra 1.8 def parseSimpleRange_(self, aRange):
390     """
391     Takes as the input a string with two integers separated by
392     the minus sign and returns the tuple with these numbers:
393     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
394     'n1' -> [n1]
395     """
396     (start, end) = (None, None)
397    
398     result = []
399     minus = string.find(aRange, '-')
400     if ( minus < 0 ):
401     if isInt(aRange) and int(aRange)>0:
402 corvo 1.97 # FEDE
403     #result.append(int(aRange)-1)
404     ###
405 slacapra 1.62 result.append(int(aRange))
406 slacapra 1.8 else:
407 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
408     usage()
409     pass
410    
411 nsmirnov 1.4 pass
412     else:
413 slacapra 1.8 (start, end) = string.split(aRange, '-')
414     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
415 corvo 1.97 #result=range(int(start)-1, int(end))
416     result=range(int(start), int(end)+1) #Daniele
417 slacapra 1.8 else:
418 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
419 slacapra 1.8
420     return result
421 nsmirnov 1.4
422 nsmirnov 1.3 def initializeActions_(self, opts):
423 nsmirnov 1.1 """
424     For each user action instantiate a corresponding
425     object and put it in the action dictionary.
426     """
427 spiga 1.15
428     for opt in opts.keys():
429    
430 nsmirnov 1.1 val = opts[opt]
431 spiga 1.15
432    
433     if ( opt == '-create' ):
434 gutsche 1.83 if val and val != 'all':
435     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
436     common.logger.message(msg)
437     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
438     common.logger.message(msg)
439 slacapra 1.82 ncjobs = 'all'
440    
441     # Instantiate Creator object
442     self.creator = Creator(self.job_type_name,
443     self.cfg_params,
444     ncjobs)
445     self.actions[opt] = self.creator
446    
447     # Initialize the JobDB object if needed
448     if not self.flag_continue:
449     common.jobDB.create(self.creator.nJobs())
450 nsmirnov 1.1 pass
451 nsmirnov 1.3
452 slacapra 1.82 # Create and initialize JobList
453 nsmirnov 1.3
454 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
455     self.creator.jobType())
456 nsmirnov 1.3
457 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
458     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
459     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
460     common.job_list.setScriptNames(self.job_type_name+'.sh')
461     common.job_list.setJDLNames(self.job_type_name+'.jdl')
462     common.job_list.setCfgNames(self.creator.jobType().configFilename())
463 slacapra 1.9
464 slacapra 1.82 self.creator.writeJobsSpecsToDB()
465 spiga 1.107 taskUnicId= self.uuidgen()
466     common.taskDB.setDict('TasKUUID',taskUnicId)
467 slacapra 1.77
468 slacapra 1.82 common.taskDB.save()
469 nsmirnov 1.3 pass
470 nsmirnov 1.1
471     elif ( opt == '-submit' ):
472 spiga 1.107 # modified to support server mode
473     if (self.UseServer== 1):
474     self.actions[opt] = SubmitterServer(self.cfg_params)
475     else:
476     # modified to support server mode
477     # get user request
478     nsjobs = -1
479     if val:
480     if ( isInt(val) ):
481     nsjobs = int(val)
482     elif (val=='all'):
483     pass
484     else:
485     msg = 'Bad submission option <'+str(val)+'>\n'
486     msg += ' Must be an integer or "all"'
487     msg += ' Generic range is not allowed"'
488     raise CrabException(msg)
489     pass
490    
491     common.logger.debug(5,'nsjobs '+str(nsjobs))
492     # total jobs
493     nj_list = []
494     # get the first not already submitted
495     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
496     jobSetForSubmission = 0
497 gutsche 1.110 jobSkippedInSubmission = []
498 slacapra 1.114 datasetpath=self.cfg_params['CMSSW.datasetpath']
499 spiga 1.107 for nj in range(common.jobDB.nJobs()):
500 slacapra 1.114 if (len(common.jobDB.destination(nj)) != 0) or (datasetpath != None ):
501 gutsche 1.110 if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
502     jobSetForSubmission +=1
503     nj_list.append(nj)
504     else: continue
505     else :
506     jobSkippedInSubmission.append(nj+1)
507 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
508     break
509     pass
510     if nsjobs>jobSetForSubmission:
511     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
512 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
513     common.logger.message("Jobs: " + spanRanges(jobSkippedInSubmission) + " skipped because no sites are hosting this data")
514 spiga 1.107 # submit N from last submitted job
515     common.logger.debug(5,'nj_list '+str(nj_list))
516    
517     if len(nj_list) != 0:
518     # Instantiate Submitter object
519     self.actions[opt] = Submitter(self.cfg_params, nj_list)
520     # Create and initialize JobList
521     if len(common.job_list) == 0 :
522     common.job_list = JobList(common.jobDB.nJobs(),
523     None)
524     common.job_list.setJDLNames(self.job_type_name+'.jdl')
525     pass
526 slacapra 1.23 pass
527 slacapra 1.22 else:
528 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
529 nsmirnov 1.1 pass
530    
531 nsmirnov 1.4 elif ( opt == '-list' ):
532 slacapra 1.8 jobs = self.parseRange_(val)
533    
534     common.jobDB.dump(jobs)
535 nsmirnov 1.4 pass
536    
537 slacapra 1.67 elif ( opt == '-printId' ):
538 corvo 1.93
539 corvo 1.97 try:
540     common.scheduler.bossTask.query(ALL)
541     except RuntimeError,e:
542     common.logger.message( e.__str__() )
543     except ValueError,e:
544 slacapra 1.103 common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
545     common.logger.message(e.what())
546 corvo 1.97 pass
547 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
548    
549 corvo 1.97 for c, v in task.iteritems():
550     k = int(c)
551     nj=k
552 corvo 1.93 id = v['CHAIN_ID']
553     jid = v['SCHED_ID']
554     if jid:
555     print "Job: %-5s Id = %-40s " %(id,jid)
556     #else:
557     # print "Job: ",id," No ID yet"
558 slacapra 1.67 pass
559    
560 nsmirnov 1.4 elif ( opt == '-status' ):
561 spiga 1.107 # modified to support server mode
562     if (self.UseServer== 1):
563     self.actions[opt] = StatusServer(self.cfg_params)
564     else:
565     jobs = self.parseRange_(val)
566    
567     if len(jobs) != 0:
568     self.actions[opt] = StatusBoss(self.cfg_params)
569     pass
570 slacapra 1.22
571 nsmirnov 1.4 elif ( opt == '-kill' ):
572 slacapra 1.8
573 corvo 1.109 if (self.UseServer== 1):
574     self.actions[opt] = KillerServer(self.cfg_params)
575     else:
576     if val:
577     if val =='all':
578     jobs = common.scheduler.listBoss()
579     else:
580     jobs = self.parseRange_(val)
581     common.scheduler.cancel(jobs)
582 spiga 1.31 else:
583 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
584 nsmirnov 1.1
585 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
586 spiga 1.107 # modified to support server mode
587     if (self.UseServer== 1):
588     self.actions[opt] = GetOutputServer(self.cfg_params)
589 spiga 1.20 else:
590 spiga 1.13
591 spiga 1.107 if val=='all' or val==None or val=='':
592     jobs = common.scheduler.listBoss()
593     else:
594     jobs = self.parseRange_(val)
595    
596     jobs_done = []
597     for nj in jobs:
598     jobs_done.append(nj)
599     common.scheduler.getOutput(jobs_done)
600     pass
601 nsmirnov 1.4
602     elif ( opt == '-resubmit' ):
603 slacapra 1.78 if val=='all' or val==None or val=='':
604     jobs = common.scheduler.listBoss()
605 fanzago 1.37 else:
606 slacapra 1.78 jobs = self.parseRange_(val)
607 fanzago 1.37
608 slacapra 1.11 if val:
609     # create a list of jobs to be resubmitted.
610 corvo 1.93 val = string.replace(val,'-',':')
611 slacapra 1.8
612 slacapra 1.22 ### as before, create a Resubmittter Class
613 slacapra 1.78 maxIndex = common.scheduler.listBoss()
614 corvo 1.93 ##
615     # Marco. Vediamo se va meglio cosi'...
616     ##
617 corvo 1.97 try:
618     common.scheduler.bossTask.query(ALL, val)
619     except RuntimeError,e:
620     common.logger.message( e.__str__() )
621     except ValueError,e:
622 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
623     common.logger.message(e.what())
624 corvo 1.97 pass
625 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
626    
627 slacapra 1.11 nj_list = []
628 corvo 1.93 for c, v in task.iteritems():
629 corvo 1.97 k = int(c)
630     nj=k
631 corvo 1.93 st = v['STATUS']
632    
633 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
634 spiga 1.104 if st in ['K','SA','Z','DA']:
635 corvo 1.97 nj_list.append(int(nj)-1)
636     common.jobDB.setStatus(int(nj)-1,'C')
637 spiga 1.99 elif st in ['E','SE']:
638 spiga 1.60 common.scheduler.moveOutput(nj)
639 corvo 1.97 nj_list.append(int(nj)-1)
640     st = common.jobDB.setStatus(int(nj)-1,'RC')
641 corvo 1.93 elif st in ['W']:
642 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
643 spiga 1.60 pass
644 corvo 1.93 elif st in ['SD', 'OR']:
645 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
646 spiga 1.60 else:
647     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
648 slacapra 1.11 else:
649 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
650 fanzago 1.37 if len(common.job_list) == 0 :
651     common.job_list = JobList(common.jobDB.nJobs(),None)
652     common.job_list.setJDLNames(self.job_type_name+'.jdl')
653     pass
654    
655 slacapra 1.11 if len(nj_list) != 0:
656 corvo 1.93 nj_list.sort()
657 slacapra 1.11 # Instantiate Submitter object
658 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
659 slacapra 1.11
660 slacapra 1.8 pass
661     pass
662 slacapra 1.11 else:
663     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
664 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
665 slacapra 1.11 pass
666 fanzago 1.37 common.jobDB.save()
667 slacapra 1.8 pass
668 gutsche 1.61
669 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
670 nsmirnov 1.5
671 slacapra 1.78 if val:
672     if val =='all':
673     jobs = common.scheduler.listBoss()
674 spiga 1.47 else:
675 slacapra 1.78 jobs = self.parseRange_(val)
676     # kill submitted jobs
677     common.scheduler.cancel(jobs)
678 spiga 1.47 else:
679 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
680 nsmirnov 1.5
681 spiga 1.47 # resubmit cancelled jobs.
682     if val:
683     nj_list = []
684     for nj in jobs:
685     st = common.jobDB.status(int(nj)-1)
686     if st in ['K','A']:
687 corvo 1.97 nj_list.append(int(nj)-1)
688     common.jobDB.setStatus(int(nj)-1,'C')
689 spiga 1.47 elif st == 'Y':
690     common.scheduler.moveOutput(nj)
691 corvo 1.97 nj_list.append(int(nj)-1)
692     st = common.jobDB.setStatus(int(nj)-1,'RC')
693 spiga 1.47 elif st in ['C','X']:
694 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
695 spiga 1.47 pass
696     elif st == 'D':
697 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
698 spiga 1.47 else:
699     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
700     pass
701    
702 nsmirnov 1.5 if len(common.job_list) == 0 :
703 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
704 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
705     pass
706 spiga 1.47
707     if len(nj_list) != 0:
708 spiga 1.86 # common.scheduler.resubmit(nj_list)
709 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
710 spiga 1.47 pass
711     pass
712     else:
713     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
714 nsmirnov 1.5 pass
715 spiga 1.47 common.jobDB.save()
716 nsmirnov 1.4 pass
717 spiga 1.47
718 slacapra 1.28 elif ( opt == '-testJdl' ):
719 slacapra 1.8 jobs = self.parseRange_(val)
720     nj_list = []
721     for nj in jobs:
722 slacapra 1.62 st = common.jobDB.status(nj-1)
723     if st == 'C': nj_list.append(nj-1)
724 slacapra 1.8 pass
725    
726     if len(nj_list) != 0:
727 slacapra 1.77 # Instantiate Checker object
728 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
729    
730     # Create and initialize JobList
731    
732     if len(common.job_list) == 0 :
733     common.job_list = JobList(common.jobDB.nJobs(), None)
734     common.job_list.setJDLNames(self.job_type_name+'.jdl')
735     pass
736     pass
737    
738 slacapra 1.9 elif ( opt == '-postMortem' ):
739 corvo 1.95
740 spiga 1.108 # modified to support server mode
741     if (self.UseServer== 1):
742     self.actions[opt] = PostMortemServer(self.cfg_params)
743     else:
744     if val:
745     val = string.replace(val,'-',':')
746     else: val=''
747     nj_list = {}
748 corvo 1.97
749 spiga 1.108 try:
750     common.scheduler.bossTask.query(ALL, val)
751     except RuntimeError,e:
752     common.logger.message( e.__str__() )
753     except ValueError,e:
754     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
755     common.logger.message( e.what())
756     pass
757 corvo 1.93
758 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
759    
760     for c, v in task.iteritems():
761     k = int(c)
762     nj=k
763     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
764     pass
765 slacapra 1.9
766 spiga 1.108 if len(nj_list) != 0:
767 corvo 1.93 # Instantiate PostMortem object
768 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
769 slacapra 1.9 # Create and initialize JobList
770 spiga 1.108 if len(common.job_list) == 0 :
771     common.job_list = JobList(common.jobDB.nJobs(), None)
772     common.job_list.setJDLNames(self.job_type_name+'.jdl')
773     pass
774 slacapra 1.9 pass
775 spiga 1.108 else:
776     common.logger.message("No jobs to analyze")
777 slacapra 1.9
778     elif ( opt == '-clean' ):
779     if val != None:
780     raise CrabException("No range allowed for '-clean'")
781    
782 slacapra 1.105 self.actions[opt] = Cleaner(self.cfg_params)
783 slacapra 1.9
784 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
785 fanzago 1.112 #elif ( opt == '-publish' ):
786     # if val:
787     # username,dataname=string.split(val,'_')
788     # #print "username = ", username
789     # #print "dataname = ", dataname
790     # ### from script to class...
791     # thePublisher = Publisher(self.cfg_params, username,dataname)
792     # publish_exit_status = thePublisher.publish()
793     # if (publish_exit_status == '1'):
794     # common.logger.message("user data publication --> problems")
795     # else:
796     # common.logger.message("user data publication --> ok ")
797     # else:
798     # common.logger.message("Warning: with '-publish' you _MUST_ specify 'username_dataname' as value, where username is your surname and dataname is the name you want to use to publish your produced data")
799 fanzago 1.111 elif ( opt == '-publish' ):
800 slacapra 1.113 from DBS2Publisher import Publisher
801 fanzago 1.112 try:
802     if (int(self.cfg_params['USER.copy_data']) == 1 and int(self.cfg_params['USER.publish_data']) == 1):
803     try:
804     #self.cfg_params['USER.processed_datasetname']
805     precessedData=self.cfg_params['USER.publish_data_name']
806     thePublisher = Publisher(self.cfg_params)
807     publish_exit_status = thePublisher.publish()
808     if (publish_exit_status == '1'):
809     common.logger.message("user data publication --> problems")
810     else:
811     common.logger.message("user data publication --> ok ")
812     except KeyError:
813     raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
814 fanzago 1.111 else:
815 fanzago 1.112 common.logger.message("You can not publish data because you did not selected *** copy_data = 1 AND publish_data =1 *** in the crab.cfg file")
816     except KeyError:
817     raise CrabException('You can not publish data because you did not selected *** copy_data = 1 AND publish_data =1 *** in the crab.cfg file')
818     #########################################
819 nsmirnov 1.1 pass
820     return
821    
822 nsmirnov 1.3 def createWorkingSpace_(self):
823 slacapra 1.9 new_dir = ''
824    
825     try:
826     new_dir = self.cfg_params['USER.ui_working_dir']
827 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
828 fanzago 1.48 if os.path.exists(new_dir):
829     if os.listdir(new_dir):
830     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
831     raise CrabException(msg)
832 spiga 1.89 if len(string.split(new_dir, '/')) == 1:
833     new_dir = self.cwd + new_dir
834     else:
835     new_dir = new_dir
836 slacapra 1.9 except KeyError:
837     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
838 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
839 slacapra 1.9 new_dir = self.cwd + new_dir
840     pass
841 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
842 nsmirnov 1.1 common.work_space.create()
843     return
844    
845 nsmirnov 1.3 def loadConfiguration_(self, opts):
846 nsmirnov 1.1
847     save_opts = common.work_space.loadSavedOptions()
848    
849     # Override saved options with new command-line options
850    
851     for k in opts.keys():
852     save_opts[k] = opts[k]
853     pass
854    
855     # Return updated options
856     return save_opts
857    
858 nsmirnov 1.3 def createLogger_(self, args):
859 nsmirnov 1.1
860     log = Logger()
861     log.quiet(self.flag_quiet)
862     log.setDebugLevel(self.debug_level)
863     log.write(args+'\n')
864 nsmirnov 1.3 log.message(self.headerString_())
865 nsmirnov 1.1 log.flush()
866     common.logger = log
867     return
868    
869 nsmirnov 1.3 def updateHistory_(self, args):
870 nsmirnov 1.1 history_fname = common.prog_name+'.history'
871     history_file = open(history_fname, 'a')
872     history_file.write(self.current_time+': '+args+'\n')
873     history_file.close()
874     return
875    
876 nsmirnov 1.3 def headerString_(self):
877 nsmirnov 1.1 """
878     Creates a string describing program options either given in
879     the command line or their default values.
880     """
881     header = common.prog_name + ' (version ' + common.prog_version_str + \
882     ') running on ' + \
883     time.ctime(time.time())+'\n\n' + \
884     common.prog_name+'. Working options:\n'
885 fanzago 1.59 #print self.job_type_name
886 nsmirnov 1.1 header = header +\
887 slacapra 1.85 ' scheduler ' + self.cfg_params['CRAB.scheduler'] + '\n'+\
888 nsmirnov 1.1 ' job type ' + self.job_type_name + '\n'+\
889     ' working directory ' + common.work_space.topDir()\
890     + '\n'
891     return header
892    
893 nsmirnov 1.3 def createScheduler_(self):
894 nsmirnov 1.1 """
895     Creates a scheduler object instantiated by its name.
896     """
897 slacapra 1.101 klass_name = 'SchedulerBoss'
898 nsmirnov 1.1 file_name = klass_name
899     try:
900     klass = importName(file_name, klass_name)
901     except KeyError:
902     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
903     raise CrabException(msg)
904     except ImportError, e:
905 slacapra 1.101 msg = 'Cannot create scheduler Boss'
906 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
907     msg += str(e)
908     raise CrabException(msg)
909    
910     common.scheduler = klass()
911     common.scheduler.configure(self.cfg_params)
912     return
913    
914     def run(self):
915     """
916     For each
917     """
918    
919     for act in self.main_actions:
920     if act in self.actions.keys(): self.actions[act].run()
921     pass
922    
923     for act in self.aux_actions:
924     if act in self.actions.keys(): self.actions[act].run()
925     pass
926     return
927    
928     ###########################################################################
929     def processHelpOptions(opts):
930    
931 slacapra 1.11 if len(opts):
932     for opt in opts.keys():
933     if opt in ('-v', '-version', '--version') :
934     print Crab.version()
935     return 1
936     if opt in ('-h','-help','--help') :
937     if opts[opt] : help(opts[opt])
938     else: help()
939     return 1
940     else:
941     usage()
942 corvo 1.93
943 nsmirnov 1.1 return 0
944    
945 corvo 1.93 ###########################################################################
946 nsmirnov 1.1 if __name__ == '__main__':
947 slacapra 1.92 ## Get rid of some useless warning
948 slacapra 1.94 try:
949     import warnings
950     warnings.simplefilter("ignore", RuntimeWarning)
951     except:
952     pass # too bad, you'll get the warning
953 corvo 1.93
954 nsmirnov 1.1 # Parse command-line options and create a dictionary with
955     # key-value pairs.
956    
957     options = parseOptions(sys.argv[1:])
958    
959     # Process "help" options, such as '-help', '-version'
960    
961 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
962 nsmirnov 1.1
963     # Create, initialize, and run a Crab object
964    
965     try:
966 nsmirnov 1.3 crab = Crab(options)
967 nsmirnov 1.1 crab.run()
968 corvo 1.54 crab.cfg_params['apmon'].free()
969 nsmirnov 1.1 except CrabException, e:
970     print '\n' + common.prog_name + ': ' + str(e) + '\n'
971     if common.logger:
972     common.logger.write('ERROR: '+str(e)+'\n')
973     pass
974     pass
975    
976     pass