ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.16
Committed: Wed Apr 28 12:44:40 2010 UTC (15 years ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta, CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1, CRAB_2_7_3_beta
Changes since 1.15: +7 -4 lines
Log Message:
Fix for bug https://savannah.cern.ch/bugs/?66505

File Contents

# Content
1 import os, common, string
2 from Actor import *
3 from crab_util import *
4 from ProdCommon.FwkJobRep.ReportParser import readJobReport
5 from LumiList import LumiList
6
# FUTURE: Python 2.6 ships json; earlier interpreters need simplejson.
# Catch only ImportError so that unrelated failures (or a KeyboardInterrupt)
# raised while importing are not silently masked by the fallback.
try:
    import json
except ImportError:
    import simplejson as json
11
12
class Reporter(Actor):
    """
    A class to report a short summary of the info of a task, including what
    is needed for user analysis, such as the number of events and files
    read, the luminosity sections processed, and the number of jobs in each
    scheduler status.
    """

    # Scheduler statuses for which a job count is reported, in display order.
    POSSIBLE_STATUS = [
        'Created',
        'Undefined',
        'Submitting',
        'Submitted',
        'NotSubmitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Killing',
        'Killed',
        'Aborted',
        'Unknown',
        'Done (Failed)',
        'Cleared',
        'Retrieved',
    ]

    def __init__(self, cfg_params):
        """
        Store the configuration and work out where the framework job
        reports are read from.

        cfg_params -- configuration mapping. 'USER.outputdir', when present,
                      is the directory the job reports are read from;
                      otherwise common.work_space.resDir() is used.
        """
        self.cfg_params = cfg_params
        self.fjrDirectory = cfg_params.get('USER.outputdir',
                                           common.work_space.resDir()) + '/'

    def run(self):
        """
        The main method of the class: report status of a task.

        Builds the summary text, writes the compact luminosity list to
        lumiSummary.json in the task output directory, and logs the summary
        through common.logger.info().
        """
        common.logger.debug("Reporter::run() called")
        task = common._db.getTask()

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])
        msg += self._outputLocation(task)

        eventsRead, filesRead, lumis = self._readJobReports(task)
        lumiFilename = self._writeLumiSummary(task, lumis)

        msg += "Total Events read: %s\n" % eventsRead
        msg += "Total Files read: %s\n" % filesRead
        msg += "Total Jobs : %s\n" % len(task.getJobs())
        msg += "Luminosity section summary file: %s\n" % lumiFilename

        # TEMPORARY by Fabio, to be removed
        # avoid clashes between glite_slc5 and glite schedulers when a server is used
        # otherwise, -report with a server requires a local scheduler
        if self.cfg_params.get('CRAB.server_name', None) is None:
            common.logger.debug("Reporter updating task status")
            # The returned task is not used below; presumably the call
            # refreshes the statuses queried from common._db -- TODO confirm.
            common.scheduler.queryEverything(task['id'])

        msg += self._jobStatusCounts()
        msg += "\n----------------------------\n"
        common.logger.info(msg)
        return

    def _outputLocation(self, task):
        """
        Return the summary line(s) describing where the task output goes:
        the remote storage element when USER.copy_data is 1, otherwise the
        local output directory of the task.
        """
        if int(self.cfg_params.get('USER.copy_data', 0)) != 1:
            return "Local output: %s\n" % task['outputDirectory']

        ## TODO: SL should come from jobDB!
        from PhEDExDatasvcInfo import PhEDExDatasvcInfo

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        # Only the endpoint and the SE name are reported; the remaining
        # values are unpacked but not used.
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

        text = "Remote output :\n"
        text += "SE: %s %s srmPath: %s\n" % (
            self.cfg_params['USER.storage_element'], SE, endpoint)
        return text

    def _readJobReports(self, task):
        """
        Scan the framework job report of every job of the task.

        Returns a tuple (eventsRead, filesRead, lumis) where lumis is a list
        of (run, lumi) pairs collected from the input files of the reports.
        """
        eventsRead = 0
        filesRead = 0
        lumis = []
        for job in task.getJobs():
            # Skip jobs that reported a positive application or wrapper
            # return code.
            if (job.runningJob['applicationReturnCode'] > 0 or
                    job.runningJob['wrapperReturnCode'] > 0):
                continue

            # get FJR filename
            fjr = self.fjrDirectory + job['outputFiles'][-1]
            if not os.path.exists(fjr):
                # NOTE(review): how readJobReport() reacts to a missing file
                # is not visible here; skip explicitly so that one job
                # without a report cannot abort the whole summary.
                common.logger.debug("No FJR available for job #%s" % job['jobId'])
                continue

            jobReport = readJobReport(fjr)
            if len(jobReport) == 0:
                continue

            for inputFile in jobReport[0].inputFiles:
                # Accumulate the list of lumi sections run over
                for run in inputFile.runs.keys():
                    lumis.extend([(run, lumi) for lumi in inputFile.runs[run]])
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])

        return eventsRead, filesRead, lumis

    def _writeLumiSummary(self, task, lumis):
        """
        Compact the (run, lumi) pairs and write them as JSON to
        lumiSummary.json in the task output directory.

        Returns the path of the file that was written.
        """
        compactList = LumiList(lumis=lumis).getCompactList()

        # os.path.join yields the same path as plain concatenation when the
        # directory already ends with a separator, and inserts the missing
        # separator when it does not.
        lumiFilename = os.path.join(task['outputDirectory'], 'lumiSummary.json')
        lumiSummary = open(lumiFilename, 'w')
        try:
            json.dump(compactList, lumiSummary)
            lumiSummary.write('\n')
        finally:
            # Close the file even if serialisation or the write fails.
            lumiSummary.close()
        return lumiFilename

    def _jobStatusCounts(self):
        """
        Return one summary line per scheduler status that has at least one
        job, in the order given by POSSIBLE_STATUS.
        """
        text = ""
        for st in self.POSSIBLE_STATUS:
            jobIds = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
            if len(jobIds) > 0:
                text += " # Jobs: %s:%s\n" % (str(st), len(jobIds))
        return text
123
124