ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.18
Committed: Thu Jul 15 16:08:59 2010 UTC (14 years, 9 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, CRAB_2_7_4_pre3
Changes since 1.17: +1 -1 lines
Log Message:
fixed problem related info consistency, savannah 69184

File Contents

# User Rev Content
1 slacapra 1.1 import os, common, string
2     from Actor import *
3     from crab_util import *
4 ewv 1.13 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 ewv 1.17 try: # Can remove when CMSSW 3.7 and earlier are dropped
6     from FWCore.PythonUtilities.LumiList import LumiList
7     except ImportError:
8     from LumiList import LumiList
9 ewv 1.13
10     try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
11     import json
12     except:
13     import simplejson as json
14    
15 slacapra 1.1
16     class Reporter(Actor):
17     """ A class to report a short summary of the info of a task, including what
18     is needed for user analysis, such as #events requestes/done, integrated
19     lumi and so one.
20     """
21     def __init__(self, cfg_params):
22     self.cfg_params = cfg_params
23 ewv 1.16 self.fjrDirectory = cfg_params.get('USER.outputdir' ,
24     common.work_space.resDir()) + '/'
25 slacapra 1.1 return
26    
27     def run(self):
28     """
29     The main method of the class: report status of a task
30     """
31 spiga 1.4 common.logger.debug( "Reporter::run() called")
32 spiga 1.7 task = common._db.getTask()
33    
34 spiga 1.6 msg= "--------------------\n"
35     msg += "Dataset: %s\n"%str(task['dataset'])
36 slacapra 1.2 if self.cfg_params.has_key('USER.copy_data') and int(self.cfg_params['USER.copy_data'])==1:
37 spiga 1.6 msg+= "Remote output :\n"
38 slacapra 1.2 ## TODO: SL should come from jobDB!
39     from PhEDExDatasvcInfo import PhEDExDatasvcInfo
40 spiga 1.7
41 slacapra 1.2 stageout = PhEDExDatasvcInfo(self.cfg_params)
42     endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
43     #print endpoint, lfn, SE, SE_PATH, user
44    
45 spiga 1.6 msg+= "SE: %s %s srmPath: %s\n"%(self.cfg_params['USER.storage_element'],SE,endpoint)
46 ewv 1.13
47 slacapra 1.2 else:
48 ewv 1.16 msg += "Local output: %s\n" % task['outputDirectory']
49 slacapra 1.2 #print task
50 slacapra 1.1 possible_status = [ 'Created',
51     'Undefined',
52     'Submitting',
53     'Submitted',
54 slacapra 1.3 'NotSubmitted',
55 slacapra 1.1 'Waiting',
56     'Ready',
57     'Scheduled',
58     'Running',
59     'Done',
60     'Killing',
61     'Killed',
62     'Aborted',
63     'Unknown',
64     'Done (Failed)',
65     'Cleared',
66 spiga 1.5 'Retrieved'
67 slacapra 1.1 ]
68     eventsRead=0
69 slacapra 1.2 eventsRequired=0
70 slacapra 1.1 filesRead=0
71 slacapra 1.2 filesRequired=0
72 spiga 1.15 lumis = []
73 slacapra 1.1 for job in task.getJobs():
74 fanzago 1.18 if (job.runningJob['applicationReturnCode']!=0 or job.runningJob['wrapperReturnCode']!=0): continue
75 slacapra 1.1 # get FJR filename
76 ewv 1.16 fjr = self.fjrDirectory + job['outputFiles'][-1]
77    
78 slacapra 1.1 jobReport = readJobReport(fjr)
79 ewv 1.13 if len(jobReport) > 0:
80     inputFiles = jobReport[0].inputFiles
81 slacapra 1.1 for inputFile in inputFiles:
82 ewv 1.13 # Accumulate the list of lum sections run over
83     for run in inputFile.runs.keys():
84     for lumi in inputFile.runs[run]:
85 spiga 1.15 lumis.append((run, lumi))
86 slacapra 1.1 filesRead+=1
87     eventsRead+=int(inputFile['EventsRead'])
88     #print jobReport[0].inputFiles,'\n'
89     else:
90 slacapra 1.3 pass
91     #print 'no FJR avaialble for job #%s'%job['jobId']
92 slacapra 1.1 #print "--------------------------"
93 ewv 1.13
94 spiga 1.15 # Compact and write the list of successful lumis
95 ewv 1.13
96 spiga 1.15 lumiList = LumiList(lumis = lumis)
97     compactList = lumiList.getCompactList()
98 ewv 1.13
99     lumiFilename = task['outputDirectory'] + 'lumiSummary.json'
100     lumiSummary = open(lumiFilename, 'w')
101     json.dump(compactList, lumiSummary)
102     lumiSummary.write('\n')
103     lumiSummary.close()
104    
105     msg += "Total Events read: %s\n" % eventsRead
106     msg += "Total Files read: %s\n" % filesRead
107     msg += "Total Jobs : %s\n" % len(task.getJobs())
108     msg += "Luminosity section summary file: %s\n" % lumiFilename
109 slacapra 1.1 list_ID={}
110 farinafa 1.14
111 ewv 1.16 # TEMPORARY by Fabio, to be removed
112 farinafa 1.14 # avoid clashes between glite_slc5 and glite schedulers when a server is used
113 ewv 1.16 # otherwise, -report with a server requires a local scheduler
114 farinafa 1.14 if self.cfg_params.get('CRAB.server_name', None) is None:
115     common.logger.debug( "Reporter updating task status")
116     task = common.scheduler.queryEverything(task['id'])
117    
118 slacapra 1.1 for st in possible_status:
119     list_ID = common._db.queryAttrRunJob({'statusScheduler':st},'jobId')
120     if (len(list_ID)>0):
121 spiga 1.6 msg+= " # Jobs: %s:%s\n"%(str(st),len(list_ID))
122 slacapra 1.1 pass
123 spiga 1.6 msg+= "\n----------------------------\n"
124 ewv 1.13 common.logger.info(msg)
125     return
126    
127 spiga 1.7