1 |
slacapra |
1.1 |
from crab_logger import Logger
|
2 |
|
|
from crab_exceptions import *
|
3 |
|
|
from crab_util import *
|
4 |
|
|
import common
|
5 |
|
|
import os, time, shutil
|
6 |
|
|
|
7 |
spiga |
1.4 |
from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
|
8 |
|
|
|
9 |
|
|
|
10 |
|
|
from ProdCommon.BossLite.DbObjects.Job import Job
|
11 |
|
|
from ProdCommon.BossLite.DbObjects.Task import Task
|
12 |
|
|
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
|
13 |
|
|
|
14 |
|
|
from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
|
15 |
slacapra |
1.1 |
|
16 |
|
|
class Boss:
|
17 |
|
|
def __init__(self):
|
18 |
spiga |
1.4 |
|
19 |
slacapra |
1.1 |
return
|
20 |
|
|
|
21 |
|
|
def __del__(self):
|
22 |
|
|
""" destroy instance """
|
23 |
|
|
del self.bossAdmin
|
24 |
|
|
del self.bossUser
|
25 |
|
|
return
|
26 |
|
|
|
27 |
|
|
|
28 |
|
|
#### Boss Configuration
|
29 |
|
|
|
30 |
|
|
def configure(self, cfg_params):
    """Store the user configuration and open the BossLite scheduler session.

    cfg_params -- mapping of 'SECTION.key' entries; only its get()
                  method is used here.

    Sets cfg_params, schedulerName, schedSession, outDir, logDir and
    return_data on the instance.
    """
    self.cfg_params = cfg_params
    # this should match with the bosslite requirements
    self.schedulerName = cfg_params.get("CRAB.scheduler", '')

    # CRAB scheduler name -> BossLite scheduler class.  Not consulted yet:
    # the scheduler settings below are still hard-coded.
    SchedMap = {'glitecoll': 'SchedulerGLiteAPI'}

    ### Just temporary DS--BL
    schedulerConfig = {}
    schedulerConfig['name'] = 'SchedulerGLiteAPI'
    schedulerConfig['service'] = 'https://wms104.cern.ch:7443/glite_wms_wmproxy_server'
    schedulerConfig['config'] = '/afs/cern.ch/user/s/spiga/scratch0/WorkingDir/glite.conf.CMS_CNAF'

    self.schedSession = BossLiteAPISched(common.bossSession, schedulerConfig)

    # Both directories fall back to the work-space result directory.
    self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir())
    self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir())

    self.return_data = cfg_params.get('USER.return_data', 0)
|
53 |
|
|
|
54 |
|
|
#### End Boss Configuration
|
55 |
|
|
|
56 |
spiga |
1.4 |
def declareJob_(self, nj):
    """
    BOSS declaration of jobs

    nj -- number of jobs; database entries 0 .. nj-1 are updated.

    For every job the stdout/stderr file names are derived from the
    job-type name of job number nj, appended (together with
    '.BrokerInfo') to the job's output files, and written back to the
    database.
    """
    base = common.job_list[nj - 1].type().name()

    for job_no in range(nj):
        stdout = base + '_' + str(job_no) + '.stdout'
        stderr = base + '_' + str(job_no) + '.stderr'

        # queryJob takes a list of ids; element 0 is this job's file list.
        out_files = common._db.queryJob('outputFiles', [job_no])[0]
        for extra in (stdout, stderr, '.BrokerInfo'):
            out_files.append(extra)

        parameters = {}
        parameters['outputFiles'] = out_files
        ## Should disappear... we'll have ONLY 'executable'
        ## as task field and not job field
        parameters['executable'] = common._db.queryTask('scriptName')
        parameters['standardOutput'] = stdout
        parameters['standardError'] = stderr

        common._db.updateJob_(job_no, parameters)
|
86 |
|
|
|
87 |
|
|
def task(self):
    """Return the BOSS task object held in self.bossTask."""
    boss_task = self.bossTask
    return boss_task
