5 |
|
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
6 |
|
from DashboardAPI import apmonSend, apmonFree |
7 |
|
|
8 |
< |
|
9 |
< |
def main(argv) : |
10 |
< |
""" |
11 |
< |
parseCrabFjr |
12 |
< |
|
13 |
< |
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
< |
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
< |
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
16 |
< |
|
17 |
< |
required parameters: |
18 |
< |
--input : input FJR xml file |
19 |
< |
--MonitorID : DashBoard MonitorID |
20 |
< |
--MonitorJobID : DashBoard MonitorJobID |
21 |
< |
|
22 |
< |
optional parameters: |
23 |
< |
--help : help |
24 |
< |
--debug : debug statements |
25 |
< |
|
26 |
< |
""" |
27 |
< |
|
28 |
< |
# defaults |
29 |
< |
input = '' |
30 |
< |
MonitorID = '' |
31 |
< |
MonitorJobID = '' |
32 |
< |
debug = 0 |
33 |
< |
|
34 |
< |
try: |
35 |
< |
opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"]) |
36 |
< |
except getopt.GetoptError: |
37 |
< |
print main.__doc__ |
38 |
< |
sys.exit(2) |
39 |
< |
|
40 |
< |
# check command line parameter |
41 |
< |
for opt, arg in opts : |
42 |
< |
if opt == "--help" : |
43 |
< |
print main.__doc__ |
8 |
> |
class parseFjr: |
9 |
> |
def __init__(self, argv): |
10 |
> |
""" |
11 |
> |
parseCrabFjr |
12 |
> |
|
13 |
> |
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
> |
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
> |
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
16 |
> |
""" |
17 |
> |
# defaults |
18 |
> |
self.input = '' |
19 |
> |
self.MonitorID = '' |
20 |
> |
self.MonitorJobID = '' |
21 |
> |
self.info2dash = False |
22 |
> |
self.exitCode = False |
23 |
> |
self.lfnList = False |
24 |
> |
self.debug = 0 |
25 |
> |
try: |
26 |
> |
opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "help"]) |
27 |
> |
except getopt.GetoptError: |
28 |
> |
print self.usage() |
29 |
> |
sys.exit(2) |
30 |
> |
self.check(opts) |
31 |
> |
|
32 |
> |
return |
33 |
> |
|
34 |
> |
def check(self, opts):
    """
    Apply the parsed command line options and validate the combination.

    opts -- list of (option, value) pairs as returned by getopt.

    The usage text is printed and the process exits when
    - --help is given,
    - no input file is given, or none of --dashboard, --lfn, --exitcode
      is selected,
    - --dashboard is selected but its argument does not provide both ids
      in the form "MonitorID,MonitorJobID".
    """
    for opt, arg in opts:
        if opt == "--help":
            print(self.usage())
            sys.exit()
        elif opt == "--input":
            self.input = arg
        elif opt == "--exitcode":
            self.exitCode = True
        elif opt == "--lfn":
            self.lfnList = True
        elif opt == "--dashboard":
            self.info2dash = True
            ids = arg.split(",")
            try:
                self.MonitorID, self.MonitorJobID = ids[0], ids[1]
            except IndexError:
                # no comma in the argument: leave both ids empty so that
                # the validation below rejects the command line
                self.MonitorID = ''
                self.MonitorJobID = ''
        elif opt == "--debug":
            self.debug = 1

    no_action = not (self.info2dash or self.lfnList or self.exitCode)
    if self.input == '' or no_action:
        print(self.usage())
        sys.exit()

    if self.info2dash and (self.MonitorID == '' or self.MonitorJobID == ''):
        print(self.usage())
        sys.exit()
