ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.16
Committed: Wed Mar 23 10:48:39 2011 UTC (14 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.15: +1 -0 lines
Log Message:
set default for inputBlocks

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3 spiga 1.15 import sys, getopt, string, os
4 gutsche 1.1
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """
    parseCrabFjr

    - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
    - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
    - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script

    The code deliberately avoids syntax newer than what the original file
    used (no ``with``, no ``except ... as``) and only uses single-argument
    ``print(...)`` so it behaves the same under Python 2 and Python 3.
    """

    # 50115 - cmsRun did not produce a valid/readable job report at runtime
    FJR_UNREADABLE = '50115'

    # statistics marker inside the job report storage statistics string
    STORAGE_MARKER = 'Storage statistics:'

    def __init__(self, argv):
        """
        Parse the command line options in *argv* (without the program name).

        Prints the usage text and exits with status 2 on unknown options;
        check() exits as well when the option combination is not usable.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.inputInfos = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        # must exist even when --popularity is not given: check() and run()
        # both read it unconditionally
        self.popularity = False
        self.debug = 0
        long_options = ["input=", "dashboard=", "exitcode", "lfn",
                        "debug", "popularity=", "help"]
        try:
            opts, _positional = getopt.getopt(argv, "", long_options)
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

        return

    def check(self, opts):
        """
        Store the parsed (option, value) pairs on the instance and validate
        them.

        Exits (after printing the usage text) when --help is given, when no
        --input is given, when no action is requested, or when --dashboard
        lacks "MonitorID,MonitorJobID".
        """
        # check command line parameter
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--popularity":
                self.popularity = True
                fields = arg.split(",")
                if len(fields) >= 3:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                    self.inputInfos = fields[2]
                else:
                    # incomplete argument: reset everything, as before
                    self.MonitorID = ''
                    self.MonitorJobID = ''
                    self.inputInfos = ''
            elif opt == "--dashboard":
                self.info2dash = True
                fields = arg.split(",")
                if len(fields) >= 2:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        action_requested = (self.info2dash or self.lfnList or
                            self.exitCode or self.popularity)
        if self.input == '' or not action_requested:
            print(self.usage())
            sys.exit()

        if self.info2dash:
            if self.MonitorID == '' or self.MonitorJobID == '':
                print(self.usage())
                sys.exit()
        return

    def run(self):
        """
        Load the job report named by --input and run every requested action.

        Prints 50115 and exits when the report cannot be read.
        """
        # load FwkJobRep
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            # unreadable / empty report; SystemExit and KeyboardInterrupt
            # are no longer swallowed here
            print(self.FJR_UNREADABLE)
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)
        if self.popularity:
            self.popularityInfos(jobReport)
        return

    def _exitStatus(self, jobReport):
        """
        Return the exit status to report, as a string.

        '50115' when the report file has six lines or fewer, otherwise the
        'ExitStatus' of the last error in jobReport.errors, '0' when there
        are no errors, and '-999' when the recorded status is blank.
        """
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # the original never closed this handle
            fjr.close()

        if len_fjr <= 6:
            return self.FJR_UNREADABLE

        # get ExitStatus of last error
        if len(jobReport.errors) != 0:
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)
        # check exit code
        if exit_status.strip() == '':
            exit_status = str(-999)
        return exit_status

    def exitCodes(self, jobReport):
        """
        Print the exit status computed by _exitStatus().
        """
        print(self._exitStatus(jobReport))

        return

    def lfn_List(self, jobReport):
        '''
        get list of analyzed files: print the LFN of every input file
        '''
        # collect first so that nothing is printed if an entry lacks 'LFN'
        lfnlist = [x['LFN'] for x in jobReport.inputFiles]
        for lfn in lfnlist:
            print(lfn)
        return

    def _parseStorageStatistics(self, storageStatistics):
        """
        Parse the storage statistics string into
        { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] } }.

        All values are kept as strings with the 'MB' / 'ms' suffix removed.
        Fields that are empty or do not look like
        protocol/action = a/b/c/d/e/f are skipped instead of raising.
        """
        storage_report = {}

        # check if storageStatistics is valid
        if storageStatistics.find(self.STORAGE_MARKER) == -1:
            return storage_report

        data = storageStatistics.split(self.STORAGE_MARKER)[1]
        for data_field in data.split(';'):
            # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
            data_field = data_field.strip()
            if not data_field or '=' not in data_field:
                continue
            key, item = [part.strip() for part in data_field.split('=')[:2]]
            key_parts = key.split('/')
            item_array = item.split('/')
            if len(key_parts) < 2 or len(item_array) < 6:
                # malformed entry, nothing sensible to record
                continue
            protocol = key_parts[0].strip()
            action = key_parts[1].strip()
            measurement = [
                item_array[0].strip(),                    # attempted
                item_array[1].strip(),                    # succeeded
                item_array[2].strip().split('MB')[0],     # total size
                item_array[3].strip().split('ms')[0],     # total time
                item_array[4].strip().split('ms')[0],     # min time
                item_array[5].strip().split('ms')[0],     # max time
            ]
            # add to report
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first'
                      % (protocol, action))
            else:
                actions[action] = measurement
        return storage_report

    def _throughput(self, storage_report, jobReport):
        """
        Aggregate read/write sizes and times over all protocols and derive
        the throughput figures.

        Times are summed in ms and scaled to seconds once non-zero; 'seek'
        time is added to the read time. A throughput stays 'NULL' when it
        cannot be computed.
        """
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for protocol in storage_report:
            for action, measurement in storage_report[protocol].items():
                is_read = 'read' in action
                is_write = 'write' in action
                is_seek = 'seek' in action
                # not interesting
                if not (is_read or is_write or is_seek):
                    continue

                # convert values
                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except ValueError:
                    continue

                # aggregate data
                if is_read:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif is_write:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    # seek only
                    throughput_report['rTime'] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = float(
                throughput_report['rSize'] / throughput_report['rTime'])

        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(
                jobReport.performance.summaries['ExeTime'])
        except (AttributeError, KeyError, TypeError, ValueError, ZeroDivisionError):
            # no usable ExeTime in the report: leave 'NULL'
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = float(
                throughput_report['wSize'] / throughput_report['wTime'])
        return throughput_report

    def storageStat(self, jobReport):
        '''
        get i/o statistics

        Returns (storage_report, throughput_report) built from
        str(jobReport.storageStatistics).
        '''
        storage_report = self._parseStorageStatistics(str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report.keys():
                print('protocol: %s' % protocol)
                for action in storage_report[protocol].keys():
                    print('action: %s measurement: %s'
                          % (action, storage_report[protocol][action]))

        #####
        # throughput measurements # Fabio
        throughput_report = self._throughput(storage_report, jobReport)
        #####

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def _readInputInfos(self):
        """
        Read the file named by self.inputInfos.

        Returns (inputBlocks, inputFiles, parentFiles): the value of the
        'inputBlocks' line and the ';'-separated lists from the 'inputFiles'
        and 'parentFiles' lines. Missing file or missing lines give
        ('', [], []). Lines without '=' and empty list entries are ignored.
        """
        blocks = ''
        inputList = []
        inputParentList = []
        if not os.path.exists(self.inputInfos):
            return blocks, inputList, inputParentList

        infos = open(self.inputInfos, 'r')
        try:
            lines = infos.readlines()
        finally:
            infos.close()

        for line in lines:
            if '=' not in line:
                continue
            value = line.split('=', 1)[1].strip()
            if line.find("inputBlocks") >= 0:
                blocks = value
            if line.find("inputFiles") >= 0:
                inputList = [name for name in value.split(";") if name]
            if line.find("parentFiles") >= 0:
                inputParentList = [name for name in value.split(";") if name]
        return blocks, inputList, inputParentList

    def _afterBasename(self, name, basename):
        """
        Return what follows the first occurrence of *basename* in *name*.

        *name* is returned unchanged when *basename* is empty or absent.
        The previous name.split(basename)[1] raised on an empty basename and
        cut the result short whenever basename occurred again in the name.
        """
        if not basename:
            return name
        pos = name.find(basename)
        if pos < 0:
            return name
        return name[pos + len(basename):]

    def _describeRead(self, readFiles, pending):
        """
        Build the 'name::1::type::location;' string and the 'run::lumis;'
        string for the files found in the job report.

        Every described name is removed from *pending* (the list of expected
        files) when it is there.
        """
        files = []
        lumis = []
        for name, attr in readFiles.items():
            if name in pending:
                pending.remove(name)
            files.append('%s::%d::%s::%s;' % (name, 1, attr[0], attr[1]))
            runs = attr[2]
            # presumably a mapping run -> lumi sections; only the first
            # entry is reported, as before. TODO confirm against FwkJobRep.
            if runs:
                lumis.append('%s::%s;' % (list(runs.keys())[0],
                                          list(runs.values())[0]))
        return ''.join(files), ''.join(lumis)

    def _describeUnread(self, names, errors):
        """
        Build the report string for expected files missing from the job
        report: code 0 when the file name appears in the description of the
        first error, code 2 otherwise.
        """
        description = None
        if names and len(errors):
            description = errors[0]["Description"]
        parts = []
        for name in names:
            if description is not None and description.find(name) >= 0:
                parts.append('%s::%d::%s::%s;' % (name, 0, 'Unknown', 'Local'))
            else:
                parts.append('%s::%d::%s::%s;' % (name, 2, 'Unknown', 'Unknown'))
        return ''.join(parts)

    def _popularityReport(self, jobReport):
        """
        Build the popularity dictionary sent by popularityInfos().

        Expected files come from the inputInfos file, read files from
        jobReport.inputFiles. File names are reported relative to the common
        prefix of the expected input (resp. parent) files.
        """
        blocks, inputList, inputParentList = self._readInputInfos()
        report_dict = {'inputBlocks': blocks}

        basename = ''
        if len(inputList):
            basename = os.path.commonprefix(inputList)
        basenameParent = ''
        if len(inputParentList):
            basenameParent = os.path.commonprefix(inputParentList)

        readFile = {}
        readFileParent = {}
        for inputFile in jobReport.inputFiles:
            lfn = inputFile['LFN']
            fileAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
            if lfn.find(basename) >= 0:
                readFile[self._afterBasename(lfn, basename)] = fileAttr
            else:
                # was stored in the undefined name 'readParentFile'
                readFileParent[self._afterBasename(lfn, basenameParent)] = fileAttr

        cleanedinputList = [self._afterBasename(name, basename)
                            for name in inputList]
        cleanedParentList = [self._afterBasename(name, basenameParent)
                             for name in inputParentList]

        inputString, LumisString = self._describeRead(readFile, cleanedinputList)
        inputParentString, LumisParentString = self._describeRead(
            readFileParent, cleanedParentList)

        # whatever is left was expected but never showed up in the report
        inputString += self._describeUnread(cleanedinputList, jobReport.errors)
        # parent files used to leak into inputString when errors were present
        inputParentString += self._describeUnread(cleanedParentList, jobReport.errors)

        report_dict['inputFiles'] = inputString
        # NOTE(review): key spelling 'parentFIles' kept as is, consumers of
        # the dashboard report may rely on it
        report_dict['parentFIles'] = inputParentString
        report_dict['lumisRange'] = LumisString
        report_dict['lumisParentRange'] = LumisParentString
        report_dict['Basename'] = basename
        report_dict['BasenameParent'] = basenameParent
        return report_dict

    def popularityInfos(self, jobReport):
        """
        Send the popularity report for *jobReport* to the DashBoard.
        """
        report_dict = self._popularityReport(jobReport)

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
        apmonFree()

        if self.debug == 1:
            for k, v in report_dict.items():
                print("%s : %s" % (k, v))
        return

    def n_of_events(self, jobReport):
        '''
        #Brian's patch to sent number of events procedded to the Dashboard
        # Add NoEventsPerRun to the Dashboard report

        Sums 'EventsRead' over jobReport.inputFiles; entries whose value is
        not an integer are skipped. The total is returned under three keys.
        '''
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, TypeError, ValueError):
                continue
            eventsPerRun += eventsRead

        event_report = {
            'NoEventsPerRun': eventsPerRun,
            'NbEvPerRun': eventsPerRun,
            'NEventsProcessed': eventsPerRun,
        }

        if self.debug == 1:
            print(event_report)

        return event_report

    def _dashboardReport(self, jobReport):
        """
        Build the dictionary sent by reportDash(): event counts, monitor
        ids, one 'io_<protocol>_<action>' = '<size>_<seconds>' entry per
        storage measurement and the three throughput values.
        """
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)
        dashboard_report = {}
        dashboard_report.update(event_report)

        # extract information to be sent to DashBoard
        # per protocol and for action=read, calculate MBPS
        # dashboard key is io_action
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID
        for protocol in storage_report.keys():
            for action, measurement in storage_report[protocol].items():
                try:
                    size = float(measurement[2])
                except (IndexError, TypeError, ValueError):
                    size = 'NULL'
                try:
                    seconds = float(measurement[3]) / 1000.0  # ms to s
                except (IndexError, TypeError, ValueError):
                    seconds = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = (
                    str(size) + '_' + str(seconds))
        if self.debug:
            # dump happens before the throughput keys are added, as before
            for key in sorted(dashboard_report.keys()):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']
        return dashboard_report

    def reportDash(self, jobReport):
        '''
        dashboard report dictionary: build it and send it to the DashBoard
        '''
        dashboard_report = self._dashboardReport(jobReport)

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

        return

    def usage(self):
        """
        Return the command line help text.
        """
        msg = """
    required parameters:
    --input            :       input FJR xml file

    optional parameters:
    --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID        :       DashBoard MonitorID
        MonitorJobID     :       DashBoard MonitorJobID
    --popularity       :       send input files info to the dashboard. require following args: "MonitorID,MonitorJobID,inputInfos"
        inputInfos       :       file with the inputBlocks, inputFiles and parentFiles lines
    --exitcode         :       print executable exit code
    --lfn              :       report list of files really analyzed
    --help             :       help
    --debug            :       debug statements
    """
        return msg
388 gutsche 1.1
389    
if __name__ == '__main__':
    # Script entry point: parse the command line and run the requested
    # actions on the job report.
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except SystemExit:
        # sys.exit() calls inside parseFjr are deliberate (usage, unreadable
        # report): let them through so the exit status reaches the caller
        raise
    except Exception:
        # best effort: an unexpected failure must not make the caller fail,
        # but leave a trace on stderr instead of hiding it completely
        import traceback
        traceback.print_exc()