1 |
slacapra |
1.1 |
from crab_logger import Logger
|
2 |
|
|
from crab_exceptions import *
|
3 |
|
|
from crab_util import *
|
4 |
|
|
import common
|
5 |
|
|
import os, time, shutil
|
6 |
spiga |
1.17 |
import traceback
|
7 |
slacapra |
1.1 |
|
8 |
spiga |
1.4 |
from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
|
9 |
|
|
|
10 |
|
|
|
11 |
|
|
from ProdCommon.BossLite.DbObjects.Job import Job
|
12 |
|
|
from ProdCommon.BossLite.DbObjects.Task import Task
|
13 |
|
|
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
|
14 |
|
|
|
15 |
spiga |
1.17 |
from ProdCommon.BossLite.Common.Exceptions import SchedulerError
|
16 |
spiga |
1.4 |
from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
|
17 |
slacapra |
1.1 |
|
18 |
|
|
class Boss:
    """
    CRAB-side wrapper around a BossLite scheduler session.

    configure() prepares the scheduler parameters, schedSession() creates the
    BossLiteAPISched session on first use, and the remaining public methods
    forward to that session.  Except for listMatch(), which only logs, a
    SchedulerError raised by the session is logged and re-raised as a
    CrabException.
    """

    ## Add here the map for others Schedulers (LSF/CAF/CondorG)
    # Maps the lower-cased CRAB.scheduler value to the scheduler name stored
    # in schedulerConfig['name'].
    _SCHED_MAP = {
        'glite':     'SchedulerGLiteAPI',
        'glitecoll': 'SchedulerGLiteAPI',
        'condor':    'SchedulerCondor',
        'condor_g':  'SchedulerCondorG',
        'glidein':   'SchedulerGlidein',
        'lsf':       'SchedulerLsf',
        'caf':       'SchedulerLsf',
        'sge':       'SchedulerSge',
    }

    def __init__(self):
        """ create an instance with no scheduler session yet """
        self.session = None
        return

    def __del__(self):
        """ destroy instance """
        # Release the session reference.  A plain reset (rather than `del`)
        # cannot fail when the attribute was never set or is already gone.
        self.session = None

    def _currentError(self):
        """
        Return the exception instance currently being handled.

        Meant to be called from inside an `except` block.  It is used instead
        of binding the exception in the except clause so that this class
        parses on every Python version.
        """
        import sys
        return sys.exc_info()[1]

    def _schedulerFailure(self, label):
        """
        Log the exception currently being handled and raise CrabException.

        label -- prefix put in front of the error text in the user message,
                 in the debug traceback and in the raised CrabException.
        """
        err = self._currentError()
        common.logger.message(label + str(err))
        common.logger.debug(3, label + str(traceback.format_exc()))
        raise CrabException(label + str(err))

    def configure(self, cfg_params):
        """
        Store the configuration and prepare the scheduler parameters.

        cfg_params -- mapping of configuration keys; reads 'CRAB.scheduler',
                      'EDG.rb' and 'EDG.wms_service'.

        Raises KeyError when the (lower-cased) CRAB.scheduler value is not a
        key of the scheduler map.  Any existing session is discarded.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')
        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        self.schedulerConfig = common.scheduler.realSchedParams(cfg_params)
        self.schedulerConfig['name'] = self._SCHED_MAP[self.schedulerName.lower()]

        # Force schedSession() to build a new session with these parameters.
        self.session = None
        return

    def schedSession(self):
        '''
        Instantiate the BossLiteAPISched session on first use and return it.

        Raises CrabException when the session cannot be created; the
        traceback is written to the debug log.
        '''
        if not self.session:
            try:
                self.session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
            except Exception:
                common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
                raise CrabException('Scheduler Session: ' + str(self._currentError()))
        return self.session

    def declare(self, nj):
        """
        BOSS declaration of jobs

        nj -- number of jobs.  For jobs 1..nj the output file list,
              executable, standard output and standard error are written to
              the task database with a single updateJob_ call.
        """
        # File names are based on the type name of job number nj.
        base = common.job_list[nj - 1].type().name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        task = common._db.getTask()

        listID = []
        listField = []
        for idx in range(nj):
            jobNumber = idx + 1
            stdout = base + '_' + str(jobNumber) + '.stdout'
            stderr = base + '_' + str(jobNumber) + '.stderr'

            # This is the list held by the task object itself, so the append
            # below also changes task.jobs[idx]['outputFiles'].
            out = task.jobs[idx]['outputFiles']
            # stdout, stderr and '.BrokerInfo' are not added to the list here.
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(jobNumber) + '.xml')

            listField.append({
                'outputFiles':    out,
                'executable':     wrapper,
                'standardOutput': stdout,
                'standardError':  stderr,
            })
            listID.append(jobNumber)
        common._db.updateJob_(listID, listField)

        return

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources

        Returns whatever the session's lcgInfo() returns.  When lcgInfo()
        raises SchedulerError the failure is only logged and an empty list is
        returned.
        """
        # Default result, so that a failed query does not leave `sites`
        # unbound at the return statement.
        sites = []
        try:
            sites = self.schedSession().lcgInfo(tags, seList=dest, blacklist=blackL, whitelist=whiteL)
        except SchedulerError:
            err = self._currentError()
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))

        return sites

    def submit(self, taskId, jobsList, req):
        """
        Submit BOSS function.
        Forwards taskId, jobsList and req to the session's submit() and
        writes the 'service' of the first submitted job to the log.

        Raises CrabException when the session raises SchedulerError.
        """
        try:
            task_sub = self.schedSession().submit(taskId, jobsList, req)
            wms = task_sub.jobs[0].runningJob['service']
            common.logger.write("WMS : " + str(wms))
        except SchedulerError:
            self._schedulerFailure("Submit: ")

        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid

        Returns the result of the session's query().
        Raises CrabException when the session raises SchedulerError.
        """
        try:
            statusRes = self.schedSession().query(str(taskid))
        except SchedulerError:
            self._schedulerFailure("Status Query : ")

        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange of the specified taskId
        into outdir.

        Raises CrabException when the session raises SchedulerError.
        """
        try:
            self.schedSession().getOutput(taskId, jobRange, outdir)
        except SchedulerError:
            self._schedulerFailure("GetOutput : ")

        return

    def cancel(self, list):
        """
        Cancel the job with id from a list

        Raises CrabException when the session raises SchedulerError.
        """
        task = common._db.getTask(list)
        try:
            self.schedSession().kill(task, list)
        except SchedulerError:
            self._schedulerFailure("Kill: ")
        return

    def LoggingInfo(self, list_id, outfile):
        """
        Query the logging info with id from a list; list_id and outfile are
        forwarded to the session's postMortem().

        Raises CrabException when the session raises SchedulerError.
        """
        try:
            self.schedSession().postMortem(1, list_id, outfile)
        except SchedulerError:
            self._schedulerFailure("logginginfo: ")
        return

    def writeJDL(self, taskId, jobsList, req):
        """
        JDL description BOSS function.

        Returns the result of the session's jobDescription().
        Raises CrabException when the session raises SchedulerError.
        """
        try:
            jdl = self.schedSession().jobDescription(taskId, jobsList, req)
        except SchedulerError:
            self._schedulerFailure("writeJDL: ")

        return jdl

    def setFlag(self, list, index):
        """
        Classify the element at index by looking at its successor.

        Returns -2 when the next element equals list[index] + 1, -1 when a
        next element exists but is not consecutive, and the last element of
        the list when index is the final position.
        """
        following = index + 1
        if following < len(list):
            if list[following] == list[index] + 1:
                return -2
            return -1
        return list[-1]

    def prepString(self, list):
        """
        Build a compact range string from a list of numbers.

        Consecutive runs are written as 'first:last' and separated by ','
        (e.g. [1, 2, 3, 5, 6] -> '1:3,5:6').  An empty list gives ''.
        NOTE(review): the state values 0, -1 and -2 share the range of
        setFlag()'s "last element" return value, so this presumably expects
        positive numbers -- confirm against callers.
        """
        s = ""
        flag = 0
        for i, item in enumerate(list):
            text = str(item)
            if flag == 0:
                # first element: start the string
                s = text
                flag = self.setFlag(list, i)
            elif flag == -1:
                # previous element was not followed by its successor
                s = s + "," + text
                flag = self.setFlag(list, i)
            elif flag == -2:
                # inside a consecutive run: only write its last element
                flag = self.setFlag(list, i)
                if flag == -1:
                    s = s + ":" + text
        if flag > 0:
            # the final position was reached: close with the last element
            s = s + ":" + str(list[-1])
        return s
|