1 |
|
import os, common, string |
2 |
|
from Actor import * |
3 |
|
from crab_util import * |
4 |
+ |
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
5 |
+ |
|
6 |
+ |
try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson |
7 |
+ |
import json |
8 |
+ |
except: |
9 |
+ |
import simplejson as json |
10 |
+ |
|
11 |
|
|
12 |
|
class Reporter(Actor): |
13 |
|
""" A class to report a short summary of the info of a task, including what |
37 |
|
#print endpoint, lfn, SE, SE_PATH, user |
38 |
|
|
39 |
|
msg+= "SE: %s %s srmPath: %s\n"%(self.cfg_params['USER.storage_element'],SE,endpoint) |
40 |
< |
|
40 |
> |
|
41 |
|
else: |
42 |
|
msg+= "Local output: %s"%task['outputDirectory'] |
43 |
|
#print task |
37 |
– |
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
44 |
|
possible_status = [ 'Created', |
45 |
|
'Undefined', |
46 |
|
'Submitting', |
63 |
|
eventsRequired=0 |
64 |
|
filesRead=0 |
65 |
|
filesRequired=0 |
66 |
+ |
runsAndLumis = {} |
67 |
|
for job in task.getJobs(): |
68 |
|
if (job.runningJob['applicationReturnCode']>0 or job.runningJob['wrapperReturnCode']>0): continue |
69 |
|
# get FJR filename |
70 |
|
fjr=task['outputDirectory']+job['outputFiles'][-1] |
71 |
|
jobReport = readJobReport(fjr) |
72 |
< |
if len(jobReport)>0: |
73 |
< |
inputFiles=jobReport[0].inputFiles |
72 |
> |
if len(jobReport) > 0: |
73 |
> |
inputFiles = jobReport[0].inputFiles |
74 |
|
for inputFile in inputFiles: |
75 |
< |
runs=inputFile.runs |
76 |
< |
#print [inputFile[it] for it in ['LFN','EventsRead']] |
77 |
< |
# print "FileIn :",inputFile['LFN'],": Events",inputFile['EventsRead'] |
78 |
< |
# for run in runs.keys(): |
79 |
< |
# print "Run",run,": lumi sections",runs[run] |
75 |
> |
# Accumulate the list of lum sections run over |
76 |
> |
for run in inputFile.runs.keys(): |
77 |
> |
if not runsAndLumis.has_key(run): |
78 |
> |
runsAndLumis[run] = set() |
79 |
> |
for lumi in inputFile.runs[run]: |
80 |
> |
runsAndLumis[run].add(lumi) |
81 |
|
filesRead+=1 |
82 |
|
eventsRead+=int(inputFile['EventsRead']) |
83 |
|
#print jobReport[0].inputFiles,'\n' |
85 |
|
pass |
86 |
|
#print 'no FJR avaialble for job #%s'%job['jobId'] |
87 |
|
#print "--------------------------" |
88 |
< |
#msg+= "Total Events read: %s required: %s\n"%(eventsRead,eventsRequired) |
89 |
< |
msg+= "Total Events read: %s \n"%eventsRead |
90 |
< |
#msg+= "Total Files read: %s required: %s\n"%(filesRead,filesRequired) |
91 |
< |
msg+= "Total Files read: %s \n"%filesRead |
92 |
< |
msg+= "Total Jobs : %s \n"%len(task.getJobs()) |
88 |
> |
|
89 |
> |
# Sort, compact, and write the list of successful lumis |
90 |
> |
compactList = {} |
91 |
> |
|
92 |
> |
for run in runsAndLumis.keys(): |
93 |
> |
lastLumi = -1000 |
94 |
> |
compactList[run] = [] |
95 |
> |
|
96 |
> |
lumiList = list(runsAndLumis[run]) |
97 |
> |
lumiList.sort() |
98 |
> |
for lumi in lumiList: |
99 |
> |
if lumi != lastLumi + 1: # Break in sequence from last lumi |
100 |
> |
compactList[run].append([lumi,lumi]) |
101 |
> |
else: |
102 |
> |
compactList[run][len(compactList[run])-1][1] = lumi |
103 |
> |
lastLumi = lumi |
104 |
> |
|
105 |
> |
lumiFilename = task['outputDirectory'] + 'lumiSummary.json' |
106 |
> |
lumiSummary = open(lumiFilename, 'w') |
107 |
> |
json.dump(compactList, lumiSummary) |
108 |
> |
lumiSummary.write('\n') |
109 |
> |
lumiSummary.close() |
110 |
> |
|
111 |
> |
msg += "Total Events read: %s\n" % eventsRead |
112 |
> |
msg += "Total Files read: %s\n" % filesRead |
113 |
> |
msg += "Total Jobs : %s\n" % len(task.getJobs()) |
114 |
> |
msg += "Luminosity section summary file: %s\n" % lumiFilename |
115 |
|
list_ID={} |
116 |
|
task = common.scheduler.queryEverything(task['id']) |
117 |
|
for st in possible_status: |
120 |
|
msg+= " # Jobs: %s:%s\n"%(str(st),len(list_ID)) |
121 |
|
pass |
122 |
|
msg+= "\n----------------------------\n" |
123 |
< |
common.logger.info(msg) |
124 |
< |
return |
123 |
> |
common.logger.info(msg) |
124 |
> |
return |
125 |
> |
|
126 |
|
|
96 |
– |
|