ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.34
Committed: Sat Aug 30 10:54:47 2008 UTC (16 years, 8 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.33: +5 -5 lines
Log Message:
A clearer printout when there are no jobs to be retrieved.

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Actor that retrieves the output of finished jobs.

    It selects the requested jobs that are in a retrievable status, asks
    the scheduler to fetch their output, unpacks the output tarballs and
    stores the exit codes found in the framework job reports in the DB.
    """

    def __init__(self, *args):
        """
        args[0]: configuration parameters (read through get()).
        args[1]: the job ids to retrieve, or the string 'all'.
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # self.log is 1 only when log files go to a directory that is
        # different from the output directory.
        self.log = 0
        self.outDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
        if self.outDir[-1] != '/':
            self.outDir = self.outDir + '/'
        self.logDir = self.cfg_params.get('USER.logdir', common.work_space.resDir())
        if self.logDir[-1] != '/':
            self.logDir = self.logDir + '/'
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Short status code -> human readable status name.
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'SK': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared',
        }
        return

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output. The elapsed time is logged.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug(1, "GetOutput Time: "+str(stop - start))
        common.logger.write("GetOutput Time: "+str(stop - start))

    def checkBeforeGet(self):
        """
        Build self.list_id, the ids whose output can be retrieved.

        A job is retrievable when its running status is 'SD' or 'DA'.
        self.all_id is filled with the ids of every job of the task.

        Raises CrabException when none of the requested jobs is
        retrievable, or when the output or log directory does not exist.
        """
        # The task should come from common.scheduler.queryEverything(1),
        # but that call produced a core dump, so the DB is read directly.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        if self.jobs == 'all':
            requested = self.all_id
            retrievable = list_id_done
        else:
            requested = self.jobs
            retrievable = [job_id for job_id in self.jobs if job_id in list_id_done]

        if not retrievable:
            # Report the real job ids instead of the literal string 'all'.
            # NOTE(review): readableList is defined elsewhere and is assumed
            # to expect a list of ids -- confirm against crab_util.
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output.'% readableList(self, requested)
            raise CrabException(msg)

        self.list_id = retrievable
        if len(requested) > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved '% (len(self.list_id))
            msg += ' from %d requested.\n'%(len(requested))
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)

        return

    def getOutput(self):
        """
        Get output for the finished jobs: select them, let the scheduler
        fetch their output into self.outDir, then unpack it.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1, self.list_id, self.outDir)
        self.organizeOutput()
        return

    def _belongsToJob(self, name, job_id):
        """
        Tell whether file `name` belongs to job `job_id`.

        The names built in this class have the form '<prefix>_<id>.<ext>'
        (out_files_<id>.tgz, crab_fjr_<id>.xml). Matching '_<id>.' instead
        of the bare id keeps job 1 from also matching the files of jobs
        10, 11, 21, ... and the 'Submission_<n>' backup directories.
        NOTE(review): assumes the other per-job files follow the same
        naming scheme -- confirm.
        """
        return name.find('_' + str(job_id) + '.') != -1

    def _backupPreviousOutput(self, job_id, tarball):
        """
        Move the files left by an earlier retrieval of `job_id` into the
        backup directory, skipping the freshly retrieved `tarball`.
        """
        for name in os.listdir(self.outDir):
            if name != tarball and self._belongsToJob(name, job_id):
                self.moveOutput(job_id, self.max_id, self.outDir, name)
        if self.log == 1:
            for name in os.listdir(self.logDir):
                if self._belongsToJob(name, job_id):
                    self.moveOutput(job_id, self.max_id, self.logDir, name)

    def organizeOutput(self):
        """
        Untar Output

        For every job in self.list_id whose tarball is present: back up
        the files of previous submissions, unpack and remove the tarball,
        and parse the job report. The collected exit codes are written to
        the DB in one call. When the log directory differs from the
        output directory, the std* and .BrokerInfo files are moved there.
        """
        listCode = []
        job_id = []

        for current_id in self.list_id:
            tarball = 'out_files_'+ str(current_id)+'.tgz'
            if not os.path.exists(self.outDir + tarball):
                msg ="Output files for job "+ str(current_id) +" not available.\n"
                common.logger.debug(1,msg)
                continue

            self.submission_id = common._db.queryRunJob('submission', current_id)
            self.max_id = max(self.submission_id)
            if self.max_id > 1:
                self._backupPreviousOutput(current_id, tarball)

            cmd = 'tar zxvf ' + self.outDir + tarball + ' ' + '-C ' + self.outDir
            runCommand(cmd)
            cmd_2 = 'rm ' + self.outDir + tarball
            runCommand(cmd_2)
            msg = 'Results of Jobs # '+str(current_id)+' are in '+self.outDir
            common.logger.message(msg)

            report = 'crab_fjr_' + str(current_id) + '.xml'
            if os.path.exists(self.outDir + report):
                codeValue = self.parseFinalReport(self.outDir + report)
                job_id.append(current_id)
                listCode.append(codeValue)
            else:
                msg = "Problems with "+str(report)+". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id , listCode)

        if self.logDir != self.outDir:
            for i_id in self.list_id:
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        return

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input: path of the job report file.

        Returns a dict with the keys 'wrapperReturnCode' and
        'applicationReturnCode'. Both are '50115' when the report holds
        no job report; a key is '' when no code was found for it.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0 :
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5,"Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        # try/finally so that the file is always closed.
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # Codes reported in the errors override the defaults set above.
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault('wrapperReturnCode', '')
        codeValue.setdefault('applicationReturnCode', '')

        return codeValue

    def moveOutput(self,id, max_id,path,file):
        """
        Move output of job already retrieved
        into the correct backup directory

        id:     job id, used only in the log message.
        max_id: highest submission number of the job; the file goes to
                '<path>Submission_<max_id - 1>/'.
        path:   directory holding the file (with trailing '/').
        file:   name of the file to move.

        The directories Submission_1 ... Submission_<max_id - 1> are
        created when missing.
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            backup_dir = Dir_Base + str(i) + '/'
            if not os.path.isdir(backup_dir):
                cmd = ('mkdir '+ Dir_Base + str(i) + '/ >& /dev/null')
                cmd_out = runCommand(cmd)
                common.logger.write(str(cmd_out))
                common.logger.debug(3,str(cmd_out))
        cmd = 'mv '+ path + file + ' ' + Dir_Base + str(max_id -1) + '/ >& /dev/null'

        try:
            cmd_out = runCommand(cmd)
            common.logger.write(cmd_out)
            common.logger.debug(3,cmd_out)
        except Exception:
            msg = 'no output to move for job '+str(id)
            common.logger.write(msg)
            common.logger.debug(3,msg)
        return