ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.40
Committed: Fri Sep 26 11:48:00 2008 UTC (16 years, 7 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.39: +35 -7 lines
Log Message:
add check of space left before getOutput

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of finished jobs.

    It selects the requested jobs whose status allows retrieval, checks the
    free disk space, asks the scheduler for their output, unpacks each
    out_files_<id>.tgz into the output directory, passes the exit codes read
    from each crab_fjr_<id>.xml to common._db.updateRunJob_ and, when a
    separate log directory is configured, moves the log files there.
    """

    def __init__(self, *args):
        """
        args[0] -- configuration parameters, read with .get(key, default)
        args[1] -- jobs to retrieve: the string 'all' or a list of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # 1 when log files go to a directory different from the output one.
        self.log = 0
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        # Paths below are built by string concatenation: keep a trailing '/'.
        if self.outDir[-1] != '/': self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/': self.logDir = self.logDir + '/'
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Non-zero disables both free-space checks performed in getOutput().
        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))

        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def run(self):
        """
        The main method of the class: check destination dirs and
        perform the get output, logging the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = time.time() - start
        common.logger.debug(1, "GetOutput Time: " + str(elapsed))
        common.logger.write("GetOutput Time: " + str(elapsed))

    def checkBeforeGet(self):
        """
        Select which of the requested jobs can be retrieved.

        Sets self.up_task (the task from the DB), self.all_id (ids of every
        job in the task) and self.list_id (ids of the requested jobs whose
        status is 'SD' or 'DA'). Logs a message when only part of the
        requested jobs will be retrieved.

        Raises CrabException when none of the requested jobs can be
        retrieved, or when the output or log directory does not exist.
        """
        # NOTE: common.scheduler.queryEverything(1) would be preferable, but
        # it currently causes a core dump; read the task from the DB instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        # check == 0 means an explicit job list was given and none is done.
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(set(list_id_done)))

        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all': list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            done = set(list_id_done)
            for jid in self.jobs:
                if jid in done: self.list_id.append(jid)
            requested = len(self.jobs)
        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get the output of the finished jobs selected by checkBeforeGet().

        The first job is retrieved on its own. The size of its output tarball
        is then used to estimate the space needed by the remaining jobs,
        which are retrieved in a single scheduler call. Both free-space
        checks are skipped when USER.dontCheckSpaceLeft is set.

        Raises CrabException when the free space looks insufficient (and in
        the cases documented in checkBeforeGet).
        """
        self.checkBeforeGet()

        # has_freespace sizes are in kB here (10*1024 == 10 MB, and the
        # tarball size returned by organizeOutput is in kB).
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Retrieve the first job alone, to measure the size of its output.
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        list_other = self.list_id[1:]
        if not list_other:
            # Nothing left to retrieve: no further space check is needed.
            return

        # Estimate the space for the other N jobs from the first one's size,
        # adding a 20% overhead.
        needed = lastSize * len(list_other) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed)
            msg += " kB which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        task = common.scheduler.getOutput(1, list_other, self.outDir)
        self.organizeOutput(task, list_other)
        return

    def _isJobFile(self, fname, jid):
        """
        Return True if directory entry fname belongs to job jid.

        The job number must appear in fname without a digit immediately
        before or after it, so job 1 does not claim names containing 11 or
        12. Submission_* backup directories never match.
        """
        if fname.startswith('Submission_'):
            return False
        token = str(jid)
        start = fname.find(token)
        while start != -1:
            end = start + len(token)
            if (start == 0 or not fname[start - 1].isdigit()) and \
               (end == len(fname) or not fname[end].isdigit()):
                return True
            start = fname.find(token, start + 1)
        return False

    def organizeOutput(self, task, list_id):
        """
        Untar the retrieved output of the jobs in list_id and record their
        exit codes.

        For each job not in error whose out_files_<id>.tgz is in self.outDir:
        when its 'submission' counter is > 1 (presumably a resubmission),
        files of that job left by a previous retrieval are first moved into
        a Submission_<n> backup directory (see moveOutput). The tarball is
        then extracted into self.outDir and removed, and the exit codes from
        crab_fjr_<id>.xml are collected and passed to
        common._db.updateRunJob_. With a separate log directory, the jobs'
        *.std* files and .BrokerInfo are moved there.

        Returns the size in kB of the last tarball unpacked, or 0 when no
        tarball was found (e.g. the job was in error).
        """
        listCode = []
        job_id = []
        size = 0

        for jid in list_id:
            runningJob = task.getJob(jid).runningJob
            if runningJob.isError():
                continue
            tarName = 'out_files_' + str(jid) + '.tgz'
            tarPath = self.outDir + tarName
            if not os.path.exists(tarPath):
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            self.max_id = runningJob['submission']
            if self.max_id > 1:
                # Back up files of this job coming from an earlier retrieval.
                for f in os.listdir(self.outDir):
                    if f != tarName and self._isJobFile(f, jid):
                        self.moveOutput(jid, self.max_id, self.outDir, f)
                if self.log == 1:
                    for f in os.listdir(self.logDir):
                        if self._isJobFile(f, jid):
                            self.moveOutput(jid, self.max_id, self.logDir, f)

            size = os.path.getsize(tarPath) // 1024  # in kB
            runCommand('tar zxvf ' + tarPath + ' -C ' + self.outDir)
            runCommand('rm ' + tarPath)
            msg = 'Results of Jobs # ' + str(jid) + ' are in ' + self.outDir
            common.logger.message(msg)

            fjrName = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                codeValue = self.parseFinalReport(self.outDir + fjrName)
                job_id.append(jid)
                listCode.append(codeValue)
            else:
                msg = "Problems with " + str(fjrName) + ". File not available.\n"
                common.logger.message(msg)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                # Best effort: a missing file only makes mv fail for that file.
                try:
                    cmd = 'mv ' + self.outDir + '*' + str(i_id) + '.std* ' + self.outDir + '.BrokerInfo ' + self.logDir
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # ' + str(list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.message(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the FJR at path `input` and return its exit codes.

        Returns a dict with keys "wrapperReturnCode" and
        "applicationReturnCode". Both are set to "50115" when the report is
        empty or has 6 lines or fewer, unless overridden by a
        WrapperExitCode/ExeExitCode error entry in the report. A code that is
        not found is set to ''. The caller stores the values in the DB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if not jreports:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5, "Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        if 'applicationReturnCode' not in codeValue:
            codeValue["applicationReturnCode"] = ''

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move path+file into the backup directory path+'Submission_<max_id-1>'.

        First creates every missing path+'Submission_<i>' for i in
        1..max_id-1. `id` is only used in the log message. Failures are
        logged, never raised.
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            subDir = Dir_Base + str(i) + '/'
            if not os.path.isdir(subDir):
                cmd_out = runCommand('mkdir ' + subDir + ' >& /dev/null')
                common.logger.write(str(cmd_out))
                common.logger.debug(3, str(cmd_out))
        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ >& /dev/null'

        try:
            cmd_out = runCommand(cmd)
            common.logger.write(cmd_out)
            common.logger.debug(3, cmd_out)
        except Exception:
            msg = 'no output to move for job ' + str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)
        return