ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
Revision: 1.2
Committed: Sat Jun 5 02:36:28 2010 UTC (14 years, 11 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_014b, Mit_014a
Changes since 1.1: +338 -0 lines
Log Message:
Wow I forgot all about my cvs.

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to go through my crab jobs, get status and output and take some completion action if
4 # needed and desired.
5 #
6 # Author: C.Paus (July 1, 2008)
7 #---------------------------------------------------------------------------------------------------
8 import os,sys,getopt,re,string
9 import task
10
11 def removeCrabTask(crabTask):
12 cmd = 'crab -kill all -continue ' + crabTask.tag + ' >& /dev/null; rm -rf ' + crabTask.tag
13 print ' KILL and REMOVE task: ' + cmd
14 status = os.system(cmd)
15 return status
16
17 def updateExitStati(tag,status):
18 # Make the file
19 file = tag + '/res/CMSSW_%d.stdout'%(status.index)
20 #print ' Analyzing file: ' + file
21 cmd = 'cat ' + file
22 if os.path.exists(file):
23 for line in os.popen(cmd).readlines(): # run command
24 line = line[:-1]
25 # get ride of empty or commented lines
26 if line == '' or line[0] == '#':
27 continue
28 # no more unnecessary spaces
29 line = " ".join(str(line).split()).strip()
30
31 f = line.split(" ")
32
33 if f[0] == 'EXECUTABLE_EXIT_STATUS':
34 status.exitCode = int(f[2])
35 if f[0] == 'StageOutExitStatus':
36 status.exitStatus = int(f[2])
37 #else:
38 # print ' output file for this job not yet retrieved. '
39
40 def appendBlacklistSites(tag,failedlist,blacklist,exe):
41 # Make the file
42 file = tag + '/share/crab.cfg'
43 # Find out whether there are any ce black listed
44 ceBlacklist = ""
45 # Get the original black list
46 cmd = 'cat ' + file
47
48 # finding the one line
49 join = 0
50 fullLine = ""
51 bSlash = "\\";
52
53 # New Configuration file
54 fileOutput = open(tag + '_crab.cfg','w')
55
56 for line in os.popen(cmd).readlines(): # run command
57 line = line[:-1]
58 ##print 'Line: ' + line
59
60 # get ride of empty or commented lines
61 if line == '' or line[0] == '#':
62 continue
63 # no more unnecessary spaces
64 line = " ".join(str(line).split()).strip()
65 # join lines
66 if join == 1:
67 fullLine += line
68 else:
69 fullLine = line
70 # determine if finished or more is coming
71 if fullLine[-1] == bSlash:
72 join = 1
73 fullLine = fullLine[:-1]
74 # line really ended, so now look at the whole thing
75 else:
76 join = 0
77 # test whether there is a directory
78 names = fullLine.split('=') # splitting every blank
79 ##print "FullLine: " + fullLine
80 if names[0] == 'ce_black_list':
81 ceBlacklist = fullLine
82 fullLine += ',' + failedlist
83 if blacklist != "":
84 fullLine = "ce_black_list=" + blacklist
85 print " new blacklist: " + fullLine
86
87 fileOutput.write(fullLine + '\n')
88
89 fileOutput.close()
90
91 if ceBlacklist == '':
92 print ' WARNING - site blacklisting did not work'
93
94 cmd = 'mv ' + tag + '_crab.cfg ' + file
95 print "\nACTION -- MOVE: " + cmd
96 status = 0
97 if exe == 1:
98 status = os.system(cmd)
99 else:
100 ##status = os.system('cat ' + tag + '_crab.cfg')
101 status = os.system('rm ' + tag + '_crab.cfg')
102 return status
103
104 def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
105 fileMit = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset + \
106 '_000_%d'%(index) + '.root'
107 fileEdm = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset + \
108 '-edm_%d'%(index) + '.root'
109 cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' >& /dev/null'
110 #print 'ACTION -- REMOVE: \n srmrm ' + fileMit + '\n srmrm ' + fileEdm + '\n'
111 status = 0
112 if exe == 1:
113 # for now not execute this # status = os.system(cmd)
114 print 'For now not removing file: ' + cmd
115 return status
116
117 #===================================================================================================
118 # Main starts here
119 #===================================================================================================
120 # Define string to explain usage of the script
121 usage = \
122 "\nUsage: jobSitter.py [ --pattern= --blacklist=" + \
123 " --status --help --backward --clean --extend --one --exe ]\n"
124
125 # Define the valid options which can be specified and check out the command line
126 valid = ['pattern=','blacklist=','catalog=','help','backward','kill','clean','exe','extend','one']
127 try:
128 opts, args = getopt.getopt(sys.argv[1:], "", valid)
129 except getopt.GetoptError, ex:
130 print usage
131 print str(ex)
132 sys.exit(1)
133
134 # --------------------------------------------------------------------------------------------------
135 # Get all parameters for this little task
136 # --------------------------------------------------------------------------------------------------
137 # Set defaults
138 pattern = ''
139 blacklist = ''
140 catalog = 0
141 clean = 0
142 kill = 0
143 exe = 0
144 extend = 0
145 one = 0
146 backward = ''
147
148 # Read new values from the command line
149 for opt, arg in opts:
150 if opt == "--help":
151 print usage
152 sys.exit(0)
153 if opt == "--pattern":
154 pattern = arg
155 if opt == "--blacklist":
156 blacklist = arg
157 if opt == "--catalog":
158 catalog = int(arg)
159 if opt == "--clean":
160 clean = 1
161 if opt == "--one":
162 one = 1
163 if opt == "--exe":
164 exe = 1
165 if opt == "--extend":
166 extend = 1
167 if opt == "--backward":
168 backward = ' -r '
169 if opt == "--kill":
170 kill = 1
171
172 # --------------------------------------------------------------------------------------------------
173 # Here is where the real action starts -------------------------------------------------------------
174 # --------------------------------------------------------------------------------------------------
175
176 # Find the list of crab tasks to babysit
177 crabTasks = []
178 datasetList = []
179 cmd = 'find ./ -maxdepth 1 -name crab_0_\* |grep -v cfg | sort' + backward
180 print '\n=============================================================================='
181 print ' Summary of crab task list: \n'
182 for line in os.popen(cmd).readlines(): # run command
183 line = line[:-1] # strip '\n'
184 ## print ' LINE: ' + line
185 f = line.split('/') # splitting every blank
186 tag = f.pop()
187
188 crabTask = task.Task(tag)
189
190 #print 'Pattern: ' + pattern + ' tag: ' + crabTask.mitDataset
191 if re.search(pattern,crabTask.mitDataset):
192 crabTasks.append(crabTask)
193 crabTask.show()
194
195 if one == 1:
196 break
197
198 # Process the crab tasks determined to be relevant in the last query
199 print '\n=============================================================================='
200 print ' Process crab task list (please wait, crab commands are first fully parsed)\n'
201 i = 0
202 for crabTask in crabTasks:
203
204 dataset = crabTask.cmsDataset
205 storageEle = crabTask.storageEle
206 storagePath = crabTask.storagePath
207
208 if kill == 1:
209 crabTask.killAndRemove(1)
210 continue
211
212 crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' + \
213 crabTask.mitDataset + '.lfns')
214 #if crabTask.status == 'cataloged':
215 # ##removeCrabTask(crabTask)
216 # crabTask.killAndRemove(1)
217 # continue
218
219 # make sure catalog is up to date
220 f = storagePath.split(" ")
221 cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' \
222 + crabTask.mitVersion + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
223
224 if catalog == 3:
225 cmd = 'catalog.sh -cegt -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
226 ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
227 if catalog == 4:
228 cmd = 'catalog.sh -eg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
229 ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
230 if catalog == 5:
231 cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
232 ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --remove'
233 print ' CATALOG: ' + cmd
234 if catalog != 0:
235 os.system(cmd)
236
237 # break out of the loop as only cataloging is required
238 if catalog > 1:
239 continue
240
241 # do we need to extend the task
242 if extend == 1:
243 print '\n------------------------------------------------------------------------------'
244 print ' --> EXTEND ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
245 + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
246 + '\n -> ' + storageEle \
247 + '\n -> ' + storagePath
248 print '------------------------------------------------------------------------------\n'
249 cmd = 'crab -extend -c ' + crabTask.tag
250 os.system(cmd)
251
252 print '\n------------------------------------------------------------------------------'
253 print ' --> STATUS ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
254 + '\n -> ' + dataset + ' (' + crabTask.mitDataset + ')'\
255 + '\n -> ' + storageEle \
256 + '\n -> ' + storagePath
257 print '------------------------------------------------------------------------------\n'
258
259 # interact with crab to get the job status
260 crabTask.getJobStati()
261 if len(crabTask.jobStati) < 1:
262 print ' ERROR - dropped empty crab task from the work list.'
263 print ' crab task id: ' + crabTask.tag
264 continue
265 else:
266 print ' '
267 print ' Task status: ' + crabTask.status
268 if crabTask.status == 'completed' or crabTask.status == 'finished':
269 crabTask.remove(clean)
270 print ' '
271
272 # review failing sites
273 siteList = ""
274 if len(crabTask.failingSites) > 0:
275 nSites = 0
276 siteList = ",".join(crabTask.failingSites)
277 print " Failing sites (consider blacklisting them)"
278 for site,nAbort in crabTask.failingSites.iteritems():
279 nSites += 1
280 print ' ' + site + '(%d'%nAbort + ')'
281 appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)
282
283 # review all job stati and update exit stati if needed
284 for status in crabTask.jobStati:
285 if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
286 updateExitStati(crabTask.tag,status)
287 #status.showCompact()
288
289 # review all job stati and propose action
290 subList = ''
291 resubList = ''
292 for status in crabTask.jobStati:
293 ##print ' %4.0d '%(status.index) + ' --> ' + status.tag
294 if ((status.tag == 'Created' and status.outputFile == 0)):
295 status.showCompact()
296 if subList == '':
297 subList += '%d'%(status.index)
298 else:
299 subList += ',%d'%(status.index)
300 if ((status.tag == 'Aborted' or status.exitCode > 0 or status.exitStatus > 0) or
301 (status.tag == 'Retrieved' and status.outputFile == 0)):
302 status.showCompact()
303 if resubList == '':
304 resubList += '%d'%(status.index)
305 else:
306 resubList += ',%d'%(status.index)
307 # for failed job first remove remainders before resubmitting
308 if status.outputFile == 1:
309 removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
310
311 if subList != '':
312 if not re.search('-',subList) and not re.search(',',subList):
313 subList = subList + ',999999999'
314 cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
315 print '\nACTION -- SUBMIT.PY: ' + cmd
316 if exe == 1:
317 status = os.system(cmd)
318 if resubList != '':
319 cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
320 print '\nACTION -- RE-SUBMIT.PY: ' + cmd
321 if exe == 1:
322 status = os.system(cmd)
323
324
325 print '\n------------------------------------------------------------------------------'
326 print ' --> GETOUTPUT ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
327 + '\n -> ' + dataset \
328 + '\n -> ' + storageEle \
329 + '\n -> ' + storagePath
330 print '------------------------------------------------------------------------------\n'
331
332 cmd = 'crab -getoutput -continue ' + crabTask.tag
333 status = os.system(cmd)
334
335 cmd = 'cleanupLog.py --crabId ' + crabTask.tag
336 status = os.system(cmd)
337
338 i += 1