5 |
|
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
6 |
|
from DashboardAPI import apmonSend, apmonFree |
7 |
|
|
8 |
< |
|
9 |
< |
def main(argv) : |
10 |
< |
""" |
11 |
< |
parseCrabFjr |
12 |
< |
|
13 |
< |
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
< |
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
< |
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
16 |
< |
|
17 |
< |
required parameters: |
18 |
< |
--input : input FJR xml file |
19 |
< |
--MonitorID : DashBoard MonitorID |
20 |
< |
--MonitorJobID : DashBoard MonitorJobID |
21 |
< |
|
22 |
< |
optional parameters: |
23 |
< |
--help : help |
24 |
< |
--debug : debug statements |
25 |
< |
|
26 |
< |
""" |
27 |
< |
|
28 |
< |
# defaults |
29 |
< |
input = '' |
30 |
< |
MonitorID = '' |
31 |
< |
MonitorJobID = '' |
32 |
< |
debug = 0 |
33 |
< |
|
34 |
< |
try: |
35 |
< |
opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"]) |
36 |
< |
except getopt.GetoptError: |
37 |
< |
print main.__doc__ |
38 |
< |
sys.exit(2) |
39 |
< |
|
40 |
< |
# check command line parameter |
41 |
< |
for opt, arg in opts : |
42 |
< |
if opt == "--help" : |
43 |
< |
print main.__doc__ |
8 |
> |
class parseFjr: |
9 |
> |
def __init__(self, argv):
    """
    parseCrabFjr

    - parse CRAB FrameworkJobReport on WN:
      { 'protocol' : { 'action' : [attempted,succeeded,total-size,total-time,min-time,max-time] , ... } , ... }
    - report parameters to DashBoard using DashBoardApi.py

    argv is the list of command-line arguments to parse with getopt
    (long options only).  On an unrecognised option the usage text is
    printed and the process exits with status 2; otherwise the parsed
    options are handed to check().
    """
    # defaults; check() overwrites them from the parsed options
    self.input = ''
    self.MonitorID = ''
    self.MonitorJobID = ''
    self.info2dash = False
    self.exitCode = False
    self.lfnList = False
    self.debug = 0

    long_options = ["input=", "dashboard=", "exitcode", "lfn", "debug", "help"]
    try:
        # positional leftovers are not used
        opts, _leftover = getopt.getopt(argv, "", long_options)
    except getopt.GetoptError:
        print(self.usage())
        sys.exit(2)

    self.check(opts)
