ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.123
Committed: Fri Sep 14 16:06:21 2007 UTC (17 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_4_pre1
Changes since 1.122: +2 -0 lines
Log Message:
change the path before execute grid commands

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 nsmirnov 1.1 import common
19 spiga 1.13 import Statistic
20 spiga 1.107 import commands
21 gutsche 1.121 from BlackWhiteListParser import BlackWhiteListParser
22 nsmirnov 1.1
23 spiga 1.98 from BossSession import *
24 corvo 1.93
25 spiga 1.107 #modified to support server mode
26 mcinquil 1.119 #from SubmitterServer import SubmitterServer
27     #from GetOutputServer import GetOutputServer
28     #from StatusServer import StatusServer
29     #from PostMortemServer import PostMortemServer
30     #from KillerServer import KillerServer
31     #from CleanerServer import CleanerServer
32 spiga 1.107
33 nsmirnov 1.1 import sys, os, time, string
34    
35     ###########################################################################
36     class Crab:
def __init__(self, opts):
    """
    Set every attribute to its default and hand *opts* to initialize_().

    opts -- dictionary of command-line options (option name -> value);
            it is forwarded unchanged to self.initialize_().
    """
    # Verbosity: debug_level 0 / flag_quiet 0 are the defaults.
    self.debug_level = 0
    self.flag_quiet = 0

    # Becomes 1 when an existing working directory is continued.
    self.flag_continue = 0

    # Job type and session name.
    self.job_type_name = None
    self.name = '0'

    # Configuration file name and the parameters read from it.
    self.cfg_params = {}
    self.cfg_fname = None

    # Option name -> action object, e.g. '-create' -> Creator instance.
    self.actions = {}

    # run() walks main_actions first and in this order, so the order
    # of this list matters.
    self.main_actions = ['-create', '-submit']
    self.aux_actions = ['-list', '-kill', '-status', '-getoutput', '-get',
                        '-resubmit', '-cancelAndResubmit', '-testJdl',
                        '-postMortem', '-clean', '-printId', '-publish']

    # Time stamp in the form 'yymmdd_hhmmss'.
    now = time.localtime(time.time())
    self.current_time = time.strftime('%y%m%d_%H%M%S', now)

    # Current working directory, always with a trailing slash.
    self.cwd = os.getcwd() + '/'

    self.initialize_(opts)
    return
78    
def version():
    """Return the program version string stored in common.prog_version_str."""
    version_string = common.prog_version_str
    return version_string

# Wrapped by hand (no decorator syntax) so it can be called as Crab.version().
version = staticmethod(version)
83    
def initialize_(self, opts):
    """
    Read the configuration, set up the shared objects kept in the
    'common' module and finally build the action objects.

    opts -- dictionary of command-line options (option name -> value).

    The order of the steps below is significant: later steps use the
    objects that earlier steps store in 'common' or on self.
    """

    # '-continue' goes first: when continuing, the configuration is
    # taken from the already existing working space.
    self.processContinueOption_(opts)

    # Ini-file next, command-line options afterwards, because the
    # command line overrides the ini-file.
    self.processIniFile_(opts)

    if self.flag_continue:
        opts = self.loadConfiguration_(opts)

    self.processOptions_(opts)

    common.taskDB = TaskDB()

    if self.flag_continue:
        common.taskDB.load()
    else:
        self.createWorkingSpace_()
        # Everything except the actions and '-debug' is saved for later
        # '-continue' runs and mirrored into the taskDB.
        to_save = {}
        for name in opts.keys():
            is_action = (name in self.main_actions) or (name in self.aux_actions)
            if is_action or (name == '-debug'):
                continue
            to_save[name] = opts[name]
            common.taskDB.setDict(name[1:], to_save[name])
        common.work_space.saveConfiguration(to_save, self.cfg_fname)

    # At this point all configuration options have been read.

    command_line = ' '.join(sys.argv)
    self.updateHistory_(command_line)
    self.createLogger_(command_line)

    common.jobDB = JobDB()

    self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

    # Server mode stays off (0) when 'CRAB.server_mode' is not configured.
    self.UseServer = 0
    try:
        self.UseServer = int(self.cfg_params['CRAB.server_mode'])
    except KeyError:
        pass

    if int(self.cfg_params['USER.activate_monalisa']):
        self.cfg_params['apmon'] = ApmonIf()

    if self.flag_continue:
        # A DBException while loading the job DB is deliberately ignored.
        try:
            common.jobDB.load()
            self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
            common.logger.debug(6, str(common.jobDB))
        except DBException:
            pass

    self.createScheduler_()

    def report(text):
        # Emit through debug(6, ...) and, when the debug level is below 6,
        # also through write().
        common.logger.debug(6, text)
        if common.logger.debugLevel() < 6:
            common.logger.write(text)

    report('Used properties:')
    names = self.cfg_params.keys()
    names.sort()
    for name in names:
        value = self.cfg_params[name]
        if value:
            report(' ' + name + ' : ' + str(value))
        else:
            report(' ' + name + ' : ')
    report('End of used properties.\n')

    self.initializeActions_(opts)
    return
174    
def processContinueOption_(self, opts):
    """
    Decide whether this invocation continues an existing task and, if
    so, attach the existing working directory as common.work_space.

    opts -- dictionary of command-line options (option name -> value).

    Sets self.flag_continue to 1 when '-continue'/'-c' is given, when any
    auxiliary action is requested, or when '-submit' comes without
    '-create'.  Raises CrabException when no working directory can be
    found or the chosen one does not exist.
    """
    continue_dir = None

    # Explicit '-continue' / '-c', optionally carrying a directory.
    for opt in opts.keys():
        if opt not in ('-continue', '-c'):
            continue
        self.flag_continue = 1
        requested = opts[opt]
        if requested:
            if requested[0] == '/':
                continue_dir = requested             # absolute path
            else:
                continue_dir = self.cwd + requested  # relative path
        break

    # Actions that only make sense on an existing task imply continuation.
    if not self.flag_continue:
        for opt in opts.keys():
            if opt in self.aux_actions:
                self.flag_continue = 1
                break
        if '-submit' in opts.keys() and '-create' not in opts.keys():
            self.flag_continue = 1

    if not self.flag_continue:
        return

    # No directory given: fall back to the most recent working directory.
    if not continue_dir:
        continue_dir = findLastWorkDir(common.prog_name + '_' + self.name + '_')

    if not continue_dir:
        raise CrabException('Cannot find last working directory.')

    if not os.path.exists(continue_dir):
        raise CrabException('Cannot continue because the working directory <'
                            + continue_dir + '> does not exist.')

    common.work_space = WorkSpace(continue_dir, self.cfg_params)
    return
224    
def processIniFile_(self, opts):
    """
    Load the configuration file and merge its [CRAB] section into *opts*.

    opts -- dictionary of command-line options; it is modified in place:
            every 'CRAB.xyz' parameter becomes option '-xyz' unless that
            option is already present or its name starts with '__'.

    Raises CrabException when '-cfg' is combined with continuation or
    when the configuration file does not exist.
    """

    # Pick up '-cfg' and '-name' from the command line.
    for opt in opts.keys():
        if opt == '-cfg':
            if self.flag_continue:
                raise CrabException('-continue and -cfg cannot coexist.')
            if opts[opt]:
                self.cfg_fname = opts[opt]
            else:
                usage()
        elif opt == '-name':
            self.name = opts[opt]

    # Default file name: the one saved in the working space when
    # continuing, '<prog_name>.cfg' otherwise.
    if self.cfg_fname is None:
        if self.flag_continue:
            self.cfg_fname = common.work_space.cfgFileName()
        else:
            self.cfg_fname = common.prog_name + '.cfg'

    # The literal name 'none' (any case) means: do not load a file.
    if self.cfg_fname.lower() != 'none':
        if not os.path.exists(self.cfg_fname):
            raise CrabException('cfg-file ' + self.cfg_fname + ' not found.')
        self.cfg_params = loadConfig(self.cfg_fname)
        self.cfg_params['user'] = os.environ['USER']

    # Promote the [CRAB] section to command-line style options.
    section = 'CRAB.'
    for key in self.cfg_params.keys():
        if not key.startswith(section):
            continue
        opt = '-' + key[len(section):]
        if opt.startswith('-__'):
            continue
        if opt not in opts.keys():
            opts[opt] = self.cfg_params[key]

    return
283    
def processOptions_(self, opts):
    """
    Apply the command-line options to self and to self.cfg_params.

    opts -- dictionary of options (option name -> value).

    Action options are only recorded as 'CRAB.<action>'; the actions
    themselves are handled later in initializeActions_().  An option
    without a dot is stored as 'CRAB.<name>', an option of the form
    '-SECTION.ENTRY' is stored as 'SECTION.ENTRY'.
    """
    # Options that need no handling here (some were consumed earlier).
    # NOTE(review): '-name' is read by processIniFile_() but is not listed
    # here, so it ends up in the "Unrecognized option" branch -- confirm
    # that this is intended.
    no_op_options = ('-server_mode', '-server_name', '-cfg',
                     '-continue', '-c', '-scheduler')

    for opt in opts.keys():
        val = opts[opt]
        has_section = opt.find('.') != -1

        if (opt in self.main_actions) or (opt in self.aux_actions):
            self.cfg_params['CRAB.' + opt[1:]] = val
            continue

        if opt in no_op_options:
            pass
        elif opt == '-jobtype':
            if val:
                self.job_type_name = val.upper()
            else:
                usage()
        elif opt == '-Q':
            self.flag_quiet = 1
        elif opt == '-debug':
            if val:
                self.debug_level = int(val)
            else:
                self.debug_level = 1
        elif not has_section:
            print(common.prog_name + '. Unrecognized option ' + opt)
            usage()

        # Command-line values override whatever the ini-file provided.
        if has_section:
            self.cfg_params[opt[1:]] = val
        else:
            self.cfg_params['CRAB.' + opt[1:]] = val
    return
344    
def parseRange_(self, aRange):
    """
    Turn a job-range string into a list of job numbers.

    NB: the first job is "1", not "0".
      'all'      -> [1,2,..., NJobs]
      ''         -> [1,2,..., NJobs]
      None       -> [1,2,..., NJobs]
      '0'        -> []
      'n1'       -> [n1]
      'n1-n2'    -> [n1, n1+1, n1+2, ..., n2-1, n2]
      'n1,n2'    -> [n1, n2]
      'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]

    Each comma-separated piece is parsed by parseSimpleRange_().  When
    the combined list contains a job number more than once, an error
    is logged and an empty list is returned.
    """
    common.logger.debug(5, "parseRange_ " + str(aRange))

    if aRange == 'all' or aRange == None or aRange == '':
        return range(1, common.jobDB.nJobs() + 1)
    if aRange == '0':
        return []

    result = []
    for subRange in aRange.split(','):
        result = result + self.parseSimpleRange_(subRange)

    if self.checkUniqueness_(result):
        return result

    # 'result' is a list: it has to be converted before concatenation,
    # otherwise this error path itself fails with a TypeError.
    common.logger.message("Error " + str(result))
    return []
376    
def checkUniqueness_(self, list):
    """
    Tell whether *list* is free of duplicates.

    list -- sequence to examine (elements are compared with ==).

    Returns a true value when every element occurs exactly once.
    """
    seen = []
    for element in list:
        if element not in seen:
            seen.append(element)

    return len(seen) == len(list)
388    
def uuidgen(self):
    """Run the external 'uuidgen' command and return its output."""
    generated = commands.getoutput('uuidgen')
    return generated
392    
393    
def parseSimpleRange_(self, aRange):
    """
    Parse one piece of a job range and return the list of job numbers.

      'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]   (requires 0 < n1 < n2)
      'n1'    -> [n1]                              (requires n1 > 0)

    Anything else is reported through common.logger and yields an
    empty list; for a malformed single number usage() is called too.
    """
    result = []

    if aRange.find('-') < 0:
        # Single job number.
        if isInt(aRange) and int(aRange) > 0:
            result.append(int(aRange))
        else:
            common.logger.message("parseSimpleRange_ ERROR " + aRange)
            usage()
        return result

    # Split on the first dash only: with input such as '1-2-3' a plain
    # split gives three pieces and the two-name unpacking would raise
    # ValueError.  Now 'end' is '2-3', fails isInt() and is reported.
    (start, end) = aRange.split('-', 1)
    if isInt(start) and isInt(end) and int(start) > 0 and int(start) < int(end):
        result = range(int(start), int(end) + 1)
    else:
        # Report the text as typed; start+end would drop the dash.
        common.logger.message("parseSimpleRange_ ERROR " + aRange)

    return result
426 nsmirnov 1.4
427 nsmirnov 1.3 def initializeActions_(self, opts):
428 nsmirnov 1.1 """
429     For each user action instantiate a corresponding
430     object and put it in the action dictionary.
431     """
# NOTE: some actions are carried out right here while the options are
# scanned (-list, -printId, -kill and -getoutput in non-server mode, and
# the cancel step of -cancelAndResubmit); the others only store an object
# in self.actions[opt].
432 spiga 1.15
433     for opt in opts.keys():
434    
435 nsmirnov 1.1 val = opts[opt]
436 spiga 1.15
437    
# '-create': ncjobs is fixed to 'all'; a different value given with the
# option only triggers the two informational messages below.
438     if ( opt == '-create' ):
439 gutsche 1.83 if val and val != 'all':
440     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
441     common.logger.message(msg)
442     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
443     common.logger.message(msg)
444 slacapra 1.82 ncjobs = 'all'
445    
446     # Instantiate Creator object
447     self.creator = Creator(self.job_type_name,
448     self.cfg_params,
449     ncjobs)
450     self.actions[opt] = self.creator
451    
452     # Initialize the JobDB object if needed
453     if not self.flag_continue:
454     common.jobDB.create(self.creator.nJobs())
455 nsmirnov 1.1 pass
456 nsmirnov 1.3
457 slacapra 1.82 # Create and initialize JobList
458 nsmirnov 1.3
459 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
460     self.creator.jobType())
461 nsmirnov 1.3
462 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
463     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
464     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
465     common.job_list.setScriptNames(self.job_type_name+'.sh')
466     common.job_list.setJDLNames(self.job_type_name+'.jdl')
467     common.job_list.setCfgNames(self.creator.jobType().configFilename())
468 slacapra 1.9
469 slacapra 1.82 self.creator.writeJobsSpecsToDB()
470 spiga 1.107 taskUnicId= self.uuidgen()
471     common.taskDB.setDict('TasKUUID',taskUnicId)
472 slacapra 1.77
473 slacapra 1.82 common.taskDB.save()
474 nsmirnov 1.3 pass
475 nsmirnov 1.1
# '-submit': outside server mode val may be an integer (nsjobs, checked
# against the number of jobs selected so far) or 'all'; any other value
# raises CrabException.
476     elif ( opt == '-submit' ):
477 spiga 1.107 # modified to support server mode
478     if (self.UseServer== 1):
479 mcinquil 1.119 from SubmitterServer import SubmitterServer
480 spiga 1.107 self.actions[opt] = SubmitterServer(self.cfg_params)
481     else:
482     # modified to support server mode
483     # get user request
484     nsjobs = -1
485     if val:
486     if ( isInt(val) ):
487     nsjobs = int(val)
488     elif (val=='all'):
489     pass
490     else:
491     msg = 'Bad submission option <'+str(val)+'>\n'
492     msg += ' Must be an integer or "all"'
493     msg += ' Generic range is not allowed"'
494     raise CrabException(msg)
495     pass
496    
497     common.logger.debug(5,'nsjobs '+str(nsjobs))
498     # total jobs
499     nj_list = []
500     # get the first not already submitted
501     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
502     jobSetForSubmission = 0
503 gutsche 1.110 jobSkippedInSubmission = []
504 slacapra 1.114 datasetpath=self.cfg_params['CMSSW.datasetpath']
# nj is a 0-based index into the job DB; jobSkippedInSubmission stores
# nj+1 for the message printed after the loop.
505 spiga 1.107 for nj in range(common.jobDB.nJobs()):
506 gutsche 1.121 if (self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj)) != '') or (datasetpath == None ):
507 gutsche 1.110 if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
508     jobSetForSubmission +=1
509     nj_list.append(nj)
510     else: continue
511     else :
512     jobSkippedInSubmission.append(nj+1)
513 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
514     break
515     pass
516     if nsjobs>jobSetForSubmission:
517     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
518 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
519     common.logger.message("Jobs: " + spanRanges(jobSkippedInSubmission) + " skipped because no sites are hosting this data")
520 spiga 1.107 # submit N from last submitted job
521     common.logger.debug(5,'nj_list '+str(nj_list))
522    
523     if len(nj_list) != 0:
524     # Instantiate Submitter object
525     self.actions[opt] = Submitter(self.cfg_params, nj_list)
526     # Create and initialize JobList
527     if len(common.job_list) == 0 :
528     common.job_list = JobList(common.jobDB.nJobs(),
529     None)
530     common.job_list.setJDLNames(self.job_type_name+'.jdl')
531     pass
532 slacapra 1.23 pass
533 slacapra 1.22 else:
534 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
535 nsmirnov 1.1 pass
536    
537 nsmirnov 1.4 elif ( opt == '-list' ):
538 slacapra 1.8 jobs = self.parseRange_(val)
539    
540     common.jobDB.dump(jobs)
541 nsmirnov 1.4 pass
542    
543 slacapra 1.67 elif ( opt == '-printId' ):
544 spiga 1.115 # modified to support server mode
545     if (self.UseServer== 1):
546     try:
547     common.taskDB.load()
548     WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
549     projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
550     print "Task Id = %-40s " %(projectUniqName)
# NOTE(review): bare except -- any failure in the three statements above,
# not only a taskDB one, ends up as the warning below.
551     except:
552     common.logger.message("Warning :Interaction in query task unique ID failed")
553     pass
554     else:
555     try:
556 slacapra 1.117 common.scheduler.bossTask.load(ALL)
557 spiga 1.115 except RuntimeError,e:
558     common.logger.message( e.__str__() )
559     except ValueError,e:
560     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
561     common.logger.message(e.what())
562     pass
563     task = common.scheduler.bossTask.jobsDict()
564    
565     for c, v in task.iteritems():
566     k = int(c)
567     nj=k
568     id = v['CHAIN_ID']
569     jid = v['SCHED_ID']
570     if jid:
571     print "Job: %-5s Id = %-40s " %(id,jid)
572     #else:
573     # print "Job: ",id," No ID yet"
574 corvo 1.97 pass
575 slacapra 1.67
576 nsmirnov 1.4 elif ( opt == '-status' ):
577 spiga 1.107 # modified to support server mode
578     if (self.UseServer== 1):
579 mcinquil 1.119 from StatusServer import StatusServer
580 spiga 1.107 self.actions[opt] = StatusServer(self.cfg_params)
581     else:
582     jobs = self.parseRange_(val)
583    
584     if len(jobs) != 0:
585     self.actions[opt] = StatusBoss(self.cfg_params)
586     pass
587 slacapra 1.22
588 nsmirnov 1.4 elif ( opt == '-kill' ):
589 slacapra 1.8
590 corvo 1.109 if (self.UseServer== 1):
591 mcinquil 1.119 from KillerServer import KillerServer
592 farinafa 1.120
593     # Matteo for server kill by range
594     if val:
595     if val !='all':
596     val = self.parseRange_(val)
597     else:
598     val='all'
599    
600     self.actions[opt] = KillerServer(self.cfg_params,val)
601    
602     #self.actions[opt] = KillerServer(self.cfg_params)
603 corvo 1.109 else:
604 farinafa 1.120 if val:
605 corvo 1.109 if val =='all':
606     jobs = common.scheduler.listBoss()
607     else:
608     jobs = self.parseRange_(val)
609     common.scheduler.cancel(jobs)
610 spiga 1.31 else:
611 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
612 nsmirnov 1.1
613 farinafa 1.120
614    
615 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
616 spiga 1.107 # modified to support server mode
617     if (self.UseServer== 1):
618 mcinquil 1.119 from GetOutputServer import GetOutputServer
619 spiga 1.107 self.actions[opt] = GetOutputServer(self.cfg_params)
620 spiga 1.20 else:
621 spiga 1.13
622 spiga 1.107 if val=='all' or val==None or val=='':
623     jobs = common.scheduler.listBoss()
624     else:
625     jobs = self.parseRange_(val)
626    
627     jobs_done = []
628     for nj in jobs:
629     jobs_done.append(nj)
630     common.scheduler.getOutput(jobs_done)
631     pass
632 nsmirnov 1.4
# NOTE(review): 'jobs' computed just below is not read again in this
# branch; the selection is driven by the bossTask query on val instead.
633     elif ( opt == '-resubmit' ):
634 slacapra 1.78 if val=='all' or val==None or val=='':
635     jobs = common.scheduler.listBoss()
636 fanzago 1.37 else:
637 slacapra 1.78 jobs = self.parseRange_(val)
638 fanzago 1.37
639 slacapra 1.11 if val:
640     # create a list of jobs to be resubmitted.
641 corvo 1.93 val = string.replace(val,'-',':')
642 slacapra 1.8
643 slacapra 1.22 ### as before, create a Resubmittter Class
644 slacapra 1.78 maxIndex = common.scheduler.listBoss()
645 corvo 1.93 ##
646     # Marco. Vediamo se va meglio cosi'...
647     ##
648 corvo 1.97 try:
649     common.scheduler.bossTask.query(ALL, val)
650     except RuntimeError,e:
651     common.logger.message( e.__str__() )
652     except ValueError,e:
653 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
654     common.logger.message(e.what())
655 corvo 1.97 pass
656 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
657    
# Job numbers coming from the task are 1-based; the job DB and nj_list
# are addressed with int(nj)-1.
658 slacapra 1.11 nj_list = []
659 corvo 1.93 for c, v in task.iteritems():
660 corvo 1.97 k = int(c)
661     nj=k
662 corvo 1.93 st = v['STATUS']
663    
664 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
665 spiga 1.104 if st in ['K','SA','Z','DA']:
666 corvo 1.97 nj_list.append(int(nj)-1)
667     common.jobDB.setStatus(int(nj)-1,'C')
668 spiga 1.99 elif st in ['E','SE']:
669 spiga 1.60 common.scheduler.moveOutput(nj)
670 corvo 1.97 nj_list.append(int(nj)-1)
671     st = common.jobDB.setStatus(int(nj)-1,'RC')
672 corvo 1.93 elif st in ['W']:
673 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
674 spiga 1.60 pass
675 corvo 1.93 elif st in ['SD', 'OR']:
676 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
677 spiga 1.60 else:
678     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
679 slacapra 1.11 else:
680 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
681 fanzago 1.37 if len(common.job_list) == 0 :
682     common.job_list = JobList(common.jobDB.nJobs(),None)
683     common.job_list.setJDLNames(self.job_type_name+'.jdl')
684     pass
685    
686 slacapra 1.11 if len(nj_list) != 0:
687 corvo 1.93 nj_list.sort()
688 slacapra 1.11 # Instantiate Submitter object
689 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
690 slacapra 1.11
691 slacapra 1.8 pass
692     pass
693 slacapra 1.11 else:
694     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
695 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
696 slacapra 1.11 pass
697 fanzago 1.37 common.jobDB.save()
698 slacapra 1.8 pass
699 gutsche 1.61
# '-cancelAndResubmit': the selected jobs are cancelled first; afterwards
# the status stored in the job DB decides which of them are resubmitted.
700 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
701 nsmirnov 1.5
702 slacapra 1.78 if val:
703     if val =='all':
704     jobs = common.scheduler.listBoss()
705 spiga 1.47 else:
706 slacapra 1.78 jobs = self.parseRange_(val)
707     # kill submitted jobs
708     common.scheduler.cancel(jobs)
709 spiga 1.47 else:
710 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
711 nsmirnov 1.5
712 spiga 1.47 # resubmit cancelled jobs.
713     if val:
714     nj_list = []
715     for nj in jobs:
716     st = common.jobDB.status(int(nj)-1)
717     if st in ['K','A']:
718 corvo 1.97 nj_list.append(int(nj)-1)
719     common.jobDB.setStatus(int(nj)-1,'C')
720 spiga 1.47 elif st == 'Y':
721     common.scheduler.moveOutput(nj)
722 corvo 1.97 nj_list.append(int(nj)-1)
723     st = common.jobDB.setStatus(int(nj)-1,'RC')
724 spiga 1.47 elif st in ['C','X']:
725 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
726 spiga 1.47 pass
727     elif st == 'D':
728 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
729 spiga 1.47 else:
730     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
731     pass
732    
733 nsmirnov 1.5 if len(common.job_list) == 0 :
734 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
735 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
736     pass
737 spiga 1.47
738     if len(nj_list) != 0:
739 spiga 1.86 # common.scheduler.resubmit(nj_list)
740 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
741 spiga 1.47 pass
742     pass
# NOTE(review): 'rage' in the message below looks like a typo for 'range';
# it is runtime text and therefore left untouched here.
743     else:
744     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
745 nsmirnov 1.5 pass
746 spiga 1.47 common.jobDB.save()
747 nsmirnov 1.4 pass
748 spiga 1.47
# '-testJdl': only jobs whose job DB status is 'C' are handed to Checker
# (as 0-based indices).
749 slacapra 1.28 elif ( opt == '-testJdl' ):
750 slacapra 1.8 jobs = self.parseRange_(val)
751     nj_list = []
752     for nj in jobs:
753 slacapra 1.62 st = common.jobDB.status(nj-1)
754     if st == 'C': nj_list.append(nj-1)
755 slacapra 1.8 pass
756    
757     if len(nj_list) != 0:
758 slacapra 1.77 # Instantiate Checker object
759 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
760    
761     # Create and initialize JobList
762    
763     if len(common.job_list) == 0 :
764     common.job_list = JobList(common.jobDB.nJobs(), None)
765     common.job_list.setJDLNames(self.job_type_name+'.jdl')
766     pass
767     pass
768    
769 slacapra 1.9 elif ( opt == '-postMortem' ):
770 corvo 1.95
771 spiga 1.108 # modified to support server mode
772     if (self.UseServer== 1):
773 mcinquil 1.119 from PostMortemServer import PostMortemServer
774 spiga 1.108 self.actions[opt] = PostMortemServer(self.cfg_params)
775     else:
776     if val:
777     val = string.replace(val,'-',':')
778     else: val=''
# Here nj_list is a dictionary: CHAIN_ID -> SCHED_ID, filled only for
# jobs that have a SCHED_ID.
779     nj_list = {}
780 corvo 1.97
781 spiga 1.108 try:
782     common.scheduler.bossTask.query(ALL, val)
783     except RuntimeError,e:
784     common.logger.message( e.__str__() )
785     except ValueError,e:
786     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
787     common.logger.message( e.what())
788     pass
789 corvo 1.93
790 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
791    
792     for c, v in task.iteritems():
793     k = int(c)
794     nj=k
795     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
796     pass
797 slacapra 1.9
798 spiga 1.108 if len(nj_list) != 0:
799 corvo 1.93 # Instantiate PostMortem object
800 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
801 slacapra 1.9 # Create and initialize JobList
802 spiga 1.108 if len(common.job_list) == 0 :
803     common.job_list = JobList(common.jobDB.nJobs(), None)
804     common.job_list.setJDLNames(self.job_type_name+'.jdl')
805     pass
806 slacapra 1.9 pass
807 spiga 1.108 else:
808     common.logger.message("No jobs to analyze")
809 slacapra 1.9
# '-clean': takes no job range; any value other than None is rejected.
810     elif ( opt == '-clean' ):
811     if val != None:
812     raise CrabException("No range allowed for '-clean'")
813 mcinquil 1.118 if (self.UseServer== 1):
814 mcinquil 1.119 from CleanerServer import CleanerServer
815 mcinquil 1.118 self.actions[opt] = CleanerServer(self.cfg_params)
816     else:
817     self.actions[opt] = Cleaner(self.cfg_params)
818    
819 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
820     elif ( opt == '-publish' ):
821 slacapra 1.113 from DBS2Publisher import Publisher
822 slacapra 1.116 self.actions[opt] = Publisher(self.cfg_params)
823    
824     #precessedData=self.cfg_params['USER.publish_data_name']
825     #self.cfg_params['USER.processed_datasetname']
826     #thePublisher = Publisher(self.cfg_params)
827     # publish_exit_status = thePublisher.publish()
828     # if (publish_exit_status == '1'):
829     # common.logger.message("user data publication --> problems")
830     # else:
831     # common.logger.message("user data publication --> ok ")
832     #########################################
833 nsmirnov 1.1 pass
834     return
835    
def createWorkingSpace_(self):
    """
    Choose the working directory of a new task, derive the task id and
    create the WorkSpace (stored in common.work_space).

    With 'USER.ui_working_dir' configured, that directory is used (a
    name without any '/' is taken relative to the current directory)
    and must be empty if it already exists, otherwise CrabException
    is raised.  Without it the directory is
    '<prog_name>_<name>_<current_time>' below the current directory.
    """
    try:
        requested = self.cfg_params['USER.ui_working_dir']
        parts = requested.split('/')

        # The task id uses only the last path component.
        self.cfg_params['taskId'] = (self.cfg_params['user'] + '_'
                                     + parts[-1] + '_' + self.current_time)

        if len(parts) == 1:
            new_dir = self.cwd + requested
        else:
            new_dir = requested

        if os.path.exists(new_dir) and os.listdir(new_dir):
            raise CrabException(new_dir + ' already exists and is not empty. Please remove it before create new task')
    except KeyError:
        # Reached when a key looked up above is missing from cfg_params.
        default_name = common.prog_name + '_' + self.name + '_' + self.current_time
        self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + default_name
        new_dir = self.cwd + default_name

    common.work_space = WorkSpace(new_dir, self.cfg_params)
    common.work_space.create()
    return
