ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.20
Committed: Fri Sep 21 13:44:13 2012 UTC (12 years, 7 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1
Changes since 1.19: +7 -2 lines
Log Message:
changed some messages

File Contents

# User Rev Content
1 slacapra 1.1 import os, common, string
2     from Actor import *
3     from crab_util import *
4 ewv 1.13 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 ewv 1.17 try: # Can remove when CMSSW 3.7 and earlier are dropped
6     from FWCore.PythonUtilities.LumiList import LumiList
7     except ImportError:
8     from LumiList import LumiList
9 ewv 1.13
10     try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
11     import json
12     except:
13     import simplejson as json
14    
15 slacapra 1.1
class Reporter(Actor):
    """Report a short summary of a task.

    The summary includes what is needed for user analysis, such as the
    number of events and files read and the luminosity sections processed.
    """

    # Scheduler statuses for which run() prints a per-status job count,
    # in the order they are reported.
    _POSSIBLE_STATUS = (
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    )

    def __init__(self, cfg_params):
        """Store the configuration and resolve the report directory.

        cfg_params -- configuration mapping; 'USER.outputdir' is used as the
                      directory for job reports and summary files, falling
                      back to common.work_space.resDir() when it is not set.
        """
        self.cfg_params = cfg_params
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'

    def _writeJsonSummary(self, filename, payload):
        """Dump *payload* as JSON, followed by a newline, into *filename*."""
        summary = open(filename, 'w')
        # try/finally rather than 'with': the module still supports
        # interpreters that need simplejson (pre-2.6).
        try:
            json.dump(payload, summary)
            summary.write('\n')
        finally:
            summary.close()

    def _expandLumiRanges(self, lumis):
        """Expand a 'Lumis' attribute string into (run, lumi) tuples.

        The string is a comma-separated list whose entries are either a
        single 'run:lumi' or an inclusive range 'run:lumiMin-run:lumiMax',
        e.g. '193752:1-193752:5,193774:1-193774:5,193775:1'.

        The run is kept as a string, the lumi is converted to int.  For a
        range the run is taken from the upper bound.  Entries with more than
        one '-' are ignored.
        """
        pairs = []
        for entry in str(lumis).split(","):
            bounds = entry.split("-")
            if len(bounds) == 1:
                fields = bounds[0].split(":")
                pairs.append((fields[0], int(fields[1])))
            elif len(bounds) == 2:
                upper = bounds[1].split(":")
                run = upper[0]
                lumi_max = int(upper[1])
                lumi_min = int(bounds[0].split(":")[1])
                pairs.extend([(run, lumi)
                              for lumi in range(lumi_min, lumi_max + 1)])
        return pairs

    def getInputRunLumi(self, file):
        """Write the run/lumi sections the task is expected to analyze.

        Parses the XML document *file*, reads the 'Lumis' attribute of every
        'Job' element and writes the compacted run/lumi list to
        'inputLumiSummaryOfTask.json' in self.fjrDirectory.

        Returns the name of the file written, or None when a 'Job' element
        has no 'Lumis' attribute or when no lumi section was collected.
        """
        import xml.dom.minidom

        dom = xml.dom.minidom.parse(file)
        ll = []
        for elem in dom.getElementsByTagName("Job"):
            lumis = elem.getAttribute('Lumis')
            if not lumis:
                # NOTE(review): the first job without lumi information
                # aborts the whole summary, even if earlier jobs had some;
                # kept as in the original -- confirm this is intended.
                return None
            ll.extend(self._expandLumiRanges(lumis))

        if not ll:
            return None

        compactList = LumiList(lumis=ll).getCompactList()
        totalLumiFilename = self.fjrDirectory + 'inputLumiSummaryOfTask.json'
        self._writeJsonSummary(totalLumiFilename, compactList)
        msg = "Summary file of input run and lumi to be analize with this task: %s\n" % totalLumiFilename
        common.logger.info(msg)
        return totalLumiFilename

    def compareJsonFile(self, inputJsonFile):
        """Run 'compareJSON.py --sub' against the task's lumi summary.

        inputJsonFile -- JSON file the report is compared with; it is passed
                         as the first file argument of compareJSON.py.

        The second file argument is 'lumiSummary.json' and the output is
        'missingLumiSummary.json', both in self.fjrDirectory.  The output
        file is announced only when the command exits with status 0.
        """
        import subprocess
        import sys

        reportFileName = self.fjrDirectory + 'lumiSummary.json'
        missingLumiFile = self.fjrDirectory + 'missingLumiSummary.json'
        # The command used to go through a shell, which expanded '~' and
        # environment variables in the user-supplied path; keep doing that
        # explicitly now that no shell is involved.
        inputPath = os.path.expandvars(os.path.expanduser(inputJsonFile))
        # Argument list instead of a concatenated shell string, so paths
        # containing spaces or shell metacharacters are passed intact.
        command = ['compareJSON.py', '--sub',
                   inputPath, reportFileName, missingLumiFile]
        try:
            status = subprocess.call(command)
        except OSError:
            # sys.exc_info() keeps the syntax valid on every Python version.
            msg = "compareJSON.py could not be executed: %s\n" % sys.exc_info()[1]
            common.logger.info(msg)
            return
        if status != 0:
            msg = "compareJSON.py failed with exit status %s, %s may be missing or incomplete\n" % (status, missingLumiFile)
            common.logger.info(msg)
            return
        msg = "json file containing the difference in run and lumi between input and analyzed files: %s\n" % missingLumiFile
        common.logger.info(msg)
        return

    def run(self):
        """
        The main method of the class: report status of a task
        """
        common.logger.debug("Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        if int(self.cfg_params.get('USER.copy_data', 0)) == 1:
            msg += "Remote output :\n"
            ## TODO: SL should come from jobDB!
            from PhEDExDatasvcInfo import PhEDExDatasvcInfo

            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
            msg += "SE: %s %s srmPath: %s\n" % (self.cfg_params['USER.storage_element'], SE, endpoint)
        else:
            msg += "Local output: %s\n" % task['outputDirectory']

        eventsRead = 0
        filesRead = 0
        lumis = []
        jobs = task.getJobs()
        for job in jobs:
            # Only jobs whose application and wrapper both returned 0
            # contribute to the summary.
            runningJob = job.runningJob
            if runningJob['applicationReturnCode'] != 0 or runningJob['wrapperReturnCode'] != 0:
                continue
            # The framework job report is the last of the job's output files.
            fjr = self.fjrDirectory + job['outputFiles'][-1]
            jobReport = readJobReport(fjr)
            if not jobReport:
                # No report available for this job.
                continue
            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    lumis.extend([(run, lumi) for lumi in inputFile.runs[run]])
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])

        # Compact and write the list of successful lumis
        compactList = LumiList(lumis=lumis).getCompactList()
        # NOTE(review): written under task['outputDirectory'], while
        # compareJsonFile() reads self.fjrDirectory + 'lumiSummary.json';
        # confirm both always name the same directory.
        lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
        self._writeJsonSummary(lumiFilename, compactList)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(jobs)
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug("Reporter updating task status")
            # The returned task is not used below; the call is kept as in
            # the original, presumably for the status refresh it triggers.
            common.scheduler.queryEverything(task['id'])

        for st in self._POSSIBLE_STATUS:
            list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
            if len(list_ID) > 0:
                msg += " # Jobs: %s:%s\n" % (str(st), len(list_ID))
        msg += "\n----------------------------\n"
        common.logger.info(msg)

        ### starting from the arguments.xml file, a json file containing the run:lumi
        ### that should be analyzed with the task
        argumentsFile = common.work_space.shareDir() + 'arguments.xml'
        inputRunLumiFileName = self.getInputRunLumi(argumentsFile)

        ### missing lumi to analyze: starting from lumimask or from argument file
        ### calculate the difference with report.json
        lumimask = self.cfg_params.get('CMSSW.lumi_mask')
        if lumimask:
            ### if a lumimask is used in the crab.cfg
            self.compareJsonFile(lumimask)
        elif inputRunLumiFileName:
            ### without lumimask
            self.compareJsonFile(inputRunLumiFileName)
        else:
            common.logger.info("no json file to compare")
        return
218