ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.132
Committed: Mon Oct 22 11:10:14 2007 UTC (17 years, 6 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.131: +1 -1 lines
Log Message:
Fix on printout (added typecasting)

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 slacapra 1.77 from TaskDB import TaskDB
9 nsmirnov 1.3 from JobList import JobList
10 nsmirnov 1.1 from Creator import Creator
11     from Submitter import Submitter
12 slacapra 1.8 from Checker import Checker
13 slacapra 1.9 from PostMortem import PostMortem
14     from Status import Status
15 spiga 1.16 from StatusBoss import StatusBoss
16 corvo 1.42 from ApmonIf import ApmonIf
17 slacapra 1.36 from Cleaner import Cleaner
18 nsmirnov 1.1 import common
19 spiga 1.13 import Statistic
20 spiga 1.107 import commands
21 gutsche 1.121 from BlackWhiteListParser import BlackWhiteListParser
22 nsmirnov 1.1
23 spiga 1.98 from BossSession import *
24 corvo 1.93
25 spiga 1.107 #modified to support server mode
26 mcinquil 1.119 #from SubmitterServer import SubmitterServer
27     #from GetOutputServer import GetOutputServer
28     #from StatusServer import StatusServer
29     #from PostMortemServer import PostMortemServer
30     #from KillerServer import KillerServer
31     #from CleanerServer import CleanerServer
32 spiga 1.107
33 nsmirnov 1.1 import sys, os, time, string
34    
35     ###########################################################################
36     class Crab:
37 nsmirnov 1.3 def __init__(self, opts):
38 fanzago 1.76 ## test_tag
39 nsmirnov 1.1 # The order of main_actions is important !
40 fanzago 1.111 self.main_actions = [ '-create', '-submit' ]
41     ### FEDE new option "-publish" FOR DBS OUTPUT PUBLICATION
42 slacapra 1.70 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput','-get',
43 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
44 fanzago 1.111 '-printId', '-publish' ]
45 nsmirnov 1.1
46     # Dictionary of actions, e.g. '-create' -> object of class Creator
47     self.actions = {}
48 fanzago 1.76
49 nsmirnov 1.1 # Configuration file
50     self.cfg_fname = None
51     # Dictionary with configuration parameters
52     self.cfg_params = {}
53    
54     # Current working directory
55     self.cwd = os.getcwd()+'/'
56     # Current time in format 'yymmdd_hhmmss'
57     self.current_time = time.strftime('%y%m%d_%H%M%S',
58     time.localtime(time.time()))
59    
60 nsmirnov 1.3 # Session name (?) Do we need this ?
61 nsmirnov 1.1 self.name = '0'
62    
63     # Job type
64     self.job_type_name = None
65    
66     # Continuation flag
67     self.flag_continue = 0
68    
69     # quiet mode, i.e. no output on screen
70     self.flag_quiet = 0
71     # produce more output
72     self.debug_level = 0
73    
74    
75 nsmirnov 1.3 self.initialize_(opts)
76 nsmirnov 1.1
77     return
78    
79 nsmirnov 1.7 def version():
80 nsmirnov 1.1 return common.prog_version_str
81    
82 nsmirnov 1.7 version = staticmethod(version)
83    
84 nsmirnov 1.3 def initialize_(self, opts):
85 nsmirnov 1.1
86     # Process the '-continue' option first because
87     # in the case of continuation the CRAB configuration
88     # parameters are loaded from already existing Working Space.
89 nsmirnov 1.3 self.processContinueOption_(opts)
90 nsmirnov 1.1
91     # Process ini-file first, then command line options
92     # because they override ini-file settings.
93    
94 nsmirnov 1.3 self.processIniFile_(opts)
95 nsmirnov 1.1
96 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
97 nsmirnov 1.1
98 nsmirnov 1.3 self.processOptions_(opts)
99 nsmirnov 1.1
100 slacapra 1.77 common.taskDB = TaskDB()
101    
102 slacapra 1.101
103 nsmirnov 1.1 if not self.flag_continue:
104 nsmirnov 1.3 self.createWorkingSpace_()
105 slacapra 1.9 optsToBeSaved={}
106     for it in opts.keys():
107     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
108     pass
109     else:
110     optsToBeSaved[it]=opts[it]
111 slacapra 1.101 # store in taskDB the opts
112     common.taskDB.setDict(it[1:], optsToBeSaved[it])
113 slacapra 1.9 common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
114 nsmirnov 1.1 pass
115 slacapra 1.77 else:
116     common.taskDB.load()
117 nsmirnov 1.1
118     # At this point all configuration options have been read.
119    
120     args = string.join(sys.argv,' ')
121 slacapra 1.11
122 nsmirnov 1.3 self.updateHistory_(args)
123 slacapra 1.11
124 nsmirnov 1.3 self.createLogger_(args)
125 slacapra 1.11
126 nsmirnov 1.1 common.jobDB = JobDB()
127 corvo 1.51
128 gutsche 1.121 # init BlackWhiteListParser
129     self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
130    
131 spiga 1.107 self.UseServer=0
132     try:
133     self.UseServer=int(self.cfg_params['CRAB.server_mode'])
134     except KeyError:
135     pass
136    
137    
138 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
139    
140 nsmirnov 1.3 if self.flag_continue:
141 slacapra 1.12 try:
142     common.jobDB.load()
143 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
144 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
145     except DBException,e:
146     pass
147 nsmirnov 1.3 pass
148 slacapra 1.11
149 nsmirnov 1.3 self.createScheduler_()
150 slacapra 1.11
151 slacapra 1.106 common.logger.debug(6, 'Used properties:')
152     if (common.logger.debugLevel()<6 ):
153     common.logger.write('Used properties:')
154     keys = self.cfg_params.keys()
155     keys.sort()
156     for k in keys:
157     if self.cfg_params[k]:
158     common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
159     if (common.logger.debugLevel()<6 ):
160     common.logger.write(' '+k+' : '+str(self.cfg_params[k]))
161     pass
162     else:
163     common.logger.debug(6, ' '+k+' : ')
164     if (common.logger.debugLevel()<6 ):
165     common.logger.write(' '+k+' : ')
166 nsmirnov 1.3 pass
167     pass
168 slacapra 1.106 common.logger.debug(6, 'End of used properties.\n')
169     if (common.logger.debugLevel()<6 ):
170     common.logger.write('End of used properties.\n')
171    
172 nsmirnov 1.3 self.initializeActions_(opts)
173 nsmirnov 1.1 return
174    
175 nsmirnov 1.3 def processContinueOption_(self,opts):
176 nsmirnov 1.1
177     continue_dir = None
178 nsmirnov 1.4
179     # Look for the '-continue' option.
180    
181 nsmirnov 1.1 for opt in opts.keys():
182     if ( opt in ('-continue','-c') ):
183     self.flag_continue = 1
184     val = opts[opt]
185     if val:
186     if val[0] == '/': continue_dir = val # abs path
187     else: continue_dir = self.cwd + val # rel path
188     pass
189 nsmirnov 1.4 break
190     pass
191    
192     # Look for actions which has sense only with '-continue'
193    
194     if not self.flag_continue:
195     for opt in opts.keys():
196 slacapra 1.102 if ( opt in (self.aux_actions)):
197 nsmirnov 1.4 self.flag_continue = 1
198     break
199 nsmirnov 1.1 pass
200     pass
201 slacapra 1.102 if ("-submit" in opts.keys() and "-create" not in opts.keys() ):
202     self.flag_continue = 1
203 nsmirnov 1.1
204     if not self.flag_continue: return
205    
206     if not continue_dir:
207     prefix = common.prog_name + '_' + self.name + '_'
208     continue_dir = findLastWorkDir(prefix)
209     pass
210    
211     if not continue_dir:
212     raise CrabException('Cannot find last working directory.')
213    
214     if not os.path.exists(continue_dir):
215     msg = 'Cannot continue because the working directory <'
216     msg += continue_dir
217     msg += '> does not exist.'
218     raise CrabException(msg)
219    
220     # Instantiate WorkSpace
221 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
222 nsmirnov 1.1
223     return
224    
225 nsmirnov 1.3 def processIniFile_(self, opts):
226 nsmirnov 1.1 """
227     Processes a configuration INI-file.
228     """
229    
230     # Extract cfg-file name from the cmd-line options.
231    
232     for opt in opts.keys():
233     if ( opt == '-cfg' ):
234     if self.flag_continue:
235     raise CrabException('-continue and -cfg cannot coexist.')
236     if opts[opt] : self.cfg_fname = opts[opt]
237     else : usage()
238     pass
239    
240     elif ( opt == '-name' ):
241     self.name = opts[opt]
242     pass
243    
244     pass
245    
246     # Set default cfg-fname
247    
248     if self.cfg_fname == None:
249     if self.flag_continue:
250     self.cfg_fname = common.work_space.cfgFileName()
251     else:
252     self.cfg_fname = common.prog_name+'.cfg'
253     pass
254     pass
255    
256     # Load cfg-file
257    
258     if string.lower(self.cfg_fname) != 'none':
259     if os.path.exists(self.cfg_fname):
260     self.cfg_params = loadConfig(self.cfg_fname)
261 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
262 nsmirnov 1.1 pass
263     else:
264     msg = 'cfg-file '+self.cfg_fname+' not found.'
265     raise CrabException(msg)
266     pass
267     pass
268    
269     # process the [CRAB] section
270    
271     lhp = len('CRAB.')
272     for k in self.cfg_params.keys():
273     if len(k) >= lhp and k[:lhp] == 'CRAB.':
274     opt = '-'+k[lhp:]
275     if len(opt) >= 3 and opt[:3] == '-__': continue
276     if opt not in opts.keys():
277     opts[opt] = self.cfg_params[k]
278     pass
279     pass
280     pass
281    
282     return
283    
284 nsmirnov 1.3 def processOptions_(self, opts):
285 nsmirnov 1.1 """
286     Processes the command-line options.
287     """
288    
289     for opt in opts.keys():
290     val = opts[opt]
291    
292 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
293     if opt in self.main_actions:
294     self.cfg_params['CRAB.'+opt[1:]] = val
295     continue
296     if opt in self.aux_actions:
297     self.cfg_params['CRAB.'+opt[1:]] = val
298     continue
299 nsmirnov 1.1
300 spiga 1.107 elif ( opt == '-server_mode' ): #Add for server mode usage
301     pass
302     elif ( opt == '-server_name' ):
303     pass
304 nsmirnov 1.1
305     elif ( opt == '-cfg' ):
306     pass
307    
308     elif ( opt in ('-continue', '-c') ):
309 nsmirnov 1.4 # Already processed in processContinueOption_()
310 nsmirnov 1.1 pass
311    
312     elif ( opt == '-jobtype' ):
313     if val : self.job_type_name = string.upper(val)
314     else : usage()
315     pass
316    
317     elif ( opt == '-Q' ):
318     self.flag_quiet = 1
319     pass
320    
321     elif ( opt == '-debug' ):
322 slacapra 1.6 if val: self.debug_level = int(val)
323     else: self.debug_level = 1
324 nsmirnov 1.1 pass
325    
326     elif ( opt == '-scheduler' ):
327     pass
328 slacapra 1.22
329 nsmirnov 1.3 elif string.find(opt,'.') == -1:
330     print common.prog_name+'. Unrecognized option '+opt
331     usage()
332     pass
333 nsmirnov 1.1
334 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
335     if string.find(opt,'.') == -1 :
336     self.cfg_params['CRAB.'+opt[1:]] = val
337 nsmirnov 1.1 pass
338 nsmirnov 1.3 else:
339 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
340     self.cfg_params[opt[1:]] = val
341     pass
342     pass
343     return
344    
345 slacapra 1.8 def parseRange_(self, aRange):
346 nsmirnov 1.4 """
347 slacapra 1.8 Takes as the input a string with a range defined in any of the following
348     way, including combination, and return a tuple with the ints defined
349     according to following table. A consistency check is done.
350     NB: the first job is "1", not "0".
351     'all' -> [1,2,..., NJobs]
352     '' -> [1,2,..., NJobs]
353     'n1' -> [n1]
354     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
355     'n1,n2' -> [n1, n2]
356     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
357     """
358     result = []
359 farinafa 1.128
360 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
361     if aRange=='all' or aRange==None or aRange=='':
362 slacapra 1.73 result=range(1,common.jobDB.nJobs()+1)
363 slacapra 1.8 return result
364 slacapra 1.9 elif aRange=='0':
365     return result
366 slacapra 1.8
367 farinafa 1.128 subRanges = str(aRange).split(',') # DEPRECATED # Fabio #string.split(aRange, ',')
368 slacapra 1.8 for subRange in subRanges:
369     result = result+self.parseSimpleRange_(subRange)
370    
371     if self.checkUniqueness_(result):
372     return result
373     else:
374 mcinquil 1.132 common.logger.message( "Error " +str(result) )
375 slacapra 1.8 return []
376    
377     def checkUniqueness_(self, list):
378     """
379 slacapra 1.9 check if a list contains only unique elements
380 slacapra 1.8 """
381    
382     uniqueList = []
383     # use a list comprehension statement (takes a while to understand)
384    
385     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
386    
387     return (len(list)==len(uniqueList))
388    
389 spiga 1.107 def uuidgen(self):
390     """Generate a UUID"""
391     return commands.getoutput('uuidgen')
392    
393    
394 slacapra 1.8 def parseSimpleRange_(self, aRange):
395     """
396     Takes as the input a string with two integers separated by
397     the minus sign and returns the tuple with these numbers:
398     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
399     'n1' -> [n1]
400     """
401     (start, end) = (None, None)
402    
403     result = []
404 farinafa 1.128 minus = str(aRange).find('-') #DEPRECATED #Fabio #string.find(aRange, '-')
405 slacapra 1.8 if ( minus < 0 ):
406 farinafa 1.128 if int(aRange)>0:
407 corvo 1.97 # FEDE
408     #result.append(int(aRange)-1)
409     ###
410 slacapra 1.62 result.append(int(aRange))
411 slacapra 1.8 else:
412 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
413     usage()
414     pass
415    
416 nsmirnov 1.4 pass
417     else:
418 farinafa 1.128 (start, end) = str(aRange).split('-')
419 slacapra 1.8 if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
420 corvo 1.97 #result=range(int(start)-1, int(end))
421     result=range(int(start), int(end)+1) #Daniele
422 slacapra 1.8 else:
423 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
424 slacapra 1.8
425     return result
426 nsmirnov 1.4
427 nsmirnov 1.3 def initializeActions_(self, opts):
428 nsmirnov 1.1 """
429     For each user action instantiate a corresponding
430     object and put it in the action dictionary.
431     """
432 spiga 1.15
433     for opt in opts.keys():
434    
435 nsmirnov 1.1 val = opts[opt]
436 spiga 1.15
437    
438     if ( opt == '-create' ):
439 gutsche 1.83 if val and val != 'all':
440     msg = 'Per default, CRAB will create all jobs as specified in the crab.cfg file, not the command line!'
441     common.logger.message(msg)
442     msg = 'Submission will still take into account the number of jobs specified on the command line!\n'
443     common.logger.message(msg)
444 slacapra 1.82 ncjobs = 'all'
445    
446     # Instantiate Creator object
447     self.creator = Creator(self.job_type_name,
448     self.cfg_params,
449     ncjobs)
450     self.actions[opt] = self.creator
451    
452     # Initialize the JobDB object if needed
453     if not self.flag_continue:
454     common.jobDB.create(self.creator.nJobs())
455 nsmirnov 1.1 pass
456 nsmirnov 1.3
457 slacapra 1.82 # Create and initialize JobList
458 nsmirnov 1.3
459 slacapra 1.82 common.job_list = JobList(common.jobDB.nJobs(),
460     self.creator.jobType())
461 nsmirnov 1.3
462 slacapra 1.82 common.taskDB.setDict('ScriptName',common.work_space.jobDir()+"/"+self.job_type_name+'.sh')
463     common.taskDB.setDict('JdlName',common.work_space.jobDir()+"/"+self.job_type_name+'.jdl')
464     common.taskDB.setDict('CfgName',common.work_space.jobDir()+"/"+self.creator.jobType().configFilename())
465     common.job_list.setScriptNames(self.job_type_name+'.sh')
466     common.job_list.setJDLNames(self.job_type_name+'.jdl')
467     common.job_list.setCfgNames(self.creator.jobType().configFilename())
468 slacapra 1.9
469 slacapra 1.82 self.creator.writeJobsSpecsToDB()
470 spiga 1.107 taskUnicId= self.uuidgen()
471     common.taskDB.setDict('TasKUUID',taskUnicId)
472 slacapra 1.77
473 slacapra 1.82 common.taskDB.save()
474 nsmirnov 1.3 pass
475 nsmirnov 1.1
476     elif ( opt == '-submit' ):
477 spiga 1.107 # modified to support server mode
478     if (self.UseServer== 1):
479 mcinquil 1.119 from SubmitterServer import SubmitterServer
480 farinafa 1.125 self.actions[opt] = SubmitterServer(self.cfg_params, self.parseRange_(val), val)
481 spiga 1.107 else:
482     # modified to support server mode
483     # get user request
484     nsjobs = -1
485 mcinquil 1.126 chosenJobsList = None
486 spiga 1.107 if val:
487 farinafa 1.125 if val=='all':
488 spiga 1.107 pass
489 farinafa 1.125 elif (type(eval(val)) is int) and eval(val) > 0:
490     # positive number
491     nsjobs = eval(val)
492     # NEW PART # Fabio
493     # put here code for LIST MANAGEMEN
494     elif (type(eval(val)) is tuple)or( type(eval(val)) is int and eval(val)<0 ) :
495     chosenJobsList = self.parseRange_(val)
496     chosenJobsList = [i-1 for i in chosenJobsList ]
497     nsjobs = len(chosenJobsList)
498     #
499 spiga 1.107 else:
500     msg = 'Bad submission option <'+str(val)+'>\n'
501     msg += ' Must be an integer or "all"'
502     msg += ' Generic range is not allowed"'
503     raise CrabException(msg)
504     pass
505    
506     common.logger.debug(5,'nsjobs '+str(nsjobs))
507     # total jobs
508     nj_list = []
509     # get the first not already submitted
510     common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
511     jobSetForSubmission = 0
512 gutsche 1.110 jobSkippedInSubmission = []
513 slacapra 1.114 datasetpath=self.cfg_params['CMSSW.datasetpath']
514 farinafa 1.125
515     # NEW PART # Fabio
516     # modified to handle list of jobs by the users # Fabio
517     tmp_jList = range(common.jobDB.nJobs())
518     if chosenJobsList != None:
519     tmp_jList = chosenJobsList
520     # build job list
521     for nj in tmp_jList:
522     cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj)) # More readable # Fabio
523 mcinquil 1.127 if (cleanedBlackWhiteList != '') or (datasetpath == "None" ) or (datasetpath == None): ## Matty's fix
524 gutsche 1.110 if (common.jobDB.status(nj) not in ['R','S','K','Y','A','D','Z']):
525     jobSetForSubmission +=1
526     nj_list.append(nj)
527 farinafa 1.125 else:
528     continue
529 gutsche 1.110 else :
530     jobSkippedInSubmission.append(nj+1)
531 farinafa 1.125 #
532 spiga 1.107 if nsjobs >0 and nsjobs == jobSetForSubmission:
533     break
534     pass
535 farinafa 1.125 del tmp_jList
536     #
537    
538 spiga 1.107 if nsjobs>jobSetForSubmission:
539     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
540 gutsche 1.110 if len(jobSkippedInSubmission) > 0 :
541 mcinquil 1.131 #print jobSkippedInSubmission
542     #print spanRanges(jobSkippedInSubmission)
543     mess =""
544     for jobs in jobSkippedInSubmission:
545     mess += str(jobs) + ","
546     common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
547 spiga 1.107 # submit N from last submitted job
548     common.logger.debug(5,'nj_list '+str(nj_list))
549    
550     if len(nj_list) != 0:
551     # Instantiate Submitter object
552     self.actions[opt] = Submitter(self.cfg_params, nj_list)
553     # Create and initialize JobList
554     if len(common.job_list) == 0 :
555     common.job_list = JobList(common.jobDB.nJobs(),
556     None)
557     common.job_list.setJDLNames(self.job_type_name+'.jdl')
558     pass
559 slacapra 1.23 pass
560 slacapra 1.22 else:
561 spiga 1.107 common.logger.message('No jobs left to submit: exiting...')
562 nsmirnov 1.1 pass
563    
564 nsmirnov 1.4 elif ( opt == '-list' ):
565 slacapra 1.8 jobs = self.parseRange_(val)
566    
567     common.jobDB.dump(jobs)
568 nsmirnov 1.4 pass
569    
570 slacapra 1.67 elif ( opt == '-printId' ):
571 spiga 1.115 # modified to support server mode
572     if (self.UseServer== 1):
573     try:
574     common.taskDB.load()
575     WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
576     projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
577     print "Task Id = %-40s " %(projectUniqName)
578     except:
579     common.logger.message("Warning :Interaction in query task unique ID failed")
580     pass
581     else:
582     try:
583 slacapra 1.117 common.scheduler.bossTask.load(ALL)
584 spiga 1.115 except RuntimeError,e:
585     common.logger.message( e.__str__() )
586     except ValueError,e:
587     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
588     common.logger.message(e.what())
589     pass
590     task = common.scheduler.bossTask.jobsDict()
591    
592     for c, v in task.iteritems():
593     k = int(c)
594     nj=k
595     id = v['CHAIN_ID']
596     jid = v['SCHED_ID']
597     if jid:
598     print "Job: %-5s Id = %-40s " %(id,jid)
599     #else:
600     # print "Job: ",id," No ID yet"
601 corvo 1.97 pass
602 slacapra 1.67
603 nsmirnov 1.4 elif ( opt == '-status' ):
604 spiga 1.107 # modified to support server mode
605     if (self.UseServer== 1):
606 mcinquil 1.119 from StatusServer import StatusServer
607 spiga 1.107 self.actions[opt] = StatusServer(self.cfg_params)
608     else:
609     jobs = self.parseRange_(val)
610    
611     if len(jobs) != 0:
612     self.actions[opt] = StatusBoss(self.cfg_params)
613     pass
614 slacapra 1.22
615 nsmirnov 1.4 elif ( opt == '-kill' ):
616 slacapra 1.8
617 corvo 1.109 if (self.UseServer== 1):
618 mcinquil 1.130 if val:
619     if val =='all':
620     jobs = common.scheduler.listBoss()
621     else:
622     jobs = self.parseRange_(val)
623     from KillerServer import KillerServer
624     self.actions[opt] = KillerServer(self.cfg_params,val, self.parseRange_(val)) #Fabio
625     else:
626     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
627 corvo 1.109 else:
628 farinafa 1.120 if val:
629 corvo 1.109 if val =='all':
630     jobs = common.scheduler.listBoss()
631     else:
632     jobs = self.parseRange_(val)
633     common.scheduler.cancel(jobs)
634 spiga 1.31 else:
635 corvo 1.109 common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
636 nsmirnov 1.1
637 farinafa 1.120
638 slacapra 1.70 elif ( opt == '-getoutput' or opt == '-get'):
639 spiga 1.107 # modified to support server mode
640     if (self.UseServer== 1):
641 mcinquil 1.119 from GetOutputServer import GetOutputServer
642 spiga 1.107 self.actions[opt] = GetOutputServer(self.cfg_params)
643 spiga 1.20 else:
644 spiga 1.107 if val=='all' or val==None or val=='':
645     jobs = common.scheduler.listBoss()
646     else:
647     jobs = self.parseRange_(val)
648    
649     jobs_done = []
650     for nj in jobs:
651     jobs_done.append(nj)
652     common.scheduler.getOutput(jobs_done)
653     pass
654 nsmirnov 1.4
655     elif ( opt == '-resubmit' ):
656 slacapra 1.78 if val=='all' or val==None or val=='':
657     jobs = common.scheduler.listBoss()
658 fanzago 1.37 else:
659 slacapra 1.78 jobs = self.parseRange_(val)
660 fanzago 1.37
661 slacapra 1.11 if val:
662     # create a list of jobs to be resubmitted.
663 corvo 1.93 val = string.replace(val,'-',':')
664 slacapra 1.8
665 slacapra 1.22 ### as before, create a Resubmittter Class
666 slacapra 1.78 maxIndex = common.scheduler.listBoss()
667 corvo 1.93 ##
668     # Marco. Vediamo se va meglio cosi'...
669     ##
670 corvo 1.97 try:
671     common.scheduler.bossTask.query(ALL, val)
672     except RuntimeError,e:
673     common.logger.message( e.__str__() )
674     except ValueError,e:
675 slacapra 1.103 common.logger.message( "Warning : Scheduler interaction in query operation failed for jobs:")
676     common.logger.message(e.what())
677 corvo 1.97 pass
678 corvo 1.93 task = common.scheduler.bossTask.jobsDict()
679    
680 slacapra 1.11 nj_list = []
681 corvo 1.93 for c, v in task.iteritems():
682 corvo 1.97 k = int(c)
683     nj=k
684 corvo 1.93 st = v['STATUS']
685    
686 corvo 1.97 if int(nj) <= int(len(maxIndex)) :
687 spiga 1.104 if st in ['K','SA','Z','DA']:
688 corvo 1.97 nj_list.append(int(nj)-1)
689     common.jobDB.setStatus(int(nj)-1,'C')
690 spiga 1.99 elif st in ['E','SE']:
691 spiga 1.60 common.scheduler.moveOutput(nj)
692 corvo 1.97 nj_list.append(int(nj)-1)
693     st = common.jobDB.setStatus(int(nj)-1,'RC')
694 corvo 1.93 elif st in ['W']:
695 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
696 spiga 1.60 pass
697 corvo 1.93 elif st in ['SD', 'OR']:
698 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
699 spiga 1.60 else:
700     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
701 slacapra 1.11 else:
702 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
703 fanzago 1.37 if len(common.job_list) == 0 :
704     common.job_list = JobList(common.jobDB.nJobs(),None)
705     common.job_list.setJDLNames(self.job_type_name+'.jdl')
706     pass
707    
708 slacapra 1.11 if len(nj_list) != 0:
709 corvo 1.93 nj_list.sort()
710 slacapra 1.11 # Instantiate Submitter object
711 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
712 slacapra 1.11
713 slacapra 1.8 pass
714     pass
715 slacapra 1.11 else:
716     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
717 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
718 slacapra 1.11 pass
719 fanzago 1.37 common.jobDB.save()
720 slacapra 1.8 pass
721 gutsche 1.61
722 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
723 nsmirnov 1.5
724 slacapra 1.78 if val:
725     if val =='all':
726     jobs = common.scheduler.listBoss()
727 spiga 1.47 else:
728 slacapra 1.78 jobs = self.parseRange_(val)
729     # kill submitted jobs
730     common.scheduler.cancel(jobs)
731 spiga 1.47 else:
732 slacapra 1.78 common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
733 nsmirnov 1.5
734 spiga 1.47 # resubmit cancelled jobs.
735     if val:
736     nj_list = []
737     for nj in jobs:
738     st = common.jobDB.status(int(nj)-1)
739     if st in ['K','A']:
740 corvo 1.97 nj_list.append(int(nj)-1)
741     common.jobDB.setStatus(int(nj)-1,'C')
742 spiga 1.47 elif st == 'Y':
743     common.scheduler.moveOutput(nj)
744 corvo 1.97 nj_list.append(int(nj)-1)
745     st = common.jobDB.setStatus(int(nj)-1,'RC')
746 spiga 1.47 elif st in ['C','X']:
747 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
748 spiga 1.47 pass
749     elif st == 'D':
750 corvo 1.97 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
751 spiga 1.47 else:
752     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
753     pass
754    
755 nsmirnov 1.5 if len(common.job_list) == 0 :
756 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
757 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
758     pass
759 spiga 1.47
760     if len(nj_list) != 0:
761 spiga 1.86 # common.scheduler.resubmit(nj_list)
762 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
763 spiga 1.47 pass
764     pass
765     else:
766     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
767 nsmirnov 1.5 pass
768 spiga 1.47 common.jobDB.save()
769 nsmirnov 1.4 pass
770 spiga 1.47
771 slacapra 1.28 elif ( opt == '-testJdl' ):
772 slacapra 1.8 jobs = self.parseRange_(val)
773     nj_list = []
774     for nj in jobs:
775 slacapra 1.62 st = common.jobDB.status(nj-1)
776 fanzago 1.129 if st != 'X': nj_list.append(nj-1)
777 slacapra 1.8 pass
778    
779     if len(nj_list) != 0:
780 slacapra 1.77 # Instantiate Checker object
781 slacapra 1.8 self.actions[opt] = Checker(self.cfg_params, nj_list)
782    
783     # Create and initialize JobList
784    
785     if len(common.job_list) == 0 :
786     common.job_list = JobList(common.jobDB.nJobs(), None)
787     common.job_list.setJDLNames(self.job_type_name+'.jdl')
788     pass
789     pass
790    
791 slacapra 1.9 elif ( opt == '-postMortem' ):
792 corvo 1.95
793 spiga 1.108 # modified to support server mode
794     if (self.UseServer== 1):
795 mcinquil 1.119 from PostMortemServer import PostMortemServer
796 spiga 1.108 self.actions[opt] = PostMortemServer(self.cfg_params)
797     else:
798     if val:
799     val = string.replace(val,'-',':')
800     else: val=''
801     nj_list = {}
802 corvo 1.97
803 spiga 1.108 try:
804     common.scheduler.bossTask.query(ALL, val)
805     except RuntimeError,e:
806     common.logger.message( e.__str__() )
807     except ValueError,e:
808     common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
809     common.logger.message( e.what())
810     pass
811 corvo 1.93
812 spiga 1.108 task = common.scheduler.bossTask.jobsDict()
813    
814     for c, v in task.iteritems():
815     k = int(c)
816     nj=k
817     if v['SCHED_ID']: nj_list[v['CHAIN_ID']]=v['SCHED_ID']
818     pass
819 slacapra 1.9
820 spiga 1.108 if len(nj_list) != 0:
821 corvo 1.93 # Instantiate PostMortem object
822 spiga 1.108 self.actions[opt] = PostMortem(self.cfg_params, nj_list)
823 slacapra 1.9 # Create and initialize JobList
824 spiga 1.108 if len(common.job_list) == 0 :
825     common.job_list = JobList(common.jobDB.nJobs(), None)
826     common.job_list.setJDLNames(self.job_type_name+'.jdl')
827     pass
828 slacapra 1.9 pass
829 spiga 1.108 else:
830     common.logger.message("No jobs to analyze")
831 slacapra 1.9
832     elif ( opt == '-clean' ):
833     if val != None:
834     raise CrabException("No range allowed for '-clean'")
835 mcinquil 1.118 if (self.UseServer== 1):
836 mcinquil 1.119 from CleanerServer import CleanerServer
837 mcinquil 1.118 self.actions[opt] = CleanerServer(self.cfg_params)
838     else:
839     self.actions[opt] = Cleaner(self.cfg_params)
840    
841 fanzago 1.111 ### FEDE DBS/DLS OUTPUT PUBLICATION
842     elif ( opt == '-publish' ):
843 slacapra 1.113 from DBS2Publisher import Publisher
844 slacapra 1.116 self.actions[opt] = Publisher(self.cfg_params)
845    
846     #precessedData=self.cfg_params['USER.publish_data_name']
847     #self.cfg_params['USER.processed_datasetname']
848     #thePublisher = Publisher(self.cfg_params)
849     # publish_exit_status = thePublisher.publish()
850     # if (publish_exit_status == '1'):
851     # common.logger.message("user data publication --> problems")
852     # else:
853     # common.logger.message("user data publication --> ok ")
854     #########################################
855 nsmirnov 1.1 pass
856     return
857    
858 nsmirnov 1.3 def createWorkingSpace_(self):
859 slacapra 1.9 new_dir = ''
860    
861     try:
862     new_dir = self.cfg_params['USER.ui_working_dir']
863 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
864 slacapra 1.122 if len(string.split(new_dir, '/')) == 1:
865     new_dir = self.cwd + new_dir
866     else:
867     new_dir = new_dir
868 fanzago 1.48 if os.path.exists(new_dir):
869     if os.listdir(new_dir):
870     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
871     raise CrabException(msg)
872 slacapra 1.9 except KeyError:
873     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
874 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
875 slacapra 1.9 new_dir = self.cwd + new_dir
876     pass
877 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
878 nsmirnov 1.1 common.work_space.create()
879     return
880    
881 nsmirnov 1.3 def loadConfiguration_(self, opts):
882 nsmirnov 1.1
883     save_opts = common.work_space.loadSavedOptions()
884    
885     # Override saved options with new command-line options
886    
887     for k in opts.keys():
888     save_opts[k] = opts[k]
889     pass
890    
891     # Return updated options
892     return save_opts
893    
894 nsmirnov 1.3 def createLogger_(self, args):
895 nsmirnov 1.1
896     log = Logger()
897     log.quiet(self.flag_quiet)
898     log.setDebugLevel(self.debug_level)
899     log.write(args+'\n')
900 nsmirnov 1.3 log.message(self.headerString_())
901 nsmirnov 1.1 log.flush()
902     common.logger = log
903     return
904    
905 nsmirnov 1.3 def updateHistory_(self, args):
906 nsmirnov 1.1 history_fname = common.prog_name+'.history'
907     history_file = open(history_fname, 'a')
908     history_file.write(self.current_time+': '+args+'\n')
909     history_file.close()
910     return
911    
912 nsmirnov 1.3 def headerString_(self):
913 nsmirnov 1.1 """
914     Creates a string describing program options either given in
915     the command line or their default values.
916     """
917     header = common.prog_name + ' (version ' + common.prog_version_str + \
918     ') running on ' + \
919     time.ctime(time.time())+'\n\n' + \
920     common.prog_name+'. Working options:\n'
921 fanzago 1.59 #print self.job_type_name
922 nsmirnov 1.1 header = header +\
923 slacapra 1.85 ' scheduler ' + self.cfg_params['CRAB.scheduler'] + '\n'+\
924 nsmirnov 1.1 ' job type ' + self.job_type_name + '\n'+\
925     ' working directory ' + common.work_space.topDir()\
926     + '\n'
927     return header
928    
929 nsmirnov 1.3 def createScheduler_(self):
930 nsmirnov 1.1 """
931     Creates a scheduler object instantiated by its name.
932     """
933 slacapra 1.101 klass_name = 'SchedulerBoss'
934 nsmirnov 1.1 file_name = klass_name
935     try:
936     klass = importName(file_name, klass_name)
937     except KeyError:
938     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
939     raise CrabException(msg)
940     except ImportError, e:
941 slacapra 1.101 msg = 'Cannot create scheduler Boss'
942 nsmirnov 1.1 msg += ' (file: '+file_name+', class '+klass_name+'):\n'
943     msg += str(e)
944     raise CrabException(msg)
945    
946     common.scheduler = klass()
947     common.scheduler.configure(self.cfg_params)
948     return
949    
950     def run(self):
951     """
952     For each
953     """
954    
955     for act in self.main_actions:
956     if act in self.actions.keys(): self.actions[act].run()
957     pass
958    
959     for act in self.aux_actions:
960     if act in self.actions.keys(): self.actions[act].run()
961     pass
962     return
963    
964     ###########################################################################
965     def processHelpOptions(opts):
966    
967 slacapra 1.11 if len(opts):
968     for opt in opts.keys():
969     if opt in ('-v', '-version', '--version') :
970     print Crab.version()
971     return 1
972     if opt in ('-h','-help','--help') :
973     if opts[opt] : help(opts[opt])
974     else: help()
975     return 1
976     else:
977     usage()
978 corvo 1.93
979 nsmirnov 1.1 return 0
980    
981 corvo 1.93 ###########################################################################
982 nsmirnov 1.1 if __name__ == '__main__':
983 slacapra 1.92 ## Get rid of some useless warning
984 slacapra 1.94 try:
985     import warnings
986     warnings.simplefilter("ignore", RuntimeWarning)
987     except:
988     pass # too bad, you'll get the warning
989 corvo 1.93
990 spiga 1.123 os.putenv("PATH", definePath("new") )
991    
992 nsmirnov 1.1 # Parse command-line options and create a dictionary with
993     # key-value pairs.
994    
995     options = parseOptions(sys.argv[1:])
996    
997     # Process "help" options, such as '-help', '-version'
998    
999 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
1000 nsmirnov 1.1
1001     # Create, initialize, and run a Crab object
1002    
1003     try:
1004 nsmirnov 1.3 crab = Crab(options)
1005 nsmirnov 1.1 crab.run()
1006 corvo 1.54 crab.cfg_params['apmon'].free()
1007 nsmirnov 1.1 except CrabException, e:
1008     print '\n' + common.prog_name + ': ' + str(e) + '\n'
1009     if common.logger:
1010     common.logger.write('ERROR: '+str(e)+'\n')
1011     pass
1012     pass
1013    
1014     pass