ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.11
Committed: Tue Mar 25 09:49:41 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.10: +1 -1 lines
Log Message:
enabled submission by range...as in the past

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6    
7 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8    
9    
10     from ProdCommon.BossLite.DbObjects.Job import Job
11     from ProdCommon.BossLite.DbObjects.Task import Task
12     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13    
14     from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
15 slacapra 1.1
class Boss:
    """
    CRAB front end to the BossLite scheduler session.

    configure() reads the scheduler settings and opens a
    BossLiteAPISched session; declare(), listMatch(), submit() and
    queryEverything() work through that session and through common._db.

    NOTE(review): moveOutput(), getOutput() and cancel() still use
    self.bossTask, self.list() and self.queryStatusList(), none of which
    is defined in this class -- they look like leftovers of the pre-BossLite
    implementation; confirm they are provided elsewhere before relying
    on them.
    """

    def __init__(self):
        """Create an unconfigured instance; configure() sets up the session."""
        return

    def configure(self, cfg_params):
        """
        Read the scheduler settings and open the BossLite scheduler session.

        cfg_params -- mapping of configuration keys; the keys read here are
                      CRAB.scheduler, EDG.rb, EDG.wms_service,
                      USER.outputdir, USER.logdir and USER.return_data.

        Raises CrabException when CRAB.scheduler is missing or is not one
        of the schedulers known to the map below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = cfg_params.get("CRAB.scheduler", '')
        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        # Both directories default to the result dir of the work space.
        self.outDir = cfg_params.get("USER.outputdir", common.work_space.resDir())
        self.logDir = cfg_params.get("USER.logdir", common.work_space.resDir())

        self.return_data = cfg_params.get('USER.return_data', 0)

        ## Add here the map for others Schedulers (LSF/CAF/CondorG)
        schedMap = {
            'glite': 'SchedulerGLiteAPI',
            'glitecoll': 'SchedulerGLiteAPI',
            'condor_g': '',
            'lsf': '',
            'caf': '',
        }
        # Fail with a readable message instead of a bare KeyError when the
        # scheduler name is missing or misspelled in the configuration.
        if self.schedulerName not in schedMap:
            known = sorted(schedMap.keys())
            msg = 'Unknown scheduler "' + str(self.schedulerName) + \
                  '"; supported values for CRAB.scheduler are: ' + ', '.join(known)
            raise CrabException(msg)

        schedulerConfig = {
            'name': schedMap[self.schedulerName],
            'service': self.wms_service,
            'config': self.rb_param_file,
        }

        self.schedSession = BossLiteAPISched(common.bossSession, schedulerConfig)

        return

    def declare(self, nj):
        """
        BOSS declaration of jobs.

        nj -- number of jobs to declare.  Jobs are addressed by the
              zero-based indices 0 .. nj-1, while the stdout/stderr file
              names carry the one-based job number.

        For every job the output file list, the executable and the
        standard output/error names are written through
        common._db.updateJob_ in a single call.
        """
        # The type of job number nj provides the base name of the std files.
        base = common.job_list[nj - 1].type().name()

        ## Should disappear... we'll have ONLY 'executable'
        ## as task field and not job field
        wrapper = common._db.queryTask('scriptName')

        listField = []
        listID = []
        task = common._db.getTask()
        for jobIndex in range(nj):
            stdout = base + '_' + str(jobIndex + 1) + '.stdout'
            stderr = base + '_' + str(jobIndex + 1) + '.stderr'
            # NOTE(review): this is the task's own list and it is extended
            # in place, exactly as before; calling declare() twice on the
            # same task object would append the three names twice.
            out = task.jobs[jobIndex]['outputFiles']
            out.append(stdout)
            out.append(stderr)
            out.append('.BrokerInfo')
            listField.append({
                'outputFiles': out,
                'executable': wrapper,
                'standardOutput': stdout,
                'standardError': stderr,
            })
            listID.append(jobIndex)
        common._db.updateJob_(listID, listField)

        return

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources.

        All four arguments are handed unchanged to schedSession.lcgInfo().
        Returns the number of entries in the list it gives back.
        """
        # Errors raised by the scheduler session propagate to the caller;
        # the old BOSS-specific error handling has not been ported.
        sites = self.schedSession.lcgInfo(tags, dest, whiteL, blackL)
        return len(sites)

    def submit(self, jobsList, req):
        """
        Submit jobs through the BossLite scheduler session.

        jobsList -- job selection; used both to load the task from
                    common._db and as the job list given to the scheduler.
        req      -- passed unchanged to schedSession.submit()
                    (presumably the scheduler requirements -- confirm).
        """
        # Errors raised by the scheduler session propagate to the caller.
        task = common._db.getTask(jobsList)
        self.schedSession.submit(task, jobsList, req)

        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid.
        """
        self.schedSession.query(str(taskid))

        return

    def moveOutput(self, int_id):
        """
        Move output of job already retrieved.

        int_id -- id of the job whose output files are backed up.

        The names listed in the job's OUTFILES field are looked up in
        self.outDir and self.logDir; each one found is moved to the
        'res_backup' directory with a time stamp appended to its name.
        When BOSS cannot provide the file list the error is logged and
        nothing is moved.
        """
        self.current_time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Plain concatenation is kept on purpose: it produces the same path
        # as before (assumes resDir() ends with a path separator -- confirm).
        resDirSave = common.work_space.resDir() + 'res_backup'
        if not os.path.exists(resDirSave):
            os.mkdir(resDirSave)

        boss_id = str(int_id)
        try:
            self.bossTask.load(ALL, boss_id)
            cmd_out = self.bossTask.program(boss_id, '1')['OUTFILES']
        except BossError as e:
            common.logger.message(str(e))
            # Without the file list there is nothing to move; falling
            # through used to crash on the unbound cmd_out.
            return

        for fileName in cmd_out.split(','):
            if not fileName:
                # An empty entry would make '<dir>/' match and move the
                # whole output directory into the backup.
                continue
            for srcDir in (self.outDir, self.logDir):
                src = srcDir + '/' + fileName
                if os.path.exists(src):
                    shutil.move(src, resDirSave + '/' + fileName + '_' + self.current_time)
                    common.logger.message('Output file ' + fileName + ' moved to ' + resDirSave)
        return

    def _moveLogFiles(self, job_id, srcDir, logDir):
        """
        Move the std*, .BrokerInfo and *.log files of a job to logDir.

        Files already present in logDir are overwritten, as 'mv' did.
        Patterns that match nothing are skipped.
        """
        # Local import: the module header is not touched by this change.
        import glob

        # NOTE(review): '*<id>.std*' is the historical pattern; for job 1
        # it also matches the files of job 11, 21, ...
        patterns = ('*' + str(job_id) + '.std*', '.BrokerInfo', '*.log')
        for pattern in patterns:
            for path in glob.glob(os.path.join(str(srcDir), pattern)):
                shutil.move(path, os.path.join(str(logDir), os.path.basename(path)))

    def _retrieveJob(self, job_id, timeout):
        """
        Fetch the output of one finished job and record the outcome.

        The job is flagged 'Y' in common.jobDB after a successful
        retrieval and 'Z' when BOSS reports an error; a scheduler error
        is only logged.
        """
        outDir = self.outDir
        logDir = self.logDir
        try:
            self.bossTask.getOutput(str(job_id), str(outDir), timeout)
            if logDir != outDir:
                try:
                    self._moveLogFiles(job_id, outDir, logDir)
                except (EnvironmentError, shutil.Error):
                    common.logger.message('Problem with copy of job results')
                else:
                    msg = 'Results of Job # ' + str(job_id) + ' are in ' + outDir + \
                          ' (log files are in ' + logDir + ')'
                    common.logger.message(msg)
            else:
                msg = 'Results of Job # ' + str(int(job_id)) + ' are in ' + outDir
                common.logger.message(msg)
            common.jobDB.setStatus(int(job_id) - 1, 'Y')
        except SchedulerError as e:
            common.logger.message("Warning : Scheduler interaction in getOutput operation failed for jobs:")
            common.logger.message(str(e))
        except BossError as e:
            common.logger.message(str(e))
            msg = 'Results of Job # ' + str(int(job_id)) + \
                  ' have been corrupted and could not be retrieved.'
            common.logger.message(msg)
            common.jobDB.setStatus(int(job_id) - 1, 'Z')

    ###################### ---- OK for Boss4 ds
    def getOutput(self, int_id):
        """
        Get output for the finished jobs among the given ids.

        int_id -- iterable of job ids.

        Jobs in status 'Done (Success)' or 'Done (Abort)' are retrieved
        into self.outDir (log files go to self.logDir when it differs).
        Jobs in any other status are only counted, and a summary of the
        counts is printed at the end.  Nothing is returned.

        Raises CrabException when the output or log directory is missing.
        """
        if not os.path.isdir(self.logDir) or not os.path.isdir(self.outDir):
            msg = ' Output or Log dir not found!! check ' + self.logDir + ' and ' + self.outDir
            raise CrabException(msg)
        common.jobDB.load()
        allBoss_id = self.list()
        bossTaskId = common.taskDB.dict('BossTaskId')
        ## first get the status of all job in the list
        statusList = self.queryStatusList(bossTaskId, int_id)

        # (status, summary text) in the order the summary is printed;
        # None collects every status that is not listed explicitly.
        summary = (
            ('Cleared', ' jobs already cleared'),
            ('Aborted', ' jobs aborted'),
            ('Cancelled', ' jobs cancelled'),
            ('Killed', ' jobs killed'),
            ('Running', ' jobs still running'),
            ('Scheduled', ' jobs scheduled'),
            ('Waiting', ' jobs waiting'),
            ('Ready', ' jobs ready'),
            (None, ' jobs submitted'),
            ('Created', ' jobs not yet submitted'),
        )
        counts = {}
        for status, text in summary:
            counts[status] = 0

        foundDone = False
        timeout = 180  # passed to bossTask.getOutput for every job

        ## then loop over jobs and retrieve it if it's the case
        for i_id in int_id:
            if i_id not in allBoss_id:
                # groupName is not set anywhere in this class; fall back to
                # an empty name rather than crash while reporting.
                msg = 'Job # ' + str(int(i_id)) + ' out of range for task ' + \
                      str(getattr(self, 'groupName', ''))
                common.logger.message(msg)
                continue

            jobStatus = statusList[i_id]
            if jobStatus in ('Done (Success)', 'Done (Abort)'):
                foundDone = True
                self._retrieveJob(i_id, timeout)
            elif jobStatus in counts:
                counts[jobStatus] += 1
            else:
                counts[None] += 1

        common.jobDB.save()
        if not foundDone:
            msg = '\n\n*********No job in Done status. It is not possible yet to retrieve the output.\n'
            common.logger.message(msg)

        for status, text in summary:
            if counts[status] != 0:
                print(str(counts[status]) + text)

        print(' ')
        return

    ###################### ---- OK for Boss4 ds
    def cancel(self, subm_id):
        """
        Cancel the jobs whose ids are listed in subm_id.

        subm_id -- list of integer job ids; it is sorted in place.

        Jobs that BOSS reports with status 'K' after the kill are flagged
        'K' in common.jobDB.  Scheduler and BOSS errors are logged, not
        raised.  The job database is saved in every case.
        """
        common.jobDB.load()
        if len(subm_id) > 0:
            try:
                subm_id.sort()
                jobRange = self.prepString(subm_id)
                common.logger.message("Killing job # " + str(subm_id).replace("[", "", 1).replace("]", "", 1))
                timeout = len(subm_id) * 60  # one minute per job
                self.bossTask.kill(jobRange, timeout)
                self.bossTask.load(ALL, jobRange)
                for key, info in self.bossTask.jobsDict().items():
                    jobNumber = int(key)
                    if jobNumber in subm_id and info['STATUS'] == 'K':
                        common.jobDB.setStatus(jobNumber - 1, 'K')
            except SchedulerError as e:
                common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:" + str(e))
            except BossError as e:
                common.logger.message(str(e) + "\nError killing jobs # " + str(subm_id) + " . See log for details")
        else:
            common.logger.message("\nNo job to be killed")
        common.jobDB.save()
        return

    def setFlag(self, list, index):
        """
        Classify the element at 'index' of a sorted id list.

        Returns -2 when the next element is its direct successor, -1 when
        a next element exists but is not consecutive, and the last element
        of the list when 'index' is the last position.

        Kept for backward compatibility; prepString() no longer needs it.
        """
        items = list  # the parameter name hides the builtin; keep it unused
        if len(items) > index + 1:
            if items[index + 1] == items[index] + 1:
                return -2
            return -1
        return items[len(items) - 1]

    def prepString(self, list):
        """
        Build the BOSS job range string for a list of integer ids.

        Runs of consecutive ids are written as 'first:last', single ids
        as they are, and the pieces are joined with commas:
        [1, 2, 3, 5] -> '1:3,5'.  An empty list gives ''.
        """
        pieces = []
        first = last = None
        for value in list:
            if first is None:
                first = last = value
            elif value == last + 1:
                # still inside the current run
                last = value
            else:
                pieces.append(self._rangePiece(first, last))
                first = last = value
        if first is not None:
            pieces.append(self._rangePiece(first, last))
        return ",".join(pieces)

    def _rangePiece(self, first, last):
        """Format one run of ids: 'first' alone or 'first:last'."""
        if first == last:
            return str(first)
        return str(first) + ":" + str(last)
343    
344    
345    
346