ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.8
Committed: Mon Mar 17 14:00:59 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.7: +30 -67 lines
Log Message:
Many changes to integrate BossLite. Creation step fully implemented and optimized... Submission is now working again. Here the missing things are the support for jobs submission by range, the message sending to ML, and the listmatch_match check. Actually the requirements can be changed on the fly as was in the past. The status is fully working with BossLite. The exit code display is not there since the new boss does not implement the RealTime mon.. here the functionality is under development by Federica: to be integrated.

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6    
7 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8    
9    
10     from ProdCommon.BossLite.DbObjects.Job import Job
11     from ProdCommon.BossLite.DbObjects.Task import Task
12     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13    
14     from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
15 slacapra 1.1
class Boss:
    """CRAB-side wrapper around the BOSS / BossLite job bookkeeping API.

    NOTE(review): several methods still use attributes that nothing in this
    class assigns (``bossTask``, ``bossUser``, ``bossAdmin``,
    ``boss_scheduler``, ``status``, ``groupName``).  They look like leftovers
    of the pre-BossLite implementation and must be provided from outside for
    those methods to work -- TODO confirm while finishing the migration.
    """

    def __init__(self):
        """Create an unconfigured instance; call configure() before use."""
        return

    def __del__(self):
        """Drop the legacy BOSS handles, if they were ever attached."""
        # The handles are not created by this class, so they may be missing;
        # an unconditional ``del`` would raise AttributeError at finalization.
        for handle in ('bossAdmin', 'bossUser'):
            if hasattr(self, handle):
                delattr(self, handle)
        return

    #### Boss Configuration

    def configure(self, cfg_params):
        """Store the user configuration and open the BossLite scheduler session.

        cfg_params -- mapping of 'SECTION.key' -> value read from the CRAB
                      configuration (only .get() is used on it).
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = cfg_params.get("CRAB.scheduler", '')

        ## Add here the map for others Schedulers (LSF/CAF/CondorG)
        SchedMap = {'glite': 'SchedulerGLiteAPI'}

        # Unknown scheduler names fall back to the gLite API, which is what
        # was hard-coded before the map was consulted.
        schedulerConfig = {
            'name': SchedMap.get(self.schedulerName, 'SchedulerGLiteAPI'),
        }

        self.schedSession = BossLiteAPISched(common.bossSession, schedulerConfig)

        self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir())
        self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir())

        self.return_data = cfg_params.get('USER.return_data', 0)
        return

    #### End Boss Configuration

    def declare(self, nj):
        """
        BOSS declaration of jobs.

        nj -- number of jobs; jobs 0 .. nj-1 of the current task are updated
              with their wrapper, stdout/stderr names and output file list.
        """
        base = common.job_list[nj - 1].type().name()

        ## Should disappear... we'll have ONLY 'executable'
        ## as task field and not job field
        wrapper = common._db.queryTask('scriptName')

        listField = []
        listID = []
        task = common._db.getTask()
        for job_index in range(nj):
            stdout = base + '_' + str(job_index + 1) + '.stdout'
            stderr = base + '_' + str(job_index + 1) + '.stderr'
            # NOTE(review): this extends the list held by the task object in
            # place, as the original code did -- confirm that is intended.
            out = task.jobs[job_index]['outputFiles']
            out.append(stdout)
            out.append(stderr)
            out.append('.BrokerInfo')
            listField.append({
                'outputFiles': out,
                'executable': wrapper,
                'standardOutput': stdout,
                'standardError': stderr,
            })
            listID.append(job_index)
        common._db.updateJob_(listID, listField)
        return

    def task(self):
        """ return Boss Task """
        return self.bossTask

    ########################################## ---- OK for Boss4 ds
    def listMatch(self, schedulerName, schcladstring):
        """
        Check the compatibility of available resources.

        Returns the list of matching CEs (empty when the scheduler call
        fails); raises CrabException on a BossError.
        """
        Tout = 120
        CEs = []
        try:
            CEs = self.bossUser.schedListMatch(schedulerName, schcladstring, self.bossTask.id(), "", Tout)
            common.logger.debug(1, "CEs :" + str(CEs))
        except SchedulerError as e:
            # best effort: a scheduler failure only produces a warning
            common.logger.message("Warning : Scheduler interaction in list-match operation failed for jobs:")
            common.logger.message(e.__str__())
        except BossError as e:
            raise CrabException("ERROR: listMatch failed with message " + e.__str__())
        return CEs

    def submit(self, jobsList, req):
        """
        Submit BOSS function.

        jobsList -- jobs of the current task to submit.
        req      -- requirements forwarded to the scheduler session.

        Error handling and the collection of scheduler ids were disabled
        when this method was moved to BossLite; nothing is returned.
        """
        task = common._db.getTask()
        self.schedSession.submit(task, jobsList, req)
        return

    ###################### ---- OK for Boss4 ds
    def moveOutput(self, int_id):
        """
        Move output of job already retrieved into the 'res_backup' directory,
        appending a timestamp to every file name.
        """
        self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        resDir = common.work_space.resDir()
        resDirSave = resDir + 'res_backup'
        if not os.path.exists(resDirSave):
            os.mkdir(resDirSave)

        boss_id = str(int_id)
        try:
            self.bossTask.load(ALL, boss_id)
            cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
        except BossError as e:
            common.logger.message(e.__str__())
            # Without the file list there is nothing to move; going on would
            # fail on the unbound 'cmd_out'.
            return

        for name in cmd_out.split(','):
            # a file may live in the output dir, in the log dir, or in both
            for srcDir in (self.outDir, self.logDir):
                src = srcDir + '/' + name
                if os.path.exists(src):
                    shutil.move(src, resDirSave + '/' + name + '_' + self.current_time)
                    common.logger.message('Output file ' + name + ' moved to ' + resDirSave)
        return

    def queryDetailedStatus(self, id):
        """ Query a detailed status of the job with id """
        return self.boss_scheduler.queryDetailedStatus(id)

    def _retrieveOutput(self, i_id, timeout):
        """Fetch the output of one finished job and record the result in jobDB."""
        outDir = self.outDir
        logDir = self.logDir
        try:
            self.bossTask.getOutput(str(i_id), str(outDir), timeout)
            if logDir != outDir:
                try:
                    # stdout/stderr, broker info and logs go to the log dir
                    cmd = 'mv '+str(outDir)+'/*'+str(i_id)+'.std* '+str(outDir)+'/.BrokerInfo '+str(outDir)+'/*.log '+str(logDir)
                    os.system(cmd)
                    msg = 'Results of Job # '+str(i_id)+' are in '+outDir+' (log files are in '+logDir+')'
                    common.logger.message(msg)
                except Exception:
                    common.logger.message('Problem with copy of job results')
            else:
                common.logger.message('Results of Job # ' + repr(int(i_id)) + ' are in ' + outDir)
            common.jobDB.setStatus(int(i_id) - 1, 'Y')
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
            common.logger.message(e.__str__())
        except BossError as e:
            common.logger.message(e.__str__())
            msg = 'Results of Job # ' + repr(int(i_id)) + ' have been corrupted and could not be retrieved.'
            common.logger.message(msg)
            common.jobDB.setStatus(int(i_id) - 1, 'Z')

    ###################### ---- OK for Boss4 ds
    def getOutput(self, int_id):
        """
        Get output for the finished jobs whose ids are listed in int_id.

        Jobs in a 'Done' state are retrieved; all the others are only counted
        per status and summarised on stdout.  Raises CrabException when the
        output or log directory does not exist.
        """
        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check '+self.logDir+' and '+self.outDir
            raise CrabException(msg)
        common.jobDB.load()
        allBoss_id = self.list()
        bossTaskId = common.taskDB.dict('BossTaskId')
        ## first get the status of all job in the list
        statusList = self.queryStatusList(bossTaskId, int_id)
        check = 0
        Tout = 180

        ## then loop over jobs and retrieve it if it's the case
        create = []
        run = []
        clear = []
        abort = []
        canc = []
        read = []
        wait = []
        sched = []
        kill = []
        other = []
        buckets = {
            'Running': run,
            'Cleared': clear,
            'Aborted': abort,
            'Created': create,
            'Cancelled': canc,
            'Ready': read,
            'Scheduled': sched,
            'Waiting': wait,
            'Killed': kill,
        }

        for i_id in int_id:
            if i_id not in allBoss_id:
                msg = 'Job # ' + repr(int(i_id)) + ' out of range for task ' + self.groupName
                common.logger.message(msg)
                continue
            bossTaskIdStatus = statusList[i_id]
            if bossTaskIdStatus == 'Done (Success)' or bossTaskIdStatus == 'Done (Abort)':
                check = 1
                self._retrieveOutput(i_id, Tout)
            else:
                # any status without its own bucket is reported as "submitted"
                buckets.get(bossTaskIdStatus, other).append(i_id)
        common.jobDB.save()
        if check == 0:
            msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
            common.logger.message(msg)

        summary = (
            (clear, ' jobs already cleared'),
            (abort, ' jobs aborted'),
            (canc, ' jobs cancelled'),
            (kill, ' jobs killed'),
            (run, ' jobs still running'),
            (sched, ' jobs scheduled'),
            (wait, ' jobs waiting'),
            (read, ' jobs ready'),
            (other, ' jobs submitted'),
            (create, ' jobs not yet submitted'),
        )
        for bucket, label in summary:
            if bucket:
                print(str(len(bucket)) + label)

        print(' ')
        return

    ###################### ---- OK for Boss4 ds
    def cancel(self, subm_id):
        """
        Kill the jobs whose ids are listed in subm_id and mark the ones that
        BOSS reports as killed ('K') in the job DB.

        subm_id is sorted in place.
        """
        common.jobDB.load()
        if len(subm_id) > 0:
            try:
                subm_id.sort()
                job_range = self.prepString(subm_id)
                common.logger.message("Killing job # " + str(subm_id).replace("[", "", 1).replace("]", "", 1))
                Tout = len(subm_id) * 60
                self.bossTask.kill(job_range, Tout)
                self.bossTask.load(ALL, job_range)
                for k, v in self.bossTask.jobsDict().items():
                    k = int(k)
                    if k in subm_id and v['STATUS'] == 'K':
                        common.jobDB.setStatus(k - 1, 'K')
            except SchedulerError as e:
                common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:" + e.__str__())
            except BossError as e:
                common.logger.message(e.__str__() + "\nError killing jobs # " + str(subm_id) + " . See log for details")
        else:
            common.logger.message("\nNo job to be killed")
        common.jobDB.save()
        return

    def setFlag(self, list, index):
        """Classify position `index` of `list` for range building.

        Returns -2 when the next element is list[index] + 1, -1 when a next
        element exists but is not consecutive, and the last element of the
        list when `index` is the final position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[len(list) - 1]

    def prepString(self, list):
        """Compress a list of job ids into a BOSS range string.

        Runs of consecutive values become 'first:last', everything else is
        comma separated: [1, 2, 3, 5] -> '1:3,5'.  An empty list gives ''.
        """
        chunks = []
        first = last = None
        for value in list:
            if first is not None and value == last + 1:
                # still inside the current run of consecutive ids
                last = value
                continue
            if first is not None:
                chunks.append(self._rangeChunk(first, last))
            first = last = value
        if first is not None:
            chunks.append(self._rangeChunk(first, last))
        return ",".join(chunks)

    def _rangeChunk(self, first, last):
        """Format one run of ids: 'n' for a single id, 'first:last' otherwise."""
        if first == last:
            return str(first)
        return str(first) + ":" + str(last)

    def getAttribute(self, id, attr):
        """Return status attribute `attr` of job `id` from the scheduler."""
        return self.boss_scheduler.getStatusAttribute_(id, attr)

    def getExitStatus(self, id):
        """Return the 'exit_code' status attribute of job `id`."""
        return self.boss_scheduler.getStatusAttribute_(id, 'exit_code')

    def queryDest(self, id):
        """Return the 'destination' status attribute of job `id`."""
        return self.boss_scheduler.getStatusAttribute_(id, 'destination')

    def boss_SID(self, int_ID):
        """ Return Sid of job """
        # load the job DB first when it reports no submitted jobs
        if common.jobDB.nSubmittedJobs() == 0:
            common.jobDB.load()

        return common.jobDB.jobId(int_ID - 1)

    ##################################################
    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        self.schedSession.query(str(taskid))
        return

    def queryEveryStatus(self, taskid):
        """ Query a status of all jobs with specified boss taskid """

        self.boss_scheduler.checkProxy()

        results = {}
        try:
            Tout = common.jobDB.nJobs() * 20
            # fill dictionary { 'bossid' : 'status' , ... }
            self.bossTask.query(ALL, timeout=Tout)
            for k, v in self.bossTask.jobsDict().items():
                results[k] = self.status[v['STATUS']]
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
            common.logger.message(e.__str__())
        except BossError as e:
            common.logger.message(e.__str__())

        return results

    ##################################################
    def queryStatusList(self, taskid, list_id):
        """ Query the status of the jobs in list_id; returns {int id: status} """

        allBoss_id = self.list()
        # an empty selection string queries every job of the task
        tmpQ = ''
        if not len(allBoss_id) == len(list_id):
            tmpQ = ",".join(map(str, list_id))

        results = {}
        try:
            Tout = len(list_id) * 20
            # fill dictionary { 'bossid' : 'status' , ... }
            self.bossTask.query(ALL, tmpQ, timeout=Tout)
            for k, v in self.bossTask.jobsDict().items():
                results[int(k)] = self.status[v['STATUS']]
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction on query operation failed for jobs:")
            common.logger.message(e.__str__())
        except BossError as e:
            common.logger.message(e.__str__())

        return results

    ###################### ---- OK for Boss4 ds
    def list(self):
        """
        Return the sorted list of all distinct boss_Id of a task
        """
        return sorted(set(int(k) for k in self.bossTask.jobsDict()))

    ##################
    def taskDeclared(self, taskName):
        """Return True when loading the task by name yields a non-empty result."""
        taskDict = self.bossUser.loadByName(taskName)
        return (len(taskDict) > 0)

    def clean(self):
        """ destroy boss instance """
        # tolerate a handle that was never attached
        if hasattr(self, 'bossUser'):
            del self.bossUser
        return