ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.138
Committed: Wed Nov 7 13:42:09 2007 UTC (17 years, 5 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.137: +16 -23 lines
Log Message:
Fixes for -create -submit use case plus some code cleaning

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 farinafa 1.133 import sys, os, time, string, commands
3    
4     ## pre-import env configuration steps
def dropOutPy23dynLoads():
    """
    Remove every python2.3 'lib-dynload' directory from sys.path.

    Nothing is done unless '-create' appears among the command line
    arguments.  sys.path is updated in place, so every reference to the
    list observes the change.
    """
    if '-create' not in sys.argv:
        return
    # Build the filtered list first and assign it back through a slice:
    # popping entries while looping over sys.path makes the loop skip the
    # entry following each removed one, so adjacent matches used to survive.
    sys.path[:] = [p for p in sys.path
                   if p.find("python2.3/lib-dynload") == -1]


# this is needed to remove interferences between LCG and CMSSW envs
dropOutPy23dynLoads()
13 farinafa 1.133
14     ## actual import session
15 nsmirnov 1.1 from crab_help import *
16     from crab_util import *
17     from crab_exceptions import *
18     from crab_logger import Logger
19     from WorkSpace import WorkSpace
20     from JobDB import JobDB
21 slacapra 1.77 from TaskDB import TaskDB
22 nsmirnov 1.3 from JobList import JobList
23 nsmirnov 1.1 from Creator import Creator
24     from Submitter import Submitter
25 slacapra 1.8 from Checker import Checker
26 slacapra 1.9 from PostMortem import PostMortem
27     from Status import Status
28 spiga 1.16 from StatusBoss import StatusBoss
29 corvo 1.42 from ApmonIf import ApmonIf
30 slacapra 1.36 from Cleaner import Cleaner
31 nsmirnov 1.1 import common
32 spiga 1.13 import Statistic
33 gutsche 1.121 from BlackWhiteListParser import BlackWhiteListParser
34 nsmirnov 1.1
35 farinafa 1.133 # from BossSession import *
36 corvo 1.93
37 spiga 1.107 #modified to support server mode
38 mcinquil 1.119 #from SubmitterServer import SubmitterServer
39     #from GetOutputServer import GetOutputServer
40     #from StatusServer import StatusServer
41     #from PostMortemServer import PostMortemServer
42     #from KillerServer import KillerServer
43     #from CleanerServer import CleanerServer
44 spiga 1.107
45 nsmirnov 1.1 ###########################################################################
46     class Crab:
def __init__(self, opts):
    """
    Give every attribute its default value, then configure the instance
    from 'opts' by calling initialize_().

    opts -- dictionary mapping option names (e.g. '-create') to values.
    """
    # The order of main_actions is important !
    self.main_actions = ['-create', '-submit']
    # Auxiliary actions; '-publish' is the option for DBS output publication.
    self.aux_actions = ['-list', '-kill', '-status', '-getoutput', '-get',
                        '-resubmit', '-cancelAndResubmit', '-testJdl',
                        '-postMortem', '-clean', '-printId', '-publish']
    # Maps an action name to its object, e.g. '-create' -> Creator instance.
    self.actions = {}

    # Configuration file name and the parameters loaded from it.
    self.cfg_fname = None
    self.cfg_params = {}

    # Current working directory, with a trailing slash.
    self.cwd = os.getcwd() + '/'
    # Current time in format 'yymmdd_hhmmss'.
    self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime())

    # Session name, used when composing working directory names.
    self.name = '0'
    # Job type name; stays None until the '-jobtype' option is processed.
    self.job_type_name = None

    # Set to 1 when an already existing task is being continued.
    self.flag_continue = 0
    # Quiet mode, i.e. no output on screen.
    self.flag_quiet = 0
    # Higher values produce more output.
    self.debug_level = 0

    self.initialize_(opts)
88    
def version():
    """Return the program version string kept in the 'common' module."""
    version_string = common.prog_version_str
    return version_string

