ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.13
Committed: Mon Dec 14 22:33:54 2009 UTC (15 years, 4 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_7_1_pre2, CRAB_2_7_1_pre1
Branch point for: CRAB_2_7_1_branch, LumiMask
Changes since 1.12: +47 -17 lines
Log Message:
Lumi-related changes for 2.7: first_run is replaced by first_lumi (which defaults to 1), and -report now writes lumiSummary.json to the res/ directory

File Contents

# Content
1 import os, common, string
2 from Actor import *
3 from crab_util import *
4 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5
# The standard-library ``json`` module first appeared in Python 2.6; older
# interpreters need the external ``simplejson`` package, which is bound to
# the same ``json`` name so the rest of the module does not care which one
# was loaded.
try:  # FUTURE: Python 2.6, prior to 2.6 requires simplejson
    import json
except ImportError:
    # Only a missing module triggers the fallback; any other failure
    # (including KeyboardInterrupt) propagates instead of being hidden.
    import simplejson as json
10
11
class Reporter(Actor):
    """Report a short summary of the state of a task.

    The summary includes what is needed for user analysis, such as the
    number of events and files read, the luminosity sections processed
    and the number of jobs in each scheduler status.
    """

    # Scheduler statuses for which a job count is reported, in the order
    # in which they appear in the summary.
    _POSSIBLE_STATUS = (
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    )

    def __init__(self, cfg_params):
        """Keep a reference to the configuration mapping.

        cfg_params -- mapping indexed by 'SECTION.key' strings; this class
                      reads 'USER.copy_data' and 'USER.storage_element'
                      and hands the whole mapping to PhEDExDatasvcInfo.
        """
        self.cfg_params = cfg_params

    def run(self):
        """
        The main method of the class: report status of a task

        Builds a text summary of the current task, writes the compacted
        list of processed luminosity sections to 'lumiSummary.json' in the
        task output directory, and sends the summary to the logger at
        info level.  Returns None.
        """
        common.logger.debug( "Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        msg += self._outputLocation(task)

        eventsRead, filesRead, runsAndLumis = self._readJobReports(task)

        # Sort, compact, and write the list of successful lumis
        lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
        self._writeLumiSummary(lumiFilename, self._compactLumis(runsAndLumis))

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(task.getJobs())
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # NOTE(review): presumably this refreshes the job statuses stored in
        # the DB before they are counted below -- confirm.  The returned
        # task object is not needed afterwards.
        common.scheduler.queryEverything(task['id'])
        msg += self._jobStatusSummary()

        msg += "\n----------------------------\n"
        common.logger.info(msg)
        return

    def _outputLocation(self, task):
        """Return the summary line(s) describing where the output is stored."""
        copyData = ('USER.copy_data' in self.cfg_params
                    and int(self.cfg_params['USER.copy_data']) == 1)
        if not copyData:
            # The trailing newline keeps the next summary line from being
            # glued onto this one.
            return "Local output: %s\n" % task['outputDirectory']

        ## TODO: SL should come from jobDB!
        # Imported here so the module is only needed for remote stage-out.
        from PhEDExDatasvcInfo import PhEDExDatasvcInfo

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        # Only the endpoint and the SE are reported; unpacking all five
        # values still checks the shape of the returned tuple.
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

        location = "Remote output :\n"
        location += "SE: %s %s srmPath: %s\n" % (
            self.cfg_params['USER.storage_element'], SE, endpoint)
        return location

    def _readJobReports(self, task):
        """Accumulate input statistics from the job reports of good jobs.

        Jobs with a positive application or wrapper return code are
        skipped, as are jobs whose report file yields no report.

        Returns a tuple (eventsRead, filesRead, runsAndLumis) where
        runsAndLumis maps each run to the set of its lumi sections.
        """
        eventsRead = 0
        filesRead = 0
        runsAndLumis = {}

        for job in task.getJobs():
            runningJob = job.runningJob
            if (runningJob['applicationReturnCode'] > 0
                    or runningJob['wrapperReturnCode'] > 0):
                continue

            # get FJR filename: the last entry of the job's output files
            fjr = task['outputDirectory'] + job['outputFiles'][-1]
            jobReport = readJobReport(fjr)
            if len(jobReport) == 0:
                # no FJR available for this job
                continue

            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    runsAndLumis.setdefault(run, set()).update(
                        inputFile.runs[run])
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])

        return eventsRead, filesRead, runsAndLumis

    def _compactLumis(self, runsAndLumis):
        """Collapse each run's lumi sections into sorted [first, last] ranges.

        runsAndLumis -- mapping of run to an iterable of distinct lumis.

        Returns a dict mapping each run to a list of two-element lists;
        e.g. lumis {1, 2, 3, 5} become [[1, 3], [5, 5]].
        """
        compactList = {}
        for run in runsAndLumis.keys():
            ranges = []
            for lumi in sorted(runsAndLumis[run]):
                if ranges and lumi == ranges[-1][1] + 1:
                    # Directly follows the previous lumi: extend the range
                    ranges[-1][1] = lumi
                else:
                    # Break in sequence from last lumi: open a new range
                    ranges.append([lumi, lumi])
            compactList[run] = ranges
        return compactList

    def _writeLumiSummary(self, lumiFilename, compactList):
        """Write compactList as JSON, followed by a newline, to lumiFilename."""
        lumiSummary = open(lumiFilename, 'w')
        # try/finally rather than 'with': the file still supports Python
        # versions that predate the with-statement.
        try:
            json.dump(compactList, lumiSummary)
            lumiSummary.write('\n')
        finally:
            lumiSummary.close()

    def _jobStatusSummary(self):
        """Return one summary line per scheduler status that has jobs."""
        lines = []
        for st in self._POSSIBLE_STATUS:
            list_ID = common._db.queryAttrRunJob({'statusScheduler': st},
                                                 'jobId')
            if len(list_ID) > 0:
                lines.append(" # Jobs: %s:%s\n" % (str(st), len(list_ID)))
        return "".join(lines)
126