1 |
#---------------------------------------------------------------------------------------------------
|
2 |
# Python Module File to describe CRAB tasks and the corresponding job stati
|
3 |
#
|
4 |
# Author: C.Paus (Oct 10, 2008)
|
5 |
#---------------------------------------------------------------------------------------------------
|
6 |
import os,sys,getopt,re,string
|
7 |
|
8 |
def Domain():
    """Return the domain part of this machine's node name.

    The node name reported by os.uname() is split at the dots and the first
    label (the host itself) is dropped; the remaining labels are joined again.
    A node name without any dot yields the empty string.
    """
    labels = os.uname()[1].split('.')
    # split() always returns at least one element, so removing the host label is safe
    del labels[0]
    return '.'.join(labels)
|
12 |
|
13 |
#---------------------------------------------------------------------------------------------------
|
14 |
"""
|
15 |
Class: Sample(cmsDataset='undefined',mitDataset='undefined',
|
16 |
nEvents='undefined',status='undefined',localPath='undefined',dbs='undefined',fixSites='undefined')
|
17 |
Each sample can be described through this class
|
18 |
"""
|
19 |
#---------------------------------------------------------------------------------------------------
|
20 |
class Sample:
    """Description of a datasample to be produced using CRAB.

    Every field defaults to the string 'undefined'. The class only stores the values and
    prints them; it does not interpret them.
    """
    cmsDataset = 'undefined'
    mitDataset = 'undefined'
    nEvents = 'undefined'
    status = 'undefined'
    localPath = 'undefined'
    dbs = 'undefined'
    fixSites = 'undefined'

    #-----------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-----------------------------------------------------------------------------------------------
    def __init__(self,cmsDataset='undefined',mitDataset='undefined',
                 nEvents='undefined',status='undefined',localPath='undefined',dbs='undefined',
                 fixSites='undefined'):
        """Store the given sample properties on the instance (no validation is done)."""
        self.cmsDataset = cmsDataset
        self.mitDataset = mitDataset
        self.nEvents = nEvents
        self.status = status
        self.localPath = localPath
        self.dbs = dbs
        self.fixSites = fixSites

    #-----------------------------------------------------------------------------------------------
    # present the current samples
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print one labelled line per property of this sample."""
        # single-argument print(...) is valid (and identical) on Python 2 and Python 3
        print(' Dataset : ' + self.cmsDataset + ' (' + self.mitDataset + ')')
        # nEvents may be handed over as a number; convert it so the concatenation cannot fail
        print(' NEvents : ' + str(self.nEvents))
        print(' Status : ' + self.status)
        print(' LocalPath: ' + self.localPath)
        print(' Dbs : ' + self.dbs)
        print(' FixSites : ' + self.fixSites)

    def showFormat(self,f1,f2,f3,f4,f5,f6,f7):
        """Print the sample on a single line, each column left-justified to the given width.

        f1..f7 are the minimal widths of the columns cmsDataset, mitDataset, nEvents, status,
        localPath, dbs and fixSites. The dbs and fixSites columns are left empty while their
        value is still 'undefined'; otherwise the value is printed with one leading blank.
        """
        dbs = ''
        if self.dbs != 'undefined':
            dbs += ' ' + self.dbs
        fixSites = ''
        if self.fixSites != 'undefined':
            fixSites += ' ' + self.fixSites
        columns = [self.cmsDataset.ljust(f1),self.mitDataset.ljust(f2),
                   str(self.nEvents).ljust(f3),self.status.ljust(f4),
                   self.localPath.ljust(f5),dbs.ljust(f6),fixSites.ljust(f7)]
        # joining with one blank reproduces the output of the comma separated print statement
        print(' '.join(columns))
|
63 |
|
64 |
#---------------------------------------------------------------------------------------------------
|
65 |
"""
|
66 |
Class: SubTask(index,lfnFile)
|
67 |
Each SubTask in CRAB can be described through this class
|
68 |
"""
|
69 |
#---------------------------------------------------------------------------------------------------
|
70 |
class SubTask:
    """Description of a SubTask in CRAB.

    A subtask is identified by its index and by the file holding the LFNs it has to process.
    """
    # variable to be determined
    index = -1
    lfnFile = 'undefined'           # file containing the LFNs to be processed
    nSubTaskLfn = -1                # number of LFNs in this subtask

    #-----------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-----------------------------------------------------------------------------------------------
    def __init__(self,index,lfnFile):
        """Remember index and LFN file; the LFN counter starts at zero."""
        self.index = index
        self.lfnFile = lfnFile
        self.nSubTaskLfn = 0

    #-----------------------------------------------------------------------------------------------
    # present the current crab subtask
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print a one line summary (index and LFN file) of this subtask."""
        # single-argument print(...) is valid (and identical) on Python 2 and Python 3
        print(' SubTask (Idx: %d, LfnFile: %s) ===='%(self.index,self.lfnFile))

    #-----------------------------------------------------------------------------------------------
    # subtask tag
    #-----------------------------------------------------------------------------------------------
    def tag(self):
        """Return the index as a zero padded string of at least four digits."""
        return "%04d"%self.index
