3 |
|
import sys, getopt, string |
4 |
|
|
5 |
|
from FwkJobRep.ReportParser import readJobReport |
6 |
+ |
from DashboardAPI import apmonSend, apmonFree |
7 |
+ |
|
8 |
|
|
9 |
|
def main(argv) : |
10 |
|
""" |
11 |
|
parseCrabFjr |
12 |
|
|
13 |
< |
parse CRAB FrameworkJobReport on WN and return parameters to WN wrapper script |
14 |
< |
|
15 |
< |
prints information separated by semi-colon in fixed order: |
14 |
< |
|
15 |
< |
1. ExitStatus (0 or ExitStatus from CMSSW) |
13 |
> |
- parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
14 |
> |
- report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS |
15 |
> |
- return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script |
16 |
|
|
17 |
|
required parameters: |
18 |
|
--input : input FJR xml file |
19 |
+ |
--MonitorID : DashBoard MonitorID |
20 |
+ |
--MonitorJobID : DashBoard MonitorJobID |
21 |
|
|
22 |
|
optional parameters: |
23 |
|
--help : help |
27 |
|
|
28 |
|
# defaults |
29 |
|
input = '' |
30 |
+ |
MonitorID = '' |
31 |
+ |
MonitorJobID = '' |
32 |
|
debug = 0 |
33 |
|
|
34 |
|
try: |
35 |
< |
opts, args = getopt.getopt(argv, "", ["input=", "debug", "help"]) |
35 |
> |
opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"]) |
36 |
|
except getopt.GetoptError: |
37 |
|
print main.__doc__ |
38 |
|
sys.exit(2) |
44 |
|
sys.exit() |
45 |
|
elif opt == "--input" : |
46 |
|
input = arg |
47 |
+ |
elif opt == "--MonitorID" : |
48 |
+ |
MonitorID = arg |
49 |
+ |
elif opt == "--MonitorJobID" : |
50 |
+ |
MonitorJobID = arg |
51 |
|
elif opt == "--debug" : |
52 |
|
debug = 1 |
53 |
|
|
54 |
< |
if input == '': |
54 |
> |
if input == '' or MonitorID == '' or MonitorJobID == '': |
55 |
|
print main.__doc__ |
56 |
|
sys.exit() |
57 |
|
|
58 |
|
# load FwkJobRep |
59 |
|
jobReport = readJobReport(input)[0] |
60 |
|
|
61 |
< |
report = [] |
61 |
> |
exit_satus = '' |
62 |
|
|
63 |
|
# get ExitStatus of last error |
64 |
|
if len(jobReport.errors) != 0 : |
65 |
< |
report.append(str(jobReport.errors[-1]['ExitStatus'])) |
65 |
> |
exit_status = str(jobReport.errors[-1]['ExitStatus']) |
66 |
|
else : |
67 |
< |
report.append(str(0)) |
67 |
> |
exit_status = str(0) |
68 |
|
|
69 |
|
# get i/o statistics |
70 |
|
storageStatistics = str(jobReport.storageStatistics) |
71 |
|
|
72 |
< |
print ';'.join(report) |
73 |
< |
|
72 |
> |
# dashboard report dictionary |
73 |
> |
dashboard_report = {} |
74 |
> |
|
75 |
> |
# check if storageStatistics is valid |
76 |
> |
if storageStatistics.find('Storage statistics:') != -1 : |
77 |
> |
# report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
78 |
> |
report = {} |
79 |
> |
data = storageStatistics.split('Storage statistics:')[1] |
80 |
> |
data_fields = data.split(';') |
81 |
> |
for data_field in data_fields: |
82 |
> |
# parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time |
83 |
> |
key = data_field.split('=')[0].strip() |
84 |
> |
item = data_field.split('=')[1].strip() |
85 |
> |
protocol = str(key.split('/')[0].strip()) |
86 |
> |
action = str(key.split('/')[1].strip()) |
87 |
> |
item_array = item.split('/') |
88 |
> |
attempted = str(item_array[0].strip()) |
89 |
> |
succeeded = str(item_array[1].strip()) |
90 |
> |
total_size = str(item_array[2].strip().split('MB')[0]) |
91 |
> |
total_time = str(item_array[3].strip().split('ms')[0]) |
92 |
> |
min_time = str(item_array[4].strip().split('ms')[0]) |
93 |
> |
max_time = str(item_array[5].strip().split('ms')[0]) |
94 |
> |
# add to report |
95 |
> |
if protocol in report.keys() : |
96 |
> |
if action in report[protocol].keys() : |
97 |
> |
print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first' |
98 |
> |
else : |
99 |
> |
report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time] |
100 |
> |
else : |
101 |
> |
report[protocol] = {'action' : [attempted,succeeded,total_size,total_time,min_time,max_time] } |
102 |
> |
|
103 |
> |
if debug : |
104 |
> |
for protocol in report.keys() : |
105 |
> |
print 'protocol:',protocol |
106 |
> |
for action in report[protocol].keys() : |
107 |
> |
print 'action:',action,'measurement:',report[protocol][action] |
108 |
> |
|
109 |
> |
# extract information to be sent to DashBoard |
110 |
> |
# per protocol and for action=read, calculate MBPS |
111 |
> |
# dashboard key is io_action |
112 |
> |
dashboard_report['MonitorID'] = MonitorID |
113 |
> |
dashboard_report['MonitorJobID'] = MonitorJobID |
114 |
> |
for protocol in report.keys() : |
115 |
> |
if 'read' in report[protocol].keys() : |
116 |
> |
try: |
117 |
> |
size = float(report[protocol]['read'][2]) |
118 |
> |
time = float(report[protocol]['read'][3]) |
119 |
> |
dashboard_report['io_'+protocol] = size/time*1000 |
120 |
> |
except: |
121 |
> |
pass |
122 |
> |
|
123 |
> |
if debug : |
124 |
> |
print dashboard_report |
125 |
> |
|
126 |
> |
# send to DashBoard |
127 |
> |
apmonSend(MonitorID, MonitorJobID, dashboard_report) |
128 |
> |
apmonFree() |
129 |
> |
|
130 |
> |
# prepare exit string |
131 |
> |
exit_string = str(exit_status) |
132 |
> |
for key in dashboard_report.keys() : |
133 |
> |
exit_string += ';' + str(key) + '=' + str(dashboard_report[key]) |
134 |
> |
|
135 |
> |
return exit_string |
136 |
|
|
137 |
|
|
138 |
|
if __name__ == '__main__' : |
139 |
< |
main(sys.argv[1:]) |
139 |
> |
exit_status = main(sys.argv[1:]) |
140 |
> |
# output for wrapper script |
141 |
> |
print exit_status |
142 |
|
|
143 |
|
|