ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.11
Committed: Sat Jun 29 03:05:18 2013 UTC (11 years, 10 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_032, Mit_031, HEAD
Changes since 1.10: +2 -3 lines
Log Message:
Improve lfn handling.

File Contents

# User Rev Content
1 paus 1.2 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to go through my crab jobs, get status and output and take some completion action if
4     # needed and desired.
5     #
6     # Author: C.Paus (July 1, 2008)
7     #---------------------------------------------------------------------------------------------------
8     import os,sys,getopt,re,string
9     import task
10    
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its working directory.

    Builds and runs one shell command line: 'crab -kill all -continue <tag>' with its output
    discarded, followed by 'rm -rf <tag>'.

    crabTask -- object providing a 'tag' attribute; the tag is used both as the crab task
                name and as the directory that gets deleted

    Returns the status reported by os.system() for the whole command line.
    """
    # '> /dev/null 2>&1' is the POSIX spelling of the csh/bash-only '>& /dev/null'; os.system()
    # hands the command to /bin/sh, which is not guaranteed to understand '>&'.
    cmd = 'crab -kill all -continue ' + crabTask.tag + ' > /dev/null 2>&1; rm -rf ' + crabTask.tag
    # A single-argument print(...) behaves identically under Python 2 and Python 3.
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16    
def updateExitStati(tag,status):
    """Fill in the exit codes of one job from its retrieved CMSSW stdout file.

    Reads <tag>/res/CMSSW_<status.index>.stdout, if it exists, and looks for lines whose first
    whitespace-separated field is 'EXECUTABLE_EXIT_STATUS' or 'StageOutExitStatus'. The third
    field of such a line is converted to int and stored in status.exitCode or
    status.exitStatus respectively.

    tag    -- crab task directory name
    status -- job status object; 'index' is read, 'exitCode' / 'exitStatus' are set in place

    Returns None. A missing file leaves status untouched. Raises ValueError if the third field
    of a matching line is not an integer.
    """
    stdoutPath = tag + '/res/CMSSW_%d.stdout'%(status.index)
    if not os.path.exists(stdoutPath):
        # output file for this job not yet retrieved
        return

    # Read the file directly instead of spawning 'cat' through a shell.
    with open(stdoutPath) as stdoutFile:
        lines = stdoutFile.readlines()

    for line in lines:
        # Strip only the newline: a final line without one must not lose its last character.
        line = line.rstrip('\n')
        # get rid of empty or commented lines
        if line == '' or line[0] == '#':
            continue
        # split() without arguments already collapses any run of whitespace
        f = line.split()
        # the value is the third field ('<key> = <value>'); skip lines that are too short
        if len(f) < 3:
            continue

        if f[0] == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = int(f[2])
        if f[0] == 'StageOutExitStatus':
            status.exitStatus = int(f[2])
39    
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite the 'ce_black_list' entry of a crab task configuration.

    The configuration <tag>/share/crab.cfg is read and a new version is written to the
    temporary file <tag>_crab.cfg. While copying, empty lines and lines starting with '#' are
    dropped, whitespace is normalized and lines continued with a trailing backslash are joined
    into one. The 'ce_black_list' line gets ',' + failedlist appended, or is replaced by
    'ce_black_list=' + blacklist when blacklist is not empty.

    tag        -- crab task directory name
    failedlist -- comma separated sites to append to the existing black list
    blacklist  -- when not empty, the complete black list that replaces the existing entry
    exe        -- 1: move the new configuration over the original one; anything else: dry run,
                  the temporary file is deleted again

    Returns the os.system() status of the final 'mv' (exe == 1) or 'rm' (dry run).
    """
    cfgPath = tag + '/share/crab.cfg'
    tmpPath = tag + '_crab.cfg'

    # Read the original configuration directly instead of spawning 'cat'; a missing file is
    # treated like an empty one, which is what the empty 'cat' output amounted to.
    lines = []
    if os.path.exists(cfgPath):
        with open(cfgPath) as cfgFile:
            lines = cfgFile.readlines()

    ceBlacklist = ""    # stays empty when no ce_black_list entry was found
    join = 0            # 1 while a backslash-continued line is being assembled
    fullLine = ""
    bSlash = "\\"

    # 'with' closes the new configuration file even if processing fails half way.
    with open(tmpPath,'w') as fileOutput:
        for line in lines:
            # Strip only the newline, a last line without one must stay intact.
            line = line.rstrip('\n')

            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # no more unnecessary spaces
            line = " ".join(line.split())
            # A whitespace-only line is empty now; skip it, fullLine[-1] below would fail.
            if line == '':
                continue

            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line

            # determine if finished or more is coming
            if fullLine[-1] == bSlash:
                join = 1
                fullLine = fullLine[:-1]
                continue

            # line really ended, so now look at the whole thing
            join = 0
            names = fullLine.split('=')
            # strip() so that 'ce_black_list = ...' is recognized as well as 'ce_black_list=...'
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                fullLine += ',' + failedlist
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + tmpPath + ' ' + cfgPath
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: throw the new configuration away again
        status = os.system('rm ' + tmpPath)
    return status
103    
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Report the srm command that would remove the output files of one job.

    Two file URLs are built from 'srm://<storageEle>:8443<storagePath>/<mitDataset>' with the
    endings '_000_<index>.root' and '-edm_<index>.root'. The removal itself is currently
    disabled: with exe == 1 the 'srmrm' command is only printed, never executed.

    storageEle  -- storage element host name
    storagePath -- path on the storage element
    mitDataset  -- dataset name used as file name stem
    index       -- job index (integer)
    exe         -- 1 to print the command that would be executed, anything else stays silent

    Returns 0 always.
    """
    srmStem = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = srmStem + '_000_%d'%(index) + '.root'
    fileEdm = srmStem + '-edm_%d'%(index) + '.root'
    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' >& /dev/null'
    if exe == 1:
        # for now the command is not executed; print(...) works on Python 2 and Python 3
        print('For now not removing file: ' + cmd)
    return 0
116    
117     #===================================================================================================
118     # Main starts here
119     #===================================================================================================
# Define string to explain usage of the script; it lists exactly the options accepted in 'valid'
# ('--status' was advertised but is not a valid option, '--catalog=' was valid but not listed)
usage = \
    "\nUsage: jobSitter.py [ --pattern= --apattern= --blacklist= --catalog=" \
    + " --kill --remove --help --backward --clean --extend --one" \
    + " --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','apattern=','blacklist=','catalog=',
         'help','backward','kill','remove','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError as ex:
    # 'except ... as' and single-argument print(...) are valid on Python 2.6+ and Python 3
    print(usage)
    print(str(ex))
    sys.exit(1)
135    
136     # --------------------------------------------------------------------------------------------------
137     # Get all parameters for this little task
138     # --------------------------------------------------------------------------------------------------
# Set defaults
pattern = ''      # regular expression searched in the dataset name of each task
apattern = ''     # regular expression announcing tasks to be skipped
blacklist = ''    # complete ce black list replacing the configured one
catalog = 0       # cataloging mode (integer)
clean = 0
kill = 0
remove = 0
exe = 0
extend = 0
one = 0
backward = ''     # becomes ' -r ' to reverse the sort order of the task list

# Read new values from the command line; each option matches exactly one branch
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    elif opt == "--pattern":
        pattern = arg
    elif opt == "--apattern":
        apattern = arg
    elif opt == "--blacklist":
        blacklist = arg
    elif opt == "--catalog":
        # report a non-numeric value like any other command line error, not as a traceback
        try:
            catalog = int(arg)
        except ValueError:
            print(usage)
            print(' ERROR - option --catalog requires an integer value, got: ' + arg)
            sys.exit(1)
    elif opt == "--clean":
        clean = 1
    elif opt == "--one":
        one = 1
    elif opt == "--exe":
        exe = 1
    elif opt == "--extend":
        extend = 1
    elif opt == "--backward":
        backward = ' -r '
    elif opt == "--kill":
        kill = 1
    elif opt == "--remove":
        remove = 1
179 paus 1.2
180     # --------------------------------------------------------------------------------------------------
181     # Here is where the real action starts -------------------------------------------------------------
182     # --------------------------------------------------------------------------------------------------
183    
# Build the work list: one task.Task per 'crab_0_*' entry found in the current directory
# (entries containing 'cfg' are filtered out by grep), sorted, optionally reversed via 'backward'.
# NOTE(review): this listing comes from an annotate view and has lost its indentation; the
# nesting of the statements below has to be restored from the original file.
184     # Find the list of crab tasks to babysit
185     crabTasks = []
# NOTE(review): datasetList is not referenced again in the visible code.
186     datasetList = []
187     cmd = 'find ./ -maxdepth 1 -name crab_0_\* |grep -v cfg | sort' + backward
188     print '\n=============================================================================='
189     print ' Summary of crab task list: \n'
190     for line in os.popen(cmd).readlines(): # run command
191     line = line[:-1] # strip '\n'
192     ## print ' LINE: ' + line
# The split is on '/', the last path component is the task tag.
193     f = line.split('/') # splitting every blank
194     tag = f.pop()
195    
196     crabTask = task.Task(tag)
197    
198     #print 'Pattern: ' + pattern + ' tag: ' + crabTask.mitDataset
# NOTE(review): a task matching apattern is announced as skipped, but no 'continue' follows,
# so it still reaches the pattern test below and can be appended -- confirm the intent.
199 paus 1.8 if apattern != '' and re.search(apattern,crabTask.mitDataset):
200     print '\n Skipping: ' + crabTask.mitDataset + '\n\n'
# With the default pattern '' re.search matches every dataset name.
201 paus 1.2 if re.search(pattern,crabTask.mitDataset):
202     crabTasks.append(crabTask)
203     crabTask.show()
204    
# NOTE(review): with the indentation lost it is unclear whether this break belongs to the
# pattern match above or to the loop body itself -- verify against the original file.
205     if one == 1:
206     break
207    
# Main work loop over the selected crab tasks: fix storage permissions, optionally kill/remove,
# catalog, extend, query the job status, review failures and finally retrieve the output.
# NOTE(review): this listing comes from an annotate view and has lost its indentation; the
# nesting of the statements below has to be restored from the original file.
208     # Process the crab tasks determined to be relevant in the last query
209     print '\n=============================================================================='
210 paus 1.7 print ' Process crab task list\n'
# NOTE(review): i is incremented at the very end but never read in the visible code.
211 paus 1.2 i = 0
212     for crabTask in crabTasks:
213    
214 paus 1.5 print '\n------------------------------------------------------------------------------'
215     print ' --> PREPPING ' + crabTask.tag \
216 paus 1.9 + '\n -> ' + crabTask.cmsDataset + ' (' + crabTask.mitDataset + ')'\
217     + '\n -> ' + crabTask.storageEle \
218     + '\n -> ' + crabTask.storagePath
219 paus 1.5 print '------------------------------------------------------------------------------\n'
220    
221 paus 1.2 dataset = crabTask.cmsDataset
222     storageEle = crabTask.storageEle
223     storagePath = crabTask.storagePath
224    
# Make the storage directory (text after the first '=' in storagePath) and its parent directory
# writable for everybody via glexec; the returned status is not checked.
# NOTE(review): f[1] assumes storagePath contains a '=' -- otherwise this raises IndexError.
225 paus 1.10 if True:
226     f = storagePath.split("=")
227     path1 = f[1]
228     path0 = "/".join(path1.split("/")[:-1])
229     cmd = ' glexec chmod a+w ' + path0 + ' ' + path1
230 paus 1.11 #print ' Updating permissions - ' + cmd
231 paus 1.10 status = os.system(cmd)
232    
# --kill and --remove act on the task and move on to the next one.
233 paus 1.2 if kill == 1:
234     crabTask.killAndRemove(1)
235     continue
236    
237 paus 1.8 if remove == 1:
238     crabTask.remove(1)
239     continue
240    
241 paus 1.11 crabTask.loadAllLfns('lfns/' + crabTask.mitDataset + '.lfns')
242 paus 1.2 #if crabTask.status == 'cataloged':
243     # ##removeCrabTask(crabTask)
244     # crabTask.killAndRemove(1)
245     # continue
246    
247     # make sure catalog is up to date
# NOTE(review): the result of this split is not used by any visible statement afterwards.
248     f = storagePath.split(" ")
# Default catalog command; the values 3 to 7 of 'catalog' select other flag combinations.
249     cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' \
250     + crabTask.mitVersion + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
251    
252     if catalog == 3:
253     cmd = 'catalog.sh -cegt -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
254     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
255     if catalog == 4:
256     cmd = 'catalog.sh -eg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
257     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
258     if catalog == 5:
259     cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
260     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --remove'
261 paus 1.3 if catalog == 6:
262     cmd = 'catalog.sh -e -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
263     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --compact'
264     if catalog == 7:
265     cmd = 'catalog.sh -g -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
266     ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --compact'
267 paus 1.4
268 paus 1.9 #print '\n --> CATALOG '
269     #print ' ' + cmd + '\n'
270    
# The catalog command only runs when --catalog was given with a non-zero value.
271 paus 1.2 if catalog != 0:
272     os.system(cmd)
273    
# Despite the wording below the statement is 'continue': for catalog > 1 the rest of the
# processing is skipped for this task and the loop goes on with the next one.
274     # break out of the loop as only cataloging is required
275     if catalog > 1:
276     continue
277    
278     # do we need to extend the task
279     if extend == 1:
280 paus 1.4 cmd = 'crab -extend -c ' + crabTask.tag
281 paus 1.2 print '\n------------------------------------------------------------------------------'
282 paus 1.7 print ' --> EXTEND ' + crabTask.tag + ' -- ' \
283 paus 1.2 + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
284     + '\n -> ' + storageEle \
285     + '\n -> ' + storagePath
286     print '------------------------------------------------------------------------------\n'
287 paus 1.4 print ' --> ' + cmd
288 paus 1.2 os.system(cmd)
289    
290 paus 1.9 #print '\n------------------------------------------------------------------------------'
291     #print ' --> STATUS ' + crabTask.tag + ' -- ' \
292     # + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
293     # + '\n -> ' + storageEle \
294     # + '\n -> ' + storagePath
295     #print '------------------------------------------------------------------------------\n'
296     print '\n --> STATUS ' + crabTask.tag + ' -- ' + crabTask.mitDataset
297 paus 1.2
298     # interact with crab to get the job status
299     crabTask.getJobStati()
# A task without any job status is reported and skipped; a task whose status is 'completed' or
# 'finished' is removed (the 'clean' flag is passed on) and skipped as well.
300     if len(crabTask.jobStati) < 1:
301     print ' ERROR - dropped empty crab task from the work list.'
302     print ' crab task id: ' + crabTask.tag
303     continue
304     else:
305     print ' '
306     print ' Task status: ' + crabTask.status
307     if crabTask.status == 'completed' or crabTask.status == 'finished':
308     crabTask.remove(clean)
309 paus 1.5 print ' INFO - crab task has been removed, continuing.\n'
310 paus 1.4 continue
311 paus 1.2 print ' '
312    
# failingSites is used as a mapping here: joining it yields its keys (site names), iteritems()
# yields (site, number of aborts) pairs.
# NOTE(review): nSites is counted but not read by any visible statement.
313     # review failing sites
314     siteList = ""
315     if len(crabTask.failingSites) > 0:
316     nSites = 0
317     siteList = ",".join(crabTask.failingSites)
318     print " Failing sites (consider blacklisting them)"
319     for site,nAbort in crabTask.failingSites.iteritems():
320     nSites += 1
321     print ' ' + site + '(%d'%nAbort + ')'
322     appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)
323    
# Exit codes are only read from the stdout file for retrieved jobs whose exitCode and exitStatus
# are both still negative.
324     # review all job stati and update exit stati if needed
325     for status in crabTask.jobStati:
326     if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
327     updateExitStati(crabTask.tag,status)
328     #status.showCompact()
329    
# Collect comma separated job indices: subList for jobs in state 'Created' without output file,
# resubList for aborted jobs or jobs with a positive exit code / exit status.
330     # review all job stati and propose action
331     subList = ''
332     resubList = ''
333     for status in crabTask.jobStati:
334     ##print ' %4.0d '%(status.index) + ' --> ' + status.tag
335     if ((status.tag == 'Created' and status.outputFile == 0)):
336     status.showCompact()
337     if subList == '':
338     subList += '%d'%(status.index)
339     else:
340     subList += ',%d'%(status.index)
341 paus 1.5 if ((status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0)):
342     ##or(status.tag == 'Retrieved' and status.outputFile == 0)):
343 paus 1.2 status.showCompact()
344     if resubList == '':
345     resubList += '%d'%(status.index)
346     else:
347     resubList += ',%d'%(status.index)
348     # for failed job first remove remainders before resubmitting
349     if status.outputFile == 1:
350     removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
351    
# The submit and resubmit commands are only assembled: the lines that would print and execute
# them are commented out and cmd is overwritten by the getoutput command further down.
# A subList holding a single index gets ',999999999' appended.
352     if subList != '':
353     if not re.search('-',subList) and not re.search(',',subList):
354     subList = subList + ',999999999'
355     cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
356 paus 1.5 ##print '\nACTION -- SUBMIT.PY: ' + cmd
357     ##if exe == 1:
358     ## status = os.system(cmd)
359 paus 1.2 if resubList != '':
360     cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
361 paus 1.5 ##print '\nACTION -- RE-SUBMIT.PY: ' + cmd
362     ##if exe == 1:
363     ## status = os.system(cmd)
364 paus 1.2
365    
# Retrieve the job output and run the log cleanup script; both run regardless of 'exe' and
# their status is stored but not checked in the visible code.
366 paus 1.4 cmd = 'crab -getoutput -continue ' + crabTask.tag
367 paus 1.9 #print '\n------------------------------------------------------------------------------'
368     #print ' --> GETOUTPUT ' + crabTask.tag + ' -- ' \
369     # + '\n -> ' + dataset \
370     # + '\n -> ' + storageEle \
371     # + '\n -> ' + storagePath
372     #print '------------------------------------------------------------------------------\n'
373     print '\n --> GETOUTPUT ' + crabTask.tag + ' -- ' + crabTask.mitDataset
374 paus 1.4 print ' --> ' + cmd
375 paus 1.2 status = os.system(cmd)
376    
377     cmd = 'cleanupLog.py --crabId ' + crabTask.tag
378     status = os.system(cmd)
379    
380     i += 1