ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.36
Committed: Mon Sep 1 09:51:21 2008 UTC (16 years, 8 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_2, CRAB_2_3_2_pre7
Changes since 1.35: +1 -1 lines
Log Message:
Removed SK unused status (now there is just K)

File Contents

# User Rev Content
import os
import re
import string
import time

from Actor import *
import common
from crab_util import *
5    
class GetOutput(Actor):
    """
    Actor that retrieves the output of jobs in Done status through the
    scheduler, unpacks the per-job tarballs into the output directory and
    records the exit codes found in each framework job report.
    """

    def __init__(self, *args):
        """
        args[0] -- configuration parameters (queried through .get())
        args[1] -- either the string 'all' or a collection of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Both directories default to the work-space result dir and are
        # normalised to end with a slash, because paths are later built by
        # plain string concatenation.
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'

        # self.log is 1 when log files live in a directory of their own.
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1

        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def run(self):
        """
        The main method of the class: check destination dirs, perform the
        get output and log how long it took.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()

        elapsed = "GetOutput Time: "+str(stop - start)
        common.logger.debug(1, elapsed)
        common.logger.write(elapsed)

    def checkBeforeGet(self):
        """
        Work out which of the requested jobs can be retrieved.

        Fills self.all_id with every job id of the task and self.list_id
        with the requested ids whose status is 'SD' or 'DA'.

        Raises CrabException when none of the requested jobs is retrievable
        or when the output / log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()

        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        done = set(list_id_done)
        all_requested = (self.jobs == 'all')

        # Nothing to do if no job is done at all, or if an explicit request
        # does not contain a single done job.
        nothing_to_get = not list_id_done
        if not nothing_to_get and not all_requested:
            nothing_to_get = not done.intersection(set(self.jobs))

        if nothing_to_get:
            list_jobs = self.jobs
            if all_requested:
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.'% readableList(self,list_jobs)
            raise CrabException(msg)

        if all_requested:
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            self.list_id = [job_id for job_id in self.jobs if job_id in done]
            requested = len(self.jobs)

        # Tell the user when only part of the request can be satisfied.
        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved '% (len(self.list_id))
            msg += ' from %d requested.\n'%(requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get output for the finished jobs selected by checkBeforeGet().
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1,self.list_id,self.outDir)
        self.organizeOutput()
        return

    def _isJobFile(self, name, job_id):
        """
        Return True when 'name' carries 'job_id' as a complete number.

        A plain substring test would make job 1 match the files of jobs
        10, 11, 21, ...; here the id must not touch another digit. The
        'Submission_N' backup directories are never treated as job files.
        """
        if name.startswith('Submission_'):
            return False
        pattern = r'(?<![0-9])' + re.escape(str(job_id)) + r'(?![0-9])'
        return re.search(pattern, name) is not None

    def organizeOutput(self):
        """
        Untar Output

        For every retrieved job: back up the files of a previous submission,
        unpack and remove the tarball, and collect the exit codes from the
        job report. The codes are then stored through common._db and, when
        a separate log directory is configured, the std* files are moved
        there.
        """
        listCode = []
        job_id = []

        for jid in self.list_id:
            tarball = 'out_files_'+ str(jid)+'.tgz'
            if not os.path.exists(self.outDir + tarball):
                msg ="Output files for job "+ str(jid) +" not available.\n"
                common.logger.debug(1,msg)
                continue

            self.submission_id = common._db.queryRunJob('submission',jid)
            self.max_id = max(self.submission_id)
            if self.max_id > 1:
                # The job was resubmitted: move what is left of the older
                # submission out of the way before unpacking the new output.
                for f in os.listdir(self.outDir):
                    if f != tarball and self._isJobFile(f, jid):
                        self.moveOutput(jid, self.max_id, self.outDir, f)
                if self.log == 1:
                    for f in os.listdir(self.logDir):
                        if self._isJobFile(f, jid):
                            self.moveOutput(jid, self.max_id, self.logDir, f)

            runCommand('tar zxvf ' + self.outDir + tarball + ' ' + '-C ' + self.outDir)
            runCommand('rm ' + self.outDir + 'out_files_'+ str(jid)+'.tgz')
            msg = 'Results of Jobs # '+str(jid)+' are in '+self.outDir
            common.logger.message(msg)

            report = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + report):
                codeValue = self.parseFinalReport(self.outDir + report)
                job_id.append(jid)
                listCode.append(codeValue)
            else:
                msg = "Problems with "+str(report)+". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id , listCode)

        if self.logDir != self.outDir:
            for i_id in self.list_id:
                # NOTE(review): os.system() reports a failing 'mv' through
                # its return value, not an exception, so this handler only
                # covers failures to launch the command at all. Kept as a
                # best-effort step -- confirm whether failures should be
                # reported to the user.
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        return

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the framework job report file

        Returns a dictionary with the keys 'wrapperReturnCode' and
        'applicationReturnCode'. Both are '50115' when the report is empty
        and '' when the report carries no matching error entry.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5,"Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # always release the handle, even if reading fails
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # Exit codes found in the report take precedence over the 50115
        # placeholder set above.
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault('wrapperReturnCode', '')
        codeValue.setdefault('applicationReturnCode', '')

        return codeValue

    def moveOutput(self,id, max_id,path,file):
        """
        Move output of job already retrieved
        into the correct backup directory

        id     -- job id, used only in the log message
        max_id -- highest submission number of the job; the file goes to
                  <path>Submission_<max_id - 1>/
        path   -- directory holding the file (expected to end with '/')
        file   -- name of the file to move
        """
        Dir_Base = path +'Submission_'

        # Make sure every backup directory up to the previous submission
        # exists.
        for i in range(1, max_id):
            backup_dir = Dir_Base + str(i) + '/'
            if not os.path.isdir(backup_dir):
                cmd = ('mkdir '+ Dir_Base + str(i) + '/ >& /dev/null')
                cmd_out = runCommand(cmd)
                common.logger.write(str(cmd_out))
                common.logger.debug(3,str(cmd_out))

        cmd = 'mv '+ path + file + ' ' + Dir_Base + str(max_id -1) + '/ >& /dev/null'

        # Best effort: a missing file must not abort the retrieval.
        try:
            cmd_out = runCommand(cmd)
            common.logger.write(cmd_out)
            common.logger.debug(3,cmd_out)
        except Exception:
            msg = 'no output to move for job '+str(id)
            common.logger.write(msg)
            common.logger.debug(3,msg)
        return