# Callable on the class itself, without an instance.
version = staticmethod(version)
93    
def initialize_(self, opts):
    """
    Read the complete configuration, set up the shared objects kept in
    the 'common' module and instantiate the requested actions.

    opts -- dictionary of command line options; when an existing task is
            continued it is replaced by the value returned by
            loadConfiguration_().
    """
    # '-continue' comes first: in that case the configuration parameters
    # are loaded from the already existing working space.
    self.processContinueOption_(opts)

    # The ini-file is processed before the command line options because
    # the latter override the ini-file settings.
    self.processIniFile_(opts)
    if self.flag_continue:
        opts = self.loadConfiguration_(opts)
    self.processOptions_(opts)

    common.taskDB = TaskDB()

    if self.flag_continue:
        common.taskDB.load()
    else:
        self.createWorkingSpace_()
        # Everything except the actions and '-debug' is stored in the
        # task DB and saved together with the configuration file name.
        optsToBeSaved = {}
        for name in opts.keys():
            is_action = (name in self.main_actions) or (name in self.aux_actions)
            if is_action or name == '-debug':
                continue
            optsToBeSaved[name] = opts[name]
            common.taskDB.setDict(name[1:], optsToBeSaved[name])
        common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)

    # At this point all configuration options have been read.
    args = ' '.join(sys.argv)
    self.updateHistory_(args)
    self.createLogger_(args)

    common.jobDB = JobDB()

    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

    # Server mode is off unless 'CRAB.server_mode' is configured.
    self.UseServer = 0
    try:
        self.UseServer = int(self.cfg_params['CRAB.server_mode'])
    except KeyError:
        pass

    if int(self.cfg_params['USER.activate_monalisa']):
        self.cfg_params['apmon'] = ApmonIf()

    if self.flag_continue:
        try:
            common.jobDB.load()
            self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
            common.logger.debug(6, str(common.jobDB))
        except DBException:
            # A job DB that cannot be loaded is ignored here.
            pass

    self.createScheduler_()

    def report(text):
        # Send the text to the debug channel and, when the debug level is
        # below 6, write it through the logger as well.
        common.logger.debug(6, text)
        if common.logger.debugLevel() < 6:
            common.logger.write(text)

    report('Used properties:')
    names = self.cfg_params.keys()
    names.sort()
    for name in names:
        if self.cfg_params[name]:
            report(' '+name+' : '+str(self.cfg_params[name]))
        else:
            report(' '+name+' : ')
    report('End of used properties.\n')

    self.initializeActions_(opts)
    return
184    
def processContinueOption_(self, opts):
    """
    Decide whether an existing task is being continued and, if so, open
    its working space as common.work_space.

    Continuation is selected by '-continue' / '-c', by any auxiliary
    action, or by '-submit' given without '-create'.  When no directory
    is given with the option, findLastWorkDir() is asked for one.

    Raises CrabException when no working directory can be found or the
    selected one does not exist.
    """
    continue_dir = None

    # Look for the '-continue' option, which may name the directory.
    for key in opts.keys():
        if key not in ('-continue', '-c'):
            continue
        self.flag_continue = 1
        target = opts[key]
        if target:
            if target[0] == '/':
                continue_dir = target             # abs path
            else:
                continue_dir = self.cwd + target  # rel path
        break

    # Look for actions which make sense only with '-continue'.
    if not self.flag_continue:
        for key in opts.keys():
            if key in self.aux_actions:
                self.flag_continue = 1
                break
        if '-submit' in opts.keys() and '-create' not in opts.keys():
            self.flag_continue = 1

    if not self.flag_continue:
        return

    if not continue_dir:
        continue_dir = findLastWorkDir(common.prog_name + '_' + self.name + '_')
    if not continue_dir:
        raise CrabException('Cannot find last working directory.')
    if not os.path.exists(continue_dir):
        raise CrabException('Cannot continue because the working directory <'
                            + continue_dir + '> does not exist.')

    # Instantiate WorkSpace
    common.work_space = WorkSpace(continue_dir, self.cfg_params)
    return
234    
def processIniFile_(self, opts):
    """
    Load the configuration INI-file and merge its [CRAB] section into
    'opts'.

    The file name comes from '-cfg'; without it the name saved in the
    working space (when continuing) or '<prog_name>.cfg' is used.  A
    name equal to 'none' (any case) disables loading.  Each 'CRAB.xxx'
    parameter becomes option '-xxx' unless that option is already set
    or xxx starts with '__'.

    Raises CrabException when '-cfg' is combined with continuation or
    the file does not exist.
    """
    # Extract the cfg-file name and the session name from the options.
    for key in opts.keys():
        if key == '-cfg':
            if self.flag_continue:
                raise CrabException('-continue and -cfg cannot coexist.')
            if opts[key]:
                self.cfg_fname = opts[key]
            else:
                usage()
        elif key == '-name':
            self.name = opts[key]

    # Set the default cfg-file name.
    if self.cfg_fname is None:
        if self.flag_continue:
            self.cfg_fname = common.work_space.cfgFileName()
        else:
            self.cfg_fname = common.prog_name + '.cfg'

    # Load the cfg-file.
    if self.cfg_fname.lower() != 'none':
        if not os.path.exists(self.cfg_fname):
            raise CrabException('cfg-file '+self.cfg_fname+' not found.')
        self.cfg_params = loadConfig(self.cfg_fname)
        self.cfg_params['user'] = os.environ['USER']

    # Process the [CRAB] section: options already present win.
    section = 'CRAB.'
    for key in self.cfg_params.keys():
        if not key.startswith(section):
            continue
        opt = '-' + key[len(section):]
        if opt.startswith('-__'):
            continue
        if opt not in opts.keys():
            opts[opt] = self.cfg_params[key]

    return
