ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutput.py
Revision: 1.33
Committed: Wed Aug 13 16:09:00 2008 UTC (16 years, 8 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_2_pre5, CRAB_2_3_2_pre4, CRAB_2_3_2_pre3, CRAB_2_3_2_pre2
Changes since 1.32: +42 -55 lines
Log Message:
fixed problem about moving of output of resubmitted jobs

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     import common
3     import string, os, time
4     from crab_util import *
5    
class GetOutput(Actor):
    """
    Retrieve the output of finished jobs and organize it on disk.

    The instance is built from two positional arguments: the configuration
    mapping (``cfg_params``) and the job selection (``jobs``), which is
    either the string ``'all'`` or a sequence of job ids.

    Results are unpacked into ``self.outDir``; when a distinct
    ``self.logDir`` is configured the ``*.std*`` files and ``.BrokerInfo``
    are moved there.  Files left over from earlier submissions of the same
    job are moved into ``Submission_<n>/`` sub-directories before the new
    results are unpacked.
    """

    def __init__(self, *args):
        """
        Store the configuration and resolve output and log directories.

        args[0] -- configuration mapping (must provide ``get``)
        args[1] -- ``'all'`` or a sequence of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        default_dir = common.work_space.resDir()
        self.outDir = self._withTrailingSlash(
            self.cfg_params.get('USER.outputdir', default_dir))
        self.logDir = self._withTrailingSlash(
            self.cfg_params.get('USER.logdir', default_dir))

        # self.log is 1 only when log files live in a directory of their own
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        # status code -> label used in the summary printed when nothing
        # can be retrieved
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'SK': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared',
        }
        return

    def _withTrailingSlash(self, path):
        """
        Return path terminated by '/'.

        An empty path is returned unchanged (instead of failing on
        ``path[-1]``) so that the directory check in checkBeforeGet()
        reports it as a missing directory.
        """
        if not path or path.endswith('/'):
            return path
        return path + '/'

    def run(self):
        """
        The main method of the class: check the destination dirs,
        perform the get output and log the elapsed time.
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()

        elapsed = "GetOutput Time: " + str(stop - start)
        common.logger.debug(1, elapsed)
        common.logger.write(elapsed)

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Fills ``self.all_id`` with every job id of the task and
        ``self.list_id`` with the requested ids whose status is 'SD' or
        'DA'.

        Raises CrabException when no requested job is retrievable, or when
        the output or log directory does not exist.
        """
        # NOTE: the task should be refreshed through
        # common.scheduler.queryEverything(1), but that call was reported
        # to core dump, so the copy stored in the DB is used instead.
        self.up_task = common._db.getTask()

        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD', 'DA']:
                list_id_done.append(job['jobId'])
            self.all_id.append(job['jobId'])

        if self.jobs == 'all':
            selected = list_id_done
            requested = len(self.up_task.jobs)
        else:
            done = set(list_id_done)
            selected = [job_id for job_id in self.jobs if job_id in done]
            requested = len(self.jobs)

        if not selected:
            # nothing to fetch: report how many jobs sit in each status
            msg = '\n'
            for st, stDetail in self.possible_status.items():
                in_status = common._db.queryAttrRunJob({'status': st}, 'jobId')
                if len(in_status) > 0:
                    msg += " %i Jobs in status: %s \n" % (len(in_status), str(stDetail))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        self.list_id = selected
        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def getOutput(self):
        """
        Get output for the finished jobs selected by checkBeforeGet().
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1, self.list_id, self.outDir)
        self.organizeOutput()
        return

    def organizeOutput(self):
        """
        Untar the retrieved output and record the job exit codes.

        For every id in ``self.list_id`` whose ``out_files_<id>.tgz`` is
        present in the output directory: back up the files of previous
        submissions, unpack and delete the tarball, and parse
        ``crab_fjr_<id>.xml``.  The collected codes are stored with
        ``common._db.updateRunJob_``.  Finally the log files are moved to
        the log directory when it differs from the output directory.
        """
        listCode = []
        job_id = []

        for job in self.list_id:
            tarball = 'out_files_' + str(job) + '.tgz'
            tarball_path = self.outDir + tarball
            if not os.path.exists(tarball_path):
                msg = "Output files for job " + str(job) + " not available.\n"
                common.logger.debug(1, msg)
                continue

            # must run before untarring, otherwise the fresh results would
            # be mistaken for leftovers of the previous submission
            self._backupPreviousOutput(job, tarball)

            runCommand('tar zxvf ' + tarball_path + ' ' + '-C ' + self.outDir)
            runCommand('rm ' + tarball_path)
            msg = 'Results of Jobs # ' + str(job) + ' are in ' + self.outDir
            common.logger.message(msg)

            report = 'crab_fjr_' + str(job) + '.xml'
            if os.path.exists(self.outDir + report):
                codeValue = self.parseFinalReport(self.outDir + report)
                job_id.append(job)
                listCode.append(codeValue)
            else:
                msg = "Problems with " + str(report) + ". File not available.\n"
                common.logger.message(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            self._moveLogFiles()
            msg = 'Results of Jobs # ' + str(self.list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.message(msg)
        return

    def _backupPreviousOutput(self, id, tarball):
        """
        Move files of earlier submissions of job id out of the way.

        Nothing is moved when the highest submission number of the job is
        1.  The freshly retrieved tarball is never moved.
        """
        # presumably a sequence of submission numbers -- TODO confirm
        # against common._db.queryRunJob
        self.submission_id = common._db.queryRunJob('submission', id)
        self.max_id = max(self.submission_id)
        if self.max_id <= 1:
            return

        for name in os.listdir(self.outDir):
            if name != tarball and self._belongsToJob(name, id):
                self.moveOutput(id, self.max_id, self.outDir, name)
        if self.log == 1:
            for name in os.listdir(self.logDir):
                if self._belongsToJob(name, id):
                    self.moveOutput(id, self.max_id, self.logDir, name)

    def _belongsToJob(self, name, id):
        """
        Tell whether the directory entry name carries job number id.

        The id must not be adjacent to another digit, so job 1 does not
        claim the files of job 10 or 21.  The ``Submission_<n>`` backup
        directories are never considered job files.
        """
        import re

        if name.startswith('Submission_'):
            return False
        pattern = '(?<![0-9])' + re.escape(str(id)) + '(?![0-9])'
        return re.search(pattern, name) is not None

    def _moveLogFiles(self):
        """
        Move ``*<id>.std*`` of the retrieved jobs and ``.BrokerInfo`` from
        the output directory to the log directory.

        Failures are reported through the logger and do not stop the loop.
        """
        import glob
        import shutil

        sources = []
        for job in self.list_id:
            pattern = os.path.join(self.outDir, '*' + str(job) + '.std*')
            sources.extend(glob.glob(pattern))
        sources.append(os.path.join(self.outDir, '.BrokerInfo'))

        for source in sources:
            # .BrokerInfo may not exist, and a file can be matched by the
            # pattern of more than one job id and be gone already
            if not os.path.exists(source):
                continue
            target = os.path.join(self.logDir, os.path.basename(source))
            try:
                shutil.move(source, target)
            except (IOError, OSError, shutil.Error):
                msg = 'Problem with copy of job results'
                common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parse the FJR produced by a job and return its exit codes.

        input -- path of the framework job report (xml)

        Returns a dict with the keys "wrapperReturnCode" and
        "applicationReturnCode".  Both are '50115' when the report is
        empty or too short to be complete, and '' when the report carries
        no matching error entry.  The caller is responsible for storing
        the values.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug(5, "Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        # explicit exit codes in the report override the fallback above
        for error in jobReport.errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault("wrapperReturnCode", '')
        codeValue.setdefault("applicationReturnCode", '')

        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory.

        id     -- job id, used only in the log message
        max_id -- highest submission number of the job; the file goes to
                  ``Submission_<max_id - 1>/``
        path   -- directory holding the file (with trailing '/')
        file   -- name of the file inside path

        The directories ``Submission_1`` .. ``Submission_<max_id - 1>``
        are created under path when missing.  A failed move is logged and
        otherwise ignored.
        """
        import shutil

        dir_base = path + 'Submission_'

        for i in range(1, max_id):
            backup_dir = dir_base + str(i) + '/'
            if not os.path.isdir(backup_dir):
                try:
                    os.mkdir(backup_dir)
                except OSError:
                    msg = 'cannot create backup directory ' + backup_dir
                    common.logger.write(msg)
                    common.logger.debug(3, msg)

        source = path + file
        target = dir_base + str(max_id - 1) + '/' + file
        try:
            shutil.move(source, target)
            msg = 'moved ' + source + ' to ' + target
        except (IOError, OSError, shutil.Error):
            msg = 'no output to move for job ' + str(id)
        common.logger.write(msg)
        common.logger.debug(3, msg)
        return