1 |
paus |
1.2 |
#---------------------------------------------------------------------------------------------------
|
2 |
|
|
# Python Module File to describe CRAB tasks and the corresponding job stati
|
3 |
|
|
#
|
4 |
|
|
# Author: C.Paus (Oct 10, 2008)
|
5 |
|
|
#---------------------------------------------------------------------------------------------------
|
6 |
|
|
import os,sys,getopt,re,string
|
7 |
|
|
|
8 |
paus |
1.7 |
def Domain():
    """Return the DNS domain of the local host.

    The domain is the host name without its first label: 'node1.mit.edu' gives
    'mit.edu'. A host name without any '.' yields the empty string.
    """
    hostName = os.uname()[1]
    hostAndDomain = hostName.split('.', 1)
    if len(hostAndDomain) < 2:
        return ''
    return hostAndDomain[1]
|
12 |
|
|
|
13 |
|
|
#---------------------------------------------------------------------------------------------------
|
14 |
|
|
"""
|
15 |
|
|
Class: Sample(cmsDataset='undefined',mitDataset='undefined',
|
16 |
|
|
nEvents='undefined',status='undefined',localPath='undefined',dbs='undefined',fixSites='undefined')
|
17 |
|
|
Each sample can be described through this class
|
18 |
|
|
"""
|
19 |
|
|
#---------------------------------------------------------------------------------------------------
|
20 |
|
|
class Sample:
    """Description of a datasample to be produced using CRAB.

    Every attribute is set from the constructor arguments; the string 'undefined'
    marks a value that was not given.
    """
    # class-level defaults, overwritten for every instance in __init__
    cmsDataset = 'undefined'
    mitDataset = 'undefined'
    nEvents = 'undefined'
    status = 'undefined'
    localPath = 'undefined'
    dbs = 'undefined'
    fixSites = 'undefined'

    #-------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-------------------------------------------------------------------------------------------
    def __init__(self, cmsDataset='undefined', mitDataset='undefined',
                 nEvents='undefined', status='undefined', localPath='undefined', dbs='undefined',
                 fixSites='undefined'):
        self.cmsDataset = cmsDataset
        self.mitDataset = mitDataset
        self.nEvents = nEvents
        self.status = status
        self.localPath = localPath
        self.dbs = dbs
        self.fixSites = fixSites

    #-------------------------------------------------------------------------------------------
    # helper: text representation of an attribute value
    #-------------------------------------------------------------------------------------------
    def _text(value):
        """Return value as text; '%s' also accepts non-strings (e.g. an integer nEvents)."""
        return '%s' % (value,)
    _text = staticmethod(_text)

    #-------------------------------------------------------------------------------------------
    # present the current samples
    #-------------------------------------------------------------------------------------------
    def show(self):
        """Print the sample attributes to stdout, one per line."""
        text = self._text
        print(' Dataset : ' + text(self.cmsDataset) + ' (' + text(self.mitDataset) + ')')
        print(' NEvents : ' + text(self.nEvents))
        print(' Status : ' + text(self.status))
        print(' LocalPath: ' + text(self.localPath))
        print(' Dbs : ' + text(self.dbs))
        print(' FixSites : ' + text(self.fixSites))

    def showFormat(self, f1, f2, f3, f4, f5, f6, f7):
        """Print the sample as one row of a table.

        f1..f7 are the column widths for cmsDataset, mitDataset, nEvents, status,
        localPath, dbs and fixSites. Each value is left-justified to its width and
        columns are separated by a single blank. dbs and fixSites get a leading
        blank, or are shown empty while they are 'undefined'.
        """
        text = self._text
        dbs = ''
        if self.dbs != 'undefined':
            dbs = ' ' + text(self.dbs)
        fixSites = ''
        if self.fixSites != 'undefined':
            fixSites = ' ' + text(self.fixSites)
        columns = [text(self.cmsDataset).ljust(f1),
                   text(self.mitDataset).ljust(f2),
                   text(self.nEvents).ljust(f3),
                   text(self.status).ljust(f4),
                   text(self.localPath).ljust(f5),
                   dbs.ljust(f6),
                   fixSites.ljust(f7)]
        print(' '.join(columns))
