ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.18
Committed: Mon Apr 7 15:01:01 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.17: +1 -1 lines
Log Message:
.BrokerInfo must be retrieved only for gLite middleware

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.17 import traceback
7 slacapra 1.1
8 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9    
10    
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15 spiga 1.17 from ProdCommon.BossLite.Common.Exceptions import SchedulerError
16 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
17 slacapra 1.1
18     class Boss:
19     def __init__(self):
20 spiga 1.4
21 slacapra 1.1 return
22 ewv 1.14
23     def configure(self,cfg_params):
24 spiga 1.9 self.cfg_params = cfg_params
25     self.schedulerName = self.cfg_params.get("CRAB.scheduler",'') # this should match with the bosslite requirements
26     self.rb_param_file=''
27     if (cfg_params.has_key('EDG.rb')):
28     self.rb_param_file=common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
29 ewv 1.14 self.wms_service=cfg_params.get("EDG.wms_service",'')
30    
31 spiga 1.4
32 spiga 1.8 ## Add here the map for others Schedulers (LSF/CAF/CondorG)
33 ewv 1.14 SchedMap = {'glite':'SchedulerGLiteAPI',
34 spiga 1.9 'glitecoll':'SchedulerGLiteAPI',\
35 ewv 1.14 'condor_g':'SchedulerCondorGAPI',\
36 spiga 1.9 'lsf':'',\
37 ewv 1.14 'caf':''
38     }
39    
40 spiga 1.16 self.schedulerConfig = {
41 spiga 1.9 'name' : SchedMap[self.schedulerName], \
42     'service' : self.wms_service, \
43 ewv 1.14 'config' : self.rb_param_file
44 slacapra 1.7 }
45 slacapra 1.1 return
46    
47 spiga 1.17 def schedSession(self):
48     '''
49     Istantiate BossLiteApi session
50     '''
51     try:
52     session = BossLiteAPISched( common.bossSession, self.schedulerConfig)
53     except Exception, e :
54     common.logger.debug(3, "Istantiate SchedSession: " +str(traceback.format_exc()))
55     raise CrabException('Scheduler Session: '+str(e))
56     return session
57    
58 ewv 1.14 def declare(self, nj):
59 slacapra 1.1 """
60     BOSS declaration of jobs
61     """
62 spiga 1.4 index = nj - 1
63     job = common.job_list[index]
64     jbt = job.type()
65     base = jbt.name()
66    
67 spiga 1.13 wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
68 spiga 1.8 listField=[]
69 ewv 1.14 listID=[]
70 spiga 1.8 task=common._db.getTask()
71 spiga 1.4 for id in range(nj):
72     parameters={}
73     jobs=[]
74 ewv 1.14 out=[]
75 spiga 1.8 stdout = base +'_'+ str(id+1)+'.stdout'
76     stderr = base +'_'+ str(id+1)+'.stderr'
77 spiga 1.4 jobs.append(id)
78 spiga 1.8 out=task.jobs[id]['outputFiles']
79 spiga 1.16 # out.append(stdout)
80     # out.append(stderr)
81 spiga 1.18 # out.append('.BrokerInfo')
82 ewv 1.14 parameters['outputFiles']=out
83 spiga 1.8 parameters['executable']=wrapper
84 spiga 1.4 parameters['standardOutput'] = stdout
85     parameters['standardError'] = stderr
86 spiga 1.8 listField.append(parameters)
87 ewv 1.14 listID.append(id+1)
88 spiga 1.8 common._db.updateJob_( listID, listField)
89 slacapra 1.1
90 ewv 1.14 return
91 slacapra 1.1
92 spiga 1.10 def listMatch(self, tags, dest, whiteL, blackL ):
93 slacapra 1.1 """
94     Check the compatibility of available resources
95     """
96 spiga 1.17 schedSession = self.schedSession()
97     try:
98     sites = schedSession.lcgInfo(tags, dest, whiteL, blackL )
99     except SchedulerError, err :
100     common.logger.message("Warning: List Match operation failed with message: " +str(err))
101     common.logger.debug(3, "List Match failed: " +str(traceback.format_exc()))
102    
103 ewv 1.14
104 spiga 1.9 # Tout = 120
105     # CEs=[]
106     # try:
107     # CEs=self.bossUser.schedListMatch( schedulerName, schcladstring, self.bossTask.id(), "", Tout)
108     # common.logger.debug(1,"CEs :"+str(CEs))
109     # except SchedulerError,e:
110     # common.logger.message( "Warning : Scheduler interaction in list-match operation failed for jobs:")
111     # common.logger.message( e.__str__())
112     # pass
113     # except BossError,e:
114     # raise CrabException("ERROR: listMatch failed with message " + e.__str__())
115     # return CEs
116 spiga 1.10 return len(sites)
117 ewv 1.14
118 spiga 1.8 def submit(self, jobsList,req):
119 slacapra 1.1 """
120     Submit BOSS function.
121     Submit one job. nj -- job number.
122     """
123 spiga 1.17 schedSession = self.schedSession()
124     # schedSession = BossLiteAPISched( common.bossSession, self.schedulerConfig)
125 spiga 1.11 task = common._db.getTask(jobsList)
126 spiga 1.17 try:
127     schedSession.submit( task,jobsList,req )
128     except SchedulerError, err :
129     common.logger.message("List Match: " +str(err))
130     common.logger.debug(3, "List Match: " +str(traceback.format_exc()))
131     raise CrabException('List Match: '+str(err))
132 ewv 1.14
133     return
134 spiga 1.10
135     def queryEverything(self,taskid):
136     """
137 spiga 1.15 Query needed info of all jobs with specified taskid
138 spiga 1.10 """
139    
140 spiga 1.17 schedSession = self.schedSession()
141     try:
142     statusRes = schedSession.query( str(taskid))
143     except SchedulerError, err :
144     common.logger.message("Status Query : " +str(err))
145     common.logger.debug(3, "Status Query : " +str(traceback.format_exc()))
146     raise CrabException('Status Query : '+str(err))
147    
148 spiga 1.15 return statusRes
149 slacapra 1.1
150 spiga 1.15 def getOutput(self,taskId,jobRange, outdir):
151 slacapra 1.1 """
152 spiga 1.15 Retrieve output of all jobs with specified taskid
153 slacapra 1.1 """
154 spiga 1.17 schedSession = self.schedSession()
155     try:
156     schedSession.getOutput( taskId, jobRange, outdir )
157     except SchedulerError, err :
158     common.logger.message("GetOutput : " +str(err))
159     common.logger.debug(3, "GetOutput : " +str(traceback.format_exc()))
160     raise CrabException('GetOutput : '+str(err))
161 spiga 1.15
162 slacapra 1.1 return
163    
164     def cancel(self,subm_id):
165     """
166     Cancel the EDG job with id: if id == -1, means all jobs.
167     """
168     #print "CANCEL -------------------------"
169     #print "int_id ",int_id," nSubmitted ", common.jobDB.nSubmittedJobs()
170 ewv 1.14
171     common.jobDB.load()
172 slacapra 1.1 if len( subm_id ) > 0:
173     try:
174     subm_id.sort()
175     range = self.prepString( subm_id )
176     common.logger.message("Killing job # " + str(subm_id).replace("[","",1).replace("]","",1) )
177     Tout =len(subm_id)*60
178     self.bossTask.kill( range, Tout )
179     self.bossTask.load(ALL, range)
180     task = self.bossTask.jobsDict()
181     for k, v in task.iteritems():
182     k = int(k)
183     status = v['STATUS']
184     if k in subm_id and status == 'K':
185     common.jobDB.setStatus(k - 1, 'K')
186     except SchedulerError,e:
187     common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:"+ e.__str__())
188     pass
189     except BossError,e:
190     common.logger.message( e.__str__() + "\nError killing jobs # "+str(subm_id)+" . See log for details")
191     common.jobDB.save()
192     pass
193     else:
194 slacapra 1.3 common.logger.message("\nNo job to be killed")
195 slacapra 1.1 common.jobDB.save()
196     return
197    
198     def setFlag( self, list, index ):
199     if len( list ) > (index + 1):
200     if list[index + 1] == ( list[index] + 1 ):
201     return -2
202     return -1
203     return list[ len(list) - 1 ]
204    
205     def prepString( self, list ):
206     s = ""
207     flag = 0
208     for i in range( len( list ) ):
209     if flag == 0:
210     s = str( list[i] )
211     flag = self.setFlag( list, i )
212     elif flag == -1:
213     s = s + "," + str( list[i] )
214     flag = self.setFlag( list, i )
215     elif flag == -2:
216     flag = self.setFlag( list, i )
217     if flag == -1:
218     s = s + ":" + str( list[i] )
219     if flag > 0:
220     s = s + ":" + str( list[i] )
221     return s
222    
223    
224    
225