ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.7
Committed: Tue Mar 22 02:48:51 2011 UTC (14 years, 1 month ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_025, Mit_025pre2, Mit_024b, Mit_025pre1, Mit_024a, Mit_024, Mit_023, Mit_022a, Mit_022, Mit_020d, TMit_020d, Mit_020c, Mit_021, Mit_021pre2, Mit_021pre1, Mit_020b, Mit_020a, Mit_020
Changes since 1.6: +4 -4 lines
Log Message:
Version 020 updates (64 bit architecture).

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to go through my crab jobs, get status and output and take some completion action if
4 # needed and desired.
5 #
6 # Author: C.Paus (July 1, 2008)
7 #---------------------------------------------------------------------------------------------------
8 import os,sys,getopt,re,string
9 import task
10
def removeCrabTask(crabTask):
    """Kill all jobs of a crab task and delete its directory.

    crabTask -- object providing a ``tag`` attribute; the tag is handed to
                ``crab -kill all -continue`` and afterwards to ``rm -rf``.

    Returns whatever os.system() reports for the combined shell command.
    """
    # os.system() runs the command through /bin/sh.  The csh/bash shortcut
    # '>&' is not POSIX: a strict sh rejects it, which would skip the crab
    # call while the 'rm -rf' after the ';' still runs.  Use the portable
    # spelling instead.
    cmd = 'crab -kill all -continue ' + crabTask.tag + ' > /dev/null 2>&1; rm -rf ' + crabTask.tag
    print(' KILL and REMOVE task: ' + cmd)
    return os.system(cmd)
16
def updateExitStati(tag,status):
    """Read the exit codes of one job from its CMSSW stdout file.

    tag    -- crab task directory; the file read is
              <tag>/res/CMSSW_<status.index>.stdout
    status -- job status object; ``status.index`` selects the file and
              ``status.exitCode`` / ``status.exitStatus`` are overwritten
              when the corresponding lines are found

    Lines are expected in the form ``KEY = VALUE``.  The status object is
    left untouched when the file is missing or unreadable, and lines whose
    value is absent or not an integer are skipped.
    """
    logName = tag + '/res/CMSSW_%d.stdout'%(status.index)

    # Read the file directly instead of spawning 'cat' through a shell.
    # try/finally (rather than 'with') keeps this valid on old interpreters.
    try:
        logFile = open(logName)
    except IOError:
        # output file for this job not yet retrieved
        return
    try:
        lines = logFile.readlines()
    finally:
        logFile.close()

    for line in lines:
        # split() discards the newline and every run of white space
        f = line.split()
        # get rid of empty or commented lines
        if not f or f[0][0] == '#':
            continue
        if f[0] != 'EXECUTABLE_EXIT_STATUS' and f[0] != 'StageOutExitStatus':
            continue
        # f[1] is the '=' sign, f[2] the value; tolerate truncated lines
        try:
            code = int(f[2])
        except (IndexError, ValueError):
            continue
        if f[0] == 'EXECUTABLE_EXIT_STATUS':
            status.exitCode = code
        else:
            status.exitStatus = code
39
def appendBlacklistSites(tag,failedlist,blacklist,exe):
    """Rewrite <tag>/share/crab.cfg with an updated ``ce_black_list`` line.

    tag        -- crab task directory; the configuration is
                  <tag>/share/crab.cfg and the scratch copy <tag>_crab.cfg
    failedlist -- comma separated sites appended to the existing list
    blacklist  -- when not empty, replaces the whole list
                  (``ce_black_list=<blacklist>``) instead of appending
    exe        -- 1: move the scratch copy over the configuration,
                  anything else: dry run, the scratch copy is deleted again

    Empty lines and lines starting with '#' are dropped, white space is
    normalised and backslash-continued lines are joined into one line.

    Returns the exit status of the final 'mv' / 'rm' command, or 1 when the
    configuration cannot be read (in which case nothing is written, so an
    unreadable configuration is never replaced by an empty one).
    """
    cfgName = tag + '/share/crab.cfg'
    tmpName = tag + '_crab.cfg'

    # Read the configuration directly instead of spawning 'cat'.
    try:
        cfgFile = open(cfgName)
    except IOError:
        print(' WARNING - site blacklisting did not work')
        return 1
    try:
        rawLines = cfgFile.readlines()
    finally:
        cfgFile.close()

    # First pass: build the list of complete (joined) configuration lines.
    logical = []
    pending = None                      # text of an unfinished continuation
    for raw in rawLines:
        raw = raw.rstrip('\n')          # never chop a real character
        # get rid of empty or commented lines
        if raw == '' or raw[0] == '#':
            continue
        # no more unnecessary spaces; a blank-only line collapses to ''
        piece = " ".join(raw.split())
        if piece == '':
            continue
        if pending is not None:
            piece = pending + piece
        if piece.endswith('\\'):
            # more is coming: remember the text without the backslash
            pending = piece[:-1]
        else:
            logical.append(piece)
            pending = None
    if pending:
        # file ended inside a continuation: keep what was collected
        logical.append(pending)

    # Second pass: patch the black list entry.
    foundBlacklist = False
    output = []
    for entry in logical:
        # strip() so that 'ce_black_list = a,b' is recognised as well
        if entry.split('=')[0].strip() == 'ce_black_list':
            foundBlacklist = True
            if blacklist != "":
                entry = "ce_black_list=" + blacklist
            elif failedlist != "":
                entry += ',' + failedlist
            print(" new blacklist: " + entry)
        output.append(entry + '\n')

    # New configuration file; closed even if writing fails.
    fileOutput = open(tmpName,'w')
    try:
        fileOutput.writelines(output)
    finally:
        fileOutput.close()

    if not foundBlacklist:
        print(' WARNING - site blacklisting did not work')

    cmd = 'mv ' + tmpName + ' ' + cfgName
    print("\nACTION -- MOVE: " + cmd)
    if exe == 1:
        return os.system(cmd)
    return os.system('rm ' + tmpName)
103
def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
    """Report the 'srmrm' command that would delete the output of one job.

    storageEle  -- storage element host name (port 8443 is appended)
    storagePath -- path on the storage element
    mitDataset  -- dataset name, used as the file name stem
    index       -- job index, formatted with %d into both file names
    exe         -- 1: print the command; anything else: stay silent

    The removal itself is deliberately not executed for now, so the
    function always returns 0.
    """
    stem    = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset
    fileMit = stem + '_000_%d'%(index) + '.root'
    fileEdm = stem + '-edm_%d'%(index) + '.root'
    # POSIX redirect instead of the csh/bash-only '>&', so the command is
    # safe to hand to os.system() once execution is switched back on
    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' > /dev/null 2>&1'
    status = 0
    if exe == 1:
        # for now not execute this # status = os.system(cmd)
        print('For now not removing file: ' + cmd)
    return status
116
117 #===================================================================================================
118 # Main starts here
119 #===================================================================================================
120 # Define string to explain usage of the script
# The usage text lists exactly the options accepted in 'valid' below
usage = \
      "\nUsage: jobSitter.py [ --pattern=  --blacklist=  --catalog=" + \
      "  --help  --backward  --kill  --clean  --extend  --one  --exe ]\n"

# Define the valid options which can be specified and check out the command line
valid = ['pattern=','blacklist=','catalog=','help','backward','kill','clean','exe','extend','one']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    # fetch the exception through sys.exc_info() so that the clause needs
    # neither the Python 2 only ', ex' nor the newer 'as ex' spelling
    ex = sys.exc_info()[1]
    print(usage)
    print(str(ex))
    sys.exit(1)
133
134 # --------------------------------------------------------------------------------------------------
135 # Get all parameters for this little task
136 # --------------------------------------------------------------------------------------------------
137 # Set defaults
# Defaults for everything that is not given on the command line
pattern = blacklist = backward = ''
catalog = clean = kill = exe = extend = one = 0

# Overwrite the defaults with what was found on the command line
for opt, arg in opts:
    if opt == "--help":
        print(usage)
        sys.exit(0)
    elif opt == "--pattern":
        pattern = arg
    elif opt == "--blacklist":
        blacklist = arg
    elif opt == "--catalog":
        catalog = int(arg)
    elif opt == "--clean":
        clean = 1
    elif opt == "--one":
        one = 1
    elif opt == "--exe":
        exe = 1
    elif opt == "--extend":
        extend = 1
    elif opt == "--backward":
        # handed to 'sort' further down to reverse the task order
        backward = ' -r '
    elif opt == "--kill":
        kill = 1
171
172 # --------------------------------------------------------------------------------------------------
173 # Here is where the real action starts -------------------------------------------------------------
174 # --------------------------------------------------------------------------------------------------
175
176 # Find the list of crab tasks to babysit
crabTasks   = []
datasetList = []
# raw string: the backslash protects '*' from the shell and '\*' is not a
# valid Python escape sequence in an ordinary string literal
cmd = r'find ./ -maxdepth 1 -name crab_0_\* |grep -v cfg | sort' + backward
print('\n==============================================================================')
print(' Summary of crab task list: \n')
for line in os.popen(cmd).readlines():  # run command
    # strip only the newline, then keep the last path component
    tag = line.rstrip('\n').split('/').pop()
    if tag == '':
        continue

    crabTask = task.Task(tag)

    # keep the task when its dataset name matches the requested pattern
    if re.search(pattern,crabTask.mitDataset):
        crabTasks.append(crabTask)
        crabTask.show()

        # NOTE(review): the indentation of this test was lost in the listing
        # this code was recovered from.  It is placed inside the match branch
        # so that --one selects the first task that matches --pattern --
        # confirm against the repository copy.
        if one == 1:
            break
197
# Work through the crab tasks selected above, one after the other
print('\n==============================================================================')
print(' Process crab task list\n')

ruler = '------------------------------------------------------------------------------'

# catalog level -> (catalog.sh switches, trailing action); every level that
# is not listed here runs the default '-ceg ... --retry'
catalogModes = {3: ('-cegt', '--retry'),
                4: ('-eg',   '--retry'),
                5: ('-ceg',  '--remove'),
                6: ('-e',    '--compact'),
                7: ('-g',    '--compact')}

def showBanner(headline, details):
    # print a framed header: the headline followed by one ' -> ' line per detail
    print('\n' + ruler)
    print('\n -> '.join([headline] + details))
    print(ruler + '\n')

i = 0
for crabTask in crabTasks:

    showBanner(' --> PREPPING ' + crabTask.tag, [crabTask.mitDataset])

    dataset     = crabTask.cmsDataset
    storageEle  = crabTask.storageEle
    storagePath = crabTask.storagePath
    datasetInfo = dataset + ' (' + crabTask.mitDataset + ')'

    # --kill: get rid of the task and move on to the next one
    if kill == 1:
        crabTask.killAndRemove(1)
        continue

    crabTask.loadAllLfns('/'.join([crabTask.mitCfg,
                                   crabTask.mitVersion,
                                   crabTask.mitDataset + '.lfns']))

    # make sure catalog is up to date
    switches, action = catalogModes.get(catalog, ('-ceg', '--retry'))
    cmd = 'catalog.sh ' + switches + ' -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion \
          + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' ' + action

    showBanner(' --> CATALOG ' + crabTask.tag, [datasetInfo, storageEle, storagePath])

    if catalog != 0:
        os.system(cmd)

    # go on to the next task as only cataloging is required
    if catalog > 1:
        continue

    # do we need to extend the task
    if extend == 1:
        cmd = 'crab -extend -c ' + crabTask.tag
        showBanner(' --> EXTEND ' + crabTask.tag + ' -- ',
                   [datasetInfo, storageEle, storagePath])
        print(' --> ' + cmd)
        os.system(cmd)

    showBanner(' --> STATUS ' + crabTask.tag + ' -- ',
               [datasetInfo, storageEle, storagePath])

    # interact with crab to get the job status
    crabTask.getJobStati()
    if len(crabTask.jobStati) < 1:
        print(' ERROR - dropped empty crab task from the work list.')
        print(' crab task id: ' + crabTask.tag)
        continue

    print(' ')
    print(' Task status: ' + crabTask.status)
    if crabTask.status in ('completed', 'finished'):
        crabTask.remove(clean)
        print(' INFO - crab task has been removed, continuing.\n')
        continue
    print(' ')

    # review failing sites
    siteList = ""
    failing = crabTask.failingSites
    if len(failing) > 0:
        siteList = ",".join(failing)
        print(" Failing sites (consider blacklisting them)")
        # presumably a dict of site -> number of aborts; items() iterates it
        # the same way iteritems() did
        for site, nAbort in failing.items():
            print(' ' + site + '(%d'%nAbort + ')')
        appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)

    # review all job stati and update exit stati if needed
    for status in crabTask.jobStati:
        if status.tag == 'Retrieved' and status.exitCode < 0 and status.exitStatus < 0:
            updateExitStati(crabTask.tag,status)

    # review all job stati and propose action
    toSubmit   = []
    toResubmit = []
    for status in crabTask.jobStati:
        if status.tag == 'Created' and status.outputFile == 0:
            status.showCompact()
            toSubmit.append('%d'%(status.index))
        if status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0:
            status.showCompact()
            toResubmit.append('%d'%(status.index))
            # for failed job first remove remainders before resubmitting
            if status.outputFile == 1:
                removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
    subList   = ','.join(toSubmit)
    resubList = ','.join(toResubmit)

    # The two commands below are only assembled; running them is switched off.
    if subList != '':
        # a single job number is extended to a two entry list
        if '-' not in subList and ',' not in subList:
            subList = subList + ',999999999'
        cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
    if resubList != '':
        cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList

    cmd = 'crab -getoutput -continue ' + crabTask.tag
    showBanner(' --> GETOUTPUT ' + crabTask.tag + ' -- ',
               [dataset, storageEle, storagePath])
    print(' --> ' + cmd)
    status = os.system(cmd)

    cmd = 'cleanupLog.py --crabId ' + crabTask.tag
    status = os.system(cmd)

    i += 1
361 i += 1