ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.31
Committed: Mon May 5 14:44:23 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.30: +2 -5 lines
Log Message:
missed a change for scheduler specific params configuration

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.17 import traceback
7 slacapra 1.1
8 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9    
10    
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15 spiga 1.17 from ProdCommon.BossLite.Common.Exceptions import SchedulerError
16 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
17 slacapra 1.1
class Boss:
    """
    Thin wrapper around a BossLite scheduler session.

    The session is created lazily by schedSession() from the scheduler
    configuration assembled in configure(); the remaining methods forward
    to that session and turn SchedulerError into CrabException.
    """

    def __init__(self):
        # The scheduler session is only built on first use (see schedSession).
        self.session = None

    def __del__(self):
        """ destroy instance """
        # Guard against instances whose __init__ never ran (no 'session'
        # attribute): an exception here would only be reported as noise
        # during finalisation.
        try:
            del self.session
        except AttributeError:
            pass

    def configure(self, cfg_params):
        """
        Read the scheduler related settings from cfg_params.

        Sets self.schedulerName, self.rb_param_file, self.wms_service and
        self.schedulerConfig, and resets the cached session.

        Raises KeyError when CRAB.scheduler (lower-cased) is not one of the
        names known to the scheduler map below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')
        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        # NOTE(review): the indentation of this assignment was lost in the
        # source; it is kept outside the 'EDG.rb' guard so that the attribute
        # is always defined -- confirm against the repository copy.
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        ## Add here the map for others Schedulers (LSF/CAF/CondorG)
        SchedMap = {
            'glite': 'SchedulerGLiteAPI',
            'glitecoll': 'SchedulerGLiteAPI',
            'condor': 'SchedulerCondor',
            'condor_g': 'SchedulerCondorG',
            'glidein': 'SchedulerGlidein',
            'lsf': 'SchedulerLsf',
            'caf': 'SchedulerLsf',
        }

        self.schedulerConfig = common.scheduler.realSchedParams(cfg_params)
        self.schedulerConfig['name'] = SchedMap[self.schedulerName.lower()]

        # Force a fresh session to be built with the new configuration.
        self.session = None

    def schedSession(self):
        """
        Instantiate the BossLiteAPISched session on first use and return it.

        Raises CrabException if the session cannot be created.
        """
        if not self.session:
            try:
                self.session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
            except Exception as e:
                common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
                raise CrabException('Scheduler Session: ' + str(e))
        return self.session

    def declare(self, nj):
        """
        BOSS declaration of jobs

        For each of the first nj jobs of the task, set executable,
        stdout/stderr names and output file list, then store everything
        with a single common._db.updateJob_ call.
        """
        base = common.job_list[nj - 1].type().name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        listField = []
        listID = []
        task = common._db.getTask()
        for idx in range(nj):
            job_number = idx + 1  # job ids are 1-based
            # The stdout/stderr files themselves are not added to outputFiles.
            # NOTE(review): this appends to the list held by the task object
            # itself, exactly as the original code did.
            out = task.jobs[idx]['outputFiles']
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(job_number) + '.xml')
            listField.append({
                'outputFiles': out,
                'executable': wrapper,
                'standardOutput': base + '_' + str(job_number) + '.stdout',
                'standardError': base + '_' + str(job_number) + '.stderr',
            })
            listID.append(job_number)
        common._db.updateJob_(listID, listField)

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources

        Returns whatever the session's lcgInfo() returns; if the scheduler
        reports an error a warning is logged and an empty list is returned.
        """
        # Bound up front: the except branch only logs, so without this the
        # final return would fail with UnboundLocalError after a failure.
        sites = []
        try:
            sites = self.schedSession().lcgInfo(tags, seList=dest, blacklist=blackL, whitelist=whiteL)
        except SchedulerError as err:
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))

        return sites

    def submit(self, taskId, jobsList, req):
        """
        Submit BOSS function.
        Forward taskId, jobsList and req to the session's submit().

        Raises CrabException on SchedulerError.
        """
        try:
            self.schedSession().submit(taskId, jobsList, req)
        except SchedulerError as err:
            common.logger.message("Submit: " + str(err))
            common.logger.debug(3, "Submit: " + str(traceback.format_exc()))
            raise CrabException('Submit: ' + str(err))

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid

        Returns the result of the session's query(); raises CrabException
        on SchedulerError.
        """
        try:
            statusRes = self.schedSession().query(str(taskid))
        except SchedulerError as err:
            common.logger.message("Status Query : " + str(err))
            common.logger.debug(3, "Status Query : " + str(traceback.format_exc()))
            raise CrabException('Status Query : ' + str(err))

        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange of the given task into outdir

        Raises CrabException on SchedulerError.
        """
        try:
            self.schedSession().getOutput(taskId, jobRange, outdir)
        except SchedulerError as err:
            common.logger.message("GetOutput : " + str(err))
            common.logger.debug(3, "GetOutput : " + str(traceback.format_exc()))
            raise CrabException('GetOutput : ' + str(err))

    def cancel(self, list):
        """
        Cancel the job with id from a list

        Raises CrabException on SchedulerError.
        """
        task = common._db.getTask(list)
        try:
            self.schedSession().kill(task, list)
        except SchedulerError as err:
            common.logger.message("Kill: " + str(err))
            common.logger.debug(3, "Kill: " + str(traceback.format_exc()))
            raise CrabException('Kill: ' + str(err))

    def LoggingInfo(self, list_id, outfile):
        """
        Query the logging info for the ids in list_id through the session's
        postMortem(), passing outfile along.

        Raises CrabException on SchedulerError.
        """
        try:
            self.schedSession().postMortem(1, list_id, outfile)
        except SchedulerError as err:
            common.logger.message("logginginfo: " + str(err))
            common.logger.debug(3, "logginginfo: " + str(traceback.format_exc()))
            raise CrabException('logginginfo: ' + str(err))

    def writeJDL(self, taskId, jobsList, req):
        """
        JDL description BOSS function.

        Returns the result of the session's jobDescription(); raises
        CrabException on SchedulerError.
        """
        try:
            jdl = self.schedSession().jobDescription(taskId, jobsList, req)
        except SchedulerError as err:
            common.logger.message("writeJDL: " + str(err))
            common.logger.debug(3, "writeJDL: " + str(traceback.format_exc()))
            raise CrabException('writeJDL: ' + str(err))

        return jdl

    def setFlag(self, list, index):
        """
        Classify position index of list for prepString().

        Returns -2 when the next element is exactly list[index] + 1,
        -1 when there is a next element that is not consecutive, and the
        last element of the list when index is the final position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[-1]

    def prepString(self, list):
        """
        Build a compact string from a list of numbers: runs of consecutive
        values are written as 'first:last' and pieces are separated by ','
        (e.g. [1, 2, 4, 5, 6] gives '1:2,4:6'). An empty list gives ''.
        """
        s = ""
        flag = 0
        for i, value in enumerate(list):
            if flag == 0:
                # first element opens the string
                s = str(value)
                flag = self.setFlag(list, i)
            elif flag == -1:
                # previous element closed a piece: start a new one
                s = s + "," + str(value)
                flag = self.setFlag(list, i)
            elif flag == -2:
                # inside a consecutive run: only write its last value
                flag = self.setFlag(list, i)
                if flag == -1:
                    s = s + ":" + str(value)
        # NOTE(review): the source lost its indentation; this final check is
        # placed after the loop (it then closes the string with the last
        # element, so a trailing isolated value n is written as 'n:n').
        # Confirm the placement against the repository copy.
        if flag > 0:
            s = s + ":" + str(list[-1])
        return s