ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.125
Committed: Mon Sep 24 10:11:50 2007 UTC (17 years, 7 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.124: +28 -7 lines
Log Message:
Modified submitterServer for range submission and connected crab.py

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 nsmirnov 1.1 import common
19 spiga 1.13 import Statistic
20 spiga 1.107 import commands
21 gutsche 1.121 from BlackWhiteListParser import BlackWhiteListParser
22 nsmirnov 1.1
23 spiga 1.98 from BossSession import *
24 corvo 1.93
25 spiga 1.107 #modified to support server mode
26 mcinquil 1.119 #from SubmitterServer import SubmitterServer
27     #from GetOutputServer import GetOutputServer
28     #from StatusServer import StatusServer
29     #from PostMortemServer import PostMortemServer
30     #from KillerServer import KillerServer
31     #from CleanerServer import CleanerServer
32 spiga 1.107
33 nsmirnov 1.1 import sys, os, time, string
34    
35     ###########################################################################
36     class Crab:
37 nsmirnov 1.3 def __init__(self, opts):
38 fanzago 1.76 ## test_tag
39 nsmirnov 1.1 # The order of main_actions is important !
40 fanzago 1.111 self.main_actions = [ '-create', '-submit' ]
41     ### FEDE new option "-publish" FOR DBS OUTPUT PUBLICATION
42 slacapra 1.70 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput','-get',
43 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
44 fanzago 1.111 '-printId', '-publish' ]
45 nsmirnov 1.1
46     # Dictionary of actions, e.g. '-create' -> object of class Creator
47     self.actions = {}
48 fanzago 1.76
49 nsmirnov 1.1 # Configuration file
50     self.cfg_fname = None
51     # Dictionary with configuration parameters
52     self.cfg_params = {}
53    
54     # Current working directory
55     self.cwd = os.getcwd()+'/'
56     # Current time in format 'yymmdd_hhmmss'
57     self.current_time = time.strftime('%y%m%d_%H%M%S',
58     time.localtime(time.time()))
59    
60 nsmirnov 1.3 # Session name (?) Do we need this ?
61 nsmirnov 1.1 self.name = '0'
62    
63     # Job type
64     self.job_type_name = None
65    
66     # Continuation flag
67     self.flag_continue = 0
68    
69     # quiet mode, i.e. no output on screen
70     self.flag_quiet = 0
71     # produce more output
72     self.debug_level = 0
73    
74    
75 nsmirnov 1.3 self.initialize_(opts)
76 nsmirnov 1.1
77     return
78    
79 nsmirnov 1.7 def version():
80 nsmirnov 1.1 return common.prog_version_str
81    
82 nsmirnov 1.7 version = staticmethod(version)
83    
84 nsmirnov 1.3 def initialize_(self, opts):
85 nsmirnov 1.1
86     # Process the '-continue' option first because
87     # in the case of continuation the CRAB configuration
88     # parameters are loaded from already existing Working Space.
89 nsmirnov 1.3 self.processContinueOption_(opts)
90 nsmirnov 1.1
91     # Process ini-file first, then command line options
92     # because they override ini-file settings.
93    
94 nsmirnov 1.3 self.processIniFile_(opts)
95 nsmirnov 1.1
96 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
97 nsmirnov 1.1
98 nsmirnov 1.3 self.processOptions_(opts)
99 nsmirnov 1.1
100 slacapra 1.77 common.taskDB = TaskDB()
101    
102 slacapra 1.101
103 nsmirnov 1.1 if not self.flag_continue:
104 nsmirnov 1.3 self.createWorkingSpace_()
105 slacapra 1.9 optsToBeSaved={}
106     for it in opts.keys():
107     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
108     pass
109     else:
110     optsToBeSaved[it]=opts[it]
111 slacapra 1.101 # store in taskDB the opts
112     common.taskDB.setDict(it[1:], optsToBeSaved[it])
113 slacapra 1.9 common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
114 nsmirnov 1.1 pass
115 slacapra 1.77 else:
116     common.taskDB.load()
117 nsmirnov 1.1
118     # At this point all configuration options have been read.
119    
120     args = string.join(sys.argv,' ')
121 slacapra 1.11
122 nsmirnov 1.3 self.updateHistory_(args)
123 slacapra 1.11
124 nsmirnov 1.3 self.createLogger_(args)
125 slacapra 1.11
126 nsmirnov 1.1 common.jobDB = JobDB()
127 corvo 1.51
128 gutsche 1.121 # init BlackWhiteListParser
129     self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
130    
131 spiga 1.107 self.UseServer=0
132     try:
133     self.UseServer=int(self.cfg_params['CRAB.server_mode'])
134     except KeyError:
135     pass
136    
137    
138 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
139    
140 nsmirnov 1.3 if self.flag_continue:
141 slacapra 1.12 try:
142     common.jobDB.load()
143 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
144 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
145     except DBException,e:
146     pass
147 nsmirnov 1.3 pass
148 slacapra 1.11
149 nsmirnov 1.3 self.createScheduler_()
150 slacapra 1.11
151 slacapra 1.106 common.logger.debug(6, 'Used properties:')
152     if (common.logger.debugLevel()<6 ):
153     common.logger.write('Used properties:')
154     keys = self.cfg_params.keys()
155     keys.sort()
156     for k in keys:
157     if self.cfg_params[k]:
158     common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
159     if (common.logger.debugLevel()<6 ):
160     common.logger.write(' '+k+' : '+str(self.cfg_params[k]))
161     pass
162     else:
163     common.logger.debug(6, ' '+k+' : ')
164     if (common.logger.debugLevel()<6 ):
165     common.logger.write(' '+k+' : ')
166 nsmirnov 1.3 pass
167     pass
168 slacapra 1.106 common.logger.debug(6, 'End of used properties.\n')
169     if (common.logger.debugLevel()<6 ):
170     common.logger.write('End of used properties.\n')
171    
172 nsmirnov 1.3 self.initializeActions_(opts)
173 nsmirnov 1.1 return
174    
175 nsmirnov 1.3 def processContinueOption_(self,opts):
176 nsmirnov 1.1
177     continue_dir = None
178 nsmirnov 1.4
179     # Look for the '-continue' option.
180    
181 nsmirnov 1.1 for opt in opts.keys():
182     if ( opt in ('-continue','-c') ):
183     self.flag_continue = 1
184     val = opts[opt]
185     if val:
186     if val[0] == '/': continue_dir = val # abs path
187     else: continue_dir = self.cwd + val # rel path
188     pass
189 nsmirnov 1.4 break
190     pass
191    
192     # Look for actions which has sense only with '-continue'
193    
194     if not self.flag_continue:
195     for opt in opts.keys():
196 slacapra 1.102 if ( opt in (self.aux_actions)):
197 nsmirnov 1.4 self.flag_continue = 1
198     break
199 nsmirnov 1.1 pass
200     pass
201 slacapra 1.102 if ("-submit" in opts.keys() and "-create" not in opts.keys() ):
202     self.flag_continue = 1
203 nsmirnov 1.1
204     if not self.flag_continue: return
205    
206     if not continue_dir:
207     prefix = common.prog_name + '_' + self.name + '_'
208     continue_dir = findLastWorkDir(prefix)
209     pass
210    
211     if not continue_dir:
212     raise CrabException('Cannot find last working directory.')
213    
214     if not os.path.exists(continue_dir):
215     msg = 'Cannot continue because the working directory <'
216     msg += continue_dir
217     msg += '> does not exist.'
218     raise CrabException(msg)
219    
220     # Instantiate WorkSpace
221 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
222 nsmirnov 1.1
223     return
224    
225 nsmirnov 1.3 def processIniFile_(self, opts):
226 nsmirnov 1.1 """
227     Processes a configuration INI-file.
228     """
229    
230     # Extract cfg-file name from the cmd-line options.
231    
232     for opt in opts.keys():
233     if ( opt == '-cfg' ):
234     if self.flag_continue:
235     raise CrabException('-continue and -cfg cannot coexist.')
236     if opts[opt] : self.cfg_fname = opts[opt]
237     else : usage()
238     pass
239    
240     elif ( opt == '-name' ):
241     self.name = opts[opt]
242     pass
243    
244     pass
245    
246     # Set default cfg-fname
247    
248     if self.cfg_fname == None:
249     if self.flag_continue:
250     self.cfg_fname = common.work_space.cfgFileName()
251     else:
252     self.cfg_fname = common.prog_name+'.cfg'
253     pass
254     pass
255    
256     # Load cfg-file
257    
258     if string.lower(self.cfg_fname) != 'none':
259     if os.path.exists(self.cfg_fname):
260     self.cfg_params = loadConfig(self.cfg_fname)
261 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
262 nsmirnov 1.1 pass
263     else:
264     msg = 'cfg-file '+self.cfg_fname+' not found.'
265     raise CrabException(msg)
266     pass
267     pass
268    
269     # process the [CRAB] section
270    
271     lhp = len('CRAB.')
272     for k in self.cfg_params.keys():
273     if len(k) >= lhp and k[:lhp] == 'CRAB.':
274     opt = '-'+k[lhp:]
275     if len(opt) >= 3 and opt[:3] == '-__': continue
276     if opt not in opts.keys():
277     opts[opt] = self.cfg_params[k]
278     pass
279     pass
280     pass
281    
282     return
283    
284 nsmirnov 1.3 def processOptions_(self, opts):
285 nsmirnov 1.1 """
286     Processes the command-line options.
287     """
288    
289     for opt in opts.keys():
290     val = opts[opt]
291    
292 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
293     if opt in self.main_actions:
294     self.cfg_params['CRAB.'+opt[1:]] = val
295     continue
296     if opt in self.aux_actions:
297     self.cfg_params['CRAB.'+opt[1:]] = val
298     continue
299 nsmirnov 1.1
300 spiga 1.107 elif ( opt == '-server_mode' ): #Add for server mode usage
301     pass
302     elif ( opt == '-server_name' ):
303     pass
304 nsmirnov 1.1
305     elif ( opt == '-cfg' ):
306     pass
307    
308     elif ( opt in ('-continue', '-c') ):
309 nsmirnov 1.4 # Already processed in processContinueOption_()
310 nsmirnov 1.1 pass
311    
312     elif ( opt == '-jobtype' ):
313     if val : self.job_type_name = string.upper(val)
314     else : usage()
315     pass
316    
317     elif ( opt == '-Q' ):
318     self.flag_quiet = 1
319     pass
320    
321     elif ( opt == '-debug' ):
322 slacapra 1.6 if val: self.debug_level = int(val)
323     else: self.debug_level = 1
324 nsmirnov 1.1 pass
325    
326     elif ( opt == '-scheduler' ):
327     pass
328 slacapra 1.22
329 nsmirnov 1.3 elif string.find(opt,'.') == -1:
330     print common.prog_name+'. Unrecognized option '+opt
331     usage()
332     pass
333 nsmirnov 1.1
334 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
335     if string.find(opt,'.') == -1 :
336     self.cfg_params['CRAB.'+opt[1:]] = val
337 nsmirnov 1.1 pass
338 nsmirnov 1.3 else:
339 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
340     self.cfg_params[opt[1:]] = val
341     pass
342     pass
343     return
344    
345 slacapra 1.8 def parseRange_(self, aRange):
346 nsmirnov 1.4 """
347 slacapra 1.8 Takes as the input a string with a range defined in any of the following
348     way, including combination, and return a tuple with the ints defined
349     according to following table. A consistency check is done.
350     NB: the first job is "1", not "0".
351     'all' -> [1,2,..., NJobs]
352     '' -> [1,2,..., NJobs]
353     'n1' -> [n1]
354     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
355     'n1,n2' -> [n1, n2]
356     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
357     """
358     result = []
359    
360 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
361     if aRange=='all' or aRange==None or aRange=='':
362 slacapra 1.73 result=range(1,common.jobDB.nJobs()+1)
363 slacapra 1.8 return result
364 slacapra 1.9 elif aRange=='0':
365     return result
366 slacapra 1.8
367     subRanges = string.split(aRange, ',')
368     for subRange in subRanges:
369     result = result+self.parseSimpleRange_(subRange)
370    
371     if self.checkUniqueness_(result):
372     return result
373     else:
374 slacapra 1.33 common.logger.message("Error "+result)
375 slacapra 1.8 return []
376    
377     def checkUniqueness_(self, list):
378     """
379 slacapra 1.9 check if a list contains only unique elements
380 slacapra 1.8 """
381    
382     uniqueList = []
383     # use a list comprehension statement (takes a while to understand)
384    
385     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
386    
387     return (len(list)==len(uniqueList))
388    
389 spiga 1.107 def uuidgen(self):
390     """Generate a UUID"""
391     return commands.getoutput('uuidgen')
392    
393    
394 slacapra 1.8 def parseSimpleRange_(self, aRange):
395     """
396     Takes as the input a string with two integers separated by
397     the minus sign and returns the tuple with these numbers:
398     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
399     'n1' -> [n1]
400     """
401     (start, end) = (None, None)
402    
403     result = []
404     minus = string.find(aRange, '-')
405     if ( minus < 0 ):
406     if isInt(aRange) and int(aRange)>0:
407 corvo 1.97 # FEDE
408     #result.append(int(aRange)-1)
409     ###
410 slacapra 1.62 result.append(int(aRange))
411 slacapra 1.8 else:
412 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
413     usage()
414     pass
415    
416 nsmirnov 1.4 pass
417     else:
418 slacapra 1.8 (start, end) = string.split(aRange, '-')
419     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
420 corvo 1.97 #result=range(int(start)-1, int(end))
421     result=range(int(start), int(end)+1) #Daniele
422 slacapra 1.8 else:
423 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
424 slacapra 1.8
425     return result
426 nsmirnov 1.4
427 nsmirnov 1.3 def initializeActions_(self, opts):
428 nsmirnov 1.1 """
429     For each user action instantiate a corresponding
430     object and put it in the action dictionary.
431     """
432 spiga 1.15
433     for opt in opts.keys():
434    
435 nsmirnov 1.1 val = opts[opt]
436 spiga 1.15
437    
438     if ( opt == '-create' ):
439 gutsche 1.83 if val and val != 'all':
440     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
441     common.logger.message(msg)
442     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
443     common.logger.message(msg)
444 slacapra 1.82 ncjobs = 'all'
445    
446     # Instantiate Creator object
447     self.creator = Creator(self.job_type_name,
448     self.cfg_params,
449     ncjobs)
450     self.actions[opt] = self.creator
451    
452     # Initialize the JobDB object if needed
453     if not self.flag_continue:
454     common.jobDB.create(self.creator.nJobs())
455 nsmirnov 1.1 pass
456 nsmirnov 1.3
457 slacapra 1.82 # Create and initialize JobList
458 nsmirnov 1.3
459 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
460     self.creator.jobType())
461 nsmirnov 1.3
462 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
463     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
464     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
465     common.job_list.setScriptNames(self.job_type_name+'.sh')
466     common.job_list.setJDLNames(self.job_type_name+'.jdl')
467     common.job_list.setCfgNames(self.creator.jobType().configFilename())
468 slacapra 1.9
469 slacapra 1.82 self.creator.writeJobsSpecsToDB()
470 spiga 1.107 taskUnicId= self.uuidgen()
471     common.taskDB.setDict('TasKUUID',taskUnicId)
472 slacapra 1.77
473 slacapra 1.82 common.taskDB.save()
474 nsmirnov 1.3 pass
475 nsmirnov 1.1
476     elif ( opt == '-submit' ):
477 spiga 1.107 # modified to support server mode
478     if (self.UseServer== 1):
479 mcinquil 1.119 from SubmitterServer import SubmitterServer
480 farinafa 1.125 self.actions[opt] = SubmitterServer(self.cfg_params, self.parseRange_(val), val)
481 spiga 1.107 else:
482     # modified to support server mode
483     # get user request
484     nsjobs = -1
485     if val:
486 farinafa 1.125 if val=='all':
487 spiga 1.107 pass
488 farinafa 1.125 elif (type(eval(val)) is int) and eval(val) > 0:
489     # positive number
490     nsjobs = eval(val)
491     # NEW PART # Fabio
492     # put here code for LIST MANAGEMEN
493     elif (type(eval(val)) is tuple)or( type(eval(val)) is int and eval(val)<0 ) :
494     chosenJobsList = self.parseRange_(val)
495     chosenJobsList = [i-1 for i in chosenJobsList ]
496     nsjobs = len(chosenJobsList)
497     #
498 spiga 1.107 else:
499     msg = 'Bad submission option <'+str(val)+'>\n'
500     msg += ' Must be an integer or "all"'
501     msg += ' Generic range is not allowed"'
502     raise CrabException(msg)
503     pass
504    
505     common.logger.debug(5,'nsjobs '+str(nsjobs))
506     # total jobs
507     nj_list = []
508     # get the first not already submitted
509     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
510     jobSetForSubmission = 0
511 gutsche 1.110 jobSkippedInSubmission = []
512 slacapra 1.114 datasetpath=self.cfg_params['CMSSW.datasetpath']
513 farinafa 1.125
514     # NEW PART # Fabio
515     # modified to handle list of jobs by the users # Fabio
516     tmp_jList = range(common.jobDB.nJobs())
517     if chosenJobsList != None:
518     tmp_jList = chosenJobsList
519     # build job list
520     for nj in tmp_jList:
521     cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj)) # More readable # Fabio
522     if (cleanedBlackWhiteList != '') or (datasetpath == None ):
523 gutsche 1.110 if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
524     jobSetForSubmission +=1
525     nj_list.append(nj)
526 farinafa 1.125 else:
527     continue
528 gutsche 1.110 else :
529     jobSkippedInSubmission.append(nj+1)
530 farinafa 1.125 #
531 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
532     break
533     pass
534 farinafa 1.125 del tmp_jList
535     #
536    
537 spiga 1.107 if nsjobs>jobSetForSubmission:
538     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
539 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
540     common.logger.message("Jobs: " + spanRanges(jobSkippedInSubmission) + " skipped because no sites are hosting this data")
541 spiga 1.107 # submit N from last submitted job
542     common.logger.debug(5,'nj_list '+str(nj_list))
543    
544     if len(nj_list) != 0:
545     # Instantiate Submitter object
546     self.actions[opt] = Submitter(self.cfg_params, nj_list)
547     # Create and initialize JobList
548     if len(common.job_list) == 0 :
549     common.job_list = JobList(common.jobDB.nJobs(),
550     None)
551     common.job_list.setJDLNames(self.job_type_name+'.jdl')
552     pass
553 slacapra 1.23 pass
554 slacapra 1.22 else:
555 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
556 nsmirnov 1.1 pass
557    
558 nsmirnov 1.4 elif ( opt == '-list' ):
559 slacapra 1.8 jobs = self.parseRange_(val)
560    
561     common.jobDB.dump(jobs)
562 nsmirnov 1.4 pass
563    
564 slacapra 1.67 elif ( opt == '-printId' ):
565 spiga 1.115 # modified to support server mode
566     if (self.UseServer== 1):
567     try:
568     common.taskDB.load()
569     WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
570     projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
571     print "Task Id = %-40s " %(projectUniqName)
572     except:
573     common.logger.message("Warning :Interaction in query task unique ID failed")
574     pass
575     else:
576     try:
577 slacapra 1.117 common.scheduler.bossTask.load(ALL)
578 spiga 1.115 except RuntimeError,e:
579     common.logger.message( e.__str__() )
580     except ValueError,e:
581     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
582     common.logger.message(e.what())
583     pass
584     task = common.scheduler.bossTask.jobsDict()
585    
586     for c, v in task.iteritems():
587     k = int(c)
588     nj=k
589     id = v['CHAIN_ID']
590     jid = v['SCHED_ID']
591     if jid:
592     print "Job: %-5s Id = %-40s " %(id,jid)
593     #else:
594     # print "Job: ",id," No ID yet"
595 corvo 1.97 pass
596 slacapra 1.67
597 nsmirnov 1.4 elif ( opt == '-status' ):
598 spiga 1.107 # modified to support server mode
599     if (self.UseServer== 1):
600 mcinquil 1.119 from StatusServer import StatusServer
601 spiga 1.107 self.actions[opt] = StatusServer(self.cfg_params)
602     else:
603     jobs = self.parseRange_(val)
604    
605     if len(jobs) != 0:
606     self.actions[opt] = StatusBoss(self.cfg_params)
607     pass
608 slacapra 1.22
609 nsmirnov 1.4 elif ( opt == '-kill' ):
610 slacapra 1.8
611 corvo 1.109 if (self.UseServer== 1):
612 mcinquil 1.119 from KillerServer import KillerServer
613 farinafa 1.120
614     # Matteo for server kill by range
615     if val:
616     if val !='all':
617     val = self.parseRange_(val)
618     else:
619     val='all'
620    
621     self.actions[opt] = KillerServer(self.cfg_params,val)
622    
623     #self.actions[opt] = KillerServer(self.cfg_params)
624 corvo 1.109 else:
625 farinafa 1.120 if val:
626 corvo 1.109 if val =='all':
627     jobs = common.scheduler.listBoss()
628     else:
629     jobs = self.parseRange_(val)
630     common.scheduler.cancel(jobs)
631 spiga 1.31 else:
632 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
633 nsmirnov 1.1
634 farinafa 1.120
635    
636 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
637 spiga 1.107 # modified to support server mode
638     if (self.UseServer== 1):
639 mcinquil 1.119 from GetOutputServer import GetOutputServer
640 spiga 1.107 self.actions[opt] = GetOutputServer(self.cfg_params)
641 spiga 1.20 else:
642 spiga 1.13
643 spiga 1.107 if val=='all' or val==None or val=='':
644     jobs = common.scheduler.listBoss()
645     else:
646     jobs = self.parseRange_(val)
647    
648     jobs_done = []
649     for nj in jobs:
650     jobs_done.append(nj)
651     common.scheduler.getOutput(jobs_done)
652     pass
653 nsmirnov 1.4
654     elif ( opt == '-resubmit' ):
655 slacapra 1.78 if val=='all' or val==None or val=='':
656     jobs = common.scheduler.listBoss()
657 fanzago 1.37 else:
658 slacapra 1.78 jobs = self.parseRange_(val)
659 fanzago 1.37
660 slacapra 1.11 if val:
661     # create a list of jobs to be resubmitted.
662 corvo 1.93 val = string.replace(val,'-',':')
663 slacapra 1.8
664 slacapra 1.22 ### as before, create a Resubmittter Class
665 slacapra 1.78 maxIndex = common.scheduler.listBoss()
666 corvo 1.93 ##
667     # Marco. Vediamo se va meglio cosi'...
668     ##
669 corvo 1.97 try:
670     common.scheduler.bossTask.query(ALL, val)
671     except RuntimeError,e:
672     common.logger.message( e.__str__() )
673     except ValueError,e:
674 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
675     common.logger.message(e.what())
676 corvo 1.97 pass
677 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
678    
679 slacapra 1.11 nj_list = []
680 corvo 1.93 for c, v in task.iteritems():
681 corvo 1.97 k = int(c)
682     nj=k
683 corvo 1.93 st = v['STATUS']
684    
685 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
686 spiga 1.104 if st in ['K','SA','Z','DA']:
687 corvo 1.97 nj_list.append(int(nj)-1)
688     common.jobDB.setStatus(int(nj)-1,'C')
689 spiga 1.99 elif st in ['E','SE']:
690 spiga 1.60 common.scheduler.moveOutput(nj)
691 corvo 1.97 nj_list.append(int(nj)-1)
692     st = common.jobDB.setStatus(int(nj)-1,'RC')
693 corvo 1.93 elif st in ['W']:
694 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
695 spiga 1.60 pass
696 corvo 1.93 elif st in ['SD', 'OR']:
697 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
698 spiga 1.60 else:
699     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
700 slacapra 1.11 else:
701 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
702 fanzago 1.37 if len(common.job_list) == 0 :
703     common.job_list = JobList(common.jobDB.nJobs(),None)
704     common.job_list.setJDLNames(self.job_type_name+'.jdl')
705     pass
706    
707 slacapra 1.11 if len(nj_list) != 0:
708 corvo 1.93 nj_list.sort()
709 slacapra 1.11 # Instantiate Submitter object
710 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
711 slacapra 1.11
712 slacapra 1.8 pass
713     pass
714 slacapra 1.11 else:
715     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
716 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
717 slacapra 1.11 pass
718 fanzago 1.37 common.jobDB.save()
719 slacapra 1.8 pass
720 gutsche 1.61
721 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
722 nsmirnov 1.5
723 slacapra 1.78 if val:
724     if val =='all':
725     jobs = common.scheduler.listBoss()
726 spiga 1.47 else:
727 slacapra 1.78 jobs = self.parseRange_(val)
728     # kill submitted jobs
729     common.scheduler.cancel(jobs)
730 spiga 1.47 else:
731 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
732 nsmirnov 1.5
733 spiga 1.47 # resubmit cancelled jobs.
734     if val:
735     nj_list = []
736     for nj in jobs:
737     st = common.jobDB.status(int(nj)-1)
738     if st in ['K','A']:
739 corvo 1.97 nj_list.append(int(nj)-1)
740     common.jobDB.setStatus(int(nj)-1,'C')
741 spiga 1.47 elif st == 'Y':
742     common.scheduler.moveOutput(nj)
743 corvo 1.97 nj_list.append(int(nj)-1)
744     st = common.jobDB.setStatus(int(nj)-1,'RC')
745 spiga 1.47 elif st in ['C','X']:
746 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
747 spiga 1.47 pass
748     elif st == 'D':
749 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
750 spiga 1.47 else:
751     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
752     pass
753    
754 nsmirnov 1.5 if len(common.job_list) == 0 :
755 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
756 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
757     pass
758 spiga 1.47
759     if len(nj_list) != 0:
760 spiga 1.86 # common.scheduler.resubmit(nj_list)
761 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
762 spiga 1.47 pass
763     pass
764     else:
765     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
766 nsmirnov 1.5 pass
767 spiga 1.47 common.jobDB.save()
768 nsmirnov 1.4 pass
769 spiga 1.47
770 slacapra 1.28 elif ( opt == '-testJdl' ):
771 slacapra 1.8 jobs = self.parseRange_(val)
772     nj_list = []
773     for nj in jobs:
774 slacapra 1.62 st = common.jobDB.status(nj-1)
775     if st == 'C': nj_list.append(nj-1)
776 slacapra 1.8 pass
777    
778     if len(nj_list) != 0:
779 slacapra 1.77 # Instantiate Checker object
780 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
781    
782     # Create and initialize JobList
783    
784     if len(common.job_list) == 0 :
785     common.job_list = JobList(common.jobDB.nJobs(), None)
786     common.job_list.setJDLNames(self.job_type_name+'.jdl')
787     pass
788     pass
789    
790 slacapra 1.9 elif ( opt == '-postMortem' ):
791 corvo 1.95
792 spiga 1.108 # modified to support server mode
793     if (self.UseServer== 1):
794 mcinquil 1.119 from PostMortemServer import PostMortemServer
795 spiga 1.108 self.actions[opt] = PostMortemServer(self.cfg_params)
796     else:
797     if val:
798     val = string.replace(val,'-',':')
799     else: val=''
800     nj_list = {}
801 corvo 1.97
802 spiga 1.108 try:
803     common.scheduler.bossTask.query(ALL, val)
804     except RuntimeError,e:
805     common.logger.message( e.__str__() )
806     except ValueError,e:
807     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
808     common.logger.message( e.what())
809     pass
810 corvo 1.93
811 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
812    
813     for c, v in task.iteritems():
814     k = int(c)
815     nj=k
816     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
817     pass
818 slacapra 1.9
819 spiga 1.108 if len(nj_list) != 0:
820 corvo 1.93 # Instantiate PostMortem object
821 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
822 slacapra 1.9 # Create and initialize JobList
823 spiga 1.108 if len(common.job_list) == 0 :
824     common.job_list = JobList(common.jobDB.nJobs(), None)
825     common.job_list.setJDLNames(self.job_type_name+'.jdl')
826     pass
827 slacapra 1.9 pass
828 spiga 1.108 else:
829     common.logger.message("No jobs to analyze")
830 slacapra 1.9
831     elif ( opt == '-clean' ):
832     if val != None:
833     raise CrabException("No range allowed for '-clean'")
834 mcinquil 1.118 if (self.UseServer== 1):
835 mcinquil 1.119 from CleanerServer import CleanerServer
836 mcinquil 1.118 self.actions[opt] = CleanerServer(self.cfg_params)
837     else:
838     self.actions[opt] = Cleaner(self.cfg_params)
839    
840 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
841     elif ( opt == '-publish' ):
842 slacapra 1.113 from DBS2Publisher import Publisher
843 slacapra 1.116 self.actions[opt] = Publisher(self.cfg_params)
844    
845     #precessedData=self.cfg_params['USER.publish_data_name']
846     #self.cfg_params['USER.processed_datasetname']
847     #thePublisher = Publisher(self.cfg_params)
848     # publish_exit_status = thePublisher.publish()
849     # if (publish_exit_status == '1'):
850     # common.logger.message("user data publication --> problems")
851     # else:
852     # common.logger.message("user data publication --> ok ")
853     #########################################
854 nsmirnov 1.1 pass
855     return
856    
857 nsmirnov 1.3 def createWorkingSpace_(self):
858 slacapra 1.9 new_dir = ''
859    
860     try:
861     new_dir = self.cfg_params['USER.ui_working_dir']
862 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
863 slacapra 1.122 if len(string.split(new_dir, '/')) == 1:
864     new_dir = self.cwd + new_dir
865     else:
866     new_dir = new_dir
867 fanzago 1.48 if os.path.exists(new_dir):
868     if os.listdir(new_dir):
869     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
870     raise CrabException(msg)
871 slacapra 1.9 except KeyError:
872     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
873 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
874 slacapra 1.9 new_dir = self.cwd + new_dir
875     pass
876 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
877 nsmirnov 1.1 common.work_space.create()
878     return
879    
880 nsmirnov 1.3 def loadConfiguration_(self, opts):
881 nsmirnov 1.1
882     save_opts = common.work_space.loadSavedOptions()
883    
884     # Override saved options with new command-line options
885    
886     for k in opts.keys():
887     save_opts[k] = opts[k]
888     pass
889    
890     # Return updated options
891     return save_opts
892    
893 nsmirnov 1.3 def createLogger_(self, args):
894 nsmirnov 1.1
895     log = Logger()
896     log.quiet(self.flag_quiet)
897     log.setDebugLevel(self.debug_level)
898     log.write(args+'\n')
899 nsmirnov 1.3 log.message(self.headerString_())
900 nsmirnov 1.1 log.flush()
901     common.logger = log
902     return
903    
904 nsmirnov 1.3 def updateHistory_(self, args):
905 nsmirnov 1.1 history_fname = common.prog_name+'.history'
906     history_file = open(history_fname, 'a')
907     history_file.write(self.current_time+': '+args+'\n')
908     history_file.close()
909     return
910    
911 nsmirnov 1.3 def headerString_(self):
912 nsmirnov 1.1 """
913     Creates a string describing program options either given in
914     the command line or their default values.
915     """
916     header = common.prog_name + ' (version ' + common.prog_version_str + \
917     ') running on ' + \
918     time.ctime(time.time())+'\n\n' + \
919     common.prog_name+'. Working options:\n'
920 fanzago 1.59 #print self.job_type_name
921 nsmirnov 1.1 header = header +\
922 slacapra 1.85 ' scheduler ' + self.cfg_params['CRAB.scheduler'] + '\n'+\
923 nsmirnov 1.1 ' job type ' + self.job_type_name + '\n'+\
924     ' working directory ' + common.work_space.topDir()\
925     + '\n'
926     return header
927    
928 nsmirnov 1.3 def createScheduler_(self):
929 nsmirnov 1.1 """
930     Creates a scheduler object instantiated by its name.
931     """
932 slacapra 1.101 klass_name = 'SchedulerBoss'
933 nsmirnov 1.1 file_name = klass_name
934     try:
935     klass = importName(file_name, klass_name)
936     except KeyError:
937     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
938     raise CrabException(msg)
939     except ImportError, e:
940 slacapra 1.101 msg = 'Cannot create scheduler Boss'
941 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
942     msg += str(e)
943     raise CrabException(msg)
944    
945     common.scheduler = klass()
946     common.scheduler.configure(self.cfg_params)
947     return
948    
949     def run(self):
950     """
951     For each
952     """
953    
954     for act in self.main_actions:
955     if act in self.actions.keys(): self.actions[act].run()
956     pass
957    
958     for act in self.aux_actions:
959     if act in self.actions.keys(): self.actions[act].run()
960     pass
961     return
962    
963     ###########################################################################
964     def processHelpOptions(opts):
965    
966 slacapra 1.11 if len(opts):
967     for opt in opts.keys():
968     if opt in ('-v', '-version', '--version') :
969     print Crab.version()
970     return 1
971     if opt in ('-h','-help','--help') :
972     if opts[opt] : help(opts[opt])
973     else: help()
974     return 1
975     else:
976     usage()
977 corvo 1.93
978 nsmirnov 1.1 return 0
979    
980 corvo 1.93 ###########################################################################
981 nsmirnov 1.1 if __name__ == '__main__':
982 slacapra 1.92 ## Get rid of some useless warning
983 slacapra 1.94 try:
984     import warnings
985     warnings.simplefilter("ignore", RuntimeWarning)
986     except:
987     pass # too bad, you'll get the warning
988 corvo 1.93
989 spiga 1.123 os.putenv("PATH", definePath("new") )
990    
991 nsmirnov 1.1 # Parse command-line options and create a dictionary with
992     # key-value pairs.
993    
994     options = parseOptions(sys.argv[1:])
995    
996     # Process "help" options, such as '-help', '-version'
997    
998 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
999 nsmirnov 1.1
1000     # Create, initialize, and run a Crab object
1001    
1002     try:
1003 nsmirnov 1.3 crab = Crab(options)
1004 nsmirnov 1.1 crab.run()
1005 corvo 1.54 crab.cfg_params['apmon'].free()
1006 nsmirnov 1.1 except CrabException, e:
1007     print '\n' + common.prog_name + ': ' + str(e) + '\n'
1008     if common.logger:
1009     common.logger.write('ERROR: '+str(e)+'\n')
1010     pass
1011     pass
1012    
1013     pass