ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.2
Committed: Sat Jun 5 02:36:28 2010 UTC (14 years, 11 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_014b, Mit_014a
Changes since 1.1: +338 -0 lines
Log Message:
Wow I forgot all about my cvs.

File Contents

# User Rev Content
1 paus 1.2 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to go through my crab jobs, get status and output and take some completion action if
4     # needed and desired.
5     #
6     # Author: C.Paus (July 1, 2008)
7     #---------------------------------------------------------------------------------------------------
8     import os,sys,getopt,re,string
9     import task
10    
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete the task directory.

    Args:
        crabTask: object exposing a ``tag`` attribute; the tag is passed to ``crab -kill`` and
            is also the path handed to ``rm -rf``.

    Returns:
        The status value returned by ``os.system`` for the combined shell command.
    """
    # os.system runs the string through /bin/sh, so use the POSIX redirection form rather than
    # the csh/bash-only '>&', which is not accepted by every /bin/sh implementation.
    cmd = 'crab -kill all -continue ' + crabTask.tag + ' > /dev/null 2>&1; rm -rf ' + crabTask.tag
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16    
def updateExitStati(tag, status):
    """Update the exit codes of one job from its retrieved CMSSW stdout file.

    The file ``<tag>/res/CMSSW_<status.index>.stdout`` is scanned line by line. Lines whose first
    field is ``EXECUTABLE_EXIT_STATUS`` or ``StageOutExitStatus`` set ``status.exitCode`` or
    ``status.exitStatus`` respectively, using the third whitespace-separated field as the value
    (presumably the line looks like ``KEY = <int>`` -- confirm against real job output).
    If a key appears several times the last occurrence wins.

    Args:
        tag: crab task directory.
        status: job status object; ``index`` is read, ``exitCode`` / ``exitStatus`` are written.

    Returns:
        None. Nothing happens when the stdout file does not exist (output not yet retrieved).
    """
    path = tag + '/res/CMSSW_%d.stdout' % (status.index)
    if not os.path.exists(path):
        return

    # Read the file directly instead of spawning 'cat' through a shell.
    with open(path) as stdoutFile:
        lines = stdoutFile.readlines()

    for line in lines:
        # Strip only the newline; blindly dropping the last character would corrupt the value
        # on a final line that has no trailing newline.
        line = line.rstrip('\n')
        # get rid of empty or commented lines
        if line == '' or line[0] == '#':
            continue

        # Splitting on any whitespace also removes unnecessary spaces.
        f = line.split()
        if len(f) < 3:
            # too short to carry 'KEY = value'
            continue

        key = f[0]
        if key != 'EXECUTABLE_EXIT_STATUS' and key != 'StageOutExitStatus':
            continue

        try:
            value = int(f[2])
        except ValueError:
            # malformed value: keep whatever was recorded before
            continue

        if key == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = value
        else:
            status.exitStatus = value
39    
def appendBlacklistSites(tag, failedlist, blacklist, exe):
    """Rewrite the ``ce_black_list`` entry of ``<tag>/share/crab.cfg``.

    The configuration is copied to ``<tag>_crab.cfg``. Empty lines and lines starting with '#'
    are dropped, whitespace is normalised and lines ending in a backslash are joined with the
    following line. On the ``ce_black_list`` line ``failedlist`` is appended (comma separated);
    a non-empty ``blacklist`` replaces the whole entry instead.

    Args:
        tag: crab task directory.
        failedlist: comma separated list of sites to add to the existing black list.
        blacklist: if not empty, the complete new black list, overriding the existing one.
        exe: 1 to move the new file over the original configuration; any other value is a dry
            run in which the temporary file is simply removed again.

    Returns:
        The ``os.system`` status of the final ``mv`` (exe == 1) or ``rm`` command.
    """
    cfgFile = tag + '/share/crab.cfg'
    tmpFile = tag + '_crab.cfg'
    bSlash = "\\"

    # Read the configuration directly instead of spawning 'cat'. A missing file is treated as
    # empty, which is what the shell pipeline used before amounted to.
    lines = []
    if os.path.exists(cfgFile):
        with open(cfgFile) as cfgInput:
            lines = cfgInput.readlines()

    ceBlacklist = ""                    # stays empty if no ce_black_list entry is found
    join = 0                            # 1 while inside a backslash-continued line
    fullLine = ""

    # New configuration file
    with open(tmpFile, 'w') as fileOutput:
        for line in lines:
            line = line.rstrip('\n')

            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # no more unnecessary spaces
            line = " ".join(line.split())

            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line

            # A whitespace-only line leaves nothing to inspect; indexing its last character
            # used to raise IndexError.
            if fullLine == '':
                join = 0
                continue

            # determine if finished or more is coming
            if fullLine.endswith(bSlash):
                join = 1
                fullLine = fullLine[:-1]
                continue

            # line really ended, so now look at the whole thing
            join = 0
            names = fullLine.split('=')
            # strip() so that 'ce_black_list = ...' (spaces around '=') is recognised as well
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                # do not leave a dangling comma when there is nothing to add
                if failedlist != "":
                    fullLine += ',' + failedlist
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + tmpFile + ' ' + cfgFile
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: discard the freshly written copy
        status = os.system('rm ' + tmpFile)
    return status
103    
def removeJobRemainders(storageEle, storagePath, mitDataset, index, exe):
    """Prepare the removal of the output files a failed job left on the storage element.

    Two SRM URLs are built for job ``index``: ``<mitDataset>_000_<index>.root`` and
    ``<mitDataset>-edm_<index>.root``, both under
    ``srm://<storageEle>:8443<storagePath>/``. The ``srmrm`` command is only assembled and,
    when ``exe`` is 1, printed -- actual removal is deliberately disabled for now.

    Args:
        storageEle: storage element host name.
        storagePath: path on the storage element.
        mitDataset: dataset name used as the file name stem.
        index: job index.
        exe: 1 to report the command that would be executed.

    Returns:
        Always 0, since nothing is executed.
    """
    urlStem = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = urlStem + '_000_%d' % (index) + '.root'
    fileEdm = urlStem + '-edm_%d' % (index) + '.root'
    # POSIX redirection so the command is valid for /bin/sh once execution is re-enabled.
    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' > /dev/null 2>&1'
    status = 0
    if exe == 1:
        # for now not execute this # status = os.system(cmd)
        print('For now not removing file: ' + cmd)
    return status
116    
#===================================================================================================
# Main starts here
#===================================================================================================
# Define string to explain usage of the script. Keep it in sync with the 'valid' list below.
usage = \
    "\nUsage: jobSitter.py [ --pattern= --blacklist= --catalog=" + \
    " --help --backward --kill --clean --extend --one --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','blacklist=','catalog=','help','backward','kill','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError as ex:
    # unknown option or missing argument: explain and give up
    print(usage)
    print(str(ex))
    sys.exit(1)
133    
# --------------------------------------------------------------------------------------------------
# Get all parameters for this little task
# --------------------------------------------------------------------------------------------------
# Set defaults
pattern = ''        # regular expression matched against the task's mitDataset
blacklist = ''      # complete replacement for the ce_black_list entry
catalog = 0         # cataloging mode (0: do not run catalog.sh)
clean = 0
kill = 0
exe = 0             # 1: execute the proposed actions, otherwise only print them
extend = 0
one = 0
backward = ''       # extra option for 'sort' to reverse the task order

# Read new values from the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    elif opt == "--pattern":
        pattern = arg
    elif opt == "--blacklist":
        blacklist = arg
    elif opt == "--catalog":
        try:
            catalog = int(arg)
        except ValueError:
            # give a readable message instead of a traceback
            print(usage)
            print(' ERROR - option --catalog requires an integer value, got: ' + arg)
            sys.exit(1)
    elif opt == "--clean":
        clean = 1
    elif opt == "--one":
        one = 1
    elif opt == "--exe":
        exe = 1
    elif opt == "--extend":
        extend = 1
    elif opt == "--backward":
        backward = ' -r '
    elif opt == "--kill":
        kill = 1
171    
# --------------------------------------------------------------------------------------------------
# Here is where the real action starts -------------------------------------------------------------
# --------------------------------------------------------------------------------------------------

# Find the list of crab tasks to babysit: every crab_0_* entry of the current directory whose
# mitDataset matches the requested pattern.
crabTasks = []
datasetList = []
# '\\*' hands a literal backslash-star to the shell so 'find', not the shell, expands the glob
cmd = 'find ./ -maxdepth 1 -name crab_0_\\* |grep -v cfg | sort' + backward
print('\n==============================================================================')
print(' Summary of crab task list: \n')
for line in os.popen(cmd).readlines():  # run command
    line = line.rstrip('\n')
    # keep only the last path component, which is the crab task tag
    tag = line.split('/').pop()

    crabTask = task.Task(tag)

    if re.search(pattern, crabTask.mitDataset):
        crabTasks.append(crabTask)
        crabTask.show()

    # NOTE(review): the indentation of the original was lost; this check is assumed to sit at
    # loop level, i.e. --one stops after the first directory whether or not it matched -- confirm.
    if one == 1:
        break
197    
# Process the crab tasks determined to be relevant in the last query
# NOTE(review): the nesting below was reconstructed from a source without indentation -- confirm
# against the repository version before relying on it.

def _printBanner(action, crabTask, datasetLabel):
    """Print the framed header announcing the given action for one crab task."""
    print('\n------------------------------------------------------------------------------')
    print(' --> ' + action + ' ' + crabTask.tag + ' -- wait crab commands first fully parsed -- '
          + '\n -> ' + datasetLabel
          + '\n -> ' + crabTask.storageEle
          + '\n -> ' + crabTask.storagePath)
    print('------------------------------------------------------------------------------\n')


print('\n==============================================================================')
print(' Process crab task list (please wait, crab commands are first fully parsed)\n')
for crabTask in crabTasks:

    dataset = crabTask.cmsDataset
    storageEle = crabTask.storageEle
    storagePath = crabTask.storagePath

    if kill == 1:
        crabTask.killAndRemove(1)
        continue

    crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' +
                         crabTask.mitDataset + '.lfns')

    # make sure catalog is up to date; the catalog mode selects the catalog.sh options
    catalogFlags = '-ceg'
    catalogMode = '--retry'
    if catalog == 3:
        catalogFlags = '-cegt'
    elif catalog == 4:
        catalogFlags = '-eg'
    elif catalog == 5:
        catalogMode = '--remove'
    cmd = 'catalog.sh ' + catalogFlags + ' -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
          ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' ' + catalogMode
    print(' CATALOG: ' + cmd)
    if catalog != 0:
        os.system(cmd)

    # only cataloging is required: skip the rest of the processing for this task
    if catalog > 1:
        continue

    # do we need to extend the task
    if extend == 1:
        _printBanner('EXTEND', crabTask, dataset + ' (' + crabTask.mitDataset + ')')
        os.system('crab -extend -c ' + crabTask.tag)

    _printBanner('STATUS', crabTask, dataset + ' (' + crabTask.mitDataset + ')')

    # interact with crab to get the job status
    crabTask.getJobStati()
    if len(crabTask.jobStati) < 1:
        print(' ERROR - dropped empty crab task from the work list.')
        print(' crab task id: ' + crabTask.tag)
        continue

    print(' ')
    print(' Task status: ' + crabTask.status)
    if crabTask.status == 'completed' or crabTask.status == 'finished':
        crabTask.remove(clean)
    print(' ')

    # review failing sites
    siteList = ""
    if len(crabTask.failingSites) > 0:
        siteList = ",".join(crabTask.failingSites)
        print(" Failing sites (consider blacklisting them)")
        for site, nAbort in crabTask.failingSites.items():
            print(' ' + site + '(%d' % nAbort + ')')
    # Only touch crab.cfg when there is something to blacklist.
    # NOTE(review): the original nesting of this call is ambiguous -- confirm.
    if siteList != "" or blacklist != "":
        appendBlacklistSites(crabTask.tag, siteList, blacklist, exe)

    # review all job stati and update exit stati if needed
    for status in crabTask.jobStati:
        if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
            updateExitStati(crabTask.tag, status)

    # review all job stati and propose action
    subJobs = []                        # indices of jobs to submit for the first time
    resubJobs = []                      # indices of jobs to resubmit
    for status in crabTask.jobStati:
        if status.tag == 'Created' and status.outputFile == 0:
            status.showCompact()
            subJobs.append('%d' % (status.index))
        if ((status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0) or
                (status.tag == 'Retrieved' and status.outputFile == 0)):
            status.showCompact()
            resubJobs.append('%d' % (status.index))
            # for failed job first remove remainders before resubmitting
            if status.outputFile == 1:
                removeJobRemainders(storageEle, storagePath, crabTask.mitDataset, status.index, exe)
    subList = ','.join(subJobs)
    resubList = ','.join(resubJobs)

    if subList != '':
        # A single number is padded with a second entry so the argument is a list
        # (presumably crab would otherwise read it as a job count -- confirm).
        if '-' not in subList and ',' not in subList:
            subList = subList + ',999999999'
        cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
        print('\nACTION -- SUBMIT.PY: ' + cmd)
        if exe == 1:
            os.system(cmd)
    if resubList != '':
        cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
        print('\nACTION -- RE-SUBMIT.PY: ' + cmd)
        if exe == 1:
            os.system(cmd)

    _printBanner('GETOUTPUT', crabTask, dataset)

    # retrieve the output of finished jobs and tidy up the logs
    os.system('crab -getoutput -continue ' + crabTask.tag)
    os.system('cleanupLog.py --crabId ' + crabTask.tag)
338     i += 1