33 |
> |
|
34 |
> |
def check(self,opts): |
35 |
> |
# check command line parameter |
36 |
> |
for opt, arg in opts : |
37 |
> |
if opt == "--help" : |
38 |
> |
print self.usage() |
39 |
> |
sys.exit() |
40 |
> |
elif opt == "--input" : |
41 |
> |
self.input = arg |
42 |
> |
elif opt == "--exitcode": |
43 |
> |
self.exitCode = True |
44 |
> |
elif opt == "--lfn": |
45 |
> |
self.lfnList = True |
46 |
> |
elif opt == "--dashboard": |
47 |
> |
self.info2dash = True |
48 |
> |
try: |
49 |
> |
self.MonitorID = arg.split(",")[0] |
50 |
> |
self.MonitorJobID = arg.split(",")[1] |
51 |
> |
except: |
52 |
> |
self.MonitorID = '' |
53 |
> |
self.MonitorJobID = '' |
54 |
> |
elif opt == "--debug" : |
55 |
> |
self.debug = 1 |
56 |
> |
|
57 |
> |
if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode) : |
58 |
> |
print self.usage() |
59 |
|
sys.exit() |
60 |
< |
elif opt == "--input" : |
61 |
< |
input = arg |
62 |
< |
elif opt == "--MonitorID" : |
63 |
< |
MonitorID = arg |
64 |
< |
elif opt == "--MonitorJobID" : |
65 |
< |
MonitorJobID = arg |
66 |
< |
elif opt == "--debug" : |
67 |
< |
debug = 1 |
68 |
< |
|
69 |
< |
if input == '' or MonitorID == '' or MonitorJobID == '': |
70 |
< |
print main.__doc__ |
71 |
< |
sys.exit() |
72 |
< |
|
73 |
< |
# load FwkJobRep |
74 |
< |
jobReport = readJobReport(input)[0] |
75 |
< |
|
76 |
< |
exit_status = '' |
77 |
< |
|
78 |
< |
##### temporary fix for FJR incomplete #### |
79 |
< |
fjr = open (input) |
80 |
< |
len_fjr = len(fjr.readlines()) |
81 |
< |
if (len_fjr <= 6): |
82 |
< |
### 50115 - cmsRun did not produce a valid/readable job report at runtime |
83 |
< |
exit_status = str(50115) |
84 |
< |
else: |
85 |
< |
# get ExitStatus of last error |
86 |
< |
if len(jobReport.errors) != 0 : |
87 |
< |
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
88 |
< |
else : |
89 |
< |
exit_status = str(0) |
90 |
< |
|
91 |
< |
# get i/o statistics |
77 |
< |
storageStatistics = str(jobReport.storageStatistics) |
78 |
< |
|
79 |
< |
# dashboard report dictionary |
80 |
< |
dashboard_report = {} |
81 |
< |
|
82 |
< |
# check if storageStatistics is valid |
83 |
< |
if storageStatistics.find('Storage statistics:') != -1 : |
84 |
< |
# report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
85 |
< |
report = {} |
86 |
< |
data = storageStatistics.split('Storage statistics:')[1] |
87 |
< |
data_fields = data.split(';') |
88 |
< |
for data_field in data_fields: |
89 |
< |
# parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time |
90 |
< |
key = data_field.split('=')[0].strip() |
91 |
< |
item = data_field.split('=')[1].strip() |
92 |
< |
protocol = str(key.split('/')[0].strip()) |
93 |
< |
action = str(key.split('/')[1].strip()) |
94 |
< |
item_array = item.split('/') |
95 |
< |
attempted = str(item_array[0].strip()) |
96 |
< |
succeeded = str(item_array[1].strip()) |
97 |
< |
total_size = str(item_array[2].strip().split('MB')[0]) |
98 |
< |
total_time = str(item_array[3].strip().split('ms')[0]) |
99 |
< |
min_time = str(item_array[4].strip().split('ms')[0]) |
100 |
< |
max_time = str(item_array[5].strip().split('ms')[0]) |
101 |
< |
# add to report |
102 |
< |
if protocol in report.keys() : |
103 |
< |
if action in report[protocol].keys() : |
104 |
< |
print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first' |
105 |
< |
else : |
106 |
< |
report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time] |
60 |
> |
|
61 |
> |
if self.info2dash: |
62 |
> |
if self.MonitorID == '' or self.MonitorJobID == '': |
63 |
> |
print self.usage() |
64 |
> |
sys.exit() |
65 |
> |
return |
66 |
> |
|
67 |
> |
def run(self):
    """
    Load the framework job report named by self.input and run every
    action that was requested on the command line, in the fixed order
    exit code, LFN list, dashboard report.
    """
    # load FwkJobRep: only the first element of what readJobReport returns is used
    jobReport = readJobReport(self.input)[0]

    requested = (
        (self.exitCode, self.exitCodes),
        (self.lfnList, self.lfn_List),
        (self.info2dash, self.reportDash),
    )
    for enabled, action in requested:
        if enabled:
            action(jobReport)
