ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.15
Committed: Wed Mar 23 10:30:23 2011 UTC (14 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.14: +104 -3 lines
Log Message:
more info to dashboard

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3 spiga 1.15 import sys, getopt, string, os
4 gutsche 1.1
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """
    parseCrabFjr

    - parse the CRAB FrameworkJobReport (FJR) on the WN; storage statistics become
      { 'protocol' : { 'action' : [attempted,succeeded,total-size,total-time,min-time,max-time] , ... } , ... }
    - report to the DashBoard through DashboardAPI (apmonSend): event counts,
      per protocol/action I/O size and time, global read/write/average throughput
    - optionally report input-file "popularity" information to the DashBoard
    - print the job ExitStatus and/or the analysed LFNs for the WN wrapper script

    Options are parsed by the constructor; run() performs the requested actions.
    """

    def __init__(self, argv):
        '''
        parse the command line options in argv (e.g. sys.argv[1:]).
        Prints the usage text and exits with status 2 on unknown options;
        check() may also print the usage text and exit.
        '''
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        # set by --popularity; initialised here because check() and run()
        # read them even when the option is not given
        self.popularity = False
        self.inputInfos = ''
        self.debug = 0
        try:
            opts = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn", "debug", "popularity=", "help"])[0]
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)
        return

    def check(self, opts):
        '''
        store the (option, value) pairs returned by getopt on the instance.
        Prints the usage text and calls sys.exit() for --help, when --input or
        every action (--dashboard, --popularity, --exitcode, --lfn) is missing,
        or when --dashboard lacks MonitorID/MonitorJobID.
        '''
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--popularity":
                # expected "MonitorID,MonitorJobID,inputInfosFile"; with fewer
                # fields all three are reset to ''
                self.popularity = True
                fields = arg.split(",")
                if len(fields) >= 3:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                    self.inputInfos = fields[2]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
                    self.inputInfos = ''
            elif opt == "--dashboard":
                # expected "MonitorID,MonitorJobID"
                self.info2dash = True
                fields = arg.split(",")
                if len(fields) >= 2:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        noAction = not (self.info2dash or self.lfnList or self.exitCode or self.popularity)
        if self.input == '' or noAction:
            print(self.usage())
            sys.exit()

        if self.info2dash:
            if self.MonitorID == '' or self.MonitorJobID == '':
                print(self.usage())
                sys.exit()
        return

    def run(self):
        '''
        load the FJR named by --input and perform every requested action.
        If the report cannot be loaded, print '50115' and exit.
        '''
        # load FwkJobRep
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            print('50115')
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)
        if self.popularity:
            self.popularityInfos(jobReport)
        return

    def exitCodes(self, jobReport):
        '''
        print the job exit status for the WN wrapper: 50115 when the FJR file
        has at most 6 lines (temporary fix for incomplete FJRs), otherwise the
        ExitStatus of the last error in the report, or 0 when there are no
        errors; -999 if that status is blank.
        '''
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = -999
        print(exit_status)
        return

    def lfn_List(self, jobReport):
        '''
        print the LFN of every input file listed in the job report, one per line
        '''
        for lfn in [x['LFN'] for x in jobReport.inputFiles]:
            print(lfn)
        return

    def storageStat(self, jobReport):
        '''
        get i/o statistics

        The text after 'Storage statistics:' in jobReport.storageStatistics is a
        ';'-separated list of entries of the form
            protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time
        where the size carries an 'MB' suffix and the times an 'ms' suffix.

        returns (storage_report, throughput_report):
          storage_report    : { protocol : { action : [attempted, succeeded, total_size,
                                total_time, min_time, max_time] } }, all strings
          throughput_report : summed read/write sizes and times (times converted
                              ms -> s when non-zero) plus 'readThr', 'writeThr'
                              and 'avgNetThr' (float, or 'NULL' if not computable)
        '''
        storage_report = self._parseStorageStatistics(str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report.keys():
                print('protocol: %s' % protocol)
                for action in storage_report[protocol].keys():
                    print('action: %s measurement: %s' % (action, storage_report[protocol][action]))

        throughput_report = self._throughput(storage_report, jobReport)

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def _parseStorageStatistics(self, storageStatistics):
        '''
        parse the storage statistics text into the storage_report dictionary
        described in storageStat(). Blank and malformed entries are skipped
        instead of aborting the whole report; for a repeated protocol/action
        the first entry wins.
        '''
        storage_report = {}
        marker = 'Storage statistics:'
        # check if storageStatistics is valid
        if storageStatistics.find(marker) == -1:
            return storage_report

        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            if not data_field.strip():
                continue
            halves = data_field.split('=')
            if len(halves) < 2:
                continue
            key = halves[0].strip().split('/')
            item_array = halves[1].strip().split('/')
            if len(key) < 2 or len(item_array) < 6:
                continue
            protocol = key[0].strip()
            action = key[1].strip()
            measurement = [item_array[0].strip(),                  # attempted
                           item_array[1].strip(),                  # succeeded
                           item_array[2].strip().split('MB')[0],   # total size
                           item_array[3].strip().split('ms')[0],   # total time
                           item_array[4].strip().split('ms')[0],   # min time
                           item_array[5].strip().split('ms')[0]]   # max time
            # add to report
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first' % (protocol, action))
            else:
                actions[action] = measurement
        return storage_report

    def _throughput(self, storage_report, jobReport):
        '''
        aggregate the read/write/seek entries of storage_report into the
        throughput_report dictionary described in storageStat()
        '''
        # throughput measurements # Fabio
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for actions in storage_report.values():
            for action, measurement in actions.items():
                if 'read' in action:
                    sizeKey, timeKey = 'rSize', 'rTime'
                elif 'write' in action:
                    sizeKey, timeKey = 'wSize', 'wTime'
                elif 'seek' in action:
                    # seek time counts towards the read time; its size is not added
                    sizeKey, timeKey = None, 'rTime'
                else:
                    # not interesting
                    continue
                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except ValueError:
                    continue
                if sizeKey is not None:
                    throughput_report[sizeKey] += sizeValue
                throughput_report[timeKey] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = throughput_report['rSize'] / throughput_report['rTime']

        # read size divided by the 'ExeTime' performance summary (best effort)
        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
        except Exception:
            # missing summary, unparsable or zero ExeTime: keep 'NULL'
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = throughput_report['wSize'] / throughput_report['wTime']
        return throughput_report

    @staticmethod
    def _stripPrefix(name, prefix):
        '''
        return the part of name after the first occurrence of prefix;
        name is returned unchanged when prefix is empty or does not occur
        '''
        if prefix and name.find(prefix) >= 0:
            return name.split(prefix, 1)[1]
        return name

    def _readInputInfos(self):
        '''
        read the "key=value" file named by self.inputInfos.
        returns (inputBlocks, inputList, parentList): inputBlocks is None when
        there is no inputBlocks line; the lists hold the non-empty ';'-separated
        entries of the inputFiles / parentFiles lines ([] when absent).
        A missing file yields (None, [], []).
        '''
        inputBlocks = None
        inputList = []
        parentList = []
        if not os.path.exists(self.inputInfos):
            return inputBlocks, inputList, parentList

        infos = open(self.inputInfos, 'r')
        try:
            for line in infos.readlines():
                if line.find("=") < 0:
                    continue
                # keep everything after the first '=' (LFNs may contain '=')
                value = line.split("=", 1)[1].strip()
                if line.find("inputBlocks") >= 0:
                    inputBlocks = value
                if line.find("inputFiles") >= 0:
                    # drop empty entries, e.g. from a trailing ';', which would
                    # otherwise reduce the common prefix to ''
                    inputList = [x for x in value.split(";") if x]
                if line.find("parentFiles") >= 0:
                    parentList = [x for x in value.split(";") if x]
        finally:
            infos.close()
        return inputBlocks, inputList, parentList

    def _formatReadFiles(self, readFiles):
        '''
        format files reported as read in the FJR.
        readFiles maps a short file name to (FileType, location, Runs).
        returns (files, lumis): "name::1::FileType::location;" per file and,
        for files with a non-empty Runs mapping, "run::lumis;" for its first run.
        '''
        files = []
        lumis = []
        for name, attr in readFiles.items():
            fileType, location, runs = attr
            files.append('%s::%d::%s::%s;' % (name, 1, fileType, location))
            if runs:
                lumis.append('%s::%s;' % (list(runs.keys())[0], list(runs.values())[0]))
        return ''.join(files), ''.join(lumis)

    def _formatUnreadFiles(self, names, errors):
        '''
        format expected input files that the FJR does not report as read:
        "name::0::Unknown::Local;" when the first job error's Description
        mentions the file, "name::2::Unknown::Unknown;" otherwise.
        '''
        parts = []
        for name in names:
            if len(errors) and errors[0]["Description"].find(name) >= 0:
                parts.append('%s::%d::%s::%s;' % (name, 0, 'Unknown', 'Local'))
            else:
                parts.append('%s::%d::%s::%s;' % (name, 2, 'Unknown', 'Unknown'))
        return ''.join(parts)

    def _popularityReport(self, jobReport):
        '''
        build the popularity dictionary sent by popularityInfos(): expected
        input/parent files from self.inputInfos are compared with the files the
        FJR reports as read; file names are shortened by removing the common
        prefix of each list (reported as Basename / BasenameParent).
        '''
        report_dict = {}
        inputBlocks, inputList, inputParentList = self._readInputInfos()
        if inputBlocks is not None:
            report_dict['inputBlocks'] = inputBlocks

        basename = ''
        if inputList:
            basename = os.path.commonprefix(inputList)
        basenameParent = ''
        if inputParentList:
            basenameParent = os.path.commonprefix(inputParentList)

        # files read according to the FJR; those whose LFN contains the input
        # basename are counted as inputs, all others as parents
        readFile = {}
        readFileParent = {}
        for inputFile in jobReport.inputFiles:
            lfn = inputFile['LFN']
            fileAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
            if lfn.find(basename) >= 0:
                readFile[self._stripPrefix(lfn, basename)] = fileAttr
            else:
                readFileParent[self._stripPrefix(lfn, basenameParent)] = fileAttr

        # expected files that the FJR does not report as read
        unreadInput = [f for f in [self._stripPrefix(x, basename) for x in inputList]
                       if f not in readFile]
        unreadParent = [f for f in [self._stripPrefix(x, basenameParent) for x in inputParentList]
                        if f not in readFileParent]

        inputString, LumisString = self._formatReadFiles(readFile)
        inputParentString, LumisParentString = self._formatReadFiles(readFileParent)
        inputString += self._formatUnreadFiles(unreadInput, jobReport.errors)
        inputParentString += self._formatUnreadFiles(unreadParent, jobReport.errors)

        report_dict['inputFiles'] = inputString
        # NOTE(review): key spelled 'parentFIles' as in the original; confirm
        # the dashboard consumer's expectation before renaming it
        report_dict['parentFIles'] = inputParentString
        report_dict['lumisRange'] = LumisString
        report_dict['lumisParentRange'] = LumisParentString
        report_dict['Basename'] = basename
        report_dict['BasenameParent'] = basenameParent
        return report_dict

    def popularityInfos(self, jobReport):
        '''
        send the input-file popularity report (see _popularityReport) to the
        DashBoard with the MonitorID / MonitorJobID given to --popularity
        '''
        report_dict = self._popularityReport(jobReport)

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
        apmonFree()

        if self.debug == 1:
            for k, v in report_dict.items():
                print('%s : %s' % (k, v))
        return

    def n_of_events(self, jobReport):
        '''
        #Brian's patch to send the number of events processed to the Dashboard:
        sum 'EventsRead' over all input files (values that do not parse as an
        integer are skipped) and return it under three dashboard keys.
        '''
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except ValueError:
                continue
            eventsPerRun += eventsRead

        event_report = {'NoEventsPerRun': eventsPerRun,
                        'NbEvPerRun': eventsPerRun,
                        'NEventsProcessed': eventsPerRun}

        if self.debug == 1:
            print(event_report)
        return event_report

    def reportDash(self, jobReport):
        '''
        build the dashboard report dictionary (event counts, per protocol/action
        I/O size and time, global throughputs) and send it with apmonSend
        '''
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)
        dashboard_report = dict(event_report)

        # per protocol/action: key io_<protocol>_<action>,
        # value "<size>_<time in s>" with 'NULL' for a non-numeric field
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID
        for protocol, actions in storage_report.items():
            for action, measurement in actions.items():
                try:
                    size = float(measurement[2])
                except ValueError:
                    size = 'NULL'
                try:
                    seconds = float(measurement[3]) / 1000
                except ValueError:
                    seconds = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = str(size) + '_' + str(seconds)

        if self.debug:
            for key in sorted(dashboard_report.keys()):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)
        return

    def usage(self):
        '''
        return the command line help text
        '''
        msg = """
    required parameters:
    --input          : input FJR xml file

    optional parameters (at least one of --dashboard, --popularity, --exitcode, --lfn is required):
    --dashboard      : send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID    : DashBoard MonitorID
        MonitorJobID : DashBoard MonitorJobID
    --popularity     : send input file popularity info to the dashboard. require following args: "MonitorID,MonitorJobID,inputInfos"
        inputInfos   : file with inputBlocks=..., inputFiles=...;... and parentFiles=...;... lines
    --exitcode       : print executable exit code
    --lfn            : report list of files really analyzed
    --help           : help
    --debug          : debug statements
    """
        return msg
387 gutsche 1.1
388    
if __name__ == '__main__':
    # Script entry point: parse the command line, then run the requested
    # actions. Every exception is deliberately swallowed, including the
    # SystemExit raised by sys.exit() on usage errors or an unreadable FJR.
    # The calling wrapper therefore always gets exit status 0 and sees only
    # what was printed to stdout.
    try:
        parseFjr(sys.argv[1:]).run()
    except:
        pass