293    
def processOptions_(self, opts):
    """
    Apply the command line options to the instance and to cfg_params.

    Every option is copied into cfg_params: '-name' is stored as
    'CRAB.name', '-SECTION.ENTRY' as 'SECTION.ENTRY'.  In addition
    '-jobtype', '-Q' and '-debug' set job_type_name, flag_quiet and
    debug_level.  An unknown option without a '.' is reported and
    usage() is called.
    """
    # Options that are accepted here but need no further handling.
    no_op_options = ('-server_mode', '-server_name', '-cfg',
                     '-continue', '-c', '-scheduler')

    for opt in opts.keys():
        val = opts[opt]
        dotted = opt.find('.') != -1

        # Actions are only recorded; initializeActions_() handles them.
        if opt in self.main_actions or opt in self.aux_actions:
            self.cfg_params['CRAB.'+opt[1:]] = val
            continue

        if opt in no_op_options:
            pass
        elif opt == '-jobtype':
            if val:
                self.job_type_name = val.upper()
            else:
                usage()
        elif opt == '-Q':
            self.flag_quiet = 1
        elif opt == '-debug':
            if val:
                self.debug_level = int(val)
            else:
                self.debug_level = 1
        elif not dotted:
            print(common.prog_name+'. Unrecognized option '+opt)
            usage()

        # Override config parameters from INI-file with cmd-line params.
        if dotted:
            # Command line parameters in the form -SECTION.ENTRY=VALUE
            self.cfg_params[opt[1:]] = val
        else:
            self.cfg_params['CRAB.'+opt[1:]] = val
    return
354    
def parseRange_(self, aRange):
    """
    Expand a job range specification into the list of job numbers.
    NB: the first job is "1", not "0".

        'all'      -> [1,2,..., NJobs]
        ''         -> [1,2,..., NJobs]
        None       -> [1,2,..., NJobs]
        '0'        -> []
        'n1'       -> [n1]
        'n1-n2'    -> [n1, n1+1, n1+2, ..., n2-1, n2]
        'n1,n2'    -> [n1, n2]
        'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]

    NJobs is common.jobDB.nJobs().  When the expanded list contains a
    job more than once, an error is logged and [] is returned.
    """
    common.logger.debug(5, "parseRange_ " + str(aRange))

    if aRange == '0':
        return []
    if aRange == 'all' or aRange == None or aRange == '':
        return range(1, common.jobDB.nJobs() + 1)

    # Comma separated pieces are expanded one by one and concatenated.
    jobs = []
    for piece in str(aRange).split(','):
        jobs = jobs + self.parseSimpleRange_(piece)

    if not self.checkUniqueness_(jobs):
        common.logger.message("Error " + str(jobs))
        return []
    return jobs
386    
def checkUniqueness_(self, list):
    """
    Tell whether 'list' contains only unique elements.

    Returns True when no element compares equal to another one (an
    empty list counts as unique), False otherwise.
    """
    distinct = []
    for item in list:
        # Keep only the first occurrence of each value.
        if item not in distinct:
            distinct.append(item)
    return len(distinct) == len(list)
398    
def uuidgen(self):
    """Return the output of the external 'uuidgen' command."""
    generated = commands.getoutput('uuidgen')
    return generated
402    
403    
def parseSimpleRange_(self, aRange):
    """
    Expand one simple range into the list of job numbers it denotes:

        'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
        'n1'    -> [n1]

    Numbers must be positive and n1 must be smaller than n2.  An invalid
    range is reported through common.logger and the result stays empty;
    for an invalid single number usage() is called as well.

    aRange -- the range; it is converted with str(), so an integer is
              accepted too.
    """
    text = str(aRange)
    result = []

    if text.find('-') < 0:
        # A single job number.
        if int(aRange) > 0:
            result.append(int(aRange))
        else:
            # 'text' rather than aRange: concatenating a non-string
            # argument to the message would raise TypeError.
            common.logger.message("parseSimpleRange_ ERROR " + text)
            usage()
    else:
        # Exactly two bounds are required; anything else (e.g. '1-2-3')
        # is reported like any other bad range instead of failing on
        # tuple unpacking.
        bounds = text.split('-')
        if (len(bounds) == 2
                and isInt(bounds[0]) and isInt(bounds[1])
                and int(bounds[0]) > 0
                and int(bounds[0]) < int(bounds[1])):
            result = range(int(bounds[0]), int(bounds[1]) + 1)
        else:
            common.logger.message("parseSimpleRange_ ERROR " + ''.join(bounds))

    return result
436 nsmirnov 1.4
437 nsmirnov 1.3 def initializeActions_(self, opts):
438 nsmirnov 1.1 """
439     For each user action instantiate a corresponding
440     object and put it in the action dictionary.
441     """
    # Options are visited in the iteration order of opts.keys().  A
    # recognised action either stores an object in self.actions[opt] or is
    # carried out right here through common.scheduler / common.jobDB.
442 spiga 1.15
443     for opt in opts.keys():
444    
445 nsmirnov 1.1 val = opts[opt]
446 spiga 1.15
447    
    # '-create': build the Creator, the job DB (new tasks only) and the
    # job list, then record script/JDL/cfg names and a task UUID in the
    # task DB.  A value other than 'all' only triggers the two notices
    # below; ncjobs is set to 'all'.
