ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.8
Committed: Thu Oct 20 23:07:28 2011 UTC (13 years, 6 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_025c_branch1, Mit_025c_branch0, Mit_025c, Mit_025b, Mit_025a
Branch point for: Mit_025c_branch
Changes since 1.7: +17 -3 lines
Log Message:
Update to include hadoop properly.

File Contents

# Content
1 #!/usr/bin/env python
#---------------------------------------------------------------------------------------------------
# Script to go through my crab jobs, get status and output and take some completion action if
# needed and desired.
#
# Author: C.Paus (July 1, 2008)
#---------------------------------------------------------------------------------------------------
8 import os,sys,getopt,re,string
9 import task
10
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its local working directory.

    Args:
        crabTask: object whose 'tag' attribute is the crab working directory of the task.

    Returns:
        The status returned by os.system() for the combined shell command.
    """
    # os.system() hands the command to /bin/sh, so use the POSIX redirection spelling; the
    # csh/bash-only '>&' form is not understood by every /bin/sh (e.g. dash)
    cmd = 'crab -kill all -continue ' + crabTask.tag + ' > /dev/null 2>&1; rm -rf ' \
          + crabTask.tag
    print(' KILL and REMOVE task: ' + cmd)
    status = os.system(cmd)
    return status
16
def updateExitStati(tag,status):
    """Update the exit codes of one job from its retrieved CMSSW stdout file.

    The file <tag>/res/CMSSW_<status.index>.stdout is scanned for lines whose first field is
    'EXECUTABLE_EXIT_STATUS' or 'StageOutExitStatus'; the third whitespace separated field of
    such a line is stored as an integer in status.exitCode or status.exitStatus respectively.

    Args:
        tag:    crab working directory of the task.
        status: job status object; 'index' is read, 'exitCode' and 'exitStatus' are written.

    Returns:
        None. Nothing is changed when the file does not exist.
    """
    # Make the file name
    fileName = tag + '/res/CMSSW_%d.stdout'%(status.index)
    if not os.path.exists(fileName):
        # output file for this job not yet retrieved
        return

    fileInput = open(fileName)
    try:
        for line in fileInput:
            # splitting on any whitespace also removes all unnecessary spaces
            f = line.split()
            # get rid of empty or commented lines
            if len(f) == 0 or f[0][0] == '#':
                continue
            if f[0] != 'EXECUTABLE_EXIT_STATUS' and f[0] != 'StageOutExitStatus':
                continue
            # the value is the third field; ignore truncated or non-numeric status lines
            # instead of aborting the whole babysitting run
            if len(f) < 3:
                continue
            try:
                value = int(f[2])
            except ValueError:
                continue

            if f[0] == 'EXECUTABLE_EXIT_STATUS':
                status.exitCode = value
            else:
                status.exitStatus = value
    finally:
        fileInput.close()
39
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite the crab configuration of a task with an updated 'ce_black_list' entry.

    <tag>/share/crab.cfg is read, empty and comment lines are dropped, whitespace is collapsed
    and lines continued with a trailing backslash are joined. The result is written to
    <tag>_crab.cfg, which then either replaces the original (exe == 1) or is removed again.

    Args:
        tag:        crab working directory of the task.
        failedlist: comma separated sites to add to the existing black list.
        blacklist:  if not empty, replaces the black list completely (failedlist is ignored).
        exe:        1 to install the new configuration, anything else for a dry run.

    Returns:
        The os.system() status of the final 'mv' (exe == 1) or 'rm' command.
    """
    cfgFile = tag + '/share/crab.cfg'
    newFile = tag + '_crab.cfg'
    bSlash = "\\"

    # Get the original configuration (a missing file is treated like an empty one)
    cfgLines = []
    if os.path.exists(cfgFile):
        fileInput = open(cfgFile)
        try:
            cfgLines = fileInput.readlines()
        finally:
            fileInput.close()

    # Find out whether there are any ce black listed
    ceBlacklist = ""
    # State needed to join continued lines into one
    join = 0
    fullLine = ""

    # New configuration file
    fileOutput = open(newFile,'w')
    try:
        for line in cfgLines:
            # no more unnecessary spaces
            line = " ".join(line.split())
            # get rid of empty (also whitespace only) or commented lines
            if line == '' or line[0] == '#':
                continue
            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming
            if fullLine[-1] == bSlash:
                join = 1
                fullLine = fullLine[:-1]
                continue

            # line really ended, so now look at the whole thing
            join = 0
            names = fullLine.split('=',1)
            # strip the key so that 'ce_black_list = ...' is recognized as well
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                else:
                    # merge old and failing sites; skipping empty entries avoids dangling
                    # commas and skipping known sites keeps repeated runs from growing the list
                    sites = []
                    for site in (','.join(names[1:]) + ',' + failedlist).split(','):
                        site = site.strip()
                        if site != '' and site not in sites:
                            sites.append(site)
                    fullLine = "ce_black_list=" + ",".join(sites)
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')
    finally:
        fileOutput.close()

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + newFile + ' ' + cfgFile
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: throw the new configuration away again
        status = os.system('rm ' + newFile)
    return status
103
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Prepare the removal of the output files a failed job may have left on the storage.

    Two srm file names are built from the storage element, the storage path and the dataset
    name: '<mitDataset>_000_<index>.root' and '<mitDataset>-edm_<index>.root'.

    Args:
        storageEle:  host name of the storage element.
        storagePath: path on the storage element.
        mitDataset:  dataset name used as file name stem.
        index:       integer job index.
        exe:         1 to act; at present acting only prints the command.

    Returns:
        Always 0, as the removal command is never executed.
    """
    stem = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = stem + '_000_%d'%(index) + '.root'
    fileEdm = stem + '-edm_%d'%(index) + '.root'
    # POSIX redirection so the command also works in a plain /bin/sh once it gets executed
    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' > /dev/null 2>&1'
    status = 0
    if exe == 1:
        # for now not execute this # status = os.system(cmd)
        print('For now not removing file: ' + cmd)
    return status
116
#===================================================================================================
# Main starts here
#===================================================================================================
def _printBanner(headline,details):
    # Print a framed section header: the headline followed by one ' -> ' line per detail
    print('\n------------------------------------------------------------------------------')
    print(headline + ''.join(['\n -> ' + detail for detail in details]))
    print('------------------------------------------------------------------------------\n')

# Define string to explain usage of the script (lists exactly the options accepted below)
usage = \
    "\nUsage: jobSitter.py [ --pattern= --apattern= --blacklist= --catalog=" \
    + " --kill --remove --help --backward --clean --extend --one" \
    + " --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','apattern=','blacklist=','catalog=',
         'help','backward','kill','remove','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    # sys.exc_info() instead of 'except X, ex' keeps this valid for old and new pythons
    ex = sys.exc_info()[1]
    print(usage)
    print(str(ex))
    sys.exit(1)

# --------------------------------------------------------------------------------------------------
# Get all parameters for this little task
# --------------------------------------------------------------------------------------------------
# Set defaults
pattern = ''
apattern = ''
blacklist = ''
catalog = 0
clean = 0
kill = 0
remove = 0
exe = 0
extend = 0
one = 0
backward = ''

# Read new values from the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    if opt == "--pattern":
        pattern = arg
    if opt == "--apattern":
        apattern = arg
    if opt == "--blacklist":
        blacklist = arg
    if opt == "--catalog":
        try:
            catalog = int(arg)
        except ValueError:
            # explain the problem instead of dying with a traceback
            print(usage)
            print('option --catalog requires an integer value: ' + arg)
            sys.exit(1)
    if opt == "--clean":
        clean = 1
    if opt == "--one":
        one = 1
    if opt == "--exe":
        exe = 1
    if opt == "--extend":
        extend = 1
    if opt == "--backward":
        # passed on to 'sort' below to reverse the order of the task list
        backward = ' -r '
    if opt == "--kill":
        kill = 1
    if opt == "--remove":
        remove = 1

# --------------------------------------------------------------------------------------------------
# Here is where the real action starts -------------------------------------------------------------
# --------------------------------------------------------------------------------------------------

# Find the list of crab tasks to babysit
crabTasks = []
# the doubled backslash gives the shell the same literal '\*' without an invalid python escape
cmd = 'find ./ -maxdepth 1 -name crab_0_\\* |grep -v cfg | sort' + backward
print('\n==============================================================================')
print(' Summary of crab task list: \n')
for line in os.popen(cmd).readlines():  # run command
    # strip '\n' and keep only the last path component
    tag = line[:-1].split('/').pop()

    crabTask = task.Task(tag)

    if apattern != '' and re.search(apattern,crabTask.mitDataset):
        # the anti pattern wins: announce and really leave the task out of the work list
        print('\n Skipping: ' + crabTask.mitDataset + '\n\n')
    elif re.search(pattern,crabTask.mitDataset):
        crabTasks.append(crabTask)
        crabTask.show()

    # NOTE(review): the indentation of this check was lost in the source at hand; it is placed
    # at loop level (stop after the first directory found) -- confirm against the repository
    if one == 1:
        break

# Process the crab tasks determined to be relevant in the last query
print('\n==============================================================================')
print(' Process crab task list\n')

# catalog.sh flags and trailing action for the explicitly requested catalog modes; every other
# value uses the default ('-ceg','--retry')
catalogModes = { 3: ('-cegt','--retry'),
                 4: ('-eg',  '--retry'),
                 5: ('-ceg', '--remove'),
                 6: ('-e',   '--compact'),
                 7: ('-g',   '--compact') }

for crabTask in crabTasks:

    _printBanner(' --> PREPPING ' + crabTask.tag,[crabTask.mitDataset])

    dataset = crabTask.cmsDataset
    storageEle = crabTask.storageEle
    storagePath = crabTask.storagePath

    if kill == 1:
        crabTask.killAndRemove(1)
        continue

    if remove == 1:
        crabTask.remove(1)
        continue

    crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' + \
                         crabTask.mitDataset + '.lfns')

    # make sure catalog is up to date
    flags, action = catalogModes.get(catalog,('-ceg','--retry'))
    cmd = 'catalog.sh ' + flags + ' -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
          ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' ' + action

    _printBanner(' --> CATALOG ' + crabTask.tag,
                 [dataset + ' (' + crabTask.mitDataset + ')',storageEle,storagePath])

    if catalog != 0:
        os.system(cmd)

    # go on with the next task as only cataloging is required
    if catalog > 1:
        continue

    # do we need to extend the task
    if extend == 1:
        cmd = 'crab -extend -c ' + crabTask.tag
        _printBanner(' --> EXTEND ' + crabTask.tag + ' -- ',
                     [dataset + ' (' + crabTask.mitDataset + ')',storageEle,storagePath])
        print(' --> ' + cmd)
        os.system(cmd)

    _printBanner(' --> STATUS ' + crabTask.tag + ' -- ',
                 [dataset + ' (' + crabTask.mitDataset + ')',storageEle,storagePath])

    # interact with crab to get the job status
    crabTask.getJobStati()
    if len(crabTask.jobStati) < 1:
        print(' ERROR - dropped empty crab task from the work list.')
        print(' crab task id: ' + crabTask.tag)
        continue

    print(' ')
    print(' Task status: ' + crabTask.status)
    if crabTask.status == 'completed' or crabTask.status == 'finished':
        crabTask.remove(clean)
        print(' INFO - crab task has been removed, continuing.\n')
        continue
    print(' ')

    # review failing sites
    if len(crabTask.failingSites) > 0:
        siteList = ",".join(crabTask.failingSites)
        print(" Failing sites (consider blacklisting them)")
        # items() instead of iteritems() works for old and new pythons
        for site,nAbort in crabTask.failingSites.items():
            print(' ' + site + '(%d'%nAbort + ')')
        # NOTE(review): indentation was lost in the source at hand; the call is assumed to
        # happen once per task and only when there are failing sites -- confirm
        appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)

    # review all job stati and update exit stati if needed
    for status in crabTask.jobStati:
        if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
            updateExitStati(crabTask.tag,status)

    # review all job stati and propose action
    subIndices = []
    resubIndices = []
    for status in crabTask.jobStati:
        if status.tag == 'Created' and status.outputFile == 0:
            status.showCompact()
            subIndices.append('%d'%(status.index))
        if status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0:
            status.showCompact()
            resubIndices.append('%d'%(status.index))
            # for failed job first remove remainders before resubmitting
            if status.outputFile == 1:
                removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
    subList = ','.join(subIndices)
    resubList = ','.join(resubIndices)

    # The submit and resubmit commands are only prepared: executing them is switched off
    if subList != '':
        # a single job index gets a dummy second entry so that it forms a list
        if not re.search('-',subList) and not re.search(',',subList):
            subList = subList + ',999999999'
        cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
        ##print '\nACTION -- SUBMIT.PY: ' + cmd
        ##if exe == 1:
        ##    status = os.system(cmd)
    if resubList != '':
        cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
        ##print '\nACTION -- RE-SUBMIT.PY: ' + cmd
        ##if exe == 1:
        ##    status = os.system(cmd)

    # retrieve the output of the jobs and clean up the logs of the task
    cmd = 'crab -getoutput -continue ' + crabTask.tag
    _printBanner(' --> GETOUTPUT ' + crabTask.tag + ' -- ',[dataset,storageEle,storagePath])
    print(' --> ' + cmd)
    os.system(cmd)

    cmd = 'cleanupLog.py --crabId ' + crabTask.tag
    os.system(cmd)