Revision: | 1.14 |
Committed: | Wed Jul 2 16:07:30 2008 UTC (16 years, 9 months ago) by fanzago |
Content type: | text/x-python |
Branch: | MAIN |
CVS Tags: | CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, CRAB_2_7_4_pre3, CRAB_2_7_4_pre2, CRAB_2_7_4_pre1, CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta, CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1, CRAB_2_7_3_beta, CRAB_2_7_2_p1, CRAB_2_7_1_branch_firstMERGE, CRAB_2_7_2, CRAB_2_7_2_pre4, CRAB_2_7_2_pre3, CRAB_2_7_2_pre2, CRAB_2_7_2_pre1, CRAB_2_7_1, fede_170310, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_6_6_pre6, CRAB_2_7_1_pre2, CRAB_2_6_6_pre5, CRAB_2_7_1_pre1, CRAB_2_6_6_pre4, CRAB_2_6_6_pre3, CRAB_2_6_6_pre2, CRAB_2_6_6_check, CRAB_2_6_6, CRAB_2_6_6_pre1, CRAB_2_7_0, CRAB_2_6_5, CRAB_2_7_0_pre8, CRAB_2_6_5_pre1, CRAB_2_7_0_pre7, CRAB_2_6_4, CRAB_2_7_0_pre6, CRAB_2_6_4_pre1, CRAB_2_7_0_pre5, CRAB_2_6_3_patch_2, CRAB_2_6_3_patch_2_pre2, CRAB_2_6_3_patch_2_pre1, CRAB_2_6_3_patch_1, CRAB_2_7_0_pre4, CRAB_2_7_0_pre3, CRAB_2_6_3, CRAB_2_6_3_pre5, CRAB_2_6_3_pre4, CRAB_2_6_3_pre3, CRAB_2_6_3_pre2, CRAB_2_7_0_pre2, CRAB_2_6_3_pre1, test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1_pre4, CRAB_2_6_1_pre3, CRAB_2_6_1_pre2, CRAB_2_6_1_pre1, CRAB_2_6_1, CRAB_2_6_0, CRAB_2_6_0_pre14, CRAB_2_6_0_pre13, CRAB_2_6_0_pre12, CRAB_2_6_0_pre11, CRAB_2_6_0_pre10, CRAB_2_6_0_pre9, CRAB_2_6_0_pre8, CRAB_2_6_0_pre7, CRAB_2_6_0_pre6, CRAB_2_6_0_pre5, CRAB_2_6_0_pre4, CRAB_2_6_0_pre3, CRAB_2_6_0_pre2, CRAB_2_6_0_pre1, CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1, CRAB_2_5_0, 
CRAB_2_5_0_pre7, CRAB_2_5_0_pre6, CRAB_2_5_0_pre5, CRAB_2_5_0_pre4, CRAB_2_5_0_pre3, CRAB_2_5_0_pre2, CRAB_2_5_0_pre1, CRAB_2_4_4, CRAB_2_4_4_pre6, CRAB_2_4_4_pre5, CRAB_2_4_4_pre4, CRAB_2_4_4_pre3, CRAB_2_4_4_pre2, CRAB_2_4_4_pre1, CRAB_2_4_3, CRAB_2_4_3_pre8, CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2, CRAB_2_4_2_pre3, CRAB_2_4_2_pre2, CRAB_2_4_2_pre1, CRAB_2_4_1, CRAB_2_4_1_pre4, CRAB_2_4_1_pre3, CRAB_2_4_1_pre2, CRAB_2_4_1_pre1, CRAB_2_4_0_Tutorial, CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0, CRAB_2_4_0_pre9, CRAB_2_4_0_pre8, CRAB_2_4_0_pre7, CRAB_2_4_0_pre6, CRAB_2_4_0_pre5, CRAB_2_4_0_pre4, CRAB_2_4_0_pre3, CRAB_2_4_0_pre2, CRAB_2_4_0_pre1, CRAB_DLS_PHED1, CRAB_DLS_PHED, CRAB_2_3_2_Fnal, CRAB_2_3_2, CRAB_2_3_2_pre7, CRAB_2_3_2_pre5, CRAB_2_3_2_pre4, CRAB_2_3_2_pre3, CRAB_2_3_2_pre2, CRAB_2_3_2_pre1, CRAB_2_4_0_test |
Branch point for: | CRAB_multiout, CRAB_2_7_1_branch, Lumi2_8, CRAB_2_6_X_br, AnaDataSet |
Changes since 1.13: | +6 -1 lines |
Log Message: | added try-except for the case of fjr containing only <AnalysisFile> info. Exit_code 50115 |
# | User | Rev | Content |
---|---|---|---|
1 | gutsche | 1.1 | #!/usr/bin/env python |
2 | |||
3 | import sys, getopt, string | ||
4 | |||
5 | fanzago | 1.6 | from ProdCommon.FwkJobRep.ReportParser import readJobReport |
6 | gutsche | 1.2 | from DashboardAPI import apmonSend, apmonFree |
7 | |||
class parseFjr:
    """
    Parse a CRAB FrameworkJobReport (FJR) on the worker node (parseCrabFjr).

    Driven by command-line options (see usage()). Depending on them it:
      - prints the exit status of the executable (--exitcode),
      - prints the LFNs of the input files that were analyzed (--lfn),
      - sends event counts and storage I/O statistics to the Dashboard via
        apmonSend (--dashboard MonitorID,MonitorJobID).
    Results are printed to stdout (the original description says they are
    returned to the WN wrapper script).

    Storage statistics are organised as
      { 'protocol' : { 'action' : [attempted,succeeded,total-size,total-time,min-time,max-time] , ... } , ... }
    """

    def __init__(self, argv):
        """
        Parse command-line options.

        argv -- argument list without the program name (e.g. sys.argv[1:]).

        Prints the usage text and calls sys.exit(2) on an unrecognised
        option; check() exits on missing or inconsistent options.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        self.debug = 0
        try:
            opts = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn", "debug", "help"])[0]
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

    def check(self, opts):
        """
        Apply parsed (option, value) pairs and validate their combination.

        --input is mandatory, together with at least one of --exitcode,
        --lfn or --dashboard. --dashboard needs a "MonitorID,MonitorJobID"
        value with both parts non-empty. Otherwise the usage text is printed
        and sys.exit() is called.
        """
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--dashboard":
                self.info2dash = True
                # expected form: "MonitorID,MonitorJobID"
                fields = arg.split(",")
                if len(fields) > 1:
                    self.MonitorID = fields[0]
                    self.MonitorJobID = fields[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        nothing_requested = not (self.info2dash or self.lfnList or self.exitCode)
        if self.input == '' or nothing_requested:
            print(self.usage())
            sys.exit()

        if self.info2dash and (self.MonitorID == '' or self.MonitorJobID == ''):
            print(self.usage())
            sys.exit()

    def run(self):
        """
        Load the job report named by --input and perform the requested actions.

        If the report cannot be read (e.g. the FJR contains only
        <AnalysisFile> info), print 50115 and exit.
        """
        # load FwkJobRep; Exception (not a bare except) so SystemExit and
        # KeyboardInterrupt are not mistaken for an unreadable report
        try:
            jobReport = readJobReport(self.input)[0]
        except Exception:
            # 50115 - cmsRun did not produce a valid/readable job report
            print('50115')
            sys.exit()

        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)

    def exitCodes(self, jobReport):
        """
        Print the exit status of the job.

        Printed value:
          - 50115 if the FJR file has 6 lines or fewer (treated as incomplete),
          - otherwise the ExitStatus of the last entry in jobReport.errors,
          - otherwise 0;
          - -999 if the extracted status is blank.
        """
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = sum(1 for _line in fjr)
        finally:
            # the original never closed this handle
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif jobReport.errors:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = -999
        print(exit_status)

    def lfn_List(self, jobReport):
        """Print the 'LFN' of every input file in the job report, one per line."""
        # collect first so a missing 'LFN' raises before anything is printed
        lfns = [inputFile['LFN'] for inputFile in jobReport.inputFiles]
        for lfn in lfns:
            print(lfn)

    def _parseStorageStatistics(self, storageStatistics):
        """
        Parse the text form of the storage statistics.

        Expected entries after the 'Storage statistics:' marker, separated by ';':
          protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time
        Returns { protocol : { action : [attempted, succeeded, total-size,
        total-time, min-time, max-time] } } with every value kept as a string
        and the 'MB'/'ms' suffixes removed. Returns {} when the marker is
        absent. Blank or malformed entries are skipped. For a duplicated
        protocol/action the first entry is kept and a message is printed.
        """
        storage_report = {}
        marker = 'Storage statistics:'
        if storageStatistics.find(marker) == -1:
            return storage_report

        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            if not data_field.strip():
                continue
            parts = data_field.split('=')
            if len(parts) < 2:
                # malformed entry: skip it instead of aborting the whole report
                continue
            key_parts = parts[0].strip().split('/')
            item_array = parts[1].strip().split('/')
            if len(key_parts) < 2 or len(item_array) < 6:
                continue

            protocol = str(key_parts[0].strip())
            action = str(key_parts[1].strip())
            measurement = [str(item_array[0].strip()),               # attempted
                           str(item_array[1].strip()),               # succeeded
                           str(item_array[2].strip().split('MB')[0]),  # total size
                           str(item_array[3].strip().split('ms')[0]),  # total time
                           str(item_array[4].strip().split('ms')[0]),  # min time
                           str(item_array[5].strip().split('ms')[0])]  # max time

            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first' % (protocol, action))
            else:
                actions[action] = measurement
        return storage_report

    def _throughput(self, storage_report, jobReport):
        """
        Aggregate read/write/seek measurements into global throughput figures.

        Returns a dict with:
          - 'rSize'/'wSize': summed total size (MB),
          - 'rTime'/'wTime': summed total time, converted from ms to s when
            non-zero,
          - 'readThr'/'writeThr': size / time (MB/s), or 'NULL' if the time is 0,
          - 'avgNetThr': rSize divided by the report's ExeTime summary, or
            'NULL' if that is unavailable.
        Seek time is counted as read time. Actions with non-numeric values
        are ignored.
        """
        # throughput measurements # Fabio
        throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
        for actions in storage_report.values():
            for action, measurement in actions.items():
                is_read = 'read' in action
                is_write = 'write' in action
                is_seek = 'seek' in action
                if not (is_read or is_write or is_seek):
                    continue  # not interesting

                try:
                    sizeValue = float(measurement[2])
                    timeValue = float(measurement[3])
                except (TypeError, ValueError, IndexError):
                    continue

                if is_read:
                    throughput_report['rSize'] += sizeValue
                    throughput_report['rTime'] += timeValue
                elif is_write:
                    throughput_report['wSize'] += sizeValue
                    throughput_report['wTime'] += timeValue
                else:
                    # seek: time spent seeking counts as read time
                    throughput_report['rTime'] += timeValue

        # calculate global throughput
        throughput_report['readThr'] = 'NULL'
        if throughput_report['rTime'] > 0.0:
            throughput_report['rTime'] /= 1000.0  # scale ms to s
            throughput_report['readThr'] = throughput_report['rSize'] / throughput_report['rTime']

        throughput_report['avgNetThr'] = 'NULL'
        try:
            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
        except Exception:
            # ExeTime missing, non-numeric or zero: leave 'NULL'
            pass

        throughput_report['writeThr'] = 'NULL'
        if throughput_report['wTime'] > 0.0:
            throughput_report['wTime'] /= 1000.0  # scale ms to s
            throughput_report['writeThr'] = throughput_report['wSize'] / throughput_report['wTime']
        return throughput_report

    def storageStat(self, jobReport):
        """
        Extract I/O statistics from jobReport.storageStatistics.

        Returns (storage_report, throughput_report). The layouts are
        described in _parseStorageStatistics() and _throughput().
        """
        storage_report = self._parseStorageStatistics(str(jobReport.storageStatistics))

        if self.debug:
            for protocol in storage_report:
                print('protocol: %s' % protocol)
                for action in storage_report[protocol]:
                    print('action: %s measurement: %s' % (action, storage_report[protocol][action]))

        throughput_report = self._throughput(storage_report, jobReport)

        if self.debug == 1:
            print(storage_report)
            print(throughput_report)
        return storage_report, throughput_report

    def n_of_events(self, jobReport):
        """
        Sum 'EventsRead' over the input files for the Dashboard (Brian's patch).

        Entries whose EventsRead is not an integer are ignored. A missing
        EventsRead counts as 0. The total is reported under the keys
        'NoEventsPerRun', 'NbEvPerRun' and 'NEventsProcessed'.
        """
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, TypeError, ValueError):
                continue
            eventsPerRun += eventsRead

        event_report = {'NoEventsPerRun': eventsPerRun,
                        'NbEvPerRun': eventsPerRun,
                        'NEventsProcessed': eventsPerRun}

        if self.debug == 1:
            print(event_report)
        return event_report

    def reportDash(self, jobReport):
        """
        Build the Dashboard report and send it with apmonSend().

        The report contains:
          - the event counts from n_of_events(),
          - MonitorID and MonitorJobID,
          - one 'io_<protocol>_<action>' entry per storage measurement,
            formatted '<total size>_<total time in s>' ('NULL' for
            unparsable values),
          - the global read, write and network-average throughputs.
        """
        event_report = self.n_of_events(jobReport)
        storage_report, throughput_report = self.storageStat(jobReport)

        dashboard_report = {}
        dashboard_report.update(event_report)
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID

        # one entry per protocol/action; dashboard key is io_<protocol>_<action>
        for protocol, actions in storage_report.items():
            for action, measurement in actions.items():
                try:
                    size = float(measurement[2])
                except (TypeError, ValueError, IndexError):
                    size = 'NULL'
                try:
                    io_time = float(measurement[3]) / 1000  # ms -> s
                except (TypeError, ValueError, IndexError):
                    io_time = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = str(size) + '_' + str(io_time)

        if self.debug:
            for key in sorted(dashboard_report):
                print('%s = %s' % (key, dashboard_report[key]))

        # IO throughput information
        dashboard_report['io_read_throughput'] = throughput_report['readThr']
        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']

        # send to DashBoard; apmonFree() is called even if apmonSend() raises
        try:
            apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        finally:
            apmonFree()

        if self.debug == 1:
            print(dashboard_report)

    def usage(self):
        """Return the command-line help text."""
        msg = """
required parameters:
--input : input FJR xml file

optional parameters:
--dashboard : send info to the dashboard. require following args: "MonitorID,MonitorJobID"
MonitorID : DashBoard MonitorID
MonitorJobID : DashBoard MonitorJobID
--exitcode : print executable exit code
--lfn : report list of files really analyzed
--help : help
--debug : debug statements
"""
        return msg
286 | gutsche | 1.1 | |
287 | |||
if __name__ == '__main__':
    # Best effort: a parsing/reporting failure must not abort the caller.
    # Catch Exception (not a bare except) so the explicit sys.exit() statuses
    # raised by parseFjr (e.g. 2 for bad options) are honoured. Errors go to
    # stderr because stdout carries the results.
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except Exception:
        sys.stderr.write('parseCrabFjr: unexpected error: %s\n' % (sys.exc_info()[1],))