448     if ( opt == '-create' ):
449 gutsche 1.83 if val and val != 'all':
450     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
451     common.logger.message(msg)
452     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
453     common.logger.message(msg)
454 slacapra 1.82 ncjobs = 'all'
455 farinafa 1.138
456 slacapra 1.82 # Instantiate Creator object
457     self.creator = Creator(self.job_type_name,
458     self.cfg_params,
459     ncjobs)
460     self.actions[opt] = self.creator
461    
462 farinafa 1.138 ###########################################
463     # environment swapping # Fabio
464     ###########################################
    # When '-submit' is requested too, PATH and PYTHONPATH are replaced
    # by the values of AUX_SUBM_PATH / AUX_SUBM_PY (a KeyError is raised
    # if those environment variables are missing).
465     if '-submit' in opts:
466     os.putenv('PATH', os.environ['AUX_SUBM_PATH'])
467     os.putenv('PYTHONPATH', os.environ['AUX_SUBM_PY'])
468     #
469     os.environ['PATH'] = os.environ['AUX_SUBM_PATH']
470     os.environ['PYTHONPATH'] = os.environ['AUX_SUBM_PY']
471     #
472     dropOutPy23dynLoads()
473     pass
474    
475 slacapra 1.82 # Initialize the JobDB object if needed
476     if not self.flag_continue:
477     common.jobDB.create(self.creator.nJobs())
478 nsmirnov 1.1 pass
479 nsmirnov 1.3
480 slacapra 1.82 # Create and initialize JobList
481     common.job_list = JobList(common.jobDB.nJobs(),
482     self.creator.jobType())
483 nsmirnov 1.3
484 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
485     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
486     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
487     common.job_list.setScriptNames(self.job_type_name+'.sh')
488     common.job_list.setJDLNames(self.job_type_name+'.jdl')
489     common.job_list.setCfgNames(self.creator.jobType().configFilename())
490 slacapra 1.9
491 slacapra 1.82 self.creator.writeJobsSpecsToDB()
492 spiga 1.107 taskUnicId= self.uuidgen()
493     common.taskDB.setDict('TasKUUID',taskUnicId)
494 slacapra 1.77
495 slacapra 1.82 common.taskDB.save()
496 nsmirnov 1.3 pass
497 nsmirnov 1.1
    # '-submit': in server mode delegate to SubmitterServer; otherwise
    # select the jobs to submit and wrap them in a Submitter.
498     elif ( opt == '-submit' ):
499 spiga 1.107 # modified to support server mode
500     if (self.UseServer== 1):
501 mcinquil 1.119 from SubmitterServer import SubmitterServer
502 farinafa 1.125 self.actions[opt] = SubmitterServer(self.cfg_params, self.parseRange_(val), val)
503 spiga 1.107 else:
504     # modified to support server mode
505     # get user request
506     nsjobs = -1
507 mcinquil 1.126 chosenJobsList = None
    # NOTE(review): val is passed to eval() below, so any value other
    # than 'all' is evaluated as a Python expression -- TODO confirm the
    # option value is trusted, or parse it without eval.
508 spiga 1.107 if val:
509 farinafa 1.125 if val=='all':
510 spiga 1.107 pass
511 farinafa 1.125 elif (type(eval(val)) is int) and eval(val) > 0:
512     # positive number
513     nsjobs = eval(val)
514     # NEW PART # Fabio
515     # put here code for LIST MANAGEMENT
516     elif (type(eval(val)) is tuple)or( type(eval(val)) is int and eval(val)<0 ) :
517     chosenJobsList = self.parseRange_(val)
518     chosenJobsList = [i-1 for i in chosenJobsList ]
519     nsjobs = len(chosenJobsList)
520     #
521 spiga 1.107 else:
522     msg = 'Bad submission option <'+str(val)+'>\n'
523     msg += ' Must be an integer or "all"'
524     msg += ' Generic range is not allowed"'
525     raise CrabException(msg)
526     pass
527    
528     common.logger.debug(5,'nsjobs '+str(nsjobs))
529     # total jobs
530     nj_list = []
531     # get the first not already submitted
532     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
533     jobSetForSubmission = 0
534 gutsche 1.110 jobSkippedInSubmission = []
535 slacapra 1.114 datasetpath=self.cfg_params['CMSSW.datasetpath']
536 farinafa 1.125
537     # NEW PART # Fabio
538     # modified to handle list of jobs by the users # Fabio
539     tmp_jList = range(common.jobDB.nJobs())
540     if chosenJobsList != None:
541     tmp_jList = chosenJobsList
542     # build job list
    # A job is kept when its destination survives the black/white list
    # (or no dataset is used) and its status is not in the list below;
    # jobs failing the first test are remembered (1-based) as skipped.
