ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/python/task.py
Revision: 1.5
Committed: Fri Jul 30 18:41:11 2010 UTC (14 years, 9 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_020pre1, Mit_018, Mit_017, Mit_017pre3, Mit_017pre2, Mit_017pre1, Mit_016, Mit_015b, Mit_015a, Mit_015, Mit_014e, Mit_014d
Changes since 1.4: +1 -1 lines
Log Message:
Cleaned up and updated version.

File Contents

# User Rev Content
1 paus 1.2 #---------------------------------------------------------------------------------------------------
2     # Python Module File to describe CRAB tasks and the corresponding job stati
3     #
4     # Author: C.Paus (Oct 10, 2008)
5     #---------------------------------------------------------------------------------------------------
6     import os,sys,getopt,re,string
7    
#---------------------------------------------------------------------------------------------------
# Class: SubTask(index,lfnFile)
# Each SubTask in CRAB can be described through this class
#---------------------------------------------------------------------------------------------------
class SubTask:
    """Description of a SubTask in CRAB: one numbered slice of the LFNs of a task.

    Attributes:
      index       -- running number of the subtask, used to build its tag
      lfnFile     -- name of the file holding the LFNs of this subtask
      nSubTaskLfn -- number of LFNs in this subtask (0 after construction)
    """
    # class level defaults, replaced by the constructor
    index = -1
    lfnFile = 'undefined'
    nSubTaskLfn = -1

    #-----------------------------------------------------------------------------------------------
    # constructor: remember index and LFN file, start with an empty LFN count
    #-----------------------------------------------------------------------------------------------
    def __init__(self,index,lfnFile):
        self.index, self.lfnFile, self.nSubTaskLfn = index, lfnFile, 0

    #-----------------------------------------------------------------------------------------------
    # one line summary of the subtask
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print index and LFN file of this subtask."""
        summary = ' SubTask (Idx: %d, LfnFile: %s) ====' % (self.index,self.lfnFile)
        print(summary)

    #-----------------------------------------------------------------------------------------------
    # subtask tag: the index zero-padded to four digits
    #-----------------------------------------------------------------------------------------------
    def tag(self):
        """Return the subtask index formatted as '%04d' (e.g. 7 -> '0007')."""
        tagFormat = '%04d'
        return tagFormat % self.index
39    
#---------------------------------------------------------------------------------------------------
# Class: Task(tag,cmsDataset='undefined',mitCfg='undefined',mitVersion='undefined')
# Each task in CRAB can be described through this class
#---------------------------------------------------------------------------------------------------
class Task:
    """Description of a Task in CRAB.

    A task is identified by its tag, the name of the crab working directory. Depending on the tag
    the task is either
      - set up from scratch (tag 'new', or a tag that is not an existing path) out of the MIT
        production configuration in $MIT_PROD_DIR/<mitCfg>/<mitVersion>/, or
      - connected to an existing crab working directory by parsing <tag>/share/crab.cfg.
    """
    # this is sufficient to do anything
    tag = 'undefined'
    # from actual crab configuration directly
    storageEle = 'undefined'
    storagePath = 'undefined'
    cmsDataset = 'undefined'
    nEvents = -1
    nTotalEvts = -1
    # MIT specific stuff
    mitCfg = 'undefined'
    mitVersion = 'undefined'
    mitDataset = 'undefined'
    localPath = 'undefined'
    # status of task as a whole and each individual job
    status = 'undefined' # 'undefined', ....
    #
    # 'undefined': initial status, not yet even checked
    # 'cataloged': all jobs of the tasks have completed and are cataloged successfully
    # 'completed': all jobs of the tasks have completed successfully
    # 'finished' : all jobs have run, but unsubmitted jobs, errors or aborts might have occurred
    # 'active'   : some jobs are either in Done, Running or to be Run
    #
    # The containers below are only declared here: __init__ gives every task its own copies,
    # because mutable class attributes would be shared by all Task instances.
    jobStati = []     # list of JobStatus objects, filled by getJobStati
    failingSites = {} # CE name -> number of failed jobs
    lfns = {}         # LFN file name -> 0 (not found in storage), 1 (found), 2 (found, not listed)
    blocks = {}       # block name -> number of LFNs in that block
    nLfnDone = -1     # set by loadCompletedLfns
    nLfnMissing = -1  # set by createMissingLfns
    # subtasks
    nSubTaskLfnMax = 400 # maximum number of LFNs in a subtask
    subTasks = []        # list of subtasks

    #-----------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-----------------------------------------------------------------------------------------------
    def __init__(self,tag,cmsDataset='undefined',mitCfg='undefined',mitVersion='undefined'):
        """Create a new task ('new' or non-existing tag) or connect to the crab directory tag.

        For a non-existing tag a crab.cfg in the current directory, if present, overrides the
        values derived from the MIT configuration.
        """
        self.tag = tag
        self.status = 'undefined'
        # fresh per-task containers (see the class level comment)
        self.jobStati = []
        self.failingSites = {}
        self.lfns = {}
        self.blocks = {}
        self.subTasks = []
        if tag == 'new':
            self.new(cmsDataset,mitCfg,mitVersion)
        elif not os.path.exists(tag):
            self.new(cmsDataset,mitCfg,mitVersion)
            if os.path.exists('crab.cfg'):
                print(' Loading configuration from local crab.cfg')
                self._loadCrabCfg('crab.cfg')
        else:
            self._loadCrabCfg(tag + '/share/crab.cfg')

    #-----------------------------------------------------------------------------------------------
    # helpers to read configuration files
    #-----------------------------------------------------------------------------------------------
    def _readLines(self,fileName):
        """Return the lines of fileName without their newline, or [] if it is not a file."""
        if not os.path.isfile(fileName):
            return []
        fh = open(fileName)
        try:
            return [line.rstrip('\n') for line in fh]
        finally:
            fh.close()

    def _cfgLines(self,cfgFile,key):
        """Return the lines of cfgFile starting with key (like 'grep ^key cfgFile')."""
        return [line for line in self._readLines(cfgFile) if line.startswith(key)]

    def _cfgValue(self,line,lastField):
        """Emulate "cut -d= -f2-<lastField> | tr -d ' '" on a single configuration line."""
        fields = line.split('=')
        if len(fields) == 1:
            value = line # cut prints lines without the delimiter unchanged
        else:
            value = '='.join(fields[1:lastField])
        return value.replace(' ','')

    def _loadCrabCfg(self,cfgFile):
        """Take dataset, storage element and storage path from an existing crab config file.

        As with grep, later matching lines override earlier ones, except user_remote_dir whose
        values are appended to the storage path. The MIT names are then derived from the path.
        """
        for line in self._cfgLines(cfgFile,'dataset'):
            self.cmsDataset = self._cfgValue(line,2)
        for line in self._cfgLines(cfgFile,'storage_element'):
            self.storageEle = self._cfgValue(line,2)
        for line in self._cfgLines(cfgFile,'storage_path'):
            self.storagePath = self._cfgValue(line,3)
        for line in self._cfgLines(cfgFile,'user_remote_dir'):
            self.storagePath += self._cfgValue(line,3)
        self._setMitNamesFromPath()

    def _setMitNamesFromPath(self):
        """Set mitCfg/mitVersion/mitDataset from the tail .../<cfg>/<version>/<dataset>[/crab_0_*]."""
        f = self.storagePath.split('/')
        # a trailing crab_0_* directory is not part of the MIT naming scheme
        offset = 0
        if re.search('crab_0_',f[-1]):
            offset = 1
        self.mitDataset = f[-1-offset]
        self.mitVersion = f[-2-offset]
        self.mitCfg = f[-3-offset]

    #-----------------------------------------------------------------------------------------------
    # constructor for new creation
    #-----------------------------------------------------------------------------------------------
    def new(self,cmsDataset,mitCfg,mitVersion):
        """Set up a new task from $MIT_PROD_DIR/<mitCfg>/<mitVersion>/{crab.cfg,Productions}.

        Raises KeyError if MIT_PROD_DIR is not set, RuntimeError if crab.cfg or the cmssw
        configuration (cmssw.cfg or cmssw.py) does not exist.
        """
        self.cmsDataset = cmsDataset
        self.mitCfg = mitCfg
        self.mitVersion = mitVersion
        self.status = 'undefined'
        # derive the missing parameters
        cfgDir = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + mitVersion
        crabFile = cfgDir + '/' + 'crab.cfg'
        if not os.path.exists(crabFile):
            raise RuntimeError("Crab file not found: %s" % crabFile)
        # the cmssw configuration may come as old style (cmssw.cfg) or python (cmssw.py) file
        if not os.path.exists(cfgDir + '/' + 'cmssw.cfg') and \
           not os.path.exists(cfgDir + '/' + 'cmssw.py'):
            raise RuntimeError(" XXXX ERROR no valid configuration found XXXX")
        # resolve the other mitCfg parameters from the configuration file
        self._readProductions(cfgDir + '/' + 'Productions')
        # find the foreseen storage place
        for line in self._cfgLines(crabFile,'storage_element'):
            self.storageEle = re.sub(r'\s','',line.split('=')[-1])
        for line in self._cfgLines(crabFile,'storage_path'):
            # keep everything after the first '=' (the path may contain '=' itself) and replace
            # its last three directories by MIT configuration, version and dataset
            path = '='.join(line.split('=')[1:])
            names = path.split('/')[:-3] + [self.mitCfg,self.mitVersion,self.mitDataset]
            self.storagePath = re.sub(r'\s','','/'.join(names))
        for line in self._cfgLines(crabFile,'user_remote_dir'):
            # same parsing as used when loading an existing crab.cfg
            self.storagePath += self._cfgValue(line,3)

    def _readProductions(self,productionsFile):
        """Look up cmsDataset in a Productions file and set mitDataset, nEvents and localPath.

        An entry (optionally continued over several lines with a trailing backslash) reads
        'cmsDataset mitDataset nEvents <col4> localPath'; column 4 is not used here and '-' as
        localPath means no local path. Empty lines and lines starting with '#' are ignored.
        """
        join = 0
        fullLine = ""
        bSlash = "\\"
        for line in self._readLines(productionsFile):
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming
            if fullLine[-1] == bSlash:
                join = 1
                fullLine = fullLine[:-1]
                continue # only interpret the entry once it is complete
            join = 0
            names = fullLine.split() # splitting every blank
            if not names or names[0] != self.cmsDataset:
                continue
            self.mitDataset = names[1]    # the equivalent MIT name of the dataset
            self.nEvents = int(names[2])  # number of events to be used in the production
            if len(names) > 4 and names[4] != "-":
                self.localPath = names[4]
            print("\n Sample Info: " + fullLine + "\n")

    #-----------------------------------------------------------------------------------------------
    # present the current crab task
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print the task summary and all of its subtasks."""
        print(' ==== CRAB Task Information (%s, %s, %s) ===='%(self.tag,self.mitCfg,self.mitVersion))
        print(' Dataset: ' + self.cmsDataset + ' (' + self.mitDataset + ')')
        print(' Storage: %s @ %s'%(self.storagePath,self.storageEle))
        print(' List of sub tasks to be completed: ')
        for subTask in self.subTasks:
            subTask.show()
        print(' ')

    #-----------------------------------------------------------------------------------------------
    # create all subtasks of the tasks
    #-----------------------------------------------------------------------------------------------
    def createSubTasks(self,lfnFile):
        """Split lfnFile into pieces of at most nSubTaskLfnMax lines, one SubTask per piece.

        The pieces are written to <lfnFile>_0001, <lfnFile>_0002, ... and the subtasks appended
        to self.subTasks. A missing lfnFile produces no subtasks. Finally the task is shown.
        """
        print(' creating subtasks')
        lines = []
        if os.path.isfile(lfnFile):
            fh = open(lfnFile)
            try:
                lines = fh.readlines()
            finally:
                fh.close()
        nMax = self.nSubTaskLfnMax
        for start in range(0,len(lines),nMax):
            chunk = lines[start:start+nMax]
            index = start//nMax + 1
            fileName = lfnFile + '_%04d' % index
            output = open(fileName,'w')
            try:
                output.writelines(chunk)
            finally:
                output.close()
            subTask = SubTask(index,fileName)
            subTask.nSubTaskLfn = len(chunk)
            self.subTasks.append(subTask)
        print(' ')
        self.show()

    #-----------------------------------------------------------------------------------------------
    # load all lfns relevant to this task
    #-----------------------------------------------------------------------------------------------
    def loadAllLfns(self, lfnFile):
        """Read the complete LFN list ('block lfn nEvents' per line) into lfns and blocks.

        All LFNs (keyed by file name, i.e. last path component) start with status 0; the
        number of LFNs per block and the total number of events are accumulated.
        """
        # initialize from scratch
        self.lfns = {}
        self.blocks = {}
        self.nTotalEvts = 0
        for line in self._readLines(lfnFile):
            f = line.split() # splitting every blank
            # get rid of empty or commented lines
            if not f or line[0] == '#':
                continue
            block = f[0]
            lfn = f[1].split('/')[-1]
            self.nTotalEvts += int(f[2])
            if lfn in self.lfns:
                # NOTE(review): a repeated LFN is set to 1, the value loadCompletedLfns uses for
                # 'found in storage', so it will not be listed as missing - confirm this is intended
                self.lfns[lfn] = 1
            else:
                self.lfns[lfn] = 0
            self.blocks[block] = self.blocks.get(block,0) + 1
        print(' TOTAL - Lfns: %6d [ Blocks: %4d Events: %9d ]'\
              %(len(self.lfns),len(self.blocks),self.nTotalEvts))

    #-----------------------------------------------------------------------------------------------
    # load all lfns so far completed relevant to this task
    #-----------------------------------------------------------------------------------------------
    def loadCompletedLfns(self):
        """Mark the LFNs already present in storage (status 1, or 2 if not in the full list).

        Uses the external 'list' command on the storage path; its second column is assumed to
        be the file name (TODO confirm). Sets nLfnDone to the number of files found.
        """
        # initialize from scratch
        self.nLfnDone = 0
        # find all already existing files
        path = self.storagePath.split('=')[-1]
        # for a path pointing into a crab directory, list its parent instead
        if re.search('crab_0_',path) or re.search('CRAB',path):
            path = '/'.join(path.split('/')[:-1])
        cmd = 'list ' + path + ' | grep root 2> /dev/null'
        for line in os.popen(cmd).readlines(): # run command
            f = line.split()
            if len(f) < 2:
                continue # not a file entry
            lfn = f[1]
            if lfn in self.lfns:
                self.lfns[lfn] = 1
            else:
                print(' ERROR -- found completed lfn not in list of all lfns?! ->' + lfn + '<-')
                self.lfns[lfn] = 2
            self.nLfnDone += 1
        print(' DONE - Lfns: %6d'%(self.nLfnDone))

    #-----------------------------------------------------------------------------------------------
    # write the lfns still missing for this task
    #-----------------------------------------------------------------------------------------------
    def createMissingLfns(self, lfnFile, restLfnFile = "remaining.lfns"):
        """Write the lines of lfnFile whose LFN has status 0 into restLfnFile.

        A line is selected when the file name of its second column matches a missing LFN
        exactly. The output is sorted and free of duplicates so lines of the same block (first
        column) are grouped; sorting is by code point (sort -u used the locale collation).
        Sets nLfnMissing and status.
        """
        self.status = 'cataloged'
        self.nLfnMissing = 0
        missing = set()
        for lfn,status in self.lfns.items():
            if status == 0:
                missing.add(lfn)
                self.nLfnMissing += 1
            else:
                # NOTE(review): 'cataloged' survives only if no LFN was found in storage, which
                # looks inverted w.r.t. the status description - confirm with callers
                self.status = 'undefined'
        selected = set()
        for line in self._readLines(lfnFile):
            f = line.split()
            if len(f) < 2 or line[0] == '#':
                continue
            if f[1].split('/')[-1] in missing:
                selected.add(line)
        # it is important to sort them (by first column == block)
        output = open(restLfnFile,'w')
        try:
            for line in sorted(selected):
                output.write(line + '\n')
        finally:
            output.close()
        print(' MISSING - Lfns: %6d'%(self.nLfnMissing))

    #-----------------------------------------------------------------------------------------------
    # create an inventory of all the existing output files
    #-----------------------------------------------------------------------------------------------
    def makeInventory(self):
        """Set outputFile to 1 for every job whose output file is found in storage, else 0.

        The job number is parsed from the file name ('..._<number>.<ext>'); a number outside
        1..len(jobStati) raises IndexError.
        """
        # the directory to list is what follows the (last) '=' of the storage path
        castorPath = self.storagePath.split('=')[-1]
        cmd = "list " + castorPath + " | grep root | cut -d ' ' -f2"
        print(" cmd: " + cmd)
        for status in self.jobStati:
            status.outputFile = 0
        for line in os.popen(cmd).readlines(): # run directory list command
            line = line.rstrip('\n')
            number = int(line.split('_')[-1].split('.')[0])
            # update the job status (job numbers start at 1; 0 must not wrap to the last job)
            if number < 1 or number > len(self.jobStati):
                print(' Index out of range requested: %d Waiting for the CRASH!'%(number))
                raise IndexError('output file %s belongs to job %d, but the task has %d jobs' \
                                 %(line,number,len(self.jobStati)))
            self.jobStati[number-1].outputFile = 1

    #-----------------------------------------------------------------------------------------------
    # get crab status information for each job in the task
    #-----------------------------------------------------------------------------------------------
    def getJobStati(self):
        """Query 'crab -status -continue <tag>', fill jobStati and derive the task status.

        The job table (fixed columns, opened by a dashed line, closed by an empty line) is
        parsed, the produced output is checked (makeInventory), failures are counted per CE
        in failingSites and status becomes 'active', 'completed' or 'finished'.
        """
        # inside the job table, lines matching this pattern are not echoed
        pattern = 'Created'
        active = 0
        self.jobStati = []
        self.failingSites = {}
        cmd = 'crab -status -continue ' + self.tag
        for line in os.popen(cmd).readlines(): # run command
            line = line.rstrip('\n')
            if active == 0:
                print(' CRAB: ' + line)
            elif not re.search(pattern,line) and not re.search('------',line):
                print(' CRAB: ' + line)
            # decide whether we are in job status line or not
            if line[1:5] == "----":
                active = 1
                continue
            if active == 1 and line == '':
                active = 0
            # parse the content of the job report
            if active == 1:
                # fixed column read
                ##ID STATUS E_HOST EXE_EXIT_CODE JOB_EXIT_STATUS
                status = JobStatus(int(line[0:5].strip()),line[7:24].strip())
                status.ce = line[26:61].strip()
                tmp = line[63:75].strip()
                if tmp != '':
                    status.exitCode = int(tmp)
                tmp = line[77:91].strip()
                if tmp != '':
                    status.exitStatus = int(tmp)
                self.jobStati.append(status)

        # review job output so far
        if len(self.jobStati) > 0:
            self.makeInventory()
        else:
            print(' ERROR - This task has not jobs stati assigned to it. Something is wrong.')
            print(' crab task id: ' + self.tag)

        # Make sure certain cases get fixed to avoid deletion
        for status in self.jobStati:
            # fix those jobs where the output has already been found
            # NOTE(review): 60303 presumably means 'output file already exists' - confirm
            if status.exitStatus == 60303 and status.outputFile == 1:
                status.exitStatus = 0
            elif status.exitStatus == -1 and status.exitCode == -1 and status.outputFile == 1:
                status.exitStatus = 0
                status.exitCode = 0
            elif (status.exitStatus != 0 or status.exitCode != 0) and \
                 not (status.exitStatus == -1 and status.exitCode == -1):
                print(" ==> Failure: filing with status/code: %d %d CE: %s" \
                      %(status.exitStatus,status.exitCode,status.ce))
                self.failingSites[status.ce] = self.failingSites.get(status.ce,0) + 1

        # the task is active as long as any job is not in one of these final states
        finalTags = ('Aborted','Retrieved','Created','Cleared')
        if any(status.tag not in finalTags for status in self.jobStati):
            self.status = 'active'
        else:
            self.status = 'completed'
            for status in self.jobStati:
                if status.tag == 'Aborted' or status.exitCode != 0 or status.exitStatus != 0:
                    self.status = 'finished'
                    break

    #-----------------------------------------------------------------------------------------------
    # print the line to complete the task
    #-----------------------------------------------------------------------------------------------
    def complete(self,version='008'):
        """Print the submit.py command line that completes this task (version defaults to 008)."""
        print(' ./bin/submit.py --noTestJob --complete --version=%s --mitDataset=%s'% \
              (version,self.mitDataset))

    #-----------------------------------------------------------------------------------------------
    # print the line to remove the task and do it if requested
    #-----------------------------------------------------------------------------------------------
    def remove(self,clean=0):
        """Print the command cleaning up and moving the task to ./completed; run it if clean == 1."""
        cmd = ' cleanupLog.py --crabId ' + self.tag + \
              '; mkdir -p ./completed; mv ' + self.tag + ' ./completed/'
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)

    #-----------------------------------------------------------------------------------------------
    # print the line to kill all jobs and remove the task, and do it if requested
    #-----------------------------------------------------------------------------------------------
    def killAndRemove(self,clean=0):
        """Like remove(), but first kill all jobs of the task with crab; run it if clean == 1."""
        cmd = 'crab -kill all -continue ' + self.tag + '; cleanupLog.py --crabId=' + self.tag \
              + '; mkdir -p ./completed; mv ' + self.tag + ' ./completed/'
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)
509    
#---------------------------------------------------------------------------------------------------
# Class: JobStatus(index,tag)
# The status of one job of a list of CRAB jobs (inside one task) is described by this class
#---------------------------------------------------------------------------------------------------
class JobStatus:
    """Minimal but sufficient Job Status for crab operations.

    Attributes:
      index      -- job number inside the task
      tag        -- crab status string of the job
      ce         -- computing element the job ran on ('undefined' until known)
      outputFile -- output presence flag (-1 until checked)
      exitCode   -- executable exit code (-1 if unknown)
      exitStatus -- job exit status (-1 if unknown)
    """
    # defaults shared by all jobs until they are set for an individual job
    index = -1
    tag = 'undefined'
    ce = 'undefined'
    outputFile = -1
    exitCode = -1
    exitStatus = -1

    #-----------------------------------------------------------------------------------------------
    # constructor: only index and crab status are known at creation time
    #-----------------------------------------------------------------------------------------------
    def __init__(self, index, tag):
        self.index, self.tag = index, tag

    #-----------------------------------------------------------------------------------------------
    # present the job status on a single line
    #-----------------------------------------------------------------------------------------------
    def showCompact(self):
        """Print all job status fields on one line."""
        fields = (self.index, self.tag, self.outputFile, self.exitCode, self.exitStatus, self.ce)
        print('Status: %6d %20s Output: %2d Exit: %6d,%6d at CE: %s' % fields)

    #-----------------------------------------------------------------------------------------------
    # present the job status in long form (header plus two lines)
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print a header followed by identification and result lines of the job."""
        print('==== Job Status Information ====')
        ident = (self.index, self.tag, self.ce)
        print(' Index: %6d Tag: %s CE: %s' % ident)
        result = (self.outputFile, self.exitCode, self.exitStatus)
        print(' Output: %2d Exit: %6d,%6d' % result)
529     self.tag = tag
530     #-----------------------------------------------------------------------------------------------
531     # present the current crab task in compact form
532     #-----------------------------------------------------------------------------------------------
533     def showCompact(self):
534     print 'Status: %6d %20s Output: %2d Exit: %6d,%6d at CE: %s'% \
535     (self.index,self.tag,self.outputFile,self.exitCode, \
536     self.exitStatus,self.ce)
537     #-----------------------------------------------------------------------------------------------
538     # present the current crab task in long form
539     #-----------------------------------------------------------------------------------------------
540     def show(self):
541     print '==== Job Status Information ===='
542     print ' Index: %6d Tag: %s CE: %s'%(self.index,self.tag,self.ce)
543     print ' Output: %2d Exit: %6d,%6d'%(self.outputFile,self.exitCode,self.exitStatus)