ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
Revision: 1.14
Committed: Wed Jul 2 16:07:30 2008 UTC (16 years, 9 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, CRAB_2_7_4_pre3, CRAB_2_7_4_pre2, CRAB_2_7_4_pre1, CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta, CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1, CRAB_2_7_3_beta, CRAB_2_7_2_p1, CRAB_2_7_1_branch_firstMERGE, CRAB_2_7_2, CRAB_2_7_2_pre4, CRAB_2_7_2_pre3, CRAB_2_7_2_pre2, CRAB_2_7_2_pre1, CRAB_2_7_1, fede_170310, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_6_6_pre6, CRAB_2_7_1_pre2, CRAB_2_6_6_pre5, CRAB_2_7_1_pre1, CRAB_2_6_6_pre4, CRAB_2_6_6_pre3, CRAB_2_6_6_pre2, CRAB_2_6_6_check, CRAB_2_6_6, CRAB_2_6_6_pre1, CRAB_2_7_0, CRAB_2_6_5, CRAB_2_7_0_pre8, CRAB_2_6_5_pre1, CRAB_2_7_0_pre7, CRAB_2_6_4, CRAB_2_7_0_pre6, CRAB_2_6_4_pre1, CRAB_2_7_0_pre5, CRAB_2_6_3_patch_2, CRAB_2_6_3_patch_2_pre2, CRAB_2_6_3_patch_2_pre1, CRAB_2_6_3_patch_1, CRAB_2_7_0_pre4, CRAB_2_7_0_pre3, CRAB_2_6_3, CRAB_2_6_3_pre5, CRAB_2_6_3_pre4, CRAB_2_6_3_pre3, CRAB_2_6_3_pre2, CRAB_2_7_0_pre2, CRAB_2_6_3_pre1, test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1_pre4, CRAB_2_6_1_pre3, CRAB_2_6_1_pre2, CRAB_2_6_1_pre1, CRAB_2_6_1, CRAB_2_6_0, CRAB_2_6_0_pre14, CRAB_2_6_0_pre13, CRAB_2_6_0_pre12, CRAB_2_6_0_pre11, CRAB_2_6_0_pre10, CRAB_2_6_0_pre9, CRAB_2_6_0_pre8, CRAB_2_6_0_pre7, CRAB_2_6_0_pre6, CRAB_2_6_0_pre5, CRAB_2_6_0_pre4, CRAB_2_6_0_pre3, CRAB_2_6_0_pre2, CRAB_2_6_0_pre1, CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1, CRAB_2_5_0, 
CRAB_2_5_0_pre7, CRAB_2_5_0_pre6, CRAB_2_5_0_pre5, CRAB_2_5_0_pre4, CRAB_2_5_0_pre3, CRAB_2_5_0_pre2, CRAB_2_5_0_pre1, CRAB_2_4_4, CRAB_2_4_4_pre6, CRAB_2_4_4_pre5, CRAB_2_4_4_pre4, CRAB_2_4_4_pre3, CRAB_2_4_4_pre2, CRAB_2_4_4_pre1, CRAB_2_4_3, CRAB_2_4_3_pre8, CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2, CRAB_2_4_2_pre3, CRAB_2_4_2_pre2, CRAB_2_4_2_pre1, CRAB_2_4_1, CRAB_2_4_1_pre4, CRAB_2_4_1_pre3, CRAB_2_4_1_pre2, CRAB_2_4_1_pre1, CRAB_2_4_0_Tutorial, CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0, CRAB_2_4_0_pre9, CRAB_2_4_0_pre8, CRAB_2_4_0_pre7, CRAB_2_4_0_pre6, CRAB_2_4_0_pre5, CRAB_2_4_0_pre4, CRAB_2_4_0_pre3, CRAB_2_4_0_pre2, CRAB_2_4_0_pre1, CRAB_DLS_PHED1, CRAB_DLS_PHED, CRAB_2_3_2_Fnal, CRAB_2_3_2, CRAB_2_3_2_pre7, CRAB_2_3_2_pre5, CRAB_2_3_2_pre4, CRAB_2_3_2_pre3, CRAB_2_3_2_pre2, CRAB_2_3_2_pre1, CRAB_2_4_0_test
Branch point for: CRAB_multiout, CRAB_2_7_1_branch, Lumi2_8, CRAB_2_6_X_br, AnaDataSet
Changes since 1.13: +6 -1 lines
Log Message:
added try-except for the case of fjr containing only <AnalysisFile> info. Exit_code 50115

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env python
2    
3     import sys, getopt, string
4    
5 fanzago 1.6 from ProdCommon.FwkJobRep.ReportParser import readJobReport
6 gutsche 1.2 from DashboardAPI import apmonSend, apmonFree
7    
class parseFjr:
    """Command-line helper that reads a CRAB FrameworkJobReport (FJR).

    Depending on the options given it can:

    - print the job exit status (``--exitcode``),
    - print the LFN of every input file listed in the report (``--lfn``),
    - send event counts and storage I/O figures to the DashBoard through
      ``apmonSend`` (``--dashboard MonitorID,MonitorJobID``).

    The parsed storage report has the form
    ``{'protocol': {'action': [attempted, succeeded, total-size,
    total-time, min-time, max-time], ...}, ...}`` with every value kept
    as a string.

    All output goes through single-argument ``print(...)`` calls, which
    produce the same text under the Python 2 print statement and the
    Python 3 print function.
    """

    def __init__(self, argv):
        """Set the defaults, parse ``argv`` and validate the options.

        ``argv`` is the argument list without the program name.  An
        unknown option prints the usage text and calls ``sys.exit(2)``.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        self.debug = 0
        try:
            opts, _args = getopt.getopt(
                argv, "",
                ["input=", "dashboard=", "exitcode", "lfn", "debug", "help"])
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

    def check(self, opts):
        """Store the parsed options and verify that they are usable.

        ``opts`` is the list of ``(option, value)`` pairs returned by
        ``getopt.getopt``.  The usage text is printed and ``sys.exit()``
        is called when ``--help`` is given, when ``--input`` is missing,
        when none of ``--dashboard``/``--lfn``/``--exitcode`` is given,
        or when ``--dashboard`` does not carry both identifiers.
        """
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--dashboard":
                self.info2dash = True
                # expected value: "MonitorID,MonitorJobID"
                ids = arg.split(",")
                if len(ids) >= 2:
                    self.MonitorID = ids[0]
                    self.MonitorJobID = ids[1]
                else:
                    # incomplete value: leave both empty so that the
                    # validation below rejects it
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        nothing_requested = not (self.info2dash or self.lfnList or self.exitCode)
        if self.input == '' or nothing_requested:
            print(self.usage())
            sys.exit()

        if self.info2dash and (self.MonitorID == '' or self.MonitorJobID == ''):
            print(self.usage())
            sys.exit()

    def run(self):
        """Load the job report and perform every requested action.

        If the report cannot be loaded, ``50115`` is printed and
        ``sys.exit()`` is called.
        """
        # load FwkJobRep; the first report of the file is used
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            # covers an unreadable report as well as an empty report list
            print('50115')
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)

    def exitCodes(self, jobReport):
        """Print the exit status of the job.

        Prints ``50115`` when the FJR file has at most 6 lines,
        otherwise the ``ExitStatus`` of the last entry of
        ``jobReport.errors``, or ``0`` when there are no errors.
        ``-999`` is printed if the resulting status is blank.
        """
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # the handle used to be left open
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = str(-999)
        print(exit_status)

    def lfn_List(self, jobReport):
        """Print the ``LFN`` of every entry of ``jobReport.inputFiles``, one per line."""
        for inputFile in jobReport.inputFiles:
            print(inputFile['LFN'])

    def _parseStorageStatistics(self, storageStatistics):
        """Turn the storage statistics text into a nested dictionary.

        The text after the ``Storage statistics:`` marker is a list of
        ``;``-separated fields of the form ``protocol/action =
        attempted/succeeded/total-size/total-time/min-time/max-time``.
        Blank fields and fields that do not follow this layout are
        skipped.  Returns ``{protocol: {action: [six strings]}}``; the
        dictionary is empty when the marker is absent.
        """
        storage_report = {}
        marker = 'Storage statistics:'
        if storageStatistics.find(marker) == -1:
            return storage_report

        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            data_field = data_field.strip()
            if not data_field:
                # e.g. the whitespace/newline left after the last ';'
                continue
            key_value = data_field.split('=')
            if len(key_value) < 2:
                continue
            key_parts = key_value[0].strip().split('/')
            values = key_value[1].strip().split('/')
            if len(key_parts) < 2 or len(values) < 6:
                continue

            protocol = key_parts[0].strip()
            action = key_parts[1].strip()
            measurement = [
                values[0].strip(),                 # attempted
                values[1].strip(),                 # succeeded
                values[2].strip().split('MB')[0],  # total size, 'MB' suffix dropped
                values[3].strip().split('ms')[0],  # total time, 'ms' suffix dropped
                values[4].strip().split('ms')[0],  # min time
                values[5].strip().split('ms')[0],  # max time
            ]

            # add to report
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first'
                      % (protocol, action))
            else:
                actions[action] = measurement
        return storage_report

    def _computeThroughput(self, storage_report, jobReport):
        """Aggregate read/write sizes and times over all protocols.

        Only actions whose name contains ``read``, ``write`` or ``seek``
        are used; ``seek`` contributes to the read time only.  Returns a
        dictionary with ``rSize``, ``rTime``, ``wSize``, ``wTime`` plus
        ``readThr``, ``writeThr`` and ``avgNetThr``; a throughput that
        cannot be computed is the string ``'NULL'``.
        """
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for protocol in storage_report:
            for action in storage_report[protocol]:
                # not interesting
                if 'read' not in action and 'write' not in action and 'seek' not in action:
                    continue

                # convert values
                measurement = storage_report[protocol][action]
                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except (TypeError, ValueError, IndexError):
                    continue

                # aggregate data
                if 'read' in action:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif 'write' in action:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    # 'seek'
                    throughput_report['rTime'] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = float(
                throughput_report['rSize'] / throughput_report['rTime'])

        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = (
                throughput_report['rSize']
                / float(jobReport.performance.summaries['ExeTime']))
        except (AttributeError, KeyError, TypeError, ValueError, ZeroDivisionError):
            # no usable 'ExeTime' in the report: keep 'NULL'
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = float(
                throughput_report['wSize'] / throughput_report['wTime'])
        return throughput_report

    def storageStat(self, jobReport):
        """Return ``(storage_report, throughput_report)`` for ``jobReport``.

        ``storage_report`` is parsed from ``str(jobReport.storageStatistics)``
        and ``throughput_report`` is aggregated from it.  With
        ``self.debug`` set, both are also printed.
        """
        storage_report = self._parseStorageStatistics(
            str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report:
                print('protocol: %s' % (protocol,))
                for action in storage_report[protocol]:
                    print('action: %s measurement: %s'
                          % (action, storage_report[protocol][action]))

        throughput_report = self._computeThroughput(storage_report, jobReport)

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def n_of_events(self, jobReport):
        """Sum ``EventsRead`` over ``jobReport.inputFiles``.

        Entries whose ``EventsRead`` value cannot be read as an integer
        are ignored.  The total is returned under the three keys
        ``NoEventsPerRun``, ``NbEvPerRun`` and ``NEventsProcessed``.
        """
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, TypeError, ValueError):
                continue
            eventsPerRun += eventsRead

        event_report = {
            'NoEventsPerRun': eventsPerRun,
            'NbEvPerRun': eventsPerRun,
            'NEventsProcessed': eventsPerRun,
        }

        if self.debug == 1:
            print(event_report)

        return event_report

    def reportDash(self, jobReport):
        """Build the DashBoard report dictionary and send it.

        The report holds the event counts, ``MonitorID`` and
        ``MonitorJobID``, one ``io_<protocol>_<action>`` entry per
        storage measurement (``"<size>_<time>"``, the time being the
        reported total time divided by 1000) and the three throughput
        figures.  It is passed to ``apmonSend`` and ``apmonFree`` is
        called afterwards.
        """
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)

        dashboard_report = {}
        dashboard_report.update(event_report)
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID

        # one entry per protocol/action; the dashboard key is io_<protocol>_<action>
        for protocol in storage_report:
            for action in storage_report[protocol]:
                measurement = storage_report[protocol][action]
                try:
                    size = float(measurement[2])
                except (TypeError, ValueError, IndexError):
                    size = 'NULL'
                try:
                    seconds = float(measurement[3]) / 1000
                except (TypeError, ValueError, IndexError):
                    seconds = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = (
                    str(size) + '_' + str(seconds))

        if self.debug:
            for key in sorted(dashboard_report.keys()):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

    def usage(self):
        """Return the command-line help text."""
        msg = """
    required parameters:
    --input            :       input FJR xml file

    optional parameters:
    --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID        :       DashBoard MonitorID
        MonitorJobID     :       DashBoard MonitorJobID
    --exitcode         :       print executable exit code
    --lfn              :       report list of files really analyzed
    --help             :       help
    --debug            :       debug statements
    """
        return msg
286 gutsche 1.1
287    
if __name__ == '__main__':
    # Script entry point: build the parser from the command-line
    # arguments (program name excluded) and run the requested actions.
    # Every failure is deliberately ignored so the script never raises.
    # NOTE(review): the bare except also catches SystemExit, so the
    # statuses passed to sys.exit() inside parseFjr never reach the
    # shell - confirm that the calling wrapper relies on stdout only.
    try:
        parseFjr(sys.argv[1:]).run()
    except:
        pass