1 |
|
#!/usr/bin/env python |
2 |
|
|
3 |
< |
import sys, getopt, string |
3 |
> |
import sys, getopt, string, os |
4 |
|
|
5 |
|
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
6 |
|
from DashboardAPI import apmonSend, apmonFree |
9 |
|
def __init__(self, argv): |
10 |
|
""" |
11 |
|
parseCrabFjr |
12 |
< |
|
12 |
> |
|
13 |
|
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
|
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
|
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
18 |
|
self.input = '' |
19 |
|
self.MonitorID = '' |
20 |
|
self.MonitorJobID = '' |
21 |
< |
self.info2dash = False |
22 |
< |
self.exitCode = False |
23 |
< |
self.lfnList = False |
21 |
> |
self.info2dash = False |
22 |
> |
self.exitCode = False |
23 |
> |
self.lfnList = False |
24 |
|
self.debug = 0 |
25 |
|
try: |
26 |
< |
opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "help"]) |
26 |
> |
opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "popularity=", "help"]) |
27 |
|
except getopt.GetoptError: |
28 |
|
print self.usage() |
29 |
|
sys.exit(2) |
30 |
< |
self.check(opts) |
30 |
> |
self.check(opts) |
31 |
|
|
32 |
|
return |
33 |
|
|
34 |
< |
def check(self,opts): |
34 |
> |
def check(self,opts): |
35 |
|
# check command line parameter |
36 |
|
for opt, arg in opts : |
37 |
|
if opt == "--help" : |
40 |
|
elif opt == "--input" : |
41 |
|
self.input = arg |
42 |
|
elif opt == "--exitcode": |
43 |
< |
self.exitCode = True |
43 |
> |
self.exitCode = True |
44 |
|
elif opt == "--lfn": |
45 |
< |
self.lfnList = True |
45 |
> |
self.lfnList = True |
46 |
> |
elif opt == "--popularity": |
47 |
> |
self.popularity = True |
48 |
> |
try: |
49 |
> |
self.MonitorID = arg.split(",")[0] |
50 |
> |
self.MonitorJobID = arg.split(",")[1] |
51 |
> |
self.inputInfos = arg.split(",")[2] |
52 |
> |
except: |
53 |
> |
self.MonitorID = '' |
54 |
> |
self.MonitorJobID = '' |
55 |
> |
self.inputInfos = '' |
56 |
|
elif opt == "--dashboard": |
57 |
< |
self.info2dash = True |
58 |
< |
try: |
57 |
> |
self.info2dash = True |
58 |
> |
try: |
59 |
|
self.MonitorID = arg.split(",")[0] |
60 |
|
self.MonitorJobID = arg.split(",")[1] |
61 |
|
except: |
63 |
|
self.MonitorJobID = '' |
64 |
|
elif opt == "--debug" : |
65 |
|
self.debug = 1 |
66 |
< |
|
67 |
< |
if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode) : |
66 |
> |
|
67 |
> |
if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode and not self.popularity) : |
68 |
|
print self.usage() |
69 |
|
sys.exit() |
70 |
< |
|
71 |
< |
if self.info2dash: |
70 |
> |
|
71 |
> |
if self.info2dash: |
72 |
|
if self.MonitorID == '' or self.MonitorJobID == '': |
73 |
|
print self.usage() |
74 |
|
sys.exit() |
75 |
|
return |
76 |
|
|
77 |
< |
def run(self): |
77 |
> |
def run(self): |
78 |
|
|
79 |
|
# load FwkJobRep |
80 |
< |
jobReport = readJobReport(self.input)[0] |
81 |
< |
if self.exitCode : |
80 |
> |
try: |
81 |
> |
jobReport = readJobReport(self.input)[0] |
82 |
> |
except: |
83 |
> |
print '50115' |
84 |
> |
sys.exit() |
85 |
> |
|
86 |
> |
if self.exitCode : |
87 |
|
self.exitCodes(jobReport) |
88 |
< |
if self.lfnList : |
88 |
> |
if self.lfnList : |
89 |
|
self.lfn_List(jobReport) |
90 |
< |
if self.info2dash : |
90 |
> |
if self.info2dash : |
91 |
|
self.reportDash(jobReport) |
92 |
+ |
if self.popularity: |
93 |
+ |
self.popularityInfos(jobReport) |
94 |
|
return |
95 |
|
|
96 |
|
def exitCodes(self, jobReport): |
102 |
|
if (len_fjr <= 6): |
103 |
|
### 50115 - cmsRun did not produce a valid/readable job report at runtime |
104 |
|
exit_status = str(50115) |
105 |
< |
else: |
105 |
> |
else: |
106 |
|
# get ExitStatus of last error |
107 |
|
if len(jobReport.errors) != 0 : |
108 |
|
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
109 |
|
else : |
110 |
|
exit_status = str(0) |
111 |
< |
#check exit code |
111 |
> |
#check exit code |
112 |
|
if string.strip(exit_status) == '': exit_status = -999 |
113 |
< |
print exit_status |
114 |
< |
|
113 |
> |
print exit_status |
114 |
> |
|
115 |
|
return |
116 |
|
|
117 |
|
def lfn_List(self,jobReport): |
118 |
< |
''' |
119 |
< |
get list of analyzed files |
118 |
> |
''' |
119 |
> |
get list of analyzed files |
120 |
|
''' |
121 |
|
lfnlist = [x['LFN'] for x in jobReport.inputFiles] |
122 |
|
for file in lfnlist: print file |
123 |
< |
return |
123 |
> |
return |
124 |
|
|
125 |
|
def storageStat(self,jobReport): |
126 |
< |
''' |
127 |
< |
get i/o statistics |
126 |
> |
''' |
127 |
> |
get i/o statistics |
128 |
|
''' |
129 |
|
storageStatistics = str(jobReport.storageStatistics) |
130 |
|
storage_report = {} |
157 |
|
storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time] |
158 |
|
else : |
159 |
|
storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] } |
160 |
< |
|
160 |
> |
|
161 |
|
if self.debug : |
162 |
|
for protocol in storage_report.keys() : |
163 |
|
print 'protocol:',protocol |
164 |
|
for action in storage_report[protocol].keys() : |
165 |
|
print 'action:',action,'measurement:',storage_report[protocol][action] |
166 |
|
|
167 |
< |
##### |
167 |
> |
##### |
168 |
|
# throughput measurements # Fabio |
169 |
|
throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 } |
170 |
|
for protocol in storage_report.keys() : |
174 |
|
continue |
175 |
|
|
176 |
|
# convert values |
177 |
< |
try: |
177 |
> |
try: |
178 |
|
sizeValue = float(storage_report[protocol][action][2]) |
179 |
|
timeValue = float(storage_report[protocol][action][3]) |
180 |
|
except Exception,e: |
181 |
|
continue |
182 |
< |
|
182 |
> |
|
183 |
|
# aggregate data |
184 |
< |
if 'read' in action: |
184 |
> |
if 'read' in action: |
185 |
|
throughput_report['rSize'] += sizeValue |
186 |
|
throughput_report['rTime'] += timeValue |
187 |
|
elif 'write' in action: |
200 |
|
|
201 |
|
throughput_report['avgNetThr'] = 'NULL' |
202 |
|
try: |
203 |
< |
throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime']) |
203 |
> |
throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime']) |
204 |
|
except: |
205 |
|
pass |
206 |
|
|
207 |
|
throughput_report['writeThr'] = 'NULL' |
208 |
|
if throughput_report['wTime'] > 0.0: |
209 |
< |
throughput_report['wTime'] /= 1000.0 |
209 |
> |
throughput_report['wTime'] /= 1000.0 |
210 |
|
throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime']) |
211 |
< |
##### |
212 |
< |
|
213 |
< |
if self.debug == 1 : |
211 |
> |
##### |
212 |
> |
|
213 |
> |
if self.debug == 1 : |
214 |
|
print storage_report |
215 |
|
print throughput_report |
216 |
|
return storage_report, throughput_report |
217 |
|
|
218 |
+ |
def popularityInfos(self, jobReport): |
219 |
+ |
report_dict = {} |
220 |
+ |
inputList = [] |
221 |
+ |
inputParentList = [] |
222 |
+ |
report_dict['inputBlocks'] = '' |
223 |
+ |
if (os.path.exists(self.inputInfos)): |
224 |
+ |
file=open(self.inputInfos,'r') |
225 |
+ |
lines = file.readlines() |
226 |
+ |
for line in lines: |
227 |
+ |
if line.find("inputBlocks")>=0: |
228 |
+ |
report_dict['inputBlocks']= line.split("=")[1].strip() |
229 |
+ |
if line.find("inputFiles")>=0: |
230 |
+ |
inputList = line.split("=")[1].strip().split(";") |
231 |
+ |
if line.find("parentFiles")>=0: |
232 |
+ |
inputParentList = line.split("=")[1].strip().split(";") |
233 |
+ |
file.close() |
234 |
+ |
if len(inputList) == 1 and inputList[0] == '': |
235 |
+ |
inputList=[] |
236 |
+ |
if len(inputParentList) == 1 and inputParentList[0] == '': |
237 |
+ |
inputParentList=[] |
238 |
+ |
basename = '' |
239 |
+ |
if len(inputList) > 1: |
240 |
+ |
basename = os.path.commonprefix(inputList) |
241 |
+ |
elif len(inputList) == 1: |
242 |
+ |
basename = "%s/"%os.path.dirname(inputList[0]) |
243 |
+ |
basenameParent = '' |
244 |
+ |
if len(inputParentList) > 1: |
245 |
+ |
basenameParent = os.path.commonprefix(inputParentList) |
246 |
+ |
elif len(inputParentList) == 1: |
247 |
+ |
basenameParent = "%s/"%os.path.dirname(inputParentList[0]) |
248 |
+ |
|
249 |
+ |
readFile = {} |
250 |
+ |
|
251 |
+ |
readFileParent = {} |
252 |
+ |
fileAttr = [] |
253 |
+ |
fileParentAttr = [] |
254 |
+ |
for inputFile in jobReport.inputFiles: |
255 |
+ |
fileAccess = 'Local' |
256 |
+ |
if inputFile.get("PFN").find('xrootd'): fileAccess = 'Remote' |
257 |
+ |
if inputFile['LFN'].find(basename) >=0: |
258 |
+ |
fileAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs")) |
259 |
+ |
readFile[inputFile.get("LFN").split(basename)[1]] = fileAttr |
260 |
+ |
else: |
261 |
+ |
fileParentAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs")) |
262 |
+ |
readParentFile[inputFile.get("LFN").split(basenameParent)[1]] = fileParentAttr |
263 |
+ |
cleanedinputList = [] |
264 |
+ |
for file in inputList: |
265 |
+ |
cleanedinputList.append(file.split(basename)[1]) |
266 |
+ |
cleanedParentList = [] |
267 |
+ |
for file in inputParentList: |
268 |
+ |
cleanedParentList.append(file.split(basenameParent)[1]) |
269 |
+ |
|
270 |
+ |
inputString = '' |
271 |
+ |
LumisString = '' |
272 |
+ |
countFile = 1 |
273 |
+ |
for f,t in readFile.items(): |
274 |
+ |
cleanedinputList.remove(f) |
275 |
+ |
inputString += '%s::%d::%s::%s::%d;'%(f,1,t[0],t[1],countFile) |
276 |
+ |
LumisString += '%s::%s::%d;'%(t[2].keys()[0],self.makeRanges(t[2].values()[0]),countFile) |
277 |
+ |
countFile += 1 |
278 |
+ |
|
279 |
+ |
inputParentString = '' |
280 |
+ |
LumisParentString = '' |
281 |
+ |
countParentFile = 1 |
282 |
+ |
for fp,tp in readFileParent.items(): |
283 |
+ |
cleanedParentList.remove(fp) |
284 |
+ |
inputParentString += '%s::%d::%s::%s::%d;'%(fp,1,tp[0],tp[1],countParentFile) |
285 |
+ |
LumisParentString += '%s::%s::%d;'%(tp[2].keys()[0],self.makeRanges(tp[2].values()[0]),countParentFile) |
286 |
+ |
countParentFile += 1 |
287 |
+ |
|
288 |
+ |
if len(cleanedinputList): |
289 |
+ |
for file in cleanedinputList : |
290 |
+ |
if len(jobReport.errors): |
291 |
+ |
if jobReport.errors[0]["Description"].find(file) >= 0: |
292 |
+ |
fileAccess = 'Local' |
293 |
+ |
if jobReport.errors[0]["Description"].find('xrootd') >= 0: fileAccess = 'Remote' |
294 |
+ |
inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown',fileAccess,'Unknown') |
295 |
+ |
else: |
296 |
+ |
inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown') |
297 |
+ |
else: |
298 |
+ |
inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown') |
299 |
+ |
|
300 |
+ |
if len(cleanedParentList): |
301 |
+ |
for file in cleanedParentList : |
302 |
+ |
if len(jobReport.errors): |
303 |
+ |
if jobReport.errors[0]["Description"].find(file) >= 0: |
304 |
+ |
inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown','Local','Unknown') |
305 |
+ |
else: |
306 |
+ |
inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown') |
307 |
+ |
else: |
308 |
+ |
inputParentString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown') |
309 |
+ |
|
310 |
+ |
report_dict['inputFiles']= inputString |
311 |
+ |
report_dict['parentFiles']= inputParentString |
312 |
+ |
report_dict['lumisRange']= LumisString |
313 |
+ |
report_dict['lumisParentRange']= LumisParentString |
314 |
+ |
report_dict['Basename']= basename |
315 |
+ |
report_dict['BasenameParent']= basenameParent |
316 |
+ |
|
317 |
+ |
# send to DashBoard |
318 |
+ |
apmonSend(self.MonitorID, self.MonitorJobID, report_dict) |
319 |
+ |
apmonFree() |
320 |
+ |
|
321 |
+ |
# if self.debug == 1 : |
322 |
+ |
print "Popularity start" |
323 |
+ |
for k,v in report_dict.items(): |
324 |
+ |
print "%s : %s"%(k,v) |
325 |
+ |
print "Popularity stop" |
326 |
+ |
return |
327 |
+ |
|
328 |
|
def n_of_events(self,jobReport): |
329 |
|
''' |
330 |
< |
#Brian's patch to sent number of events procedded to the Dashboard |
330 |
> |
#Brian's patch to sent number of events procedded to the Dashboard |
331 |
|
# Add NoEventsPerRun to the Dashboard report |
332 |
< |
''' |
332 |
> |
''' |
333 |
|
event_report = {} |
334 |
|
eventsPerRun = 0 |
335 |
|
for inputFile in jobReport.inputFiles: |
346 |
|
if self.debug == 1 : print event_report |
347 |
|
|
348 |
|
return event_report |
349 |
< |
|
349 |
> |
|
350 |
|
def reportDash(self,jobReport): |
351 |
|
''' |
352 |
< |
dashboard report dictionary |
352 |
> |
dashboard report dictionary |
353 |
|
''' |
354 |
|
event_report = self.n_of_events(jobReport) |
355 |
|
storage_report, throughput_report = self.storageStat(jobReport) |
380 |
|
dashboard_report['io_read_throughput'] = throughput_report['readThr'] |
381 |
|
dashboard_report['io_write_throughput'] = throughput_report['writeThr'] |
382 |
|
dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr'] |
383 |
< |
|
383 |
> |
|
384 |
|
# send to DashBoard |
385 |
|
apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report) |
386 |
|
apmonFree() |
389 |
|
|
390 |
|
return |
391 |
|
|
392 |
+ |
def makeRanges(self,lumilist): |
393 |
+ |
""" convert list to range """ |
394 |
+ |
|
395 |
+ |
counter = lumilist[0] |
396 |
+ |
lumilist.remove(counter) |
397 |
+ |
tempRange=[] |
398 |
+ |
tempRange.append(counter) |
399 |
+ |
string = '' |
400 |
+ |
for i in lumilist: |
401 |
+ |
if i == counter+1: |
402 |
+ |
tempRange.append(i) |
403 |
+ |
counter +=1 |
404 |
+ |
else: |
405 |
+ |
if len(tempRange)==1: |
406 |
+ |
string += "%s,"%tempRange[0] |
407 |
+ |
else: |
408 |
+ |
string += "%s-%s,"%(tempRange[:1][0],tempRange[-1:][0]) |
409 |
+ |
counter = i |
410 |
+ |
tempRange=[] |
411 |
+ |
tempRange.append(counter) |
412 |
+ |
if i == lumilist[-1:][0] : |
413 |
+ |
if len(tempRange)==1: |
414 |
+ |
string += "%s"%tempRange[0] |
415 |
+ |
else: |
416 |
+ |
string += "%s-%s"%(tempRange[:1][0],tempRange[-1:][0]) |
417 |
+ |
return string |
418 |
+ |
|
419 |
|
def usage(self): |
420 |
< |
|
421 |
< |
msg=""" |
420 |
> |
|
421 |
> |
msg=""" |
422 |
|
required parameters: |
423 |
|
--input : input FJR xml file |
424 |
< |
|
424 |
> |
|
425 |
|
optional parameters: |
426 |
|
--dashboard : send info to the dashboard. require following args: "MonitorID,MonitorJobID" |
427 |
|
MonitorID : DashBoard MonitorID |
428 |
|
MonitorJobID : DashBoard MonitorJobID |
429 |
< |
--exitcode : print executable exit code |
429 |
> |
--exitcode : print executable exit code |
430 |
|
--lfn : report list of files really analyzed |
431 |
|
--help : help |
432 |
|
--debug : debug statements |
433 |
|
""" |
434 |
< |
return msg |
434 |
> |
return msg |
435 |
|
|
436 |
|
|
437 |
|
if __name__ == '__main__' : |
438 |
< |
try: |
439 |
< |
parseFjr_ = parseFjr(sys.argv[1:]) |
440 |
< |
parseFjr_.run() |
438 |
> |
try: |
439 |
> |
parseFjr_ = parseFjr(sys.argv[1:]) |
440 |
> |
parseFjr_.run() |
441 |
|
except: |
442 |
|
pass |