ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.111
Committed: Tue Jun 19 17:23:56 2007 UTC (17 years, 10 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.110: +19 -2 lines
Log Message:
changes for DBS2 output publication

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 fanzago 1.111 from DBS2Publisher import Publisher
19 nsmirnov 1.1 import common
20 spiga 1.13 import Statistic
21 spiga 1.107 import commands
22 nsmirnov 1.1
23 spiga 1.98 from BossSession import *
24 corvo 1.93
25 spiga 1.107 #modified to support server mode
26     from SubmitterServer import SubmitterServer
27     from GetOutputServer import GetOutputServer
28     from StatusServer import StatusServer
29 spiga 1.108 from PostMortemServer import PostMortemServer
30 corvo 1.109 from KillerServer import KillerServer
31 spiga 1.107
32 nsmirnov 1.1 import sys, os, time, string
33    
34     ###########################################################################
35     class Crab:
36 nsmirnov 1.3 def __init__(self, opts):
37 fanzago 1.76 ## test_tag
38 nsmirnov 1.1 # The order of main_actions is important !
39 fanzago 1.111 self.main_actions = [ '-create', '-submit' ]
40     ### FEDE new option "-publish" FOR DBS OUTPUT PUBLICATION
41 slacapra 1.70 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput','-get',
42 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
43 fanzago 1.111 '-printId', '-publish' ]
44 nsmirnov 1.1
45     # Dictionary of actions, e.g. '-create' -> object of class Creator
46     self.actions = {}
47 fanzago 1.76
48 nsmirnov 1.1 # Configuration file
49     self.cfg_fname = None
50     # Dictionary with configuration parameters
51     self.cfg_params = {}
52    
53     # Current working directory
54     self.cwd = os.getcwd()+'/'
55     # Current time in format 'yymmdd_hhmmss'
56     self.current_time = time.strftime('%y%m%d_%H%M%S',
57     time.localtime(time.time()))
58    
59 nsmirnov 1.3 # Session name (?) Do we need this ?
60 nsmirnov 1.1 self.name = '0'
61    
62     # Job type
63     self.job_type_name = None
64    
65     # Continuation flag
66     self.flag_continue = 0
67    
68     # quiet mode, i.e. no output on screen
69     self.flag_quiet = 0
70     # produce more output
71     self.debug_level = 0
72    
73    
74 nsmirnov 1.3 self.initialize_(opts)
75 nsmirnov 1.1
76     return
77    
78 nsmirnov 1.7 def version():
79 nsmirnov 1.1 return common.prog_version_str
80    
81 nsmirnov 1.7 version = staticmethod(version)
82    
83 nsmirnov 1.3 def initialize_(self, opts):
84 nsmirnov 1.1
85     # Process the '-continue' option first because
86     # in the case of continuation the CRAB configuration
87     # parameters are loaded from already existing Working Space.
88 nsmirnov 1.3 self.processContinueOption_(opts)
89 nsmirnov 1.1
90     # Process ini-file first, then command line options
91     # because they override ini-file settings.
92    
93 nsmirnov 1.3 self.processIniFile_(opts)
94 nsmirnov 1.1
95 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
96 nsmirnov 1.1
97 nsmirnov 1.3 self.processOptions_(opts)
98 nsmirnov 1.1
99 slacapra 1.77 common.taskDB = TaskDB()
100    
101 slacapra 1.101
102 nsmirnov 1.1 if not self.flag_continue:
103 nsmirnov 1.3 self.createWorkingSpace_()
104 slacapra 1.9 optsToBeSaved={}
105     for it in opts.keys():
106     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
107     pass
108     else:
109     optsToBeSaved[it]=opts[it]
110 slacapra 1.101 # store in taskDB the opts
111     common.taskDB.setDict(it[1:], optsToBeSaved[it])
112 slacapra 1.9 common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
113 nsmirnov 1.1 pass
114 slacapra 1.77 else:
115     common.taskDB.load()
116 nsmirnov 1.1
117     # At this point all configuration options have been read.
118    
119     args = string.join(sys.argv,' ')
120 slacapra 1.11
121 nsmirnov 1.3 self.updateHistory_(args)
122 slacapra 1.11
123 nsmirnov 1.3 self.createLogger_(args)
124 slacapra 1.11
125 nsmirnov 1.1 common.jobDB = JobDB()
126 corvo 1.51
127 spiga 1.107 self.UseServer=0
128     try:
129     self.UseServer=int(self.cfg_params['CRAB.server_mode'])
130     except KeyError:
131     pass
132    
133    
134 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
135    
136 nsmirnov 1.3 if self.flag_continue:
137 slacapra 1.12 try:
138     common.jobDB.load()
139 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
140 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
141     except DBException,e:
142     pass
143 nsmirnov 1.3 pass
144 slacapra 1.11
145 nsmirnov 1.3 self.createScheduler_()
146 slacapra 1.11
147 slacapra 1.106 common.logger.debug(6, 'Used properties:')
148     if (common.logger.debugLevel()<6 ):
149     common.logger.write('Used properties:')
150     keys = self.cfg_params.keys()
151     keys.sort()
152     for k in keys:
153     if self.cfg_params[k]:
154     common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
155     if (common.logger.debugLevel()<6 ):
156     common.logger.write(' '+k+' : '+str(self.cfg_params[k]))
157     pass
158     else:
159     common.logger.debug(6, ' '+k+' : ')
160     if (common.logger.debugLevel()<6 ):
161     common.logger.write(' '+k+' : ')
162 nsmirnov 1.3 pass
163     pass
164 slacapra 1.106 common.logger.debug(6, 'End of used properties.\n')
165     if (common.logger.debugLevel()<6 ):
166     common.logger.write('End of used properties.\n')
167    
168 nsmirnov 1.3 self.initializeActions_(opts)
169 nsmirnov 1.1 return
170    
171 nsmirnov 1.3 def processContinueOption_(self,opts):
172 nsmirnov 1.1
173     continue_dir = None
174 nsmirnov 1.4
175     # Look for the '-continue' option.
176    
177 nsmirnov 1.1 for opt in opts.keys():
178     if ( opt in ('-continue','-c') ):
179     self.flag_continue = 1
180     val = opts[opt]
181     if val:
182     if val[0] == '/': continue_dir = val # abs path
183     else: continue_dir = self.cwd + val # rel path
184     pass
185 nsmirnov 1.4 break
186     pass
187    
188     # Look for actions which has sense only with '-continue'
189    
190     if not self.flag_continue:
191     for opt in opts.keys():
192 slacapra 1.102 if ( opt in (self.aux_actions)):
193 nsmirnov 1.4 self.flag_continue = 1
194     break
195 nsmirnov 1.1 pass
196     pass
197 slacapra 1.102 if ("-submit" in opts.keys() and "-create" not in opts.keys() ):
198     self.flag_continue = 1
199 nsmirnov 1.1
200     if not self.flag_continue: return
201    
202     if not continue_dir:
203     prefix = common.prog_name + '_' + self.name + '_'
204     continue_dir = findLastWorkDir(prefix)
205     pass
206    
207     if not continue_dir:
208     raise CrabException('Cannot find last working directory.')
209    
210     if not os.path.exists(continue_dir):
211     msg = 'Cannot continue because the working directory <'
212     msg += continue_dir
213     msg += '> does not exist.'
214     raise CrabException(msg)
215    
216     # Instantiate WorkSpace
217 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
218 nsmirnov 1.1
219     return
220    
221 nsmirnov 1.3 def processIniFile_(self, opts):
222 nsmirnov 1.1 """
223     Processes a configuration INI-file.
224     """
225    
226     # Extract cfg-file name from the cmd-line options.
227    
228     for opt in opts.keys():
229     if ( opt == '-cfg' ):
230     if self.flag_continue:
231     raise CrabException('-continue and -cfg cannot coexist.')
232     if opts[opt] : self.cfg_fname = opts[opt]
233     else : usage()
234     pass
235    
236     elif ( opt == '-name' ):
237     self.name = opts[opt]
238     pass
239    
240     pass
241    
242     # Set default cfg-fname
243    
244     if self.cfg_fname == None:
245     if self.flag_continue:
246     self.cfg_fname = common.work_space.cfgFileName()
247     else:
248     self.cfg_fname = common.prog_name+'.cfg'
249     pass
250     pass
251    
252     # Load cfg-file
253    
254     if string.lower(self.cfg_fname) != 'none':
255     if os.path.exists(self.cfg_fname):
256     self.cfg_params = loadConfig(self.cfg_fname)
257 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
258 nsmirnov 1.1 pass
259     else:
260     msg = 'cfg-file '+self.cfg_fname+' not found.'
261     raise CrabException(msg)
262     pass
263     pass
264    
265     # process the [CRAB] section
266    
267     lhp = len('CRAB.')
268     for k in self.cfg_params.keys():
269     if len(k) >= lhp and k[:lhp] == 'CRAB.':
270     opt = '-'+k[lhp:]
271     if len(opt) >= 3 and opt[:3] == '-__': continue
272     if opt not in opts.keys():
273     opts[opt] = self.cfg_params[k]
274     pass
275     pass
276     pass
277    
278     return
279    
280 nsmirnov 1.3 def processOptions_(self, opts):
281 nsmirnov 1.1 """
282     Processes the command-line options.
283     """
284    
285     for opt in opts.keys():
286     val = opts[opt]
287    
288 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
289     if opt in self.main_actions:
290     self.cfg_params['CRAB.'+opt[1:]] = val
291     continue
292     if opt in self.aux_actions:
293     self.cfg_params['CRAB.'+opt[1:]] = val
294     continue
295 nsmirnov 1.1
296 spiga 1.107 elif ( opt == '-server_mode' ): #Add for server mode usage
297     pass
298     elif ( opt == '-server_name' ):
299     pass
300 nsmirnov 1.1
301     elif ( opt == '-cfg' ):
302     pass
303    
304     elif ( opt in ('-continue', '-c') ):
305 nsmirnov 1.4 # Already processed in processContinueOption_()
306 nsmirnov 1.1 pass
307    
308     elif ( opt == '-jobtype' ):
309     if val : self.job_type_name = string.upper(val)
310     else : usage()
311     pass
312    
313     elif ( opt == '-Q' ):
314     self.flag_quiet = 1
315     pass
316    
317     elif ( opt == '-debug' ):
318 slacapra 1.6 if val: self.debug_level = int(val)
319     else: self.debug_level = 1
320 nsmirnov 1.1 pass
321    
322     elif ( opt == '-scheduler' ):
323     pass
324 slacapra 1.22
325 nsmirnov 1.3 elif string.find(opt,'.') == -1:
326     print common.prog_name+'. Unrecognized option '+opt
327     usage()
328     pass
329 nsmirnov 1.1
330 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
331     if string.find(opt,'.') == -1 :
332     self.cfg_params['CRAB.'+opt[1:]] = val
333 nsmirnov 1.1 pass
334 nsmirnov 1.3 else:
335 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
336     self.cfg_params[opt[1:]] = val
337     pass
338     pass
339     return
340    
341 slacapra 1.8 def parseRange_(self, aRange):
342 nsmirnov 1.4 """
343 slacapra 1.8 Takes as the input a string with a range defined in any of the following
344     way, including combination, and return a tuple with the ints defined
345     according to following table. A consistency check is done.
346     NB: the first job is "1", not "0".
347     'all' -> [1,2,..., NJobs]
348     '' -> [1,2,..., NJobs]
349     'n1' -> [n1]
350     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
351     'n1,n2' -> [n1, n2]
352     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
353     """
354     result = []
355    
356 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
357     if aRange=='all' or aRange==None or aRange=='':
358 slacapra 1.73 result=range(1,common.jobDB.nJobs()+1)
359 slacapra 1.8 return result
360 slacapra 1.9 elif aRange=='0':
361     return result
362 slacapra 1.8
363     subRanges = string.split(aRange, ',')
364     for subRange in subRanges:
365     result = result+self.parseSimpleRange_(subRange)
366    
367     if self.checkUniqueness_(result):
368     return result
369     else:
370 slacapra 1.33 common.logger.message("Error "+result)
371 slacapra 1.8 return []
372    
373     def checkUniqueness_(self, list):
374     """
375 slacapra 1.9 check if a list contains only unique elements
376 slacapra 1.8 """
377    
378     uniqueList = []
379     # use a list comprehension statement (takes a while to understand)
380    
381     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
382    
383     return (len(list)==len(uniqueList))
384    
385 spiga 1.107 def uuidgen(self):
386     """Generate a UUID"""
387     return commands.getoutput('uuidgen')
388    
389    
390 slacapra 1.8 def parseSimpleRange_(self, aRange):
391     """
392     Takes as the input a string with two integers separated by
393     the minus sign and returns the tuple with these numbers:
394     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
395     'n1' -> [n1]
396     """
397     (start, end) = (None, None)
398    
399     result = []
400     minus = string.find(aRange, '-')
401     if ( minus < 0 ):
402     if isInt(aRange) and int(aRange)>0:
403 corvo 1.97 # FEDE
404     #result.append(int(aRange)-1)
405     ###
406 slacapra 1.62 result.append(int(aRange))
407 slacapra 1.8 else:
408 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
409     usage()
410     pass
411    
412 nsmirnov 1.4 pass
413     else:
414 slacapra 1.8 (start, end) = string.split(aRange, '-')
415     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
416 corvo 1.97 #result=range(int(start)-1, int(end))
417     result=range(int(start), int(end)+1) #Daniele
418 slacapra 1.8 else:
419 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
420 slacapra 1.8
421     return result
422 nsmirnov 1.4
423 nsmirnov 1.3 def initializeActions_(self, opts):
424 nsmirnov 1.1 """
425     For each user action instantiate a corresponding
426     object and put it in the action dictionary.
427     """
428 spiga 1.15
429     for opt in opts.keys():
430    
431 nsmirnov 1.1 val = opts[opt]
432 spiga 1.15
433    
434     if ( opt == '-create' ):
435 gutsche 1.83 if val and val != 'all':
436     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
437     common.logger.message(msg)
438     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
439     common.logger.message(msg)
440 slacapra 1.82 ncjobs = 'all'
441    
442     # Instantiate Creator object
443     self.creator = Creator(self.job_type_name,
444     self.cfg_params,
445     ncjobs)
446     self.actions[opt] = self.creator
447    
448     # Initialize the JobDB object if needed
449     if not self.flag_continue:
450     common.jobDB.create(self.creator.nJobs())
451 nsmirnov 1.1 pass
452 nsmirnov 1.3
453 slacapra 1.82 # Create and initialize JobList
454 nsmirnov 1.3
455 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
456     self.creator.jobType())
457 nsmirnov 1.3
458 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
459     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
460     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
461     common.job_list.setScriptNames(self.job_type_name+'.sh')
462     common.job_list.setJDLNames(self.job_type_name+'.jdl')
463     common.job_list.setCfgNames(self.creator.jobType().configFilename())
464 slacapra 1.9
465 slacapra 1.82 self.creator.writeJobsSpecsToDB()
466 spiga 1.107 taskUnicId= self.uuidgen()
467     common.taskDB.setDict('TasKUUID',taskUnicId)
468 slacapra 1.77
469 slacapra 1.82 common.taskDB.save()
470 nsmirnov 1.3 pass
471 nsmirnov 1.1
472     elif ( opt == '-submit' ):
473 spiga 1.107 # modified to support server mode
474     if (self.UseServer== 1):
475     self.actions[opt] = SubmitterServer(self.cfg_params)
476     else:
477     # modified to support server mode
478     # get user request
479     nsjobs = -1
480     if val:
481     if ( isInt(val) ):
482     nsjobs = int(val)
483     elif (val=='all'):
484     pass
485     else:
486     msg = 'Bad submission option <'+str(val)+'>\n'
487     msg += ' Must be an integer or "all"'
488     msg += ' Generic range is not allowed"'
489     raise CrabException(msg)
490     pass
491    
492     common.logger.debug(5,'nsjobs '+str(nsjobs))
493     # total jobs
494     nj_list = []
495     # get the first not already submitted
496     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
497     jobSetForSubmission = 0
498 gutsche 1.110 jobSkippedInSubmission = []
499 spiga 1.107 for nj in range(common.jobDB.nJobs()):
500 gutsche 1.110 if len(common.jobDB.destination(nj)) != 0 :
501     if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
502     jobSetForSubmission +=1
503     nj_list.append(nj)
504     else: continue
505     else :
506     jobSkippedInSubmission.append(nj+1)
507 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
508     break
509     pass
510     if nsjobs>jobSetForSubmission:
511     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
512 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
513     common.logger.message("Jobs: " + spanRanges(jobSkippedInSubmission) + " skipped because no sites are hosting this data")
514 spiga 1.107 # submit N from last submitted job
515     common.logger.debug(5,'nj_list '+str(nj_list))
516    
517     if len(nj_list) != 0:
518     # Instantiate Submitter object
519     self.actions[opt] = Submitter(self.cfg_params, nj_list)
520     # Create and initialize JobList
521     if len(common.job_list) == 0 :
522     common.job_list = JobList(common.jobDB.nJobs(),
523     None)
524     common.job_list.setJDLNames(self.job_type_name+'.jdl')
525     pass
526 slacapra 1.23 pass
527 slacapra 1.22 else:
528 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
529 nsmirnov 1.1 pass
530    
531 nsmirnov 1.4 elif ( opt == '-list' ):
532 slacapra 1.8 jobs = self.parseRange_(val)
533    
534     common.jobDB.dump(jobs)
535 nsmirnov 1.4 pass
536    
537 slacapra 1.67 elif ( opt == '-printId' ):
538 corvo 1.93
539 corvo 1.97 try:
540     common.scheduler.bossTask.query(ALL)
541     except RuntimeError,e:
542     common.logger.message( e.__str__() )
543     except ValueError,e:
544 slacapra 1.103 common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
545     common.logger.message(e.what())
546 corvo 1.97 pass
547 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
548    
549 corvo 1.97 for c, v in task.iteritems():
550     k = int(c)
551     nj=k
552 corvo 1.93 id = v['CHAIN_ID']
553     jid = v['SCHED_ID']
554     if jid:
555     print "Job: %-5s Id = %-40s " %(id,jid)
556     #else:
557     # print "Job: ",id," No ID yet"
558 slacapra 1.67 pass
559    
560 nsmirnov 1.4 elif ( opt == '-status' ):
561 spiga 1.107 # modified to support server mode
562     if (self.UseServer== 1):
563     self.actions[opt] = StatusServer(self.cfg_params)
564     else:
565     jobs = self.parseRange_(val)
566    
567     if len(jobs) != 0:
568     self.actions[opt] = StatusBoss(self.cfg_params)
569     pass
570 slacapra 1.22
571 nsmirnov 1.4 elif ( opt == '-kill' ):
572 slacapra 1.8
573 corvo 1.109 if (self.UseServer== 1):
574     self.actions[opt] = KillerServer(self.cfg_params)
575     else:
576     if val:
577     if val =='all':
578     jobs = common.scheduler.listBoss()
579     else:
580     jobs = self.parseRange_(val)
581     common.scheduler.cancel(jobs)
582 spiga 1.31 else:
583 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
584 nsmirnov 1.1
585 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
586 spiga 1.107 # modified to support server mode
587     if (self.UseServer== 1):
588     self.actions[opt] = GetOutputServer(self.cfg_params)
589 spiga 1.20 else:
590 spiga 1.13
591 spiga 1.107 if val=='all' or val==None or val=='':
592     jobs = common.scheduler.listBoss()
593     else:
594     jobs = self.parseRange_(val)
595    
596     jobs_done = []
597     for nj in jobs:
598     jobs_done.append(nj)
599     common.scheduler.getOutput(jobs_done)
600     pass
601 nsmirnov 1.4
602     elif ( opt == '-resubmit' ):
603 slacapra 1.78 if val=='all' or val==None or val=='':
604     jobs = common.scheduler.listBoss()
605 fanzago 1.37 else:
606 slacapra 1.78 jobs = self.parseRange_(val)
607 fanzago 1.37
608 slacapra 1.11 if val:
609     # create a list of jobs to be resubmitted.
610 corvo 1.93 val = string.replace(val,'-',':')
611 slacapra 1.8
612 slacapra 1.22 ### as before, create a Resubmittter Class
613 slacapra 1.78 maxIndex = common.scheduler.listBoss()
614 corvo 1.93 ##
615     # Marco. Vediamo se va meglio cosi'...
616     ##
617 corvo 1.97 try:
618     common.scheduler.bossTask.query(ALL, val)
619     except RuntimeError,e:
620     common.logger.message( e.__str__() )
621     except ValueError,e:
622 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
623     common.logger.message(e.what())
624 corvo 1.97 pass
625 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
626    
627 slacapra 1.11 nj_list = []
628 corvo 1.93 for c, v in task.iteritems():
629 corvo 1.97 k = int(c)
630     nj=k
631 corvo 1.93 st = v['STATUS']
632    
633 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
634 spiga 1.104 if st in ['K','SA','Z','DA']:
635 corvo 1.97 nj_list.append(int(nj)-1)
636     common.jobDB.setStatus(int(nj)-1,'C')
637 spiga 1.99 elif st in ['E','SE']:
638 spiga 1.60 common.scheduler.moveOutput(nj)
639 corvo 1.97 nj_list.append(int(nj)-1)
640     st = common.jobDB.setStatus(int(nj)-1,'RC')
641 corvo 1.93 elif st in ['W']:
642 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
643 spiga 1.60 pass
644 corvo 1.93 elif st in ['SD', 'OR']:
645 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
646 spiga 1.60 else:
647     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
648 slacapra 1.11 else:
649 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
650 fanzago 1.37 if len(common.job_list) == 0 :
651     common.job_list = JobList(common.jobDB.nJobs(),None)
652     common.job_list.setJDLNames(self.job_type_name+'.jdl')
653     pass
654    
655 slacapra 1.11 if len(nj_list) != 0:
656 corvo 1.93 nj_list.sort()
657 slacapra 1.11 # Instantiate Submitter object
658 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
659 slacapra 1.11
660 slacapra 1.8 pass
661     pass
662 slacapra 1.11 else:
663     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
664 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
665 slacapra 1.11 pass
666 fanzago 1.37 common.jobDB.save()
667 slacapra 1.8 pass
668 gutsche 1.61
669 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
670 nsmirnov 1.5
671 slacapra 1.78 if val:
672     if val =='all':
673     jobs = common.scheduler.listBoss()
674 spiga 1.47 else:
675 slacapra 1.78 jobs = self.parseRange_(val)
676     # kill submitted jobs
677     common.scheduler.cancel(jobs)
678 spiga 1.47 else:
679 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
680 nsmirnov 1.5
681 spiga 1.47 # resubmit cancelled jobs.
682     if val:
683     nj_list = []
684     for nj in jobs:
685     st = common.jobDB.status(int(nj)-1)
686     if st in ['K','A']:
687 corvo 1.97 nj_list.append(int(nj)-1)
688     common.jobDB.setStatus(int(nj)-1,'C')
689 spiga 1.47 elif st == 'Y':
690     common.scheduler.moveOutput(nj)
691 corvo 1.97 nj_list.append(int(nj)-1)
692     st = common.jobDB.setStatus(int(nj)-1,'RC')
693 spiga 1.47 elif st in ['C','X']:
694 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
695 spiga 1.47 pass
696     elif st == 'D':
697 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
698 spiga 1.47 else:
699     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
700     pass
701    
702 nsmirnov 1.5 if len(common.job_list) == 0 :
703 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
704 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
705     pass
706 spiga 1.47
707     if len(nj_list) != 0:
708 spiga 1.86 # common.scheduler.resubmit(nj_list)
709 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
710 spiga 1.47 pass
711     pass
712     else:
713     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
714 nsmirnov 1.5 pass
715 spiga 1.47 common.jobDB.save()
716 nsmirnov 1.4 pass
717 spiga 1.47
718 slacapra 1.28 elif ( opt == '-testJdl' ):
719 slacapra 1.8 jobs = self.parseRange_(val)
720     nj_list = []
721     for nj in jobs:
722 slacapra 1.62 st = common.jobDB.status(nj-1)
723     if st == 'C': nj_list.append(nj-1)
724 slacapra 1.8 pass
725    
726     if len(nj_list) != 0:
727 slacapra 1.77 # Instantiate Checker object
728 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
729    
730     # Create and initialize JobList
731    
732     if len(common.job_list) == 0 :
733     common.job_list = JobList(common.jobDB.nJobs(), None)
734     common.job_list.setJDLNames(self.job_type_name+'.jdl')
735     pass
736     pass
737    
738 slacapra 1.9 elif ( opt == '-postMortem' ):
739 corvo 1.95
740 spiga 1.108 # modified to support server mode
741     if (self.UseServer== 1):
742     self.actions[opt] = PostMortemServer(self.cfg_params)
743     else:
744     if val:
745     val = string.replace(val,'-',':')
746     else: val=''
747     nj_list = {}
748 corvo 1.97
749 spiga 1.108 try:
750     common.scheduler.bossTask.query(ALL, val)
751     except RuntimeError,e:
752     common.logger.message( e.__str__() )
753     except ValueError,e:
754     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
755     common.logger.message( e.what())
756     pass
757 corvo 1.93
758 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
759    
760     for c, v in task.iteritems():
761     k = int(c)
762     nj=k
763     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
764     pass
765 slacapra 1.9
766 spiga 1.108 if len(nj_list) != 0:
767 corvo 1.93 # Instantiate PostMortem object
768 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
769 slacapra 1.9 # Create and initialize JobList
770 spiga 1.108 if len(common.job_list) == 0 :
771     common.job_list = JobList(common.jobDB.nJobs(), None)
772     common.job_list.setJDLNames(self.job_type_name+'.jdl')
773     pass
774 slacapra 1.9 pass
775 spiga 1.108 else:
776     common.logger.message("No jobs to analyze")
777 slacapra 1.9
778     elif ( opt == '-clean' ):
779     if val != None:
780     raise CrabException("No range allowed for '-clean'")
781    
782 slacapra 1.105 self.actions[opt] = Cleaner(self.cfg_params)
783 slacapra 1.9
784 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
785     elif ( opt == '-publish' ):
786     if val:
787     username,dataname=string.split(val,'_')
788     #print "username = ", username
789     #print "dataname = ", dataname
790     ### from script to class...
791     thePublisher = Publisher(self.cfg_params, username,dataname)
792     publish_exit_status = thePublisher.publish()
793     if (publish_exit_status == '1'):
794     common.logger.message("user data publication --> problems")
795     else:
796     common.logger.message("user data publication --> ok ")
797     else:
798     common.logger.message("Warning: with '-publish' you _MUST_ specify 'username_dataname' as value, where username is your surname and dataname is the name you want to use to publish your produced data")
799 nsmirnov 1.1 pass
800     return
801    
802 nsmirnov 1.3 def createWorkingSpace_(self):
803 slacapra 1.9 new_dir = ''
804    
805     try:
806     new_dir = self.cfg_params['USER.ui_working_dir']
807 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
808 fanzago 1.48 if os.path.exists(new_dir):
809     if os.listdir(new_dir):
810     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
811     raise CrabException(msg)
812 spiga 1.89 if len(string.split(new_dir, '/')) == 1:
813     new_dir = self.cwd + new_dir
814     else:
815     new_dir = new_dir
816 slacapra 1.9 except KeyError:
817     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
818 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
819 slacapra 1.9 new_dir = self.cwd + new_dir
820     pass
821 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
822 nsmirnov 1.1 common.work_space.create()
823     return
824    
825 nsmirnov 1.3 def loadConfiguration_(self, opts):
826 nsmirnov 1.1
827     save_opts = common.work_space.loadSavedOptions()
828    
829     # Override saved options with new command-line options
830    
831     for k in opts.keys():
832     save_opts[k] = opts[k]
833     pass
834    
835     # Return updated options
836     return save_opts
837    
838 nsmirnov 1.3 def createLogger_(self, args):
839 nsmirnov 1.1
840     log = Logger()
841     log.quiet(self.flag_quiet)
842     log.setDebugLevel(self.debug_level)
843     log.write(args+'\n')
844 nsmirnov 1.3 log.message(self.headerString_())
845 nsmirnov 1.1 log.flush()
846     common.logger = log
847     return
848    
849 nsmirnov 1.3 def updateHistory_(self, args):
850 nsmirnov 1.1 history_fname = common.prog_name+'.history'
851     history_file = open(history_fname, 'a')
852     history_file.write(self.current_time+': '+args+'\n')
853     history_file.close()
854     return
855    
856 nsmirnov 1.3 def headerString_(self):
857 nsmirnov 1.1 """
858     Creates a string describing program options either given in
859     the command line or their default values.
860     """
861     header = common.prog_name + ' (version ' + common.prog_version_str + \
862     ') running on ' + \
863     time.ctime(time.time())+'\n\n' + \
864     common.prog_name+'. Working options:\n'
865 fanzago 1.59 #print self.job_type_name
866 nsmirnov 1.1 header = header +\
867 slacapra 1.85 ' scheduler ' + self.cfg_params['CRAB.scheduler'] + '\n'+\
868 nsmirnov 1.1 ' job type ' + self.job_type_name + '\n'+\
869     ' working directory ' + common.work_space.topDir()\
870     + '\n'
871     return header
872    
873 nsmirnov 1.3 def createScheduler_(self):
874 nsmirnov 1.1 """
875     Creates a scheduler object instantiated by its name.
876     """
877 slacapra 1.101 klass_name = 'SchedulerBoss'
878 nsmirnov 1.1 file_name = klass_name
879     try:
880     klass = importName(file_name, klass_name)
881     except KeyError:
882     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
883     raise CrabException(msg)
884     except ImportError, e:
885 slacapra 1.101 msg = 'Cannot create scheduler Boss'
886 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
887     msg += str(e)
888     raise CrabException(msg)
889    
890     common.scheduler = klass()
891     common.scheduler.configure(self.cfg_params)
892     return
893    
894     def run(self):
895     """
896     For each
897     """
898    
899     for act in self.main_actions:
900     if act in self.actions.keys(): self.actions[act].run()
901     pass
902    
903     for act in self.aux_actions:
904     if act in self.actions.keys(): self.actions[act].run()
905     pass
906     return
907    
908     ###########################################################################
909     def processHelpOptions(opts):
910    
911 slacapra 1.11 if len(opts):
912     for opt in opts.keys():
913     if opt in ('-v', '-version', '--version') :
914     print Crab.version()
915     return 1
916     if opt in ('-h','-help','--help') :
917     if opts[opt] : help(opts[opt])
918     else: help()
919     return 1
920     else:
921     usage()
922 corvo 1.93
923 nsmirnov 1.1 return 0
924    
925 corvo 1.93 ###########################################################################
926 nsmirnov 1.1 if __name__ == '__main__':
927 slacapra 1.92 ## Get rid of some useless warning
928 slacapra 1.94 try:
929     import warnings
930     warnings.simplefilter("ignore", RuntimeWarning)
931     except:
932     pass # too bad, you'll get the warning
933 corvo 1.93
934 nsmirnov 1.1 # Parse command-line options and create a dictionary with
935     # key-value pairs.
936    
937     options = parseOptions(sys.argv[1:])
938    
939     # Process "help" options, such as '-help', '-version'
940    
941 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
942 nsmirnov 1.1
943     # Create, initialize, and run a Crab object
944    
945     try:
946 nsmirnov 1.3 crab = Crab(options)
947 nsmirnov 1.1 crab.run()
948 corvo 1.54 crab.cfg_params['apmon'].free()
949 nsmirnov 1.1 except CrabException, e:
950     print '\n' + common.prog_name + ': ' + str(e) + '\n'
951     if common.logger:
952     common.logger.write('ERROR: '+str(e)+'\n')
953     pass
954     pass
955    
956     pass