ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.62
Committed: Wed May 23 13:06:46 2012 UTC (12 years, 11 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, HEAD
Changes since 1.61: +8 -2 lines
Log Message:
flag with 50700 cases where gLite returned no OSB, https://savannah.cern.ch/bugs/index.php?93844

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     import common
3     import string, os, time
4     from crab_util import *
5    
class GetOutput(Actor):
    """
    Actor that retrieves the output sandbox of the jobs of a task, unpacks it
    into the output directory (moving stdout/stderr to the log directory when
    one different from the output directory is configured) and stores in the
    DB the exit codes read from the framework job reports (crab_fjr_<id>.xml).
    """

    def __init__(self, *args):
        """
        args[0]: cfg_params, the crab configuration parameters (dict-like)
        args[1]: the ids of the jobs whose output is requested, or 'all'
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        cwd = os.getcwd() + '/'
        self.outDir = self._absoluteDir(
            self.cfg_params.get('USER.outputdir', common.work_space.resDir()), cwd)
        self.logDir = self._absoluteDir(
            self.cfg_params.get('USER.logdir', common.work_space.resDir()), cwd)

        # 1 when the log files have to be moved to a directory of their own
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.dontCheckSpaceLeft = int(self.cfg_params.get('USER.dontCheckSpaceLeft', 0))
        return

    def _absoluteDir(self, path, cwd):
        """
        Return path with a trailing '/' and, if it is relative, prefixed
        with cwd (which must itself end with '/').
        """
        if path[-1] != '/':
            path = path + '/'
        if path[0] != '/':
            path = cwd + path
        return path

    def run(self):
        """
        The main method of the class: check the destination dirs and
        perform the get output, logging the elapsed time.
        """
        common.logger.debug("GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug("GetOutput Time: " + str(stop - start))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved and check the
        destination directories.

        Sets self.all_id to the ids of every job of the task and
        self.list_id to the requested jobs (all of them when
        self.jobs == 'all') whose running job is in 'Terminated' state.

        Raises CrabException if none of the requested jobs is Terminated,
        or if the output or log directory does not exist.
        """
        # common.scheduler.queryEverything(1) would be the proper call here,
        # but it produced a core dump; read the task from the local DB instead.
        self.up_task = common._db.getTask()

        self.all_id = [job['jobId'] for job in self.up_task.jobs]
        list_id_done = [job['jobId'] for job in self.up_task.jobs
                        if job.runningJob['state'] == 'Terminated']
        done = set(list_id_done)

        # -1 means "not computed" (all jobs requested)
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(done))

        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output. \n' % readableList(self, list_jobs)
            if len(list_id_done) > 0:
                msg += ' Retrieve the jobs if those are in Done status and Terminated action. \n'
                msg += ' To know the action of a job run: "crab -status v " \n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            n_requested = len(self.up_task.jobs)
        else:
            # keep the order in which the jobs were requested
            self.list_id = [jobId for jobId in self.jobs if jobId in done]
            n_requested = len(self.jobs)
        if n_requested > len(self.list_id):
            msg = 'Only %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % n_requested
            msg += '\t(for details: crab -status)'
            common.logger.info(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Retrieve the output of the jobs selected by checkBeforeGet().

        The first job is retrieved alone; the size of its output is used to
        estimate (with a 20% overhead) the space needed for all the others.
        Both disk-space checks are skipped when USER.dontCheckSpaceLeft is set.

        Raises CrabException when the free space is insufficient.
        """
        self.checkBeforeGet()

        # has_freespace works in kB here: 10*1024 kB = 10 MB minimum
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Retrieve the first job alone to measure the size of its output (kB)
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # Estimate the space needed by all jobs from the first one, +20%
        needed = lastSize * len(self.list_id) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed) + " kB"
            msg += " which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def organizeOutput(self, task, list_id):
        """
        Untar the output sandbox of the jobs in list_id and update the DB.

        For each job not in error: files of a previous submission are moved
        into Submission_<n>/ (see moveOutput), out_files_<id>.tgz is
        extracted into outDir and removed, and the exit codes are read from
        crab_fjr_<id>.xml. Jobs with no output archive are flagged with the
        50700 exit codes. All DB updates are done in a single call at the end.
        When a separate log dir is configured, stdout/stderr and .BrokerInfo
        are then moved there.

        task: task object returned by common.scheduler.getOutput()
        list_id: ids of the jobs whose output was just retrieved
        Returns the size in kB of the last archive extracted (0 if none).
        """
        listCode = []
        job_id = []

        size = 0  # in kB
        for jobId in list_id:
            runningJob = task.getJob(jobId).runningJob
            if runningJob.isError():
                continue
            tgz = 'out_files_' + str(jobId) + '.tgz'
            if os.path.exists(self.outDir + tgz):
                self.max_id = runningJob['submission']
                if self.max_id > 1:
                    # Resubmitted job: archive files left by earlier submissions
                    for f in os.listdir(self.outDir):
                        if (f.find('_' + str(jobId) + '.') != -1) and (f != tgz) \
                           and f.find('Submission_' + str(jobId)) == -1:
                            self.moveOutput(jobId, self.max_id, self.outDir, f)
                    if self.log == 1:
                        for f in os.listdir(self.logDir):
                            if f.find('_' + str(jobId) + '.') != -1 \
                               and f.find('Submission_' + str(jobId)) == -1:
                                self.moveOutput(jobId, self.max_id, self.logDir, f)
                try:
                    size = getGZSize(self.outDir + tgz) / 1024  # in kB
                    cmd = 'tar zxf ' + self.outDir + tgz + ' ' + '-C ' + self.outDir
                    runCommand(cmd)
                    cmd_2 = 'rm ' + self.outDir + 'out_files_' + str(jobId) + '.tgz'
                    runCommand(cmd_2)
                    msg = 'Results of Jobs # ' + str(jobId) + ' are in ' + self.outDir
                    common.logger.info(msg)
                except IOError:
                    common.logger.info("Output files for job " + str(jobId) + " seems corrupted.\n")
                    continue
            else:
                msg = "Output files for job " + str(jobId) + " not available.\n"
                common.logger.info(msg)
                # No output sandbox came back: flag the job with 50700
                FieldToUpdate = {}
                FieldToUpdate['state'] = 'Cleared'
                FieldToUpdate["applicationReturnCode"] = str(50700)
                FieldToUpdate["wrapperReturnCode"] = str(50700)
                job_id.append(jobId)
                listCode.append(FieldToUpdate)
                # NOTE(review): the 'continue' that used to be here was
                # commented out, so if a crab_fjr_<id>.xml exists in outDir it
                # is parsed below and a second entry for this id is appended
                # after the 50700 one. That file may be a leftover of an
                # earlier submission - confirm this is intended.
            fjrName = 'crab_fjr_' + str(jobId) + '.xml'
            if os.path.exists(self.outDir + fjrName):
                FieldToUpdate = self.parseFinalReport(self.outDir + fjrName)
                FieldToUpdate['state'] = 'Cleared'
                job_id.append(jobId)
                listCode.append(FieldToUpdate)
            else:
                msg = "Problems with " + str(fjrName) + ". File not available.\n"
                common.logger.info(msg)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            for i_id in list_id:
                # Match '_<id>.' as above, so that job 1 does not also pick up
                # the stdout/stderr of jobs 11, 21, ...
                cmd = 'mv ' + self.outDir + '*_' + str(i_id) + '.std* ' \
                      + self.outDir + '.BrokerInfo ' + self.logDir
                try:
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.info(msg)
            msg = 'Results of Jobs # ' + str(list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.info(msg)
        return size

    def parseFinalReport(self, input):
        """
        Parse the FJR produced by a job in order to retrieve the
        WrapperExitCode and ExeExitCode, plus the LFN and storage directory
        of its output files, to be stored in the BossLite DB.

        input: path of the crab_fjr_<id>.xml file.
        Returns a dict with 'applicationReturnCode' and 'wrapperReturnCode'
        ('' when not found, '50115' when the report is empty or too short)
        and, unless readJobReport returned no report, 'storage' (list of
        output directories, each ending with '/') and 'lfn' (list of LFNs).
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug("Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # Exit codes listed in the report override the 50115 defaults above
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            if error['Type'] == 'CMSException':
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        # The CMSException code is only a fallback for a missing ExeExitCode
        if 'applicationReturnCode' not in codeValue:
            codeValue["applicationReturnCode"] = codeValue.get("applicationReturnCodeOrig", '')
        codeValue.pop("applicationReturnCodeOrig", None)

        #### Filling BOSS DB with SE name and LFN, for edm and not_edm files ####
        # FEDE: to have the correct endpoint to use in the copyData we modify
        # the bossDB value and not the fjr. ExitStatus may be the raw text
        # read from the XML, so compare as a string to match both 60308 and '60308'.
        useSurl = 0
        if str(codeValue["wrapperReturnCode"]).strip() == '60308' and \
           common.scheduler.name().upper() not in ['LSF', 'CAF', 'PBS']:
            useSurl = 1

        lfns = []
        pfns = []
        for f in list(jobReport.files) + list(jobReport.analysisFiles):
            if f['LFN']:
                lfns.append(f['LFN'])
            if f['PFN']:
                if useSurl:
                    pfns.append(os.path.dirname(f['SurlForGrid']) + '/')
                else:
                    pfns.append(os.path.dirname(f['PFN']) + '/')
        codeValue["storage"] = pfns
        codeValue["lfn"] = lfns
        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move a file left by a previous retrieval of job `id` into the backup
        directory path/Submission_<max_id-1>/, creating the directories
        Submission_1 .. Submission_<max_id-1> under path if missing.

        id: job id (only used in the log message)
        max_id: current submission number of the job
        path: directory containing `file`, ending with '/'
        file: name of the file to move
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            subDir = Dir_Base + str(i) + '/'
            if not os.path.isdir(subDir):
                cmd = 'mkdir ' + subDir + ' >& /dev/null'
                cmd_out = runCommand(cmd)
                common.logger.debug(str(cmd_out))
        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ >& /dev/null'

        try:
            cmd_out = runCommand(cmd)
            common.logger.debug(cmd_out)
        except Exception:
            # best effort: a missing file is not an error here
            msg = 'no output to move for job ' + str(id)
            common.logger.debug(msg)
        return