ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.31
Committed: Mon Aug 11 09:16:17 2008 UTC (16 years, 8 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.30: +22 -1 lines
Log Message:
fixed moving of log file in the case of resubmission with outputDir and logDir name chosen by the user

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of the jobs in Done ('SD') status,
    unpacks it in the output directory and collects the exit codes found
    in each job report.
    """

    def __init__(self, *args):
        """
        args[0] is the configuration parameter dictionary, args[1] is
        either the string 'all' or the list of the requested job ids.
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        self.outDir = self._configuredDir('USER.outputdir')
        self.logDir = self._configuredDir('USER.logdir')
        # self.log is 1 only when the logs go to a directory of their own.
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Status code -> human readable name, used to build the summary
        # shown when no job is ready to be retrieved.
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'SK': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def _configuredDir(self, key):
        """
        Return the directory configured under key, defaulting to
        common.work_space.resDir(), with a trailing '/'.
        """
        path = self.cfg_params.get(key, common.work_space.resDir())
        # endswith() instead of path[-1]: an empty value must not raise
        # IndexError here. It is left empty and rejected later by the
        # isdir() check of checkBeforeGet().
        if path and not path.endswith('/'):
            path = path + '/'
        return path

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        msg = "GetOutput Time: " + str(stop - start)
        common.logger.debug(1, msg)
        common.logger.write(msg)

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Fills self.list_id with the requested jobs that are in 'SD'
        status and self.all_id with the id of every job of the task.
        If any selected job has been submitted more than once,
        moveOutput() is called to put aside the files of the earlier
        submissions.

        Raises CrabException when no requested job is in 'SD' status or
        when the output or log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] == 'SD':
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        if self.jobs == 'all':
            requested = len(self.up_task.jobs)
            self.list_id = list_id_done
        else:
            requested = len(self.jobs)
            # Set lookup keeps the selection linear in the number of jobs.
            done_ids = set(list_id_done)
            self.list_id = [job_id for job_id in self.jobs
                            if job_id in done_ids]

        if not self.list_id:
            # Nothing to retrieve: report how many jobs are in each status.
            msg = '\n'
            for st, stDetail in self.possible_status.items():
                list_ID = common._db.queryAttrRunJob({'status': st}, 'jobId')
                if len(list_ID) > 0:
                    msg += "       %i Jobs in status: %s \n" % (len(list_ID), str(stDetail))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)

        submission_id = common._db.queryRunJob('submission', self.list_id)
        # An empty answer means there is nothing to move aside; indexing
        # it (as the sort/reverse/[0] sequence did) would raise IndexError.
        if submission_id:
            max_id = max(submission_id)
            if max_id > 1:
                self.moveOutput(max_id)

        return

    def getOutput(self):
        """
        Get output for a finished job with id.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1, self.list_id, self.outDir)
        self.organizeOutput()
        return

    def organizeOutput(self):
        """
        Untar Output

        For every job in self.list_id: unpack out_files_<id>.tgz inside
        the output directory, remove the archive and parse
        crab_fjr_<id>.xml. The parsed codes are then passed to
        common._db.updateRunJob_(). When the log directory differs from
        the output one, the '*<id>.std*' files and '.BrokerInfo' are
        moved there.
        """
        listCode = []
        job_id = []

        cwd = os.getcwd()
        os.chdir(self.outDir)
        try:
            for id in self.list_id:
                archive = 'out_files_' + str(id) + '.tgz'
                if not os.path.exists(archive):
                    msg = "Output files for job "+ str(id) +" not available.\n"
                    common.logger.debug(1, msg)
                    continue
                # NOTE(review): the archive is removed without checking
                # the result of the extraction -- confirm what
                # runCommand() returns on failure before relying on it.
                runCommand('tar zxvf ' + archive)
                runCommand('rm out_files_' + str(id) + '.tgz')
                msg = 'Results of Jobs # '+str(id)+' are in '+self.outDir
                common.logger.message(msg)

                report = 'crab_fjr_' + str(id) + '.xml'
                if os.path.exists(report):
                    codeValue = self.parseFinalReport(report)
                    job_id.append(id)
                    listCode.append(codeValue)
                else:
                    msg = "Problems with "+str(report)+". File not available.\n"
                    common.logger.message(msg)
        finally:
            # Restore the working directory even when a report cannot be
            # parsed, so the caller is not left inside the output dir.
            os.chdir(cwd)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            broker_info = str(self.outDir) + '/.BrokerInfo'
            for i_id in self.list_id:
                # NOTE(review): '*<id>.std*' also matches the files of
                # jobs whose id merely ends with <id> (1 matches 11).
                sources = str(self.outDir) + '/*' + str(i_id) + '.std*'
                # .BrokerInfo can be moved only once; naming it again
                # for the following jobs would make every mv fail.
                if os.path.exists(broker_info):
                    sources += ' ' + broker_info
                cmd = 'mv ' + sources + ' ' + str(self.logDir)
                # os.system() reports failures through its return value,
                # it does not raise.
                if os.system(cmd) != 0:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        return

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input is the path of the report file. Returns a dictionary with
        the keys 'wrapperReturnCode' and 'applicationReturnCode'; a
        missing value is the empty string and an empty or too short
        report gives '50115'. The dictionary is only returned: storing
        it is left to the caller.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        #input = self.outDir + '/crab_fjr_' + str(jobid) + '.xml'
        codeValue = {}
        ### 50115 - cmsRun did not produce a valid/readable job report at runtime
        invalid_report = str(50115)

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = invalid_report
            codeValue["wrapperReturnCode"] = invalid_report
            common.logger.debug(5,"Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # Close explicitly: one report is opened for every job.
            fjr.close()
        if len_fjr <= 6:
            codeValue["applicationReturnCode"] = invalid_report
            codeValue["wrapperReturnCode"] = invalid_report

        # Codes found in the report override the 50115 default above.
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault('wrapperReturnCode', '')
        codeValue.setdefault('applicationReturnCode', '')

        return codeValue

    def moveOutput(self, max_id):
        """
        Move output of job already retrieved
        into the correct backup directory

        Creates <outDir>Submission_<i>/ for i in 1 .. max_id-1 (and the
        same under the log directory when self.log is 1), then for each
        job of self.list_id at submission N > 1 moves its '*_<id>.*'
        files into Submission_<N-1>/.
        """
        sub_id = common._db.queryRunJob('submission', self.list_id)

        OutDir_Base = self.outDir + 'Submission_'
        LogDir_Base = self.logDir + 'Submission_'
        for i in range(1, max_id):
            self._makeBackupDir(OutDir_Base + str(i) + '/')
            if self.log == 1:
                self._makeBackupDir(LogDir_Base + str(i) + '/')

        for id, submission in zip(self.list_id, sub_id):
            # A job at its first submission has no earlier output. It is
            # skipped, so no left-over command from the mkdir loop or
            # from the previous job is run in its place.
            if submission > 1:
                target = str(submission - 1) + '/ >& /dev/null'
                cmd = 'mv '+self.outDir+'*_'+str(id)+'.* ' + OutDir_Base + target
                self._moveJobFiles(cmd, id)
                if self.log == 1:
                    cmdlog = 'mv '+self.logDir+'*_'+str(id)+'.* ' + LogDir_Base + target
                    self._moveJobFiles(cmdlog, id)
        return

    def _makeBackupDir(self, path):
        """
        Create the backup directory path unless it already exists.
        """
        if not os.path.isdir(path):
            cmd_out = runCommand('mkdir ' + path + ' >& /dev/null')
            common.logger.write(str(cmd_out))
            common.logger.debug(3, str(cmd_out))

    def _moveJobFiles(self, cmd, id):
        """
        Run the mv command cmd for job id; a failure is only logged.
        """
        try:
            cmd_out = runCommand(cmd)
            common.logger.write(cmd_out)
            common.logger.debug(3, cmd_out)
        except Exception:
            # Best effort: a job may have no file left to move.
            msg = 'no output to move for job '+str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)