ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Boss.py
Revision: 1.34
Committed: Fri May 30 09:06:50 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_2_pre1, CRAB_2_2_1, CRAB_2_2_1_pre6, CRAB_2_2_1_pre5
Changes since 1.33: +3 -1 lines
Log Message:
write in the log the used service (one for each collection)

File Contents

# User Rev Content
1 slacapra 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.17 import traceback
7 slacapra 1.1
8 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9    
10    
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15 spiga 1.17 from ProdCommon.BossLite.Common.Exceptions import SchedulerError
16 spiga 1.4 from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
17 slacapra 1.1
class Boss:
    """
    CRAB-side wrapper around a BossLite scheduler session.

    The BossLiteAPISched session is created lazily by schedSession() from
    the configuration stored by configure().  Most scheduler operations
    translate a BossLite SchedulerError into a CrabException after logging
    it; listMatch() is the exception and only logs the failure.
    """

    def __init__(self):
        # The scheduler session is created on first use, see schedSession().
        self.session = None
        return

    def __del__(self):
        """ destroy instance """
        # Drop the reference instead of deleting the attribute, so that the
        # destructor cannot raise AttributeError if the attribute is gone.
        self.session = None

    def configure(self, cfg_params):
        """
        Store the user configuration and build the scheduler configuration.

        cfg_params -- mapping of 'SECTION.key' names to values.

        Raises KeyError if CRAB.scheduler (lower-cased) is not one of the
        names listed in SchedMap below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')
        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        ## Add here the map for others Schedulers (LSF/CAF/CondorG)
        SchedMap = {'glite':     'SchedulerGLiteAPI',
                    'glitecoll': 'SchedulerGLiteAPI',
                    'condor':    'SchedulerCondor',
                    'condor_g':  'SchedulerCondorG',
                    'glidein':   'SchedulerGlidein',
                    'lsf':       'SchedulerLsf',
                    'caf':       'SchedulerLsf',
                    'sge':       'SchedulerSge'
                    }

        self.schedulerConfig = common.scheduler.realSchedParams(cfg_params)
        self.schedulerConfig['name'] = SchedMap[(self.schedulerName).lower()]

        # Force a new session to be created with the new configuration.
        self.session = None
        return

    def schedSession(self):
        '''
        Instantiate the BossLiteAPISched session on first use and return it.

        Raises CrabException if the session cannot be created.
        '''
        if not self.session:
            try:
                self.session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
            except Exception as e:
                common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
                raise CrabException('Scheduler Session: ' + str(e))
        return self.session

    def declare(self, nj):
        """
        BOSS declaration of jobs

        nj -- number of jobs; jobs 1..nj are updated in the task database
              with their executable, stdout/stderr names and output files.
        """
        index = nj - 1
        job = common.job_list[index]
        jbt = job.type()
        base = jbt.name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        listField = []
        listID = []
        task = common._db.getTask()
        for idx in range(nj):
            jobNumber = idx + 1
            stdout = base + '_' + str(jobNumber) + '.stdout'
            stderr = base + '_' + str(jobNumber) + '.stderr'
            # Work on a copy: appending to the list owned by the task object
            # would silently modify the task itself.
            out = list(task.jobs[idx]['outputFiles'])
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(jobNumber) + '.xml')
            parameters = {}
            parameters['outputFiles'] = out
            parameters['executable'] = wrapper
            parameters['standardOutput'] = stdout
            parameters['standardError'] = stderr
            listField.append(parameters)
            listID.append(jobNumber)
        common._db.updateJob_(listID, listField)

        return

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources

        Returns whatever the scheduler's lcgInfo() returns.  A
        SchedulerError is logged and an empty list is returned instead, so
        a failed match is reported as "no site" rather than as an unbound
        local variable error.
        """
        sites = []
        try:
            sites = self.schedSession().lcgInfo(tags, seList=dest, blacklist=blackL, whitelist=whiteL)
        except SchedulerError as err:
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))

        return sites

    def submit(self, taskId, jobsList, req):
        """
        Submit BOSS function.
        Submit the jobs in jobsList of task taskId with requirements req
        and write the service used by the first submitted job to the log.

        Raises CrabException if the scheduler reports an error.
        """
        try:
            task_sub = self.schedSession().submit(taskId, jobsList, req)
            wms = task_sub.jobs[0].runningJob['service']
            common.logger.write("WMS : " + str(wms))
        except SchedulerError as err:
            common.logger.message("Submit: " + str(err))
            common.logger.debug(3, "Submit: " + str(traceback.format_exc()))
            raise CrabException('Submit: ' + str(err))

        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid

        Returns the result of the scheduler query.
        Raises CrabException if the scheduler reports an error.
        """
        try:
            statusRes = self.schedSession().query(str(taskid))
        except SchedulerError as err:
            common.logger.message("Status Query : " + str(err))
            common.logger.debug(3, "Status Query : " + str(traceback.format_exc()))
            raise CrabException('Status Query : ' + str(err))

        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange of task taskId into outdir

        Raises CrabException if the scheduler reports an error.
        """
        try:
            self.schedSession().getOutput(taskId, jobRange, outdir)
        except SchedulerError as err:
            common.logger.message("GetOutput : " + str(err))
            common.logger.debug(3, "GetOutput : " + str(traceback.format_exc()))
            raise CrabException('GetOutput : ' + str(err))

        return

    def cancel(self, list):
        """
        Cancel the job with id from a list

        Raises CrabException if the scheduler reports an error.
        """
        # NOTE: the parameter name shadows the builtin 'list'; it is kept
        # for backward compatibility with existing callers.
        task = common._db.getTask(list)
        try:
            self.schedSession().kill(task, list)
        except SchedulerError as err:
            common.logger.message("Kill: " + str(err))
            common.logger.debug(3, "Kill: " + str(traceback.format_exc()))
            raise CrabException('Kill: ' + str(err))
        return

    def LoggingInfo(self, list_id, outfile):
        """
        Query the logging info with id from a list and write the
        results through the scheduler's postMortem() call.

        Raises CrabException if the scheduler reports an error.
        """
        try:
            # NOTE(review): the first argument is hard-coded to 1,
            # presumably the task id -- confirm against BossLiteAPISched.
            self.schedSession().postMortem(1, list_id, outfile)
        except SchedulerError as err:
            common.logger.message("logginginfo: " + str(err))
            common.logger.debug(3, "logginginfo: " + str(traceback.format_exc()))
            raise CrabException('logginginfo: ' + str(err))
        return

    def writeJDL(self, taskId, jobsList, req):
        """
        JDL description BOSS function.

        Returns the job description produced by the scheduler.
        Raises CrabException if the scheduler reports an error.
        """
        try:
            jdl = self.schedSession().jobDescription(taskId, jobsList, req)
        except SchedulerError as err:
            common.logger.message("writeJDL: " + str(err))
            common.logger.debug(3, "writeJDL: " + str(traceback.format_exc()))
            raise CrabException('writeJDL: ' + str(err))

        return jdl

    def setFlag(self, list, index):
        """
        Classify the element at position index for prepString().

        Returns -2 if the next element is exactly one greater (the run
        continues), -1 if there is a next element that is not consecutive,
        and the last element of the list when index is the last position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[len(list) - 1]

    def prepString(self, list):
        """
        Build a compact string from a list of job numbers, joining runs of
        consecutive numbers with ':' and separate runs with ','.

        E.g. [1, 2, 3] gives "1:3" and [1, 2, 4, 5] gives "1:2,4:5".
        An empty list gives "".
        """
        s = ""
        flag = 0
        for i in range(len(list)):
            if flag == 0:
                # first element: start the string
                s = str(list[i])
                flag = self.setFlag(list, i)
            elif flag == -1:
                # previous element closed a run: start a new one
                s = s + "," + str(list[i])
                flag = self.setFlag(list, i)
            elif flag == -2:
                # inside a run: only emit the element that ends it
                flag = self.setFlag(list, i)
                if flag == -1:
                    s = s + ":" + str(list[i])
        # A positive flag means the last element was reached: close with it.
        if flag > 0:
            s = s + ":" + str(list[i])
        return s