858    
def loadConfiguration_(self, opts):
    """
    Merge *opts* over the options saved in the working space.

    opts -- dictionary of options given on the current command line.

    Returns the saved options with every entry of *opts* written on
    top of them, so the new command line wins.
    """
    merged = common.work_space.loadSavedOptions()

    for name, value in opts.items():
        merged[name] = value

    return merged
871    
def createLogger_(self, args):
    """
    Create the Logger, configure it from the quiet/debug settings and
    publish it as common.logger.

    args -- the command line as a single string; it is the first thing
            written to the log, followed by the header string.
    """
    logger = Logger()
    logger.quiet(self.flag_quiet)
    logger.setDebugLevel(self.debug_level)

    logger.write(args + '\n')
    banner = self.headerString_()
    logger.message(banner)
    logger.flush()

    common.logger = logger
    return
882    
def updateHistory_(self, args):
    """
    Append one line '<current_time>: <args>' to '<prog_name>.history'.

    args -- the command line as a single string.

    The file is opened in append mode relative to the current
    directory.
    """
    history_fname = common.prog_name + '.history'
    history_file = open(history_fname, 'a')
    # try/finally so the handle is closed even when the write fails.
    try:
        history_file.write(self.current_time + ': ' + args + '\n')
    finally:
        history_file.close()
    return
