ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.23
Committed: Thu Oct 17 08:58:44 2013 UTC (11 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD
Changes since 1.22: +1 -1 lines
Log Message:
typos

File Contents

# Content
1 import os, common, string
2 from Actor import *
3 from crab_util import *
4 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 try: # Can remove when CMSSW 3.7 and earlier are dropped
6 from FWCore.PythonUtilities.LumiList import LumiList
7 except ImportError:
8 from LumiList import LumiList
9
10 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
11 import json
12 except:
13 import simplejson as json
14
15
class Reporter(Actor):
    """Report a short summary of the information about a task.

    The summary covers what is needed for user analysis: events and files
    read, the run/lumi sections processed by successfully finished jobs,
    the number of jobs per scheduler status, and the run/lumi sections
    that are still missing.
    """

    def __init__(self, cfg_params):
        """Store the configuration and work out the job-report directory.

        cfg_params -- dict-like CRAB configuration (only ``get`` and item
                      access are used on it in this class).
        """
        self.cfg_params = cfg_params
        # 'USER.outputdir' wins over the work-space results directory; the
        # trailing '/' lets file names be appended directly.
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'

    def _writeJson(self, filename, payload):
        """Write payload as JSON followed by a newline into filename.

        The file is closed even if serialisation fails (try/finally rather
        than ``with`` to stay compatible with old interpreters).
        """
        handle = open(filename, 'w')
        try:
            json.dump(payload, handle)
            handle.write('\n')
        finally:
            handle.close()

    def _expandLumiRanges(self, lumis):
        """Expand a lumi specification string into (run, lumi) tuples.

        lumis -- comma separated entries, each either 'run:lumi' or
                 'run:lumiMin-run:lumiMax',
                 e.g. '193752:1-193752:5,193774:1-193774:5,193775:1'.

        Returns a list of (run, lumi) tuples with run as a string and lumi
        as an int.  Raises IndexError/ValueError on malformed entries.
        """
        pairs = []
        for entry in lumis.split(","):
            bounds = entry.split("-")
            if len(bounds) == 1:
                # Single lumi section: 'run:lumi'
                fields = bounds[0].split(":")
                pairs.append((fields[0], int(fields[1])))
            elif len(bounds) == 2:
                # Inclusive range: 'run:lumiMin-run:lumiMax'
                first = bounds[0].split(":")
                last = bounds[1].split(":")
                # The run is taken from the upper bound only.
                # NOTE(review): assumes both bounds name the same run -- confirm.
                run = last[0]
                for lumi in range(int(first[1]), int(last[1]) + 1):
                    pairs.append((run, lumi))
            # Entries with more than one '-' are ignored, as before.
        return pairs

    def getInputRunLumi(self, file):
        """Build inputLumiSummaryOfTask.json from an arguments XML file.

        file -- path of the XML file; every <Job> element is expected to
                carry a 'Lumis' attribute.

        Returns the path of the JSON file written into self.fjrDirectory,
        or None when no summary is created (a job without 'Lumis', or no
        run/lumi pair found at all).
        """
        import xml.dom.minidom

        notCreated = ("The summary file inputLumiSummaryOfTask.json about "
                      "input run and lumi isn't created")

        dom = xml.dom.minidom.parse(file)
        runLumiPairs = []
        for elem in dom.getElementsByTagName("Job"):
            lumis = elem.getAttribute('Lumis')
            if not lumis:
                # One job without lumi information is enough to give up.
                common.logger.info(notCreated)
                return None
            runLumiPairs.extend(self._expandLumiRanges(str(lumis)))

        if not runLumiPairs:
            # Previously this path hit an unbound local on return.
            common.logger.info(notCreated)
            return None

        compactList = LumiList(lumis=runLumiPairs).getCompactList()
        totalLumiFilename = self.fjrDirectory + 'inputLumiSummaryOfTask.json'
        self._writeJson(totalLumiFilename, compactList)
        msg = "Summary file of input run and lumi to be analize with this task: %s\n" % totalLumiFilename
        common.logger.info(msg)
        return totalLumiFilename

    def compareJsonFile(self, inputJsonFile, outputJsonFile):
        """Run 'compareJSON.py --sub' against the lumiSummary.json report.

        inputJsonFile  -- path of the JSON file listing the run/lumi to
                          compare with the analyzed ones.
        outputJsonFile -- file name (relative to self.fjrDirectory) that
                          receives the result.
        """
        # File containing the run/lumi analyzed by correctly finished jobs.
        reportFileName = self.fjrDirectory + 'lumiSummary.json'
        missingLumiFile = self.fjrDirectory + outputJsonFile

        command = 'compareJSON.py --sub ' + inputJsonFile + ' ' + reportFileName + ' ' + missingLumiFile
        status = os.system(command)
        if status != 0:
            # Do not change the user-facing report, but leave a trace.
            common.logger.debug("compareJSON.py exited with status %s" % status)

        msg = "Json file about the difference in run and lumi between the lumis in %s and the lumis analyzed by your correctly terminated jobs in %s :\n%s\n" % (os.path.basename(inputJsonFile), os.path.basename(reportFileName), missingLumiFile)
        common.logger.info(msg)

    def run(self):
        """
        The main method of the class: report status of a task.

        Logs dataset and output location, totals of events/files read by
        jobs whose application and wrapper return codes are both 0, the
        number of jobs per scheduler status, writes lumiSummary.json and
        finally produces the missing-lumi JSON files.
        """
        common.logger.debug("Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        if int(self.cfg_params.get('USER.copy_data', 0)) == 1:
            msg += "Remote output :\n"
            ## TODO: SL should come from jobDB!
            from PhEDExDatasvcInfo import PhEDExDatasvcInfo

            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

            msg += "SE: %s %s srmPath: %s\n" % (self.cfg_params['USER.storage_element'], SE, endpoint)
        else:
            msg += "Local output: %s\n" % task['outputDirectory']

        possible_status = ['Created',
                           'Undefined',
                           'Submitting',
                           'Submitted',
                           'NotSubmitted',
                           'Waiting',
                           'Ready',
                           'Scheduled',
                           'Running',
                           'Done',
                           'Killing',
                           'Killed',
                           'Aborted',
                           'Unknown',
                           'Done (Failed)',
                           'Cleared',
                           'Retrieved',
                           ]
        eventsRead = 0
        filesRead = 0
        lumis = []
        jobs = task.getJobs()
        for job in jobs:
            running = job.runningJob
            # Only jobs that finished cleanly contribute to the summary.
            if running['applicationReturnCode'] != 0 or running['wrapperReturnCode'] != 0:
                continue
            # The framework job report is the last declared output file.
            fjr = self.fjrDirectory + job['outputFiles'][-1]

            jobReport = readJobReport(fjr)
            if len(jobReport) > 0:
                for inputFile in jobReport[0].inputFiles:
                    # Accumulate the list of lumi sections run over
                    for run in inputFile.runs.keys():
                        for lumi in inputFile.runs[run]:
                            lumis.append((run, lumi))
                    filesRead += 1
                    eventsRead += int(inputFile['EventsRead'])

        # Compact and write the list of successful lumis
        compactList = LumiList(lumis=lumis).getCompactList()

        # NOTE(review): written under task['outputDirectory'] while
        # compareJsonFile reads lumiSummary.json from self.fjrDirectory --
        # confirm both always point to the same directory.
        lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
        self._writeJson(lumiFilename, compactList)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(jobs)
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug("Reporter updating task status")
            task = common.scheduler.queryEverything(task['id'])

        for st in possible_status:
            list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
            if len(list_ID) > 0:
                msg += " # Jobs: %s:%s\n" % (str(st), len(list_ID))
        msg += "\n----------------------------\n"
        common.logger.info(msg)

        # Starting from the arguments.xml file, build a json file containing
        # the run:lumi that should be analyzed with the task.
        argumentsFile = common.work_space.shareDir() + 'arguments.xml'
        inputRunLumiFileName = self.getInputRunLumi(argumentsFile)

        # Missing lumi to analyze: starting from the lumi mask and/or from the
        # arguments file, compute the difference with lumiSummary.json.
        lumimask = self.cfg_params.get('CMSSW.lumi_mask')
        if lumimask is None and not (inputRunLumiFileName is not None
                                     and os.path.isfile(inputRunLumiFileName)
                                     and os.path.getsize(inputRunLumiFileName) > 0):
            # inputRunLumiFileName is None when no summary was created; it
            # must not reach os.path.isfile(), which would raise TypeError.
            common.logger.info("No Lumi file to compare")
        else:
            if lumimask:
                # With a lumi mask configured in crab.cfg
                self.compareJsonFile(lumimask, 'total_missingLumiSummary.json')
            if inputRunLumiFileName:
                # Without lumi mask: compare against the task input summary
                self.compareJsonFile(inputRunLumiFileName, 'task_missingLumiSummary.json')
218