|
90 |
|
|
|
91 |
|
|
########################################## ---- OK for Boss4 ds
|
92 |
|
|
def listMatch(self, schedulerName, schcladstring):
    """
    Check the compatibility of available resources

    schedulerName -- scheduler name passed to schedListMatch()
    schcladstring -- requirements string passed to schedListMatch()

    Returns the result of schedListMatch(), or an empty list when a
    SchedulerError was reported.  A BossError is converted into a
    CrabException.
    """
    timeout = 120
    matches = []
    try:
        matches = self.bossUser.schedListMatch(
            schedulerName, schcladstring, self.bossTask.id(), "", timeout)
        common.logger.debug(1, "CEs :" + str(matches))
    except SchedulerError as e:
        # Best effort: report the failure and hand back the empty list.
        common.logger.message("Warning : Scheduler interaction in list-match operation failed for jobs:")
        common.logger.message(str(e))
    except BossError as e:
        raise CrabException("ERROR: listMatch failed with message " + str(e))
    return matches
|
108 |
|
|
|
109 |
spiga |
1.6 |
def submit(self, jobsList):
    """
    Submit BOSS function.

    jobsList -- job selection handed unchanged to the submit() call of
                the scheduler session created by configure().

    Nothing is returned, and no scheduler or BOSS exception is caught
    here: the former error handling and the collection of scheduler
    ids are disabled.
    """
    current_task = common._db.getTask()
    self.schedSession.submit(current_task, jobsList)
|
151 |
slacapra |
1.1 |
|
152 |
|
|
###################### ---- OK for Boss4 ds
|
153 |
|
|
def moveOutput(self, int_id):
    """
    Move output of job already retrieved

    int_id -- job number; it is converted to a string before being
              passed to BOSS.

    Every file named in the job's OUTFILES entry that exists in the
    output or log directory is moved into '<resDir>res_backup', with
    the current time stamp appended to its name.  If BOSS cannot
    provide the file list, the error is logged and nothing is moved.
    """
    self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
    resDir = common.work_space.resDir()
    # NOTE(review): plain concatenation -- presumably resDir() ends with
    # a path separator; confirm against the work-space implementation.
    resDirSave = resDir + 'res_backup'
    if not os.path.exists(resDirSave):
        os.mkdir(resDirSave)

    boss_id = str(int_id)
    try:
        self.bossTask.load(ALL, boss_id)
        cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
    except BossError as e:
        common.logger.message(str(e))
        # Without the file list there is nothing to move; falling through
        # would fail on the unbound name cmd_out.
        return

    for name in cmd_out.split(','):
        # Output directory first, then log directory, as before.
        for src_dir in (self.outDir, self.logDir):
            source = src_dir + '/' + name
            if os.path.exists(source):
                shutil.move(source, resDirSave + '/' + name + '_' + self.current_time)
                common.logger.message('Output file ' + name + ' moved to ' + resDirSave)
    return
|
180 |
|
|
|
181 |
|
|
|
182 |
|
|
def queryDetailedStatus(self, id):
    """Return the detailed status of job *id*.

    The call is forwarded unchanged to self.boss_scheduler.
    """
    scheduler = self.boss_scheduler
    return scheduler.queryDetailedStatus(id)
