ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.43
Committed: Tue Oct 28 17:40:20 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre3, CRAB_2_4_2_pre2
Changes since 1.42: +10 -3 lines
Log Message:
keep CMSException from fjr in case of cmsRun failure...

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor implementing `crab -get`: retrieves the output sandboxes of
    finished jobs through common.scheduler, unpacks them into the output
    directory and records the exit codes found in each job's framework
    job report (crab_fjr_<id>.xml).
    """
    def __init__(self, *args):
        # args[0]: configuration parameters (read with .get)
        # args[1]: requested job ids, or the string 'all'
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Both directories default to the task result dir and are forced to
        # end with '/', because every path below is built by concatenation.
        self.log = 0
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'
        # self.log == 1 means log files are kept in a directory of their own
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))

        # Status code -> readable name (not referenced elsewhere in this class).
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def run(self):
        """
        The main method of the class: check destination dirs and
        perform the get output, logging the elapsed wall-clock time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = time.time() - start
        common.logger.debug(1, "GetOutput Time: " + str(elapsed))
        common.logger.write("GetOutput Time: " + str(elapsed))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved and validate the
        destination directories.

        Fills self.up_task (task read from common._db), self.all_id (every
        job id of the task) and self.list_id (the requested jobs whose
        status is 'SD' or 'DA'; all such jobs when self.jobs == 'all').

        Raises CrabException when none of the requested jobs is in one of
        those states, or when self.logDir / self.outDir do not exist.
        """
        # NOTE: the task should be refreshed with
        # common.scheduler.queryEverything(1), but that currently triggers a
        # core dump, so the locally stored task is used instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])
        done = set(list_id_done)

        # check == 0: an explicit job list of which none is retrievable
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(done))
        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            n_requested = len(self.up_task.jobs)
        else:
            # keep the user's ordering of the requested ids
            self.list_id = [jid for jid in self.jobs if jid in done]
            n_requested = len(self.jobs)
        if n_requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (n_requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Retrieve and unpack the output of the jobs selected by
        checkBeforeGet().

        The first job is fetched on its own. The size of its sandbox, times
        the number of jobs plus a 20% overhead, is used to check the free
        space before the remaining jobs are fetched in one call. Both space
        checks are skipped when USER.dontCheckSpaceLeft is set.

        Raises CrabException when the free-space checks fail (or from
        checkBeforeGet).
        """
        self.checkBeforeGet()
        # First check for more than 10 MB (10*1024, assuming has_freespace
        # counts in kB as the size estimate below does)
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Retrieve the first job alone to measure its output size.
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # Estimate the space for all jobs from the first one, plus 20% overhead.
        needed = lastSize * len(self.list_id) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed)
            msg += " which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def _belongsToJob(self, name, jobId):
        """
        Heuristic used to find entries left by a previous submission of
        jobId: True when `name` contains jobId as a whole number (so
        'CMSSW_1.stdout' matches job 1 but 'CMSSW_11.stdout' does not) and
        is not one of the Submission_<n> backup directories made by
        moveOutput.
        """
        import re
        if name.find('Submission_') != -1:
            return False
        pattern = r'(?<!\d)' + re.escape(str(jobId)) + r'(?!\d)'
        return re.search(pattern, name) is not None

    def organizeOutput(self, task, list_id):
        """
        Untar the retrieved output of each job in list_id.

        For every job that is not in error and whose out_files_<id>.tgz is
        in self.outDir:
          - if it was submitted more than once, entries from the previous
            submission are moved to Submission_<n> backup dirs (moveOutput);
          - the tarball is extracted into self.outDir and then removed;
          - crab_fjr_<id>.xml, if present, is parsed with parseFinalReport.
        The collected codes are stored with common._db.updateRunJob_. When a
        separate log dir is configured, *<id>.std* files and .BrokerInfo are
        moved there.

        Returns the size of the last tarball processed (getGZSize/1024,
        presumably kB), or 0 if no tarball was found.
        """
        listCode = []
        parsed_ids = []
        size = 0  # in kB
        for jid in list_id:
            runningJob = task.getJob(jid).runningJob
            if runningJob.isError():
                continue
            tarball = 'out_files_' + str(jid) + '.tgz'
            tarPath = self.outDir + tarball
            if not os.path.exists(tarPath):
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            self.max_id = runningJob['submission']
            if self.max_id > 1:
                # Back up what an earlier submission of this job left behind
                # before the new tarball overwrites it.
                for f in os.listdir(self.outDir):
                    if f != tarball and self._belongsToJob(f, jid):
                        self.moveOutput(jid, self.max_id, self.outDir, f)
                if self.log == 1:
                    for f in os.listdir(self.logDir):
                        if self._belongsToJob(f, jid):
                            self.moveOutput(jid, self.max_id, self.logDir, f)

            size = getGZSize(tarPath) / 1024  # in kB
            runCommand('tar zxf ' + tarPath + ' ' + '-C ' + self.outDir)
            try:
                os.remove(tarPath)
            except OSError:
                common.logger.debug(1, 'Cannot remove ' + tarPath)
            msg = 'Results of Jobs # ' + str(jid) + ' are in ' + self.outDir
            common.logger.message(msg)

            fjrName = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                listCode.append(self.parseFinalReport(self.outDir + fjrName))
                parsed_ids.append(jid)
            else:
                msg = "Problems with " + str(fjrName) + ". File not available.\n"
                common.logger.message(msg)
        common._db.updateRunJob_(parsed_ids, listCode)

        if self.logDir != self.outDir:
            # os.system reports failure through its return value, not by
            # raising. NOTE(review): the glob *<id>.std* also matches e.g.
            # job 11 when id is 1.
            for jid in list_id:
                cmd = 'mv ' + self.outDir + '*' + str(jid) + '.std* ' + self.logDir
                if os.system(cmd) != 0:
                    common.logger.debug(1, 'Problem with copy of job results')
            # .BrokerInfo carries no job id: move it once, not once per job.
            brokerInfo = self.outDir + '.BrokerInfo'
            if list_id and os.path.exists(brokerInfo):
                if os.system('mv ' + brokerInfo + ' ' + self.logDir) != 0:
                    common.logger.debug(1, 'Problem with copy of job results')
            msg = 'Results of Jobs # ' + str(list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.message(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the framework job report (FJR) at path `input` and extract
        the WrapperExitCode and ExeExitCode.

        Returns a dict with 'wrapperReturnCode' and 'applicationReturnCode'
        ('' when not found). Both are set to '50115' when the report is
        empty or has at most 6 lines, but explicit WrapperExitCode /
        ExeExitCode entries override that. A CMSException status is stored
        under 'applicationReturnCodeOrig' and is promoted to
        'applicationReturnCode' only when no application code was set.
        Otherwise it stays in the dict under that key.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if not jreports:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5, "Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        # 50115 - cmsRun did not produce a valid/readable job report at runtime
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            errType = error['Type']
            if errType == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif errType == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            elif errType == 'CMSException':
                # kept so a cmsRun failure is reported even without ExeExitCode
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        if "wrapperReturnCode" not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        if "applicationReturnCode" not in codeValue:
            if "applicationReturnCodeOrig" in codeValue:
                codeValue["applicationReturnCode"] = codeValue.pop("applicationReturnCodeOrig")
            else:
                codeValue["applicationReturnCode"] = ''

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move the entry `file` of directory `path`, left over from a previous
        submission of job `id`, into path/Submission_<max_id-1>/.

        Missing backup directories Submission_1 .. Submission_<max_id-1> are
        created under `path` first. Failures are logged, never raised.
        """
        dirBase = path + 'Submission_'

        for n in range(1, max_id):
            backupDir = dirBase + str(n) + '/'
            if not os.path.isdir(backupDir):
                try:
                    os.mkdir(backupDir)
                except OSError:
                    msg = 'cannot create directory ' + backupDir
                    common.logger.write(msg)
                    common.logger.debug(3, msg)

        target = dirBase + str(max_id - 1) + '/' + file
        try:
            os.rename(path + file, target)
        except OSError:
            msg = 'no output to move for job ' + str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)
        return