ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.36
Committed: Mon Sep 1 09:51:21 2008 UTC (16 years, 8 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_2, CRAB_2_3_2_pre7
Changes since 1.35: +1 -1 lines
Log Message:
Removed SK unused status (now there is just K)

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of jobs in Done status, unpacks the
    per-job tarballs into the output directory, records the exit codes
    found in each framework job report and, when a separate log directory
    is configured, moves the log files there.
    """

    def __init__(self, *args):
        """
        args[0] -- configuration mapping, read through .get()
        args[1] -- either the string 'all' or a sequence of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # self.log is 1 only when logs go to a directory different from
        # the one used for the output files.
        self.log = 0
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Short status code -> human readable status name.
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'K': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
        }
        return

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output. The elapsed time is logged.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = time.time() - start
        common.logger.debug(1, "GetOutput Time: " + str(elapsed))
        common.logger.write("GetOutput Time: " + str(elapsed))

    def checkBeforeGet(self):
        """
        Work out which of the requested jobs can be retrieved.

        Fills self.all_id with every job id of the task and self.list_id
        with the requested ids whose status is 'SD' or 'DA'.

        Raises CrabException when none of the requested jobs is
        retrievable, or when the output or log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        if self.jobs == 'all':
            requested = self.all_id
            retrievable = list_id_done
        else:
            requested = self.jobs
            done = set(list_id_done)
            retrievable = [job_id for job_id in self.jobs if job_id in done]

        if not retrievable:
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.' % readableList(self, requested)
            raise CrabException(msg)

        self.list_id = retrievable
        if len(requested) > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (len(requested))
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get output for the finished jobs: select the retrievable ids,
        ask the scheduler for their output and unpack it.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1, self.list_id, self.outDir)
        self.organizeOutput()
        return

    def organizeOutput(self):
        """
        Untar Output

        For every id in self.list_id whose 'out_files_<id>.tgz' is present
        in the output directory: back up files left by earlier submissions,
        unpack and delete the tarball, and parse 'crab_fjr_<id>.xml'.
        The collected exit codes are stored with one updateRunJob_ call.
        Finally, log files are moved to the log directory when it differs
        from the output directory.
        """
        listCode = []
        job_id = []

        for jid in self.list_id:
            tarball = 'out_files_' + str(jid) + '.tgz'
            if not os.path.exists(self.outDir + tarball):
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            self.submission_id = common._db.queryRunJob('submission', jid)
            self.max_id = max(self.submission_id)
            if self.max_id > 1:
                # The job was resubmitted: keep what an earlier retrieval
                # left behind before the new tarball overwrites it.
                self._backupPreviousOutput(jid, tarball)

            # NOTE(review): the tarball is removed whether or not the
            # extraction succeeded; runCommand's failure contract is not
            # visible here, so that behaviour is left unchanged - confirm.
            runCommand('tar zxvf ' + self.outDir + tarball + ' ' + '-C ' + self.outDir)
            runCommand('rm ' + self.outDir + tarball)
            msg = 'Results of Jobs # ' + str(jid) + ' are in ' + self.outDir
            common.logger.message(msg)

            report = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + report):
                codeValue = self.parseFinalReport(self.outDir + report)
                job_id.append(jid)
                listCode.append(codeValue)
            else:
                msg = "Problems with " + str(report) + ". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            self._moveLogFiles()
            msg = 'Results of Jobs # ' + str(self.list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.message(msg)
        return

    def _isJobFile(self, name, jid):
        """
        Tell whether directory entry 'name' belongs to job 'jid'.

        The id must appear as a whole number (not as part of a longer
        digit run), so job 1 does not claim the files of job 10 or 21.
        The 'Submission_<n>' backup directories are never job files.
        """
        import re

        if name.startswith('Submission_'):
            return False
        whole_id = r'(?<![0-9])' + re.escape(str(jid)) + r'(?![0-9])'
        return re.search(whole_id, name) is not None

    def _backupPreviousOutput(self, jid, tarball):
        """
        Move the files of job 'jid' already present in the output
        directory (and in the log directory, when separate) into the
        backup directory; the freshly retrieved tarball is left in place.
        """
        for name in os.listdir(self.outDir):
            if name != tarball and self._isJobFile(name, jid):
                self.moveOutput(jid, self.max_id, self.outDir, name)
        if self.log == 1:
            for name in os.listdir(self.logDir):
                if self._isJobFile(name, jid):
                    self.moveOutput(jid, self.max_id, self.logDir, name)

    def _moveInto(self, src, dest_dir):
        """
        Move 'src' into directory 'dest_dir' keeping its base name.
        Returns True on success, False when the move failed.
        """
        import shutil

        try:
            # An explicit destination file name makes an existing file
            # be replaced, as the 'mv' command did.
            shutil.move(src, os.path.join(dest_dir, os.path.basename(src)))
        except EnvironmentError:
            return False
        return True

    def _moveLogFiles(self):
        """
        Move '<something><id>.std*' files of every retrieved job, plus
        '.BrokerInfo', from the output directory to the log directory.
        """
        import re

        broker_info = self.outDir + '.BrokerInfo'
        for jid in self.list_id:
            wanted = re.compile(r'(?<![0-9])' + re.escape(str(jid)) + r'\.std')
            sources = []
            for name in os.listdir(self.outDir):
                # Hidden files were never matched by the '*<id>.std*' glob.
                if not name.startswith('.') and wanted.search(name):
                    sources.append(self.outDir + name)
            if os.path.exists(broker_info):
                sources.append(broker_info)
            for src in sources:
                if not self._moveInto(src, self.logDir):
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the framework job report file.

        Returns a dictionary with the keys 'wrapperReturnCode' and
        'applicationReturnCode'. Both are '50115' when the report is
        empty or has at most 6 lines, unless the report carries explicit
        error entries, which take precedence. A code that cannot be
        determined is returned as an empty string.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5, "Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # Always release the descriptor: this runs once per job.
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        codeValue.setdefault("applicationReturnCode", '')

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory

        id     -- job id, used only in the log message
        max_id -- highest submission number of the job; the file goes to
                  '<path>Submission_<max_id - 1>/'
        path   -- directory (with trailing '/') holding the file
        file   -- name of the file inside 'path'

        Failures are logged and never raised.
        """
        Dir_Base = path + 'Submission_'

        # Make sure every backup directory up to the target one exists.
        for i in range(1, max_id):
            backup_dir = Dir_Base + str(i) + '/'
            if not os.path.isdir(backup_dir):
                try:
                    os.mkdir(backup_dir)
                except OSError:
                    common.logger.debug(3, 'cannot create ' + backup_dir)

        if not self._moveInto(path + file, Dir_Base + str(max_id - 1) + '/'):
            msg = 'no output to move for job ' + str(id)
            common.logger.write(msg)
            common.logger.debug(3, msg)
        return