|
186 |
|
|
|
187 |
|
|
###################### ---- OK for Boss4 ds
|
188 |
|
|
def getOutput(self, int_id):
    """
    Get output for the finished jobs listed in int_id.

    int_id -- iterable of job numbers.

    Jobs whose status is 'Done (Success)' or 'Done (Abort)' are
    retrieved into self.outDir (log files are moved to self.logDir when
    the two differ) and marked 'Y' in the job DB; a BossError marks the
    job 'Z'.  Jobs in any other state are only counted, and a summary
    is printed at the end.

    Raises CrabException when the log or output directory is missing.
    Nothing is returned.

    NOTE(review): the nesting below is reconstructed from statement
    order because the available view of this file carries no
    indentation -- confirm against the repository copy.
    """
    if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
        msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
        raise CrabException(msg)
    common.jobDB.load()
    allBoss_id = self.list()
    bossTaskId = common.taskDB.dict('BossTaskId')
    ## first get the status of all job in the list
    statusList = self.queryStatusList(bossTaskId, int_id)
    # Becomes 1 as soon as one job is in a 'Done' state.
    check = 0

    ## then loop over jobs and retrieve it if it's the case
    # One bucket per status; only the bucket sizes are reported below.
    create= []
    run= []
    clear=[]
    abort=[]
    canc=[]
    read=[]
    wait=[]
    sched=[]
    kill=[]
    other=[]
    # Timeout handed to bossTask.getOutput().
    Tout=180

    for i_id in int_id :
        if i_id not in allBoss_id:
            msg = 'Job # '+`int(i_id)`+' out of range for task '+ self.groupName
            common.logger.message(msg)
        else:
            dir = self.outDir
            logDir = self.logDir
            boss_id = i_id
            #bossTaskIdStatus = common.scheduler.queryStatus(bossTaskId, boss_id)
            # NOTE(review): raises KeyError when queryStatusList() returned
            # no entry for this job (e.g. after a logged scheduler error).
            bossTaskIdStatus = statusList[boss_id]
            if bossTaskIdStatus == 'Done (Success)' or bossTaskIdStatus == 'Done (Abort)':
                check = 1
                try:
                    self.bossTask.getOutput (str(boss_id), str(dir), Tout)
                    if logDir != dir:
                        try:
                            ######
                            # Move stdout/stderr, .BrokerInfo and *.log to the log directory.
                            cmd = 'mv '+str(dir)+'/*'+str(i_id)+'.std* '+str(dir)+'/.BrokerInfo '+str(dir)+'/*.log '+str(logDir)
                            # NOTE(review): the exit status is stored but never checked,
                            # so a failing 'mv' still reports success below.
                            cmd_out =os.system(cmd)
                            msg = 'Results of Job # '+str(i_id)+' are in '+dir+' (log files are in '+logDir+')'
                            common.logger.message(msg)
                            #####
                            #toMove = str(dir)+'/*'+`int(i_id)`+'.std* '+str(dir)+'/*.log '+str(dir)+'/.BrokerInfo '
                            #shutil.move(toMove, str(logDir))
                            #####
                        except:
                            msg = 'Problem with copy of job results'
                            common.logger.message(msg)
                            pass
                    else:
                        msg = 'Results of Job # '+`int(i_id)`+' are in '+dir
                        common.logger.message(msg)
                    # NOTE(review): assumed to follow the if/else so that both
                    # branches mark the job as retrieved -- confirm placement.
                    common.jobDB.setStatus(int(i_id)-1, 'Y')
                except SchedulerError,e:
                    common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
                    common.logger.message(e.__str__())
                    pass
                except BossError,e:
                    common.logger.message(e.__str__())
                    msg = 'Results of Job # '+`int(i_id)`+' have been corrupted and could not be retrieved.'
                    common.logger.message(msg)
                    common.jobDB.setStatus(int(i_id)-1, 'Z')
            elif bossTaskIdStatus == 'Running' :
                run.append(i_id)
                # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is not possible yet to retrieve the output.'
                # common.logger.message(msg)
            elif bossTaskIdStatus == 'Cleared' :
                clear.append(i_id)
                # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. The output was already retrieved.'
                # common.logger.message(msg)
            elif bossTaskIdStatus == 'Aborted' :
                abort.append(i_id)
                # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is not possible to retrieve the output.'
                # common.logger.message(msg)
            elif bossTaskIdStatus == 'Created' :
                create.append(i_id)
            elif bossTaskIdStatus == 'Cancelled' :
                canc.append(i_id)
            elif bossTaskIdStatus == 'Ready' :
                read.append(i_id)
            elif bossTaskIdStatus == 'Scheduled' :
                sched.append(i_id)
            elif bossTaskIdStatus == 'Waiting' :
                wait.append(i_id)
            elif bossTaskIdStatus == 'Killed' :
                kill.append(i_id)
            else:
                other.append(i_id)
                # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is currently not possible to retrieve the output.'
                # common.logger.message(msg)
            # NOTE(review): the value built here is never read (dir is
            # reassigned on the next iteration), yet os.environ['USER']
            # raises KeyError when USER is unset -- placement assumed.
            dir += os.environ['USER']
            dir += '_' + os.path.basename(str(boss_id))
        pass
    common.jobDB.save()
    if check == 0:
        msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
        common.logger.message(msg)

    # Summary of the jobs that were not retrieved, one line per non-empty bucket.
    if len(clear)!=0: print str(len(clear))+' jobs already cleared'
    if len(abort)!=0: print str(len(abort))+' jobs aborted'
    if len(canc)!=0: print str(len(canc))+' jobs cancelled'
    if len(kill)!=0: print str(len(kill))+' jobs killed'
    if len(run)!=0: print str(len(run))+' jobs still running'
    if len(sched)!=0: print str(len(sched))+' jobs scheduled'
    if len(wait)!=0: print str(len(wait))+' jobs waiting'
    if len(read)!=0: print str(len(read))+' jobs ready'
    # Jobs with an unrecognised status are reported as 'submitted'.
    if len(other)!=0: print str(len(other))+' jobs submitted'
    if len(create)!=0: print str(len(create))+' jobs not yet submitted'

    print ' '
    return