78 |
> |
|
79 |
> |
def exitCodes(self, jobReport): |
80 |
> |
|
81 |
> |
exit_status = '' |
82 |
> |
##### temporary fix for FJR incomplete #### |
83 |
> |
fjr = open (self.input) |
84 |
> |
len_fjr = len(fjr.readlines()) |
85 |
> |
if (len_fjr <= 6): |
86 |
> |
### 50115 - cmsRun did not produce a valid/readable job report at runtime |
87 |
> |
exit_status = str(50115) |
88 |
> |
else: |
89 |
> |
# get ExitStatus of last error |
90 |
> |
if len(jobReport.errors) != 0 : |
91 |
> |
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
92 |
|
else : |
93 |
< |
report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] } |
94 |
< |
|
95 |
< |
if debug : |
96 |
< |
for protocol in report.keys() : |
97 |
< |
print 'protocol:',protocol |
98 |
< |
for action in report[protocol].keys() : |
99 |
< |
print 'action:',action,'measurement:',report[protocol][action] |
93 |
> |
exit_status = str(0) |
94 |
> |
#check exit code |
95 |
> |
if string.strip(exit_status) == '': exit_status = -999 |
96 |
> |
print exit_status |
97 |
> |
|
98 |
> |
return |
99 |
> |
|
100 |
> |
def lfn_List(self, jobReport):
    """
    Print the LFN of every input file of the job report, one per line.

    jobReport must provide an iterable ``inputFiles`` whose entries can be
    indexed with the key 'LFN'.  Nothing is returned.
    """
    # Collect everything first so that a bad entry raises before any
    # output has been written.
    lfns = [input_file['LFN'] for input_file in jobReport.inputFiles]
    for lfn in lfns:
        # single-argument print(...) gives the same output whether print
        # is a statement or a function
        print(lfn)
    return
