ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.17
Committed: Wed Mar 23 16:48:53 2011 UTC (14 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2
Changes since 1.16: +18 -6 lines
Log Message:
fixes for additional infos reporting to dashboard

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3 spiga 1.15 import sys, getopt, string, os
4 gutsche 1.1
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """
    parseCrabFjr

    - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
    - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
    - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script

    The code is written to run unchanged on Python 2 and Python 3
    (single-argument print calls, no 'with' statement, no 'except ... as').
    """

    def __init__(self, argv):
        """
        Parse the command line and validate it.

        argv : list of command line arguments without the program name
               (typically sys.argv[1:]).

        Prints the usage text and exits (via sys.exit) when the options are
        unknown or the mandatory ones are missing.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        # path of the file describing the job input, given with --popularity
        self.inputInfos = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        # must exist even when --popularity is not given: check() and run()
        # both read it unconditionally
        self.popularity = False
        self.debug = 0
        try:
            opts, _args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn", "debug", "popularity=", "help"])
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

        return

    def check(self, opts):
        """
        Store the parsed command line options on the instance and validate them.

        opts : list of (option, value) pairs as returned by getopt.getopt.

        Exits after printing the usage text when --help is given, when --input
        is missing, when no action is requested, or when --dashboard is used
        without both "MonitorID,MonitorJobID".
        """
        # check command line parameter
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--popularity":
                self.popularity = True
                fields = arg.split(",")
                if len(fields) >= 3:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                    self.inputInfos = fields[2]
                else:
                    # incomplete argument: reset everything, as before
                    self.MonitorID = ''
                    self.MonitorJobID = ''
                    self.inputInfos = ''
            elif opt == "--dashboard":
                self.info2dash = True
                fields = arg.split(",")
                if len(fields) >= 2:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        actionRequested = self.info2dash or self.lfnList or self.exitCode or self.popularity
        if self.input == '' or not actionRequested:
            print(self.usage())
            sys.exit()

        if self.info2dash:
            if self.MonitorID == '' or self.MonitorJobID == '':
                print(self.usage())
                sys.exit()
        return

    def run(self):
        """
        Load the framework job report and execute every requested action.

        Prints '50115' and exits when the report cannot be read.
        """
        # load FwkJobRep
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            # unreadable / empty report
            print('50115')
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)
        if self.popularity:
            self.popularityInfos(jobReport)
        return

    def exitCodes(self, jobReport):
        """
        Print the exit status derived from the job report.

        Prints 50115 when the report file has 6 lines or fewer, otherwise the
        'ExitStatus' of the last error in jobReport.errors, or 0 when there
        are no errors. An empty status is printed as -999.
        """
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # the handle used to be leaked
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = str(-999)
        print(exit_status)

        return

    def lfn_List(self, jobReport):
        '''
        print the LFN of every input file listed in the job report, one per line
        '''
        for inputFile in jobReport.inputFiles:
            print(inputFile['LFN'])
        return

    def _parseStorageField(self, data_field):
        '''
        parse one "protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time"
        field; return (protocol, action, [six string values]) or None when the
        field does not have that shape
        '''
        key_value = data_field.split('=')
        if len(key_value) < 2:
            return None
        key_parts = key_value[0].strip().split('/')
        item_array = key_value[1].strip().split('/')
        if len(key_parts) < 2 or len(item_array) < 6:
            return None
        protocol = key_parts[0].strip()
        action = key_parts[1].strip()
        values = [
            item_array[0].strip(),
            item_array[1].strip(),
            # drop the unit suffixes, keep the numbers as strings
            item_array[2].strip().split('MB')[0],
            item_array[3].strip().split('ms')[0],
            item_array[4].strip().split('ms')[0],
            item_array[5].strip().split('ms')[0],
        ]
        return protocol, action, values

    def storageStat(self, jobReport):
        '''
        get i/o statistics

        returns (storage_report, throughput_report):
          storage_report    : { 'protocol' : { 'action' : [attempted,succeeded,total-size,total-time,min-time,max-time] } }
                              with every value kept as a string
          throughput_report : summed read/write sizes and times plus
                              'readThr', 'writeThr' and 'avgNetThr'
                              ('NULL' when they cannot be computed)
        '''
        storageStatistics = str(jobReport.storageStatistics)
        storage_report = {}
        marker = 'Storage statistics:'

        # check if storageStatistics is valid
        if storageStatistics.find(marker) != -1:
            data = storageStatistics.split(marker)[1]
            for data_field in data.split(';'):
                # whitespace-only fields (e.g. after the trailing ';') carry no data
                if not data_field.strip():
                    continue
                parsed = self._parseStorageField(data_field)
                if parsed is None:
                    # a malformed field used to abort the whole report
                    if self.debug:
                        print('skipping malformed storage statistics field: %s' % (data_field,))
                    continue
                protocol, action, values = parsed
                # add to report
                actions = storage_report.setdefault(protocol, {})
                if action in actions:
                    print('protocol/action: %s / %s listed twice in report, taking the first' % (protocol, action))
                else:
                    actions[action] = values

            if self.debug:
                for protocol in storage_report.keys():
                    print('protocol: %s' % (protocol,))
                    for action in storage_report[protocol].keys():
                        print('action: %s measurement: %s' % (action, storage_report[protocol][action]))

        #####
        # throughput measurements # Fabio
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for protocol in storage_report.keys():
            for action in storage_report[protocol].keys():
                isRead = 'read' in action
                isWrite = 'write' in action
                isSeek = 'seek' in action
                # not interesting
                if not (isRead or isWrite or isSeek):
                    continue

                # convert values
                try:
                    sizeValue = float(storage_report[protocol][action][2])
                    timeValue = float(storage_report[protocol][action][3])
                except (TypeError, ValueError):
                    continue

                # aggregate data
                if isRead:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif isWrite:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    # seek time is counted as read time
                    throughput_report['rTime'] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = float(throughput_report['rSize'] / throughput_report['rTime'])

        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
        except Exception:
            # best effort: no usable ExeTime (missing, zero, not a number)
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = float(throughput_report['wSize'] / throughput_report['wTime'])
        #####

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def _stripPrefix(self, lfn, prefix):
        '''
        return the part of lfn that follows the first occurrence of prefix;
        lfn is returned unchanged when prefix is empty or not found
        (str.split with an empty separator raises ValueError)
        '''
        if not prefix:
            return lfn
        position = lfn.find(prefix)
        if position < 0:
            return lfn
        return lfn[position + len(prefix):]

    def _missingFileEntries(self, names, errors):
        '''
        build the "name::status::type::location;" entries for files that are
        not listed as read in the job report: status 0 when the name appears
        in the description of the first error, status 2 otherwise
        '''
        entries = []
        for name in names:
            if len(errors) and errors[0]["Description"].find(name) >= 0:
                entries.append('%s::%d::%s::%s;' % (name, 0, 'Unknown', 'Local'))
            else:
                entries.append('%s::%d::%s::%s;' % (name, 2, 'Unknown', 'Unknown'))
        return ''.join(entries)

    def _readFileEntries(self, readFiles, expectedNames):
        '''
        build the entries (status 1) and the "run::lumis;" ranges for the files
        found in the job report, removing each of them from expectedNames
        returns (fileString, lumiString)
        '''
        fileString = ''
        lumiString = ''
        for name, attr in readFiles.items():
            # the report may list files that were not declared as job input
            if name in expectedNames:
                expectedNames.remove(name)
            fileString += '%s::%d::%s::%s;' % (name, 1, attr[0], attr[1])
            runs = attr[2]
            if runs:
                # only the first run is reported, as before
                run, lumis = list(runs.items())[0]
                lumiString += '%s::%s;' % (run, lumis)
        return fileString, lumiString

    def popularityInfos(self, jobReport):
        '''
        send to the DashBoard the list of (parent) input files of the job with
        their read status, and print the same information between the
        "Popularity start" / "Popularity stop" markers

        The expected input is read from the file self.inputInfos, using the
        lines containing "inputBlocks", "inputFiles" and "parentFiles"
        (format key=value, file lists separated by ';').
        '''
        report_dict = {}
        inputList = []
        inputParentList = []
        report_dict['inputBlocks'] = ''
        if os.path.exists(self.inputInfos):
            infoFile = open(self.inputInfos, 'r')
            try:
                lines = infoFile.readlines()
            finally:
                infoFile.close()
            for line in lines:
                if line.find("=") < 0:
                    # nothing to extract from a line without a value
                    continue
                value = line.split("=")[1].strip()
                if line.find("inputBlocks") >= 0:
                    report_dict['inputBlocks'] = value
                if line.find("inputFiles") >= 0:
                    inputList = value.split(";")
                if line.find("parentFiles") >= 0:
                    inputParentList = value.split(";")

        # "key=" with nothing after it yields [''] : treat it as no file
        if len(inputList) == 1 and inputList[0] == '':
            inputList = []
        if len(inputParentList) == 1 and inputParentList[0] == '':
            inputParentList = []

        basename = ''
        if len(inputList) > 1:
            basename = os.path.commonprefix(inputList)
        elif len(inputList) == 1:
            basename = "%s/" % os.path.dirname(inputList[0])
        basenameParent = ''
        if len(inputParentList) > 1:
            basenameParent = os.path.commonprefix(inputParentList)
        elif len(inputParentList) == 1:
            basenameParent = "%s/" % os.path.dirname(inputParentList[0])

        # files really listed in the job report, keyed by name without prefix
        readFile = {}
        readFileParent = {}
        for inputFile in jobReport.inputFiles:
            lfn = inputFile.get("LFN")
            attr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
            if inputFile['LFN'].find(basename) >= 0:
                readFile[self._stripPrefix(lfn, basename)] = attr
            else:
                # was stored in the undefined name 'readParentFile'
                readFileParent[self._stripPrefix(lfn, basenameParent)] = attr

        cleanedinputList = [self._stripPrefix(name, basename) for name in inputList]
        cleanedParentList = [self._stripPrefix(name, basenameParent) for name in inputParentList]

        inputString, LumisString = self._readFileEntries(readFile, cleanedinputList)
        inputParentString, LumisParentString = self._readFileEntries(readFileParent, cleanedParentList)

        # whatever is left was expected but is not in the job report
        inputString += self._missingFileEntries(cleanedinputList, jobReport.errors)
        # parent entries used to be appended to inputString by mistake
        inputParentString += self._missingFileEntries(cleanedParentList, jobReport.errors)

        report_dict['inputFiles'] = inputString
        # NOTE(review): key spelling 'parentFIles' kept as is, the receiving
        # side may depend on it - confirm before renaming
        report_dict['parentFIles'] = inputParentString
        report_dict['lumisRange'] = LumisString
        report_dict['lumisParentRange'] = LumisParentString
        report_dict['Basename'] = basename
        report_dict['BasenameParent'] = basenameParent

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
        apmonFree()

        # printed unconditionally (the debug test was disabled on purpose)
        print("Popularity start")
        for k, v in report_dict.items():
            print("%s : %s" % (k, v))
        print("Popularity stop")
        return

    def n_of_events(self, jobReport):
        '''
        #Brian's patch to sent number of events procedded to the Dashboard
        # Add NoEventsPerRun to the Dashboard report

        returns a dictionary holding the sum of 'EventsRead' over all input
        files under the keys NoEventsPerRun, NbEvPerRun and NEventsProcessed;
        files whose value cannot be converted to an integer are skipped
        '''
        event_report = {}
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except Exception:
                continue
            eventsPerRun += eventsRead
        event_report['NoEventsPerRun'] = eventsPerRun
        event_report['NbEvPerRun'] = eventsPerRun
        event_report['NEventsProcessed'] = eventsPerRun

        if self.debug == 1:
            print(event_report)

        return event_report

    def reportDash(self, jobReport):
        '''
        build the dashboard report dictionary (event counts, per protocol/action
        "size_time" values, throughputs) and send it to the DashBoard
        '''
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)
        dashboard_report = {}
        #
        for k, v in event_report.items():
            dashboard_report[k] = v

        # extract information to be sent to DashBoard
        # per protocol and action the dashboard key is io_protocol_action
        # and the value is "size_time" with the time scaled from ms to s
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID
        for protocol in storage_report.keys():
            for action in storage_report[protocol].keys():
                measurement = storage_report[protocol][action]
                try:
                    size = float(measurement[2])
                except (TypeError, ValueError, IndexError):
                    size = 'NULL'
                try:
                    seconds = float(measurement[3]) / 1000
                except (TypeError, ValueError, IndexError):
                    seconds = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = str(size) + '_' + str(seconds)
        if self.debug:
            for key in sorted(dashboard_report.keys()):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

        return

    def usage(self):
        '''
        return the help text describing the command line options
        '''
        msg = """
    required parameters:
    --input            :       input FJR xml file

    optional parameters:
    --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID        :       DashBoard MonitorID
        MonitorJobID     :       DashBoard MonitorJobID
    --popularity       :       send input files info to the dashboard. require following args: "MonitorID,MonitorJobID,inputInfos"
        inputInfos       :       file listing inputBlocks, inputFiles and parentFiles of the job
    --exitcode         :       print executable exit code
    --lfn              :       report list of files really analyzed
    --help             :       help
    --debug            :       debug statements
    """
        return msg
400 gutsche 1.1
401    
if __name__ == '__main__':
    # Best-effort entry point: the wrapper script on the worker node reads
    # what is printed on stdout, so a failure must never add noise there.
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except SystemExit:
        # sys.exit() requests raised while parsing/validating were always
        # swallowed here, so the process kept exit status 0; preserved.
        pass
    except Exception:
        # Unexpected failures used to vanish without a trace; report them on
        # stderr only, leaving stdout and the exit status untouched.
        sys.stderr.write('parseCrabFjr: unexpected error: %s\n' % (sys.exc_info()[1],))