ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.6
Committed: Fri Mar 7 09:27:51 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.5: +9 -3 lines
Log Message:
 First version which submits jobs using blite. Not yet fully working. Minor problem with JDL arguments. Submission does not take a range yet.

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6    
7 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8    
9    
10     from ProdCommon.BossLite.DbObjects.Job import Job
11     from ProdCommon.BossLite.DbObjects.Task import Task
12     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13    
14     from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
15 slacapra 1.1
16     class Boss:
17     def __init__(self):
18 spiga 1.4
19 slacapra 1.1 return
20    
21     def __del__(self):
22     """ destroy instance """
23     del self.bossAdmin
24     del self.bossUser
25     return
26    
27    
28     #### Boss Configuration
29    
30     def configure(self, cfg_params):
31 spiga 1.4
32 slacapra 1.1 self.cfg_params = cfg_params
33 spiga 1.4 self.schedulerName = cfg_params.get("CRAB.scheduler",'') # this should match with the bosslite requirements
34    
35 spiga 1.5 SchedMap = {'glitecoll':'SchedulerGLiteAPI'}
36    
37 spiga 1.6 # schedulerConfig = { 'name' : SchedMap[self.schedulerName]} ## To improve DS
38     ### Just temporary DS--BL
39     schedulerConfig = {
40     'name' : 'SchedulerGLiteAPI',
41     'service' :'https://wms104.cern.ch:7443/glite_wms_wmproxy_server',
42     'config' : '/afs/cern.ch/user/s/spiga/scratch0/WorkingDir/glite.conf.CMS_CNAF'
43     }
44 spiga 1.5
45 spiga 1.4 self.schedSession = BossLiteAPISched( common.bossSession, schedulerConfig)
46    
47 slacapra 1.1 self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir() )
48     self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir() )
49    
50 spiga 1.4 self.return_data = cfg_params.get('USER.return_data',0)
51 slacapra 1.1
52     return
53    
54     #### End Boss Configuration
55    
56 spiga 1.4 def declareJob_(self, nj):
57 slacapra 1.1 """
58     BOSS declaration of jobs
59     """
60 spiga 1.4 index = nj - 1
61     job = common.job_list[index]
62     jbt = job.type()
63     base = jbt.name()
64    
65     for id in range(nj):
66     parameters={}
67     jobs=[]
68     out=[]
69     stdout = base +'_'+ str(id)+'.stdout'
70     stderr = base +'_'+ str(id)+'.stderr'
71     jobs.append(id)
72     out=common._db.queryJob('outputFiles',jobs)[0]
73     out.append(stdout)
74     out.append(stderr)
75     out.append('.BrokerInfo')
76     parameters['outputFiles']=out
77     parameters['executable']=common._db.queryTask('scriptName') ## Should disappear...
78     ## we'll have ONLY 'executable'
79     ## as task field and not job field
80     parameters['standardOutput'] = stdout
81     parameters['standardError'] = stderr
82    
83     common._db.updateJob_(id,parameters)
84 slacapra 1.1
85     return
86    
87     def task(self):
88     """ return Boss Task """
89     return self.bossTask
90    
91     ########################################## ---- OK for Boss4 ds
92     def listMatch(self, schedulerName, schcladstring):
93     """
94     Check the compatibility of available resources
95     """
96     Tout = 120
97     CEs=[]
98     try:
99     CEs=self.bossUser.schedListMatch( schedulerName, schcladstring, self.bossTask.id(), "", Tout)
100     common.logger.debug(1,"CEs :"+str(CEs))
101     except SchedulerError,e:
102     common.logger.message( "Warning : Scheduler interaction in list-match operation failed for jobs:")
103     common.logger.message( e.__str__())
104     pass
105     except BossError,e:
106     raise CrabException("ERROR: listMatch failed with message " + e.__str__())
107     return CEs
108    
109 spiga 1.6 def submit(self, jobsList):
110 slacapra 1.1 """
111     Submit BOSS function.
112     Submit one job. nj -- job number.
113     """
114 spiga 1.5 task = common._db.getTask()
115     #self.schedSession.submit( task, string.join(jobsList,','))
116 spiga 1.6 self.schedSession.submit( task, jobsList)
117 spiga 1.5 # try:
118     # except SchedulerError,e:
119     # common.logger.message("Warning : Scheduler interaction in submit operation failed for jobs:")
120     # common.logger.message(e.__str__())
121     # pass
122     # except BossError,e:
123     # common.logger.message("Error : BOSS command failed with message:")
124     # common.logger.message(e.__str__())
125 slacapra 1.1
126 spiga 1.5 # jid=[]
127     # bjid = []
128     # self.bossTask.clear()
129     # range = str(jobsList[0]) + ":" + str(jobsList[-1])
130     # try:
131     # self.bossTask.load(ALL, range)
132     # except SchedulerError,e:
133     # common.logger.message("Warning : Scheduler interaction in query operation failed for jobs:")
134     # common.logger.message(e.__str__())
135     # pass
136     # except BossError,e:
137     # common.logger.message("Error : BOSS command failed with message:")
138     # common.logger.message(e.__str__())
139     # task = self.bossTask.jobsDict()
140 slacapra 1.1
141 spiga 1.5 # for k, v in task.iteritems():
142     # if (v["STATUS"] != 'W'):
143     # jid.append(v["SCHED_ID"])
144     # bjid.append(k)
145     # pass
146 slacapra 1.1 # if (v["STATUS"] == 'S'):
147     # jid.append(v["SCHED_ID"])
148     # bjid.append(k)
149     # pass
150 spiga 1.5 return #jid, bjid
151 slacapra 1.1
152     ###################### ---- OK for Boss4 ds
153     def moveOutput(self, int_id):
154     """
155     Move output of job already retrieved
156     """
157     self.current_time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
158     resDir = common.work_space.resDir()
159     resDirSave = resDir+'res_backup'
160     if not os.path.exists(resDirSave):
161     os.mkdir(resDirSave)
162    
163     boss_id = str(int_id)
164     try:
165     self.bossTask.load (ALL, boss_id )
166     cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
167     except BossError,e:
168     common.logger.message( e.__str__() )
169    
170     files = cmd_out.split(',')
171     for i in files:
172     if os.path.exists(self.outDir+'/'+i):
173     shutil.move(self.outDir+'/'+i, resDirSave+'/'+i+'_'+self.current_time)
174     common.logger.message('Output file '+i+' moved to '+resDirSave)
175    
176     if os.path.exists(self.logDir+'/'+i):
177     shutil.move(self.logDir+'/'+i, resDirSave+'/'+i+'_'+self.current_time)
178     common.logger.message('Output file '+i+' moved to '+resDirSave)
179     return
180    
181    
182     def queryDetailedStatus(self, id):
183     """ Query a detailed status of the job with id """
184    
185     return self.boss_scheduler.queryDetailedStatus(id)
186    
187     ###################### ---- OK for Boss4 ds
188     def getOutput(self, int_id):
189     """
190     Get output for a finished job with id.
191     Returns the name of directory with results.
192     """
193     if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
194     msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
195     raise CrabException(msg)
196     common.jobDB.load()
197     allBoss_id = self.list()
198     bossTaskId = common.taskDB.dict('BossTaskId')
199     ## first get the status of all job in the list
200     statusList = self.queryStatusList(bossTaskId, int_id)
201     check = 0
202    
203     ## then loop over jobs and retrieve it if it's the case
204     create= []
205     run= []
206     clear=[]
207     abort=[]
208     canc=[]
209     read=[]
210     wait=[]
211     sched=[]
212     kill=[]
213     other=[]
214     Tout=180
215    
216     for i_id in int_id :
217     if i_id not in allBoss_id:
218     msg = 'Job # '+`int(i_id)`+' out of range for task '+ self.groupName
219     common.logger.message(msg)
220     else:
221     dir = self.outDir
222     logDir = self.logDir
223     boss_id = i_id
224     #bossTaskIdStatus = common.scheduler.queryStatus(bossTaskId, boss_id)
225     bossTaskIdStatus = statusList[boss_id]
226     if bossTaskIdStatus == 'Done (Success)' or bossTaskIdStatus == 'Done (Abort)':
227     check = 1
228     try:
229     self.bossTask.getOutput (str(boss_id), str(dir), Tout)
230     if logDir != dir:
231     try:
232     ######
233     cmd = 'mv '+str(dir)+'/*'+str(i_id)+'.std* '+str(dir)+'/.BrokerInfo '+str(dir)+'/*.log '+str(logDir)
234     cmd_out =os.system(cmd)
235     msg = 'Results of Job # '+str(i_id)+' are in '+dir+' (log files are in '+logDir+')'
236     common.logger.message(msg)
237     #####
238     #toMove = str(dir)+'/*'+`int(i_id)`+'.std* '+str(dir)+'/*.log '+str(dir)+'/.BrokerInfo '
239     #shutil.move(toMove, str(logDir))
240     #####
241     except:
242     msg = 'Problem with copy of job results'
243     common.logger.message(msg)
244     pass
245     else:
246     msg = 'Results of Job # '+`int(i_id)`+' are in '+dir
247     common.logger.message(msg)
248     common.jobDB.setStatus(int(i_id)-1, 'Y')
249     except SchedulerError,e:
250     common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
251     common.logger.message(e.__str__())
252     pass
253     except BossError,e:
254     common.logger.message(e.__str__())
255     msg = 'Results of Job # '+`int(i_id)`+' have been corrupted and could not be retrieved.'
256     common.logger.message(msg)
257     common.jobDB.setStatus(int(i_id)-1, 'Z')
258     elif bossTaskIdStatus == 'Running' :
259     run.append(i_id)
260     # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is not possible yet to retrieve the output.'
261     # common.logger.message(msg)
262     elif bossTaskIdStatus == 'Cleared' :
263     clear.append(i_id)
264     # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. The output was already retrieved.'
265     # common.logger.message(msg)
266     elif bossTaskIdStatus == 'Aborted' :
267     abort.append(i_id)
268     # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is not possible to retrieve the output.'
269     # common.logger.message(msg)
270     elif bossTaskIdStatus == 'Created' :
271     create.append(i_id)
272     elif bossTaskIdStatus == 'Cancelled' :
273     canc.append(i_id)
274     elif bossTaskIdStatus == 'Ready' :
275     read.append(i_id)
276     elif bossTaskIdStatus == 'Scheduled' :
277     sched.append(i_id)
278     elif bossTaskIdStatus == 'Waiting' :
279     wait.append(i_id)
280     elif bossTaskIdStatus == 'Killed' :
281     kill.append(i_id)
282     else:
283     other.append(i_id)
284     # msg = 'Job # '+`int(i_id)`+' has status '+bossTaskIdStatus+'. It is currently not possible to retrieve the output.'
285     # common.logger.message(msg)
286     dir += os.environ['USER']
287     dir += '_' + os.path.basename(str(boss_id))
288     pass
289     common.jobDB.save()
290     if check == 0:
291     msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
292     common.logger.message(msg)
293    
294     if len(clear)!=0: print str(len(clear))+' jobs already cleared'
295     if len(abort)!=0: print str(len(abort))+' jobs aborted'
296     if len(canc)!=0: print str(len(canc))+' jobs cancelled'
297     if len(kill)!=0: print str(len(kill))+' jobs killed'
298     if len(run)!=0: print str(len(run))+' jobs still running'
299     if len(sched)!=0: print str(len(sched))+' jobs scheduled'
300     if len(wait)!=0: print str(len(wait))+' jobs waiting'
301     if len(read)!=0: print str(len(read))+' jobs ready'
302     if len(other)!=0: print str(len(other))+' jobs submitted'
303     if len(create)!=0: print str(len(create))+' jobs not yet submitted'
304    
305     print ' '
306     return
307    
308     ###################### ---- OK for Boss4 ds
309     def cancel(self,subm_id):
310     """
311     Cancel the EDG job with id: if id == -1, means all jobs.
312     """
313     #print "CANCEL -------------------------"
314     #print "int_id ",int_id," nSubmitted ", common.jobDB.nSubmittedJobs()
315    
316     common.jobDB.load()
317     if len( subm_id ) > 0:
318     try:
319     subm_id.sort()
320     range = self.prepString( subm_id )
321     common.logger.message("Killing job # " + str(subm_id).replace("[","",1).replace("]","",1) )
322     Tout =len(subm_id)*60
323     self.bossTask.kill( range, Tout )
324     self.bossTask.load(ALL, range)
325     task = self.bossTask.jobsDict()
326     for k, v in task.iteritems():
327     k = int(k)
328     status = v['STATUS']
329     if k in subm_id and status == 'K':
330     common.jobDB.setStatus(k - 1, 'K')
331     except SchedulerError,e:
332     common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:"+ e.__str__())
333     pass
334     except BossError,e:
335     common.logger.message( e.__str__() + "\nError killing jobs # "+str(subm_id)+" . See log for details")
336     common.jobDB.save()
337     pass
338     else:
339 slacapra 1.3 common.logger.message("\nNo job to be killed")
340 slacapra 1.1 common.jobDB.save()
341     return
342    
343     def setFlag( self, list, index ):
344     if len( list ) > (index + 1):
345     if list[index + 1] == ( list[index] + 1 ):
346     return -2
347     return -1
348     return list[ len(list) - 1 ]
349    
350     def prepString( self, list ):
351     s = ""
352     flag = 0
353     for i in range( len( list ) ):
354     if flag == 0:
355     s = str( list[i] )
356     flag = self.setFlag( list, i )
357     elif flag == -1:
358     s = s + "," + str( list[i] )
359     flag = self.setFlag( list, i )
360     elif flag == -2:
361     flag = self.setFlag( list, i )
362     if flag == -1:
363     s = s + ":" + str( list[i] )
364     if flag > 0:
365     s = s + ":" + str( list[i] )
366     return s
367    
368     ################################################################ To remove when Boss4 store this info DS. (start)
369     def getAttribute(self, id, attr):
370     return self.boss_scheduler.getStatusAttribute_(id, attr)
371    
372     def getExitStatus(self, id):
373     return self.boss_scheduler.getStatusAttribute_(id, 'exit_code')
374    
375     def queryDest(self, id):
376     return self.boss_scheduler.getStatusAttribute_(id, 'destination')
377     ################################################################ (stop)
378    
379     ############################## OK for BOSS4 ds.
380     ############################# ----> we use the SID for the postMortem... probably this functionality come for free with BOSS4?
381     def boss_SID(self,int_ID):
382     """ Return Sid of job """
383     SID = ''
384    
385     if common.jobDB.nSubmittedJobs() == 0:
386     common.jobDB.load()
387    
388     SID = common.jobDB.jobId(int_ID-1)
389    
390     return SID
391    
392     ##################################################
393     def queryEverything(self,taskid):
394     """
395     Query needed info of all jobs with specified boss taskid
396     """
397    
398     results = {}
399     try:
400     # fill dictionary { 'bossid' : 'status' , ... }
401     nTot = common.jobDB.nJobs()
402     Tout = nTot*20
403     self.bossTask.query( ALL, timeout = Tout )
404     task = self.bossTask.jobsDict()
405     for c, v in task.iteritems():
406     k = int(c)
407     results[k] = { 'SCHED_ID' : v['SCHED_ID'], 'STATUS' : self.status[v['STATUS']], 'EXEC_HOST' : ['EXEC_HOST'] }
408     if v.has_key('STATUS_REASON') :
409     results[k]['STATUS_REASON'] = v['STATUS_REASON']
410     if v.has_key('LAST_T') :
411     results[k]['LAST_T'] = v['LAST_T']
412     if v.has_key('DEST_CE') :
413     results[k]['DEST_CE'] = v['DEST_CE']
414     if v.has_key('LB_TIMESTAMP') :
415     results[k]['LB_TIMESTAMP'] = v['LB_TIMESTAMP']
416     if v.has_key('RB') :
417     results[k]['RB'] = v['RB']
418     program = self.bossTask.specific(c, '1')
419     results[k]['EXE_EXIT_CODE'] = program['EXE_EXIT_CODE']
420     results[k]['JOB_EXIT_STATUS'] = program['JOB_EXIT_STATUS']
421     except SchedulerError,e:
422     common.logger.message("Warning : Scheduler interaction failed for jobs:")
423     common.logger.message(e.__str__())
424     pass
425     except BossError,e:
426     common.logger.message( e.__str__() )
427     pass
428    
429     return results
430    
431     ##################################################
432     ################################################## To change "much" when Boss4 store also this infos DS.
433     def queryEveryStatus(self,taskid):
434     """ Query a status of all jobs with specified boss taskid """
435    
436     self.boss_scheduler.checkProxy()
437    
438     results = {}
439     try:
440     nTot = common.jobDB.nJobs()
441     Tout = nTot*20
442     # fill dictionary { 'bossid' : 'status' , ... }
443     self.bossTask.query( ALL, timeout = Tout )
444     task = self.bossTask.jobsDict()
445     for k, v in task.iteritems():
446     results[k] = self.status[v['STATUS']]
447     except SchedulerError,e:
448     common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
449     common.logger.message(e.__str__())
450     pass
451     except BossError,e:
452     common.logger.message( e.__str__() )
453    
454     return results
455    
456     ##################################################
457     def queryStatusList(self,taskid,list_id):
458     """ Query a status of the job with id """
459    
460     allBoss_id = self.list()
461     tmpQ = ''
462     if not len(allBoss_id)==len(list_id): tmpQ = string.join(map(str,list_id),",")
463    
464     results = {}
465     try:
466     Tout = len(list_id)*20
467     # fill dictionary { 'bossid' : 'status' , ... }
468     self.bossTask.query( ALL, tmpQ, timeout = Tout )
469     task = self.bossTask.jobsDict()
470     for k, v in task.iteritems():
471     results[int(k)] = self.status[v['STATUS']]
472     except SchedulerError,e:
473     common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
474     common.logger.message( e.__str__() )
475     pass
476     except BossError,e:
477     common.logger.message( e.__str__() )
478    
479     return results
480    
481     ###################### ---- OK for Boss4 ds
482     def list(self):
483     """
484     Return a list of all boss_Id of a task
485     """
486     ListBoss_ID = []
487     task = self.bossTask.jobsDict()
488     for k, v in task.iteritems():
489     ListBoss_ID.append(int(k))
490     ListBoss_ID.sort()
491     listBoss_Uniq = []
492     for i in ListBoss_ID: # check if there double index
493     if i not in listBoss_Uniq: listBoss_Uniq.append(i)
494     return listBoss_Uniq
495    
496     ##################
497     def taskDeclared( self, taskName ):
498     taskDict = self.bossUser.loadByName( taskName )
499     return (len(taskDict) > 0)
500    
501     def clean(self):
502     """ destroy boss instance """
503     del self.bossUser
504     return