543     for nj in tmp_jList:
544     cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj)) # More readable # Fabio
545 mcinquil 1.127 if (cleanedBlackWhiteList != '') or (datasetpath == "None" ) or (datasetpath == None): ## Matty's fix
546 gutsche 1.110 if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
547     jobSetForSubmission +=1
548     nj_list.append(nj)
549 farinafa 1.125 else:
550     continue
551 gutsche 1.110 else :
552     jobSkippedInSubmission.append(nj+1)
553 farinafa 1.125 #
554 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
555     break
556     pass
557 farinafa 1.125 del tmp_jList
558     #
559    
560 spiga 1.107 if nsjobs>jobSetForSubmission:
561     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
562 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
563 mcinquil 1.131 #print jobSkippedInSubmission
564     #print spanRanges(jobSkippedInSubmission)
565     mess =""
566     for jobs in jobSkippedInSubmission:
567     mess += str(jobs) + ","
568     common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
569 spiga 1.107 # submit N from last submitted job
570     common.logger.debug(5,'nj_list '+str(nj_list))
571    
572     if len(nj_list) != 0:
573     # Instantiate Submitter object
574     self.actions[opt] = Submitter(self.cfg_params, nj_list)
575 farinafa 1.138
576 spiga 1.107 # Create and initialize JobList
577     if len(common.job_list) == 0 :
578     common.job_list = JobList(common.jobDB.nJobs(),
579     None)
580     common.job_list.setJDLNames(self.job_type_name+'.jdl')
581     pass
582 slacapra 1.23 pass
583 slacapra 1.22 else:
584 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
585 nsmirnov 1.1 pass
586    
    # '-list': dump the selected jobs from the job DB.
587 nsmirnov 1.4 elif ( opt == '-list' ):
588 slacapra 1.8 jobs = self.parseRange_(val)
589    
590     common.jobDB.dump(jobs)
591 nsmirnov 1.4 pass
592    
    # '-printId': print the task id (server mode) or the scheduler id of
    # every job that has one.
    # NOTE(review): the server-mode branch uses a bare 'except:', so any
    # error is reduced to the warning message below.
593 slacapra 1.67 elif ( opt == '-printId' ):
594 spiga 1.115 # modified to support server mode
595     if (self.UseServer== 1):
596     try:
597     common.taskDB.load()
598     WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
599     projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
600     print "Task Id = %-40s " %(projectUniqName)
601     except:
602     common.logger.message("Warning :Interaction in query task unique ID failed")
603     pass
604     else:
605     try:
606 slacapra 1.117 common.scheduler.bossTask.load(ALL)
607 spiga 1.115 except RuntimeError,e:
608     common.logger.message( e.__str__() )
609     except ValueError,e:
610     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
611     common.logger.message(e.what())
612     pass
613     task = common.scheduler.bossTask.jobsDict()
614    
615     for c, v in task.iteritems():
616     k = int(c)
617     nj=k
618     id = v['CHAIN_ID']
619     jid = v['SCHED_ID']
620     if jid:
621     print "Job: %-5s Id = %-40s " %(id,jid)
622     #else:
623     # print "Job: ",id," No ID yet"
624 corvo 1.97 pass
625 slacapra 1.67
    # '-status': StatusServer in server mode, otherwise StatusBoss when
    # the parsed range is not empty.
626 nsmirnov 1.4 elif ( opt == '-status' ):
627 spiga 1.107 # modified to support server mode
628     if (self.UseServer== 1):
629 mcinquil 1.119 from StatusServer import StatusServer
630 spiga 1.107 self.actions[opt] = StatusServer(self.cfg_params)
631     else:
632     jobs = self.parseRange_(val)
633    
634     if len(jobs) != 0:
635     self.actions[opt] = StatusBoss(self.cfg_params)
636     pass
637 slacapra 1.22
    # '-kill': requires a range or 'all'.  Without server mode the jobs
    # are cancelled immediately through the scheduler.
    # NOTE(review): in server mode 'jobs' is computed but KillerServer is
    # given self.parseRange_(val) instead.
638 nsmirnov 1.4 elif ( opt == '-kill' ):
639 slacapra 1.8
640 corvo 1.109 if (self.UseServer== 1):
641 mcinquil 1.130 if val:
642     if val =='all':
643     jobs = common.scheduler.listBoss()
644     else:
645     jobs = self.parseRange_(val)
646     from KillerServer import KillerServer
647     self.actions[opt] = KillerServer(self.cfg_params,val, self.parseRange_(val)) #Fabio
648     else:
649     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
650 corvo 1.109 else:
651 farinafa 1.120 if val:
652 corvo 1.109 if val =='all':
653     jobs = common.scheduler.listBoss()
654     else:
655     jobs = self.parseRange_(val)
656     common.scheduler.cancel(jobs)
657 spiga 1.31 else:
658 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
659 nsmirnov 1.1
660 farinafa 1.120
    # '-getoutput' / '-get': GetOutputServer in server mode, otherwise
    # the output of the selected jobs is retrieved immediately.
661 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
662 spiga 1.107 # modified to support server mode
663     if (self.UseServer== 1):
664 mcinquil 1.119 from GetOutputServer import GetOutputServer
665 spiga 1.107 self.actions[opt] = GetOutputServer(self.cfg_params)
666 spiga 1.20 else:
667 spiga 1.107 if val=='all' or val==None or val=='':
668     jobs = common.scheduler.listBoss()
669     else:
670     jobs = self.parseRange_(val)
671    
672     jobs_done = []
673     for nj in jobs:
674     jobs_done.append(nj)
675     common.scheduler.getOutput(jobs_done)
676     pass
677 nsmirnov 1.4
    # '-resubmit': query the scheduler for the requested jobs, reset the
    # status of those that may be resubmitted in the job DB and hand
    # them (0-based) to a Submitter.
