ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.38
Committed: Wed Jul 2 09:51:29 2008 UTC (16 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_test, CRAB_2_3_1_pre3
Changes since 1.37: +1 -0 lines
Log Message:
added time out

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.17 import traceback
7 slacapra 1.1
8 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9    
10    
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15 spiga 1.17 from ProdCommon.BossLite.Common.Exceptions import SchedulerError
16 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
17 slacapra 1.1
class Boss:
    """
    Front-end used by CRAB to talk to the scheduler through BossLite.

    The instance keeps a lazily created BossLiteAPISched session (see
    schedSession) and forwards submit / query / getOutput / kill /
    postMortem / jobDescription requests to it, turning SchedulerError
    into CrabException.
    """

    def __init__(self):
        # The scheduler session is created on first use by schedSession().
        self.session = None
        return

    def __del__(self):
        """ destroy instance """
        # Guard the attribute access: the destructor may run on an object
        # whose __init__ never completed.
        if hasattr(self, 'session'):
            del self.session

    def configure(self, cfg_params):
        """
        Read the scheduler related settings and build the BossLite
        scheduler configuration.

        cfg_params -- mapping-like object queried with get()/has_key();
                      the keys read here are "CRAB.scheduler", "EDG.rb"
                      and "EDG.wms_service".

        Raises KeyError if the configured scheduler name (lower-cased) is
        not one of the names known to the scheduler map below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')
        self.rb_param_file = ''
        if (cfg_params.has_key('EDG.rb')):
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        ## Add here the mapping for other schedulers (LSF/CAF/CondorG)
        SchedMap = {'glite':     'SchedulerGLiteAPI',
                    'glitecoll': 'SchedulerGLiteAPI',
                    'condor':    'SchedulerCondor',
                    'condor_g':  'SchedulerCondorG',
                    'glidein':   'SchedulerGlidein',
                    'lsf':       'SchedulerLsf',
                    'caf':       'SchedulerLsf',
                    'sge':       'SchedulerSge'
                    }

        self.schedulerConfig = common.scheduler.realSchedParams(cfg_params)
        self.schedulerConfig['name'] = SchedMap[(self.schedulerName).lower()]
        # NOTE(review): unit is not stated in this file -- presumably seconds.
        self.schedulerConfig['timeout'] = 180

        # Force a fresh session to be built with the new configuration.
        self.session = None
        return

    def schedSession(self):
        '''
        Instantiate the BossLiteAPISched session on first call and return
        it; later calls return the cached session.

        Raises CrabException if the session cannot be created.
        '''
        if not self.session:
            try:
                self.session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
            except Exception as e:
                common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
                raise CrabException('Scheduler Session: ' + str(e))
        return self.session

    def _schedulerFailure(self, label, err):
        """
        Log a scheduler failure and build the CrabException to raise.

        Must be called from inside the except block that caught err, so
        that traceback.format_exc() still reports the active exception.
        """
        common.logger.message(label + str(err))
        common.logger.debug(3, label + str(traceback.format_exc()))
        return CrabException(label + str(err))

    def declare(self, listID):
        """
        BOSS declaration of jobs

        For every job id in listID, record the output file list, the
        wrapper executable and the stdout/stderr file names, then store
        them all at once through common._db.updateJob_.
        """
        # NOTE(review): the job type is taken from the job_list entry at
        # position len(listID) - 1, not from the ids in listID -- confirm
        # this is intended.
        index = len(listID) - 1
        job = common.job_list[index]
        jbt = job.type()
        base = jbt.name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        listField = []
        task = common._db.getTask()
        for jobId in listID:
            stdout = base + '_' + str(jobId) + '.stdout'
            stderr = base + '_' + str(jobId) + '.stderr'
            # NOTE(review): this is the task's own list, so the append
            # below modifies the task object in place (as the original
            # code did) -- confirm that is wanted.
            out = task.jobs[jobId - 1]['outputFiles']
            # out.append(stdout)
            # out.append(stderr)
            # out.append('.BrokerInfo')
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(jobId) + '.xml')

            parameters = {}
            parameters['outputFiles'] = out
            parameters['executable'] = wrapper
            parameters['standardOutput'] = stdout
            parameters['standardError'] = stderr
            listField.append(parameters)
        common._db.updateJob_(listID, listField)

        return

    def listMatch(self, tags, dest, whiteL, blackL, isFull):
        """
        Check the compatibility of available resources

        Returns whatever the scheduler session's lcgInfo() returns.  A
        SchedulerError is only logged as a warning; in that case an empty
        list is returned.
        """
        # Default result, so that a failed query does not leave 'sites'
        # unbound at the return statement.
        sites = []
        try:
            sites = self.schedSession().lcgInfo(tags, seList=dest, blacklist=blackL, whitelist=whiteL, full=isFull)
        except SchedulerError as err:
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))

        return sites

    def submit(self, taskId, jobsList, req):
        """
        Submit BOSS function.
        Submit the jobs in jobsList of task taskId with requirements req,
        then log the service and parent (collection) id reported for the
        first job of the returned task.

        Raises CrabException on SchedulerError.
        """
        try:
            task_sub = self.schedSession().submit(taskId, jobsList, req)
            wms = task_sub.jobs[0].runningJob['service']
            collId = task_sub.jobs[0].runningJob['schedulerParentId']
            msg = 'WMS : ' + str(wms) + '\n'
            msg += 'Collection ID : ' + str(collId)
            common.logger.write(msg)
            common.logger.debug(5, msg)
        except SchedulerError as err:
            raise self._schedulerFailure("Submit: ", err)

        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid

        Returns the result of the scheduler session's query().
        Raises CrabException on SchedulerError.
        """
        try:
            statusRes = self.schedSession().query(str(taskid))
        except SchedulerError as err:
            raise self._schedulerFailure("Status Query : ", err)

        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange of the specified task
        into outdir.

        Raises CrabException on SchedulerError.
        """
        try:
            self.schedSession().getOutput(taskId, jobRange, outdir)
        except SchedulerError as err:
            raise self._schedulerFailure("GetOutput : ", err)

        return

    def cancel(self, list):
        """
        Cancel the jobs whose ids are in list

        Raises CrabException on SchedulerError.
        """
        task = common._db.getTask(list)
        try:
            self.schedSession().kill(task, list)
        except SchedulerError as err:
            raise self._schedulerFailure("Kill: ", err)
        return

    def LoggingInfo(self, list_id, outfile):
        """
        Query the logging info of the jobs whose ids are in list_id
        through the scheduler session's postMortem(), passing outfile
        along.

        Raises CrabException on SchedulerError.
        """
        try:
            # NOTE(review): the literal 1 is presumably the task id --
            # confirm against the BossLite postMortem signature.
            self.schedSession().postMortem(1, list_id, outfile)
        except SchedulerError as err:
            raise self._schedulerFailure("logginginfo: ", err)
        return

    def writeJDL(self, taskId, jobsList, req):
        """
        JDL description BOSS function.

        Returns the result of the scheduler session's jobDescription().
        Raises CrabException on SchedulerError.
        """
        try:
            jdl = self.schedSession().jobDescription(taskId, jobsList, req)
        except SchedulerError as err:
            raise self._schedulerFailure("writeJDL: ", err)

        return jdl

    def setFlag(self, list, index):
        """
        Classify the element at position index relative to its successor.

        Returns -2 when the next element is exactly one larger, -1 when a
        next element exists but is not consecutive, and the last element
        of list when index is the last position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[len(list) - 1]

    def prepString(self, list):
        """
        Build a compact range string from a list of integers.

        Each run of consecutive values (every value one larger than the
        previous) becomes "first:last", an isolated value is written as
        is, and the pieces are joined with commas: [1, 2, 3, 5] gives
        "1:3,5".  An empty list gives "".
        """
        pieces = []
        count = len(list)
        start = 0
        while start < count:
            # Extend the run as long as the next value is consecutive.
            end = start
            while end + 1 < count and list[end + 1] == list[end] + 1:
                end += 1
            if end > start:
                pieces.append(str(list[start]) + ":" + str(list[end]))
            else:
                pieces.append(str(list[start]))
            start = end + 1
        return ",".join(pieces)
208     flag = 0
209     for i in range( len( list ) ):
210     if flag == 0:
211     s = str( list[i] )
212     flag = self.setFlag( list, i )
213     elif flag == -1:
214     s = s + "," + str( list[i] )
215     flag = self.setFlag( list, i )
216     elif flag == -2:
217     flag = self.setFlag( list, i )
218     if flag == -1:
219     s = s + ":" + str( list[i] )
220     if flag > 0:
221     s = s + ":" + str( list[i] )
222     return s