66 |
> |
|
67 |
> |
def run(self): |
68 |
> |
|
69 |
> |
# load FwkJobRep |
70 |
> |
try: |
71 |
> |
jobReport = readJobReport(self.input)[0] |
72 |
> |
except: |
73 |
> |
print '50115' |
74 |
|
sys.exit() |
45 |
– |
elif opt == "--input" : |
46 |
– |
input = arg |
47 |
– |
elif opt == "--MonitorID" : |
48 |
– |
MonitorID = arg |
49 |
– |
elif opt == "--MonitorJobID" : |
50 |
– |
MonitorJobID = arg |
51 |
– |
elif opt == "--debug" : |
52 |
– |
debug = 1 |
75 |
|
|
76 |
< |
if input == '' or MonitorID == '' or MonitorJobID == '': |
77 |
< |
print main.__doc__ |
78 |
< |
sys.exit() |
79 |
< |
|
80 |
< |
# load FwkJobRep |
81 |
< |
jobReport = readJobReport(input)[0] |
82 |
< |
|
83 |
< |
exit_status = '' |
84 |
< |
|
85 |
< |
##### temporary fix for FJR incomplete #### |
86 |
< |
fjr = open (input) |
87 |
< |
len_fjr = len(fjr.readlines()) |
88 |
< |
if (len_fjr <= 6): |
89 |
< |
### 50115 - cmsRun did not produce a valid/readable job report at runtime |
90 |
< |
exit_status = str(50115) |
91 |
< |
else: |
92 |
< |
# get ExitStatus of last error |
93 |
< |
if len(jobReport.errors) != 0 : |
94 |
< |
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
95 |
< |
else : |
96 |
< |
exit_status = str(0) |
75 |
< |
|
76 |
< |
# get i/o statistics |
77 |
< |
storageStatistics = str(jobReport.storageStatistics) |
78 |
< |
|
79 |
< |
# dashboard report dictionary |
80 |
< |
dashboard_report = {} |
81 |
< |
|
82 |
< |
# check if storageStatistics is valid |
83 |
< |
if storageStatistics.find('Storage statistics:') != -1 : |
84 |
< |
# report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
85 |
< |
report = {} |
86 |
< |
data = storageStatistics.split('Storage statistics:')[1] |
87 |
< |
data_fields = data.split(';') |
88 |
< |
for data_field in data_fields: |
89 |
< |
# parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time |
90 |
< |
key = data_field.split('=')[0].strip() |
91 |
< |
item = data_field.split('=')[1].strip() |
92 |
< |
protocol = str(key.split('/')[0].strip()) |
93 |
< |
action = str(key.split('/')[1].strip()) |
94 |
< |
item_array = item.split('/') |
95 |
< |
attempted = str(item_array[0].strip()) |
96 |
< |
succeeded = str(item_array[1].strip()) |
97 |
< |
total_size = str(item_array[2].strip().split('MB')[0]) |
98 |
< |
total_time = str(item_array[3].strip().split('ms')[0]) |
99 |
< |
min_time = str(item_array[4].strip().split('ms')[0]) |
100 |
< |
max_time = str(item_array[5].strip().split('ms')[0]) |
101 |
< |
# add to report |
102 |
< |
if protocol in report.keys() : |
103 |
< |
if action in report[protocol].keys() : |
104 |
< |
print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first' |
105 |
< |
else : |
106 |
< |
report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time] |
76 |
> |
if self.exitCode : |
77 |
> |
self.exitCodes(jobReport) |
78 |
> |
if self.lfnList : |
79 |
> |
self.lfn_List(jobReport) |
80 |
> |
if self.info2dash : |
81 |
> |
self.reportDash(jobReport) |
82 |
> |
return |
83 |
> |
|
84 |
> |
def exitCodes(self, jobReport): |
85 |
> |
|
86 |
> |
exit_status = '' |
87 |
> |
##### temporary fix for FJR incomplete #### |
88 |
> |
fjr = open (self.input) |
89 |
> |
len_fjr = len(fjr.readlines()) |
90 |
> |
if (len_fjr <= 6): |
91 |
> |
### 50115 - cmsRun did not produce a valid/readable job report at runtime |
92 |
> |
exit_status = str(50115) |
93 |
> |
else: |
94 |
> |
# get ExitStatus of last error |
95 |
> |
if len(jobReport.errors) != 0 : |
96 |
> |
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
97 |
|
else : |
98 |
< |
report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] } |
99 |
< |
|
100 |
< |
if debug : |
101 |
< |
for protocol in report.keys() : |
102 |
< |
print 'protocol:',protocol |
103 |
< |
for action in report[protocol].keys() : |
104 |
< |
print 'action:',action,'measurement:',report[protocol][action] |
98 |
> |
exit_status = str(0) |
99 |
> |
#check exit code |
100 |
> |
if string.strip(exit_status) == '': exit_status = -999 |
101 |
> |
print exit_status |
102 |
> |
|
103 |
> |
return |
104 |
> |
|
105 |
> |
def lfn_List(self, jobReport):
    """
    Print the list of analyzed files.

    jobReport -- job report object; every entry of jobReport.inputFiles
                 is indexed with the key 'LFN'.

    Writes one LFN per line to stdout and returns None.
    """
    # collect everything first: nothing is printed if an entry has no 'LFN'
    lfns = [input_file['LFN'] for input_file in jobReport.inputFiles]
    for lfn in lfns:
        print(lfn)
