ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/crab.py
Revision: 1.68
Committed: Thu Jun 15 17:37:56 2006 UTC (18 years, 10 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.67: +2 -2 lines
Log Message:
minor problem in numbering for printId

File Contents

# User Rev Content
1 elmer 1.65 #!/usr/bin/env python
2 nsmirnov 1.1 from crab_help import *
3     from crab_util import *
4     from crab_exceptions import *
5     from crab_logger import Logger
6     from WorkSpace import WorkSpace
7     from JobDB import JobDB
8 nsmirnov 1.3 from JobList import JobList
9 nsmirnov 1.1 from Creator import Creator
10     from Submitter import Submitter
11 slacapra 1.8 from Checker import Checker
12 slacapra 1.9 from PostMortem import PostMortem
13     from Status import Status
14 spiga 1.16 from StatusBoss import StatusBoss
15 corvo 1.42 from ApmonIf import ApmonIf
16 slacapra 1.36 from Cleaner import Cleaner
17 nsmirnov 1.1 import common
18 spiga 1.13 import Statistic
19 nsmirnov 1.1
20     import sys, os, time, string
21    
22     ###########################################################################
23     class Crab:
24 nsmirnov 1.3 def __init__(self, opts):
25 nsmirnov 1.1
26     # The order of main_actions is important !
27 slacapra 1.28 self.main_actions = [ '-create', '-submit' ]
28 slacapra 1.8 self.aux_actions = [ '-list', '-kill', '-status', '-getoutput',
29 slacapra 1.67 '-resubmit' , '-cancelAndResubmit', '-testJdl', '-postMortem', '-clean',
30     '-printId' ]
31 nsmirnov 1.1
32     # Dictionary of actions, e.g. '-create' -> object of class Creator
33     self.actions = {}
34    
35     # Configuration file
36     self.cfg_fname = None
37     # Dictionary with configuration parameters
38     self.cfg_params = {}
39    
40     # Current working directory
41     self.cwd = os.getcwd()+'/'
42     # Current time in format 'yymmdd_hhmmss'
43     self.current_time = time.strftime('%y%m%d_%H%M%S',
44     time.localtime(time.time()))
45    
46 nsmirnov 1.3 # Session name (?) Do we need this ?
47 nsmirnov 1.1 self.name = '0'
48    
49     # Job type
50     self.job_type_name = None
51    
52     # Continuation flag
53     self.flag_continue = 0
54    
55     # quiet mode, i.e. no output on screen
56     self.flag_quiet = 0
57     # produce more output
58     self.debug_level = 0
59    
60 nsmirnov 1.3 # Scheduler name, e.g. 'edg', 'lsf'
61 fanzago 1.53 # self.scheduler_name = ''
62 nsmirnov 1.1
63 nsmirnov 1.3 self.initialize_(opts)
64 nsmirnov 1.1
65     return
66    
67 nsmirnov 1.7 def version():
68 nsmirnov 1.1 return common.prog_version_str
69    
70 nsmirnov 1.7 version = staticmethod(version)
71    
72 nsmirnov 1.3 def initialize_(self, opts):
73 nsmirnov 1.1
74     # Process the '-continue' option first because
75     # in the case of continuation the CRAB configuration
76     # parameters are loaded from already existing Working Space.
77 nsmirnov 1.3 self.processContinueOption_(opts)
78 nsmirnov 1.1
79     # Process ini-file first, then command line options
80     # because they override ini-file settings.
81    
82 nsmirnov 1.3 self.processIniFile_(opts)
83 nsmirnov 1.1
84 nsmirnov 1.3 if self.flag_continue: opts = self.loadConfiguration_(opts)
85 nsmirnov 1.1
86 nsmirnov 1.3 self.processOptions_(opts)
87 nsmirnov 1.1
88     if not self.flag_continue:
89 nsmirnov 1.3 self.createWorkingSpace_()
90 slacapra 1.9 optsToBeSaved={}
91     for it in opts.keys():
92     if (it in self.main_actions) or (it in self.aux_actions) or (it == '-debug'):
93     pass
94     else:
95     optsToBeSaved[it]=opts[it]
96     common.work_space.saveConfiguration(optsToBeSaved, self.cfg_fname)
97 nsmirnov 1.1 pass
98    
99     # At this point all configuration options have been read.
100    
101     args = string.join(sys.argv,' ')
102 slacapra 1.11
103 nsmirnov 1.3 self.updateHistory_(args)
104 slacapra 1.11
105 nsmirnov 1.3 self.createLogger_(args)
106 slacapra 1.11
107 nsmirnov 1.1 common.jobDB = JobDB()
108 corvo 1.51
109 corvo 1.63 if int(self.cfg_params['USER.activate_monalisa']): self.cfg_params['apmon'] = ApmonIf()
110    
111 nsmirnov 1.3 if self.flag_continue:
112 slacapra 1.12 try:
113     common.jobDB.load()
114 corvo 1.50 self.cfg_params['taskId'] = common.jobDB._jobs[0].taskId
115 slacapra 1.12 common.logger.debug(6, str(common.jobDB))
116     except DBException,e:
117     pass
118 nsmirnov 1.3 pass
119 slacapra 1.11
120 nsmirnov 1.3 self.createScheduler_()
121 slacapra 1.11
122 nsmirnov 1.3 if common.logger.debugLevel() >= 6:
123     common.logger.debug(6, 'Used properties:')
124     keys = self.cfg_params.keys()
125     keys.sort()
126     for k in keys:
127     if self.cfg_params[k]:
128 corvo 1.42 common.logger.debug(6, ' '+k+' : '+str(self.cfg_params[k]))
129 nsmirnov 1.3 pass
130     else:
131     common.logger.debug(6, ' '+k+' : ')
132     pass
133     pass
134     common.logger.debug(6, 'End of used properties.\n')
135     pass
136     self.initializeActions_(opts)
137 nsmirnov 1.1 return
138    
139 nsmirnov 1.3 def processContinueOption_(self,opts):
140 nsmirnov 1.1
141     continue_dir = None
142 nsmirnov 1.4
143     # Look for the '-continue' option.
144    
145 nsmirnov 1.1 for opt in opts.keys():
146     if ( opt in ('-continue','-c') ):
147     self.flag_continue = 1
148     val = opts[opt]
149     if val:
150     if val[0] == '/': continue_dir = val # abs path
151     else: continue_dir = self.cwd + val # rel path
152     pass
153 nsmirnov 1.4 break
154     pass
155    
156     # Look for actions which has sense only with '-continue'
157    
158     if not self.flag_continue:
159     for opt in opts.keys():
160 slacapra 1.6 if ( opt in (self.aux_actions) ):
161 nsmirnov 1.4 self.flag_continue = 1
162     break
163 nsmirnov 1.1 pass
164     pass
165 slacapra 1.6 submit_flag=0
166     create_flag=0
167     for opt in opts.keys():
168     if opt == "-submit": submit_flag=1
169     if opt == "-create": create_flag=1
170     pass
171     if (submit_flag and not create_flag):
172 nsmirnov 1.7 msg = "'-submit' must be used with either '-create' or '-continue'."
173     raise CrabException(msg)
174 slacapra 1.6 pass
175 nsmirnov 1.1
176     if not self.flag_continue: return
177    
178     if not continue_dir:
179     prefix = common.prog_name + '_' + self.name + '_'
180     continue_dir = findLastWorkDir(prefix)
181     pass
182    
183     if not continue_dir:
184     raise CrabException('Cannot find last working directory.')
185    
186     if not os.path.exists(continue_dir):
187     msg = 'Cannot continue because the working directory <'
188     msg += continue_dir
189     msg += '> does not exist.'
190     raise CrabException(msg)
191    
192     # Instantiate WorkSpace
193 fanzago 1.49 common.work_space = WorkSpace(continue_dir, self.cfg_params)
194 nsmirnov 1.1
195     return
196    
197 nsmirnov 1.3 def processIniFile_(self, opts):
198 nsmirnov 1.1 """
199     Processes a configuration INI-file.
200     """
201    
202     # Extract cfg-file name from the cmd-line options.
203    
204     for opt in opts.keys():
205     if ( opt == '-cfg' ):
206     if self.flag_continue:
207     raise CrabException('-continue and -cfg cannot coexist.')
208     if opts[opt] : self.cfg_fname = opts[opt]
209     else : usage()
210     pass
211    
212     elif ( opt == '-name' ):
213     self.name = opts[opt]
214     pass
215    
216     pass
217    
218     # Set default cfg-fname
219    
220     if self.cfg_fname == None:
221     if self.flag_continue:
222     self.cfg_fname = common.work_space.cfgFileName()
223     else:
224     self.cfg_fname = common.prog_name+'.cfg'
225     pass
226     pass
227    
228     # Load cfg-file
229    
230     if string.lower(self.cfg_fname) != 'none':
231     if os.path.exists(self.cfg_fname):
232     self.cfg_params = loadConfig(self.cfg_fname)
233 corvo 1.64 self.cfg_params['user'] = os.environ['USER']
234 nsmirnov 1.1 pass
235     else:
236     msg = 'cfg-file '+self.cfg_fname+' not found.'
237     raise CrabException(msg)
238     pass
239     pass
240    
241     # process the [CRAB] section
242    
243     lhp = len('CRAB.')
244     for k in self.cfg_params.keys():
245     if len(k) >= lhp and k[:lhp] == 'CRAB.':
246     opt = '-'+k[lhp:]
247     if len(opt) >= 3 and opt[:3] == '-__': continue
248     if opt not in opts.keys():
249     opts[opt] = self.cfg_params[k]
250     pass
251     pass
252     pass
253    
254     return
255    
256 nsmirnov 1.3 def processOptions_(self, opts):
257 nsmirnov 1.1 """
258     Processes the command-line options.
259     """
260    
261     for opt in opts.keys():
262     val = opts[opt]
263    
264 nsmirnov 1.3 # Skip actions, they are processed later in initializeActions_()
265     if opt in self.main_actions:
266     self.cfg_params['CRAB.'+opt[1:]] = val
267     continue
268     if opt in self.aux_actions:
269     self.cfg_params['CRAB.'+opt[1:]] = val
270     continue
271 nsmirnov 1.1
272    
273     elif ( opt == '-cfg' ):
274     pass
275    
276     elif ( opt in ('-continue', '-c') ):
277 nsmirnov 1.4 # Already processed in processContinueOption_()
278 nsmirnov 1.1 pass
279    
280     elif ( opt == '-jobtype' ):
281     if val : self.job_type_name = string.upper(val)
282     else : usage()
283     pass
284    
285     elif ( opt == '-Q' ):
286     self.flag_quiet = 1
287     pass
288    
289     elif ( opt == '-debug' ):
290 slacapra 1.6 if val: self.debug_level = int(val)
291     else: self.debug_level = 1
292 nsmirnov 1.1 pass
293    
294     elif ( opt == '-scheduler' ):
295 fanzago 1.53 if val:
296     self.scheduler_name = 'boss'
297     self.flag_useboss = 1
298 nsmirnov 1.1 else:
299     print common.prog_name+". No value for '-scheduler'."
300     usage()
301     pass
302     pass
303 slacapra 1.22
304 fanzago 1.53 # elif ( opt in ('-use_boss', '-useboss') ):
305     # if ( val == '1' ):
306     # self.scheduler_name = 'boss'
307     # pass
308     # elif ( val == '0' ):
309     # pass
310     # else:
311     # print common.prog_name+'. Bad flag for -use_boss option:',\
312     # val,'Possible values are 0(=No) or 1(=Yes)'
313     # usage()
314     # pass
315     # pass
316 fanzago 1.14
317 nsmirnov 1.3 elif string.find(opt,'.') == -1:
318     print common.prog_name+'. Unrecognized option '+opt
319     usage()
320     pass
321 nsmirnov 1.1
322 nsmirnov 1.3 # Override config parameters from INI-file with cmd-line params
323     if string.find(opt,'.') == -1 :
324     self.cfg_params['CRAB.'+opt[1:]] = val
325 nsmirnov 1.1 pass
326 nsmirnov 1.3 else:
327 nsmirnov 1.1 # Command line parameters in the form -SECTION.ENTRY=VALUE
328     self.cfg_params[opt[1:]] = val
329     pass
330     pass
331     return
332    
333 slacapra 1.8 def parseRange_(self, aRange):
334 nsmirnov 1.4 """
335 slacapra 1.8 Takes as the input a string with a range defined in any of the following
336     way, including combination, and return a tuple with the ints defined
337     according to following table. A consistency check is done.
338     NB: the first job is "1", not "0".
339     'all' -> [1,2,..., NJobs]
340     '' -> [1,2,..., NJobs]
341     'n1' -> [n1]
342     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
343     'n1,n2' -> [n1, n2]
344     'n1,n2-n3' -> [n1, n2, n2+1, n2+2, ..., n3-1, n3]
345     """
346     result = []
347    
348 slacapra 1.9 common.logger.debug(5,"parseRange_ "+str(aRange))
349     if aRange=='all' or aRange==None or aRange=='':
350 slacapra 1.8 result=range(0,common.jobDB.nJobs())
351     return result
352 slacapra 1.9 elif aRange=='0':
353     return result
354 slacapra 1.8
355     subRanges = string.split(aRange, ',')
356     for subRange in subRanges:
357     result = result+self.parseSimpleRange_(subRange)
358    
359     if self.checkUniqueness_(result):
360     return result
361     else:
362 slacapra 1.33 common.logger.message("Error "+result)
363 slacapra 1.8 return []
364    
365     def checkUniqueness_(self, list):
366     """
367 slacapra 1.9 check if a list contains only unique elements
368 slacapra 1.8 """
369    
370     uniqueList = []
371     # use a list comprehension statement (takes a while to understand)
372    
373     [uniqueList.append(it) for it in list if not uniqueList.count(it)]
374    
375     return (len(list)==len(uniqueList))
376    
377     def parseSimpleRange_(self, aRange):
378     """
379     Takes as the input a string with two integers separated by
380     the minus sign and returns the tuple with these numbers:
381     'n1-n2' -> [n1, n1+1, n1+2, ..., n2-1, n2]
382     'n1' -> [n1]
383     """
384     (start, end) = (None, None)
385    
386     result = []
387     minus = string.find(aRange, '-')
388     if ( minus < 0 ):
389     if isInt(aRange) and int(aRange)>0:
390 fanzago 1.37 # FEDE
391     #result.append(int(aRange)-1)
392     ###
393 slacapra 1.62 result.append(int(aRange))
394 slacapra 1.8 else:
395 spiga 1.47 common.logger.message("parseSimpleRange_ ERROR "+aRange)
396     usage()
397     pass
398    
399 nsmirnov 1.4 pass
400     else:
401 slacapra 1.8 (start, end) = string.split(aRange, '-')
402     if isInt(start) and isInt(end) and int(start)>0 and int(start)<int(end):
403 spiga 1.38 #result=range(int(start)-1, int(end))
404     result=range(int(start), int(end)+1) #Daniele
405 slacapra 1.8 else:
406 slacapra 1.33 common.logger.message("parseSimpleRange_ ERROR "+start+end)
407 slacapra 1.8
408     return result
409 nsmirnov 1.4
410 nsmirnov 1.3 def initializeActions_(self, opts):
411 nsmirnov 1.1 """
412     For each user action instantiate a corresponding
413     object and put it in the action dictionary.
414     """
415 fanzago 1.53 # for opt in opts.keys():
416     # self.flag_useboss = 0
417     # if ( opt == '-use_boss'):
418     # val = opts[opt]
419     # if ( val == '1' ):
420     # self.flag_useboss = 1
421     # common.logger.message('Using BOSS')
422     # pass
423     # else:
424     # self.flag_useboss = 0
425     # pass
426 spiga 1.15
427     for opt in opts.keys():
428    
429 nsmirnov 1.1 val = opts[opt]
430 spiga 1.15
431    
432     if ( opt == '-create' ):
433 slacapra 1.22 ncjobs = 0
434 nsmirnov 1.1 if val:
435     if ( isInt(val) ):
436     ncjobs = int(val)
437     elif ( val == 'all'):
438     ncjobs = val
439     else:
440 nsmirnov 1.5 msg = 'Bad creation bunch size <'+str(val)+'>\n'
441     msg += ' Must be an integer or "all"'
442 slacapra 1.8 msg += ' Generic range is not allowed"'
443 nsmirnov 1.5 raise CrabException(msg)
444 nsmirnov 1.1 pass
445     else: ncjobs = 'all'
446    
447     if ncjobs != 0:
448 nsmirnov 1.3 # Instantiate Creator object
449 fanzago 1.56 self.creator = Creator(self.job_type_name,
450 gutsche 1.61 self.cfg_params,
451     ncjobs)
452 fanzago 1.56 self.actions[opt] = self.creator
453 nsmirnov 1.3
454     # Initialize the JobDB object if needed
455 nsmirnov 1.1 if not self.flag_continue:
456 fanzago 1.56 common.jobDB.create(self.creator.nJobs())
457 nsmirnov 1.1 pass
458 nsmirnov 1.3
459     # Create and initialize JobList
460    
461     common.job_list = JobList(common.jobDB.nJobs(),
462 fanzago 1.56 self.creator.jobType())
463 nsmirnov 1.3
464     common.job_list.setScriptNames(self.job_type_name+'.sh')
465     common.job_list.setJDLNames(self.job_type_name+'.jdl')
466 gutsche 1.66 common.job_list.setCfgNames(self.creator.jobType().configFilename())
467 slacapra 1.9
468 fanzago 1.56 self.creator.writeJobsSpecsToDB()
469 nsmirnov 1.1 pass
470 nsmirnov 1.3 pass
471 nsmirnov 1.1
472     elif ( opt == '-submit' ):
473 slacapra 1.23
474     # total jobs
475     # get the first not already submitted
476 slacapra 1.24 common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))
477     lastSubmittedJob=0
478     for nj in range(common.jobDB.nJobs()):
479 fanzago 1.45 if (common.jobDB.status(nj) in ['S','K','RC','Y','A','D']):
480 slacapra 1.24 lastSubmittedJob +=1
481 slacapra 1.30 else: break
482 slacapra 1.24 # count job from 1
483 slacapra 1.30 totalJobsSubmittable = common.jobDB.nJobs()-lastSubmittedJob
484 slacapra 1.24 common.logger.debug(5,'lastSubmittedJob '+str(lastSubmittedJob))
485     common.logger.debug(5,'totalJobsSubmittable '+str(totalJobsSubmittable))
486 slacapra 1.23
487 slacapra 1.24 nsjobs = lastSubmittedJob+totalJobsSubmittable
488 slacapra 1.23 # get user request
489 slacapra 1.22 if val:
490     if ( isInt(val) ):
491 slacapra 1.24 tmp = int(val)
492     if (tmp >= totalJobsSubmittable):
493     common.logger.message('asking to submit '+str(tmp)+' jobs, but only '+str(totalJobsSubmittable)+' left: submitting those')
494     pass
495     else:
496     nsjobs=lastSubmittedJob+int(val)
497 slacapra 1.23 elif (val=='all'):
498     pass
499 slacapra 1.22 else:
500     msg = 'Bad submission option <'+str(val)+'>\n'
501     msg += ' Must be an integer or "all"'
502     msg += ' Generic range is not allowed"'
503     raise CrabException(msg)
504     pass
505 slacapra 1.24 common.logger.debug(5,'nsjobs '+str(nsjobs))
506 slacapra 1.23
507     # submit N from last submitted job
508 slacapra 1.24 nj_list = range(lastSubmittedJob, nsjobs)
509     common.logger.debug(5,'nj_list '+str(nj_list))
510 nsmirnov 1.5
511     if len(nj_list) != 0:
512 nsmirnov 1.3 # Instantiate Submitter object
513 gutsche 1.61 # self.actions[opt] = Submitter(self.cfg_params, nj_list, self.job_type)
514 corvo 1.57 self.actions[opt] = Submitter(self.cfg_params, nj_list)
515 nsmirnov 1.3 # Create and initialize JobList
516     if len(common.job_list) == 0 :
517     common.job_list = JobList(common.jobDB.nJobs(),
518     None)
519     common.job_list.setJDLNames(self.job_type_name+'.jdl')
520     pass
521 nsmirnov 1.1 pass
522 slacapra 1.30 pass
523 nsmirnov 1.1
524 nsmirnov 1.4 elif ( opt == '-list' ):
525 slacapra 1.8 jobs = self.parseRange_(val)
526    
527     common.jobDB.dump(jobs)
528 nsmirnov 1.4 pass
529    
530 slacapra 1.67 elif ( opt == '-printId' ):
531     jobs = self.parseRange_(val)
532    
533     for nj in jobs:
534     #nj parte da 1 --> nj = internal_id di boss
535     st = common.jobDB.status(nj-1)
536     if st == 'S' or st == 'A':
537     id = common.scheduler.boss_SID(nj)
538 slacapra 1.68 print "Job: ",nj," Id = ", id
539 slacapra 1.67 else:
540 slacapra 1.68 print "Job: ",nj," No ID yet"
541 slacapra 1.67 pass
542    
543 nsmirnov 1.4 elif ( opt == '-status' ):
544 slacapra 1.8 jobs = self.parseRange_(val)
545    
546 slacapra 1.9 if len(jobs) != 0:
547 spiga 1.16 if ( self.flag_useboss == 1 ):
548 corvo 1.42 self.actions[opt] = StatusBoss(self.cfg_params)
549 spiga 1.15 else:
550 corvo 1.46 self.actions[opt] = Status(self.cfg_params, jobs)
551 spiga 1.15 pass
552 nsmirnov 1.1 pass
553     pass
554 slacapra 1.22
555 nsmirnov 1.4 elif ( opt == '-kill' ):
556 slacapra 1.8
557 spiga 1.31 if ( self.flag_useboss == 1 ):
558     if val:
559     if val =='all':
560 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
561     jobs = allBoss_id.keys()
562 spiga 1.31 else:
563     jobs = self.parseRange_(val)
564     common.scheduler.cancel(jobs)
565     else:
566     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
567     else:
568     if val:
569     jobs = self.parseRange_(val)
570    
571     for nj in jobs:
572     st = common.jobDB.status(nj)
573 gutsche 1.61 if st == 'S' or st == 'A':
574 spiga 1.31 jid = common.jobDB.jobId(nj)
575     common.logger.message("Killing job # "+`(nj+1)`)
576     common.scheduler.cancel(jid)
577     common.jobDB.setStatus(nj, 'K')
578     pass
579 slacapra 1.9 pass
580 spiga 1.31
581     common.jobDB.save()
582 nsmirnov 1.4 pass
583 spiga 1.31 else:
584     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
585 nsmirnov 1.1
586 slacapra 1.8 elif ( opt == '-getoutput' ):
587    
588 spiga 1.20 if ( self.flag_useboss == 1 ):
589     if val=='all' or val==None or val=='':
590 fanzago 1.37 allBoss_id = common.scheduler.listBoss()
591     jobs = allBoss_id.keys()
592 spiga 1.20 else:
593     jobs = self.parseRange_(val)
594     common.scheduler.getOutput(jobs)
595     else:
596     jobs = self.parseRange_(val)
597 spiga 1.13
598 slacapra 1.22 ## also this: create a ActorClass (GetOutput)
599 spiga 1.20 jobs_done = []
600     for nj in jobs:
601     st = common.jobDB.status(nj)
602     if st == 'D':
603 slacapra 1.12 jobs_done.append(nj)
604 spiga 1.20 pass
605     elif st == 'S':
606     jid = common.jobDB.jobId(nj)
607     currStatus = common.scheduler.queryStatus(jid)
608     if currStatus=="Done":
609     jobs_done.append(nj)
610     else:
611     msg = 'Job # '+`(nj+1)`+' submitted but still status '+currStatus+' not possible to get output'
612     common.logger.message(msg)
613     pass
614 slacapra 1.9 else:
615 spiga 1.20 # common.logger.message('Jobs #'+`(nj+1)`+' has status '+st+' not possible to get output')
616     pass
617 slacapra 1.12 pass
618    
619 spiga 1.20 for nj in jobs_done:
620 spiga 1.15 jid = common.jobDB.jobId(nj)
621     dir = common.scheduler.getOutput(jid)
622     common.jobDB.setStatus(nj, 'Y')
623 slacapra 1.12
624     # Rename the directory with results to smth readable
625     new_dir = common.work_space.resDir()
626 gutsche 1.61 if ( dir != '' ) :
627     try:
628     files = os.listdir(dir)
629     for file in files:
630     os.rename(dir+'/'+file, new_dir+'/'+file)
631     os.rmdir(dir)
632     except OSError, e:
633     msg = 'rename files from '+dir+' to '+new_dir+' error: '
634     msg += str(e)
635     common.logger.message(msg)
636     # ignore error
637     pass
638 slacapra 1.12 pass
639 slacapra 1.22 ###
640    
641 spiga 1.13 resFlag = 0
642     exCode = common.scheduler.getExitStatus(jid)
643 spiga 1.34 Statistic.Monitor('retrieved',resFlag,jid,exCode)
644 slacapra 1.12
645     msg = 'Results of Job # '+`(nj+1)`+' are in '+new_dir
646     common.logger.message(msg)
647 nsmirnov 1.4 pass
648    
649     common.jobDB.save()
650     pass
651    
652     elif ( opt == '-resubmit' ):
653 fanzago 1.37 if ( self.flag_useboss == 1 ):
654     if val=='all' or val==None or val=='':
655     allBoss_id = common.scheduler.listBoss()
656     jobs = allBoss_id.keys()
657     else:
658     jobs = self.parseRange_(val)
659     else:
660     if val:
661     jobs = self.parseRange_(val)
662    
663 slacapra 1.11 if val:
664     # create a list of jobs to be resubmitted.
665 slacapra 1.8
666 slacapra 1.22 ### as before, create a Resubmittter Class
667 spiga 1.60 allBoss_id = common.scheduler.listBoss()
668     maxIndex = allBoss_id.keys()
669 slacapra 1.11 nj_list = []
670     for nj in jobs:
671 spiga 1.60 if int(nj) <= int(len(maxIndex)) :
672     st = common.jobDB.status(int(nj)-1)
673     if st in ['K','A']:
674     nj_list.append(int(nj)-1)
675     common.jobDB.setStatus(int(nj)-1,'C')
676     elif st == 'Y':
677     common.scheduler.moveOutput(nj)
678     nj_list.append(int(nj)-1)
679     st = common.jobDB.setStatus(int(nj)-1,'RC')
680     elif st in ['C','X']:
681 gutsche 1.61 common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
682 spiga 1.60 pass
683     elif st == 'D':
684     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
685     else:
686     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
687 slacapra 1.11 else:
688 gutsche 1.61 common.logger.message('Job #'+`int(nj)`+' no possible to resubmit!! out of range')
689 fanzago 1.37 if len(common.job_list) == 0 :
690     common.job_list = JobList(common.jobDB.nJobs(),None)
691     common.job_list.setJDLNames(self.job_type_name+'.jdl')
692     pass
693    
694 slacapra 1.11 if len(nj_list) != 0:
695 fanzago 1.37 common.scheduler.resubmit(nj_list)
696 slacapra 1.11 # Instantiate Submitter object
697 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
698 slacapra 1.11
699 slacapra 1.8 pass
700     pass
701 slacapra 1.11 else:
702     common.logger.message("Warning: with '-resubmit' you _MUST_ specify a job range or 'all'")
703 spiga 1.60 common.logger.message("WARNING: _all_ job specified in the range will be resubmitted!!!")
704 slacapra 1.11 pass
705 fanzago 1.37 common.jobDB.save()
706 slacapra 1.8 pass
707 gutsche 1.61
708 slacapra 1.8 elif ( opt == '-cancelAndResubmit' ):
709 nsmirnov 1.5
710 spiga 1.47 if ( self.flag_useboss == 1 ):
711     if val:
712     if val =='all':
713     allBoss_id = common.scheduler.listBoss()
714     jobs = allBoss_id.keys()
715     else:
716     jobs = self.parseRange_(val)
717     # kill submitted jobs
718     common.scheduler.cancel(jobs)
719     else:
720     common.logger.message("Warning: with '-cancelAndResubmit' you _MUST_ specify a job range or 'all'")
721     else:
722     if val:
723     jobs = self.parseRange_(val)
724     else:
725     common.logger.message("Warning: with '-kill' you _MUST_ specify a job range or 'all'")
726 nsmirnov 1.5 pass
727    
728 spiga 1.47 # resubmit cancelled jobs.
729     if val:
730     nj_list = []
731     for nj in jobs:
732     if ( self.flag_useboss != 1 ):
733     st = common.jobDB.status(nj)
734     if st == 'S':
735     jid = common.jobDB.jobId(nj)
736     common.scheduler.cancel(jid)
737     st = 'K'
738     common.jobDB.setStatus(nj, st)
739     pass
740     common.jobDB.save()
741     pass
742     st = common.jobDB.status(int(nj)-1)
743     if st in ['K','A']:
744     nj_list.append(int(nj)-1)
745     common.jobDB.setStatus(int(nj)-1,'C')
746     elif st == 'Y':
747     common.scheduler.moveOutput(nj)
748     nj_list.append(int(nj)-1)
749     st = common.jobDB.setStatus(int(nj)-1,'RC')
750     elif st in ['C','X']:
751     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' not yet submitted!!!')
752     pass
753     elif st == 'D':
754     common.logger.message('Job #'+`int(nj)`+' has status '+crabJobStatusToString(st)+' must be retrieved before resubmission')
755     else:
756     common.logger.message('Job #'+`nj`+' has status '+crabJobStatusToString(st)+' must be "killed" before resubmission')
757     pass
758    
759 nsmirnov 1.5 if len(common.job_list) == 0 :
760 spiga 1.47 common.job_list = JobList(common.jobDB.nJobs(),None)
761 nsmirnov 1.5 common.job_list.setJDLNames(self.job_type_name+'.jdl')
762     pass
763 spiga 1.47
764     if len(nj_list) != 0:
765     common.scheduler.resubmit(nj_list)
766 fanzago 1.58 self.actions[opt] = Submitter(self.cfg_params, nj_list)
767 spiga 1.47 pass
768     pass
769     else:
770     common.logger.message("WARNING: _all_ job specified in the rage will be cancelled and resubmitted!!!")
771 nsmirnov 1.5 pass
772 spiga 1.47 common.jobDB.save()
773 nsmirnov 1.4 pass
774 spiga 1.47
775 slacapra 1.28 elif ( opt == '-testJdl' ):
776 slacapra 1.8 jobs = self.parseRange_(val)
777     nj_list = []
778     for nj in jobs:
779 slacapra 1.62 st = common.jobDB.status(nj-1)
780     if st == 'C': nj_list.append(nj-1)
781 slacapra 1.8 pass
782    
783     if len(nj_list) != 0:
784     # Instantiate Submitter object
785     self.actions[opt] = Checker(self.cfg_params, nj_list)
786    
787     # Create and initialize JobList
788    
789     if len(common.job_list) == 0 :
790     common.job_list = JobList(common.jobDB.nJobs(), None)
791     common.job_list.setJDLNames(self.job_type_name+'.jdl')
792     pass
793     pass
794    
795 slacapra 1.9 elif ( opt == '-postMortem' ):
796     jobs = self.parseRange_(val)
797     nj_list = []
798     for nj in jobs:
799 fanzago 1.45 # fede: nj scala di uno perche' e' l'internal id di boss
800     # ed il jobDB parte da zero ...
801     st = common.jobDB.status(int(nj)-1)
802     if st not in ['X', 'C']: nj_list.append(int(nj))
803 slacapra 1.9 pass
804    
805     if len(nj_list) != 0:
806     # Instantiate Submitter object
807 spiga 1.35 self.actions[opt] = PostMortem(self.cfg_params, nj_list,self.flag_useboss)
808 slacapra 1.9
809     # Create and initialize JobList
810    
811     if len(common.job_list) == 0 :
812     common.job_list = JobList(common.jobDB.nJobs(), None)
813     common.job_list.setJDLNames(self.job_type_name+'.jdl')
814     pass
815     pass
816 slacapra 1.33 else:
817     common.logger.message("No jobs to analyze")
818 slacapra 1.9
819     elif ( opt == '-clean' ):
820     if val != None:
821     raise CrabException("No range allowed for '-clean'")
822    
823 corvo 1.46 theCleaner = Cleaner(self.scheduler_name == 'boss', self.cfg_params)
824 slacapra 1.36 theCleaner.clean()
825 slacapra 1.9
826 nsmirnov 1.1 pass
827     return
828    
829 nsmirnov 1.3 def createWorkingSpace_(self):
830 slacapra 1.9 new_dir = ''
831    
832     try:
833     new_dir = self.cfg_params['USER.ui_working_dir']
834 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + string.split(new_dir, '/')[len(string.split(new_dir, '/')) - 1] + '_' + self.current_time
835 fanzago 1.48 if os.path.exists(new_dir):
836     if os.listdir(new_dir):
837     msg = new_dir + ' already exists and is not empty. Please remove it before create new task'
838     raise CrabException(msg)
839 slacapra 1.9 except KeyError:
840     new_dir = common.prog_name + '_' + self.name + '_' + self.current_time
841 corvo 1.50 self.cfg_params['taskId'] = self.cfg_params['user'] + '_' + new_dir
842 slacapra 1.9 new_dir = self.cwd + new_dir
843     pass
844 fanzago 1.49 common.work_space = WorkSpace(new_dir, self.cfg_params)
845 nsmirnov 1.1 common.work_space.create()
846     return
847    
848 nsmirnov 1.3 def loadConfiguration_(self, opts):
849 nsmirnov 1.1
850     save_opts = common.work_space.loadSavedOptions()
851    
852     # Override saved options with new command-line options
853    
854     for k in opts.keys():
855     save_opts[k] = opts[k]
856     pass
857    
858     # Return updated options
859     return save_opts
860    
861 nsmirnov 1.3 def createLogger_(self, args):
862 nsmirnov 1.1
863     log = Logger()
864     log.quiet(self.flag_quiet)
865     log.setDebugLevel(self.debug_level)
866     log.write(args+'\n')
867 nsmirnov 1.3 log.message(self.headerString_())
868 nsmirnov 1.1 log.flush()
869     common.logger = log
870     return
871    
872 nsmirnov 1.3 def updateHistory_(self, args):
873 nsmirnov 1.1 history_fname = common.prog_name+'.history'
874     history_file = open(history_fname, 'a')
875     history_file.write(self.current_time+': '+args+'\n')
876     history_file.close()
877     return
878    
879 nsmirnov 1.3 def headerString_(self):
880 nsmirnov 1.1 """
881     Creates a string describing program options either given in
882     the command line or their default values.
883     """
884     header = common.prog_name + ' (version ' + common.prog_version_str + \
885     ') running on ' + \
886     time.ctime(time.time())+'\n\n' + \
887     common.prog_name+'. Working options:\n'
888 fanzago 1.59 #print self.job_type_name
889 nsmirnov 1.1 header = header +\
890     ' scheduler ' + self.scheduler_name + '\n'+\
891     ' job type ' + self.job_type_name + '\n'+\
892     ' working directory ' + common.work_space.topDir()\
893     + '\n'
894     return header
895    
896 nsmirnov 1.3 def createScheduler_(self):
897 nsmirnov 1.1 """
898     Creates a scheduler object instantiated by its name.
899     """
900     klass_name = 'Scheduler' + string.capitalize(self.scheduler_name)
901     file_name = klass_name
902     try:
903     klass = importName(file_name, klass_name)
904     except KeyError:
905     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
906     raise CrabException(msg)
907     except ImportError, e:
908     msg = 'Cannot create scheduler '+self.scheduler_name
909     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
910     msg += str(e)
911     raise CrabException(msg)
912    
913     common.scheduler = klass()
914     common.scheduler.configure(self.cfg_params)
915     return
916    
917 corvo 1.55 # def createJobtype_(self):
918     # """
919     # Create the jobtype specified in the crab.cfg file
920     # """
921     # file_name = 'cms_'+ string.lower(self.job_type_name)
922     # klass_name = string.capitalize(self.job_type_name)
923 gutsche 1.61 #
924 corvo 1.55 # try:
925     # klass = importName(file_name, klass_name)
926     # except KeyError:
927     # msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
928     # raise CrabException(msg)
929     # except ImportError, e:
930     # msg = 'Cannot create job type '+self.job_type_name
931     # msg += ' (file: '+file_name+', class '+klass_name+'):\n'
932     # msg += str(e)
933     # raise CrabException(msg)
934     # job_type = klass(self.cfg_params)
935     # return job_type
936 corvo 1.51
937 nsmirnov 1.1 def run(self):
938     """
939     For each
940     """
941    
942     for act in self.main_actions:
943     if act in self.actions.keys(): self.actions[act].run()
944     pass
945    
946     for act in self.aux_actions:
947     if act in self.actions.keys(): self.actions[act].run()
948     pass
949     return
950    
951     ###########################################################################
952     def processHelpOptions(opts):
953    
954 slacapra 1.11 if len(opts):
955     for opt in opts.keys():
956     if opt in ('-v', '-version', '--version') :
957     print Crab.version()
958     return 1
959     if opt in ('-h','-help','--help') :
960     if opts[opt] : help(opts[opt])
961     else: help()
962     return 1
963     else:
964     usage()
965 nsmirnov 1.1
966     return 0
967    
968     ###########################################################################
969     if __name__ == '__main__':
970    
971 fanzago 1.18
972 fanzago 1.19 # # Initial settings for Python modules. Avoid appending manually lib paths.
973     # try:
974     # path=os.environ['EDG_WL_LOCATION']
975     # except:
976     # print "Error: Please set the EDG_WL_LOCATION environment variable pointing to the userinterface installation path"
977     # sys.exit(1)
978     #
979     # libPath=os.path.join(path, "lib")
980     # sys.path.append(libPath)
981     # libPath=os.path.join(path, "lib", "python")
982     # sys.path.append(libPath)
983 fanzago 1.18
984    
985 nsmirnov 1.1 # Parse command-line options and create a dictionary with
986     # key-value pairs.
987    
988     options = parseOptions(sys.argv[1:])
989    
990     # Process "help" options, such as '-help', '-version'
991    
992 slacapra 1.11 if processHelpOptions(options) : sys.exit(0)
993 nsmirnov 1.1
994     # Create, initialize, and run a Crab object
995    
996     try:
997 nsmirnov 1.3 crab = Crab(options)
998 nsmirnov 1.1 crab.run()
999 corvo 1.54 crab.cfg_params['apmon'].free()
1000 nsmirnov 1.1 except CrabException, e:
1001     print '\n' + common.prog_name + ': ' + str(e) + '\n'
1002     if common.logger:
1003     common.logger.write('ERROR: '+str(e)+'\n')
1004     pass
1005     pass
1006    
1007     pass