ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.16
Committed: Wed Apr 28 12:44:40 2010 UTC (15 years ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta, CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1, CRAB_2_7_3_beta
Changes since 1.15: +7 -4 lines
Log Message:
Fix for bug https://savannah.cern.ch/bugs/?66505

File Contents

# User Rev Content
1 slacapra 1.1 import os, common, string
2     from Actor import *
3     from crab_util import *
4 ewv 1.13 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 spiga 1.15 from LumiList import LumiList
6 ewv 1.13
try:  # FUTURE: Python 2.6; prior to 2.6 requires simplejson
    import json
except ImportError:
    # Only a missing standard-library json module should trigger the
    # fallback; any other failure is a real error and must propagate.
    import simplejson as json
11    
12 slacapra 1.1
class Reporter(Actor):
    """Report a short summary of the info of a task.

    The summary includes what is needed for user analysis: the number of
    events and files read, the number of jobs per scheduler status, and the
    list of luminosity sections that were processed, which is also written
    to a ``lumiSummary.json`` file.
    """

    # Scheduler statuses that are tallied in the report, in display order.
    POSSIBLE_STATUS = (
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    )

    def __init__(self, cfg_params):
        """Store the configuration and work out where job reports live.

        cfg_params -- mapping of configuration keys to values; it must
                      support ``get`` and item access.
        """
        self.cfg_params = cfg_params
        # Framework job reports are read from the user-chosen output
        # directory when one is configured, else from the task's result
        # directory. The trailing slash lets file names be appended directly.
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'

    def run(self):
        """
        The main method of the class: report status of a task
        """
        common.logger.debug("Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        msg += self._outputSummary(task)

        jobs = task.getJobs()
        eventsRead, filesRead, lumis = self._readJobReports(jobs)
        lumiFilename = self._writeLumiSummary(task, lumis)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(jobs)
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug("Reporter updating task status")
            # The returned task is not needed below; the call is presumably
            # made to refresh the stored job statuses -- TODO confirm.
            common.scheduler.queryEverything(task['id'])

        for status in self.POSSIBLE_STATUS:
            jobIds = common._db.queryAttrRunJob({'statusScheduler': status},
                                                'jobId')
            # Statuses with no jobs are left out of the report.
            if len(jobIds) > 0:
                msg += " # Jobs: %s:%s\n" % (str(status), len(jobIds))
        msg += "\n----------------------------\n"
        common.logger.info(msg)

    def _outputSummary(self, task):
        """Return the report text describing where the task output is stored."""
        if int(self.cfg_params.get('USER.copy_data', 0)) == 1:
            ## TODO: SL should come from jobDB!
            # Imported here so the module is only loaded for remote stage-out.
            from PhEDExDatasvcInfo import PhEDExDatasvcInfo

            stageout = PhEDExDatasvcInfo(self.cfg_params)
            # Only the endpoint and the SE are reported; the other three
            # returned values are not used.
            endpoint, _lfn, SE, _sePath, _user = stageout.getEndpoint()
            return ("Remote output :\n"
                    "SE: %s %s srmPath: %s\n"
                    % (self.cfg_params['USER.storage_element'], SE, endpoint))
        return "Local output: %s\n" % task['outputDirectory']

    def _readJobReports(self, jobs):
        """Collect totals from the framework job reports of the given jobs.

        Jobs with a positive application or wrapper return code are skipped,
        as are jobs whose report yields no entries.

        Returns a tuple ``(eventsRead, filesRead, lumis)`` where ``lumis`` is
        a list of ``(run, lumi)`` pairs, one per luminosity section listed in
        the input files of the reports.
        """
        eventsRead = 0
        filesRead = 0
        lumis = []
        for job in jobs:
            runningJob = job.runningJob
            if (runningJob['applicationReturnCode'] > 0
                    or runningJob['wrapperReturnCode'] > 0):
                continue
            # The FJR file name is taken from the last entry of the job's
            # output file list.
            fjr = self.fjrDirectory + job['outputFiles'][-1]
            jobReport = readJobReport(fjr)
            if not jobReport:
                # No FJR available for this job.
                continue
            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    lumis.extend([(run, lumi) for lumi in inputFile.runs[run]])
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])
        return eventsRead, filesRead, lumis

    def _writeLumiSummary(self, task, lumis):
        """Compact the lumi list, write it as JSON and return the file name.

        The file is ``lumiSummary.json`` inside ``task['outputDirectory']``.
        """
        compactList = LumiList(lumis=lumis).getCompactList()
        # os.path.join copes with an output directory that lacks a trailing
        # separator, where plain concatenation would produce a wrong name.
        lumiFilename = os.path.join(task['outputDirectory'],
                                    'lumiSummary.json')
        lumiSummary = open(lumiFilename, 'w')
        # try/finally rather than "with": the file still supports Python
        # versions older than 2.6.
        try:
            json.dump(compactList, lumiSummary)
            lumiSummary.write('\n')
        finally:
            lumiSummary.close()
        return lumiFilename
123    
124 spiga 1.7