|
307 |
|
|
|
308 |
|
|
###################### ---- OK for Boss4 ds
|
309 |
|
|
def cancel(self, subm_id):
    """
    Kill the submitted jobs listed in subm_id.

    subm_id -- list of job numbers; it is sorted in place.  An empty
               list only produces a "No job to be killed" message.

    After the kill request the task is reloaded, and every requested
    job that BOSS reports with status 'K' is marked 'K' in the job DB.
    Scheduler and BOSS errors are logged, not raised.  The job DB is
    saved once before returning.
    """
    common.jobDB.load()
    if len(subm_id) > 0:
        try:
            subm_id.sort()
            # BOSS range expression built from the sorted job numbers.
            job_range = self.prepString(subm_id)
            common.logger.message("Killing job # " + str(subm_id).replace("[", "", 1).replace("]", "", 1))
            # One minute of timeout per job to kill.
            Tout = len(subm_id) * 60
            self.bossTask.kill(job_range, Tout)
            self.bossTask.load(ALL, job_range)
            task = self.bossTask.jobsDict()
            for k, v in task.items():
                k = int(k)
                status = v['STATUS']
                if k in subm_id and status == 'K':
                    common.jobDB.setStatus(k - 1, 'K')
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:" + str(e))
        except BossError as e:
            common.logger.message(str(e) + "\nError killing jobs # " + str(subm_id) + " . See log for details")
    else:
        common.logger.message("\nNo job to be killed")
    # A single save covers every path above.
    common.jobDB.save()
    return
|
342 |
|
|
|
343 |
|
|
def setFlag(self, list, index):
    """Classify the element at *index* relative to its successor.

    Returns -2 when the next element equals this one plus one, -1 when
    a next element exists but does not, and the last element of *list*
    when *index* has no successor.
    """
    following = index + 1
    if following >= len(list):
        # No successor: report the final element itself.
        return list[-1]
    if list[following] == list[index] + 1:
        return -2
    return -1
