ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.20
Committed: Wed Apr 9 07:52:58 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.19: +3 -3 lines
Log Message:
minor fixes

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.17 import traceback
7 slacapra 1.1
8 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9    
10    
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15 spiga 1.17 from ProdCommon.BossLite.Common.Exceptions import SchedulerError
16 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
17 slacapra 1.1
class Boss:
    """
    CRAB-side wrapper around the BossLite scheduler session.

    configure() must be called before any method that opens a scheduler
    session, because schedSession() reads self.schedulerConfig which is
    only built there.
    """

    # CRAB scheduler name -> value passed as 'name' in the BossLiteAPISched
    # configuration (this should match the BossLite requirements).
    # Add here the map for other schedulers (LSF/CAF/CondorG).
    _SCHEDULER_API = {
        'glite': 'SchedulerGLiteAPI',
        'glitecoll': 'SchedulerGLiteAPI',
        'condor_g': 'SchedulerCondorGAPI',
        'lsf': '',
        'caf': '',
    }

    def __init__(self):
        return

    def configure(self, cfg_params):
        """
        Read the scheduler settings from cfg_params and build
        self.schedulerConfig.

        cfg_params -- mapping of configuration keys; the keys read here are
                      "CRAB.scheduler", "EDG.rb" and "EDG.wms_service".

        Raises KeyError when "CRAB.scheduler" is missing (it then defaults
        to '') or names a scheduler that is not in _SCHEDULER_API.
        """
        self.cfg_params = cfg_params
        self.schedulerName = cfg_params.get("CRAB.scheduler", '')

        # The RB parameter file is only produced when an RB is configured.
        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        self.schedulerConfig = {
            'name': self._SCHEDULER_API[self.schedulerName],
            'service': self.wms_service,
            'config': self.rb_param_file,
        }
        return

    def schedSession(self):
        """
        Instantiate a BossLiteAPISched session from common.bossSession and
        self.schedulerConfig.

        Any failure is logged at debug level with its traceback and
        re-raised as CrabException.
        """
        try:
            session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
        except Exception as e:
            common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
            raise CrabException('Scheduler Session: ' + str(e))
        return session

    def declare(self, nj):
        """
        BOSS declaration of jobs.

        For each of the first nj jobs of the current task, set the
        executable, the stdout/stderr file names and the output file list,
        then store everything with a single common._db.updateJob_ call.

        nj -- number of jobs to declare; job ids passed to the DB are 1..nj.
        """
        # File names are derived from the type name of job number nj.
        job = common.job_list[nj - 1]
        base = job.type().name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        task = common._db.getTask()

        listField = []
        listID = []
        for position in range(nj):
            job_number = position + 1
            # NOTE(review): this is the task's own list object, so the append
            # below also modifies task.jobs[position]['outputFiles'] in place.
            out = task.jobs[position]['outputFiles']
            # stdout, stderr and '.BrokerInfo' were appended here in earlier
            # revisions and are currently disabled.
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(job_number) + '.xml')

            parameters = {
                'outputFiles': out,
                'executable': wrapper,
                'standardOutput': base + '_' + str(job_number) + '.stdout',
                'standardError': base + '_' + str(job_number) + '.stderr',
            }
            listField.append(parameters)
            listID.append(job_number)
        common._db.updateJob_(listID, listField)

        return

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources.

        Forwards the arguments to the scheduler session's lcgInfo() and
        returns the number of entries it reports.  A SchedulerError is
        logged as a warning and reported as 0 matches instead of being
        propagated.
        """
        schedSession = self.schedSession()
        # Default to "no match" so that a failed query still yields a count;
        # without it the return below would hit an unbound local.
        sites = []
        try:
            sites = schedSession.lcgInfo(tags, dest, whiteL, blackL)
        except SchedulerError as err:
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))

        return len(sites)

    def submit(self, jobsList, req):
        """
        Submit BOSS function.

        Loads the task restricted to jobsList from the DB and hands it to
        the scheduler session together with jobsList and req.

        Raises CrabException when the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        task = common._db.getTask(jobsList)
        try:
            schedSession.submit(task, jobsList, req)
        except SchedulerError as err:
            common.logger.message("Submit: " + str(err))
            common.logger.debug(3, "Submit: " + str(traceback.format_exc()))
            raise CrabException('Submit: ' + str(err))

        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid.

        Returns whatever the scheduler session's query() returns.
        Raises CrabException when the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        try:
            statusRes = schedSession.query(str(taskid))
        except SchedulerError as err:
            common.logger.message("Status Query : " + str(err))
            common.logger.debug(3, "Status Query : " + str(traceback.format_exc()))
            raise CrabException('Status Query : ' + str(err))

        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange of the task taskId into
        outdir, via the scheduler session's getOutput().

        Raises CrabException when the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        try:
            schedSession.getOutput(taskId, jobRange, outdir)
        except SchedulerError as err:
            common.logger.message("GetOutput : " + str(err))
            common.logger.debug(3, "GetOutput : " + str(traceback.format_exc()))
            raise CrabException('GetOutput : ' + str(err))

        return

    def cancel(self, subm_id):
        """
        Kill the jobs whose numbers are listed in subm_id.

        subm_id -- list of job numbers; it is sorted in place.  An empty
                   list only logs that there is nothing to kill.

        Jobs that BOSS reports with status 'K' afterwards are marked 'K' in
        common.jobDB, which is saved before returning.  SchedulerError and
        BossError are logged and not propagated.

        NOTE(review): this method uses self.bossTask, which is not assigned
        anywhere in this class, and the names ALL and BossError, which are
        not defined in this module (presumably supplied by the star
        imports) -- confirm it is still reachable.
        """
        common.jobDB.load()
        if len(subm_id) > 0:
            try:
                subm_id.sort()
                job_range = self.prepString(subm_id)
                common.logger.message("Killing job # " + str(subm_id).replace("[", "", 1).replace("]", "", 1))
                # Timeout grows with the number of jobs (60 per job;
                # presumably seconds -- TODO confirm).
                timeout = len(subm_id) * 60
                self.bossTask.kill(job_range, timeout)
                self.bossTask.load(ALL, job_range)
                for key, info in self.bossTask.jobsDict().items():
                    job_number = int(key)
                    status = info['STATUS']
                    if job_number in subm_id and status == 'K':
                        # setStatus is called with the job number minus one.
                        common.jobDB.setStatus(job_number - 1, 'K')
            except SchedulerError as e:
                common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:" + str(e))
            except BossError as e:
                common.logger.message(str(e) + "\nError killing jobs # " + str(subm_id) + " . See log for details")
        else:
            common.logger.message("\nNo job to be killed")
        # Single save for every path (the original saved twice in a row on
        # the kill path).
        common.jobDB.save()
        return

    def setFlag(self, list, index):
        """
        Classify the element at position index relative to its successor.

        Returns -2 when the next element equals list[index] + 1, -1 when
        there is a next element that does not, and the last element of the
        list when index is the final position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[len(list) - 1]

    def prepString(self, list):
        """
        Build a range string from a list of job numbers.

        Runs of consecutive numbers are collapsed to "first:last", isolated
        numbers are written as-is, and the pieces are joined with ",":

            [1, 2, 3, 5, 7, 8]  ->  "1:3,5,7:8"

        An empty list gives "".  The list is expected to be sorted (cancel()
        sorts before calling); it is not modified here.
        """
        if not list:
            return ""

        pieces = []
        first = last = list[0]
        for value in list[1:]:
            if value == last + 1:
                # Still inside the current run of consecutive numbers.
                last = value
                continue
            pieces.append(self._formatRun(first, last))
            first = last = value
        pieces.append(self._formatRun(first, last))
        return ",".join(pieces)

    def _formatRun(self, first, last):
        """Render one run as "first:last", or just "first" for a single id."""
        if first == last:
            return str(first)
        return str(first) + ":" + str(last)
224    
225    
226    
227