ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.41
Committed: Fri Sep 26 17:05:53 2008 UTC (16 years, 7 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_pre4
Changes since 1.40: +2 -2 lines
Log Message:
check size of ungzipped OSB

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     import common
3     import string, os, time
4     from crab_util import *
5    
class GetOutput(Actor):
    """
    Actor implementing the "get output" step: retrieve the output sandboxes
    of finished jobs, unpack them into the user output directory, back up
    the results of earlier submissions of the same job, and record the exit
    codes found in each job's framework job report (crab_fjr_<id>.xml).
    """

    def __init__(self, *args):
        """
        args[0] -- configuration parameters (dict-like, read with .get())
        args[1] -- the requested job ids, or the string 'all'
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Output and log directories, both normalized to end with '/'
        # (the rest of the class builds paths by plain concatenation).
        self.log = 0
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'
        # self.log == 1: logs live in their own directory and must be backed
        # up separately when a job is resubmitted.
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Non-zero disables the free-disk-space checks in getOutput().
        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))

        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def run(self):
        """
        The main method of the class: check the destination dirs,
        perform the get output and log the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug(1, "GetOutput Time: "+str(stop - start))
        common.logger.write("GetOutput Time: "+str(stop - start))

    def checkBeforeGet(self):
        """
        Select which of the requested jobs can be retrieved.

        Sets self.up_task, self.all_id (every job id of the task) and
        self.list_id (the requested job ids whose running status is 'SD' or
        'DA'). Logs a message when only part of the request can be served.

        Raises CrabException if none of the requested jobs is in one of those
        states, or if the output or log directory does not exist.
        """
        # NOTE: querying the scheduler (common.scheduler.queryEverything(1))
        # would be preferable, but it caused a core dump; the task is read
        # from the local DB instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        # check == 0 means an explicit job list was given and none of those
        # jobs is done; -1 means 'all' was requested.
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(set(list_id_done)))
        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            n_requested = len(self.up_task.jobs)
        else:
            done = set(list_id_done)
            for job_id in self.jobs:
                if job_id in done:
                    self.list_id.append(job_id)
            n_requested = len(self.jobs)
        if n_requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (n_requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get the output of all selected finished jobs.

        The first job is retrieved on its own. The uncompressed size of its
        output sandbox is then used to estimate the disk space needed for the
        whole set before the remaining jobs are retrieved.

        Raises CrabException when a free-space check fails (unless
        USER.dontCheckSpaceLeft is set). Sizes are in kB, as the
        10*1024 <-> "10 MB" pairing below implies.
        """
        self.checkBeforeGet()
        # Refuse to start with less than 10 MB of free space.
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Retrieve the first job of the list and measure its sandbox.
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # Estimate the space for all jobs from the first one, with a 20%
        # overhead (conservative: the first job is already on disk).
        needed = lastSize*len(self.list_id)*1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is "+str(needed)+" kB"
            msg += " which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Retrieve the remaining jobs in a single call.
        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def _isJobFile(self, name, id):
        """
        Return True if directory entry `name` should be treated as belonging
        to job `id` when backing up a previous submission.

        The job number must appear as a whole number (so job 1 does not claim
        files of jobs 10, 11, 21, ...). Backup directories created by
        moveOutput ('Submission_<n>') are never matched.
        """
        import re
        if name.startswith('Submission_'):
            return False
        pattern = r'(?<!\d)' + re.escape(str(id)) + r'(?!\d)'
        return re.search(pattern, name) is not None

    def organizeOutput(self, task, list_id):
        """
        Untar the retrieved output sandboxes of the jobs in list_id.

        Each job must not be in error and must have out_files_<id>.tgz in
        self.outDir. For such a job:
          - if it is a resubmission, files of the previous submission are
            moved to a Submission_<n> backup directory;
          - the tarball is extracted and removed;
          - the exit codes from crab_fjr_<id>.xml are collected and written
            with common._db.updateRunJob_.
        If a separate log dir is configured, stdout/stderr files and
        .BrokerInfo are moved there.

        Returns the uncompressed size (kB) of the last unpacked sandbox, or 0
        if no sandbox was unpacked.
        """
        listCode = []
        job_id = []

        # Must be initialized: if every job is in error or has no tarball,
        # nothing below assigns it.
        size = 0
        for jid in list_id:
            runningJob = task.getJob(jid).runningJob
            if runningJob.isError():
                continue
            tgz_name = 'out_files_' + str(jid) + '.tgz'
            if not os.path.exists(self.outDir + tgz_name):
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            # A resubmitted job: move the results of the earlier submission
            # out of the way before unpacking the new ones.
            self.max_id = runningJob['submission']
            if self.max_id > 1:
                for f in os.listdir(self.outDir):
                    if f != tgz_name and self._isJobFile(f, jid):
                        self.moveOutput(jid, self.max_id, self.outDir, f)
                if self.log == 1:
                    for f in os.listdir(self.logDir):
                        if self._isJobFile(f, jid):
                            self.moveOutput(jid, self.max_id, self.logDir, f)

            size = getGZSize(self.outDir + tgz_name)/1024  # in kB
            runCommand('tar zxf ' + self.outDir + tgz_name + ' ' + '-C ' + self.outDir)
            runCommand('rm ' + self.outDir + tgz_name)
            msg = 'Results of Jobs # '+str(jid)+' are in '+self.outDir
            common.logger.message(msg)

            fjr_name = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + fjr_name):
                codeValue = self.parseFinalReport(self.outDir + fjr_name)
                job_id.append(jid)
                listCode.append(codeValue)
            else:
                msg = "Problems with "+str(fjr_name)+". File not available.\n"
                common.logger.message(msg)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the FJR produced by a job and return its exit codes.

        input -- path of the crab_fjr_<id>.xml file.

        Returns a dict with keys 'wrapperReturnCode' and
        'applicationReturnCode'. The value is '50115' when the report is
        empty or suspiciously short. An entry of the matching error type in
        the report overrides that value. The value is '' when no code was
        found. The caller is responsible for storing the values in the DB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5,"Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        # Temporary fix for incomplete FJRs: a report of 6 lines or fewer is
        # treated as 50115 (cmsRun did not produce a valid/readable job
        # report at runtime). The file is closed even if reading fails.
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        if 'applicationReturnCode' not in codeValue:
            codeValue["applicationReturnCode"] = ''

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory.

        Ensures path/Submission_1 .. path/Submission_<max_id-1> exist, then
        moves path/<file> into path/Submission_<max_id-1>/ (the backup of the
        previous submission). A failure of the move is logged, not raised.
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            if not os.path.isdir(Dir_Base + str(i) + '/'):
                cmd = ('mkdir ' + Dir_Base + str(i) + '/ >& /dev/null')
                cmd_out = runCommand(cmd)
                common.logger.write(str(cmd_out))
                common.logger.debug(3, str(cmd_out))
        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ >& /dev/null'

        try:
            cmd_out = runCommand(cmd)
            common.logger.write(str(cmd_out))
            common.logger.debug(3, str(cmd_out))
        except Exception:
            msg = 'no output to move for job '+str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)
        return