ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.6
Committed: Mon Apr 7 16:51:51 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.5: +53 -38 lines
Log Message:
many Improvements on get output

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of the jobs found in 'Done' status,
    unpacks it in the output directory and stores the exit codes read
    from each framework job report (FJR).
    """

    def __init__(self, *args):
        """
        args[0] -- configuration parameters (queried through get())
        args[1] -- jobs to retrieve: the string 'all' or a list of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Both directories default to the results dir of the working area.
        self.outDir = self.cfg_params.get("USER.outputdir", common.work_space.resDir())
        self.logDir = self.cfg_params.get("USER.logdir", common.work_space.resDir())
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Scheduler statuses summarised when no job can be retrieved.
        # NOTE: every entry needs its own comma, otherwise adjacent string
        # literals are silently concatenated into a single bogus status.
        self.possible_status = [
            'Undefined',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Cancelled',
            'Aborted',
            'Unknown',
            'Done(failed)',
            'Cleared',
        ]

    def run(self):
        """
        The main method of the class: retrieve the output and log the
        time spent doing it.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = str(time.time() - start)
        common.logger.debug(1, "GetOutput Time: " + elapsed)
        common.logger.write("GetOutput Time: " + elapsed)

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Fills self.list_id with the requested jobs that are in 'Done'
        status and self.all_id with the ids of all the jobs of the task.
        If any selected job has a submission number above 1, moveOutput()
        is called before returning.

        Raises CrabException when none of the requested jobs is Done or
        when the output or log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()

        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['statusScheduler'] == 'Done':
                list_id_done.append(job['id'])
            self.all_id.append(job['id'])

        if self.jobs == 'all':
            requested = len(self.up_task.jobs)
            self.list_id = list_id_done
        else:
            requested = len(self.jobs)
            self.list_id = [job_id for job_id in self.jobs if job_id in list_id_done]

        if not self.list_id:
            # Nothing retrievable: report how many jobs are in each status.
            msg = '\n'
            for st in self.possible_status:
                list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
                if len(list_ID) > 0:
                    msg += " %i Jobs %s \n" % (len(list_ID), str(st))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)

        # Output of a previous submission has to be moved away before the
        # new one is unpacked. The HIGHEST submission number decides: the
        # old code sorted ascending and took element [0], i.e. the minimum.
        submission_id = common._db.queryRunJob('submission', self.list_id)
        if submission_id and max(submission_id) > 1:
            self.moveOutput()

    def getOutput(self):
        """
        Get the output of the jobs selected by checkBeforeGet().

        The scheduler is asked to fetch the output into self.outDir; each
        out_files_<id>.tgz archive found there is unpacked and removed, the
        exit codes of each crab_fjr_<id>.xml are passed to the DB, and, if
        the log directory differs from the output one, the log files are
        moved there. Returns nothing.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1, self.list_id, self.outDir)  ## NeW BL--DS

        listCode = []
        job_id = []

        cwd = os.getcwd()
        os.chdir(self.outDir)
        try:
            for jid in self.list_id:
                tarball = 'out_files_' + str(jid) + '.tgz'
                if os.path.exists(tarball):
                    runCommand('tar zxvf ' + tarball)
                    runCommand('rm ' + tarball)
                else:
                    msg = "Output files for job " + str(jid) + " not available.\n"
                    common.logger.message(msg)

                report = 'crab_fjr_' + str(jid) + '.xml'
                if os.path.exists(report):
                    codeValue = self.parseFinalReport(report)
                    # job_id and listCode are kept aligned entry by entry.
                    job_id.append(jid)
                    listCode.append(codeValue)
                else:
                    msg = "Problems with " + str(report) + ". File not available.\n"
                    common.logger.message(msg)
        finally:
            # Always go back, even if unpacking or parsing raised.
            os.chdir(cwd)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in self.list_id:
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.outDir)+'/*.log '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    # Best effort: a failed move must not abort the retrieval.
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        else:
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir
            common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parse the FJR produced by a job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the crab_fjr_<id>.xml file

        Returns a dictionary with the keys 'wrapperReturnCode' and
        'applicationReturnCode'; a code that is not found is set to ''.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        # Only the first report returned by the parser is used.
        jobReport = readJobReport(input)[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # Codes found in the report override the 50115 default above.
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        codeValue.setdefault("applicationReturnCode", '')

        return codeValue

    def moveOutput(self):
        """
        Move output of job already retrieved
        into the correct backup directory

        The backup directories are <outDir>/Submission_<n>, where n is the
        submission number reported by the DB for each job in self.list_id.
        """
        sub_id = common._db.queryRunJob('submission', self.list_id)

        # os.path.join copes with an outDir given with or without trailing '/'.
        OutDir_Base = os.path.join(self.outDir, 'Submission_')

        for sub in set(sub_id):
            if not os.path.isdir(OutDir_Base + str(sub)):
                cmd_out = runCommand('mkdir ' + OutDir_Base + str(sub))
                common.logger.debug(3, cmd_out)

        # assumes sub_id is returned in the same order as self.list_id -- TODO confirm
        for jid, sub in zip(self.list_id, sub_id):
            try:
                cmd = 'mv ' + os.path.join(self.outDir, '*_' + str(jid) + '.*') + ' ' + OutDir_Base + str(sub)
                cmd_out = runCommand(cmd)
                common.logger.debug(3, cmd_out)
            except Exception:
                msg = 'no output to move for job ' + str(jid)
                common.logger.debug(3, msg)