ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.5
Committed: Tue Sep 21 20:09:09 2010 UTC (14 years, 7 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_017pre3, Mit_017pre2, Mit_017pre1, Mit_016, Mit_015b, Mit_015a, Mit_015, Mit_014e, Mit_014d, Mit_014c
Changes since 1.4: +14 -9 lines
Log Message:
Update before going to 014e tag and version 3_8_4.

File Contents

# User Rev Content
1 paus 1.2 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to go through my crab jobs, get status and output and take some completion action if
4     # needed and desired.
5     #
6     # Author: C.Paus (July 1, 2008)
7     #---------------------------------------------------------------------------------------------------
8     import os,sys,getopt,re,string
9     import task
10    
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its working directory.

    crabTask -- object whose 'tag' attribute names the crab working directory

    Returns the status reported by os.system for the combined shell command.
    """
    tag = crabTask.tag
    # one shell line: first kill every job of the task, then wipe the task directory
    cmd = 'crab -kill all -continue ' + tag + ' >& /dev/null; rm -rf ' + tag
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16    
def updateExitStati(tag,status):
    """Refresh the exit codes of one job from its retrieved CMSSW stdout file.

    tag    -- crab task directory; the file read is <tag>/res/CMSSW_<status.index>.stdout
    status -- job status object; 'index' is read, 'exitCode' and 'exitStatus' are updated

    Lines of the form 'EXECUTABLE_EXIT_STATUS = <n>' set status.exitCode and lines of the form
    'StageOutExitStatus = <n>' set status.exitStatus (the value is the third blank separated
    field). Empty lines and lines starting with '#' are ignored. Nothing is changed when the
    file does not exist.
    """
    fileName = tag + '/res/CMSSW_%d.stdout'%(status.index)
    if not os.path.exists(fileName):
        # output file for this job not yet retrieved
        return

    # read the file directly, there is no need to spawn a 'cat' process for this
    handle = open(fileName)
    try:
        lines = handle.readlines()
    finally:
        handle.close()

    for line in lines:
        # remove only the line terminator: the last line of the file may come without one and
        # blindly cutting the last character would then corrupt the exit code
        line = line.rstrip('\n')
        # get rid of empty or commented lines
        if line == '' or line[0] == '#':
            continue
        # no more unnecessary spaces
        f = line.split()
        # the value is expected in the third field ('KEY = VALUE'), skip incomplete lines
        if len(f) < 3:
            continue

        if f[0] == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = int(f[2])
        if f[0] == 'StageOutExitStatus':
            status.exitStatus = int(f[2])
39    
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite the 'ce_black_list' entry in the crab configuration of a task.

    tag        -- crab task directory; the configuration is <tag>/share/crab.cfg
    failedlist -- comma separated sites, appended to the existing ce_black_list entry
    blacklist  -- if not empty, replaces the complete entry by 'ce_black_list=<blacklist>'
    exe        -- 1: move the new configuration into place; otherwise the new file is only
                  written and removed again (dry run)

    The new configuration is first written to <tag>_crab.cfg. Empty lines and lines starting
    with '#' are dropped, lines continued with a trailing backslash are joined into one line
    and repeated blanks are collapsed. A continuation still open at the end of the file is not
    written. A missing configuration file is treated as an empty one.

    Returns the status reported by os.system for the final 'mv' (or 'rm') command.
    """
    cfgFile = tag + '/share/crab.cfg'
    tmpFile = tag + '_crab.cfg'
    bSlash = "\\"

    # Get the original configuration
    lines = []
    if os.path.exists(cfgFile):
        fileInput = open(cfgFile)
        try:
            lines = fileInput.readlines()
        finally:
            fileInput.close()

    # Find out whether there are any ce black listed
    ceBlacklist = ""
    # state for joining continued lines
    join = 0
    fullLine = ""

    # New Configuration file
    fileOutput = open(tmpFile,'w')
    try:
        for line in lines:
            # remove only the line terminator, the last line may come without one
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # no more unnecessary spaces
            line = " ".join(line.split())
            # a line made of blanks only is empty now, indexing it below would fail
            if line == '':
                continue
            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming
            if fullLine[-1] == bSlash:
                join = 1
                fullLine = fullLine[:-1]
                continue
            # line really ended, so now look at the whole thing
            join = 0
            names = fullLine.split('=')
            # strip the key: after normalization 'ce_black_list = x' leaves a trailing blank
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                fullLine += ',' + failedlist
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')
    finally:
        fileOutput.close()

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + tmpFile + ' ' + cfgFile
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: just discard the new configuration again
        status = os.system('rm ' + tmpFile)
    return status
103    
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Report the srmrm command that would delete the two output files of one job.

    storageEle  -- host name of the storage element (port 8443 is appended)
    storagePath -- directory on the storage element
    mitDataset  -- dataset name, the stem of both file names
    index       -- job index, used in the file names
    exe         -- 1: print the command; anything else: stay silent

    The removal itself is switched off for now, the command is never executed. Always
    returns 0.
    """
    stem = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    targets = [stem + '_000_%d'%(index) + '.root',
               stem + '-edm_%d'%(index) + '.root']
    cmd = 'srmrm ' + ' '.join(targets) + ' >& /dev/null'
    if exe == 1:
        # for now not execute this
        print('For now not removing file: ' + cmd)
    return 0
116    
117     #===================================================================================================
118     # Main starts here
119     #===================================================================================================
120     # Define string to explain usage of the script
# the usage text lists exactly the options accepted in 'valid' below
usage = \
    "\nUsage: jobSitter.py [ --pattern= --blacklist= --catalog=" + \
    " --help --backward --kill --clean --extend --one --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','blacklist=','catalog=','help','backward','kill','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    # pick up the exception this way so the statement parses with old and new except syntax
    ex = sys.exc_info()[1]
    print(usage)
    print(str(ex))
    sys.exit(1)
133    
134     # --------------------------------------------------------------------------------------------------
135     # Get all parameters for this little task
136     # --------------------------------------------------------------------------------------------------
137     # Set defaults
pattern = ''      # regular expression searched in the dataset name of each task
blacklist = ''    # if given, replaces the ce_black_list entry of the crab configuration
catalog = 0       # selects the catalog.sh call; 0 means the call is only shown, not executed
clean = 0         # handed to the removal of completed tasks
kill = 0          # kill and remove every selected task
exe = 0           # execute the proposed actions instead of only showing them
extend = 0        # run 'crab -extend' on every selected task
one = 0           # stop the task search early
backward = ''     # extra option for 'sort' to reverse the order of the task list

# Read new values from the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    if opt == "--pattern":
        pattern = arg
    if opt == "--blacklist":
        blacklist = arg
    if opt == "--catalog":
        # report a value which is not a number like any other bad command line
        try:
            catalog = int(arg)
        except ValueError:
            print(usage)
            print(' ERROR - option --catalog requires an integer value, found: ' + arg)
            sys.exit(1)
    if opt == "--clean":
        clean = 1
    if opt == "--one":
        one = 1
    if opt == "--exe":
        exe = 1
    if opt == "--extend":
        extend = 1
    if opt == "--backward":
        backward = ' -r '
    if opt == "--kill":
        kill = 1
171    
172     # --------------------------------------------------------------------------------------------------
173     # Here is where the real action starts -------------------------------------------------------------
174     # --------------------------------------------------------------------------------------------------
175    
176     # Find the list of crab tasks to babysit
# Build the work list: every entry matching ./crab_0_* (names containing 'cfg' are filtered out)
# is turned into a task.Task, and tasks whose mitDataset matches the --pattern regular expression
# are kept in crabTasks and shown. With --backward the option ' -r ' is handed to sort.
# NOTE(review): datasetList is filled nowhere in the visible part of the script.
# NOTE(review): this listing carries no indentation; whether the '--one' break below belongs to
#               the pattern match or to the loop body itself cannot be told from here -- confirm
#               against the original file.
177     crabTasks = []
178     datasetList = []
179     cmd = 'find ./ -maxdepth 1 -name crab_0_\* |grep -v cfg | sort' + backward
180     print '\n=============================================================================='
181     print ' Summary of crab task list: \n'
182     for line in os.popen(cmd).readlines(): # run command
183     line = line[:-1] # strip '\n'
184     ## print ' LINE: ' + line
185     f = line.split('/') # split the path at every '/'
# the last path component is the name of the crab task directory
186     tag = f.pop()
187    
188     crabTask = task.Task(tag)
189    
190     #print 'Pattern: ' + pattern + ' tag: ' + crabTask.mitDataset
# an empty pattern (the default) matches every dataset name
191     if re.search(pattern,crabTask.mitDataset):
192     crabTasks.append(crabTask)
193     crabTask.show()
194    
# --one: stop searching for further tasks
195     if one == 1:
196     break
197    
198     # Process the crab tasks determined to be relevant in the last query
199     print '\n=============================================================================='
200     print ' Process crab task list (please wait, crab commands are first fully parsed)\n'
# Work through the selected tasks one by one: optionally kill the task, bring the catalog up to
# date, optionally extend the task, query the job stati, review failing sites and failed jobs
# and finally retrieve the output and clean up the logs.
# NOTE(review): this listing carries no indentation, the nesting of the statements below has to
#               be inferred -- confirm against the original file before restructuring.
# NOTE(review): the counter i is incremented at the end but never read in the visible code.
201     i = 0
202     for crabTask in crabTasks:
203    
204 paus 1.5 print '\n------------------------------------------------------------------------------'
205     print ' --> PREPPING ' + crabTask.tag \
206     + '\n -> ' + crabTask.mitDataset
207     print '------------------------------------------------------------------------------\n'
208    
209 paus 1.2 dataset = crabTask.cmsDataset
210     storageEle = crabTask.storageEle
211     storagePath = crabTask.storagePath
212    
# --kill: kill and remove the task, nothing else is done for it
213     if kill == 1:
214     crabTask.killAndRemove(1)
215     continue
216    
# load the lfns from <mitCfg>/<mitVersion>/<mitDataset>.lfns
217     crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' + \
218     crabTask.mitDataset + '.lfns')
219     #if crabTask.status == 'cataloged':
220     # ##removeCrabTask(crabTask)
221     # crabTask.killAndRemove(1)
222     # continue
223    
224     # make sure catalog is up to date
# NOTE(review): f is not read again in the visible code
225     f = storagePath.split(" ")
# default catalog command, used unless --catalog is one of 3..7
226     cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' \
227     + crabTask.mitVersion + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
228    
# --catalog=3..7 select alternative catalog.sh flags and a different final option
229     if catalog == 3:
230     cmd = 'catalog.sh -cegt -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
231     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
232     if catalog == 4:
233     cmd = 'catalog.sh -eg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
234     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
235     if catalog == 5:
236     cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
237     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --remove'
238 paus 1.3 if catalog == 6:
239     cmd = 'catalog.sh -e -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
240     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --compact'
241     if catalog == 7:
242     cmd = 'catalog.sh -g -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
243     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --compact'
244 paus 1.4
245    
246     print '\n------------------------------------------------------------------------------'
247     print ' --> CATALOG ' + crabTask.tag \
248     + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
249     + '\n -> ' + storageEle \
250     + '\n -> ' + storagePath
251     print '------------------------------------------------------------------------------\n'
252     print ' --> ' + cmd
253    
# the catalog command is always shown but only executed for a non-zero --catalog value
254 paus 1.2 if catalog != 0:
255     os.system(cmd)
256    
257     # skip the rest of this iteration as only cataloging is required (--catalog larger than 1)
258     if catalog > 1:
259     continue
260    
261     # do we need to extend the task
262     if extend == 1:
263 paus 1.4 cmd = 'crab -extend -c ' + crabTask.tag
264 paus 1.2 print '\n------------------------------------------------------------------------------'
265     print ' --> EXTEND ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
266     + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
267     + '\n -> ' + storageEle \
268     + '\n -> ' + storagePath
269     print '------------------------------------------------------------------------------\n'
270 paus 1.4 print ' --> ' + cmd
271 paus 1.2 os.system(cmd)
272    
273     print '\n------------------------------------------------------------------------------'
274     print ' --> STATUS ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
275     + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
276     + '\n -> ' + storageEle \
277     + '\n -> ' + storagePath
278     print '------------------------------------------------------------------------------\n'
279    
280     # interact with crab to get the job status
281     crabTask.getJobStati()
# a task without any job status is reported and skipped
282     if len(crabTask.jobStati) < 1:
283     print ' ERROR - dropped empty crab task from the work list.'
284     print ' crab task id: ' + crabTask.tag
285     continue
286     else:
287     print ' '
288     print ' Task status: ' + crabTask.status
# completed or finished tasks are removed (the --clean flag is handed on) and need no more work
289     if crabTask.status == 'completed' or crabTask.status == 'finished':
290     crabTask.remove(clean)
291 paus 1.5 print ' INFO - crab task has been removed, continuing.\n'
292 paus 1.4 continue
293 paus 1.2 print ' '
294    
295     # review failing sites
296     siteList = ""
297     if len(crabTask.failingSites) > 0:
# NOTE(review): nSites is counted below but not read in the visible code
298     nSites = 0
# joining the mapping gives the comma separated site names (its keys)
299     siteList = ",".join(crabTask.failingSites)
300     print " Failing sites (consider blacklisting them)"
301     for site,nAbort in crabTask.failingSites.iteritems():
302     nSites += 1
303     print ' ' + site + '(%d'%nAbort + ')'
# NOTE(review): presumably only called when there are failing sites -- the nesting is not
#               visible here, confirm
304     appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)
305    
306     # review all job stati and update exit stati if needed
307     for status in crabTask.jobStati:
# only retrieved jobs whose two exit codes are both still negative are looked up in the stdout
308     if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
309     updateExitStati(crabTask.tag,status)
310     #status.showCompact()
311    
312     # review all job stati and propose action
# subList and resubList are comma separated lists of job indices
313     subList = ''
314     resubList = ''
315     for status in crabTask.jobStati:
316     ##print ' %4.0d '%(status.index) + ' --> ' + status.tag
# created jobs without output file go to the submit list
317     if ((status.tag == 'Created' and status.outputFile == 0)):
318     status.showCompact()
319     if subList == '':
320     subList += '%d'%(status.index)
321     else:
322     subList += ',%d'%(status.index)
# aborted jobs and jobs with a positive exit code or exit status go to the resubmit list
323 paus 1.5 if ((status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0)):
324     ##or(status.tag == 'Retrieved' and status.outputFile == 0)):
325 paus 1.2 status.showCompact()
326     if resubList == '':
327     resubList += '%d'%(status.index)
328     else:
329     resubList += ',%d'%(status.index)
330     # for failed job first remove remainders before resubmitting
331     if status.outputFile == 1:
332     removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
333    
# NOTE(review): the submit and resubmit commands built below are only stored in cmd; the lines
#               executing them are commented out and cmd is overwritten by the getoutput command
334     if subList != '':
# a list made of one single index gets a second, very large index appended
# NOTE(review): presumably so that crab reads the argument as a list of jobs -- confirm
335     if not re.search('-',subList) and not re.search(',',subList):
336     subList = subList + ',999999999'
337     cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
338 paus 1.5 ##print '\nACTION -- SUBMIT.PY: ' + cmd
339     ##if exe == 1:
340     ## status = os.system(cmd)
341 paus 1.2 if resubList != '':
342     cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
343 paus 1.5 ##print '\nACTION -- RE-SUBMIT.PY: ' + cmd
344     ##if exe == 1:
345     ## status = os.system(cmd)
346 paus 1.2
347    
# retrieve the output of the task; executed regardless of the --exe flag
348 paus 1.4 cmd = 'crab -getoutput -continue ' + crabTask.tag
349 paus 1.2 print '\n------------------------------------------------------------------------------'
350     print ' --> GETOUTPUT ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
351     + '\n -> ' + dataset \
352     + '\n -> ' + storageEle \
353     + '\n -> ' + storagePath
354     print '------------------------------------------------------------------------------\n'
355 paus 1.4 print ' --> ' + cmd
# the return status reuses the name of the job loop variable and is not checked afterwards
356 paus 1.2 status = os.system(cmd)
357    
358     cmd = 'cleanupLog.py --crabId ' + crabTask.tag
359     status = os.system(cmd)
360    
361     i += 1
361     i += 1