889    
def headerString_(self):
    """
    Build the banner written at the top of the log: program name and
    version, the current time, and the working options (scheduler,
    job type, working directory).
    """
    pieces = [
        common.prog_name, ' (version ', common.prog_version_str,
        ') running on ', time.ctime(time.time()), '\n\n',
        common.prog_name, '. Working options:\n',
        ' scheduler ', self.cfg_params['CRAB.scheduler'], '\n',
        ' job type ', self.job_type_name, '\n',
        ' working directory ', common.work_space.topDir(), '\n',
    ]
    return ''.join(pieces)
906    
def createScheduler_(self):
    """
    Import the class 'SchedulerBoss' from the module of the same name,
    instantiate it as common.scheduler and configure it with
    self.cfg_params.

    Raises CrabException when the module cannot be imported or does
    not provide the class.
    """
    klass_name = 'SchedulerBoss'
    file_name = klass_name

    try:
        klass = importName(file_name, klass_name)
    except KeyError:
        raise CrabException('No `class ' + klass_name + '` found in file `'
                            + file_name + '.py`')
    except ImportError:
        # sys.exc_info() gives the active exception without relying on
        # version-specific 'except' syntax.
        reason = sys.exc_info()[1]
        msg = 'Cannot create scheduler Boss'
        msg += ' (file: ' + file_name + ', class ' + klass_name + '):\n'
        msg += str(reason)
        raise CrabException(msg)

    common.scheduler = klass()
    common.scheduler.configure(self.cfg_params)
    return