|
349 |
|
|
|
350 |
|
|
def prepString(self, list):
    """Build a range expression from a list of job numbers.

    Runs of consecutive numbers are written as 'first:last' and
    separate groups are joined with ','.  The final element is always
    appended as ':last' when it is positive, so [5] gives '5:5' and
    [1, 3] gives '1,3:3'.  An empty list gives ''.
    """
    spec = ""
    state = 0
    for pos, number in enumerate(list):
        if state == 0:
            # First element: start the expression.
            spec = str(number)
            state = self.setFlag(list, pos)
        elif state == -1:
            # Previous element closed a group: start a new one.
            spec += "," + str(number)
            state = self.setFlag(list, pos)
        elif state == -2:
            # Inside a run: only write its end.
            state = self.setFlag(list, pos)
            if state == -1:
                spec += ":" + str(number)
        if state > 0:
            # setFlag() returned the last element: close the expression.
            spec += ":" + str(number)
    return spec
|
367 |
|
|
|
368 |
|
|
################################################################ To remove when Boss4 store this info DS. (start)
|
369 |
|
|
def getAttribute(self, id, attr):
    """Return status attribute *attr* of job *id*, as given by self.boss_scheduler."""
    scheduler = self.boss_scheduler
    return scheduler.getStatusAttribute_(id, attr)
|
371 |
|
|
|
372 |
|
|
def getExitStatus(self, id):
    """Return the 'exit_code' status attribute of job *id*."""
    scheduler = self.boss_scheduler
    wanted = 'exit_code'
    return scheduler.getStatusAttribute_(id, wanted)
|
374 |
|
|
|
375 |
|
|
def queryDest(self, id):
    """Return the 'destination' status attribute of job *id*."""
    scheduler = self.boss_scheduler
    wanted = 'destination'
    return scheduler.getStatusAttribute_(id, wanted)
|
377 |
|
|
################################################################ (stop)
|
378 |
|
|
|
379 |
|
|
############################## OK for BOSS4 ds.
|
380 |
|
|
############################# ----> we use the SID for the postMortem... probably this functionality come for free with BOSS4?
|
381 |
|
|
def boss_SID(self, int_ID):
    """ Return Sid of job

    int_ID -- 1-based job number; the job DB is indexed with int_ID - 1.
    The job DB is loaded first when it reports no submitted jobs.
    """
    job_db = common.jobDB
    if job_db.nSubmittedJobs() == 0:
        job_db.load()
    return job_db.jobId(int_ID - 1)
|
391 |
|
|
|
392 |
|
|
##################################################
|
393 |
|
|
def queryEverything(self, taskid):
    """
    Query needed info of all jobs with specified boss taskid

    taskid -- accepted for interface compatibility; not used here.

    Returns a dictionary {job number (int): info}, where info always
    holds 'SCHED_ID', 'STATUS' (translated through self.status),
    'EXEC_HOST', 'EXE_EXIT_CODE' and 'JOB_EXIT_STATUS', plus
    'STATUS_REASON', 'LAST_T', 'DEST_CE', 'LB_TIMESTAMP' and 'RB' when
    BOSS provides them.  Scheduler and BOSS errors are logged and the
    entries collected so far are returned.
    """
    optional_keys = ('STATUS_REASON', 'LAST_T', 'DEST_CE', 'LB_TIMESTAMP', 'RB')

    results = {}
    try:
        # fill dictionary { 'bossid' : 'status' , ... }
        nTot = common.jobDB.nJobs()
        # Timeout for the query: 20 per job in the job DB.
        Tout = nTot * 20
        self.bossTask.query(ALL, timeout=Tout)
        task = self.bossTask.jobsDict()
        for c, v in task.items():
            k = int(c)
            results[k] = {
                'SCHED_ID': v['SCHED_ID'],
                'STATUS': self.status[v['STATUS']],
                # Take the host from the job record; previously the literal
                # list ['EXEC_HOST'] was stored instead of the job's value.
                # NOTE(review): '' is used when BOSS gives no host --
                # confirm that callers accept an empty string.
                'EXEC_HOST': v.get('EXEC_HOST', ''),
            }
            for key in optional_keys:
                if key in v:
                    results[k][key] = v[key]
            program = self.bossTask.specific(c, '1')
            results[k]['EXE_EXIT_CODE'] = program['EXE_EXIT_CODE']
            results[k]['JOB_EXIT_STATUS'] = program['JOB_EXIT_STATUS']
    except SchedulerError as e:
        common.logger.message("Warning : Scheduler interaction failed for jobs:")
        common.logger.message(str(e))
    except BossError as e:
        common.logger.message(str(e))

    return results
