ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.9
Committed: Tue Feb 28 11:54:36 2012 UTC (13 years, 2 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_028a, Mit_028, Mit_027a, Mit_027, Mit_026, Mit_025e, Mit_025d
Changes since 1.8: +20 -22 lines
Log Message:
Last updates.

File Contents

# User Rev Content
1 paus 1.2 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to go through my crab jobs, get status and output and take some completion action if
4     # needed and desired.
5     #
6     # Author: C.Paus (July 1, 2008)
7     #---------------------------------------------------------------------------------------------------
8     import os,sys,getopt,re,string
9     import task
10    
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its working directory.

    crabTask -- object whose 'tag' attribute names the crab working directory

    The shell command is echoed before it is executed; the value returned by os.system() is
    handed back to the caller.
    """
    tag = crabTask.tag
    killPart = 'crab -kill all -continue ' + tag + ' >& /dev/null'
    removePart = 'rm -rf ' + tag
    cmd = killPart + '; ' + removePart
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16    
def updateExitStati(tag,status):
    """Refresh the exit codes of one job from its retrieved CMSSW stdout file.

    tag    -- crab working directory of the task
    status -- job status object; 'index' is read, 'exitCode' and 'exitStatus' are written

    The file <tag>/res/CMSSW_<index>.stdout is scanned line by line. For a line whose first word
    is 'EXECUTABLE_EXIT_STATUS' the third word is stored as integer in status.exitCode, for
    'StageOutExitStatus' it is stored in status.exitStatus. When the file does not exist (output
    not yet retrieved) the status object is left untouched.
    """
    fileName = tag + '/res/CMSSW_%d.stdout'%(status.index)
    if not os.path.exists(fileName):
        return

    # read the file directly, there is no need to spawn a 'cat' through the shell
    with open(fileName) as fileInput:
        lines = fileInput.readlines()

    for line in lines:
        # remove the line terminator only: the last line may come without one and must not
        # lose its last character
        line = line.rstrip('\n')
        # get rid of empty or commented lines
        if line == '' or line[0] == '#':
            continue
        # splitting on any whitespace also takes care of unnecessary spaces
        f = line.split()
        # the value is the third word ('KEY = VALUE'); ignore truncated lines
        if len(f) < 3:
            continue

        if f[0] == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = int(f[2])
        if f[0] == 'StageOutExitStatus':
            status.exitStatus = int(f[2])
39    
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite the crab configuration of a task with an updated 'ce_black_list' entry.

    tag        -- crab working directory; the configuration is <tag>/share/crab.cfg
    failedlist -- comma separated sites to append to the existing black list ('' for none)
    blacklist  -- if not empty, replaces the complete black list (failedlist is then ignored)
    exe        -- 1: move the new configuration in place, otherwise: discard it (dry run)

    The configuration is copied to <tag>_crab.cfg with empty and comment lines dropped, spaces
    normalized and lines continued with a trailing backslash joined into one. Returns the value
    of os.system() for the final 'mv' (exe == 1) or 'rm' (dry run) command.
    """
    cfgFile = tag + '/share/crab.cfg'
    newFile = tag + '_crab.cfg'
    bSlash = "\\"

    # read the present configuration directly; a missing file is treated like an empty one,
    # which is what the former 'cat' pipe delivered
    lines = []
    if os.path.exists(cfgFile):
        with open(cfgFile) as fileInput:
            lines = fileInput.readlines()

    ceBlacklist = ""     # the black list line as found in the present configuration
    joining = False      # True while a backslash-continued line is being collected
    fullLine = ""

    # the context manager closes the new configuration file also when an error occurs
    with open(newFile,'w') as fileOutput:
        for line in lines:
            # remove the terminator only, the last line may come without one
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # no more unnecessary spaces
            line = " ".join(line.split())
            # a line of blanks is empty now and would break the test for the backslash below
            if line == '':
                continue

            # join lines
            if joining:
                fullLine += line
            else:
                fullLine = line

            # more is coming: drop the backslash and wait for the continuation
            if fullLine[-1] == bSlash:
                joining = True
                fullLine = fullLine[:-1]
                continue

            # line really ended, so now look at the whole thing
            joining = False
            names = fullLine.split('=')
            # strip the key so that 'ce_black_list = ...' is recognized as well
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                # nothing failed, nothing to append (avoids piling up dangling commas)
                if failedlist != "":
                    fullLine += ',' + failedlist
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + newFile + ' ' + cfgFile
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: throw the new configuration away again
        status = os.system('rm ' + newFile)
    return status
103    
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Prepare the removal of the output files a failed job left on the storage element.

    The srm locations of the two files <mitDataset>_000_<index>.root and
    <mitDataset>-edm_<index>.root are put into one 'srmrm' command. The command is not executed:
    with exe == 1 it is only printed. Always returns 0.
    """
    # both files share everything up to the dataset name
    srmBase = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = srmBase + '_000_%d.root'%(index)
    fileEdm = srmBase + '-edm_%d.root'%(index)
    cmd = ' '.join(['srmrm',fileMit,fileEdm,'>& /dev/null'])

    if exe == 1:
        # for now the command is not executed
        print('For now not removing file: ' + cmd)
    return 0
116    
117     #===================================================================================================
118     # Main starts here
119     #===================================================================================================
# Define string to explain usage of the script; it lists exactly the options accepted below
# ('--catalog=' was missing and '--status', which is not accepted, was advertised)
usage = \
    "\nUsage: jobSitter.py [ --pattern= --apattern= --blacklist= --catalog=" \
    + " --kill --remove --help --backward --clean --extend --one" \
    + " --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','apattern=','blacklist=','catalog=',
         'help','backward','kill','remove','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError as ex:
    print(usage)
    print(str(ex))
    sys.exit(1)

# --------------------------------------------------------------------------------------------------
# Get all parameters for this little task
# --------------------------------------------------------------------------------------------------
# Set defaults
pattern = ''         # regular expression a dataset has to match to be selected
apattern = ''        # regular expression searched for in datasets to be skipped
blacklist = ''       # handed to appendBlacklistSites as the replacement black list
catalog = 0          # integer cataloging mode
clean = 0            # handed to crabTask.remove() for completed tasks
kill = 0             # 1: kill and remove every selected task
remove = 0           # 1: remove every selected task
exe = 0              # 1: execute proposed actions, otherwise dry run
extend = 0           # 1: run 'crab -extend' on every selected task
one = 0              # 1: stop the task search after the first directory
backward = ''        # extra option for 'sort' in the task search

# Read new values from the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    elif opt == "--pattern":
        pattern = arg
    elif opt == "--apattern":
        apattern = arg
    elif opt == "--blacklist":
        blacklist = arg
    elif opt == "--catalog":
        # complain about a non-numeric mode instead of dying with a traceback
        try:
            catalog = int(arg)
        except ValueError:
            print(usage)
            print('option --catalog requires an integer, got: ' + arg)
            sys.exit(1)
    elif opt == "--clean":
        clean = 1
    elif opt == "--one":
        one = 1
    elif opt == "--exe":
        exe = 1
    elif opt == "--extend":
        extend = 1
    elif opt == "--backward":
        backward = ' -r '
    elif opt == "--kill":
        kill = 1
    elif opt == "--remove":
        remove = 1
180     # --------------------------------------------------------------------------------------------------
181     # Here is where the real action starts -------------------------------------------------------------
182     # --------------------------------------------------------------------------------------------------
183    
# Find the list of crab tasks to babysit: every crab_0_* entry of the present directory whose
# name does not contain 'cfg', sorted (in reverse order if --backward was given)
crabTasks = []
datasetList = []
# the backslash is doubled to keep a literal '\*' for the shell without relying on an invalid
# python escape sequence
cmd = 'find ./ -maxdepth 1 -name crab_0_\\* |grep -v cfg | sort' + backward
print('\n==============================================================================')
print(' Summary of crab task list: \n')
for line in os.popen(cmd).readlines():  # run command
    # the tag is the last element of the path; remove the line terminator only
    tag = line.rstrip('\n').split('/')[-1]
    if tag == '':
        continue

    crabTask = task.Task(tag)

    if apattern != '' and re.search(apattern,crabTask.mitDataset):
        # a task matching the anti-pattern is really left out now, before it was announced as
        # skipped but selected nevertheless
        print('\n Skipping: ' + crabTask.mitDataset + '\n\n')
    elif re.search(pattern,crabTask.mitDataset):
        crabTasks.append(crabTask)
        crabTask.show()

    # NOTE(review): the nesting of this test was not recoverable from the listing; as written,
    # --one stops after the first directory whether or not it was selected -- confirm
    if one == 1:
        break
207    
# Process the crab tasks determined to be relevant in the last query
print('\n==============================================================================')
print(' Process crab task list\n')

# catalog.sh switches and closing action for the special --catalog modes; every other mode uses
# the default ('-ceg','--retry')
catalogModes = {
    3: ('-cegt','--retry'),
    4: ('-eg',  '--retry'),
    5: ('-ceg', '--remove'),
    6: ('-e',   '--compact'),
    7: ('-g',   '--compact'),
}

for crabTask in crabTasks:

    print('\n------------------------------------------------------------------------------')
    print(' --> PREPPING ' + crabTask.tag
          + '\n -> ' + crabTask.cmsDataset + ' (' + crabTask.mitDataset + ')'
          + '\n -> ' + crabTask.storageEle
          + '\n -> ' + crabTask.storagePath)
    print('------------------------------------------------------------------------------\n')

    dataset = crabTask.cmsDataset
    storageEle = crabTask.storageEle
    storagePath = crabTask.storagePath

    if kill == 1:
        crabTask.killAndRemove(1)
        continue

    if remove == 1:
        crabTask.remove(1)
        continue

    crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' +
                         crabTask.mitDataset + '.lfns')

    # make sure catalog is up to date
    if catalog != 0:
        switches, action = catalogModes.get(catalog,('-ceg','--retry'))
        cmd = 'catalog.sh ' + switches + ' -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion \
              + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' ' + action
        os.system(cmd)

    # go on with the next task as only cataloging is required
    if catalog > 1:
        continue

    # do we need to extend the task
    if extend == 1:
        cmd = 'crab -extend -c ' + crabTask.tag
        print('\n------------------------------------------------------------------------------')
        print(' --> EXTEND ' + crabTask.tag + ' -- '
              + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'
              + '\n -> ' + storageEle
              + '\n -> ' + storagePath)
        print('------------------------------------------------------------------------------\n')
        print(' --> ' + cmd)
        os.system(cmd)

    print('\n --> STATUS ' + crabTask.tag + ' -- ' + crabTask.mitDataset)

    # interact with crab to get the job status
    crabTask.getJobStati()
    if len(crabTask.jobStati) < 1:
        print(' ERROR - dropped empty crab task from the work list.')
        print(' crab task id: ' + crabTask.tag)
        continue

    print(' ')
    print(' Task status: ' + crabTask.status)
    if crabTask.status == 'completed' or crabTask.status == 'finished':
        crabTask.remove(clean)
        print(' INFO - crab task has been removed, continuing.\n')
        continue
    print(' ')

    # review failing sites
    siteList = ""
    if len(crabTask.failingSites) > 0:
        siteList = ",".join(crabTask.failingSites)
        print(" Failing sites (consider blacklisting them)")
        # items() is available in python 2 and 3, iteritems() only in python 2
        for site,nAbort in crabTask.failingSites.items():
            print(' ' + site + '(%d'%nAbort + ')')
    # NOTE(review): the nesting of this call was not recoverable from the listing; it is kept
    # outside of the test above because siteList is preset to "" for the case of no failing
    # sites -- confirm
    appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)

    # review all job stati and update exit stati if needed
    for status in crabTask.jobStati:
        if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
            updateExitStati(crabTask.tag,status)

    # review all job stati and show the jobs which need an action
    subJobs = []         # indices of jobs which are created but have no output file
    resubJobs = []       # indices of jobs which are aborted or have a positive exit code/status
    for status in crabTask.jobStati:
        if status.tag == 'Created' and status.outputFile == 0:
            status.showCompact()
            subJobs.append('%d'%(status.index))
        if status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0:
            status.showCompact()
            resubJobs.append('%d'%(status.index))
            # for failed job first remove remainders before resubmitting
            # NOTE(review): nesting under the failed-job test reconstructed from this comment;
            # indentation was lost in the listing -- confirm
            if status.outputFile == 1:
                removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
    subList = ",".join(subJobs)
    resubList = ",".join(resubJobs)

    # NOTE: submission and resubmission of these jobs are switched off. The commands which used
    # to be prepared here (and were never executed) are
    #   crab -c <tag> -submit <subList>      with ',999999999' appended to a single job index
    #   crab -c <tag> -resubmit <resubList>

    cmd = 'crab -getoutput -continue ' + crabTask.tag
    print('\n --> GETOUTPUT ' + crabTask.tag + ' -- ' + crabTask.mitDataset)
    print(' --> ' + cmd)
    os.system(cmd)

    os.system('cleanupLog.py --crabId ' + crabTask.tag)
359 paus 1.4 cmd = 'crab -getoutput -continue ' + crabTask.tag
360 paus 1.9 #print '\n------------------------------------------------------------------------------'
361     #print ' --> GETOUTPUT ' + crabTask.tag + ' -- ' \
362     # + '\n -> ' + dataset \
363     # + '\n -> ' + storageEle \
364     # + '\n -> ' + storagePath
365     #print '------------------------------------------------------------------------------\n'
366     print '\n --> GETOUTPUT ' + crabTask.tag + ' -- ' + crabTask.mitDataset
367 paus 1.4 print ' --> ' + cmd
368 paus 1.2 status = os.system(cmd)
369    
370     cmd = 'cleanupLog.py --crabId ' + crabTask.tag
371     status = os.system(cmd)
372    
373     i += 1