1 |
slacapra |
1.1 |
from crab_logger import Logger
|
2 |
|
|
from crab_exceptions import *
|
3 |
|
|
from crab_util import *
|
4 |
|
|
import common
|
5 |
|
|
import os, time, shutil
|
6 |
|
|
|
7 |
spiga |
1.4 |
from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
|
8 |
|
|
|
9 |
|
|
|
10 |
|
|
from ProdCommon.BossLite.DbObjects.Job import Job
|
11 |
|
|
from ProdCommon.BossLite.DbObjects.Task import Task
|
12 |
|
|
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
|
13 |
|
|
|
14 |
|
|
from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
|
15 |
slacapra |
1.1 |
|
16 |
|
|
class Boss:
|
17 |
|
|
def __init__(self):
|
18 |
spiga |
1.4 |
|
19 |
slacapra |
1.1 |
return
|
20 |
spiga |
1.9 |
|
21 |
|
|
def configure(self, cfg_params):
    """
    Read the scheduler settings from cfg_params and create the BossLite
    scheduler session (self.schedSession).

    cfg_params -- dict-like CRAB configuration. The keys read here are
                  CRAB.scheduler, EDG.rb, EDG.wms_service,
                  USER.outputdir, USER.logdir and USER.return_data.

    Raises CrabException when CRAB.scheduler is missing or is not one of
    the scheduler names listed in the map below.
    """
    self.cfg_params = cfg_params
    # this should match with the bosslite requirements
    self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')

    # The resource-broker parameter file is only built when EDG.rb is given.
    self.rb_param_file = ''
    if 'EDG.rb' in cfg_params:
        self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
    self.wms_service = cfg_params.get("EDG.wms_service", '')

    # Output and log directories both default to the work-space result dir.
    self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir())
    self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir())

    self.return_data = cfg_params.get('USER.return_data', 0)

    ## Add here the map for others Schedulers (LSF/CAF/CondorG)
    SchedMap = {
        'glite': 'SchedulerGLiteAPI',
        'glitecoll': 'SchedulerGLiteAPI',
        'condor_g': '',
        'lsf': '',
        'caf': '',
    }

    # Fail with a readable CRAB error instead of a bare KeyError when the
    # scheduler name is absent (default '') or not in the map.
    if self.schedulerName not in SchedMap:
        msg = "Scheduler '" + str(self.schedulerName) + "' is not supported. "
        msg += "Known schedulers: " + ', '.join(sorted(SchedMap.keys()))
        raise CrabException(msg)

    schedulerConfig = {
        'name': SchedMap[self.schedulerName],
        'service': self.wms_service,
        'config': self.rb_param_file,
    }

    self.schedSession = BossLiteAPISched(common.bossSession, schedulerConfig)

    return
|
51 |
|
|
|
52 |
spiga |
1.8 |
def declare(self, nj):
    """
    BOSS declaration of jobs.

    For each of the first nj jobs of the task this sets the executable
    (the task's 'scriptName'), the per-job stdout/stderr file names and
    the list of output files, then stores all of them with one
    common._db.updateJob_() call.

    nj -- number of jobs to declare. Job number nj (index nj - 1 in
          common.job_list) provides the base name of the std files.
    """
    index = nj - 1
    base = common.job_list[index].type().name()

    ## Should disappear...
    ## we'll have ONLY 'executable'
    ## as task field and not job field
    wrapper = common._db.queryTask('scriptName')

    listField = []
    listID = []
    task = common._db.getTask()
    for job_index in range(nj):
        # File names are numbered from 1, job indexes from 0.
        stdout = base + '_' + str(job_index + 1) + '.stdout'
        stderr = base + '_' + str(job_index + 1) + '.stderr'

        # This is the list held by the task's job object; it is extended
        # in place with the files added for every job.
        out = task.jobs[job_index]['outputFiles']
        out.extend([stdout, stderr, '.BrokerInfo'])

        listField.append({
            'outputFiles': out,
            'executable': wrapper,
            'standardOutput': stdout,
            'standardError': stderr,
        })
        listID.append(job_index)
    common._db.updateJob_(listID, listField)

    return
|
87 |
|
|
|
88 |
spiga |
1.10 |
def listMatch(self, tags, dest, whiteL, blackL ):
    """
    Check the compatibility of available resources.

    The four arguments are forwarded unchanged to the scheduler session's
    lcgInfo(); the number of entries it returns is the result.
    """
    matching = self.schedSession.lcgInfo(tags, dest, whiteL, blackL)
    return len(matching)
