ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Reporter.py
Revision: 1.6
Committed: Fri May 29 07:16:16 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre5
Changes since 1.5: +12 -13 lines
Log Message:
Update database before preparing the report. Add stdout to the log file.

File Contents

# Content
1 import os, common, string
2 from Actor import *
3 from crab_util import *
4
class Reporter(Actor):
    """Report a short summary of the information of a task.

    The summary includes what is needed for user analysis, such as the
    number of events requested/done, integrated lumi and so on.
    """

    def __init__(self, cfg_params):
        """Store the configuration mapping used when building the report.

        cfg_params -- mapping of configuration keys to values; ``run`` reads
                      'USER.copy_data' and 'USER.storage_element' from it.
        """
        self.cfg_params = cfg_params

    def run(self):
        """
        The main method of the class: report status of a task.

        Builds a single text message (dataset, output location, totals of
        events/files read, number of jobs per scheduler status) and sends it
        to ``common.logger.info``. Returns None.
        """
        common.logger.debug("Reporter::run() called")
        task = common.scheduler.queryEverything(1)

        msg = "--------------------\n"
        msg += "Dataset: %s\n" % str(task['dataset'])

        copy_data = ('USER.copy_data' in self.cfg_params
                     and int(self.cfg_params['USER.copy_data']) == 1)
        if copy_data:
            msg += "Remote output :\n"
            ## TODO: SL should come from jobDB!
            from PhEDExDatasvcInfo import PhEDExDatasvcInfo
            stageout = PhEDExDatasvcInfo(self.cfg_params)
            # Only SE and endpoint are reported; the remaining fields are
            # unpacked because getEndpoint() returns five values.
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
            msg += "SE: %s %s srmPath: %s\n" % (
                self.cfg_params['USER.storage_element'], SE, endpoint)
        else:
            # Terminate the line so the totals below start on their own line.
            msg += "Local output: %s\n" % task['outputDirectory']

        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        possible_status = [
            'Created',
            'Undefined',
            'Submitting',
            'Submitted',
            'NotSubmitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Killing',
            'Killed',
            'Aborted',
            'Unknown',
            'Done (Failed)',
            'Cleared',
            'Retrieved',
        ]

        eventsRead = 0
        filesRead = 0
        # NOTE(review): the "required" counters are never updated anywhere in
        # this method, so they are always reported as 0 -- confirm whether
        # they should be filled from the task/job description.
        eventsRequired = 0
        filesRequired = 0

        for job in task.getJobs():
            runningJob = job.runningJob
            # Skip jobs whose application or wrapper exit code is positive.
            if (runningJob['applicationReturnCode'] > 0
                    or runningJob['wrapperReturnCode'] > 0):
                continue

            # The last entry of the job's output files is taken as the
            # framework job report (FJR) file name.
            fjr = os.path.join(task['outputDirectory'], job['outputFiles'][-1])
            jobReport = readJobReport(fjr)
            if len(jobReport) == 0:
                # No FJR content available for this job.
                continue

            # Only the first report in the file is considered.
            for inputFile in jobReport[0].inputFiles:
                filesRead += 1
                eventsRead += int(inputFile['EventsRead'])

        msg += "Total Events read: %s required: %s\n" % (eventsRead, eventsRequired)
        msg += "Total Files read: %s required: %s\n" % (filesRead, filesRequired)
        msg += "Total Jobs : %s \n" % len(task.getJobs())

        # One summary line per scheduler status that has at least one job.
        for st in possible_status:
            jobIds = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
            if len(jobIds) > 0:
                msg += " # Jobs: %s:%s\n" % (str(st), len(jobIds))

        msg += "\n----------------------------\n"
        common.logger.info(msg)