ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.20
Committed: Fri Sep 21 13:44:13 2012 UTC (12 years, 7 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1
Changes since 1.19: +7 -2 lines
Log Message:
changed some messages

File Contents

# Content
1 import os, common, string
2 from Actor import *
3 from crab_util import *
4 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 try: # Can remove when CMSSW 3.7 and earlier are dropped
6 from FWCore.PythonUtilities.LumiList import LumiList
7 except ImportError:
8 from LumiList import LumiList
9
10 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
11 import json
12 except:
13 import simplejson as json
14
15
class Reporter(Actor):
    """Report a short summary of the information about a task.

    The summary includes what is needed for user analysis, such as the
    number of events and files read, the per-status job counts and the
    run/lumi JSON summaries written next to the job reports.
    """

    # Scheduler status names that run() queries, in the order they are listed
    # in the report.
    _POSSIBLE_STATUS = (
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    )

    def __init__(self, cfg_params):
        """Store the configuration and work out the job-report directory.

        cfg_params -- mapping of configuration keys to values; the key
                      'USER.outputdir' overrides the default result
                      directory returned by common.work_space.resDir().
        """
        self.cfg_params = cfg_params
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'
        return

    def _expandLumiEntry(self, entry):
        """Expand one 'run:lumi' or 'run:lumi-run:lumi' entry.

        Returns a list of (run, lumi) tuples where run is kept as a string
        and lumi is an int.  A single point yields one tuple, a range yields
        one tuple per lumi section, both ends included.  Entries that contain
        more than one '-' are ignored and yield an empty list.
        """
        bounds = entry.split("-")
        if len(bounds) == 1:
            run, lumi = bounds[0].split(":")[0], bounds[0].split(":")[1]
            return [(run, int(lumi))]
        if len(bounds) == 2:
            lumi_min = int(bounds[0].split(":")[1])
            lumi_max = int(bounds[1].split(":")[1])
            # The run number is read from the end of the range only; the run
            # on the start side is not checked.
            # NOTE(review): presumably both ends always share the same run;
            # confirm against the producer of arguments.xml.
            run = bounds[1].split(":")[0]
            return [(run, lumi) for lumi in range(lumi_min, lumi_max + 1)]
        return []

    def _writeJsonFile(self, fileName, content):
        """Write content to fileName as JSON followed by a newline.

        The file is closed even if serialisation fails.
        """
        output = open(fileName, 'w')
        try:
            json.dump(content, output)
            output.write('\n')
        finally:
            output.close()

    def getInputRunLumi(self, file):
        """Build the JSON summary of the run/lumi sections given as input.

        file -- path of the XML arguments file; every <Job> element is
                expected to carry a 'Lumis' attribute holding a
                comma-separated list of 'run:lumi' points and
                'run:lumi-run:lumi' ranges.

        Returns the name of the written file
        (<fjrDirectory>inputLumiSummaryOfTask.json), or None when a job has
        no 'Lumis' attribute or when no run/lumi pair was found at all.  In
        the None case nothing is written.
        """
        import xml.dom.minidom

        dom = xml.dom.minidom.parse(file)
        runLumiPairs = []

        for elem in dom.getElementsByTagName("Job"):
            lumis = elem.getAttribute('Lumis')
            if not lumis:
                # As soon as one job carries no lumi information the whole
                # summary is abandoned.
                return None
            for entry in str(lumis).split(","):
                runLumiPairs.extend(self._expandLumiEntry(entry))

        if not runLumiPairs:
            # No <Job> element at all: there is nothing to summarise, and no
            # file name to hand back to the caller.
            return None

        compactList = LumiList(lumis=runLumiPairs).getCompactList()
        totalLumiFilename = self.fjrDirectory + 'inputLumiSummaryOfTask.json'
        self._writeJsonFile(totalLumiFilename, compactList)

        msg = "Summary file of input run and lumi to be analize with this task: %s\n" % totalLumiFilename
        common.logger.info(msg)
        return totalLumiFilename

    def compareJsonFile(self, inputJsonFile):
        """Write the run/lumi sections of inputJsonFile not yet analysed.

        Runs 'compareJSON.py --sub <inputJsonFile> <fjrDirectory>lumiSummary.json
        <fjrDirectory>missingLumiSummary.json' and logs the name of the
        resulting file.  If the tool cannot be started or exits with a
        non-zero status, that is logged instead.

        inputJsonFile -- path of the JSON file describing the requested
                         run/lumi sections.
        """
        import subprocess

        reportFileName = self.fjrDirectory + 'lumiSummary.json'
        missingLumiFile = self.fjrDirectory + 'missingLumiSummary.json'

        # The command is passed as an argument list so that paths containing
        # blanks or shell metacharacters reach the tool unchanged.  '~' and
        # '$VAR' are expanded here because no shell does it any more.
        expandedInput = os.path.expandvars(os.path.expanduser(inputJsonFile))
        command = ['compareJSON.py', '--sub',
                   expandedInput, reportFileName, missingLumiFile]
        try:
            exitCode = subprocess.call(command)
        except OSError:
            common.logger.info("Unable to execute compareJSON.py: %s not written\n" % missingLumiFile)
            return

        if exitCode != 0:
            common.logger.info("compareJSON.py exited with status %s: %s may be missing or incomplete\n"
                               % (exitCode, missingLumiFile))
            return

        msg = "json file containing the difference in run and lumi between input and analyzed files: %s\n" % missingLumiFile
        common.logger.info(msg)
        return

    def _outputLocationSummary(self, task):
        """Return the report lines describing where the task output goes.

        When 'USER.copy_data' is set to 1 the storage element and endpoint
        obtained from PhEDExDatasvcInfo are reported, otherwise the local
        output directory of the task.
        """
        if int(self.cfg_params.get('USER.copy_data', 0)) != 1:
            return "Local output: %s\n" % task['outputDirectory']

        ## TODO: SL should come from jobDB!
        from PhEDExDatasvcInfo import PhEDExDatasvcInfo

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

        text = "Remote output :\n"
        text += "SE: %s %s srmPath: %s\n" % (self.cfg_params['USER.storage_element'], SE, endpoint)
        return text

    def _collectJobReports(self, task):
        """Accumulate the counters of all jobs that ended successfully.

        A job is considered only when both its application and wrapper
        return codes are 0; its job report is the last entry of its
        'outputFiles', read from fjrDirectory.

        Returns a tuple (eventsRead, filesRead, lumis) where lumis is the
        list of (run, lumi) pairs found in the input files of the reports.
        """
        eventsRead = 0
        filesRead = 0
        lumis = []
        for job in task.getJobs():
            if (job.runningJob['applicationReturnCode'] != 0
                    or job.runningJob['wrapperReturnCode'] != 0):
                continue
            # get FJR filename
            fjr = self.fjrDirectory + job['outputFiles'][-1]

            jobReport = readJobReport(fjr)
            if len(jobReport) == 0:
                # no job report available for this job
                continue
            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    for lumi in inputFile.runs[run]:
                        lumis.append((run, lumi))
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])
        return eventsRead, filesRead, lumis

    def run(self):
        """
        The main method of the class: report status of a task.

        Logs the dataset, the output location, the totals read from the job
        reports and the number of jobs per scheduler status; writes
        lumiSummary.json with the analysed run/lumi sections; finally
        compares it with the requested run/lumi sections (the lumi mask if
        one is configured, otherwise the summary built from arguments.xml).
        """
        common.logger.debug("Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        msg += self._outputLocationSummary(task)

        eventsRead, filesRead, lumis = self._collectJobReports(task)

        # Compact and write the list of successful lumis
        compactList = LumiList(lumis=lumis).getCompactList()
        # NOTE(review): compareJsonFile() looks for lumiSummary.json under
        # self.fjrDirectory; this matches only if task['outputDirectory'] is
        # that same directory (with trailing '/') - confirm.
        lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
        self._writeJsonFile(lumiFilename, compactList)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(task.getJobs())
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug("Reporter updating task status")
            task = common.scheduler.queryEverything(task['id'])

        for st in self._POSSIBLE_STATUS:
            list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
            if len(list_ID) > 0:
                msg += " # Jobs: %s:%s\n" % (str(st), len(list_ID))
        msg += "\n----------------------------\n"
        common.logger.info(msg)

        ### starting from the arguments.xml file, a json file containing the run:lumi
        ### that should be analyzed with the task
        argumentsFile = common.work_space.shareDir() + 'arguments.xml'
        inputRunLumiFileName = self.getInputRunLumi(argumentsFile)

        ### missing lumi to analyze: starting from lumimask or from argument file
        ### calculate the difference with report.json
        lumimask = self.cfg_params.get('CMSSW.lumi_mask')
        if lumimask:
            ### a lumimask is used in the crab.cfg
            self.compareJsonFile(lumimask)
        elif inputRunLumiFileName:
            ### without lumimask
            self.compareJsonFile(inputRunLumiFileName)
        else:
            common.logger.info("no json file to compare")
        return
218