|
95 |
|
96 |
#---------------------------------------------------------------------------------------------------
|
97 |
"""
|
98 |
Class: Task(tag,cmsDataset='undefined',mitDataset='undefined',mitCfg='undefined',mitVersion='undefined',cmssw='undefined')
|
99 |
Each task in CRAB can be described through this class
|
100 |
"""
|
101 |
#---------------------------------------------------------------------------------------------------
|
102 |
class Task:
    """Description of a Task in CRAB.

    A task is either connected to an existing crab directory (the tag is the name of that
    directory and the parameters are read back from <tag>/share/crab.cfg) or it is created from
    scratch using the 'Productions.<cmssw>' database file and the local 'seTable'.
    """
    # this is sufficient to do anything
    tag = 'undefined'
    # from actual crab configuration directly
    storageEle = 'undefined'
    storagePath = 'undefined'
    cmsDataset = 'undefined'
    nEvents = -1
    nTotalEvts = -1
    # MIT specific stuff
    mitCfg = 'undefined'
    mitVersion = 'undefined'
    mitDataset = 'undefined'
    cmssw = 'undefined'
    localPath = 'undefined'
    dbs = 'undefined'
    fixSites = 'undefined'
    # status of task as a whole and each individual job
    status = 'undefined'            # 'undefined', ....
    #
    # 'undefined': initial status, not yet even checked
    # 'cataloged': all jobs of the tasks have completed and are cataloged successfully
    # 'completed': all jobs of the tasks have completed successfully
    # 'finished' : all jobs have run, but unsubmitted jobs, errors or aborts might have occurred
    # 'active'   : some jobs are either in Done, Running or to be Run
    #
    # The containers below are only class level defaults; every instance receives its own
    # fresh containers in the constructor so that tasks never share their bookkeeping.
    jobStati = []
    failingSites = {}
    lfns = {}
    blocks = {}
    nLfnDone = 0                    # number of lfns found completed (see loadCompletedLfns)
    nLfnMissing = 0                 # number of lfns still missing (see createMissingLfns)
    # subtasks
    nSubTaskLfnMax = 400            # maximum number of LFNs in a subtask
    subTasks = []                   # list of subtasks

    #-----------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-----------------------------------------------------------------------------------------------
    def __init__(self,tag,cmsDataset='undefined',
                 mitDataset='undefined',mitCfg='undefined',mitVersion='undefined',
                 cmssw='undefined'):
        """Connect to the crab directory 'tag', or create a new task description.

        If tag is 'new' or does not exist as a path the task is set up through new(). Otherwise
        dataset, storage element and storage path are read from <tag>/share/crab.cfg and the MIT
        configuration, version and dataset are derived from the last components of the storage
        path.
        """
        self.tag = tag
        self.status = 'undefined'
        # per instance containers: the class level ones would be shared between all tasks
        self.jobStati = []
        self.failingSites = {}
        self.lfns = {}
        self.blocks = {}
        self.subTasks = []

        if tag == 'new' or not os.path.exists(tag):
            self.new(cmsDataset,mitDataset,mitCfg,mitVersion,cmssw)
            return

        # the last matching line in the configuration wins, as before
        for value in self._crabCfgValues(tag,'dataset','2'):
            self.cmsDataset = value
        for value in self._crabCfgValues(tag,'storage_element','2'):
            self.storageEle = value
        for value in self._crabCfgValues(tag,'storage_path','2-3'):
            self.storagePath = value
        for value in self._crabCfgValues(tag,'user_remote_dir','2-3'):
            self.storagePath += value
        f = (self.storagePath).split('/')
        # a trailing crab_0_* directory is not part of the cfg/version/dataset triplet
        if re.search('crab_0_',f[-1]):
            self.mitDataset = f[-2]
            self.mitVersion = f[-3]
            self.mitCfg = f[-4]
        else:
            self.mitDataset = f[-1]
            self.mitVersion = f[-2]
            self.mitCfg = f[-3]

    #-----------------------------------------------------------------------------------------------
    # helper: run a shell command and return its output lines
    #-----------------------------------------------------------------------------------------------
    def _readCommand(self,cmd):
        """Run cmd through os.popen and return the output lines; the pipe is always closed."""
        pipe = os.popen(cmd)
        try:
            return pipe.readlines()
        finally:
            pipe.close()

    #-----------------------------------------------------------------------------------------------
    # helper: read values from the crab configuration of an existing task
    #-----------------------------------------------------------------------------------------------
    def _crabCfgValues(self,tag,key,fields):
        """Return the blank-free values of all lines in <tag>/share/crab.cfg starting with key.

        fields is the 'cut -f' field selection applied after splitting the line at '='.
        """
        cmd = 'cat ' + tag + '/share/crab.cfg | grep ^' + key + '| cut -d= -f' + fields \
              + '| tr -d \' \''
        return [ line.rstrip('\n') for line in self._readCommand(cmd) ]

    #-----------------------------------------------------------------------------------------------
    # constructor for new creation
    #-----------------------------------------------------------------------------------------------
    def new(self,cmsDataset,mitDataset,mitCfg,mitVersion,cmssw):
        """Set up a new task from the Productions database file and the local seTable.

        The entry whose first column equals cmsDataset or whose second column equals mitDataset
        provides dataset names, number of events, local path and optionally the dbs instance
        and the fixed sites. The storage element and path are taken from the seTable entry of
        the storage tag that belongs to the domain we are running in.

        Raises RuntimeError if the seTable does not exist or has no entry for the storage tag.
        """
        self.cmsDataset = cmsDataset
        self.mitDataset = mitDataset
        self.mitCfg = mitCfg
        self.mitVersion = mitVersion
        self.cmssw = cmssw
        self.status = 'undefined'

        # derive the missing parameters
        seFile = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/'+ mitVersion + '/seTable'
        if not os.path.exists(seFile):
            cmd = "Storage element file not found: %s" % seFile
            raise RuntimeError(cmd)
        # resolve the other mitCfg parameters from the configuration file (relative to the
        # current working directory)
        cmd = 'cat ' + './' + \
              mitCfg + '/' + mitVersion + '/' + 'Productions' + '.' + self.cmssw
        join = 0
        fullLine = ""
        bSlash = "\\"
        for line in self._readCommand(cmd):
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue
            # join lines
            if join == 1:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming
            if fullLine[-1] == bSlash:
                join = 1
                fullLine = fullLine[:-1]
                continue
            join = 0
            names = fullLine.split()                 # splitting every blank
            # lines with less than two columns cannot identify a dataset
            if len(names) < 2:
                continue
            if names[0] == self.cmsDataset or names[1] == self.mitDataset:
                self.cmsDataset = names[0]           # CMS name of the dataset
                self.mitDataset = names[1]           # the equivalent MIT name of the dataset
                self.nEvents = int(names[2])         # number of events to be used
                if len(names) > 4 and names[4] != "-":
                    self.localPath = names[4]
                print("\n Sample Info: " + fullLine + "\n")
                print("\n Sample info from database Productions.%s\n %s"%(cmssw,fullLine))
                if len(names) >= 6:
                    dbs = names[5]
                    # test whether the short name is a valid dbs instance at CERN; the
                    # redirection is written in POSIX form so it works with any /bin/sh
                    testDbs = 'wget http://cmsdbsprod.cern.ch/cms_dbs_' + dbs \
                              + '/servlet/DBSServlet > /dev/null 2>&1'
                    status = os.system(testDbs)
                    if status == 0:
                        self.dbs = 'http://cmsdbsprod.cern.ch/cms_dbs_' + dbs \
                                   + '/servlet/DBSServlet'
                    else:
                        self.dbs = dbs
                    print(' dbs: ' + self.dbs + '\n')
                    if len(names) >= 7:
                        self.fixSites = names[6]
                else:
                    self.dbs = \
                        "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
                    print('')

        # decide on the foreseen default storage place (where are we running?)
        storageTag = 'T2_US_MIT'
        domain = Domain()
        if re.search('mit.edu',domain):
            storageTag = 'T2_US_MIT'
        elif re.search('cern.ch',domain):
            storageTag = 'T0_CH_CERN'
        print(' Loading storage from local seTable: ' + storageTag)
        cmd = 'grep ^' + storageTag + ' ' + seFile
        userRemoteDir = None
        for line in self._readCommand(cmd):
            line = line.rstrip('\n').replace(' ','')
            f = line.split(':')
            self.storageEle = f[1]
            self.storagePath = f[2]
            userRemoteDir = f[3]
        # without an entry the storage would stay undefined: fail with a clear message
        if userRemoteDir is None:
            raise RuntimeError("No storage entry for %s found in: %s"%(storageTag,seFile))
        print(' Storage -- Ele: ' + self.storageEle \
              + ' Path: ' + self.storagePath + ' UserDir: ' + userRemoteDir)
        self.storagePath += userRemoteDir \
                            + '/' + self.mitCfg + '/' + self.mitVersion + '/' + self.mitDataset

    #-----------------------------------------------------------------------------------------------
    # present the current crab task
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print tag, configuration, dataset, storage and all subtasks of this task."""
        print(' ==== CRAB Task Information (%s, %s, %s) ===='%(self.tag,self.mitCfg,self.mitVersion))
        print(' Dataset: ' + self.cmsDataset + ' (' + self.mitDataset + ')')
        print(' Storage: %s @ %s'%(self.storagePath,self.storageEle))
        print(' List of sub tasks to be completed: ')
        for subTask in self.subTasks:
            subTask.show()
        print(' ')

    #-----------------------------------------------------------------------------------------------
    # create all subtasks of the tasks
    #-----------------------------------------------------------------------------------------------
    def createSubTasks(self,lfnFile):
        """Split lfnFile into files of at most nSubTaskLfnMax lines, one SubTask per file.

        The pieces are written to <lfnFile>_NNNN (NNNN being the four digit subtask index
        starting at 0001) and the subtasks are appended to self.subTasks. The task is shown
        at the end.
        """
        print(' creating subtasks')
        # loop through the missing lfn file and create a new subtask every nSubTaskLfnMax lines
        cmd = 'cat ' + lfnFile
        iLine = 0
        index = 0
        output = None
        subTask = None
        for line in self._readCommand(cmd):
            # open a new file (and subtask) as needed
            if iLine % self.nSubTaskLfnMax == 0:
                if output:
                    output.close()
                index += 1
                fileName = lfnFile + '_%04d' % index
                output = open(fileName,'w')
                subTask = SubTask(index,fileName)
                # add this subtask to the list
                self.subTasks.append(subTask)
            iLine += 1
            # one more lfn entry for this sub task
            output.write(line)
            subTask.nSubTaskLfn += 1

        # close up the last subtask
        if output:
            output.close()

        print(' ')
        self.show()

    #-----------------------------------------------------------------------------------------------
    # load all lfns relevant to this task
    #-----------------------------------------------------------------------------------------------
    def loadAllLfns(self, lfnFile):
        """Read the complete lfn list (columns: block, file, number of events).

        Fills self.lfns (file name without directory -> status), self.blocks (block -> number
        of entries) and self.nTotalEvts, all starting from scratch. Empty lines and lines
        starting with '#' are skipped.
        """
        # initialize from scratch
        self.lfns = {}
        self.blocks = {}
        self.nTotalEvts = 0
        # use the complete lfn file list
        cmd = 'cat ' + lfnFile
        for line in self._readCommand(cmd):
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line == '' or line[0] == '#':
                continue

            # decoding the input line
            f = line.split()                         # splitting every blank
            block = f[0]
            fileName = f[1]
            nEvents = int(f[2])
            self.nTotalEvts += nEvents

            fileName = fileName.split('/')[-1]

            # NOTE(review): a file listed twice ends up with status 1, the value that
            # loadCompletedLfns uses for completed files -- confirm this is intended
            if fileName in self.lfns:
                self.lfns[fileName] = 1
            else:
                self.lfns[fileName] = 0

            self.blocks[block] = self.blocks.get(block,0) + 1

        print(' TOTAL - Lfns: %6d [ Blocks: %4d Events: %9d ]'\
              %(len(self.lfns),len(self.blocks),self.nTotalEvts))

    #-----------------------------------------------------------------------------------------------
    # load all lfns so far completed relevant to this task
    #-----------------------------------------------------------------------------------------------
    def loadCompletedLfns(self):
        """Mark the files found in the storage area as completed (status 1) in self.lfns.

        The external 'list' command is used on the storage path (the part after the last '=',
        without a trailing crab directory). Files not known from loadAllLfns are reported and
        stored with status 2. The number of files found is kept in self.nLfnDone.
        """
        # initialize from scratch
        self.nLfnDone = 0
        # find all already existing files
        path = self.storagePath.split('=')[-1]
        if re.search('crab_0_',path) or re.search('CRAB',path):
            path = '/'.join(path.split('/')[:-1])
        cmd = 'list ' + path + ' | grep root 2> /dev/null'
        for line in self._readCommand(cmd):
            f = line.split()
            # the file name is expected in the second column
            if len(f) < 2:
                continue
            fileName = f[1]
            if fileName in self.lfns:
                self.lfns[fileName] = 1
            else:
                print(' ERROR -- found completed lfn not in list of all lfns?! ->' + fileName + '<-')
                self.lfns[fileName] = 2
            self.nLfnDone += 1

        print(' DONE - Lfns: %6d'%(self.nLfnDone))

    #-----------------------------------------------------------------------------------------------
    # create the list of lfns which are still missing
    #-----------------------------------------------------------------------------------------------
    def createMissingLfns(self, lfnFile, restLfnFile = "remaining.lfns"):
        """Write the entries of lfnFile whose lfn still has status 0 to restLfnFile.

        restLfnFile is recreated, filled using grep and finally sorted without duplicates. The
        number of missing lfns is kept in self.nLfnMissing.
        """
        # NOTE(review): the status stays 'cataloged' only if no lfn has a non-zero status; going
        # by the status description one would expect 'cataloged' when nothing is missing.
        # Behaviour is kept as found -- please confirm the intention.
        self.status = 'cataloged'
        self.nLfnMissing = 0
        cmd = 'rm -rf ' + restLfnFile + '; touch ' + restLfnFile
        os.system(cmd)
        for lfn,status in self.lfns.items():
            if status == 0:
                # add this lfn to the file
                self.nLfnMissing += 1
                cmd = 'grep ' + lfn + ' ' + lfnFile + ' >> ' + restLfnFile
                os.system(cmd)
            else:
                self.status = 'undefined'

        # it is important to sort them (by first column == block); sorting in place avoids a
        # scratch file with a fixed name in /tmp that concurrent users would overwrite
        cmd = 'sort -u -o ' + restLfnFile + ' ' + restLfnFile
        os.system(cmd)

        print(' MISSING - Lfns: %6d'%(self.nLfnMissing))

    #-----------------------------------------------------------------------------------------------
    # create an inventory of all the existing output files
    #-----------------------------------------------------------------------------------------------
    def makeInventory(self):
        """Flag for every job in self.jobStati whether its output file exists (outputFile 0/1).

        The job number is taken from the third last '_' separated piece of the names returned by
        the external 'list' command. Names which do not follow that pattern or which point to a
        job that does not exist are ignored.
        """
        # same convention as in loadCompletedLfns: the path is what follows the last '='
        castorPath = self.storagePath.split("=")[-1]
        cmd = "list " + castorPath + " | grep root | cut -d ' ' -f2"

        print(" cmd: " + cmd)
        for status in self.jobStati:
            status.outputFile = 0
        for line in self._readCommand(cmd):         # run directory list command
            line = line.rstrip('\n')
            f = line.split("_")
            # new crab (2_7_7 from 2_7_2): the job number is the third last piece
            if len(f) < 3:
                continue
            number = int(f[-3].split(".")[0])
            # update the job status, job numbers start at one
            if number < 1 or number > len(self.jobStati):
                print(' Index out of range requested: %d -- output file ignored'%(number))
                continue
            self.jobStati[number-1].outputFile = 1

    #-----------------------------------------------------------------------------------------------
    # get crab status information for each job in the task
    #-----------------------------------------------------------------------------------------------
    def getJobStati(self):
        """Ask crab for the status of all jobs and derive the status of the task.

        Fills self.jobStati with one JobStatus per job (fixed column format of CRAB_2_7_7),
        checks the existing output through makeInventory(), counts failures per site in
        self.failingSites and sets self.status to 'active', 'completed' or 'finished'.
        """
        # Interact with crab to determine the present status of the jobs
        pattern = 'Created'

        active = 0
        self.jobStati = []
        self.failingSites = {}
        cmd = 'crab -status -continue ' + self.tag
        for line in self._readCommand(cmd):
            line = line.rstrip('\n')
            # inside the job table only lines which are not just 'Created' jobs are echoed
            if active == 0:
                print(' CRAB: ' + line)
            elif not re.search(pattern,line) and not re.search('------',line):
                print(' CRAB: ' + line)

            # decide whether we are in job status line or not
            if line[1:5] == "----":
                if active == 0:
                    active = 1                       # activate parsing
                continue
            if active == 1 and line == '':
                active = 0                           # deactivate parsing
            # parse the content of the job report
            if active == 1:
                # fixed column read, CRAB_2_7_7:
                ##ID END STATUS ACTION ExeExitCode JobExitCode E_HOST
                ##25 N Running SubSuccess llrcream.in2p3.fr
                iJob = int(line[0:5].strip())
                sJob = line[10:27].strip()
                status = JobStatus(iJob,sJob)
                status.ce = line[65:].strip()

                tmp = line[41:52].strip()
                if tmp != '':
                    status.exitCode = int(tmp)
                tmp = line[53:64].strip()
                if tmp != '':
                    status.exitStatus = int(tmp)

                self.jobStati.append(status)

        # review job output so far
        if len(self.jobStati) > 0:
            self.makeInventory()
        else:
            print(' ERROR - This task has not jobs stati assigned to it. Something is wrong.')
            print(' crab task id: ' + self.tag)

        # Make sure certain cases get fixed to avoid deletion
        for status in self.jobStati:
            # fix those jobs where the output has already been found
            if status.exitStatus == 60303 and status.outputFile == 1:
                status.exitStatus = 0
            elif status.exitStatus == -1 and status.exitCode == -1 and status.outputFile == 1:
                status.exitStatus = 0
                status.exitCode = 0
            elif (status.exitStatus != 0 or status.exitCode != 0) and \
                 not (status.exitStatus == -1 and status.exitCode == -1):
                print(" ==> Failure: filing with status/code: %d %d CE: %s" \
                      %(status.exitStatus,status.exitCode,status.ce))
                self.failingSites[status.ce] = self.failingSites.get(status.ce,0) + 1

        # Loop through the job stati and determine the task status
        # - check whether task is completed
        active = 0
        for status in self.jobStati:
            if status.tag not in ('Aborted','Retrieved','Created','Cleared'):
                active = 1
                break
        if active == 0:
            self.status = 'completed'
            for status in self.jobStati:
                if status.tag == 'Aborted' or status.exitCode != 0 or status.exitStatus != 0:
                    self.status = 'finished'
                    break
        else:
            self.status = 'active'

    #-----------------------------------------------------------------------------------------------
    # print the line to complete the task
    #-----------------------------------------------------------------------------------------------
    def complete(self):
        """Print the submit command which completes this task (nothing is executed)."""
        # NOTE(review): the version is fixed to 008 here, independent of self.mitVersion
        print(' ./bin/submit.py --noTestJob --complete --version=008 --mitDataset=%s'% \
              (self.mitDataset))

    #-----------------------------------------------------------------------------------------------
    # print the line to remove the task and do it if requested
    #-----------------------------------------------------------------------------------------------
    def remove(self,clean=0):
        """Print the command which cleans the logs and moves the task to ./completed.

        The command is executed only if clean is 1.
        """
        cmd = ' cleanupLog.py --crabId=' + self.tag + \
              '; mkdir -p ./completed; mv ' + self.tag + ' ./completed/'
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)

    #-----------------------------------------------------------------------------------------------
    # print the lines to kill and remove the task and do it if requested
    #-----------------------------------------------------------------------------------------------
    def killAndRemove(self,clean=0):
        """Print the crab kill command (executed only if clean is 1), then call remove(clean)."""
        # kill the remaining jobs
        cmd = 'crab -kill all -continue ' +self.tag
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)
        # now remove the remainders
        self.remove(clean)
|
577 |
|
578 |
#---------------------------------------------------------------------------------------------------
|
579 |
"""
|
580 |
Class: JobStatus(index,tag)
|
581 |
The status of one job of a list of CRAB jobs (inside one task) is described by this class
|
582 |
"""
|
583 |
#---------------------------------------------------------------------------------------------------
|
584 |
class JobStatus:
    """Minimal but sufficient Job Status for crab operations.

    Holds the job index, the status tag, the computing element (ce), a flag for the existence
    of the output file and the two exit codes. Everything but index and tag starts from the
    class level defaults ('undefined' or -1).
    """
    index = -1
    tag = 'undefined'
    ce = 'undefined'
    outputFile = -1
    exitCode = -1
    exitStatus = -1

    #-----------------------------------------------------------------------------------------------
    # constructor
    #-----------------------------------------------------------------------------------------------
    def __init__(self, index, tag):
        """Store the job index and its status tag."""
        self.index = index
        self.tag = tag

    #-----------------------------------------------------------------------------------------------
    # present the current crab job status in compact form
    #-----------------------------------------------------------------------------------------------
    def showCompact(self):
        """Print all properties of the job status on one line."""
        # single-argument print(...) is valid (and identical) on Python 2 and Python 3
        print('Status: %6d %20s Output: %2d Exit: %6d,%6d at CE: %s'% \
              (self.index,self.tag,self.outputFile,self.exitCode, \
               self.exitStatus,self.ce))

    #-----------------------------------------------------------------------------------------------
    # present the current crab job status in long form
    #-----------------------------------------------------------------------------------------------
    def show(self):
        """Print a header followed by two lines with the properties of the job status."""
        print('==== Job Status Information ====')
        print(' Index: %6d Tag: %s CE: %s'%(self.index,self.tag,self.ce))
        print(' Output: %2d Exit: %6d,%6d'%(self.outputFile,self.exitCode,self.exitStatus))
|