ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.9
Committed: Wed Apr 9 07:52:58 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.8: +1 -1 lines
Log Message:
minor fixes

File Contents

# Content
1 from Actor import *
2 import common
3 import string, os, time
4 from crab_util import *
5
class GetOutput(Actor):
    """
    Retrieve the output of finished jobs, unpack it, record the exit codes
    found in each job's framework job report (FJR) and, when a separate log
    directory is configured, move the log files there.
    """

    def __init__(self, *args):
        """
        args[0]: configuration parameters (read with .get(key, default)).
        args[1]: list of job ids to retrieve, or the string 'all'.
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Both directories default to the task's result directory.
        self.outDir = self.cfg_params.get("USER.outputdir", common.work_space.resDir())
        self.logDir = self.cfg_params.get("USER.logdir", common.work_space.resDir())
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # Scheduler states; in this class they are only used to build the
        # per-state summary shown when no job can be retrieved.
        # Every entry needs its trailing comma: a missing one makes Python
        # silently concatenate adjacent string literals.
        self.possible_status = [
            'Undefined',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Cancelled',
            'Aborted',
            'Unknown',
            'Done(failed)',
            'Cleared',
        ]

    def run(self):
        """
        Main entry point: retrieve the output of the selected jobs and log
        the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        elapsed = time.time() - start
        common.logger.debug(1, "GetOutput Time: " + str(elapsed))
        common.logger.write("GetOutput Time: " + str(elapsed))

    def checkBeforeGet(self):
        """
        Decide which of the requested jobs can be retrieved.

        Sets self.list_id to the requested jobs whose scheduler status is
        'Done' and self.all_id to the ids of every job in the task. If any
        selected job has a submission count greater than 1, moveOutput() is
        called to move already-retrieved output into backup directories.

        Raises CrabException if none of the requested jobs is 'Done', or if
        the output or log directory does not exist.
        """
        # Ideally this would be common.scheduler.queryEverything(1), but that
        # currently causes a core dump; read the task from the local DB instead.
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['statusScheduler'] == 'Done':
                list_id_done.append(job['id'])
            self.all_id.append(job['id'])

        done_set = set(list_id_done)
        if self.jobs == 'all':
            nothing_to_get = not list_id_done
        else:
            nothing_to_get = not done_set.intersection(self.jobs)

        if nothing_to_get:
            # Summarise how many jobs are in each known state, then abort.
            msg = '\n'
            for st in self.possible_status:
                ids_in_state = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
                if len(ids_in_state) > 0:
                    msg += " %i Jobs %s \n" % (len(ids_in_state), str(st))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            n_requested = len(self.up_task.jobs)
        else:
            # Keep the caller's order; the set makes each membership test O(1).
            self.list_id = [job_num for job_num in self.jobs if job_num in done_set]
            n_requested = len(self.jobs)
        if n_requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (n_requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)

        # If ANY selected job was submitted more than once, move the output
        # already retrieved for it aside before fetching the new output.
        # (The previous code sorted ascending and took element 0, i.e. the
        # minimum, despite the intent expressed by its name 'max_id'.)
        submission_id = common._db.queryRunJob('submission', self.list_id)
        if submission_id and max(submission_id) > 1:
            self.moveOutput()

    def getOutput(self):
        """
        Retrieve, unpack and register the output of the retrievable jobs.

        For each job selected by checkBeforeGet(): extract
        out_files_<id>.tgz inside outDir, delete the archive, and parse
        crab_fjr_<id>.xml to collect its exit codes. The codes are stored
        with common._db.updateRunJob_. If logDir differs from outDir, the
        jobs' std* files plus .BrokerInfo and *.log are moved to logDir.

        Returns None. Propagates CrabException from checkBeforeGet().
        """
        self.checkBeforeGet()
        # Hand the selected jobs and the destination directory to the scheduler.
        common.scheduler.getOutput(1, self.list_id, self.outDir)

        listCode = []
        job_id = []

        cwd = os.getcwd()
        os.chdir(self.outDir)
        try:
            for job_num in self.list_id:
                tgz = 'out_files_' + str(job_num) + '.tgz'
                if not os.path.exists(tgz):
                    msg = "Output files for job " + str(job_num) + " not available.\n"
                    common.logger.message(msg)
                    continue
                runCommand('tar zxvf ' + tgz)
                runCommand('rm ' + tgz)

                fjr_name = 'crab_fjr_' + str(job_num) + '.xml'
                if os.path.exists(fjr_name):
                    code_value = self.parseFinalReport(fjr_name)
                    job_id.append(job_num)
                    listCode.append(code_value)
                else:
                    msg = "Problems with " + str(fjr_name) + ". File not available.\n"
                    common.logger.message(msg)
        finally:
            # Always restore the caller's working directory, even when
            # extraction or FJR parsing raises.
            os.chdir(cwd)
        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            out_dir = str(self.outDir)
            log_dir = str(self.logDir)
            # NOTE(review): '*<id>.std*' also matches ids ending in <id>
            # (e.g. job 1 matches *11.std*); kept as-is — confirm file naming.
            move_cmds = ['mv ' + out_dir + '/*' + str(job_num) + '.std* ' + log_dir
                         for job_num in self.list_id]
            # Shared files are moved once; moving them per job made every
            # iteration after the first fail.
            move_cmds.append('mv ' + out_dir + '/.BrokerInfo ' + out_dir + '/*.log ' + log_dir)
            for cmd in move_cmds:
                # os.system reports failure through its return status, not
                # by raising, so check the status explicitly.
                if os.system(cmd) != 0:
                    common.logger.debug(3, 'Problem with copy of job results: ' + cmd)
            msg = 'Results of Jobs # ' + str(self.list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
        else:
            msg = 'Results of Jobs # ' + str(self.list_id) + ' are in ' + self.outDir
        common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parse a framework job report (FJR) and return its exit codes.

        input: path of the crab_fjr_<id>.xml file to parse.

        Returns a dict with keys 'wrapperReturnCode' and
        'applicationReturnCode'. Values come from the report's
        'WrapperExitCode' / 'ExeExitCode' error entries. A report file of
        6 lines or fewer is treated as incomplete, and both codes default
        to '50115'. A code that is not found is returned as ''.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jobReport = readJobReport(input)[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # FJR error type -> returned key. Other error types are ignored;
        # a later matching entry overrides an earlier one.
        error_keys = {'WrapperExitCode': 'wrapperReturnCode',
                      'ExeExitCode': 'applicationReturnCode'}
        for error in jobReport.errors:
            key = error_keys.get(error['Type'])
            if key is not None:
                codeValue[key] = error['ExitStatus']

        codeValue.setdefault('wrapperReturnCode', '')
        codeValue.setdefault('applicationReturnCode', '')
        return codeValue

    def moveOutput(self):
        """
        Move output of jobs already retrieved into per-submission backup
        directories named <outDir>Submission_<n>, where n is the job's
        'submission' value from the DB. Failures are logged at debug level
        and otherwise ignored (best effort).
        """
        sub_id = common._db.queryRunJob('submission', self.list_id)

        # NOTE(review): paths are built by plain concatenation, so this
        # assumes outDir ends with a path separator — confirm.
        OutDir_Base = self.outDir + 'Submission_'

        for sub in set(sub_id):
            target = OutDir_Base + str(sub)
            if not os.path.isdir(target):
                cmd_out = runCommand('mkdir ' + target)
                common.logger.debug(3, cmd_out)

        # Assumes queryRunJob returns values in the same order as list_id.
        for job_num, sub in zip(self.list_id, sub_id):
            try:
                cmd = 'mv ' + self.outDir + '*_' + str(job_num) + '.* ' + OutDir_Base + str(sub)
                cmd_out = runCommand(cmd)
                common.logger.debug(3, cmd_out)
            except Exception:
                common.logger.debug(3, 'no output to move for job ' + str(job_num))