927    
def run(self):
    """
    Run every action object stored in self.actions: first those named
    in self.main_actions, then those named in self.aux_actions, each
    group in the order of its list.
    """
    def run_listed(names):
        # Execute the stored action of every name that has one.
        for name in names:
            if name in self.actions:
                self.actions[name].run()

    run_listed(self.main_actions)
    run_listed(self.aux_actions)
    return
941    
942     ###########################################################################
def processHelpOptions(opts):
    """
    Handle the version and help options.

    opts -- dictionary of command-line options (option name -> value).

    Returns 1 after printing the version ('-v', '-version', '--version')
    or the help ('-h', '-help', '--help', optionally with a topic as
    value), 0 otherwise.  With no options at all usage() is called.
    """
    if not len(opts):
        usage()
        return 0

    for opt in opts.keys():
        if opt in ('-v', '-version', '--version'):
            print(Crab.version())
            return 1
        if opt in ('-h', '-help', '--help'):
            topic = opts[opt]
            if topic:
                help(topic)
            else:
                help()
            return 1

    return 0
958    
959 corvo 1.93 ###########################################################################
if __name__ == '__main__':
    # Script entry point: adjust PATH, parse the command line, answer
    # help/version requests, then create and run a Crab object.

    # Get rid of some useless warnings.  Best effort only: a failure
    # here must never stop the program, but 'except Exception' (unlike
    # a bare except) does not trap SystemExit / KeyboardInterrupt on
    # interpreters where those are not Exception subclasses.
    try:
        import warnings
        warnings.simplefilter("ignore", RuntimeWarning)
    except Exception:
        pass  # too bad, you'll get the warning

    # Change the PATH before any grid command is executed.
    # NOTE(review): os.putenv() does not update os.environ; kept as is
    # because definePath() is not visible here and may rely on the
    # unmodified os.environ['PATH'] -- confirm before switching.
    os.putenv("PATH", definePath("new"))

    # Parse command-line options into a dictionary of key-value pairs.
    options = parseOptions(sys.argv[1:])

    # Process "help" options, such as '-help', '-version'.
    if processHelpOptions(options):
        sys.exit(0)

    # Create, initialize, and run a Crab object.
    try:
        crab = Crab(options)
        crab.run()
        # 'apmon' is only stored in cfg_params when
        # 'USER.activate_monalisa' is enabled; an unconditional lookup
        # ends an otherwise successful run with a KeyError.
        if 'apmon' in crab.cfg_params:
            crab.cfg_params['apmon'].free()
    except CrabException:
        e = sys.exc_info()[1]
        print('\n' + common.prog_name + ': ' + str(e) + '\n')
        if common.logger:
            common.logger.write('ERROR: ' + str(e) + '\n')