ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.43
Committed: Tue Oct 28 17:40:20 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre3, CRAB_2_4_2_pre2
Changes since 1.42: +10 -3 lines
Log Message:
keep CMSException from fjr in case of cmsRun failure...

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     import common
3     import string, os, time
4     from crab_util import *
5    
class GetOutput(Actor):
    """
    Actor that retrieves the output of the jobs whose status is 'SD' or
    'DA', unpacks each out_files_<id>.tgz archive into the output
    directory and stores the exit codes parsed from crab_fjr_<id>.xml.
    """

    def __init__(self, *args):
        """
        args[0] is the configuration mapping (only read through .get()),
        args[1] is the job selection: the string 'all' or a sequence of
        job ids.
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Both directories default to the work-space result dir and are
        # forced to end with '/', because every path in this class is
        # built by plain string concatenation.
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'

        # self.log == 1 flags that log files go to a directory of their own.
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1

        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Non-zero disables both free-space checks done in getOutput().
        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))

        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared',
        }

    def run(self):
        """
        The main method of the class: check destination dirs,
        perform the get output and log how long it took.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()

        elapsed = "GetOutput Time: " + str(stop - start)
        common.logger.debug(1, elapsed)
        common.logger.write(elapsed)

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Fills self.up_task (task read from the DB), self.all_id (ids of
        every job of the task) and self.list_id (ids to retrieve: the
        requested jobs whose status is 'SD' or 'DA').

        Raises CrabException when none of the requested jobs can be
        retrieved, or when the output or log directory does not exist.
        """
        # common.scheduler.queryEverything(1) was the intended call here,
        # but it produced a core dump; the task is read from the DB
        # until that is solved.
        self.up_task = common._db.getTask()

        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ('SD', 'DA'):
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        wantAll = (self.jobs == 'all')
        doneSet = set(list_id_done)

        if wantAll:
            nothingToGet = not list_id_done
        else:
            nothingToGet = (not list_id_done
                            or not doneSet.intersection(set(self.jobs)))

        if nothingToGet:
            if wantAll:
                list_jobs = self.all_id
            else:
                list_jobs = self.jobs
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if wantAll:
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            # Keep the order (and any repetition) of the user's request.
            self.list_id = [jobId for jobId in self.jobs if jobId in doneSet]
            requested = len(self.jobs)

        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)

    def getOutput(self):
        """
        Retrieve and unpack the output of the selected jobs.

        The first job is fetched on its own; the size returned for it by
        organizeOutput() is then used to estimate the disk space needed
        before the remaining jobs are fetched in one go.

        Raises CrabException (also through checkBeforeGet) when a
        free-space check fails; both checks are skipped when
        self.dontCheckSpaceLeft is set.
        """
        self.checkBeforeGet()

        checkSpace = not self.dontCheckSpaceLeft

        # First check: at least 10 MB free (has_freespace is given kB).
        if checkSpace and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Get the first job of the list and measure its output.
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        list_other = self.list_id[1:]
        if not list_other:
            # Nothing left to download: checking for more space now could
            # only turn a complete retrieval into an error.
            return

        # Second check: space for the other jobs, estimated from the
        # first one with a 20% overhead.
        needed = lastSize*len(self.list_id)*1.2
        if checkSpace and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is "+str(needed)+" kB"
            msg += " which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        task = common.scheduler.getOutput(1, list_other, self.outDir)
        self.organizeOutput(task, list_other)

    def _stashOldFiles(self, jobId, directory, skipName):
        """
        Hand to moveOutput() every entry of 'directory' whose name
        contains the job id, except 'skipName' and names containing
        'Submission_<jobId>'. Uses self.max_id as submission number.
        """
        idText = str(jobId)
        # NOTE(review): this is a substring match, so job 1 also matches
        # names that belong to job 10, 11, ... - confirm against the
        # file naming scheme before tightening it.
        for name in os.listdir(directory):
            if idText not in name:
                continue
            if name == skipName:
                continue
            if ('Submission_' + idText) in name:
                continue
            self.moveOutput(jobId, self.max_id, directory, name)

    def organizeOutput(self, task, list_id):
        """
        Untar the output of the jobs in list_id.

        For each job that is not in error and whose out_files_<id>.tgz
        exists in self.outDir: files left by previous submissions are
        moved away, the archive is extracted and removed, and the job
        report crab_fjr_<id>.xml (when present) is parsed. The parsed
        codes are then written with common._db.updateRunJob_ and, when a
        separate log dir is in use, *.std* files and .BrokerInfo are
        moved there.

        Returns the value of getGZSize()/1024 (in kB) for the last
        archive handled in this call, or 0 when none was handled.
        """
        listCode = []
        job_id = []
        size = 0  # in kB

        for jobId in list_id:
            runningJob = task.getJob(jobId).runningJob
            if runningJob.isError():
                continue

            tarName = 'out_files_' + str(jobId) + '.tgz'
            tarPath = self.outDir + tarName
            if not os.path.exists(tarPath):
                msg = "Output files for job " + str(jobId) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            # A submission number above 1 means the job was resubmitted:
            # keep what earlier submissions left behind out of the way.
            self.max_id = runningJob['submission']
            if self.max_id > 1:
                self._stashOldFiles(jobId, self.outDir, tarName)
                if self.log == 1:
                    self._stashOldFiles(jobId, self.logDir, None)

            size = getGZSize(tarPath)/1024  # in kB
            # NOTE(review): the archive is removed without looking at the
            # result of the extraction - confirm what runCommand returns
            # on failure before relying on it here.
            runCommand('tar zxf ' + tarPath + ' ' + '-C ' + self.outDir)
            runCommand('rm ' + tarPath)
            msg = 'Results of Jobs # '+str(jobId)+' are in '+self.outDir
            common.logger.message(msg)

            fjrName = 'crab_fjr_' + str(jobId) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                codeValue = self.parseFinalReport(self.outDir + fjrName)
                job_id.append(jobId)
                listCode.append(codeValue)
            else:
                msg = "Problems with "+str(fjrName)+". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.logDir)
                try:
                    # The exit status is ignored on purpose (best effort).
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the FJR at path 'input' and return a dictionary with the
        keys "wrapperReturnCode" and "applicationReturnCode".

        - An empty report, or a file of 6 lines or fewer, gives 50115
          (cmsRun did not produce a valid/readable job report at
          runtime) for both codes; errors listed in the report may then
          override them.
        - 'WrapperExitCode' and 'ExeExitCode' errors set the wrapper and
          application code respectively.
        - A 'CMSException' error is used as application code when no
          'ExeExitCode' is present; otherwise it stays in the result
          under "applicationReturnCodeOrig".
        - Codes still missing are set to ''.

        The dictionary is only returned; nothing is written to the DB
        by this method.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if not jreports:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5,"Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # Always release the handle, even if reading fails.
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            errorType = error['Type']
            if errorType == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif errorType == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            elif errorType == 'CMSException':
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        if 'applicationReturnCode' not in codeValue:
            # Fall back on the CMSException status (removing it from the
            # result), or on '' when there is none.
            codeValue["applicationReturnCode"] = \
                codeValue.pop("applicationReturnCodeOrig", '')

        return codeValue

    def moveOutput(self,id, max_id,path,file):
        """
        Move output of job already retrieved
        into the correct backup directory.

        Creates the missing directories path + 'Submission_<i>/' for i
        in 1 .. max_id-1, then moves path + file into
        'Submission_<max_id-1>/'. 'id' is only used in the log message
        written when the move raises.
        """
        Dir_Base = path + 'Submission_'

        # NOTE(review): paths are pasted unquoted into shell commands and
        # '>&' is not plain-sh syntax - check which shell runCommand uses.
        for i in range(1, max_id):
            subDir = Dir_Base + str(i) + '/'
            if not os.path.isdir(subDir):
                cmd_out = runCommand('mkdir ' + subDir + ' >& /dev/null')
                common.logger.write(str(cmd_out))
                common.logger.debug(3, str(cmd_out))

        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ >& /dev/null'
        try:
            cmd_out = runCommand(cmd)
            common.logger.write(cmd_out)
            common.logger.debug(3, cmd_out)
        except Exception:
            msg = 'no output to move for job '+str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)