112 |
> |
|
113 |
> |
def storageStat(self, jobReport):
    """
    Get i/o statistics.

    The text following the 'Storage statistics:' marker in
    str(jobReport.storageStatistics) is read as a ';' separated list of

        protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time

    entries. Entries that are blank or not of this form are skipped.

    Returns the tuple (storage_report, throughput_report):

    storage_report
        { 'protocol' : { 'action' : [attempted, succeeded, total-size,
        total-time, min-time, max-time], ... }, ... }; all values are
        strings, with the 'MB' / 'ms' suffix cut off. If a protocol/action
        pair is listed twice the first one is kept.
    throughput_report
        'rSize', 'wSize'  : summed total-size of the read / write actions
        'rTime', 'wTime'  : summed total-time (seek time counts as read
                            time); converted from ms to s when non-zero
        'readThr', 'writeThr' : size / time, or 'NULL' when the time is 0
        'avgNetThr'       : rSize / jobReport.performance.summaries['ExeTime'],
                            or 'NULL' when that cannot be computed
    """
    marker = 'Storage statistics:'
    storageStatistics = str(jobReport.storageStatistics)
    storage_report = {}

    # check if storageStatistics is valid
    if marker in storageStatistics:
        for data_field in storageStatistics.split(marker)[1].split(';'):
            # a trailing ';' leaves a whitespace-only field behind
            if not data_field.strip():
                continue
            try:
                pieces = data_field.split('=')
                key_parts = pieces[0].strip().split('/')
                item_array = pieces[1].strip().split('/')
                protocol = key_parts[0].strip()
                action = key_parts[1].strip()
                measurement = [
                    item_array[0].strip(),
                    item_array[1].strip(),
                    item_array[2].strip().split('MB')[0],
                    item_array[3].strip().split('ms')[0],
                    item_array[4].strip().split('ms')[0],
                    item_array[5].strip().split('ms')[0],
                ]
            except IndexError:
                # entry is not of the documented form: ignore it instead of
                # losing the whole report
                continue

            # add to report
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first'
                      % (protocol, action))
            else:
                actions[action] = measurement

    if self.debug:
        for protocol in storage_report:
            print('protocol: %s' % protocol)
            for action in storage_report[protocol]:
                print('action: %s measurement: %s'
                      % (action, storage_report[protocol][action]))

    # throughput measurements
    throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
    for protocol in storage_report:
        for action, measurement in storage_report[protocol].items():
            is_read = 'read' in action
            is_write = 'write' in action
            # not interesting
            if not (is_read or is_write or 'seek' in action):
                continue

            # convert values
            try:
                sizeValue = float(measurement[2])
                timeValue = float(measurement[3])
            except ValueError:
                continue

            # aggregate data
            if is_read:
                throughput_report['rSize'] += sizeValue
                throughput_report['rTime'] += timeValue
            elif is_write:
                throughput_report['wSize'] += sizeValue
                throughput_report['wTime'] += timeValue
            else:
                # seek: only the time is accounted, on the read side
                throughput_report['rTime'] += timeValue

    # calculate global throughput
    throughput_report['readThr'] = 'NULL'
    if throughput_report['rTime'] > 0.0:
        throughput_report['rTime'] /= 1000.0  # scale ms to s
        throughput_report['readThr'] = throughput_report['rSize'] / throughput_report['rTime']

    throughput_report['avgNetThr'] = 'NULL'
    try:
        exe_time = float(jobReport.performance.summaries['ExeTime'])
        throughput_report['avgNetThr'] = throughput_report['rSize'] / exe_time
    except Exception:
        # best effort: the value stays 'NULL' if ExeTime is missing,
        # not a number or zero
        pass

    throughput_report['writeThr'] = 'NULL'
    if throughput_report['wTime'] > 0.0:
        throughput_report['wTime'] /= 1000.0  # scale ms to s
        throughput_report['writeThr'] = throughput_report['wSize'] / throughput_report['wTime']

    if self.debug == 1:
        print(storage_report)
        print(throughput_report)
    return storage_report, throughput_report
205 |
> |
|
206 |
> |
def n_of_events(self, jobReport):
    """
    Brian's patch to send the number of events processed to the Dashboard.

    Sums the 'EventsRead' value of every entry in jobReport.inputFiles
    (a missing value counts as 0) and returns the total under the three
    keys 'NoEventsPerRun', 'NbEvPerRun' and 'NEventsProcessed'.
    """
    eventsPerRun = 0
    for inputFile in jobReport.inputFiles:
        try:
            eventsPerRun += int(str(inputFile.get('EventsRead', 0)).strip())
        except Exception:
            # best effort: skip files whose count cannot be read as an integer
            continue

    event_report = {
        'NoEventsPerRun': eventsPerRun,
        'NbEvPerRun': eventsPerRun,
        'NEventsProcessed': eventsPerRun,
    }

    if self.debug == 1:
        print(event_report)

    return event_report
