ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.7
Committed: Fri Mar 7 17:25:23 2008 UTC (17 years, 1 month ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +16 -4 lines
Log Message:
initial mods to have LSF working with BossLite: still way to go...

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6    
7 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8    
9    
10     from ProdCommon.BossLite.DbObjects.Job import Job
11     from ProdCommon.BossLite.DbObjects.Task import Task
12     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13    
14     from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
15 slacapra 1.1
class Boss:
    """
    CRAB wrapper around a BossLite scheduler session.

    configure() must be called before submit(), which uses the session it
    creates.

    NOTE(review): several methods still use attributes that are never
    assigned anywhere in this class -- bossUser, bossAdmin, bossTask,
    boss_scheduler, status and groupName -- apparently leftovers of the BOSS4
    implementation. They must be provided from outside (or ported to
    BossLite) before those methods can work; TODO confirm.
    """

    # (status, summary text) pairs printed by getOutput for the jobs whose
    # output was not retrieved, in print order. The None entry collects every
    # status that is not listed explicitly.
    _GETOUTPUT_SUMMARY = (
        ('Cleared', ' jobs already cleared'),
        ('Aborted', ' jobs aborted'),
        ('Cancelled', ' jobs cancelled'),
        ('Killed', ' jobs killed'),
        ('Running', ' jobs still running'),
        ('Scheduled', ' jobs scheduled'),
        ('Waiting', ' jobs waiting'),
        ('Ready', ' jobs ready'),
        (None, ' jobs submitted'),
        ('Created', ' jobs not yet submitted'),
        )

    def __init__(self):
        return

    def __del__(self):
        """ destroy instance """
        # Neither attribute is assigned by this class, and clean() may already
        # have removed bossUser: delete only what exists instead of raising
        # AttributeError on every destruction.
        for attr in ('bossAdmin', 'bossUser'):
            if hasattr(self, attr):
                delattr(self, attr)
        return

    @staticmethod
    def _lastErrorText():
        """
        Return str() of the exception currently being handled.

        Used inside `except` clauses instead of binding the exception to a
        name, so the module parses on old Python 2 releases and on Python 3.
        """
        import sys
        return str(sys.exc_info()[1])

    #### Boss Configuration

    def configure(self, cfg_params):
        """
        Store the CRAB configuration and open the BossLite scheduler session.

        cfg_params -- dict-like CRAB configuration; reads 'CRAB.scheduler',
                      'USER.outputdir', 'USER.logdir' and 'USER.return_data'.

        Raises CrabException when 'CRAB.scheduler' names a scheduler that has
        no BossLite configuration below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = cfg_params.get("CRAB.scheduler", '')

        ### Just temporary DS--BL
        # TODO: the gLite service endpoint and config file are hard-coded
        # (including a personal AFS path); they should come from cfg_params.
        schedulerConfigGlite = {
            'name' : 'SchedulerGLiteAPI',
            'service' : 'https://wms104.cern.ch:7443/glite_wms_wmproxy_server',
            'config' : '/afs/cern.ch/user/s/spiga/scratch0/WorkingDir/glite.conf.CMS_CNAF'
            }

        schedulerConfigLsf = {
            'name' : 'SchedulerLsf',
            'service' : '',
            'config' : ''
            }

        SchedMap = {'glitecoll': schedulerConfigGlite,
                    'lsf': schedulerConfigLsf}

        try:
            schedulerConfig = SchedMap[self.schedulerName]
        except KeyError:
            names = sorted(SchedMap.keys())
            raise CrabException("ERROR: unsupported scheduler '" + str(self.schedulerName)
                                + "'; supported schedulers are: " + ", ".join(names))

        self.schedSession = BossLiteAPISched(common.bossSession, schedulerConfig)

        self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir())
        self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir())

        self.return_data = cfg_params.get('USER.return_data', 0)

        return

    #### End Boss Configuration

    def declareJob_(self, nj):
        """
        BOSS declaration of jobs.

        For every job index 0..nj-1, appends '<base>_<index>.stdout',
        '<base>_<index>.stderr' and '.BrokerInfo' to the job's 'outputFiles'
        and stores outputFiles, executable, standardOutput and standardError
        through common._db.updateJob_.

        nj -- number of jobs; <base> is the job-type name of
              common.job_list[nj - 1].

        NOTE(review): indices here run 0..nj-1, while other methods treat job
        ids as 1-based (jobDB index = id - 1); confirm which numbering
        common._db expects.
        """
        base = common.job_list[nj - 1].type().name()
        # Task-level value, identical for every job: query it once.
        # Should disappear: we'll have ONLY 'executable' as task field and
        # not as job field.
        executable = common._db.queryTask('scriptName')

        for jobIndex in range(nj):
            stdout = base + '_' + str(jobIndex) + '.stdout'
            stderr = base + '_' + str(jobIndex) + '.stderr'
            # Copy, so the list handed back by the DB layer is not mutated.
            out = list(common._db.queryJob('outputFiles', [jobIndex])[0])
            out.append(stdout)
            out.append(stderr)
            out.append('.BrokerInfo')
            parameters = {
                'outputFiles': out,
                'executable': executable,
                'standardOutput': stdout,
                'standardError': stderr,
                }
            common._db.updateJob_(jobIndex, parameters)

        return

    def task(self):
        """ return Boss Task """
        # NOTE(review): self.bossTask is never assigned in this class.
        return self.bossTask

    ########################################## ---- OK for Boss4 ds
    def listMatch(self, schedulerName, schcladstring):
        """
        Check the compatibility of available resources.

        Returns the list of matching CEs. It stays empty when the scheduler
        interaction fails (a warning is logged). Raises CrabException on a
        BOSS error.
        """
        Tout = 120
        CEs = []
        try:
            CEs = self.bossUser.schedListMatch(schedulerName, schcladstring,
                                               self.bossTask.id(), "", Tout)
            common.logger.debug(1, "CEs :" + str(CEs))
        except SchedulerError:
            common.logger.message("Warning : Scheduler interaction in list-match operation failed for jobs:")
            common.logger.message(self._lastErrorText())
        except BossError:
            raise CrabException("ERROR: listMatch failed with message " + self._lastErrorText())
        return CEs

    def submit(self, jobsList):
        """
        Submit BOSS function.

        Submits the jobs in jobsList, which belong to the task returned by
        common._db.getTask(), through the session opened by configure().
        Returns None.

        TODO: scheduler/BOSS errors are not caught here; they propagate to
        the caller.
        """
        task = common._db.getTask()
        self.schedSession.submit(task, jobsList)
        return

    ###################### ---- OK for Boss4 ds
    def moveOutput(self, int_id):
        """
        Move output of job already retrieved.

        Every output file of job int_id that is found in self.outDir or
        self.logDir is moved into '<resDir>res_backup', with the current
        timestamp (self.current_time) appended to its name. If the job's
        output-file list cannot be loaded, the error is logged and nothing is
        moved.
        """
        self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        resDir = common.work_space.resDir()
        resDirSave = resDir + 'res_backup'
        if not os.path.exists(resDirSave):
            os.mkdir(resDirSave)

        boss_id = str(int_id)
        try:
            self.bossTask.load(ALL, boss_id)
            cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
        except BossError:
            common.logger.message(self._lastErrorText())
            # Without the file list there is nothing to move.
            return

        for fileName in cmd_out.split(','):
            # outDir first, then logDir; if they are the same directory the
            # second lookup simply finds nothing left.
            for srcDir in (self.outDir, self.logDir):
                src = srcDir + '/' + fileName
                if os.path.exists(src):
                    shutil.move(src, resDirSave + '/' + fileName + '_' + self.current_time)
                    common.logger.message('Output file ' + fileName + ' moved to ' + resDirSave)
        return

    def queryDetailedStatus(self, id):
        """ Query a detailed status of the job with id """
        # NOTE(review): self.boss_scheduler is never assigned in this class.
        return self.boss_scheduler.queryDetailedStatus(id)

    ###################### ---- OK for Boss4 ds
    def getOutput(self, int_id):
        """
        Retrieve the output of the finished jobs among int_id.

        int_id -- iterable of BOSS job ids. Jobs in status 'Done (Success)'
                  or 'Done (Abort)' have their output fetched into
                  self.outDir. Their stdout/stderr, .BrokerInfo and *.log
                  files are then moved to self.logDir when that is a
                  different directory, and the jobDB status is set to 'Y'
                  (or 'Z' if the output is corrupted). All other jobs are
                  only counted per status, and a summary is printed.

        Raises CrabException if the output or log directory does not exist.
        Returns None.
        """
        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        common.jobDB.load()
        knownIds = set(self.list())
        bossTaskId = common.taskDB.dict('BossTaskId')
        ## first get the status of all job in the list
        statusList = self.queryStatusList(bossTaskId, int_id)

        summaryStatuses = [status for status, label in self._GETOUTPUT_SUMMARY
                           if status is not None]
        counts = {}
        anyDone = False
        Tout = 180

        ## then loop over jobs and retrieve it if it's the case
        for i_id in int_id:
            if i_id not in knownIds:
                # NOTE(review): self.groupName is never assigned in this class.
                msg = ('Job # ' + repr(int(i_id)) + ' out of range for task '
                       + str(getattr(self, 'groupName', '')))
                common.logger.message(msg)
                continue
            # queryStatusList returns an empty or partial dict when the
            # status query fails, so a missing entry must not raise.
            bossTaskIdStatus = statusList.get(i_id, 'Unknown')
            if bossTaskIdStatus in ('Done (Success)', 'Done (Abort)'):
                anyDone = True
                self._retrieveJob(i_id, Tout)
            else:
                bucket = bossTaskIdStatus
                if bucket not in summaryStatuses:
                    bucket = None
                counts[bucket] = counts.get(bucket, 0) + 1
        common.jobDB.save()

        if not anyDone:
            msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
            common.logger.message(msg)

        for status, label in self._GETOUTPUT_SUMMARY:
            n = counts.get(status, 0)
            if n != 0:
                print(str(n) + label)

        print(' ')
        return

    def _retrieveJob(self, i_id, Tout):
        """Fetch the output of finished job i_id and set its jobDB status ('Y' ok, 'Z' corrupted)."""
        outDir = self.outDir
        logDir = self.logDir
        try:
            self.bossTask.getOutput(str(i_id), str(outDir), Tout)
        except SchedulerError:
            common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
            common.logger.message(self._lastErrorText())
            return
        except BossError:
            common.logger.message(self._lastErrorText())
            msg = 'Results of Job # ' + repr(int(i_id)) + ' have been corrupted and could not be retrieved.'
            common.logger.message(msg)
            common.jobDB.setStatus(int(i_id) - 1, 'Z')
            return

        if logDir != outDir:
            try:
                self._moveLogFiles(str(outDir), str(logDir), i_id)
                msg = ('Results of Job # ' + str(i_id) + ' are in ' + outDir
                       + ' (log files are in ' + logDir + ')')
            except (IOError, OSError, shutil.Error):
                msg = 'Problem with copy of job results'
            common.logger.message(msg)
        else:
            msg = 'Results of Job # ' + repr(int(i_id)) + ' are in ' + outDir
            common.logger.message(msg)
        common.jobDB.setStatus(int(i_id) - 1, 'Y')

    def _moveLogFiles(self, outDir, logDir, i_id):
        """Move job i_id's *<id>.std* files, .BrokerInfo and *.log from outDir into logDir."""
        import glob
        patterns = (outDir + '/*' + str(i_id) + '.std*',
                    outDir + '/.BrokerInfo',
                    outDir + '/*.log')
        for pattern in patterns:
            for path in glob.glob(pattern):
                # Explicit target path, so an existing file in logDir is
                # replaced (as `mv` did) instead of making shutil.move fail.
                shutil.move(path, os.path.join(logDir, os.path.basename(path)))

    ###################### ---- OK for Boss4 ds
    def cancel(self, subm_id):
        """
        Kill the jobs whose BOSS ids (ints) are listed in subm_id.

        The caller's list is not modified. Jobs that BOSS reports in status
        'K' afterwards are marked 'K' in jobDB, and jobDB is saved in every
        case. Scheduler and BOSS errors are logged, not raised.
        """
        common.jobDB.load()
        if subm_id:
            jobIds = sorted(subm_id)
            wanted = set(jobIds)
            try:
                jobRange = self.prepString(jobIds)
                common.logger.message("Killing job # " + ", ".join([str(j) for j in jobIds]))
                Tout = len(jobIds) * 60
                self.bossTask.kill(jobRange, Tout)
                self.bossTask.load(ALL, jobRange)
                task = self.bossTask.jobsDict()
                for k, v in task.items():
                    k = int(k)
                    if k in wanted and v['STATUS'] == 'K':
                        common.jobDB.setStatus(k - 1, 'K')
            except SchedulerError:
                common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:"
                                      + self._lastErrorText())
            except BossError:
                common.logger.message(self._lastErrorText() + "\nError killing jobs # "
                                      + str(jobIds) + " . See log for details")
        else:
            common.logger.message("\nNo job to be killed")
        # Saved on every path: the successful kill also updates statuses.
        common.jobDB.save()
        return

    def setFlag( self, list, index ):
        """
        Classify position `index` of `list` (helper kept for callers).

        Returns -2 if the next element equals list[index] + 1, -1 if there is
        a next element that does not continue the run, and the last element
        of `list` when `index` is the last position.
        """
        if len(list) > index + 1:
            if list[index + 1] == list[index] + 1:
                return -2
            return -1
        return list[len(list) - 1]

    def prepString( self, list ):
        """
        Compress a sequence of integer job ids into a BOSS range string.

        Runs of consecutive ids (each one equal to the previous + 1) become
        'first:last', isolated ids are kept as they are, and the pieces are
        joined with ','. For example, [1, 2, 3, 5] gives '1:3,5' and an empty
        sequence gives ''. Elements are processed in the given order, so the
        caller should pass them sorted.
        """
        pieces = []
        n = len(list)
        start = 0
        while start < n:
            end = start
            while end + 1 < n and list[end + 1] == list[end] + 1:
                end += 1
            if end > start:
                pieces.append(str(list[start]) + ":" + str(list[end]))
            else:
                pieces.append(str(list[start]))
            start = end + 1
        return ",".join(pieces)

    ################################################################ To remove when Boss4 store this info DS. (start)
    # NOTE(review): self.boss_scheduler is never assigned in this class.
    def getAttribute(self, id, attr):
        return self.boss_scheduler.getStatusAttribute_(id, attr)

    def getExitStatus(self, id):
        return self.boss_scheduler.getStatusAttribute_(id, 'exit_code')

    def queryDest(self, id):
        return self.boss_scheduler.getStatusAttribute_(id, 'destination')
    ################################################################ (stop)

    ############################## OK for BOSS4 ds.
    ############################# ----> we use the SID for the postMortem... probably this functionality come for free with BOSS4?
    def boss_SID(self, int_ID):
        """ Return the scheduler id of job int_ID (1-based), loading jobDB first if it is empty. """
        if common.jobDB.nSubmittedJobs() == 0:
            common.jobDB.load()
        return common.jobDB.jobId(int_ID - 1)

    ##################################################
    def queryEverything(self, taskid):
        """
        Query needed info of all jobs of the current BOSS task.

        Returns {int job id: info dict}. Each info dict holds SCHED_ID,
        STATUS (mapped through self.status), EXEC_HOST, EXE_EXIT_CODE,
        JOB_EXIT_STATUS and, when present, STATUS_REASON, LAST_T, DEST_CE,
        LB_TIMESTAMP and RB. Errors are logged, and the partial result is
        returned. taskid is not used.
        """
        optionalKeys = ('STATUS_REASON', 'LAST_T', 'DEST_CE', 'LB_TIMESTAMP', 'RB')
        results = {}
        try:
            Tout = common.jobDB.nJobs() * 20
            self.bossTask.query(ALL, timeout = Tout)
            task = self.bossTask.jobsDict()
            for c, v in task.items():
                k = int(c)
                info = {'SCHED_ID' : v['SCHED_ID'],
                        'STATUS' : self.status[v['STATUS']],
                        # the job's execution host; '' when BOSS has none
                        'EXEC_HOST' : v.get('EXEC_HOST', '')}
                for key in optionalKeys:
                    if key in v:
                        info[key] = v[key]
                program = self.bossTask.specific(c, '1')
                info['EXE_EXIT_CODE'] = program['EXE_EXIT_CODE']
                info['JOB_EXIT_STATUS'] = program['JOB_EXIT_STATUS']
                results[k] = info
        except SchedulerError:
            common.logger.message("Warning : Scheduler interaction failed for jobs:")
            common.logger.message(self._lastErrorText())
        except BossError:
            common.logger.message(self._lastErrorText())

        return results

    ##################################################
    ################################################## To change "much" when Boss4 store also this infos DS.
    def queryEveryStatus(self, taskid):
        """
        Query the status of all jobs of the current BOSS task.

        Returns {job id as returned by jobsDict(): mapped status}. Errors are
        logged and the partial result is returned. taskid is not used.
        """
        self.boss_scheduler.checkProxy()

        results = {}
        try:
            Tout = common.jobDB.nJobs() * 20
            # fill dictionary { 'bossid' : 'status' , ... }
            self.bossTask.query(ALL, timeout = Tout)
            task = self.bossTask.jobsDict()
            for k, v in task.items():
                results[k] = self.status[v['STATUS']]
        except SchedulerError:
            common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
            common.logger.message(self._lastErrorText())
        except BossError:
            common.logger.message(self._lastErrorText())

        return results

    ##################################################
    def queryStatusList(self, taskid, list_id):
        """
        Query the status of the jobs in list_id.

        Returns {int job id: mapped status}. The query is restricted to
        list_id unless it covers as many jobs as the task. Errors are logged,
        and the partial result is returned. taskid is not used.
        """
        allBoss_id = self.list()
        tmpQ = ''
        if len(allBoss_id) != len(list_id):
            tmpQ = ",".join(map(str, list_id))

        results = {}
        try:
            Tout = len(list_id) * 20
            # fill dictionary { 'bossid' : 'status' , ... }
            self.bossTask.query(ALL, tmpQ, timeout = Tout)
            task = self.bossTask.jobsDict()
            for k, v in task.items():
                results[int(k)] = self.status[v['STATUS']]
        except SchedulerError:
            common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
            common.logger.message(self._lastErrorText())
        except BossError:
            common.logger.message(self._lastErrorText())

        return results

    ###################### ---- OK for Boss4 ds
    def list(self):
        """
        Return the sorted, duplicate-free list of BOSS job ids (ints) of the task.
        """
        return sorted(set([int(k) for k in self.bossTask.jobsDict().keys()]))

    ##################
    def taskDeclared( self, taskName ):
        """ Return True if BOSS knows a task called taskName. """
        taskDict = self.bossUser.loadByName(taskName)
        return len(taskDict) > 0

    def clean(self):
        """ destroy boss instance """
        # bossUser is never assigned in this class; only drop it if present.
        if hasattr(self, 'bossUser'):
            del self.bossUser
        return
503     listBoss_Uniq = []
504     for i in ListBoss_ID: # check if there double index
505     if i not in listBoss_Uniq: listBoss_Uniq.append(i)
506     return listBoss_Uniq
507    
508     ##################
509     def taskDeclared( self, taskName ):
510     taskDict = self.bossUser.loadByName( taskName )
511     return (len(taskDict) > 0)
512    
513     def clean(self):
514     """ destroy boss instance """
515     del self.bossUser
516     return