67 |
|
def run(self): |
68 |
|
|
69 |
|
# load FwkJobRep |
70 |
< |
jobReport = readJobReport(self.input)[0] |
70 |
> |
try: |
71 |
> |
jobReport = readJobReport(self.input)[0] |
72 |
> |
except: |
73 |
> |
print '50115' |
74 |
> |
sys.exit() |
75 |
> |
|
76 |
|
if self.exitCode : |
77 |
|
self.exitCodes(jobReport) |
78 |
|
if self.lfnList : |
116 |
|
''' |
117 |
|
storageStatistics = str(jobReport.storageStatistics) |
118 |
|
storage_report = {} |
119 |
+ |
|
120 |
|
# check if storageStatistics is valid |
121 |
|
if storageStatistics.find('Storage statistics:') != -1 : |
122 |
|
# report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... } |
151 |
|
print 'protocol:',protocol |
152 |
|
for action in storage_report[protocol].keys() : |
153 |
|
print 'action:',action,'measurement:',storage_report[protocol][action] |
154 |
+ |
|
155 |
+ |
##### |
156 |
+ |
# throughput measurements # Fabio |
157 |
+ |
throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 } |
158 |
+ |
for protocol in storage_report.keys() : |
159 |
+ |
for action in storage_report[protocol].keys(): |
160 |
+ |
# not interesting |
161 |
+ |
if 'read' not in action and 'write' not in action and 'seek' not in action: |
162 |
+ |
continue |
163 |
+ |
|
164 |
+ |
# convert values |
165 |
+ |
try: |
166 |
+ |
sizeValue = float(storage_report[protocol][action][2]) |
167 |
+ |
timeValue = float(storage_report[protocol][action][3]) |
168 |
+ |
except Exception,e: |
169 |
+ |
continue |
170 |
|
|
171 |
< |
if self.debug == 1 : print storage_report |
172 |
< |
return storage_report |
171 |
> |
# aggregate data |
172 |
> |
if 'read' in action: |
173 |
> |
throughput_report['rSize'] += sizeValue |
174 |
> |
throughput_report['rTime'] += timeValue |
175 |
> |
elif 'write' in action: |
176 |
> |
throughput_report['wSize'] += sizeValue |
177 |
> |
throughput_report['wTime'] += timeValue |
178 |
> |
elif 'seek' in action: |
179 |
> |
throughput_report['rTime'] += timeValue |
180 |
> |
else: |
181 |
> |
continue |
182 |
> |
|
183 |
> |
# calculate global throughput |
184 |
> |
throughput_report['readThr'] = 'NULL' |
185 |
> |
if throughput_report['rTime'] > 0.0: |
186 |
> |
throughput_report['rTime'] /= 1000.0 # scale ms to s |
187 |
> |
throughput_report['readThr'] = float(throughput_report['rSize']/throughput_report['rTime']) |
188 |
> |
|
189 |
> |
throughput_report['avgNetThr'] = 'NULL' |
190 |
> |
try: |
191 |
> |
throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime']) |
192 |
> |
except: |
193 |
> |
pass |
194 |
> |
|
195 |
> |
throughput_report['writeThr'] = 'NULL' |
196 |
> |
if throughput_report['wTime'] > 0.0: |
197 |
> |
throughput_report['wTime'] /= 1000.0 |
198 |
> |
throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime']) |
199 |
> |
##### |
200 |
> |
|
201 |
> |
if self.debug == 1 : |
202 |
> |
print storage_report |
203 |
> |
print throughput_report |
204 |
> |
return storage_report, throughput_report |
205 |
|
|
206 |
|
def n_of_events(self,jobReport): |
207 |
|
''' |
230 |
|
dashboard report dictionary |
231 |
|
''' |
232 |
|
event_report = self.n_of_events(jobReport) |
233 |
< |
storage_report = self.storageStat(jobReport) |
233 |
> |
storage_report, throughput_report = self.storageStat(jobReport) |
234 |
|
dashboard_report = {} |
235 |
|
# |
236 |
|
for k,v in event_report.iteritems() : |
253 |
|
ordered.sort() |
254 |
|
for key in ordered: |
255 |
|
print key,'=',dashboard_report[key] |
256 |
+ |
|
257 |
+ |
# IO throughput information |
258 |
+ |
dashboard_report['io_read_throughput'] = throughput_report['readThr'] |
259 |
+ |
dashboard_report['io_write_throughput'] = throughput_report['writeThr'] |
260 |
+ |
dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr'] |
261 |
|
|
262 |
|
# send to DashBoard |
263 |
|
apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report) |