1 |
spiga |
1.1 |
from Actor import *
|
2 |
|
|
import common
|
3 |
|
|
import string, os, time
|
4 |
|
|
from crab_util import *
|
5 |
|
|
|
6 |
|
|
class GetOutput(Actor):
    """
    Actor that retrieves the output of finished jobs.

    The output tarball of every retrievable job is unpacked into the
    output directory, the exit codes found in the framework job report
    are written to the task database and, when a separate log directory
    is configured, the stdout/stderr files are moved there.
    """

    def __init__(self, *args):
        """
        args[0] -- configuration mapping; only its get() method is used
        args[1] -- the string 'all' or a list of job ids to retrieve
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        cwd = os.getcwd() + '/'
        self.outDir = self._absDir(
            self.cfg_params.get('USER.outputdir', common.work_space.resDir()),
            cwd)
        self.logDir = self._absDir(
            self.cfg_params.get('USER.logdir', common.work_space.resDir()),
            cwd)

        # self.log == 1 means log files live in a directory of their own
        self.log = 0
        if self.logDir != self.outDir:
            self.log = 1

        self.return_data = self.cfg_params.get('USER.return_data', 0)
        self.dontCheckSpaceLeft = int(
            self.cfg_params.get('USER.dontCheckSpaceLeft', 0))
        return

    def _absDir(self, path, cwd):
        """
        Return path as an absolute directory name ending with '/'.
        Relative names (including the empty string) are taken relative
        to cwd, which must itself end with '/'.
        """
        if not path.startswith('/'):
            path = cwd + path
        if not path.endswith('/'):
            path = path + '/'
        return path

    def run(self):
        """
        The main method of the class: check destination dirs and
        perform the get output. The elapsed time is written to the
        debug log.
        """
        common.logger.debug("GetOutput::run() called")

        start = time.time()
        self.getOutput()
        stop = time.time()
        common.logger.debug("GetOutput Time: " + str(stop - start))

    def checkBeforeGet(self):
        """
        Select the jobs whose output can be retrieved.

        Fills self.all_id with the ids of all jobs of the task and
        self.list_id with the requested jobs whose state is 'Terminated'.
        Raises CrabException when none of the requested jobs can be
        retrieved or when the output/log directory does not exist.
        """
        # should be in this way... but a core dump appear... waiting for solution
        #self.up_task = common.scheduler.queryEverything(1)
        self.up_task = common._db.getTask()
        list_id_done = []
        # NOTE(review): filled but never read in this class -- possibly
        # meant to drive the hint added to the error message below.
        list_id_done_not_term = []
        self.list_id = []
        self.all_id = []
        for job in self.up_task.jobs:
            if job.runningJob['state'] == 'Terminated':
                list_id_done.append(job['jobId'])
            elif job.runningJob['status'] in ['Done', 'Done (Failed)']:
                list_id_done_not_term.append(job['jobId'])
            self.all_id.append(job['jobId'])

        done = set(list_id_done)
        # -1 means "no explicit selection": only the emptiness of
        # list_id_done matters in that case
        check = -1
        if self.jobs != 'all':
            check = len(set(self.jobs).intersection(done))

        if len(list_id_done) == 0 or check == 0:
            list_jobs = self.jobs
            if self.jobs == 'all':
                list_jobs = self.all_id
            msg = 'Jobs %s are not in Done status. It is not possible yet to retrieve the output. \n' % readableList(self, list_jobs)
            if list_id_done:
                msg += ' Retrieve the jobs if those are in Done status and Terminated action. \n'
                msg += ' To know the action of a job run: "crab -status v " \n'
            raise CrabException(msg)

        if self.jobs == 'all':
            self.list_id = list_id_done
            self._logPartialRetrieval(len(self.up_task.jobs))
        else:
            self.list_id = [jid for jid in self.jobs if jid in done]
            self._logPartialRetrieval(len(self.jobs))

        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        return

    def _logPartialRetrieval(self, requested):
        """Tell the user when fewer jobs than requested will be retrieved."""
        if requested > len(self.list_id):
            msg = 'Only %d jobs will be retrieved ' % (len(self.list_id))
            msg += ' from %d requested.\n' % (requested)
            msg += '\t(for details: crab -status)'
            common.logger.info(msg)

    def getOutput(self):
        """
        Get output for the finished jobs selected by checkBeforeGet().

        The first job is retrieved on its own; the size of its tarball is
        then used to estimate the space needed by all jobs before the
        remaining ones are retrieved. Both disk-space checks are skipped
        when USER.dontCheckSpaceLeft is set. Raises CrabException when a
        check fails.
        """
        self.checkBeforeGet()

        # First check for more than 10 MB (the argument is 10*1024,
        # presumably kB -- confirm against has_freespace)
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, 10*1024):
            msg = "You have LESS than 10 MB of free space on your working dir\n"
            msg += "Please make some room before retrying\n\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        # Get first job of the list
        list_first = self.list_id[0:1]
        task = common.scheduler.getOutput(1, list_first, self.outDir)
        lastSize = self.organizeOutput(task, list_first)

        # here check disk space using the size of the first job as the
        # per-job estimate, with a 20% overhead
        needed = lastSize * len(self.list_id) * 1.2
        if not self.dontCheckSpaceLeft and not has_freespace(self.outDir, needed):
            msg = "Estimated space needed for getOutput is " + str(needed)
            msg += " kB which is MORE than available space on disk\n"
            msg += "Please make some room before retrying\n"
            msg += "To bypass this check, run \n"
            msg += "crab -get -USER.dontCheckSpaceLeft=1 \n"
            raise CrabException(msg)

        if len(self.list_id) > 1:
            list_other = self.list_id[1:]
            task = common.scheduler.getOutput(1, list_other, self.outDir)
            self.organizeOutput(task, list_other)
        return

    def organizeOutput(self, task, list_id):
        """
        Untar Output

        task    -- object returned by the scheduler's getOutput(); its
                   getJob(id).runningJob is inspected for every job
        list_id -- ids of the jobs to process

        Jobs whose runningJob reports an error are skipped. For the
        others the tarball out_files_<id>.tgz is unpacked and removed,
        the job report crab_fjr_<id>.xml is parsed and the resulting
        fields are stored through common._db.updateRunJob_().

        Returns the size (kB) of the last tarball handled, 0 if none.
        """
        listCode = []
        job_id = []
        size = 0  # in kB

        for jid in list_id:
            runningJob = task.getJob(jid).runningJob
            if runningJob.isError():
                continue

            tarball = 'out_files_' + str(jid) + '.tgz'
            if os.path.exists(self.outDir + tarball):
                self.max_id = runningJob['submission']
                if self.max_id > 1:
                    # resubmitted job: save what an earlier retrieval left
                    self._backupPreviousOutput(jid, tarball)
                try:
                    size = getGZSize(self.outDir + tarball)/1024  # in kB
                    runCommand('tar zxf ' + self.outDir + tarball + ' -C ' + self.outDir)
                    runCommand('rm ' + self.outDir + tarball)
                    msg = 'Results of Jobs # ' + str(jid) + ' are in ' + self.outDir
                    common.logger.info(msg)
                except IOError:
                    common.logger.info("Output files for job " + str(jid) + " seems corrupted.\n")
                    continue
            else:
                msg = "Output files for job " + str(jid) + " not available.\n"
                common.logger.info(msg)
                FieldToUpdate = {}
                FieldToUpdate['state'] = 'Cleared'
                FieldToUpdate["applicationReturnCode"] = str(50700)
                FieldToUpdate["wrapperReturnCode"] = str(50700)
                job_id.append(jid)
                listCode.append(FieldToUpdate)
                # no 'continue' here: the job report is still looked for,
                # so the job may be appended a second time below

            report = 'crab_fjr_' + str(jid) + '.xml'
            if os.path.exists(self.outDir + report):
                FieldToUpdate = self.parseFinalReport(self.outDir + report)
                FieldToUpdate['state'] = 'Cleared'
                job_id.append(jid)
                listCode.append(FieldToUpdate)
            else:
                msg = "Problems with " + str(report) + ". File not available.\n"
                common.logger.info(msg)

        common._db.updateRunJob_(job_id, listCode)

        if self.logDir != self.outDir:
            self._moveLogFiles(list_id)
            msg = 'Results of Jobs # ' + str(list_id) + ' are in ' + self.outDir + ' (log files are in ' + self.logDir + ')'
            common.logger.info(msg)
        return size

    def _backupPreviousOutput(self, jid, tarball):
        """
        Hand every file of job jid found in outDir (and in logDir when it
        is a separate directory) to moveOutput(), except the freshly
        retrieved tarball and names containing 'Submission_<jid>'.
        """
        marker = '_' + str(jid) + '.'
        backupTag = 'Submission_' + str(jid)
        for name in os.listdir(self.outDir):
            if marker in name and name != tarball and backupTag not in name:
                self.moveOutput(jid, self.max_id, self.outDir, name)
        if self.log == 1:
            for name in os.listdir(self.logDir):
                if marker in name and backupTag not in name:
                    self.moveOutput(jid, self.max_id, self.logDir, name)

    def _moveLogFiles(self, list_id):
        """
        Move the '*_<id>.std*' files of the given jobs and, if present,
        '.BrokerInfo' from outDir to logDir. A failure is logged and does
        not stop the remaining moves.
        """
        import glob
        import shutil

        def relocate(src):
            # an explicit file destination lets an existing file be replaced
            dest = os.path.join(self.logDir, os.path.basename(src))
            try:
                shutil.move(src, dest)
            except EnvironmentError:
                common.logger.info('Problem with copy of job results')

        for jid in list_id:
            # the leading '_' keeps job 1 from matching the files of job 11
            pattern = os.path.join(self.outDir, '*_' + str(jid) + '.std*')
            for src in glob.glob(pattern):
                relocate(src)

        brokerInfo = os.path.join(self.outDir, '.BrokerInfo')
        if os.path.exists(brokerInfo):
            relocate(brokerInfo)

    def parseFinalReport(self, input):
        """
        Parses the FJR produced by job in order to retrieve
        the WrapperExitCode and ExeExitCode.

        input -- path of the framework job report (XML file)

        Returns a dictionary with the keys 'wrapperReturnCode' and
        'applicationReturnCode' and, unless the report is empty,
        'storage' (directories of the PFNs) and 'lfn' (list of LFNs).
        The caller stores these values in the BossDB.
        """
        from ProdCommon.FwkJobRep.ReportParser import readJobReport

        codeValue = {}

        jreports = readJobReport(input)
        if len(jreports) <= 0:
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)
            common.logger.debug("Empty FWkobreport: error code assigned is 50115 ")
            return codeValue

        jobReport = jreports[0]

        ##### temporary fix for FJR incomplete ####
        fjr = open(input)
        try:
            len_fjr = len(fjr.readlines())
        finally:
            fjr.close()
        if len_fjr <= 6:
            ### 50115 - cmsRun did not produce a valid/readable job report at runtime
            codeValue["applicationReturnCode"] = str(50115)
            codeValue["wrapperReturnCode"] = str(50115)

        for error in jobReport.errors:
            errType = error['Type']
            if errType == 'WrapperExitCode':
                codeValue["wrapperReturnCode"] = error['ExitStatus']
            elif errType == 'ExeExitCode':
                codeValue["applicationReturnCode"] = error['ExitStatus']
            elif errType == 'CMSException':
                codeValue["applicationReturnCodeOrig"] = error['ExitStatus']

        if 'wrapperReturnCode' not in codeValue:
            codeValue["wrapperReturnCode"] = ''
        # the CMSException status is only a fallback for a missing
        # ExeExitCode; it is never returned under its own key
        origCode = codeValue.pop("applicationReturnCodeOrig", '')
        if 'applicationReturnCode' not in codeValue:
            codeValue["applicationReturnCode"] = origCode

        #### Filling BOSS DB with SE name and LFN, for edm and not_edm files ####
        lfns = []
        pfns = []
        for f in list(jobReport.files) + list(jobReport.analysisFiles):
            if f['LFN']:
                lfns.append(f['LFN'])
            if f['PFN']:
                #### FEDE to have the correct endpoint to use in the copyData (we modify the bossDB value and not the fjr )
                # NOTE(review): the code is compared with the integer
                # 60308 while other branches store strings -- confirm the
                # type of error['ExitStatus'].
                if common.scheduler.name().upper() not in ['LSF', 'CAF', 'PBS'] and codeValue["wrapperReturnCode"] == 60308:
                    pfns.append(os.path.dirname(f['SurlForGrid']) + '/')
                else:
                    pfns.append(os.path.dirname(f['PFN']) + '/')

        codeValue["storage"] = pfns
        codeValue["lfn"] = lfns
        return codeValue

    def moveOutput(self, id, max_id, path, file):
        """
        Move output of job already retrieved
        into the correct backup directory

        id     -- job id, used only in the debug message
        max_id -- submission number of the job; the file goes to
                  <path>Submission_<max_id - 1>/
        path   -- directory (ending with '/') that holds the file
        file   -- name of the file to move
        """
        Dir_Base = path + 'Submission_'

        for i in range(1, max_id):
            backupDir = Dir_Base + str(i) + '/'
            if not os.path.isdir(backupDir):
                # '> /dev/null 2>&1' instead of '>&', which a strict
                # POSIX sh rejects
                cmd_out = runCommand('mkdir ' + backupDir + ' > /dev/null 2>&1')
                common.logger.debug(str(cmd_out))

        cmd = 'mv ' + path + file + ' ' + Dir_Base + str(max_id - 1) + '/ > /dev/null 2>&1'
        try:
            cmd_out = runCommand(cmd)
            common.logger.debug(cmd_out)
        except Exception:
            msg = 'no output to move for job ' + str(id)
            common.logger.debug(msg)
        return
|