ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.21
Committed: Fri Jan 20 10:51:47 2012 UTC (13 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_10_pre2, HEAD
Changes since 1.20: +1 -1 lines
Log Message:
fix minor bug

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3 spiga 1.15 import sys, getopt, string, os
4 gutsche 1.1
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """
    parseCrabFjr

    - parse the CRAB FrameworkJobReport (FJR) on the WN; storage statistics
      are collected as
      { 'protocol' : { 'action' : [attempted,succeeded,total-size,total-time,min-time,max-time] , ... } , ... }
    - report parameters to the DashBoard using DashBoardApi.py: per protocol
      I/O sizes/times, read/write throughput and the number of events read
    - report file-access ("popularity") information to the DashBoard
    - print the ExitStatus and/or the list of analysed LFNs on stdout for the
      WN wrapper script
    """

    def __init__(self, argv):
        """
        Parse the command line options in `argv` (e.g. sys.argv[1:]).

        Prints the usage message and exits with status 2 on unknown options;
        check() may also exit.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.inputInfos = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        # must be initialised: check() and run() read it even when
        # --popularity was not given
        self.popularity = False
        self.debug = 0
        try:
            opts, args = getopt.getopt(
                argv, "",
                ["input=", "dashboard=", "exitcode", "lfn", "debug", "popularity=", "help"])
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

    def check(self, opts):
        """
        Apply the (option, value) pairs returned by getopt.

        --popularity expects "MonitorID,MonitorJobID,inputInfosFile" and
        --dashboard expects "MonitorID,MonitorJobID"; with too few fields the
        related attributes are reset to ''.
        Prints the usage message and exits when --help is given, when no
        input file or no action is requested, or when --dashboard lacks the
        monitor IDs.
        """
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--popularity":
                self.popularity = True
                fields = arg.split(",")
                if len(fields) >= 3:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                    self.inputInfos = fields[2]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
                    self.inputInfos = ''
            elif opt == "--dashboard":
                self.info2dash = True
                fields = arg.split(",")
                if len(fields) >= 2:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        if self.input == '' or not (self.info2dash or self.lfnList or self.exitCode or self.popularity):
            print(self.usage())
            sys.exit()

        if self.info2dash and (self.MonitorID == '' or self.MonitorJobID == ''):
            print(self.usage())
            sys.exit()

    def run(self):
        """Load the job report and perform every action requested on the command line."""
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            # 50115 - cmsRun did not produce a valid/readable job report
            print('50115')
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)
        if self.popularity:
            self.popularityInfos(jobReport)

    def exitCodes(self, jobReport):
        """
        Print the job exit status on stdout.

        50115 when the FJR file has at most 6 lines (temporary fix for
        incomplete reports: cmsRun did not produce a valid/readable job
        report), otherwise the ExitStatus of the last error in the report, or
        0 when there are no errors. -999 is printed if that status is blank.
        """
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()

        if len_fjr <= 6:
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        if exit_status.strip() == '':
            exit_status = -999
        print(exit_status)

    def lfn_List(self, jobReport):
        """Print the LFN of every input file recorded in the job report, one per line."""
        lfns = [inputFile['LFN'] for inputFile in jobReport.inputFiles]
        for lfn in lfns:
            print(lfn)

    def storageStat(self, jobReport):
        """
        Parse the storage statistics of the job report and compute throughputs.

        Returns (storage_report, throughput_report):
          storage_report     { protocol : { action : [attempted, succeeded,
                             total-size (MB), total-time (ms), min-time (ms),
                             max-time (ms)] } }, all values as strings
          throughput_report  'rSize'/'wSize' (MB) and 'rTime'/'wTime' summed
                             over read/write actions (seek adds to 'rTime'),
                             times converted to s when non-zero, plus
                             'readThr', 'writeThr' (MB/s) and 'avgNetThr'
                             (read MB / performance summary 'ExeTime'); each
                             of the last three is 'NULL' if not computable
        """
        storage_report = self._parseStorageStatistics(str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report.keys():
                print('protocol: %s' % protocol)
                for action in storage_report[protocol].keys():
                    print('action: %s measurement: %s' % (action, storage_report[protocol][action]))

        throughput_report = self._computeThroughput(storage_report, jobReport)

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def _parseStorageStatistics(self, storageStatistics):
        """Parse 'Storage statistics: proto/action = a/s/sizeMB/timems/minms/maxms; ...' into a nested dict."""
        storage_report = {}
        marker = 'Storage statistics:'
        if storageStatistics.find(marker) == -1:
            return storage_report

        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            if not data_field.strip():
                continue
            # format: protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time
            try:
                parts = data_field.split('=')
                key = parts[0].strip()
                item = parts[1].strip()
                protocol = key.split('/')[0].strip()
                action = key.split('/')[1].strip()
                item_array = item.split('/')
                measurement = [
                    item_array[0].strip(),
                    item_array[1].strip(),
                    item_array[2].strip().split('MB')[0],
                    item_array[3].strip().split('ms')[0],
                    item_array[4].strip().split('ms')[0],
                    item_array[5].strip().split('ms')[0],
                ]
            except IndexError:
                # malformed field: skip it instead of losing the whole report
                continue

            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first' % (protocol, action))
            else:
                actions[action] = measurement
        return storage_report

    def _computeThroughput(self, storage_report, jobReport):
        """Aggregate read/write/seek sizes and times of `storage_report` into throughput figures."""
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for actions in storage_report.values():
            for action, measurement in actions.items():
                # only read, write and seek actions are of interest
                if 'read' not in action and 'write' not in action and 'seek' not in action:
                    continue
                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except (TypeError, ValueError):
                    continue

                if 'read' in action:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif 'write' in action:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    # seek: only its time counts towards reading
                    throughput_report['rTime'] += timeValue

        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # ms -> s
            throughput_report['readThr'] = throughput_report['rSize'] / throughput_report['rTime']

        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
        except Exception:
            # best effort: ExeTime may be missing, not a number or zero
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # ms -> s
            throughput_report['writeThr'] = throughput_report['wSize'] / throughput_report['wTime']
        return throughput_report

    def popularityInfos(self, jobReport):
        """
        Send file-access ("popularity") information to the DashBoard.

        The expected input/parent files come from the input-infos file given
        with --popularity, the files actually read from the job report. File
        names are reported relative to a common base ('Basename' /
        'BasenameParent'):
          inputFiles / parentFiles         'name::flag::type::access::n;' per
                                           file: flag 1 for files read (n
                                           numbers them), flag 0 for unread
                                           files named in the first job error,
                                           flag 2 for other unread files
          lumisRange / lumisParentRange    'run::lumiRanges::n;' per read file
                                           (first run of the file only)
        The report is also printed on stdout.
        """
        report_dict = {}
        inputBlocks, inputList, inputParentList = self._readInputInfos()
        report_dict['inputBlocks'] = inputBlocks

        basename = self._commonBase(inputList)
        basenameParent = self._commonBase(inputParentList)

        # files actually opened by cmsRun, keyed by LFN minus the common base
        readFile = {}
        readFileParent = {}
        for inputFile in jobReport.inputFiles:
            lfn = inputFile['LFN']
            fileAttr = (inputFile.get("FileType"), self._accessType(inputFile.get("PFN")), inputFile.get("Runs"))
            if lfn.find(basename) >= 0:
                readFile[self._stripBase(lfn, basename)] = fileAttr
            else:
                readFileParent[self._stripBase(lfn, basenameParent)] = fileAttr

        cleanedinputList = [self._stripBase(name, basename) for name in inputList]
        cleanedParentList = [self._stripBase(name, basenameParent) for name in inputParentList]

        # entries for read files; they are removed from the "expected" lists
        inputString, LumisString = self._readEntries(readFile, cleanedinputList)
        inputParentString, LumisParentString = self._readEntries(readFileParent, cleanedParentList)

        errorDescription = None
        if len(jobReport.errors):
            errorDescription = jobReport.errors[0]["Description"] or ''
        inputString += self._unreadEntries(cleanedinputList, errorDescription)
        inputParentString += self._unreadEntries(cleanedParentList, errorDescription)

        report_dict['inputFiles'] = inputString
        report_dict['parentFiles'] = inputParentString
        report_dict['lumisRange'] = LumisString
        report_dict['lumisParentRange'] = LumisParentString
        report_dict['Basename'] = basename
        report_dict['BasenameParent'] = basenameParent

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
        apmonFree()

        # NOTE: printed unconditionally, not only in debug mode
        print("Popularity start")
        for k, v in report_dict.items():
            print("%s : %s" % (k, v))
        print("Popularity stop")

    def _readInputInfos(self):
        """
        Read the input-infos file given with --popularity.

        Returns (inputBlocks, inputList, inputParentList) from its
        'inputBlocks=', 'inputFiles=' and 'parentFiles=' lines (file lists are
        ';' separated). A missing file or line yields '' / [].
        """
        inputBlocks = ''
        inputList = []
        inputParentList = []
        if os.path.exists(self.inputInfos):
            infos = open(self.inputInfos, 'r')
            try:
                lines = infos.readlines()
            finally:
                infos.close()
            for line in lines:
                if "=" not in line:
                    continue
                value = line.split("=", 1)[1].strip()
                if line.find("inputBlocks") >= 0:
                    inputBlocks = value
                if line.find("inputFiles") >= 0:
                    inputList = value.split(";")
                if line.find("parentFiles") >= 0:
                    inputParentList = value.split(";")
        # an empty value splits to [''] -- that means "no files"
        if inputList == ['']:
            inputList = []
        if inputParentList == ['']:
            inputParentList = []
        return inputBlocks, inputList, inputParentList

    def _commonBase(self, lfns):
        """Common prefix of several LFNs, directory (with trailing '/') of a single one, '' for none."""
        if len(lfns) > 1:
            # character-wise prefix, may end in the middle of a file name
            return os.path.commonprefix(lfns)
        if len(lfns) == 1:
            return "%s/" % os.path.dirname(lfns[0])
        return ''

    def _stripBase(self, lfn, base):
        """Part of `lfn` after `base`; `lfn` unchanged when `base` is empty or not contained in it."""
        if base and lfn.find(base) >= 0:
            return lfn.split(base, 1)[1]
        return lfn

    def _accessType(self, text):
        """'Remote' when `text` (a PFN or an error message) mentions xrootd, 'Local' otherwise."""
        if text and text.find('xrootd') >= 0:
            return 'Remote'
        return 'Local'

    def _lumiEntry(self, runs, counter):
        """'run::lumiRanges::counter;' for the first run of `runs` ({run: [lumis]}), '' when there is none."""
        # NOTE(review): only the first run of a file is reported
        if not runs:
            return ''
        run, lumis = list(runs.items())[0]
        return '%s::%s::%d;' % (run, self.makeRanges(lumis), counter)

    def _readEntries(self, readFiles, unreadNames):
        """
        Build (filesString, lumisString) for the files cmsRun actually read.

        Files are numbered from 1 in both strings so entries can be matched.
        Every read file is removed from the `unreadNames` list.
        """
        files = []
        lumis = []
        counter = 1
        for name, (fileType, access, runs) in readFiles.items():
            if name in unreadNames:
                unreadNames.remove(name)
            files.append('%s::%d::%s::%s::%d;' % (name, 1, fileType, access, counter))
            lumis.append(self._lumiEntry(runs, counter))
            counter += 1
        return ''.join(files), ''.join(lumis)

    def _unreadEntries(self, names, errorDescription):
        """
        Build the entries for expected files that were not read.

        Flag 0 (access guessed from the error text) when `errorDescription`
        (the first job error, None if there is none) mentions the file,
        flag 2 with unknown access otherwise.
        """
        entries = []
        for name in names:
            if errorDescription is not None and errorDescription.find(name) >= 0:
                entries.append('%s::%d::%s::%s::%s;' % (name, 0, 'Unknown', self._accessType(errorDescription), 'Unknown'))
            else:
                entries.append('%s::%d::%s::%s::%s;' % (name, 2, 'Unknown', 'Unknown', 'Unknown'))
        return ''.join(entries)

    def n_of_events(self, jobReport):
        """
        Sum the events read over all input files (Brian's patch: send the
        number of processed events to the Dashboard).

        Files whose 'EventsRead' is not an integer are ignored. The total is
        returned under the keys 'NoEventsPerRun', 'NbEvPerRun' and
        'NEventsProcessed'.
        """
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, ValueError):
                continue
            eventsPerRun += eventsRead

        event_report = {}
        event_report['NoEventsPerRun'] = eventsPerRun
        event_report['NbEvPerRun'] = eventsPerRun
        event_report['NEventsProcessed'] = eventsPerRun

        if self.debug == 1:
            print(event_report)
        return event_report

    def reportDash(self, jobReport):
        """
        Send event count, per protocol/action I/O figures and throughputs to
        the DashBoard.

        Each storage action is sent as 'io_<protocol>_<action>' =
        '<size MB>_<time s>', with 'NULL' for values that are not numbers.
        """
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)

        dashboard_report = {}
        dashboard_report.update(event_report)
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID
        for protocol, actions in storage_report.items():
            for action, measurement in actions.items():
                try:
                    size = float(measurement[2])
                except (TypeError, ValueError):
                    size = 'NULL'
                try:
                    seconds = float(measurement[3]) / 1000  # ms -> s
                except (TypeError, ValueError):
                    seconds = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = str(size) + '_' + str(seconds)

        if self.debug:
            for key in sorted(dashboard_report.keys()):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

    def makeRanges(self, lumilist):
        """
        Compress a list of lumi sections into a range string,
        e.g. [1, 2, 3, 5, 7, 8] -> "1-3,5,7-8".

        Only neighbours that increase by one in the given order are merged,
        so the list is expected to be sorted. `lumilist` is not modified; an
        empty list gives ''.
        """
        if not lumilist:
            return ''
        ranges = []
        start = prev = lumilist[0]
        for lumi in lumilist[1:]:
            if lumi == prev + 1:
                prev = lumi
                continue
            ranges.append((start, prev))
            start = prev = lumi
        ranges.append((start, prev))

        pieces = []
        for first, last in ranges:
            if first == last:
                pieces.append("%s" % first)
            else:
                pieces.append("%s-%s" % (first, last))
        return ",".join(pieces)

    def usage(self):
        """Return the command line help text."""
        msg = """
required parameters:
    --input : input FJR xml file

optional parameters:
    --dashboard : send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID : DashBoard MonitorID
        MonitorJobID : DashBoard MonitorJobID
    --popularity : send file access (popularity) info to the dashboard. require following args: "MonitorID,MonitorJobID,InputInfosFile"
        InputInfosFile : file with inputBlocks=, inputFiles= and parentFiles= lines
    --exitcode : print executable exit code
    --lfn : report list of files really analyzed
    --help : help
    --debug : debug statements
    """
        return msg
435 gutsche 1.1
436    
if __name__ == '__main__':
    # Best effort: unexpected errors must not make the script fail, but
    # explicit sys.exit() calls (SystemExit) are let through so their exit
    # codes are honoured, and swallowed errors are reported on stderr
    # rather than silently discarded.
    import traceback
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except Exception:
        traceback.print_exc()