ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.12
Committed: Tue Apr 15 14:20:18 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: bp_osg_bdii, CRAB_2_2_0_pre9, CRAB_2_2_0_pre8, CRAB_2_2_0_pre7
Branch point for: osg_bdii
Changes since 1.11: +8 -4 lines
Log Message:
Many improvements to getOutput, in both the client and standalone classes.

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Retrieve the output of finished jobs, unpack it into the task's
    result directory and record the exit codes found in each job's
    framework job report (FJR).
    """

    def __init__(self, *args):
        """
        args[0]: configuration parameters (dict-like, read with .get)
        args[1]: list of job ids to retrieve, or the string 'all'
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # NOTE(review): output and log dirs are both resDir(), so the
        # "logDir != outDir" branch of organizeOutput() is only reachable
        # if one of them is changed elsewhere.
        self.outDir = common.work_space.resDir()
        self.logDir = common.work_space.resDir()
        self.return_data = self.cfg_params.get('USER.return_data',0)

        # Status code -> readable name, used to summarise the task when no
        # job is ready for retrieval.
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'SK': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }
        return

    def run(self):
        """
        The main method of the class: check destination dirs and
        perform the get output, logging the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug(1, "GetOutput Time: "+str(stop - start))
        common.logger.write("GetOutput Time: "+str(stop - start))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Sets self.list_id to the requested jobs whose status is 'SD' or 'E'
        and self.all_id to every job id of the task. If any selected job
        has a submission number greater than 1, moveOutput() is called to
        move output left over from an earlier retrieval out of the way.

        Raises CrabException if none of the requested jobs can be
        retrieved, or if the output or log directory does not exist.
        """
        # NOTE: common.scheduler.queryEverything(1) would be the preferred
        # source, but it currently causes a core dump; read the task from
        # the local DB instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD','E']:
                list_id_done.append(job['id'])
            self.all_id.append(job['id'])

        # check stays -1 when every job was requested; otherwise it counts
        # the requested jobs that are retrievable.
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(set(list_id_done)))
        if len(list_id_done) == 0 or check == 0:
            msg = '\n'
            for st, stDetail in self.possible_status.items():
                list_ID = common._db.queryAttrRunJob({'status':st},'jobId')
                if len(list_ID) > 0:
                    msg += " %i Jobs in status: %s \n" % (len(list_ID), str(stDetail))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            if len(self.up_task.jobs) > len(self.list_id):
                msg = '\nOnly %d jobs will be retrieved '% (len(self.list_id))
                msg += ' from %d requested.\n'%(len(self.up_task.jobs))
                msg += ' (for details: crab -status)'
                common.logger.message(msg)
        else:
            done = set(list_id_done)
            for jid in self.jobs:
                if jid in done:
                    self.list_id.append(jid)
            if len(self.jobs) > len(self.list_id):
                msg = '\nOnly %d jobs will be retrieved '% (len(self.list_id))
                msg += ' from %d requested.\n'%(len(self.jobs))
                msg += ' (for details: crab -status)'
                common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)

        # The previous code sorted ascending and took element [0], i.e. the
        # MINIMUM, so output was only moved when every job had been
        # resubmitted. Any resubmitted job needs its old output moved aside.
        submission_id = common._db.queryRunJob('submission',self.list_id)
        if max(submission_id) > 1:
            self.moveOutput()
        return

    def getOutput(self):
        """
        Retrieve the output of the selected finished jobs into self.outDir
        through the scheduler, then unpack it and record exit codes.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1,self.list_id,self.outDir) ## NeW BL--DS
        self.organizeOutput()
        return

    def organizeOutput(self):
        """
        Untar the retrieved output and record the job exit codes.

        For each job in self.list_id, out_files_<id>.tgz is extracted
        inside self.outDir and then deleted. crab_fjr_<id>.xml is parsed
        with parseFinalReport(), and the collected codes are stored with
        common._db.updateRunJob_. If logDir differs from outDir, the std*
        and log files are moved to logDir.
        """
        listCode = []
        job_id = []

        cwd = os.getcwd()
        os.chdir(self.outDir)
        # Always return to the caller's working directory, even if
        # unpacking or report parsing raises.
        try:
            for jid in self.list_id:
                archive = 'out_files_'+ str(jid)+'.tgz'
                if not os.path.exists(archive):
                    msg ="Output files for job "+ str(jid) +" not available.\n"
                    common.logger.message(msg)
                    continue
                runCommand('tar zxvf '+archive)
                runCommand('rm out_files_'+ str(jid)+'.tgz')

                fjr_name = 'crab_fjr_' + str(jid) + '.xml'
                if os.path.exists(fjr_name):
                    codeValue = self.parseFinalReport(fjr_name)
                    job_id.append(jid)
                    listCode.append(codeValue)
                else:
                    msg = "Problems with "+str(fjr_name)+". File not available.\n"
                    common.logger.message(msg)
        finally:
            os.chdir(cwd)
        common._db.updateRunJob_(job_id , listCode)

        if self.logDir != self.outDir:
            for i_id in self.list_id:
                cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.outDir)+'/*.log '+str(self.logDir)
                # os.system reports failure through its exit status instead
                # of raising. The shared .BrokerInfo/*.log files are moved by
                # the first iteration, so later failures are expected and
                # are only logged at debug level.
                if os.system(cmd) != 0:
                    msg = 'Problem with copy of job results'
                    common.logger.debug(3, msg + ' (job ' + str(i_id) + ')')
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        else:
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir
            common.logger.message(msg)

        return

    def parseFinalReport(self, input):
        """
        Parse the FJR produced by a job to get the WrapperExitCode and
        ExeExitCode.

        input: path of the crab_fjr_<id>.xml file.

        Returns a dict with keys 'wrapperReturnCode' and
        'applicationReturnCode'. A code not found in the report is ''.
        A report of 6 lines or fewer is treated as incomplete, and both
        codes default to '50115' unless the report's errors override them.
        The caller (organizeOutput) stores these values in the DB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jobReport = readJobReport(input)[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        codeValue.setdefault("applicationReturnCode", '')

        return codeValue

    def moveOutput(self):
        """
        Move output of jobs already retrieved into the backup directory
        <outDir>/Submission_<n>, where n is the job's submission number
        from the DB. The directories are created if missing.
        """
        sub_id = common._db.queryRunJob('submission',self.list_id)

        OutDir_Base = os.path.join(self.outDir, 'Submission_')

        for n_sub in set(sub_id):
            if not os.path.isdir(OutDir_Base+str(n_sub)):
                cmd_out = runCommand('mkdir '+OutDir_Base+str(n_sub))
                common.logger.debug(3,cmd_out)

        # Assumes queryRunJob returns one value per id, in list_id order --
        # TODO confirm against the DB layer.
        # NOTE(review): files are filed under the job's current submission
        # number although they come from an earlier retrieval -- confirm
        # this naming is intended.
        for jid, n_sub in zip(self.list_id, sub_id):
            pattern = os.path.join(self.outDir, '*_'+str(jid)+'.*')
            cmd = 'mv '+pattern+' '+OutDir_Base+str(n_sub)
            try:
                cmd_out = runCommand(cmd)
                common.logger.debug(3,cmd_out)
            except Exception:
                msg = 'no output to move for job '+str(jid)
                common.logger.debug(3,msg)
        return