ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.54
Committed: Wed Jul 22 22:19:48 2009 UTC (15 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.53: +4 -4 lines
Log Message:
better print out formatting.

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of terminated jobs.

    The output sandbox of each selected job is fetched through
    common.scheduler.getOutput, unpacked into the output directory, and
    the exit codes read from the job's crab_fjr_<id>.xml are stored in
    the task DB (common._db).
    """

    def __init__(self, *args):
        """
        args[0] -- cfg_params: configuration looked up with .get(key, default)
        args[1] -- job ids to retrieve, or the string 'all'
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        self.log = 0
        # Both directories default to the task's result dir and are
        # normalized to end with '/', because every path below is built
        # by plain string concatenation.
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'
        # log == 1: log files live in a directory separate from the output
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output
        """
        common.logger.debug("GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug("GetOutput Time: " + str(stop - start))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved now.

        Sets self.up_task (the task from the DB), self.all_id (every job id
        of the task) and self.list_id (the requested ids whose running job
        is in state 'Terminated').

        Raises CrabException if none of the requested jobs is terminated,
        or if the output or log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['state'] == 'Terminated':
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])
        # built once: used both for the intersection and for membership tests
        done = set(list_id_done)

        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(done))
        if not list_id_done or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            # keep the caller's ordering of the requested ids
            self.list_id = [id for id in self.jobs if id in done]
            requested = len(self.jobs)
        if requested > len(self.list_id):
            msg = 'Only %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += '\t(for details: crab -status)'
            common.logger.info(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        #else:
        #    submission_id = common._db.queryRunJob('submission',self.list_id)
        #    submission_id.sort()
        #    submission_id.reverse()
        #    max_id=submission_id[0]
        #    if max_id > 1: self.moveOutput(max_id)

        return

    def getOutput(self):
        """
        Get output for the finished jobs selected by checkBeforeGet().

        The first job is retrieved alone; the size returned by
        organizeOutput() for it is used to estimate the free space needed
        for all jobs before the remaining ones are retrieved.

        Raises CrabException when a free-space check fails, unless
        USER.dontCheckSpaceLeft is set.
        """
        self.checkBeforeGet()
        # First check for more than 10 MB (has_freespace is called with
        # 10*1024, presumably kB -- TODO confirm against crab_util)
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Get first job of the list and the size of its output sandbox
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # Check disk space for all jobs using the first job as estimate,
        # with a 20% overhead
        needed = lastSize * len(self.list_id) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed)
            # raised because the need EXCEEDS the free space
            msg += " which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def organizeOutput(self, task, list_id):
        """
        Untar Output of the given jobs and record their exit codes.

        For each id in list_id whose running job is not in error and whose
        out_files_<id>.tgz exists in self.outDir:
          - if runningJob['submission'] > 1, files left from earlier
            submissions of that job are moved away via moveOutput();
          - the tarball is unpacked into self.outDir and then removed;
          - crab_fjr_<id>.xml, if present, is parsed and queued for the DB
            with state 'Cleared'.
        The queued codes are written with common._db.updateRunJob_.
        If a separate log dir is configured, the jobs' *_<id>.std* files
        and .BrokerInfo are then moved into it.

        Returns the size (kB, per getGZSize()/1024) of the last tarball
        measured, or 0 if none was.
        """
        listCode = []
        job_id = []
        size = 0  # in kB

        for id in list_id:
            runningJob = task.getJob(id).runningJob
            if runningJob.isError():
                continue

            tarName = 'out_files_' + str(id) + '.tgz'
            if not os.path.exists(self.outDir + tarName):
                msg = "Output files for job " + str(id) + " not available.\n"
                common.logger.debug(msg)
                continue

            self.max_id = runningJob['submission']
            if self.max_id > 1:
                # Job was resubmitted: move files of earlier submissions
                # before the new tarball is unpacked.
                tag = '_' + str(id) + '.'
                subDir = 'Submission_' + str(id)
                for f in os.listdir(self.outDir):
                    if tag in f and f != tarName and subDir not in f:
                        self.moveOutput(id, self.max_id, self.outDir, f)
                if self.log == 1:
                    for f in os.listdir(self.logDir):
                        if tag in f and subDir not in f:
                            self.moveOutput(id, self.max_id, self.logDir, f)

            try:
                size = getGZSize(self.outDir + tarName) / 1024  # in kB
                runCommand('tar zxf ' + self.outDir + tarName + ' ' + '-C ' + self.outDir)
                runCommand('rm ' + self.outDir + tarName)
                msg = 'Results of Jobs # ' + str(id) + ' are in ' + self.outDir
                common.logger.info(msg)
            except IOError:
                common.logger.info("Output files for job " + str(id) + " seems corrupted.\n")
                continue

            fjrName = 'crab_fjr_' + str(id) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                FiledToUpdate = self.parseFinalReport(self.outDir + fjrName)
                FiledToUpdate['state'] = 'Cleared'
                job_id.append(id)
                listCode.append(FiledToUpdate)
            else:
                msg = "Problems with " + str(fjrName) + ". File not available.\n"
                common.logger.info(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                # '*_<id>.std*' (not '*<id>.std*') so that e.g. job 1 does
                # not also pick up CMSSW_11.stdout; outDir ends with '/'.
                cmd = 'mv ' + self.outDir + '*_' + str(i_id) + '.std* ' + self.outDir + '.BrokerInfo ' + self.logDir
                try:
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.info(msg)
            msg = 'Results of Jobs # ' + str(list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.info(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        Returns a dict with 'wrapperReturnCode' and 'applicationReturnCode'
        and, unless the report is empty, 'lfn' and 'storage' (empty
        strings when not found), ready to be written to the DB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug("Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        # Count the file's lines only; close the handle even on error.
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            if error['Type'] == 'CMSException':
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        # The CMSException status is only a fallback for a missing
        # ExeExitCode; the temporary key never leaves this method.
        if 'applicationReturnCodeOrig' in codeValue:
            origCode = codeValue.pop("applicationReturnCodeOrig")
            codeValue.setdefault("applicationReturnCode", origCode)
        else:
            codeValue.setdefault("applicationReturnCode", '')

        #### Filling BOSS DB with SE name and LFN, for edm and not_edm files ####
        # Later files override earlier ones; empty values are skipped.
        for f in jobReport.files:
            if f['LFN']:
                codeValue["lfn"] = f['LFN']
            if f['SEName']:
                codeValue["storage"] = f['SEName']

        for aFile in jobReport.analysisFiles:
            if aFile['LFN']:
                codeValue["lfn"] = aFile['LFN']
            if aFile['SEName']:
                codeValue["storage"] = aFile['SEName']

        codeValue.setdefault("storage", '')
        codeValue.setdefault("lfn", '')

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory.

        Ensures path + 'Submission_1/' .. 'Submission_<max_id-1>/' exist,
        then moves path + file into path + 'Submission_<max_id-1>/'.
        A failure of the move is only logged at debug level.
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            if not os.path.isdir(Dir_Base + str(i) + '/'):
                cmd = 'mkdir ' + Dir_Base + str(i) + '/ >& /dev/null'
                cmd_out = runCommand(cmd)
                common.logger.debug(str(cmd_out))
        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ >& /dev/null'

        try:
            cmd_out = runCommand(cmd)
            common.logger.debug(cmd_out)
        except Exception:
            msg = 'no output to move for job ' + str(id)
            common.logger.debug(msg)
        return