ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.62
Committed: Wed May 23 13:06:46 2012 UTC (12 years, 11 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, HEAD
Changes since 1.61: +8 -2 lines
Log Message:
Flag with exit code 50700 the cases where gLite returned no output sandbox (OSB), see https://savannah.cern.ch/bugs/index.php?93844

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor implementing 'crab -get': retrieves the output of finished jobs
    through the scheduler, unpacks it into the output dir, moves log files
    to the log dir (when it differs from the output dir) and stores the
    job exit codes in the local DB.
    """
    def __init__(self, *args):
        """
        args[0]: configuration parameters (dict-like, read with .get)
        args[1]: list of requested job ids, or the string 'all'
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # 1 when log files must be moved to a log dir distinct from outDir
        self.log = 0

        cwd = os.getcwd() + '/'
        self.outDir = self._absDir(
            self.cfg_params.get('USER.outputdir', common.work_space.resDir()), cwd)
        self.logDir = self._absDir(
            self.cfg_params.get('USER.logdir', common.work_space.resDir()), cwd)
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))
        return

    def _absDir(self, path, cwd):
        """
        Return path with a trailing '/', prefixed with cwd when relative.
        """
        if path[-1] != '/':
            path = path + '/'
        if path[0] != '/':
            path = cwd + path
        return path

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output
        """
        common.logger.debug("GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug("GetOutput Time: " + str(stop - start))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Sets self.all_id to every job id of the task and self.list_id to the
        requested ids whose running job is in state 'Terminated'.
        Raises CrabException when none of the requested jobs qualifies or
        when the output/log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['state'] == 'Terminated':
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        # -1 means "all jobs requested": only list_id_done decides then
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(set(list_id_done)))
        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output. \n' % readableList(self, list_jobs)
            if len(list_id_done) > 0:
                msg += ' Retrieve the jobs if those are in Done status and Terminated action. \n'
                msg += ' To know the action of a job run: "crab -status v " \n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            n_requested = len(self.up_task.jobs)
        else:
            done = set(list_id_done)
            self.list_id = [jid for jid in self.jobs if jid in done]
            n_requested = len(self.jobs)
        if n_requested > len(self.list_id):
            msg = 'Only %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (n_requested)
            msg += '\t(for details: crab -status)'
            common.logger.info(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Retrieve, unpack and register the output of the selected jobs.

        The first job is retrieved alone so that the size of its output
        archive can be used to estimate the disk space needed by all the
        others, which are then retrieved in a single scheduler call.
        Raises CrabException when free space looks insufficient, unless
        USER.dontCheckSpaceLeft is set.
        """
        self.checkBeforeGet()
        # First check for more than 10 MB (sizes here are in kB)
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg +="Please make some room before retrying\n\n"
            msg +="To bypass this check, run \n"
            msg +="crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Get first job of the list; organizeOutput returns its archive size (kB)
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # estimate the space for all jobs from the first one, with 20% overhead
        needed = lastSize * len(self.list_id) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed)
            msg +=" which is LESS than available space on disk\n"
            msg +="Please make some room before retrying\n"
            msg +="To bypass this check, run \n"
            msg +="crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def organizeOutput(self, task, list_id):
        """
        Untar the retrieved output of the jobs in list_id and update the DB.

        For each job not in error:
          - if out_files_<id>.tgz is in outDir, files of earlier submissions
            are moved to Submission_<n> backup dirs, then the archive is
            extracted into outDir and removed;
          - otherwise the job is flagged 'Cleared' with exit codes 50700;
          - crab_fjr_<id>.xml, when present in outDir, is parsed and its
            codes are recorded with state 'Cleared'.
        All collected updates go to the DB in one call. When logDir differs
        from outDir, the stdout/stderr files and .BrokerInfo are moved there.

        Returns the size in kB of the last archive measured (0 if none).
        """
        listCode = []
        job_id = []

        size = 0 # in kB
        for id in list_id:
            runningJob = task.getJob(id).runningJob
            if runningJob.isError():
                continue
            file = 'out_files_' + str(id) + '.tgz'
            if os.path.exists(self.outDir + file):
                self.max_id = runningJob['submission']
                if self.max_id > 1:
                    # resubmitted job: back up files left by earlier submissions
                    tag = '_' + str(id) + '.'
                    subTag = 'Submission_' + str(id)
                    for f in os.listdir(self.outDir):
                        if f.find(tag) != -1 and f != file and f.find(subTag) == -1:
                            self.moveOutput(id, self.max_id, self.outDir, f)
                    if self.log == 1:
                        for f in os.listdir(self.logDir):
                            if f.find(tag) != -1 and f.find(subTag) == -1:
                                self.moveOutput(id, self.max_id, self.logDir, f)
                try:
                    size = getGZSize(self.outDir + file)/1024 # in kB
                    runCommand('tar zxf ' + self.outDir + file + ' ' + '-C ' + self.outDir)
                    runCommand('rm ' + self.outDir + 'out_files_' + str(id) + '.tgz')
                    common.logger.info('Results of Jobs # ' + str(id) + ' are in ' + self.outDir)
                except IOError:
                    common.logger.info("Output files for job " + str(id) + " seems corrupted.\n")
                    continue
            else:
                common.logger.info("Output files for job " + str(id) + " not available.\n")
                # no output sandbox came back: flag the job with exit code 50700
                job_id.append(id)
                listCode.append({'state': 'Cleared',
                                 'applicationReturnCode': str(50700),
                                 'wrapperReturnCode': str(50700)})
                # NOTE(review): there is deliberately no 'continue' here, so a
                # crab_fjr_<id>.xml still present in outDir is parsed below and
                # queued as a second update for the same job - confirm intent.

            fjrName = 'crab_fjr_' + str(id) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                fieldsToUpdate = self.parseFinalReport(self.outDir + fjrName)
                fieldsToUpdate['state'] = 'Cleared'
                job_id.append(id)
                listCode.append(fieldsToUpdate)
            else:
                common.logger.info("Problems with " + str(fjrName) + ". File not available.\n")
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                # NOTE(review): the glob '*<id>.std*' also matches jobs whose id
                # ends with <id> (e.g. 1 -> 11, 21); os.system reports failures
                # through its return value, not by raising.
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.info(msg)
            msg = 'Results of Jobs # '+str(list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.info(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the framework job report (FJR) written by a job.

        Extracts the WrapperExitCode and ExeExitCode errors (falling back to
        the CMSException code for a missing application code) and the LFN
        and storage directory of every produced file.

        input: path of the crab_fjr_<id>.xml file.
        Returns a dict with 'applicationReturnCode' and 'wrapperReturnCode'
        and, unless the report is empty, 'storage' (list of dirs, each
        ending with '/') and 'lfn' (list of LFNs). The caller stores the
        dict in the DB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug("Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            errType = error['Type']
            if errType == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif errType == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            elif errType == 'CMSException':
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        # the CMSException code is only a fallback for a missing ExeExitCode
        # and never stays in the returned dict
        hasOrig = 'applicationReturnCodeOrig' in codeValue
        origCode = codeValue.pop("applicationReturnCodeOrig", None)
        # defaults guarantee both keys exist for the checks below and the DB
        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        if 'applicationReturnCode' not in codeValue:
            if hasOrig:
                codeValue["applicationReturnCode"] = origCode
            else:
                codeValue["applicationReturnCode"] = ''

        #### Filling BOSS DB with SE name and LFN, for edm and not_edm files ####
        wrapperCode = codeValue["wrapperReturnCode"]
        lfns = []
        pfns = []
        for f in list(jobReport.files) + list(jobReport.analysisFiles):
            if f['LFN']:
                lfns.append(f['LFN'])
            if f['PFN']:
                pfns.append(self._storageDir(f, wrapperCode))
        codeValue["storage"] = pfns
        codeValue["lfn"] = lfns
        return codeValue

    def _storageDir(self, fileInfo, wrapperCode):
        """
        Return the directory (with trailing '/') to record as storage
        endpoint of an output file listed in the FJR.

        On schedulers other than LSF/CAF/PBS, when the wrapper exit code is
        60308 the 'SurlForGrid' entry is used instead of the PFN, so that
        copyData gets the correct endpoint (the DB value is modified, not
        the FJR). Falls back to the PFN when 'SurlForGrid' is missing/empty.
        """
        # return codes in this dict may be strings (see str(50115) above):
        # compare as text so both '60308' and 60308 match
        if str(wrapperCode) == '60308' and \
           common.scheduler.name().upper() not in ['LSF', 'CAF', 'PBS']:
            try:
                surl = fileInfo['SurlForGrid']
            except KeyError:
                surl = None
            if surl:
                return os.path.dirname(surl) + '/'
        return os.path.dirname(fileInfo['PFN']) + '/'

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory

        id:     job id (used only in the debug message)
        max_id: current submission number; the file goes to
                path/Submission_<max_id-1>/, and Submission_1..max_id-1
                are created when missing
        path:   directory holding the file (ending with '/')
        file:   file name relative to path
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            subDir = Dir_Base + str(i) + '/'
            if not os.path.isdir(subDir):
                # POSIX redirection: '>&' is bash-only and breaks under sh/dash
                cmd = 'mkdir ' + subDir + ' > /dev/null 2>&1'
                cmd_out = runCommand(cmd)
                common.logger.debug(str(cmd_out))
        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ > /dev/null 2>&1'

        try:
            cmd_out = runCommand(cmd)
            common.logger.debug(cmd_out)
        except Exception:
            # best effort: a missing file is not an error for the retrieval
            msg = 'no output to move for job ' + str(id)
            common.logger.debug(msg)
        return