227 |
> |
|
228 |
> |
def reportDash(self,jobReport): |
229 |
> |
''' |
230 |
> |
dashboard report dictionary |
231 |
> |
''' |
232 |
> |
event_report = self.n_of_events(jobReport) |
233 |
> |
storage_report, throughput_report = self.storageStat(jobReport) |
234 |
> |
dashboard_report = {} |
235 |
> |
# |
236 |
> |
for k,v in event_report.iteritems() : |
237 |
> |
dashboard_report[k]=v |
238 |
|
|
239 |
|
# extract information to be sent to DashBoard |
240 |
|
# per protocol and for action=read, calculate MBPS |
241 |
|
# dashboard key is io_action |
242 |
< |
dashboard_report['MonitorID'] = MonitorID |
243 |
< |
dashboard_report['MonitorJobID'] = MonitorJobID |
244 |
< |
for protocol in report.keys() : |
245 |
< |
for action in report[protocol].keys() : |
246 |
< |
try: size = float(report[protocol][action][2]) |
242 |
> |
dashboard_report['MonitorID'] = self.MonitorID |
243 |
> |
dashboard_report['MonitorJobID'] = self.MonitorJobID |
244 |
> |
for protocol in storage_report.keys() : |
245 |
> |
for action in storage_report[protocol].keys() : |
246 |
> |
try: size = float(storage_report[protocol][action][2]) |
247 |
|
except: size = 'NULL' |
248 |
< |
try: time = float(report[protocol][action][3])/1000 |
248 |
> |
try: time = float(storage_report[protocol][action][3])/1000 |
249 |
|
except: time = 'NULL' |
250 |
|
dashboard_report['io_'+protocol+'_'+action] = str(size)+'_'+str(time) |
251 |
< |
|
129 |
< |
if debug : |
251 |
> |
if self.debug : |
252 |
|
ordered = dashboard_report.keys() |
253 |
|
ordered.sort() |
254 |
|
for key in ordered: |
255 |
|
print key,'=',dashboard_report[key] |
256 |
|
|
257 |
+ |
# IO throughput information |
258 |
+ |
dashboard_report['io_read_throughput'] = throughput_report['readThr'] |
259 |
+ |
dashboard_report['io_write_throughput'] = throughput_report['writeThr'] |
260 |
+ |
dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr'] |
261 |
+ |
|
262 |
|
# send to DashBoard |
263 |
< |
apmonSend(MonitorID, MonitorJobID, dashboard_report) |
263 |
> |
apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report) |
264 |
|
apmonFree() |
265 |
|
|
266 |
< |
# prepare exit string |
140 |
< |
exit_string = str(exit_status) |
141 |
< |
for key in dashboard_report.keys() : |
142 |
< |
exit_string += ';' + str(key) + '=' + str(dashboard_report[key]) |
266 |
> |
if self.debug == 1 : print dashboard_report |
267 |
|
|
268 |
< |
|
268 |
> |
return |
269 |
|
|
270 |
< |
return exit_string |
270 |
> |
def usage(self):
    """
    Return the command line help text listing the required and optional
    parameters.
    """
    return (
        "\n"
        "required parameters:\n"
        "--input : input FJR xml file\n"
        "\n"
        "optional parameters:\n"
        "--dashboard : send info to the dashboard. require following args: \"MonitorID,MonitorJobID\"\n"
        "MonitorID : DashBoard MonitorID\n"
        "MonitorJobID : DashBoard MonitorJobID\n"
        "--exitcode : print executable exit code\n"
        "--lfn : report list of files really analyzed\n"
        "--help : help\n"
        "--debug : debug statements\n"
    )
286 |
|
|
287 |
|
|
288 |
|
if __name__ == '__main__' : |
289 |
< |
exit_status = main(sys.argv[1:]) |
290 |
< |
# output for wrapper script |
291 |
< |
print exit_status |
292 |
< |
|
293 |
< |
|
289 |
> |
try: |
290 |
> |
parseFjr_ = parseFjr(sys.argv[1:]) |
291 |
> |
parseFjr_.run() |
292 |
> |
except: |
293 |
> |
pass |