678     elif ( opt == '-resubmit' ):
679 slacapra 1.78 if val=='all' or val==None or val=='':
680     jobs = common.scheduler.listBoss()
681 fanzago 1.37 else:
682 slacapra 1.78 jobs = self.parseRange_(val)
683 fanzago 1.37
684 slacapra 1.11 if val:
685     # create a list of jobs to be resubmitted.
686 corvo 1.93 val = string.replace(val,'-',':')
687 slacapra 1.8
688 slacapra 1.22 ### as before, create a Resubmittter Class
689 slacapra 1.78 maxIndex = common.scheduler.listBoss()
690 corvo 1.93 ##
691     # Marco. Let's see whether it works better this way...
692     ##
693 corvo 1.97 try:
694     common.scheduler.bossTask.query(ALL, val)
695     except RuntimeError,e:
696     common.logger.message( e.__str__() )
697     except ValueError,e:
698 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
699     common.logger.message(e.what())
700 corvo 1.97 pass
701 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
702    
703 slacapra 1.11 nj_list = []
704 corvo 1.93 for c, v in task.iteritems():
705 corvo 1.97 k = int(c)
706     nj=k
707 corvo 1.93 st = v['STATUS']
708    
709 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
710 spiga 1.104 if st in ['K','SA','Z','DA']:
711 corvo 1.97 nj_list.append(int(nj)-1)
712     common.jobDB.setStatus(int(nj)-1,'C')
713 spiga 1.99 elif st in ['E','SE']:
714 spiga 1.60 common.scheduler.moveOutput(nj)
715 corvo 1.97 nj_list.append(int(nj)-1)
716     st = common.jobDB.setStatus(int(nj)-1,'RC')
717 corvo 1.93 elif st in ['W']:
718 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
719 spiga 1.60 pass
720 corvo 1.93 elif st in ['SD', 'OR']:
721 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
722 spiga 1.60 else:
723     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
724 slacapra 1.11 else:
725 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
726 fanzago 1.37 if len(common.job_list) == 0 :
727     common.job_list = JobList(common.jobDB.nJobs(),None)
728     common.job_list.setJDLNames(self.job_type_name+'.jdl')
729     pass
730    
731 slacapra 1.11 if len(nj_list) != 0:
732 corvo 1.93 nj_list.sort()
733 slacapra 1.11 # Instantiate Submitter object
734 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
735 slacapra 1.11
736 slacapra 1.8 pass
737     pass
738 slacapra 1.11 else:
739     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
740 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
741 slacapra 1.11 pass
742 fanzago 1.37 common.jobDB.save()
743 slacapra 1.8 pass
744 gutsche 1.61
    # '-cancelAndResubmit': cancel the selected jobs through the
    # scheduler, then resubmit those whose job DB status allows it.
745 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
746 nsmirnov 1.5
747 slacapra 1.78 if val:
748     if val =='all':
749     jobs = common.scheduler.listBoss()
750 spiga 1.47 else:
751 slacapra 1.78 jobs = self.parseRange_(val)
752     # kill submitted jobs
753     common.scheduler.cancel(jobs)
754 spiga 1.47 else:
755 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
756 nsmirnov 1.5
757 spiga 1.47 # resubmit cancelled jobs.
758     if val:
759     nj_list = []
760     for nj in jobs:
761     st = common.jobDB.status(int(nj)-1)
762     if st in ['K','A']:
763 corvo 1.97 nj_list.append(int(nj)-1)
764     common.jobDB.setStatus(int(nj)-1,'C')
765 spiga 1.47 elif st == 'Y':
766     common.scheduler.moveOutput(nj)
767 corvo 1.97 nj_list.append(int(nj)-1)
768     st = common.jobDB.setStatus(int(nj)-1,'RC')
769 spiga 1.47 elif st in ['C','X']:
770 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
771 spiga 1.47 pass
772     elif st == 'D':
773 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
774 spiga 1.47 else:
775     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
776     pass
777    
778 nsmirnov 1.5 if len(common.job_list) == 0 :
779 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
780 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
781     pass
782 spiga 1.47
783     if len(nj_list) != 0:
784 spiga 1.86 # common.scheduler.resubmit(nj_list)
785 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
786 spiga 1.47 pass
787     pass
788     else:
789     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
790 nsmirnov 1.5 pass
791 spiga 1.47 common.jobDB.save()
792 nsmirnov 1.4 pass
793 spiga 1.47
    # '-testJdl': hand every selected job whose status is not 'X' to a
    # Checker.