|
430 |
|
|
|
431 |
|
|
##################################################
|
432 |
|
|
################################################## To change "much" when Boss4 store also this infos DS.
|
433 |
|
|
def queryEveryStatus(self, taskid):
    """ Query a status of all jobs with specified boss taskid

    taskid -- not used by this method.

    Returns {job key: status}, with the status translated through
    self.status.  The keys are used exactly as BOSS delivers them; they
    are not converted to int here.  Scheduler and BOSS errors are
    logged, not raised.
    """
    self.boss_scheduler.checkProxy()

    results = {}
    try:
        # Timeout for the query: 20 per job in the job DB.
        Tout = common.jobDB.nJobs() * 20
        # fill dictionary { 'bossid' : 'status' , ... }
        self.bossTask.query(ALL, timeout=Tout)
        for job_key, job_info in self.bossTask.jobsDict().items():
            results[job_key] = self.status[job_info['STATUS']]
    except SchedulerError as e:
        common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
        common.logger.message(str(e))
    except BossError as e:
        common.logger.message(str(e))

    return results
|
455 |
|
|
|
456 |
|
|
##################################################
|
457 |
|
|
def queryStatusList(self, taskid, list_id):
    """ Query the status of the jobs listed in list_id

    taskid  -- not used by this method.
    list_id -- sequence of job numbers.

    Returns {job number (int): status}, with the status translated
    through self.status.  Scheduler and BOSS errors are logged and the
    entries collected so far (possibly none) are returned.
    """
    allBoss_id = self.list()
    # An empty selection is sent when list_id has as many entries as the
    # task has jobs; otherwise the ids are sent as a comma-separated string.
    tmpQ = ''
    if len(allBoss_id) != len(list_id):
        # str.join needs no 'string' module, which this file does not import.
        tmpQ = ",".join(map(str, list_id))

    results = {}
    try:
        # Timeout for the query: 20 per requested job.
        Tout = len(list_id) * 20
        # fill dictionary { 'bossid' : 'status' , ... }
        self.bossTask.query(ALL, tmpQ, timeout=Tout)
        task = self.bossTask.jobsDict()
        for k, v in task.items():
            results[int(k)] = self.status[v['STATUS']]
    except SchedulerError as e:
        common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
        common.logger.message(str(e))
    except BossError as e:
        common.logger.message(str(e))

    return results
|
480 |
|
|
|
481 |
|
|
###################### ---- OK for Boss4 ds
|
482 |
|
|
def list(self):
    """
    Return a list of all boss_Id of a task

    The keys of self.bossTask.jobsDict() are converted to int and
    returned in ascending order without duplicates.
    """
    # A set removes duplicates in one pass instead of scanning the
    # result list for every id.
    # NOTE(review): assumes jobsDict() returns a dict-like object whose
    # iteration yields its keys -- confirm.
    job_ids = set()
    for key in self.bossTask.jobsDict():
        job_ids.add(int(key))
    return sorted(job_ids)
|
495 |
|
|
|
496 |
|
|
##################
|
497 |
|
|
def taskDeclared(self, taskName):
    """Tell whether self.bossUser.loadByName(taskName) finds anything.

    Returns True when the lookup result is non-empty, False otherwise.
    """
    matches = self.bossUser.loadByName(taskName)
    if len(matches) > 0:
        return True
    return False
|
500 |
|
|
|
501 |
|
|
def clean(self):
    """ destroy boss instance

    Removes the ``bossUser`` attribute if it is set.  Calling this on
    an instance without that attribute, or calling it twice, is a
    no-op instead of an AttributeError.
    """
    self.__dict__.pop('bossUser', None)
    return
|