|
63 |
paus |
1.7 |
|
64 |
paus |
1.2 |
#---------------------------------------------------------------------------------------------------
|
65 |
|
|
"""
|
66 |
|
|
Class: SubTask(index,lfnFile)
|
67 |
|
|
Each SubTask in CRAB can be described through this class
|
68 |
|
|
"""
|
69 |
|
|
#---------------------------------------------------------------------------------------------------
|
70 |
|
|
class SubTask:
    """Description of a SubTask in CRAB.

    A subtask owns one file (lfnFile) holding a slice of the LFNs of its task and
    is identified by a running index.
    """
    # class-level defaults (-1 / 'undefined' = not determined yet)
    index = -1
    lfnFile = 'undefined'     # file containing the LFNs to be processed
    nSubTaskLfn = -1          # number of LFNs in this subtask

    #-------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-------------------------------------------------------------------------------------------
    def __init__(self, index, lfnFile):
        self.index, self.lfnFile = index, lfnFile
        # the LFN counter starts empty; Task.createSubTasks increments it per LFN written
        self.nSubTaskLfn = 0

    #-------------------------------------------------------------------------------------------
    # present the current crab subtask
    #-------------------------------------------------------------------------------------------
    def show(self):
        """Print a one-line summary of the subtask."""
        summary = ' SubTask (Idx: %d, LfnFile: %s) ====' % (self.index, self.lfnFile)
        print(summary)

    #-------------------------------------------------------------------------------------------
    # subtask tag
    #-------------------------------------------------------------------------------------------
    def tag(self):
        """Return the index as a zero-padded, at least four character string."""
        return '%04d' % (self.index,)
