2 |
|
|
3 |
|
import sys, getopt, string |
4 |
|
|
5 |
< |
from FwkJobRep.ReportParser import readJobReport |
5 |
> |
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
6 |
|
from DashboardAPI import apmonSend, apmonFree |
7 |
|
|
8 |
< |
|
9 |
< |
def main(argv) : |
10 |
< |
""" |
11 |
< |
parseCrabFjr |
12 |
< |
|
13 |
< |
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
< |
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
< |
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
16 |
< |
|
17 |
< |
required parameters: |
18 |
< |
--input : input FJR xml file |
19 |
< |
--MonitorID : DashBoard MonitorID |
20 |
< |
--MonitorJobID : DashBoard MonitorJobID |
21 |
< |
|
22 |
< |
optional parameters: |
23 |
< |
--help : help |
24 |
< |
--debug : debug statements |
25 |
< |
|
26 |
< |
""" |
27 |
< |
|
28 |
< |
# defaults |
29 |
< |
input = '' |
30 |
< |
MonitorID = '' |
31 |
< |
MonitorJobID = '' |
32 |
< |
debug = 0 |
33 |
< |
|
34 |
< |
try: |
35 |
< |
opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"]) |
36 |
< |
except getopt.GetoptError: |
37 |
< |
print main.__doc__ |
38 |
< |
sys.exit(2) |
39 |
< |
|
40 |
< |
# check command line parameter |
41 |
< |
for opt, arg in opts : |
42 |
< |
if opt == "--help" : |
43 |
< |
print main.__doc__ |
8 |
> |
class parseFjr:
    """
    Command-line helper that reads a CRAB FrameworkJobReport (FJR) xml file.

    Depending on the options given it can:

    - print the job exit code (--exitcode),
    - print the LFN of every input file listed in the report (--lfn),
    - send the number of events read and the per protocol/action storage
      statistics to the DashBoard through apmonSend (--dashboard).
    """

    def __init__(self, argv):
        """
        Parse the command line and validate it.

        argv : list of option strings, without the program name.

        Prints the usage text and calls sys.exit(2) when getopt rejects
        the options; check() may also print the usage text and exit.
        """
        # defaults
        self.input = ''
        self.MonitorID = ''
        self.MonitorJobID = ''
        self.info2dash = False
        self.exitCode = False
        self.lfnList = False
        self.debug = 0
        try:
            opts = getopt.getopt(
                argv, "",
                ["input=", "dashboard=", "exitcode", "lfn", "debug", "help"])[0]
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)
        self.check(opts)

    def check(self, opts):
        """
        Store the parsed options on the instance and validate them.

        opts : list of (option, value) pairs as returned by getopt.getopt.

        Prints the usage text and calls sys.exit() when --help is given,
        when --input is missing, when none of --dashboard / --lfn /
        --exitcode is requested, or when --dashboard does not carry both
        a MonitorID and a MonitorJobID.
        """
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--input":
                self.input = arg
            elif opt == "--exitcode":
                self.exitCode = True
            elif opt == "--lfn":
                self.lfnList = True
            elif opt == "--dashboard":
                self.info2dash = True
                # expected value: "MonitorID,MonitorJobID"
                ids = arg.split(",")
                if len(ids) >= 2:
                    self.MonitorID = ids[0]
                    self.MonitorJobID = ids[1]
                else:
                    self.MonitorID = ''
                    self.MonitorJobID = ''
            elif opt == "--debug":
                self.debug = 1

        nothing_requested = not (self.info2dash or self.lfnList or self.exitCode)
        if self.input == '' or nothing_requested:
            print(self.usage())
            sys.exit()

        if self.info2dash and (self.MonitorID == '' or self.MonitorJobID == ''):
            print(self.usage())
            sys.exit()

    def run(self):
        """
        Load the first job report found in the input file and perform
        every action requested on the command line.
        """
        # load FwkJobRep
        jobReport = readJobReport(self.input)[0]
        if self.exitCode:
            self.exitCodes(jobReport)
        if self.lfnList:
            self.lfn_List(jobReport)
        if self.info2dash:
            self.reportDash(jobReport)

    def exitCodes(self, jobReport):
        """
        Print the job exit code on stdout.

        - 50115 when the report file has six lines or fewer (cmsRun did
          not produce a valid/readable job report at runtime),
        - otherwise the 'ExitStatus' of the last entry of jobReport.errors,
        - otherwise 0 when no error is recorded,
        - -999 when the resulting status is blank.
        """
        ##### temporary fix for FJR incomplete ####
        fjr = open(self.input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # always release the file handle, even if reading fails
            fjr.close()

        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            exit_status = str(50115)
        elif len(jobReport.errors) != 0:
            # get ExitStatus of last error
            exit_status = str(jobReport.errors[-1]['ExitStatus'])
        else:
            exit_status = str(0)

        # check exit code
        if exit_status.strip() == '':
            exit_status = '-999'
        print(exit_status)

    def lfn_List(self, jobReport):
        """
        Print the 'LFN' of every entry of jobReport.inputFiles, one per line.
        """
        for lfn in [x['LFN'] for x in jobReport.inputFiles]:
            print(lfn)

    def _parse_storage_field(self, data_field):
        """
        Parse one 'protocol/action = attempted/succeeded/total-size/
        total-time/min-time/max-time' field.

        Returns (protocol, action, measurement) where measurement is the
        list of the six values as strings (the 'MB' and 'ms' unit suffixes
        removed), or None when the field does not have that layout.
        """
        pieces = data_field.split('=')
        if len(pieces) < 2:
            return None
        key_parts = pieces[0].strip().split('/')
        item_array = pieces[1].strip().split('/')
        if len(key_parts) < 2 or len(item_array) < 6:
            return None

        protocol = str(key_parts[0].strip())
        action = str(key_parts[1].strip())
        attempted = str(item_array[0].strip())
        succeeded = str(item_array[1].strip())
        total_size = str(item_array[2].strip().split('MB')[0])
        total_time = str(item_array[3].strip().split('ms')[0])
        min_time = str(item_array[4].strip().split('ms')[0])
        max_time = str(item_array[5].strip().split('ms')[0])
        measurement = [attempted, succeeded, total_size,
                       total_time, min_time, max_time]
        return protocol, action, measurement

    def storageStat(self, jobReport):
        """
        Get the i/o statistics of the job.

        Parses the text that follows 'Storage statistics:' in
        str(jobReport.storageStatistics) and returns a dictionary of the form
        { 'protocol' : { 'action' : [attempted, succeeded, total-size,
        total-time, min-time, max-time], ... }, ... } holding strings.
        An empty dictionary is returned when the marker is not present.
        Blank and malformed fields are skipped; when a protocol/action pair
        appears more than once the first occurrence is kept.
        """
        storageStatistics = str(jobReport.storageStatistics)
        storage_report = {}
        # check if storageStatistics is valid
        if storageStatistics.find('Storage statistics:') != -1:
            data = storageStatistics.split('Storage statistics:')[1]
            for data_field in data.split(';'):
                # separators may leave empty or whitespace-only fields behind
                if not data_field.strip():
                    continue
                parsed = self._parse_storage_field(data_field)
                if parsed is None:
                    # one unreadable field must not discard the whole report
                    if self.debug:
                        print('skipping malformed storage statistics field: %s'
                              % (data_field.strip(),))
                    continue
                protocol, action, measurement = parsed
                # add to report
                if protocol not in storage_report:
                    storage_report[protocol] = {action: measurement}
                elif action in storage_report[protocol]:
                    print('protocol/action: %s / %s listed twice in report, taking the first'
                          % (protocol, action))
                else:
                    storage_report[protocol][action] = measurement

            if self.debug:
                for protocol in storage_report:
                    print('protocol: %s' % (protocol,))
                    for action in storage_report[protocol]:
                        print('action: %s measurement: %s'
                              % (action, storage_report[protocol][action]))

        if self.debug == 1:
            print(storage_report)
        return storage_report

    def n_of_events(self, jobReport):
        """
        Brian's patch to send the number of events processed to the Dashboard.

        Sums 'EventsRead' (0 when the key is absent) over
        jobReport.inputFiles, ignoring entries whose value cannot be read as
        an integer, and returns the total under the keys 'NoEventsPerRun',
        'NbEvPerRun' and 'NEventsProcessed'.
        """
        eventsPerRun = 0
        for inputFile in jobReport.inputFiles:
            try:
                eventsRead = int(str(inputFile.get('EventsRead', 0)).strip())
            except (AttributeError, TypeError, ValueError):
                # entry without a usable event count: leave it out of the sum
                continue
            eventsPerRun += eventsRead

        event_report = {
            'NoEventsPerRun': eventsPerRun,
            'NbEvPerRun': eventsPerRun,
            'NEventsProcessed': eventsPerRun,
        }

        if self.debug == 1:
            print(event_report)

        return event_report

    def reportDash(self, jobReport):
        """
        Build the dashboard report dictionary and send it to the DashBoard.

        The report holds the event counters from n_of_events(), the
        MonitorID / MonitorJobID, and one 'io_<protocol>_<action>' entry per
        storage statistic whose value is '<total-size>_<total-time / 1000>'
        ('NULL' replaces a number that cannot be converted to float).
        """
        event_report = self.n_of_events(jobReport)
        storage_report = self.storageStat(jobReport)
        dashboard_report = {}
        dashboard_report.update(event_report)

        # extract information to be sent to DashBoard
        # per protocol and per action, report total size and total time
        # dashboard key is io_protocol_action
        dashboard_report['MonitorID'] = self.MonitorID
        dashboard_report['MonitorJobID'] = self.MonitorJobID
        for protocol in storage_report:
            for action in storage_report[protocol]:
                measurement = storage_report[protocol][action]
                try:
                    size = float(measurement[2])
                except (IndexError, TypeError, ValueError):
                    size = 'NULL'
                try:
                    # total-time is parsed from an 'ms' value
                    elapsed = float(measurement[3]) / 1000
                except (IndexError, TypeError, ValueError):
                    elapsed = 'NULL'
                dashboard_report['io_' + protocol + '_' + action] = \
                    str(size) + '_' + str(elapsed)

        if self.debug:
            for key in sorted(dashboard_report):
                print('%s = %s' % (key, dashboard_report[key]))

        # send to DashBoard
        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
        apmonFree()

        if self.debug == 1:
            print(dashboard_report)

    def usage(self):
        """
        Return the command-line help text.
        """
        msg = """
    required parameters:
    --input : input FJR xml file

    optional parameters:
    --dashboard : send info to the dashboard. require following args: "MonitorID,MonitorJobID"
        MonitorID : DashBoard MonitorID
        MonitorJobID : DashBoard MonitorJobID
    --exitcode : print executable exit code
    --lfn : report list of files really analyzed
    --help : help
    --debug : debug statements
    """
        return msg
227 |
|
|
138 |
– |
if __name__ == '__main__' : |
139 |
– |
exit_status = main(sys.argv[1:]) |
140 |
– |
# output for wrapper script |
141 |
– |
print exit_status |
228 |
|
|
229 |
< |
|
229 |
> |
if __name__ == '__main__':
    # Script entry point: parse the command line and run the requested
    # actions. Unexpected failures are reported on stderr without a
    # traceback so that stdout carries only the requested output.
    try:
        parseFjr_ = parseFjr(sys.argv[1:])
        parseFjr_.run()
    except (SystemExit, KeyboardInterrupt):
        # let the explicit sys.exit() status (e.g. 2 on bad options)
        # and user interrupts reach the caller
        raise
    except Exception:
        sys.stderr.write('parseFjr failed: %s\n' % (sys.exc_info()[1],))