794 slacapra 1.28 elif ( opt == '-testJdl' ):
795 slacapra 1.8 jobs = self.parseRange_(val)
796     nj_list = []
797     for nj in jobs:
798 slacapra 1.62 st = common.jobDB.status(nj-1)
799 fanzago 1.129 if st != 'X': nj_list.append(nj-1)
800 slacapra 1.8 pass
801    
802     if len(nj_list) != 0:
803 slacapra 1.77 # Instantiate Checker object
804 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
805    
806     # Create and initialize JobList
807    
808     if len(common.job_list) == 0 :
809     common.job_list = JobList(common.jobDB.nJobs(), None)
810     common.job_list.setJDLNames(self.job_type_name+'.jdl')
811     pass
812     pass
813    
    # '-postMortem': PostMortemServer in server mode; otherwise collect
    # CHAIN_ID -> SCHED_ID for the queried jobs that have a scheduler id
    # and pass that dictionary to PostMortem.
814 slacapra 1.9 elif ( opt == '-postMortem' ):
815 corvo 1.95
816 spiga 1.108 # modified to support server mode
817     if (self.UseServer== 1):
818 mcinquil 1.119 from PostMortemServer import PostMortemServer
819 spiga 1.108 self.actions[opt] = PostMortemServer(self.cfg_params)
820     else:
821     if val:
822     val = string.replace(val,'-',':')
823     else: val=''
824     nj_list = {}
825 corvo 1.97
826 spiga 1.108 try:
827     common.scheduler.bossTask.query(ALL, val)
828     except RuntimeError,e:
829     common.logger.message( e.__str__() )
830     except ValueError,e:
831     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
832     common.logger.message( e.what())
833     pass
834 corvo 1.93
835 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
836    
837     for c, v in task.iteritems():
838     k = int(c)
839     nj=k
840     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
841     pass
842 slacapra 1.9
843 spiga 1.108 if len(nj_list) != 0:
844 corvo 1.93 # Instantiate PostMortem object
845 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
846 slacapra 1.9 # Create and initialize JobList
847 spiga 1.108 if len(common.job_list) == 0 :
848     common.job_list = JobList(common.jobDB.nJobs(), None)
849     common.job_list.setJDLNames(self.job_type_name+'.jdl')
850     pass
851 slacapra 1.9 pass
852 spiga 1.108 else:
853     common.logger.message("No jobs to analyze")
854 slacapra 1.9
    # '-clean': takes no range; CleanerServer in server mode, Cleaner
    # otherwise.
855     elif ( opt == '-clean' ):
856     if val != None:
857     raise CrabException("No range allowed for '-clean'")
858 mcinquil 1.118 if (self.UseServer== 1):
859 mcinquil 1.119 from CleanerServer import CleanerServer
860 mcinquil 1.118 self.actions[opt] = CleanerServer(self.cfg_params)
861     else:
862     self.actions[opt] = Cleaner(self.cfg_params)
863    
864 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
    # '-publish': the Publisher is imported from DBS2Publisher only when
    # this action is requested.
865     elif ( opt == '-publish' ):
866 slacapra 1.113 from DBS2Publisher import Publisher
867 slacapra 1.116 self.actions[opt] = Publisher(self.cfg_params)
868    
869     #precessedData=self.cfg_params['USER.publish_data_name']
870     #self.cfg_params['USER.processed_datasetname']
871     #thePublisher = Publisher(self.cfg_params)
872     # publish_exit_status = thePublisher.publish()
873     # if (publish_exit_status == '1'):
874     # common.logger.message("user data publication --> problems")
875     # else:
876     # common.logger.message("user data publication --> ok ")
877     #########################################
878 nsmirnov 1.1 pass
879     return
880    
def createWorkingSpace_(self):
    """
    Pick the working directory of the task, derive the task id from it
    and create the corresponding WorkSpace (stored in common.work_space).

    When 'USER.ui_working_dir' is present in self.cfg_params it is used:
    a bare name (no '/') is placed under self.cwd, any other value is
    taken as given.  Otherwise a default directory named
    '<prog_name>_<name>_<current_time>' under self.cwd is used.

    Side effects: sets self.cfg_params['taskId'] and common.work_space.

    Raises CrabException when the requested directory already exists and
    is not empty.
    """
    try:
        new_dir = self.cfg_params['USER.ui_working_dir']
    except KeyError:
        # No directory requested by the user: fall back to the default name.
        dir_name = common.prog_name + '_' + self.name + '_' + self.current_time
        self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + dir_name
        new_dir = self.cwd + dir_name
    else:
        # Drop trailing slashes before taking the last path component, so
        # that 'mytask/' still contributes 'mytask' (not '') to the task id.
        dir_name = os.path.basename(new_dir.rstrip('/'))
        self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + dir_name + '_' + self.current_time

        # A bare name is relative to the directory crab was started from.
        if '/' not in new_dir:
            new_dir = self.cwd + new_dir

        if os.path.exists(new_dir) and os.listdir(new_dir):
            msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
            raise CrabException(msg)

    common.work_space = WorkSpace(new_dir, self.cfg_params)
    common.work_space.create()
    return