|
95 |
|
|
|
96 |
|
|
#---------------------------------------------------------------------------------------------------
|
97 |
|
|
"""
|
98 |
|
|
Class: Task(tag,cmsDataset='undefined',mitDataset='undefined',mitCfg='undefined',mitVersion='undefined',cmssw='undefined')
|
99 |
|
|
Each task in CRAB can be described through this class
|
100 |
|
|
"""
|
101 |
|
|
#---------------------------------------------------------------------------------------------------
|
102 |
|
|
class Task:
    """Description of a Task in CRAB.

    A task is identified by its tag. If the tag is the path of an existing CRAB
    working directory, the configuration is read back from <tag>/share/crab.cfg.
    If the tag is 'new' or a path that does not exist yet, the configuration is
    derived from the MIT production tables (see new()).

    Task status (self.status):
      'undefined': initial status, not yet even checked
      'cataloged': all jobs of the tasks have completed and are cataloged successfully
                   (see the note in createMissingLfns)
      'completed': all jobs of the tasks have completed successfully
      'finished' : all jobs have run, but unsubmitted jobs, errors or aborts might have occured
      'active'   : some jobs are either in Done, Running or to be Run
    """
    # this is sufficient to do anything
    tag = 'undefined'
    # from actual crab configuration directly
    storageEle = 'undefined'
    storagePath = 'undefined'
    cmsDataset = 'undefined'
    nEvents = -1
    nTotalEvts = -1
    # MIT specific stuff
    mitCfg = 'undefined'
    mitVersion = 'undefined'
    mitDataset = 'undefined'
    cmssw = 'undefined'
    localPath = 'undefined'
    dbs = 'undefined'
    fixSites = 'undefined'
    # status of task as a whole (see the class docstring)
    status = 'undefined'
    # Class-level defaults only: __init__ gives every instance its own containers,
    # otherwise all tasks would share (and keep appending to) the same objects.
    jobStati = []          # one JobStatus per job of the task
    failingSites = {}      # CE name -> number of failed jobs
    lfns = {}              # lfn file name -> 0 missing, 1 completed, 2 completed but not listed
    blocks = {}            # block name -> number of lfn entries in the block
    # subtasks
    nSubTaskLfnMax = 400   # maximum number of LFNs in a subtask
    subTasks = []          # list of subtasks

    #-------------------------------------------------------------------------------------------
    # constructor to connect with existing setup
    #-------------------------------------------------------------------------------------------
    def __init__(self, tag, cmsDataset='undefined',
                 mitDataset='undefined', mitCfg='undefined', mitVersion='undefined',
                 cmssw='undefined'):
        self.tag = tag
        self.status = 'undefined'
        # per-instance mutable state (see the note at the class attributes)
        self.jobStati = []
        self.failingSites = {}
        self.lfns = {}
        self.blocks = {}
        self.subTasks = []

        if tag == 'new' or not os.path.exists(tag):
            self.new(cmsDataset, mitDataset, mitCfg, mitVersion, cmssw)
            return

        # existing crab working directory: read the relevant crab.cfg entries
        # (for each key the last matching line wins, user_remote_dir lines are appended)
        for value in self._crabCfgValues('dataset'):
            self.cmsDataset = value
        for value in self._crabCfgValues('storage_element'):
            self.storageEle = value
        for value in self._crabCfgValues('storage_path', '2-3'):
            self.storagePath = value
        for value in self._crabCfgValues('user_remote_dir', '2-3'):
            self.storagePath += value

        # the path ends in .../<mitCfg>/<mitVersion>/<mitDataset>[/crab_0_*]
        f = self.storagePath.split('/')
        if re.search('crab_0_', f[-1]):
            f = f[:-1]
        self.mitDataset = f[-1]
        self.mitVersion = f[-2]
        self.mitCfg = f[-3]

    #-------------------------------------------------------------------------------------------
    # constructor for new creation
    #-------------------------------------------------------------------------------------------
    def new(self, cmsDataset, mitDataset, mitCfg, mitVersion, cmssw):
        """Initialize the task from the MIT production tables.

        Reads $MIT_PROD_DIR/<mitCfg>/<mitVersion>/Productions.<cmssw> for the dataset
        entry and .../seTable for the storage location.

        Raises:
          KeyError     if MIT_PROD_DIR is not set in the environment
          RuntimeError if the seTable file does not exist
        """
        self.cmsDataset = cmsDataset
        self.mitDataset = mitDataset
        self.mitCfg = mitCfg
        self.mitVersion = mitVersion
        self.cmssw = cmssw
        self.status = 'undefined'

        # derive the missing parameters
        cfgDir = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + mitVersion
        seFile = cfgDir + '/seTable'
        if not os.path.exists(seFile):
            raise RuntimeError("Storage element file not found: %s" % seFile)

        # resolve the other mitCfg parameters from the configuration file
        self._readProductions(cfgDir + '/Productions.' + self.cmssw, cmssw)

        # decide on the foreseen default storage place (where are we running?)
        storageTag = 'T2_US_MIT'
        domain = Domain()
        if re.search('mit.edu', domain):
            storageTag = 'T2_US_MIT'
        elif re.search('cern.ch', domain):
            storageTag = 'T0_CH_CERN'
        print(' Loading storage from local seTable: ' + storageTag)

        # seTable lines: <tag>:<storage element>:<storage path>:<user remote dir>
        cmd = 'grep ^' + storageTag + ' ' + seFile
        for line in os.popen(cmd).readlines():
            f = line.rstrip('\n').replace(' ', '').split(':')
            self.storageEle = f[1]
            self.storagePath = f[2]
            userRemoteDir = f[3]
            print(' Storage -- Ele: ' + self.storageEle
                  + ' Path: ' + self.storagePath + ' UserDir: ' + userRemoteDir)
            self.storagePath += userRemoteDir \
                + '/' + self.mitCfg + '/' + self.mitVersion + '/' + self.mitDataset

    #-------------------------------------------------------------------------------------------
    # present the current crab task
    #-------------------------------------------------------------------------------------------
    def show(self):
        """Print the task summary followed by its subtasks."""
        print(' ==== CRAB Task Information (%s, %s, %s) ===='
              % (self.tag, self.mitCfg, self.mitVersion))
        print(' Dataset: ' + self.cmsDataset + ' (' + self.mitDataset + ')')
        print(' Storage: %s @ %s' % (self.storagePath, self.storageEle))
        print(' List of sub tasks to be completed: ')
        for subTask in self.subTasks:
            subTask.show()
        print(' ')

    #-------------------------------------------------------------------------------------------
    # create all subtasks of the tasks
    #-------------------------------------------------------------------------------------------
    def createSubTasks(self, lfnFile):
        """Split lfnFile into chunks of at most nSubTaskLfnMax lines, one SubTask each.

        Chunk k (counting from 1) is written to lfnFile + '_%04d' % k and a SubTask
        for it is appended to self.subTasks; the task summary is printed at the end.
        """
        print(' creating subtasks')
        output = None
        subTask = None
        index = 0
        try:
            for iLine, line in enumerate(self._readLines(lfnFile)):
                # start a new chunk every nSubTaskLfnMax lines (also correct for a maximum of 1)
                if iLine % self.nSubTaskLfnMax == 0:
                    if output is not None:
                        output.close()
                    index += 1
                    subFile = lfnFile + '_%04d' % index
                    output = open(subFile, 'w')
                    subTask = SubTask(index, subFile)
                    self.subTasks.append(subTask)
                # one more lfn entry for this sub task
                output.write(line)
                subTask.nSubTaskLfn += 1
        finally:
            # close up the last subtask (or the one being written when an error occurred)
            if output is not None:
                output.close()

        print(' ')
        self.show()

    #-------------------------------------------------------------------------------------------
    # load all lfns relevant to this task
    #-------------------------------------------------------------------------------------------
    def loadAllLfns(self, lfnFile):
        """Load the complete lfn list '<block> <lfn> <nEvents>' of this task.

        Resets and fills self.lfns (file name -> 0, i.e. not yet produced),
        self.blocks (block -> number of entries) and self.nTotalEvts. Empty, blank
        and '#' lines are skipped.
        """
        # initialize from scratch
        self.lfns = {}
        self.blocks = {}
        self.nTotalEvts = 0
        for line in self._readLines(lfnFile):
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line.strip() == '' or line[0] == '#':
                continue

            # decoding the input line
            f = line.split()
            block = f[0]
            lfn = f[1].split('/')[-1]
            nEvents = int(f[2])
            self.nTotalEvts += nEvents

            # a repeated entry does not mean the file was produced: it stays missing
            if lfn not in self.lfns:
                self.lfns[lfn] = 0
            self.blocks[block] = self.blocks.get(block, 0) + 1

        print(' TOTAL - Lfns: %6d [ Blocks: %4d Events: %9d ]'
              % (len(self.lfns), len(self.blocks), self.nTotalEvts))

    #-------------------------------------------------------------------------------------------
    # load all lfns so far completed relevant to this task
    #-------------------------------------------------------------------------------------------
    def loadCompletedLfns(self):
        """Mark lfns whose output already exists in storage and count them in nLfnDone."""
        # initialize from scratch
        self.nLfnDone = 0
        # find all already existing files
        path = self.storagePath.split('=')[-1]
        # list the dataset directory, not a crab working directory below it
        if re.search('crab_0_', path) or re.search('CRAB', path):
            path = '/'.join(path.split('/')[:-1])
        self._markCompleted(path)

        # account for files already done in old storage location
        if re.match('/mnt/hadoop', path):
            self._markCompleted(path.replace('/mnt/hadoop', '/pnfs/cmsaf.mit.edu/t2bat'))

        print(' DONE - Lfns: %6d' % (self.nLfnDone))

    def _markCompleted(self, path):
        """Mark every ROOT file listed in path as completed (1, or 2 if not in the list)."""
        cmd = 'list ' + path + ' 2> /dev/null | grep root'
        for line in os.popen(cmd).readlines():
            lfn = line.split()[1]
            if lfn in self.lfns:
                self.lfns[lfn] = 1
            else:
                print(' ERROR -- found completed lfn not in list of all lfns?! ->' + lfn + '<-')
                self.lfns[lfn] = 2
            self.nLfnDone += 1

    #-------------------------------------------------------------------------------------------
    # create the list of lfns still to be produced
    #-------------------------------------------------------------------------------------------
    def createMissingLfns(self, lfnFile, restLfnFile = "remaining.lfns"):
        """Write the entries of lfnFile whose lfn is still missing (status 0) to restLfnFile.

        An entry is selected when one of its blank separated fields has a missing
        file name as last path component. The selection is de-duplicated and sorted,
        so entries of the same block (first column) are adjacent. restLfnFile is
        always rewritten (empty if nothing is missing). Also sets self.nLfnMissing
        and self.status.
        """
        # fill the remaining lfns from complete database
        self.status = 'cataloged'
        self.nLfnMissing = 0
        missing = set()
        for lfn, status in self.lfns.items():
            if status == 0:
                self.nLfnMissing += 1
                missing.add(lfn)
            else:
                # NOTE(review): so 'cataloged' survives only if no lfn has been completed
                # yet; kept from the original if/else -- confirm this is the intent.
                self.status = 'undefined'

        # one pass over the complete list (instead of one regex grep per lfn)
        selected = set()
        for line in self._readLines(lfnFile):
            line = line.rstrip('\n')
            for field in line.split():
                if field.split('/')[-1] in missing:
                    selected.add(line)
                    break

        # it is important to sort them (by first column == block)
        output = open(restLfnFile, 'w')
        try:
            for line in sorted(selected):
                output.write(line + '\n')
        finally:
            output.close()

        print(' MISSING - Lfns: %6d' % (self.nLfnMissing))

    #-------------------------------------------------------------------------------------------
    # create an inventory of all the existing output files
    #-------------------------------------------------------------------------------------------
    def makeInventory(self):
        """Set outputFile to 1 for every job whose ROOT output is found in storage.

        Output names are expected in the CRAB 2_7_7 form <name>_<job>_<retry>_<id>.root.
        Files whose job number cannot be parsed or does not belong to a job of this
        task are reported and ignored.
        """
        castorPath = self.storagePath.split("=")[-1]
        cmd = "list " + castorPath + " | grep root | cut -d ' ' -f2"
        print(" cmd: " + cmd)
        for status in self.jobStati:
            status.outputFile = 0
        for line in os.popen(cmd).readlines():
            line = line.rstrip('\n')
            try:
                # new crab (2_7_7): the job number is the third '_' field from the end
                number = int(line.split("_")[-3].split(".")[0])
            except (IndexError, ValueError):
                print(' Cannot extract a job number from: ' + line)
                continue
            if number < 1 or number > len(self.jobStati):
                print(' Index out of range requested: %d -- output file ignored' % (number))
                continue
            self.jobStati[number - 1].outputFile = 1

    #-------------------------------------------------------------------------------------------
    # get crab status information for each job in the task
    #-------------------------------------------------------------------------------------------
    def getJobStati(self):
        """Run 'crab -status' for this task and derive job and task status.

        Fills self.jobStati from the job table of the crab output, marks jobs with
        existing output (makeInventory), counts failures per CE in
        self.failingSites and sets self.status to 'active', 'completed' or 'finished'.
        """
        # inside the job table, lines matching this pattern are not echoed
        pattern = 'Created'
        active = 0      # 1 while reading the job table
        self.jobStati = []
        self.failingSites = {}
        cmd = 'crab -status -continue ' + self.tag
        for line in os.popen(cmd).readlines():
            line = line.rstrip('\n')
            if active == 0 or \
               (not re.search(pattern, line) and not re.search('------', line)):
                print(' CRAB: ' + line)

            # a separator line opens the job table and is never parsed itself
            if line[1:5] == "----":
                active = 1
                continue
            # an empty line closes the job table
            if active == 1 and line == '':
                active = 0
            if active == 1:
                self.jobStati.append(self._parseJobLine(line))

        # review job output so far
        if len(self.jobStati) > 0:
            self.makeInventory()
        else:
            print(' ERROR - This task has not jobs stati assigned to it. Something is wrong.')
            print(' crab task id: ' + self.tag)

        self._reviewFailures()
        self.status = self._deriveStatus()

    def _parseJobLine(line):
        """Build a JobStatus from one fixed-column row of the CRAB_2_7_7 job table.

        Columns: ID [0:5], STATUS [10:27], ExeExitCode [41:52], JobExitCode [53:64],
        E_HOST [65:]; empty exit code columns keep the JobStatus default (-1).
        """
        status = JobStatus(int(line[0:5].strip()), line[10:27].strip())
        status.ce = line[65:].strip()
        exeExitCode = line[41:52].strip()
        if exeExitCode != '':
            status.exitCode = int(exeExitCode)
        jobExitCode = line[53:64].strip()
        if jobExitCode != '':
            status.exitStatus = int(jobExitCode)
        return status
    _parseJobLine = staticmethod(_parseJobLine)

    def _reviewFailures(self):
        """Accept jobs whose output exists despite their codes; count real failures per CE."""
        # make sure certain cases get fixed to avoid deletion
        for status in self.jobStati:
            if status.exitStatus == 60303 and status.outputFile == 1:
                # output is there, so the job is accepted (60303 presumably a stage-out
                # 'file already exists' code -- TODO confirm)
                status.exitStatus = 0
            elif status.exitStatus == -1 and status.exitCode == -1 and status.outputFile == 1:
                # no codes reported, but output is there
                status.exitStatus = 0
                status.exitCode = 0
            elif (status.exitStatus != 0 or status.exitCode != 0) and \
                 not (status.exitStatus == -1 and status.exitCode == -1):
                print(" ==> Failure: filing with status/code: %d %d CE: %s"
                      % (status.exitStatus, status.exitCode, status.ce))
                self.failingSites[status.ce] = self.failingSites.get(status.ce, 0) + 1

    def _deriveStatus(self):
        """Return 'active', 'completed' or 'finished' from the current job stati."""
        for status in self.jobStati:
            if status.tag not in ('Aborted', 'Retrieved', 'Created', 'Cleared'):
                return 'active'
        for status in self.jobStati:
            if status.tag == 'Aborted' or status.exitCode != 0 or status.exitStatus != 0:
                return 'finished'
        return 'completed'

    #-------------------------------------------------------------------------------------------
    # print the line to complete the task
    #-------------------------------------------------------------------------------------------
    def complete(self):
        """Print the submit.py command that completes this task (uses the task's mitVersion)."""
        print(' ./bin/submit.py --noTestJob --complete --version=%s --mitDataset=%s'
              % (self.mitVersion, self.mitDataset))

    #-------------------------------------------------------------------------------------------
    # print the line to remove the task and do it if requested
    #-------------------------------------------------------------------------------------------
    def remove(self, clean=0):
        """Print the cleanup command (log cleanup, move tag to ./completed); run it if clean == 1."""
        cmd = ' cleanupLog.py --crabId=' + self.tag + \
              '; mkdir -p ./completed; mv ' + self.tag + ' ./completed/'
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)

    #-------------------------------------------------------------------------------------------
    # print the line to kill and remove the task and do it if requested
    #-------------------------------------------------------------------------------------------
    def killAndRemove(self, clean=0):
        """Print (and if clean == 1 run) 'crab -kill all', then the remove() commands."""
        # kill the remaining jobs
        cmd = 'crab -kill all -continue ' + self.tag
        print(' -> ' + cmd)
        if clean == 1:
            os.system(cmd)
        # now remove the remainders
        self.remove(clean)

    #-------------------------------------------------------------------------------------------
    # helpers
    #-------------------------------------------------------------------------------------------
    def _crabCfgValues(self, key, fields='2'):
        """Return the values of crab.cfg lines starting with key, blanks removed.

        fields is the 'cut -d=' field spec; '2-3' keeps one '=' inside the value.
        """
        cmd = 'cat ' + self.tag + '/share/crab.cfg | grep ^' + key + \
              '| cut -d= -f' + fields + "| tr -d ' '"
        values = []
        for line in os.popen(cmd).readlines():
            values.append(line.rstrip('\n'))
        return values

    def _readProductions(self, productionsFile, cmssw):
        """Scan the productions database and apply every entry matching this dataset.

        Empty, blank and column-0 '#' lines are skipped; a trailing backslash joins an
        entry with the next line. An entry matches if its CMS name equals
        self.cmsDataset or its MIT name equals self.mitDataset.
        """
        continued = False
        fullLine = ''
        for line in self._readLines(productionsFile):
            line = line.rstrip('\n')
            # get rid of empty or commented lines
            if line.strip() == '' or line[0] == '#':
                continue
            # join lines
            if continued:
                fullLine += line
            else:
                fullLine = line
            # determine if finished or more is coming
            # NOTE(review): only complete (joined) entries are parsed; the indentation of
            # the original was lost, this is the reading that cannot parse partial lines.
            if fullLine[-1] == '\\':
                continued = True
                fullLine = fullLine[:-1]
                continue
            continued = False
            names = fullLine.split()   # splitting every blank
            if names[0] == self.cmsDataset or names[1] == self.mitDataset:
                self._applyProduction(names, fullLine, cmssw)

    def _applyProduction(self, names, fullLine, cmssw):
        """Take over one productions entry.

        Columns: cms dataset, mit dataset, nEvents, (unused), localPath or '-',
        optional dbs instance, optional fixSites.
        """
        self.cmsDataset = names[0]         # CMS name of the dataset
        self.mitDataset = names[1]         # the equivalent MIT name of the dataset
        self.nEvents = int(names[2])       # number of events to be used in the production
        if names[4] != "-":
            self.localPath = names[4]
        print("\n Sample Info: " + fullLine + "\n")
        print("\n Sample info from database Productions.%s\n %s" % (cmssw, fullLine))
        if len(names) >= 6:
            dbs = names[5]
            dbsUrl = 'http://cmsdbsprod.cern.ch/cms_dbs_' + dbs + '/servlet/DBSServlet'
            # probe the instance; -O /dev/null avoids leaving a downloaded copy in the cwd
            status = os.system('wget -q -O /dev/null ' + dbsUrl + ' > /dev/null 2>&1')
            if status == 0:
                self.dbs = dbsUrl
            else:
                self.dbs = dbs
            print(' dbs: ' + self.dbs + '\n')
            if len(names) >= 7:
                self.fixSites = names[6]
        else:
            self.dbs = \
                "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        print('')

    def _readLines(fileName):
        """Return the lines (newlines kept) of fileName, or [] with a warning if unreadable."""
        try:
            handle = open(fileName)
        except IOError:
            print(' WARNING - cannot read file: ' + fileName)
            return []
        try:
            return handle.readlines()
        finally:
            handle.close()
    _readLines = staticmethod(_readLines)