|
107 |
slacapra |
1.1 |
|
108 |
spiga |
1.8 |
def submit(self, jobsList, req):
    """
    Submit BOSS function.

    Loads the task for jobsList through common._db.getTask() and passes
    it, together with jobsList and req, to the scheduler session's
    submit().
    """
    self.schedSession.submit(common._db.getTask(jobsList), jobsList, req)
|
125 |
|
|
|
126 |
|
|
def queryEverything(self, taskid):
    """
    Query needed info of all jobs with specified boss taskid.

    The id is converted to a string before it is handed to the scheduler
    session's query().
    """
    task_key = str(taskid)
    self.schedSession.query(task_key)
|
134 |
slacapra |
1.1 |
|
135 |
|
|
def moveOutput(self, int_id):
    """
    Move output of job already retrieved.

    The files listed under 'OUTFILES' for the job are looked up in
    self.outDir and self.logDir; each one found is moved into the
    'res_backup' directory (created on demand) with the current
    timestamp appended to its name.

    int_id -- job id; it is converted to a string for the BOSS calls.

    If BOSS cannot provide the file list the error is logged and nothing
    is moved.
    """
    self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
    resDir = common.work_space.resDir()
    # NOTE(review): plain concatenation, so resDir is assumed to end with
    # a path separator -- confirm against work_space.resDir().
    resDirSave = resDir + 'res_backup'
    if not os.path.exists(resDirSave):
        os.mkdir(resDirSave)

    boss_id = str(int_id)
    # NOTE(review): self.bossTask is not assigned anywhere in the visible
    # part of this class -- confirm it exists before relying on this method.
    try:
        self.bossTask.load(ALL, boss_id)
        cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
    except BossError as e:
        common.logger.message(str(e))
        # Without the file list there is nothing to move; returning here
        # avoids using cmd_out while it is still unbound.
        return

    for name in cmd_out.split(','):
        # Same treatment for the output and the log directory, in that order.
        for src_dir in (self.outDir, self.logDir):
            src = src_dir + '/' + name
            if os.path.exists(src):
                shutil.move(src, resDirSave + '/' + name + '_' + self.current_time)
                common.logger.message('Output file ' + name + ' moved to ' + resDirSave)
    return
|
162 |
|
|
|
163 |
|
|
###################### ---- OK for Boss4 ds
|
164 |
|
|
def getOutput(self, int_id):
    """
    Get the output of the finished jobs among the given ids.

    int_id -- iterable of job ids.

    For every id whose status is 'Done (Success)' or 'Done (Abort)' the
    output is fetched into self.outDir with self.bossTask.getOutput().
    When self.logDir differs from self.outDir, the std files of the job,
    .BrokerInfo and the *.log files are then moved to self.logDir with a
    shell 'mv'. The job is marked 'Y' in common.jobDB, or 'Z' when BOSS
    raises a BossError. Jobs in any other status are only counted, and a
    per-status summary is printed at the end.

    Returns nothing.
    Raises CrabException when the output or the log directory is missing.
    """
    if not (os.path.isdir(self.logDir) and os.path.isdir(self.outDir)):
        msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
        raise CrabException(msg)

    # NOTE(review): self.list, self.queryStatusList, self.bossTask and
    # self.groupName are not defined anywhere in the visible part of this
    # class -- confirm they exist before relying on this method.
    common.jobDB.load()
    allBoss_id = self.list()
    bossTaskId = common.taskDB.dict('BossTaskId')
    ## first get the status of all job in the list
    statusList = self.queryStatusList(bossTaskId, int_id)

    # Summary lines in printing order; the None entry collects every
    # status that is not listed explicitly.
    summary = (
        ('Cleared', ' jobs already cleared'),
        ('Aborted', ' jobs aborted'),
        ('Cancelled', ' jobs cancelled'),
        ('Killed', ' jobs killed'),
        ('Running', ' jobs still running'),
        ('Scheduled', ' jobs scheduled'),
        ('Waiting', ' jobs waiting'),
        ('Ready', ' jobs ready'),
        (None, ' jobs submitted'),
        ('Created', ' jobs not yet submitted'),
    )
    known = [status for status, label in summary if status is not None]
    counts = {}

    check = 0      # becomes 1 as soon as one job is found in a Done status
    Tout = 180     # passed to bossTask.getOutput()
    outDir = self.outDir
    logDir = self.logDir

    ## then loop over jobs and retrieve it if it's the case
    for i_id in int_id:
        if i_id not in allBoss_id:
            msg = 'Job # '+str(int(i_id))+' out of range for task '+ self.groupName
            common.logger.message(msg)
            continue

        bossTaskIdStatus = statusList[i_id]
        if bossTaskIdStatus not in ('Done (Success)', 'Done (Abort)'):
            # Not retrievable: only remember how many jobs are in this state.
            key = bossTaskIdStatus
            if key not in known:
                key = None
            counts[key] = counts.get(key, 0) + 1
            continue

        check = 1
        try:
            self.bossTask.getOutput(str(i_id), str(outDir), Tout)
            if logDir != outDir:
                cmd = 'mv '+str(outDir)+'/*'+str(i_id)+'.std* '+str(outDir)+'/.BrokerInfo '+str(outDir)+'/*.log '+str(logDir)
                # os.system() reports a failing command through its return
                # value, not through an exception, so the status has to be
                # checked explicitly.
                try:
                    moved = (os.system(cmd) == 0)
                except Exception:
                    moved = False
                if moved:
                    msg = 'Results of Job # '+str(i_id)+' are in '+str(outDir)+' (log files are in '+str(logDir)+')'
                else:
                    msg = 'Problem with copy of job results'
                common.logger.message(msg)
            else:
                msg = 'Results of Job # '+str(int(i_id))+' are in '+str(outDir)
                common.logger.message(msg)
            # The output was fetched (even if moving the logs failed).
            common.jobDB.setStatus(int(i_id)-1, 'Y')
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
            common.logger.message(str(e))
        except BossError as e:
            common.logger.message(str(e))
            msg = 'Results of Job # '+str(int(i_id))+' have been corrupted and could not be retrieved.'
            common.logger.message(msg)
            common.jobDB.setStatus(int(i_id)-1, 'Z')

    common.jobDB.save()
    if check == 0:
        msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
        common.logger.message(msg)

    for status, label in summary:
        how_many = counts.get(status, 0)
        if how_many != 0:
            print(str(how_many)+label)

    print(' ')
    return
