ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.12
Committed: Tue Jun 17 08:22:29 2008 UTC (16 years, 10 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.11: +1 -1 lines
Log Message:
The old IO throughput measurements are no longer sent to dashboard

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3     import sys, getopt, string
4    
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """
    Command-line helper that reads a CRAB FrameworkJobReport (FJR) file.

    Depending on the options given it can:
    - print the job exit status              (--exitcode)
    - print the LFNs of the input files      (--lfn)
    - send event and I/O throughput figures to the dashboard (--dashboard)

    Everything printed on stdout is kept as plain values, one per line.
    NOTE(review): presumably the worker-node wrapper script captures stdout;
    confirm before adding any further prints outside debug mode.
    """

    def __init__(self, argv):
        """
        Parse the command line and validate it.

        argv -- list of command-line arguments without the program name.

        Prints the usage text and exits with status 2 when getopt rejects
        the options; further validation is delegated to check().
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        self.debug = 0
        try:
            opts, args = getopt.getopt(
                argv, "",
                ["input=", "dashboard=", "exitcode", "lfn", "debug", "help"])
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

        return

    def check(self, opts):
        """
        Store the parsed options on the instance and validate them.

        opts -- list of (option, value) pairs as returned by getopt.

        Prints the usage text and calls sys.exit() when --help is given,
        when --input is missing, when no action (--dashboard, --lfn,
        --exitcode) is requested, or when --dashboard does not carry both
        "MonitorID,MonitorJobID".
        """
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--dashboard":
                self.info2dash = True
                ids = arg.split(",")
                if len(ids) >= 2:
                    self.MonitorID = ids[0]
                    self.MonitorJobID = ids[1]
                else:
                    # incomplete "MonitorID,MonitorJobID": rejected below
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        no_action = (not self.info2dash
                     and not self.lfnList
                     and not self.exitCode)
        if self.input == '' or no_action:
            print(self.usage())
            sys.exit()

        if self.info2dash:
            if self.MonitorID == '' or self.MonitorJobID == '':
                print(self.usage())
                sys.exit()
        return

    def run(self):
        """
        Load the first job report from the input file and run every
        action requested on the command line.
        """
        # load FwkJobRep
        jobReport = readJobReport(self.input)[0]
        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)
        return

    def exitCodes(self, jobReport):
        """
        Print the exit status of the job on stdout.

        jobReport -- parsed job report; its 'errors' sequence is read.

        Prints 50115 when the FJR file has six lines or fewer, otherwise
        the 'ExitStatus' of the last reported error, or 0 when no error is
        reported.  An empty status is printed as -999.
        """
        ##### temporary fix for FJR incomplete ####
        # try/finally (rather than 'with') keeps the code usable on the old
        # interpreters this script targets while still closing the file.
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = -999
        print(exit_status)

        return

    def lfn_List(self, jobReport):
        """
        Print the LFN of every input file in the job report, one per line.
        """
        for lfn in [x['LFN'] for x in jobReport.inputFiles]:
            print(lfn)
        return

    def _parseStorageStatistics(self, storageStatistics):
        """
        Turn the 'Storage statistics:' text into a nested dictionary.

        Expected field format, fields separated by ';':
            protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time

        Returns { protocol : { action : [attempted, succeeded, total_size,
        total_time, min_time, max_time] } } with every value kept as a
        string; the 'MB' and 'ms' suffixes are cut off.  Empty or malformed
        fields are skipped instead of aborting the whole report.
        """
        storage_report = {}
        marker = 'Storage statistics:'

        # check if storageStatistics is valid
        if storageStatistics.find(marker) == -1:
            return storage_report

        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            if not data_field.strip():
                continue

            key_and_item = data_field.split('=')
            if len(key_and_item) < 2:
                if self.debug:
                    print('skipping malformed storage field: %s' % (data_field,))
                continue

            key_parts = key_and_item[0].strip().split('/')
            item_array = key_and_item[1].strip().split('/')
            if len(key_parts) < 2 or len(item_array) < 6:
                if self.debug:
                    print('skipping malformed storage field: %s' % (data_field,))
                continue

            protocol = str(key_parts[0].strip())
            action = str(key_parts[1].strip())
            measurement = [
                str(item_array[0].strip()),
                str(item_array[1].strip()),
                str(item_array[2].strip().split('MB')[0]),
                str(item_array[3].strip().split('ms')[0]),
                str(item_array[4].strip().split('ms')[0]),
                str(item_array[5].strip().split('ms')[0]),
            ]

            # add to report, the first occurrence of protocol/action wins
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first'
                      % (protocol, action))
            else:
                actions[action] = measurement

        return storage_report

    def _computeThroughput(self, storage_report, jobReport):
        """
        Aggregate read/write/seek measurements into throughput figures.

        Sizes come from fields suffixed 'MB' and times from fields suffixed
        'ms'; accumulated times are scaled to seconds before dividing.
        Seek time is added to the read time.  A figure that cannot be
        computed is reported as the string 'NULL'.
        """
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}

        for protocol in storage_report.keys():
            for action in storage_report[protocol].keys():
                is_read = 'read' in action
                is_write = 'write' in action
                is_seek = 'seek' in action
                # not interesting
                if not (is_read or is_write or is_seek):
                    continue

                # convert values
                measurement = storage_report[protocol][action]
                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except (TypeError, ValueError):
                    continue

                # aggregate data
                if is_read:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif is_write:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    throughput_report['rTime'] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = float(
                throughput_report['rSize'] / throughput_report['rTime'])

        # NOTE(review): the unit of 'ExeTime' is not visible here - confirm.
        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = (
                throughput_report['rSize']
                / float(jobReport.performance.summaries['ExeTime']))
        except (AttributeError, KeyError, TypeError, ValueError, ZeroDivisionError):
            # missing or unusable execution time: keep 'NULL'
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = float(
                throughput_report['wSize'] / throughput_report['wTime'])

        return throughput_report

    def storageStat(self, jobReport):
        """
        Get I/O statistics from the job report.

        jobReport -- parsed job report; 'storageStatistics' is read and,
                     for the average throughput, 'performance.summaries'.

        Returns the pair (storage_report, throughput_report):
        - storage_report: { protocol : { action : [attempted, succeeded,
          total_size, total_time, min_time, max_time] } }, values as strings
        - throughput_report: keys 'rSize', 'rTime', 'wSize', 'wTime',
          'readThr', 'writeThr', 'avgNetThr'
        """
        storage_report = self._parseStorageStatistics(
            str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report.keys():
                print('protocol: %s' % (protocol,))
                for action in storage_report[protocol].keys():
                    print('action: %s measurement: %s'
                          % (action, storage_report[protocol][action]))

        throughput_report = self._computeThroughput(storage_report, jobReport)

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def n_of_events(self, jobReport):
        """
        Count the events read from all input files.

        Returns a dictionary carrying the same total under the keys
        'NoEventsPerRun', 'NbEvPerRun' and 'NEventsProcessed'.  Input files
        whose 'EventsRead' value is not an integer are skipped.
        """
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, TypeError, ValueError):
                continue
            eventsPerRun += eventsRead

        event_report = {
            'NoEventsPerRun': eventsPerRun,
            'NbEvPerRun': eventsPerRun,
            'NEventsProcessed': eventsPerRun,
        }

        if self.debug == 1:
            print(event_report)

        return event_report

    def reportDash(self, jobReport):
        """
        Build the dashboard report dictionary and send it with apmonSend.

        The report contains the event counters, the monitor identifiers and
        the aggregated I/O throughput figures.  The per protocol/action
        'io_<protocol>_<action>' entries are no longer sent.
        """
        event_report = self.n_of_events(jobReport)
        throughput_report = self.storageStat(jobReport)[1]

        dashboard_report = {}
        dashboard_report.update(event_report)
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID

        if self.debug:
            # dumped before the throughput keys are added
            ordered = list(dashboard_report.keys())
            ordered.sort()
            for key in ordered:
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

        return

    def usage(self):
        """
        Return the command-line help text.
        """
        msg = """
        required parameters:
        --input            :       input FJR xml file

        optional parameters:
        --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
            MonitorID      :       DashBoard MonitorID
            MonitorJobID   :       DashBoard MonitorJobID
        --exitcode         :       print executable exit code
        --lfn              :       report list of files really analyzed
        --help             :       help
        --debug            :       debug statements
        """
        return msg
281 gutsche 1.1
282    
if __name__ == '__main__':
    # Script entry point: parse the command line and run the requested
    # actions.  Failures stay best-effort (no traceback, nothing extra on
    # stdout), but they are no longer completely silent.
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except SystemExit:
        # let sys.exit() from option handling set the process exit status;
        # listed first because old interpreters derive it from Exception
        raise
    except Exception:
        # sys.exc_info() instead of 'except ... as' keeps old interpreters working
        sys.stderr.write('parseCrabFjr: %s\n' % (sys.exc_info()[1],))