ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.11
Committed: Sat Jun 29 03:05:18 2013 UTC (11 years, 10 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_032, Mit_031, HEAD
Changes since 1.10: +2 -3 lines
Log Message:
Improve lfn handling.

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to go through my crab jobs, get status and output and take some completion action if
4 # needed and desired.
5 #
6 # Author: C.Paus (July 1, 2008)
7 #---------------------------------------------------------------------------------------------------
8 import os,sys,getopt,re,string
9 import task
10
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its working directory.

    crabTask -- object whose 'tag' attribute names the crab working directory

    Returns the status reported by os.system() for the combined shell command.
    """
    # os.system() hands the command to /bin/sh: the csh/bash style '>&' redirection is not
    # understood by strict POSIX shells, so the equivalent portable form is used
    cmd = 'crab -kill all -continue ' + crabTask.tag + ' > /dev/null 2>&1; rm -rf ' \
        + crabTask.tag
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16
def updateExitStati(tag,status):
    """Update the exit codes of a job from its retrieved CMSSW stdout file.

    tag    -- crab task directory, the file read is <tag>/res/CMSSW_<index>.stdout
    status -- job status object: 'index' selects the file, 'exitCode' is set from the
              'EXECUTABLE_EXIT_STATUS' line and 'exitStatus' from the 'StageOutExitStatus' line

    The value is taken from the third whitespace separated field of the keyword line. Lines
    without a usable integer value are ignored. Nothing is done when the file does not exist
    (output of this job not yet retrieved).
    """
    fileName = tag + '/res/CMSSW_%d.stdout'%(status.index)
    if not os.path.exists(fileName):
        return

    # read the file directly, there is no need to spawn a 'cat' process for this
    fileInput = open(fileName)
    try:
        lines = fileInput.readlines()
    finally:
        fileInput.close()

    for line in lines:
        # split() drops the line end and all unnecessary spaces (a last line without line end
        # must not lose its last character)
        f = line.split()
        # get rid of empty or commented lines
        if len(f) == 0 or f[0].startswith('#'):
            continue
        if f[0] != 'EXECUTABLE_EXIT_STATUS' and f[0] != 'StageOutExitStatus':
            continue
        # keyword without a value: leave the status untouched
        if len(f) < 3:
            continue
        try:
            value = int(f[2])
        except ValueError:
            continue

        if f[0] == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = value
        else:
            status.exitStatus = value
39
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite <tag>/share/crab.cfg with an updated 'ce_black_list' entry.

    tag        -- crab task directory
    failedlist -- comma separated sites, appended to the existing black list
    blacklist  -- if not empty it replaces the complete black list (failedlist is then unused)
    exe        -- 1: move the rewritten configuration in place, otherwise just discard it

    The rewritten configuration drops empty and commented lines and merges lines continued
    with a backslash. A warning is printed when no 'ce_black_list' entry was found.

    Returns the os.system() status of the final 'mv' (exe == 1) or 'rm' command.
    """
    fileName = tag + '/share/crab.cfg'
    tmpName = tag + '_crab.cfg'
    bSlash = "\\"

    # Get the original configuration (a missing file is treated like an empty one)
    lines = []
    if os.path.exists(fileName):
        fileInput = open(fileName)
        try:
            lines = fileInput.readlines()
        finally:
            fileInput.close()

    # Find out whether there are any ce black listed
    ceBlacklist = ""
    # state for joining continued lines
    join = 0
    fullLine = ""

    # New configuration file
    fileOutput = open(tmpName,'w')
    try:
        for line in lines:
            # only strip the line end (the last line might not have one)
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # no more unnecessary spaces
            line = " ".join(line.split())
            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming (safe also if nothing is left of the line)
            if fullLine.endswith(bSlash):
                join = 1
                fullLine = fullLine[:-1]
                continue
            # line really ended, so now look at the whole thing
            join = 0
            if fullLine == '':
                continue            # line contained just white space
            names = fullLine.split('=')
            # the key might be followed by spaces ('ce_black_list = ...')
            if names[0].strip() == 'ce_black_list':
                ceBlacklist = fullLine
                fullLine += ',' + failedlist
                if blacklist != "":
                    fullLine = "ce_black_list=" + blacklist
                # NOTE(review): assumed to be printed for every black list line - confirm
                print(" new blacklist: " + fullLine)

            fileOutput.write(fullLine + '\n')
    finally:
        fileOutput.close()

    if ceBlacklist == '':
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + tmpName + ' ' + fileName
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        status = os.system(cmd)
    else:
        # dry run: throw the new configuration away again
        status = os.system('rm ' + tmpName)
    return status
103
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Prepare the removal of the output files a failed job might have left on storage.

    storageEle  -- host name of the storage element
    storagePath -- directory on the storage element
    mitDataset  -- dataset name, the two file names are derived from it
    index       -- job index
    exe         -- 1: report the removal command (the removal itself is switched off for now)

    Always returns 0.
    """
    base = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = base + '_000_%d'%(index) + '.root'
    fileEdm = base + '-edm_%d'%(index) + '.root'
    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' >& /dev/null'
    if exe == 1:
        # the command is deliberately not executed for now, just tell what would be done
        print('For now not removing file: ' + cmd)
    return 0
116
117 #===================================================================================================
118 # Main starts here
119 #===================================================================================================
# Define string to explain usage of the script (has to agree with the list of valid options)
usage = \
      "\nUsage: jobSitter.py [ --pattern= --apattern= --blacklist= --catalog=" \
      + " --status --kill --remove --help --backward --clean --extend --one" \
      + " --exe ]\n"

# Define the valid options which can be specified and check out the command line
# ('status' is advertised in the usage: it is accepted but requires no action of its own)
valid = ['pattern=','apattern=','blacklist=','catalog=',
         'help','status','backward','kill','remove','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    # fetch the exception this way to stay independent of the python version
    ex = sys.exc_info()[1]
    print(usage)
    print(str(ex))
    sys.exit(1)
135
136 # --------------------------------------------------------------------------------------------------
137 # Get all parameters for this little task
138 # --------------------------------------------------------------------------------------------------
# Set defaults
pattern = ''       # only tasks whose dataset matches this regular expression are considered
apattern = ''      # tasks whose dataset matches this regular expression are excluded
blacklist = ''     # if given, replaces the ce black list of the crab configuration
catalog = 0        # cataloging level (0 - no cataloging)
clean = 0
kill = 0
remove = 0
exe = 0
extend = 0
one = 0
backward = ''      # extra option for 'sort' to reverse the order of the tasks

# Read new values from the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    elif opt == "--pattern":
        pattern = arg
    elif opt == "--apattern":
        apattern = arg
    elif opt == "--blacklist":
        blacklist = arg
    elif opt == "--catalog":
        # a wrong value should give a clear message and not a traceback
        try:
            catalog = int(arg)
        except ValueError:
            print(usage)
            print(' ERROR - option --catalog requires an integer, found: ' + arg)
            sys.exit(1)
    elif opt == "--clean":
        clean = 1
    elif opt == "--one":
        one = 1
    elif opt == "--exe":
        exe = 1
    elif opt == "--extend":
        extend = 1
    elif opt == "--backward":
        backward = ' -r '
    elif opt == "--kill":
        kill = 1
    elif opt == "--remove":
        remove = 1
179
180 # --------------------------------------------------------------------------------------------------
181 # Here is where the real action starts -------------------------------------------------------------
182 # --------------------------------------------------------------------------------------------------
183
# Find the list of crab tasks to babysit
crabTasks = []
datasetList = []
cmd = 'find ./ -maxdepth 1 -name crab_0_\\* |grep -v cfg | sort' + backward
print('\n==============================================================================')
print(' Summary of crab task list: \n')
for line in os.popen(cmd).readlines():  # run command
    # strip the line end, the last component of the path is the crab task tag
    tag = line.rstrip('\n').split('/').pop()

    crabTask = task.Task(tag)

    # a task matching the anti pattern must really be left out: without the 'continue' it
    # is announced as skipped but nevertheless processed
    if apattern != '' and re.search(apattern,crabTask.mitDataset):
        print('\n Skipping: ' + crabTask.mitDataset + '\n\n')
        continue

    if re.search(pattern,crabTask.mitDataset):
        crabTasks.append(crabTask)
        crabTask.show()
        # --one: stop as soon as one task has been accepted
        # NOTE(review): nesting not recoverable from the source, the test might have been
        #               meant to stop after the first directory, matching or not - confirm
        if one == 1:
            break
207
# Process the crab tasks determined to be relevant in the last query
print('\n==============================================================================')
print(' Process crab task list\n')

# catalog.sh options per cataloging level as (flags, final action), every level not listed
# uses the default
catalogDefault = ('-ceg','--retry')
catalogModes = {3: ('-cegt','--retry'),
                4: ('-eg',  '--retry'),
                5: ('-ceg', '--remove'),
                6: ('-e',   '--compact'),
                7: ('-g',   '--compact')}

i = 0
for crabTask in crabTasks:

    print('\n------------------------------------------------------------------------------')
    print(' --> PREPPING ' + crabTask.tag \
          + '\n -> ' + crabTask.cmsDataset + ' (' + crabTask.mitDataset + ')' \
          + '\n -> ' + crabTask.storageEle \
          + '\n -> ' + crabTask.storagePath)
    print('------------------------------------------------------------------------------\n')

    dataset = crabTask.cmsDataset
    storageEle = crabTask.storageEle
    storagePath = crabTask.storagePath

    # make the output directory and its parent writable, the directory is taken from what
    # follows the first '=' in the storage path
    f = storagePath.split("=")
    if len(f) > 1:
        path1 = f[1]
        path0 = "/".join(path1.split("/")[:-1])
        cmd = ' glexec chmod a+w ' + path0 + ' ' + path1
        status = os.system(cmd)
    else:
        # do not crash on an unexpected storage path, tell what happened and go on
        print(' WARNING - no directory found in storage path, permissions not updated: ' \
              + storagePath)

    if kill == 1:
        crabTask.killAndRemove(1)
        continue

    if remove == 1:
        crabTask.remove(1)
        continue

    crabTask.loadAllLfns('lfns/' + crabTask.mitDataset + '.lfns')

    # make sure catalog is up to date
    if catalog != 0:
        flags, action = catalogModes.get(catalog,catalogDefault)
        cmd = 'catalog.sh ' + flags + ' -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion \
            + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' ' + action
        os.system(cmd)

    # go to the next task as only cataloging is required
    if catalog > 1:
        continue

    # do we need to extend the task
    if extend == 1:
        cmd = 'crab -extend -c ' + crabTask.tag
        print('\n------------------------------------------------------------------------------')
        print(' --> EXTEND ' + crabTask.tag + ' -- ' \
              + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')' \
              + '\n -> ' + storageEle \
              + '\n -> ' + storagePath)
        print('------------------------------------------------------------------------------\n')
        print(' --> ' + cmd)
        os.system(cmd)

    print('\n --> STATUS ' + crabTask.tag + ' -- ' + crabTask.mitDataset)

    # interact with crab to get the job status
    crabTask.getJobStati()
    if len(crabTask.jobStati) < 1:
        print(' ERROR - dropped empty crab task from the work list.')
        print(' crab task id: ' + crabTask.tag)
        continue

    print(' ')
    print(' Task status: ' + crabTask.status)
    if crabTask.status == 'completed' or crabTask.status == 'finished':
        crabTask.remove(clean)
        print(' INFO - crab task has been removed, continuing.\n')
        continue
    print(' ')

    # review failing sites (failingSites maps the site name to its number of aborts)
    siteList = ""
    if len(crabTask.failingSites) > 0:
        siteList = ",".join(crabTask.failingSites)
        print(" Failing sites (consider blacklisting them)")
        for site,nAbort in crabTask.failingSites.items():
            print(' ' + site + '(%d'%nAbort + ')')
        # NOTE(review): nesting not recoverable from the source, the black list is assumed to
        #               be updated once per task with failing sites - confirm
        appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)

    # review all job stati and update exit stati if needed
    for status in crabTask.jobStati:
        if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
            updateExitStati(crabTask.tag,status)

    # review all job stati and propose action
    subIndices = []
    resubIndices = []
    for status in crabTask.jobStati:
        if status.tag == 'Created' and status.outputFile == 0:
            status.showCompact()
            subIndices.append('%d'%(status.index))
        if status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0:
            status.showCompact()
            resubIndices.append('%d'%(status.index))
            # for failed job first remove remainders before resubmitting
            # NOTE(review): assumed to apply to the failed jobs only - confirm
            if status.outputFile == 1:
                removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
    subList = ','.join(subIndices)
    resubList = ','.join(resubIndices)

    # submission and resubmission are switched off for the moment, the commands are prepared
    # but not executed
    if subList != '':
        # a single index is padded with a second, very large, index
        if not re.search('-',subList) and not re.search(',',subList):
            subList = subList + ',999999999'
        cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
    if resubList != '':
        cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList

    # get the output of the jobs and clean up the logs
    cmd = 'crab -getoutput -continue ' + crabTask.tag
    print('\n --> GETOUTPUT ' + crabTask.tag + ' -- ' + crabTask.mitDataset)
    print(' --> ' + cmd)
    status = os.system(cmd)

    cmd = 'cleanupLog.py --crabId ' + crabTask.tag
    status = os.system(cmd)

    i += 1