|
283 |
|
|
|
284 |
|
|
###################### ---- OK for Boss4 ds
|
285 |
|
|
def cancel(self, subm_id):
    """
    Kill the jobs whose ids are listed in subm_id.

    subm_id -- list of job ids; it is sorted in place. When it is empty
               only a "No job to be killed" message is logged.

    After the kill request the jobs are reloaded from BOSS, and every
    requested job reported with STATUS 'K' is marked 'K' in common.jobDB
    (at index id - 1). Scheduler and BOSS errors are logged, not raised.
    common.jobDB is saved before returning.
    """
    common.jobDB.load()
    if len(subm_id) > 0:
        # NOTE(review): self.bossTask is not assigned anywhere in the
        # visible part of this class -- confirm it exists.
        try:
            subm_id.sort()
            # Compact "a:b,c" style string describing the ids, as built
            # by prepString().
            job_range = self.prepString(subm_id)
            common.logger.message("Killing job # " + str(subm_id).replace("[","",1).replace("]","",1) )
            Tout = len(subm_id)*60
            self.bossTask.kill(job_range, Tout)
            self.bossTask.load(ALL, job_range)
            for k, v in self.bossTask.jobsDict().items():
                k = int(k)
                status = v['STATUS']
                if k in subm_id and status == 'K':
                    common.jobDB.setStatus(k - 1, 'K')
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:"+ str(e))
        except BossError as e:
            common.logger.message( str(e) + "\nError killing jobs # "+str(subm_id)+" . See log for details")
    else:
        common.logger.message("\nNo job to be killed")
    # Single save covering both branches.
    common.jobDB.save()
    return
|
318 |
|
|
|
319 |
|
|
def setFlag( self, list, index ):
    """
    Classify the element at position index for prepString().

    Returns -2 when a next element exists and equals list[index] + 1,
    -1 when a next element exists but is not the direct successor, and
    the last element of the list when index has no next element.
    """
    following = index + 1
    if not len(list) > following:
        return list[len(list) - 1]
    is_successor = list[following] == (list[index] + 1)
    if is_successor:
        return -2
    return -1
|
325 |
|
|
|
326 |
|
|
def prepString( self, list ):
|
327 |
|
|
s = ""
|
328 |
|
|
flag = 0
|
329 |
|
|
for i in range( len( list ) ):
|
330 |
|
|
if flag == 0:
|
331 |
|
|
s = str( list[i] )
|
332 |
|
|
flag = self.setFlag( list, i )
|
333 |
|
|
elif flag == -1:
|
334 |
|
|
s = s + "," + str( list[i] )
|
335 |
|
|
flag = self.setFlag( list, i )
|
336 |
|
|
elif flag == -2:
|
337 |
|
|
flag = self.setFlag( list, i )
|
338 |
|
|
if flag == -1:
|
339 |
|
|
s = s + ":" + str( list[i] )
|
340 |
|
|
if flag > 0:
|
341 |
|
|
s = s + ":" + str( list[i] )
|
342 |
|
|
return s
|
343 |
|
|
|
344 |
|
|
|
345 |
|
|
|
346 |
|
|
|