1 |
slacapra |
1.1 |
# NOTE(review): original import order is kept on purpose -- the two star
# imports could shadow names if the later imports were moved above them.
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common
import os, time, shutil
import sys
import traceback

from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI

from ProdCommon.BossLite.DbObjects.Job import Job
from ProdCommon.BossLite.DbObjects.Task import Task
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob

from ProdCommon.BossLite.Common.Exceptions import SchedulerError
from ProdCommon.BossLite.API.BossLiteAPISched import BossLiteAPISched
|
17 |
slacapra |
1.1 |
|
18 |
|
|
class Boss:
    """
    Wrapper used to drive a BossLite scheduler session.

    configure() builds the scheduler settings, schedSession() opens a
    BossLiteAPISched session with them, and the remaining methods forward
    one operation each (list-match, submit, status query, output
    retrieval) to that session, turning SchedulerError into CrabException.
    """

    def __init__(self):
        """Nothing to initialise here; the state is set by configure()."""
        return

    def configure(self, cfg_params):
        """
        Store the configuration and prepare the scheduler settings.

        cfg_params -- mapping with 'SECTION.key' style keys; the keys read
                      here are CRAB.scheduler, EDG.rb and EDG.wms_service.

        Sets self.schedulerName, self.rb_param_file, self.wms_service and
        self.schedulerConfig (the dict later given to BossLiteAPISched).

        Raises KeyError if CRAB.scheduler is absent or has no entry in the
        scheduler map below.
        """
        self.cfg_params = cfg_params
        # this should match with the bosslite requirements
        self.schedulerName = self.cfg_params.get("CRAB.scheduler", '')

        self.rb_param_file = ''
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')

        ## Add here the map for others Schedulers (LSF/CAF/CondorG)
        SchedMap = {'glite': 'SchedulerGLiteAPI',
                    'glitecoll': 'SchedulerGLiteAPI',
                    'condor_g': 'SchedulerCondorGAPI',
                    'lsf': '',
                    'caf': ''
                    }

        self.schedulerConfig = {
            'name': SchedMap[self.schedulerName],
            'service': self.wms_service,
            'config': self.rb_param_file
        }
        return

    def schedSession(self):
        """
        Instantiate a BossLiteAPISched session from common.bossSession and
        the settings prepared by configure().

        Returns the session object.
        Raises CrabException if the session cannot be created, whatever
        the underlying error was.
        """
        try:
            session = BossLiteAPISched(common.bossSession, self.schedulerConfig)
        except Exception:
            # sys.exc_info() instead of a binding in the except clause, so
            # the handler syntax does not depend on the Python version.
            e = sys.exc_info()[1]
            common.logger.debug(3, "Istantiate SchedSession: " + str(traceback.format_exc()))
            raise CrabException('Scheduler Session: ' + str(e))
        return session

    def declare(self, nj):
        """
        BOSS declaration of jobs.

        For each of the first nj jobs, records the wrapper executable, the
        stdout/stderr file names and the output file list, then writes all
        of them to the task database in a single updateJob_ call.

        nj -- number of jobs to declare.
        """
        # The job type of the nj-th job supplies the base name used for the
        # stdout/stderr files of every job.
        base = common.job_list[nj - 1].type().name()

        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
        task = common._db.getTask()

        listField = []
        listID = []
        for idx in range(nj):
            jobNumber = idx + 1  # stored ids and file names are 1-based
            stdout = base + '_' + str(jobNumber) + '.stdout'
            stderr = base + '_' + str(jobNumber) + '.stderr'

            # This is the list held by the task itself, so the append below
            # also changes task.jobs[idx]['outputFiles'].
            out = task.jobs[idx]['outputFiles']
            ## To be better understood if it is needed
            out.append('crab_fjr_' + str(jobNumber) + '.xml')

            parameters = {}
            parameters['outputFiles'] = out
            parameters['executable'] = wrapper
            parameters['standardOutput'] = stdout
            parameters['standardError'] = stderr
            listField.append(parameters)
            listID.append(jobNumber)

        common._db.updateJob_(listID, listField)
        return

    def listMatch(self, tags, dest, whiteL, blackL):
        """
        Check the compatibility of available resources.

        Forwards tags, dest, whiteL and blackL to the session's lcgInfo()
        and returns the number of entries it reports.

        A SchedulerError is logged as a warning and not re-raised; in that
        case the method returns 0.
        """
        schedSession = self.schedSession()
        # Default needed because the handler below swallows the error:
        # without it the return statement would hit an unbound name.
        sites = []
        try:
            sites = schedSession.lcgInfo(tags, dest, whiteL, blackL)
        except SchedulerError:
            err = sys.exc_info()[1]
            common.logger.message("Warning: List Match operation failed with message: " + str(err))
            common.logger.debug(3, "List Match failed: " + str(traceback.format_exc()))
        return len(sites)

    def submit(self, jobsList, req):
        """
        Submit BOSS function.

        Loads the task for jobsList from the task database and passes the
        task, jobsList and req to the scheduler session's submit().

        Raises CrabException if the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        task = common._db.getTask(jobsList)
        try:
            schedSession.submit(task, jobsList, req)
        except SchedulerError:
            err = sys.exc_info()[1]
            common.logger.message("Submit: " + str(err))
            common.logger.debug(3, "Submit: " + str(traceback.format_exc()))
            raise CrabException('Submit: ' + str(err))
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified taskid.

        Returns whatever the scheduler session's query() returns.
        Raises CrabException if the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        try:
            statusRes = schedSession.query(str(taskid))
        except SchedulerError:
            err = sys.exc_info()[1]
            common.logger.message("Status Query : " + str(err))
            common.logger.debug(3, "Status Query : " + str(traceback.format_exc()))
            raise CrabException('Status Query : ' + str(err))
        return statusRes

    def getOutput(self, taskId, jobRange, outdir):
        """
        Retrieve output of the jobs in jobRange for the specified taskId,
        passing outdir through to the scheduler session's getOutput().

        Raises CrabException if the scheduler reports a SchedulerError.
        """
        schedSession = self.schedSession()
        try:
            schedSession.getOutput(taskId, jobRange, outdir)
        except SchedulerError:
            err = sys.exc_info()[1]
            common.logger.message("GetOutput : " + str(err))
            common.logger.debug(3, "GetOutput : " + str(traceback.format_exc()))
            raise CrabException('GetOutput : ' + str(err))
        return

    def cancel(self, subm_id):
        """
        Kill the jobs whose ids are listed in subm_id.

        subm_id -- list of job ids; it is sorted in place. With an empty
                   list only a "No job to be killed" message is logged.

        Jobs that report status 'K' after the kill are marked 'K' in
        common.jobDB. Scheduler and BOSS errors are logged, not raised.

        NOTE(review): this method uses self.bossTask, ALL, BossError and
        common.jobDB, none of which is assigned or explicitly imported in
        the visible part of this file -- presumably left over from the
        pre-BossLite interface; confirm before relying on it.
        """
        common.jobDB.load()
        if len(subm_id) > 0:
            try:
                subm_id.sort()
                jobRange = self.prepString(subm_id)
                common.logger.message("Killing job # " + str(subm_id).replace("[", "", 1).replace("]", "", 1))
                # Timeout grows with the number of jobs (60 per job).
                Tout = len(subm_id) * 60
                self.bossTask.kill(jobRange, Tout)
                self.bossTask.load(ALL, jobRange)
                task = self.bossTask.jobsDict()
                for k, v in task.iteritems():
                    k = int(k)
                    status = v['STATUS']
                    if k in subm_id and status == 'K':
                        # jobDB is indexed from 0, job ids from 1
                        common.jobDB.setStatus(k - 1, 'K')
            except SchedulerError:
                e = sys.exc_info()[1]
                common.logger.message("Warning : Scheduler interaction on kill operation failed for jobs:" + e.__str__())
            except BossError:
                e = sys.exc_info()[1]
                common.logger.message(e.__str__() + "\nError killing jobs # " + str(subm_id) + " . See log for details")
            # NOTE(review): nesting of this save was reconstructed; it may
            # originally have belonged to the BossError handler only.
            common.jobDB.save()
        else:
            common.logger.message("\nNo job to be killed")
        common.jobDB.save()
        return

    def setFlag(self, list, index):
        """
        Classify the element at index relative to its successor.

        Returns -2 if the next element is exactly one greater (a run
        continues), -1 if there is a next element that is not, and the
        last element of the list when index is the final position.
        """
        if len(list) > (index + 1):
            if list[index + 1] == (list[index] + 1):
                return -2
            return -1
        return list[-1]

    def prepString(self, list):
        """
        Build a compact range string from a list of ids, e.g.
        [1, 2, 3, 5, 6] -> "1:3,5:6".

        Consecutive runs become "first:last" and separate groups are
        joined with ",". The final id is always appended after a ":" once
        more, so a trailing isolated id n appears as "n:n". An empty list
        gives "".

        Assumes the list is sorted and the ids are positive: setFlag()
        hands back the last id itself as the end marker and it is told
        apart from the -1/-2 markers by its sign.

        NOTE(review): the nesting of the final "flag > 0" test was
        reconstructed as running once after the loop; confirm.
        """
        s = ""
        flag = 0
        for i, item in enumerate(list):
            if flag == 0:
                # first element opens the string
                s = str(item)
                flag = self.setFlag(list, i)
            elif flag == -1:
                # previous element closed a group: start a new one
                s = s + "," + str(item)
                flag = self.setFlag(list, i)
            elif flag == -2:
                # inside a run: only emit something when the run ends here
                flag = self.setFlag(list, i)
                if flag == -1:
                    s = s + ":" + str(item)
        if flag > 0:
            s = s + ":" + str(list[-1])
        return s
|
224 |
|
|
|
225 |
|
|
|
226 |
|
|
|
227 |
|
|
|