1 |
from Actor import *
|
2 |
import common
|
3 |
import string, os, time
|
4 |
from crab_util import *
|
5 |
|
6 |
class GetOutput(Actor):
    """
    Actor implementing the "get output" step.

    It selects the jobs whose status allows retrieval, asks the scheduler
    to fetch their output into the results directory, unpacks the
    retrieved tarballs and stores the exit codes found in each framework
    job report through ``common._db``.
    """

    def __init__(self, *args):
        """
        Store the configuration and the job selection.

        args[0] -- configuration parameters (only ``get`` is used here)
        args[1] -- either the string 'all' or a sequence of job ids
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        # Output and log files are both read from the same results directory.
        self.outDir = common.work_space.resDir()
        self.logDir = common.work_space.resDir()
        self.return_data = self.cfg_params.get('USER.return_data',0)

        # Status code -> human readable label, used when reporting why
        # nothing can be retrieved.
        self.possible_status = {
            'UN': 'Unknown',
            'SU': 'Submitted',
            'SW': 'Waiting',
            'SS': 'Scheduled',
            'R': 'Running',
            'SD': 'Done',
            'SK': 'Killed',
            'SA': 'Aborted',
            'SE': 'Cleared',
            'E': 'Cleared'
            }

    def run(self):
        """
        The main method of the class: Check destination dirs and
        perform the get output
        """
        common.logger.debug(5, "GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        elapsed = "GetOutput Time: "+str(stop - start)
        common.logger.debug(1, elapsed)
        common.logger.write(elapsed)

    def checkBeforeGet(self):
        """
        Build ``self.list_id`` (jobs to retrieve) and validate the
        destination directories.

        ``self.all_id`` receives the id of every job of the task and
        ``self.list_id`` the requested jobs whose status is 'SD' or 'E'.

        Raises CrabException when no requested job is retrievable, or when
        the output/log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['status'] in ['SD','E']:
                list_id_done.append(job['id'])
            self.all_id.append(job['id'])

        done = set(list_id_done)
        # -1 means "no explicit selection"; otherwise the number of
        # requested jobs that are actually retrievable.
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(done))

        if not list_id_done or check == 0:
            # Nothing to retrieve: summarise the jobs per status and abort.
            msg = '\n'
            for st, stDetail in self.possible_status.items():
                list_ID = common._db.queryAttrRunJob({'status':st},'jobId')
                if len(list_ID) > 0:
                    msg += " %i Jobs in status: %s \n" % (len(list_ID), str(stDetail))
            msg += '\n*******No jobs in Done status. It is not possible yet to retrieve the output.\n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            requested = len(self.up_task.jobs)
        else:
            # Keep the order in which the jobs were requested.
            self.list_id = [job_id for job_id in self.jobs if job_id in done]
            requested = len(self.jobs)

        if requested > len(self.list_id):
            msg = '\nOnly %d jobs will be retrieved '% (len(self.list_id))
            msg += ' from %d requested.\n'%(requested)
            msg += ' (for details: crab -status)'
            common.logger.message(msg)

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)

        # Archive previously retrieved output as soon as ANY selected job
        # has been submitted more than once: the highest submission number
        # is the one that matters (the original code took the lowest).
        submission_id = common._db.queryRunJob('submission',self.list_id)
        if submission_id and max(submission_id) > 1:
            self.moveOutput()

    def getOutput(self):
        """
        Get output for a finished job with id.
        """
        self.checkBeforeGet()
        common.scheduler.getOutput(1,self.list_id,self.outDir) ## NeW BL--DS
        self.organizeOutput()

    def organizeOutput(self):
        """
        Untar Output

        For every job in ``self.list_id`` unpack and remove
        ``out_files_<id>.tgz`` inside the output directory, parse
        ``crab_fjr_<id>.xml`` when present and finally store the collected
        exit codes with ``common._db.updateRunJob_``.
        """
        listCode = []
        job_id = []

        cwd = os.getcwd()
        os.chdir( self.outDir )
        try:
            for jid in self.list_id:
                tarball = 'out_files_'+ str(jid)+'.tgz'
                if not os.path.exists(tarball):
                    msg ="Output files for job "+ str(jid) +" not available.\n"
                    common.logger.message(msg)
                    continue
                runCommand('tar zxvf '+tarball)
                runCommand('rm '+tarball)

                report = 'crab_fjr_' + str(jid) + '.xml'
                if os.path.exists(report):
                    codeValue = self.parseFinalReport(report)
                    job_id.append(jid)
                    listCode.append(codeValue)
                else:
                    msg = "Problems with "+str(report)+". File not available.\n"
                    common.logger.message(msg)
        finally:
            # Always go back to the caller's directory, even when a report
            # cannot be parsed.
            os.chdir( cwd )
        common._db.updateRunJob_(job_id , listCode)

        if self.logDir != self.outDir:
            for i_id in self.list_id:
                # Best effort: a failing 'mv' is reported by os.system only
                # through its return status and is deliberately ignored.
                try:
                    cmd = 'mv '+str(self.outDir)+'/*'+str(i_id)+'.std* '+str(self.outDir)+'/.BrokerInfo '+str(self.outDir)+'/*.log '+str(self.logDir)
                    os.system(cmd)
                except Exception:
                    msg = 'Problem with copy of job results'
                    common.logger.message(msg)
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir+' (log files are in '+self.logDir+')'
            common.logger.message(msg)
        else:
            msg = 'Results of Jobs # '+str(self.list_id)+' are in '+self.outDir
            common.logger.message(msg)

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the framework job report (XML) to read

        Returns a dictionary with the keys "wrapperReturnCode" and
        "applicationReturnCode"; a value is '' when the report carries no
        matching error entry.  A report of six lines or fewer, or one from
        which no job report could be read, is flagged with code 50115
        unless the report itself provides a code.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            # The handle was previously leaked.
            fjr.close()

        reports = readJobReport(input)
        if len_fjr <= 6 or not reports:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        errors = []
        if reports:
            errors = reports[0].errors

        for error in errors:
            if error['Type'] == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif error['Type'] == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']

        codeValue.setdefault('wrapperReturnCode', '')
        codeValue.setdefault('applicationReturnCode', '')

        return codeValue

    def moveOutput(self):
        """
        Move output of job already retrieved
        into the correct backup directory

        One ``Submission_<n>`` directory is created per distinct submission
        number returned for ``self.list_id``; the files matching
        ``*_<id>.*`` in the output directory are then moved into the
        directory of the corresponding job.
        """
        # presumably one submission number per entry of self.list_id, in
        # the same order -- TODO confirm against common._db.queryRunJob
        sub_id = common._db.queryRunJob('submission',self.list_id)

        # NOTE(review): the plain concatenation assumes self.outDir ends
        # with a path separator -- confirm.
        OutDir_Base = self.outDir+'Submission_'

        for sub in set(sub_id):
            if not os.path.isdir(OutDir_Base+str(sub)):
                cmd_out = runCommand('mkdir '+OutDir_Base+str(sub))
                common.logger.debug(3,cmd_out)

        for jid, sub in zip(self.list_id, sub_id):
            try:
                cmd = 'mv '+self.outDir+'*_'+str(jid)+'.* '+OutDir_Base+str(sub)
                cmd_out = runCommand(cmd)
                common.logger.debug(3,cmd_out)
            except Exception:
                # Best effort: a job without files to move is not an error.
                msg = 'no output to move for job '+str(jid)
                common.logger.debug(3,msg)
|