ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.38
Committed: Mon Sep 22 15:46:53 2008 UTC (16 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.37: +7 -4 lines
Log Message:
improvements in check for get output avoiding problems for failed job

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     import common
3     import string, os, time
4     from crab_util import *
5    
class GetOutput(Actor):
    """
    Retrieve the output of finished jobs and organize it on disk.

    The retrieved sandboxes (out_files_<id>.tgz) are unpacked into the user
    output directory. When a separate log directory is configured, the job
    std* files and .BrokerInfo are moved there. Output left over from an
    earlier submission of the same job is first moved into a
    Submission_<n> backup directory so the new output does not mix with it.
    """

    def __init__(self, *args):
        """
        args[0] -- cfg_params, a mapping read with .get()
        args[1] -- jobs to retrieve: the string 'all' or a list of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        self.outDir = self._asDirPath(
            self.cfg_params.get('USER.outputdir', common.work_space.resDir()))
        self.logDir = self._asDirPath(
            self.cfg_params.get('USER.logdir', common.work_space.resDir()))
        # 1 when log files live in a directory other than outDir, else 0.
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def _asDirPath(self, path):
        """
        Return path with a trailing '/'.

        An empty path is returned unchanged rather than indexed (which raised
        IndexError) or turned into '/' (the filesystem root). checkBeforeGet
        then rejects it, because os.path.isdir('') is False.
        """
        if path and path[-1] != '/':
            path = path + '/'
        return path

    def run(self):
        """
        The main method of the class: check the destination dirs,
        perform the get output and log the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = str(time.time() - start)
        common.logger.debug(1, "GetOutput Time: " + elapsed)
        common.logger.write("GetOutput Time: " + elapsed)

    def checkBeforeGet(self):
        """
        Select the requested jobs whose output can be retrieved.

        Fills self.list_id with the requested job ids whose running job is
        in status 'SD' or 'DA', and self.all_id with every job id of the
        task. Warns when only part of the request can be served.

        Raises CrabException if no requested job is retrievable, or if the
        output or log directory does not exist.
        """
        # NOTE: common.scheduler.queryEverything(1) would be the proper source
        # here, but it caused a core dump; the task is read from the DB instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(set(list_id_done)))
        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, list_jobs)
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            done = set(list_id_done)
            # Keep the caller's order (and any duplicates) of the request.
            self.list_id = [jid for jid in self.jobs if jid in done]
            requested = len(self.jobs)
        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get the output of the finished jobs selected by checkBeforeGet and
        unpack it into the output directory.
        """
        self.checkBeforeGet()
        task = common.scheduler.getOutput(1, self.list_id, self.outDir)
        self.organizeOutput(task)
        return

    def organizeOutput(self, task):
        """
        Untar the retrieved output and record the job exit codes.

        task -- object returned by common.scheduler.getOutput. task.getJob(id)
                must give a job whose runningJob attribute supports
                isError() and ['submission'].

        For every job in self.list_id that is not in error and whose
        out_files_<id>.tgz is present, this method:
          - archives output of an earlier submission;
          - unpacks the tarball into self.outDir and removes it;
          - parses crab_fjr_<id>.xml, if present, to collect the exit codes.
        The collected codes are stored with common._db.updateRunJob_. Log
        files are then moved to self.logDir when that differs from self.outDir.
        """
        listCode = []
        job_id = []

        for jid in self.list_id:
            # runningJob is an attribute of the job; checkBeforeGet indexes it
            # as job.runningJob['status']. It is not a method.
            runningJob = task.getJob(jid).runningJob
            if runningJob.isError():
                continue

            tarball = 'out_files_' + str(jid) + '.tgz'
            if not os.path.exists(self.outDir + tarball):
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            self.submission_id = runningJob['submission']
            self.max_id = self._lastSubmission(self.submission_id)
            if self.max_id > 1:
                # This job was resubmitted: move the previous submission's
                # files aside before unpacking the new ones over them.
                self._archiveOldOutput(jid, self.max_id, self.outDir, tarball)
                if self.log == 1:
                    self._archiveOldOutput(jid, self.max_id, self.logDir)

            cmd = 'tar zxvf ' + self.outDir + tarball + ' ' + '-C ' + self.outDir
            runCommand(cmd)
            self._removeFile(self.outDir + tarball)
            msg = 'Results of Jobs # ' + str(jid) + ' are in ' + self.outDir
            common.logger.message(msg)

            fjr = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + fjr):
                job_id.append(jid)
                listCode.append(self.parseFinalReport(self.outDir + fjr))
            else:
                msg = "Problems with " + str(fjr) + ". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            self._moveLogFiles()
            msg = 'Results of Jobs # ' + str(self.list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.message(msg)
        return

    def _lastSubmission(self, submission):
        """
        Return the highest submission number as an int.

        Accepts either a single number (or numeric string) or a non-empty
        sequence of them. An earlier revision read a list of submission ids
        from the DB; the runningJob value is presumably scalar. TODO confirm.
        """
        if isinstance(submission, (list, tuple)):
            return max([int(s) for s in submission])
        return int(submission)

    def _belongsToJob(self, name, jid):
        """
        True if the job id appears in name as a whole number.

        The id must not be preceded or followed by another digit, so job 1
        does not match files of jobs 10, 11, 21, ...
        """
        import re
        pattern = r'(?<![0-9])' + re.escape(str(jid)) + r'(?![0-9])'
        return re.search(pattern, name) is not None

    def _archiveOldOutput(self, jid, max_id, path, exclude=None):
        """
        Move the entries of path that belong to job jid into the backup
        directory of the previous submission.

        The entry named exclude is skipped, as are the Submission_* backup
        directories themselves.
        """
        for name in os.listdir(path):
            if name == exclude or name.startswith('Submission_'):
                continue
            if self._belongsToJob(name, jid):
                self.moveOutput(jid, max_id, path, name)

    def _removeFile(self, path):
        """Delete path, logging (not raising) if it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            msg = 'Unable to remove ' + path
            common.logger.write(msg)
            common.logger.debug(3, msg)

    def _moveLogFiles(self):
        """
        Move job log files and .BrokerInfo from self.outDir to self.logDir.

        The moved std files are those matching <anything><id>.std* for the
        jobs in self.list_id, where the id is not preceded by a digit.
        Existing files in self.logDir are overwritten, as 'mv' did.
        """
        import glob
        import re
        import shutil

        sources = []
        for jid in self.list_id:
            # The id must directly precede '.std' and must not be preceded by
            # a digit, so job 1 does not take CMSSW_11.stdout.
            own = re.compile(r'(?<![0-9])' + re.escape(str(jid)) + r'\.std')
            for path in glob.glob(self.outDir + '*' + str(jid) + '.std*'):
                if own.search(os.path.basename(path)) and path not in sources:
                    sources.append(path)
        broker = self.outDir + '.BrokerInfo'
        if os.path.exists(broker):
            sources.append(broker)

        for src in sources:
            try:
                # Explicit destination file name: os.rename overwrites an
                # existing file instead of shutil refusing to move into a dir.
                shutil.move(src, self.logDir + os.path.basename(src))
            except (EnvironmentError, shutil.Error):
                msg = 'Problem with copy of job results'
                common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by the job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the crab_fjr_<id>.xml file.

        Returns a dict with keys 'applicationReturnCode' and
        'wrapperReturnCode'. A key is set to '' when the report does not
        provide it, and both are set to '50115' when the report is empty.
        The caller stores these values with common._db.updateRunJob_.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5, "Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        # Temporary fix for incomplete FJRs: a report of 6 lines or fewer is
        # treated as 50115 (cmsRun did not produce a valid/readable job
        # report at runtime). Codes found in the report's errors below still
        # override these defaults.
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        codeValue.setdefault("applicationReturnCode", '')
        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of a job already retrieved into the backup directory
        of its previous submission.

        id     -- job id (used only in log messages)
        max_id -- current (highest) submission number of the job, > 1
        path   -- directory containing file, ending with '/'
        file   -- name of the entry inside path to move

        Directories path/Submission_1 .. path/Submission_<max_id - 1> are
        created if missing. file then goes into path/Submission_<max_id - 1>/.
        Failures are logged, not raised.
        """
        import shutil

        Dir_Base = path + 'Submission_'
        for i in range(1, max_id):
            subdir = Dir_Base + str(i) + '/'
            if not os.path.isdir(subdir):
                try:
                    os.mkdir(subdir)
                except OSError:
                    msg = 'Unable to create ' + subdir
                    common.logger.write(msg)
                    common.logger.debug(3, msg)

        backup = Dir_Base + str(max_id - 1) + '/'
        try:
            shutil.move(path + file, backup + file)
        except (EnvironmentError, shutil.Error):
            msg = 'no output to move for job ' + str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)
        return