|
590 |
paus |
1.2 |
|
591 |
|
|
#---------------------------------------------------------------------------------------------------
|
592 |
|
|
"""
|
593 |
|
|
Class: JobStatus(index,tag)
|
594 |
|
|
The status of one job of a list of CRAB jobs (inside one task) is described by this class
|
595 |
|
|
"""
|
596 |
|
|
#---------------------------------------------------------------------------------------------------
|
597 |
|
|
class JobStatus:
    """Minimal but sufficient Job Status for crab operations.

    Holds the job index, the crab status tag, the CE it ran on, whether its output
    file was found, and its two exit codes; -1 / 'undefined' mean not known yet.
    """
    # class-level defaults, used until a value is assigned to the instance
    index = -1
    tag = 'undefined'
    ce = 'undefined'
    outputFile = -1
    exitCode = -1
    exitStatus = -1

    #-------------------------------------------------------------------------------------------
    # constructor
    #-------------------------------------------------------------------------------------------
    def __init__(self, index, tag):
        self.index, self.tag = index, tag

    #-------------------------------------------------------------------------------------------
    # present the current job status in compact form
    #-------------------------------------------------------------------------------------------
    def showCompact(self):
        """Print the job status on one line."""
        values = (self.index, self.tag, self.outputFile,
                  self.exitCode, self.exitStatus, self.ce)
        print('Status: %6d %20s Output: %2d Exit: %6d,%6d at CE: %s' % values)

    #-------------------------------------------------------------------------------------------
    # present the current job status in long form
    #-------------------------------------------------------------------------------------------
    def show(self):
        """Print the job status as a header plus two lines of details."""
        print('==== Job Status Information ====')
        print(' Index: %6d Tag: %s CE: %s' % (self.index, self.tag, self.ce))
        codes = (self.outputFile, self.exitCode, self.exitStatus)
        print(' Output: %2d Exit: %6d,%6d' % codes)
|