903    
def loadConfiguration_(self, opts):
    """
    Merge the options saved in the work space with the ones given now.

    opts -- mapping of command-line options; its entries take precedence
            over the saved ones.

    Returns the saved-options mapping obtained from
    common.work_space.loadSavedOptions(), updated in place.
    """
    merged = common.work_space.loadSavedOptions()

    # Command-line values win over whatever was stored earlier.
    for name, value in opts.items():
        merged[name] = value

    return merged
916    
def createLogger_(self, args):
    """
    Build the Logger for this session and publish it as common.logger.

    args -- string written (followed by a newline) as first log entry.

    The logger is configured with self.flag_quiet and self.debug_level,
    and the header produced by headerString_() is emitted before the
    logger is flushed.
    """
    logger = Logger()

    # Configuration first, so that it applies to everything written below.
    logger.quiet(self.flag_quiet)
    logger.setDebugLevel(self.debug_level)

    logger.write(args + '\n')
    banner = self.headerString_()
    logger.message(banner)
    logger.flush()

    common.logger = logger
    return
927    
def updateHistory_(self, args):
    """
    Append one line '<current_time>: <args>' to the history file.

    args -- string describing the command being recorded.

    The file is named '<common.prog_name>.history' and is opened in
    append mode; the path is relative, so it resolves against the
    process working directory.
    """
    history_fname = common.prog_name + '.history'
    history_file = open(history_fname, 'a')
    try:
        history_file.write(self.current_time + ': ' + args + '\n')
    finally:
        # Always release the handle, even if the write fails.
        history_file.close()
    return
934    
def headerString_(self):
    """
    Return the banner written at the top of the log.

    The banner names the program and its version, the current time, and
    the working options: scheduler (from self.cfg_params['CRAB.scheduler']),
    job type (self.job_type_name) and the top directory of the work space.
    """
    banner = [
        common.prog_name, ' (version ', common.prog_version_str,
        ') running on ',
        time.ctime(time.time()), '\n\n',
        common.prog_name, '. Working options:\n',
    ]
    working_options = [
        ' scheduler ', self.cfg_params['CRAB.scheduler'], '\n',
        ' job type ', self.job_type_name, '\n',
        ' working directory ', common.work_space.topDir(),
        '\n',
    ]
    return ''.join(banner + working_options)
951    
952 nsmirnov 1.3 def createScheduler_(self):
953 nsmirnov 1.1 """
954     Creates a scheduler object instantiated by its name.
955     """
956 slacapra 1.101 klass_name = 'SchedulerBoss'
957 nsmirnov 1.1 file_name = klass_name
958     try:
959     klass = importName(file_name, klass_name)
960     except KeyError:
961     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
962     raise CrabException(msg)
963     except ImportError, e:
964 slacapra 1.101 msg = 'Cannot create scheduler Boss'
965 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
966     msg += str(e)
967     raise CrabException(msg)
968    
969     common.scheduler = klass()
970     common.scheduler.configure(self.cfg_params)
971     return
972    
def run(self):
    """
    Execute the requested actions.

    Actions named in self.main_actions are run first, in that order,
    followed by those named in self.aux_actions.  A name with no entry
    in self.actions is silently skipped.
    """
    for name in self.main_actions:
        if name in self.actions:
            self.actions[name].run()

    for name in self.aux_actions:
        if name in self.actions:
            self.actions[name].run()
    return
986    
987     ###########################################################################
def processHelpOptions(opts):
    """
    Handle the informational options ('-version', '-help' and aliases).

    opts -- mapping of option name to value as parsed from the command line.

    Returns 1 when a version or help request was found and served, 0
    otherwise.  With no options at all, usage() is called.
    """
    version_flags = ('-v', '-version', '--version')
    help_flags = ('-h', '-help', '--help')

    if len(opts) == 0:
        usage()
        return 0

    for opt in opts.keys():
        if opt in version_flags:
            print(Crab.version())
            return 1
        if opt in help_flags:
            topic = opts[opt]
            # An option value, when given, selects the help topic.
            if topic:
                help(topic)
            else:
                help()
            return 1

    return 0
1003    
1004 corvo 1.93 ###########################################################################
1005 nsmirnov 1.1 if __name__ == '__main__':
1006 slacapra 1.92 ## Get rid of some useless warning
1007 slacapra 1.94 try:
1008     import warnings
1009     warnings.simplefilter("ignore", RuntimeWarning)
1010     except:
1011     pass # too bad, you'll get the warning
1012 spiga 1.137
1013 nsmirnov 1.1 # Parse command-line options and create a dictionary with
1014     # key-value pairs.
1015     options = parseOptions(sys.argv[1:])
1016    
1017     # Process "help" options, such as '-help', '-version'
1018 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
1019 nsmirnov 1.1
1020     # Create, initialize, and run a Crab object
1021     try:
1022 nsmirnov 1.3 crab = Crab(options)
1023 nsmirnov 1.1 crab.run()
1024 corvo 1.54 crab.cfg_params['apmon'].free()
1025 nsmirnov 1.1 except CrabException, e:
1026     print '\n' + common.prog_name + ': ' + str(e) + '\n'
1027     if common.logger:
1028     common.logger.write('ERROR: '+str(e)+'\n')
1029     pass
1030     pass
1031    
1032     pass