107 |
> |
|
108 |
> |
def storageStat(self, jobReport):
    """
    Parse the i/o statistics of the job report and aggregate throughput.

    The text after the marker 'Storage statistics:' in
    str(jobReport.storageStatistics) is read as ';'-separated fields of the
    form
        protocol/action = attempted/succeeded/total-size/total-time/min-time/max-time
    where total-size carries an 'MB' suffix and the times an 'ms' suffix.

    Returns a tuple (storage_report, throughput_report):
      storage_report    { protocol : { action : [attempted, succeeded,
                          total_size, total_time, min_time, max_time] } },
                        all values kept as strings with the unit suffix
                        removed; empty when the marker is absent.
      throughput_report 'rSize'/'wSize' are the summed total sizes of
                        actions whose name contains 'read'/'write';
                        'rTime'/'wTime' the summed total times ('seek'
                        actions add to 'rTime'), divided by 1000 when
                        non-zero; 'readThr'/'writeThr' are size/time;
                        'avgNetThr' is rSize divided by
                        jobReport.performance.summaries['ExeTime'].
                        Any rate that cannot be computed is the string 'NULL'.

    Blank or malformed fields are skipped instead of aborting the parse.
    """
    marker = 'Storage statistics:'
    storageStatistics = str(jobReport.storageStatistics)
    storage_report = {}

    # check if storageStatistics is valid
    if marker in storageStatistics:
        data = storageStatistics.split(marker)[1]
        for data_field in data.split(';'):
            # the text after the last ';' is usually blank or a newline
            if not data_field.strip():
                continue

            pieces = data_field.split('=')
            key_parts = pieces[0].split('/')
            values = pieces[1].strip().split('/') if len(pieces) > 1 else []
            if len(key_parts) < 2 or len(values) < 6:
                # not "protocol/action = six/slash/separated/values"
                if self.debug:
                    print('skipping malformed storage statistics field: %r' % data_field)
                continue

            protocol = key_parts[0].strip()
            action = key_parts[1].strip()
            measurement = [
                values[0].strip(),                  # attempted
                values[1].strip(),                  # succeeded
                values[2].strip().split('MB')[0],   # total size
                values[3].strip().split('ms')[0],   # total time
                values[4].strip().split('ms')[0],   # min time
                values[5].strip().split('ms')[0],   # max time
            ]

            # add to report; the first occurrence of a protocol/action wins
            actions = storage_report.setdefault(protocol, {})
            if action in actions:
                print('protocol/action: %s / %s listed twice in report, taking the first'
                      % (protocol, action))
            else:
                actions[action] = measurement

        if self.debug:
            for protocol in storage_report:
                print('protocol: %s' % protocol)
                for action in storage_report[protocol]:
                    print('action: %s measurement: %s'
                          % (action, storage_report[protocol][action]))

    # throughput measurements
    throughput_report = {'rSize': 0.0, 'rTime': 0.0, 'wSize': 0.0, 'wTime': 0.0}
    for protocol in storage_report:
        for action in storage_report[protocol]:
            is_read = 'read' in action
            is_write = 'write' in action
            is_seek = 'seek' in action
            # not interesting
            if not (is_read or is_write or is_seek):
                continue

            # convert values; measurements that are not numbers are ignored
            try:
                sizeValue = float(storage_report[protocol][action][2])
                timeValue = float(storage_report[protocol][action][3])
            except ValueError:
                continue

            # aggregate data ('read' takes precedence over 'write' over 'seek')
            if is_read:
                throughput_report['rSize'] += sizeValue
                throughput_report['rTime'] += timeValue
            elif is_write:
                throughput_report['wSize'] += sizeValue
                throughput_report['wTime'] += timeValue
            else:
                # seek time is counted as read time, it moves no data
                throughput_report['rTime'] += timeValue

    # calculate global throughput
    throughput_report['readThr'] = 'NULL'
    if throughput_report['rTime'] > 0.0:
        throughput_report['rTime'] /= 1000.0  # scale ms to s
        throughput_report['readThr'] = throughput_report['rSize'] / throughput_report['rTime']

    throughput_report['avgNetThr'] = 'NULL'
    try:
        throughput_report['avgNetThr'] = (
            throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime']))
    except Exception:
        # best effort: the performance summary may be missing, non-numeric
        # or zero; KeyboardInterrupt/SystemExit are no longer swallowed
        pass

    throughput_report['writeThr'] = 'NULL'
    if throughput_report['wTime'] > 0.0:
        throughput_report['wTime'] /= 1000.0  # scale ms to s
        throughput_report['writeThr'] = throughput_report['wSize'] / throughput_report['wTime']

    if self.debug == 1:
        print(storage_report)
        print(throughput_report)
    return storage_report, throughput_report
200 |
> |
|
201 |
> |
def n_of_events(self, jobReport):
    """
    Build the event-count part of the Dashboard report.

    Sums the 'EventsRead' value of every entry of jobReport.inputFiles
    (a missing key counts as 0; entries whose value cannot be converted
    to an integer are skipped) and returns a dict holding that total
    under the three keys 'NoEventsPerRun', 'NbEvPerRun' and
    'NEventsProcessed'.  The dict is printed when self.debug is 1.
    """
    eventsPerRun = 0
    for inputFile in jobReport.inputFiles:
        try:
            # go through str() so numeric and padded string values are
            # both accepted
            eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
        except (AttributeError, TypeError, ValueError):
            # unusable entry: ignore it, keep counting the others
            continue
        eventsPerRun += eventsRead

    # the same total is published under all three names
    event_report = {
        'NoEventsPerRun': eventsPerRun,
        'NbEvPerRun': eventsPerRun,
        'NEventsProcessed': eventsPerRun,
    }

    if self.debug == 1:
        print(event_report)

    return event_report
222 |
> |
|
223 |
> |
def reportDash(self,jobReport): |
224 |
> |
''' |
225 |
> |
dashboard report dictionary |
226 |
> |
''' |
227 |
> |
event_report = self.n_of_events(jobReport) |
228 |
> |
storage_report, throughput_report = self.storageStat(jobReport) |
229 |
> |
dashboard_report = {} |
230 |
> |
# |
231 |
> |
for k,v in event_report.iteritems() : |
232 |
> |
dashboard_report[k]=v |
233 |
|
|
234 |
|
# extract information to be sent to DashBoard |
235 |
|
# per protocol and for action=read, calculate MBPS |
236 |
|
# dashboard key is io_action |
237 |
< |
dashboard_report['MonitorID'] = MonitorID |
238 |
< |
dashboard_report['MonitorJobID'] = MonitorJobID |
239 |
< |
for protocol in report.keys() : |
240 |
< |
for action in report[protocol].keys() : |
241 |
< |
try: size = float(report[protocol][action][2]) |
237 |
> |
dashboard_report['MonitorID'] = self.MonitorID |
238 |
> |
dashboard_report['MonitorJobID'] = self.MonitorJobID |
239 |
> |
for protocol in storage_report.keys() : |
240 |
> |
for action in storage_report[protocol].keys() : |
241 |
> |
try: size = float(storage_report[protocol][action][2]) |
242 |
|
except: size = 'NULL' |
243 |
< |
try: time = float(report[protocol][action][3])/1000 |
243 |
> |
try: time = float(storage_report[protocol][action][3])/1000 |
244 |
|
except: time = 'NULL' |
245 |
|
dashboard_report['io_'+protocol+'_'+action] = str(size)+'_'+str(time) |
246 |
< |
|
129 |
< |
if debug : |
246 |
> |
if self.debug : |
247 |
|
ordered = dashboard_report.keys() |
248 |
|
ordered.sort() |
249 |
|
for key in ordered: |
250 |
|
print key,'=',dashboard_report[key] |
251 |
|
|
252 |
+ |
# IO throughput information |
253 |
+ |
dashboard_report['io_read_throughput'] = throughput_report['readThr'] |
254 |
+ |
dashboard_report['io_write_throughput'] = throughput_report['writeThr'] |
255 |
+ |
dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr'] |
256 |
+ |
|
257 |
|
# send to DashBoard |
258 |
< |
apmonSend(MonitorID, MonitorJobID, dashboard_report) |
258 |
> |
apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report) |
259 |
|
apmonFree() |
260 |
|
|
261 |
< |
# prepare exit string |
140 |
< |
exit_string = str(exit_status) |
141 |
< |
for key in dashboard_report.keys() : |
142 |
< |
exit_string += ';' + str(key) + '=' + str(dashboard_report[key]) |
261 |
> |
if self.debug == 1 : print dashboard_report |
262 |
|
|
263 |
< |
|
263 |
> |
return |
264 |
|
|
265 |
< |
return exit_string |
265 |
> |
def usage(self): |
266 |
> |
|
267 |
> |
msg=""" |
268 |
> |
required parameters: |
269 |
> |
--input : input FJR xml file |
270 |
> |
|
271 |
> |
optional parameters: |
272 |
> |
--dashboard : send info to the dashboard. require following args: "MonitorID,MonitorJobID" |
273 |
> |
MonitorID : DashBoard MonitorID |
274 |
> |
MonitorJobID : DashBoard MonitorJobID |
275 |
> |
--exitcode : print executable exit code |
276 |
> |
--lfn : report list of files really analyzed |
277 |
> |
--help : help |
278 |
> |
--debug : debug statements |
279 |
> |
""" |
280 |
> |
return msg |
281 |
|
|
282 |
|
|
283 |
|
if __name__ == '__main__' : |
284 |
< |
exit_status = main(sys.argv[1:]) |
285 |
< |
# output for wrapper script |
286 |
< |
print exit_status |
287 |
< |
|
288 |
< |
|
284 |
> |
try: |
285 |
> |
parseFjr_ = parseFjr(sys.argv[1:]) |
286 |
> |
parseFjr_.run() |
287 |
> |
except: |
288 |
> |
pass |