ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.23
Committed: Thu Oct 17 08:58:44 2013 UTC (11 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD
Changes since 1.22: +1 -1 lines
Log Message:
typos

File Contents

# User Rev Content
1 slacapra 1.1 import os, common, string
2     from Actor import *
3     from crab_util import *
4 ewv 1.13 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 ewv 1.17 try: # Can remove when CMSSW 3.7 and earlier are dropped
6     from FWCore.PythonUtilities.LumiList import LumiList
7     except ImportError:
8     from LumiList import LumiList
9 ewv 1.13
10     try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
11     import json
12     except:
13     import simplejson as json
14    
15 slacapra 1.1
class Reporter(Actor):
    """Report a short summary of a task.

    The summary includes what is needed for user analysis, such as the
    number of events and files read, the per-status job counts and the
    run/lumi sections processed by the successfully finished jobs.
    """

    # Scheduler statuses reported by run(), in the order they are printed.
    _POSSIBLE_STATUS = (
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    )

    def __init__(self, cfg_params):
        """Store the configuration and work out where job reports live.

        cfg_params -- mapping of configuration keys to values; the
                      'USER.outputdir' entry, when present, overrides the
                      default results directory of the work space.
        """
        self.cfg_params = cfg_params
        # Directory (with trailing slash) holding the framework job reports
        # and the JSON summaries written by this class.
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'

    def _expandLumiRanges(self, lumis):
        """Expand a 'Lumis' attribute string into a list of (run, lumi) pairs.

        lumis -- comma separated entries, each either a single 'run:lumi'
                 or an inclusive range 'run:lumiMin-run:lumiMax', e.g.
                 '193752:1-193752:5,193774:1-193774:5,193775:1'.

        The run is kept as a string and the lumi is converted to int.
        Entries containing more than one '-' are ignored.
        """
        pairs = []
        for entry in str(lumis).split(","):
            bounds = entry.split("-")
            if len(bounds) == 1:
                # single lumi section: 'run:lumi'
                fields = bounds[0].split(":")
                pairs.append((fields[0], int(fields[1])))
            elif len(bounds) == 2:
                # inclusive range: 'run:lumiMin-run:lumiMax'; the run is
                # taken from the upper bound
                lumiMin = int(bounds[0].split(":")[1])
                upper = bounds[1].split(":")
                run = upper[0]
                lumiMax = int(upper[1])
                for lumi in range(lumiMin, lumiMax + 1):
                    pairs.append((run, lumi))
        return pairs

    def _writeJsonSummary(self, filename, compactList):
        """Write compactList as JSON followed by a newline to filename."""
        summary = open(filename, 'w')
        try:
            json.dump(compactList, summary)
            summary.write('\n')
        finally:
            # release the handle even when serialisation fails
            summary.close()

    def getInputRunLumi(self, file):
        """Build inputLumiSummaryOfTask.json from the task arguments file.

        file -- path of the XML arguments file; the 'Lumis' attribute of
                every 'Job' element is read from it.

        Returns the name of the JSON file written in self.fjrDirectory, or
        None when nothing is written: either a job carries no 'Lumis'
        attribute or no run/lumi pair was found at all.
        """
        import xml.dom.minidom

        notCreatedMsg = "The summary file inputLumiSummaryOfTask.json about input run and lumi isn't created"

        dom = xml.dom.minidom.parse(file)
        ll = []
        for elem in dom.getElementsByTagName("Job"):
            lumis = elem.getAttribute('Lumis')
            if not lumis:
                # a job without lumi information: give up on the summary
                common.logger.info(notCreatedMsg)
                return None
            ll.extend(self._expandLumiRanges(lumis))

        if not ll:
            # Nothing to write; return None explicitly instead of
            # referencing a file name that was never set.
            common.logger.info(notCreatedMsg)
            return None

        compactList = LumiList(lumis = ll).getCompactList()
        totalLumiFilename = self.fjrDirectory + 'inputLumiSummaryOfTask.json'
        self._writeJsonSummary(totalLumiFilename, compactList)
        msg = "Summary file of input run and lumi to be analize with this task: %s\n" %totalLumiFilename
        common.logger.info(msg)
        return totalLumiFilename

    def compareJsonFile(self,inputJsonFile, outputJsonFile):
        """Run 'compareJSON.py --sub' against the lumis already analyzed.

        inputJsonFile  -- path of the JSON file with the lumis to analyze.
        outputJsonFile -- name of the result file, created inside
                          self.fjrDirectory.

        The second operand is self.fjrDirectory + 'lumiSummary.json', the
        file with the run and lumi analyzed by correctly finished jobs.
        Returns None; the outcome is only reported through the logger.
        """
        import subprocess

        reportFileName = self.fjrDirectory + 'lumiSummary.json'
        missingLumiFile = self.fjrDirectory + outputJsonFile

        # An argument list (no shell) keeps paths containing spaces or
        # shell metacharacters intact.
        command = ['compareJSON.py', '--sub',
                   inputJsonFile, reportFileName, missingLumiFile]
        try:
            exitCode = subprocess.call(command)
        except OSError:
            # the script could not be started at all
            exitCode = None

        if exitCode != 0:
            # best effort: report the failure instead of announcing a file
            # that may not exist
            msg = "Unable to compare the lumis in %s and %s: compareJSON.py failed (exit status %s)\n" % (
                os.path.basename(inputJsonFile),
                os.path.basename(reportFileName),
                exitCode)
            common.logger.info(msg)
            return

        msg = "Json file about the difference in run and lumi between the lumis in %s and the lumis analyzed by your correctly terminated jobs in %s :\n%s\n"%(os.path.basename(inputJsonFile),os.path.basename(reportFileName),missingLumiFile)
        common.logger.info(msg)
        return

    def _readJobReports(self, task):
        """Accumulate what the successfully finished jobs have read.

        Jobs whose application or wrapper return code is not 0 are skipped,
        as are jobs whose framework job report yields no entry.

        Returns a tuple (lumis, filesRead, eventsRead) where lumis is a
        list of (run, lumi) pairs.
        """
        lumis = []
        filesRead = 0
        eventsRead = 0
        for job in task.getJobs():
            runningJob = job.runningJob
            if runningJob['applicationReturnCode'] != 0 or runningJob['wrapperReturnCode'] != 0:
                continue
            # the FJR is the last of the job output files
            fjr = self.fjrDirectory + job['outputFiles'][-1]
            jobReport = readJobReport(fjr)
            if len(jobReport) == 0:
                # no FJR available for this job
                continue
            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    for lumi in inputFile.runs[run]:
                        lumis.append((run, lumi))
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])
        return lumis, filesRead, eventsRead

    def run(self):
        """
        The main method of the class: report status of a task

        Logs the dataset, the output location, the events/files read and
        the number of jobs per scheduler status; writes lumiSummary.json
        with the lumis of the successful jobs; finally compares those
        lumis with the lumi mask and/or with the lumis assigned to the task.
        """
        common.logger.debug( "Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n"%str(task['dataset'])
        if int(self.cfg_params.get('USER.copy_data', 0)) == 1:
            msg += "Remote output :\n"
            ## TODO: SL should come from jobDB!
            from PhEDExDatasvcInfo import PhEDExDatasvcInfo

            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
            msg += "SE: %s %s srmPath: %s\n"%(self.cfg_params['USER.storage_element'],SE,endpoint)
        else:
            msg += "Local output: %s\n" % task['outputDirectory']

        lumis, filesRead, eventsRead = self._readJobReports(task)

        # Compact and write the list of successful lumis
        compactList = LumiList(lumis = lumis).getCompactList()

        # NOTE(review): this file is written under task['outputDirectory']
        # while compareJsonFile() reads self.fjrDirectory + 'lumiSummary.json'
        # -- confirm both always point to the same directory.
        lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
        self._writeJsonSummary(lumiFilename, compactList)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(task.getJobs())
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug( "Reporter updating task status")
            # called for the status refresh; the returned task is not used
            common.scheduler.queryEverything(task['id'])

        for st in self._POSSIBLE_STATUS:
            list_ID = common._db.queryAttrRunJob({'statusScheduler':st},'jobId')
            if len(list_ID) > 0:
                msg += " # Jobs: %s:%s\n"%(str(st),len(list_ID))
        msg += "\n----------------------------\n"
        common.logger.info(msg)

        ### starting from the arguments.xml file, a json file containing the run:lumi
        ### that should be analyzed with the task
        argumentsFile = common.work_space.shareDir() + 'arguments.xml'
        inputRunLumiFileName = self.getInputRunLumi(argumentsFile)

        ### missing lumi to analyze: starting from lumimask or from argument file
        ### calculate the difference with lumiSummary.json
        lumimask = self.cfg_params.get('CMSSW.lumi_mask')
        # getInputRunLumi() returns None when no summary was written, so the
        # name must be checked before it is handed to os.path.
        hasInputLumiFile = (bool(inputRunLumiFileName)
                            and os.path.isfile(inputRunLumiFileName)
                            and os.path.getsize(inputRunLumiFileName) > 0)

        if lumimask is None and not hasInputLumiFile:
            common.logger.info("No Lumi file to compare")
        else:
            ### if a lumimask is used in the crab.cfg
            if lumimask:
                self.compareJsonFile(lumimask,'total_missingLumiSummary.json')
            ### without lumimask
            if inputRunLumiFileName:
                self.compareJsonFile(inputRunLumiFileName, 'task_missingLumiSummary.json')
        return
218