1 |
spiga |
1.6 |
#!/usr/bin/env python
|
2 |
|
|
"""
|
3 |
|
|
basic SGE CLI interaction class
|
4 |
|
|
"""
|
5 |
|
|
|
6 |
|
|
__revision__ = "$Id: SchedulerSge.py,v 1.5 2009/01/20 18:49:45 gcodispo Exp $"
|
7 |
|
|
__version__ = "$Revision: 1.5 $"
|
8 |
|
|
|
9 |
|
|
import re, os
|
10 |
|
|
|
11 |
|
|
from ProdCommon.BossLite.Scheduler.SchedulerInterface import SchedulerInterface
|
12 |
|
|
from ProdCommon.BossLite.Common.Exceptions import SchedulerError
|
13 |
|
|
from ProdCommon.BossLite.DbObjects.Job import Job
|
14 |
|
|
from ProdCommon.BossLite.DbObjects.Task import Task
|
15 |
|
|
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
|
16 |
|
|
|
17 |
|
|
class SchedulerSge (SchedulerInterface) :
|
18 |
|
|
"""
|
19 |
|
|
basic class to handle sge jobs
|
20 |
|
|
"""
|
21 |
|
|
def __init__( self, **args):
    """
    Initialize the SGE scheduler.

    Delegates generic setup to the SchedulerInterface base class and
    builds the translation table from SGE qstat status codes to
    BossLite status codes.
    """
    # call super class init method
    super(SchedulerSge, self).__init__(**args)

    # map: SGE qstat status code -> BossLite status code
    self.statusMap = {}
    for sgeCode, bossCode in [ ('d',    'K'),
                               ('E',    'DA'),
                               ('h',    'R'),
                               ('r',    'R'),
                               ('R',    'R'),
                               ('s',    'R'),
                               ('S',    'R'),
                               ('t',    'SS'),
                               ('T',    'R'),
                               ('w',    'R'),
                               ('qw',   'R'),
                               ('Eqw',  'K'),
                               ('DONE', 'SD') ]:
        self.statusMap[sgeCode] = bossCode
|
40 |
|
|
|
41 |
|
|
|
42 |
|
|
def checkUserProxy( self, cert='' ):
    """
    Proxy validation is not meaningful for a local SGE batch system;
    this no-op exists only to honour the SchedulerInterface contract.
    """
    return None
|
44 |
slacapra |
1.1 |
|
45 |
spiga |
1.6 |
def jobDescription ( self, obj, requirements='', config='', service = '' ):
    """
    retrieve scheduler specific job description

    return it as a string
    """
    if type(obj) == RunningJob or type(obj) == Job:
        # pass requirements by keyword: decode's second positional
        # argument is the task, not the requirements string, so the
        # original positional call handed the requirements over as task
        return self.decode(obj, requirements=requirements)
    elif type(obj) == Task :
        task = obj
        args = ''
        for job in task.getJobs() :
            args += self.decode(job, task, requirements) + ' \n'
        return args
|
58 |
slacapra |
1.1 |
|
59 |
|
|
|
60 |
spiga |
1.6 |
def submit ( self, obj, requirements='', config='', service = '' ) :
    """
    set up submission parameters and submit

    return jobAttributes, bulkId, service

    - jobAttributes is a map of the format
      jobAttributes[ 'name' : 'schedulerId' ]
    - bulkId is an eventual bulk submission identifier
    - service is an endpoint to connect with (such as the WMS)
    """
    if type(obj) == RunningJob or type(obj) == Job:
        # pass requirements by keyword: submitJob's second positional
        # argument is the task, not the requirements string, so the
        # original positional call handed the requirements over as task
        return self.submitJob(obj, requirements=requirements)
    elif type(obj) == Task :
        return self.submitTask (obj, requirements )
|
77 |
|
|
|
78 |
|
|
def submitTask ( self, task, requirements=''):
|
79 |
|
|
ret_map={}
|
80 |
|
|
for job in task.getJobs() :
|
81 |
|
|
map, taskId, queue = self.submitJob(job, task, requirements)
|
82 |
|
|
ret_map.update(map)
|
83 |
|
|
|
84 |
|
|
return ret_map, taskId, queue
|
85 |
|
|
|
86 |
|
|
def submitJob ( self, job, task=None, requirements=''):
|
87 |
|
|
"""Need to copy the inputsandbox to WN before submitting a job"""
|
88 |
|
|
|
89 |
|
|
arg = self.decode(job, task, requirements )
|
90 |
|
|
|
91 |
|
|
command = "qsub " + arg
|
92 |
|
|
#print command + "\n"
|
93 |
|
|
out, ret = self.ExecuteCommand(command)
|
94 |
|
|
#print "out:" + out + "\n"
|
95 |
|
|
r = re.compile("Your job (\d+) .* has been submitted")
|
96 |
|
|
|
97 |
|
|
m= r.search(out)
|
98 |
|
|
if m is not None:
|
99 |
|
|
jobId =m.group(1)
|
100 |
|
|
command = "qstat -j " + jobId
|
101 |
|
|
#out, ret = self.ExecuteCommand(command)
|
102 |
|
|
#print "out:" + out + "\n"
|
103 |
|
|
#queue = m.group(2)
|
104 |
|
|
queue = "all"
|
105 |
|
|
else:
|
106 |
|
|
#rNot = re.compile("Job not submitted.*<(\w+)>")
|
107 |
|
|
#m= rNot.search(out)
|
108 |
|
|
#if m is not None:
|
109 |
|
|
# print m
|
110 |
|
|
# print "Job NOT submitted"
|
111 |
|
|
# print out
|
112 |
|
|
raise SchedulerError('error', out)
|
113 |
|
|
taskId = None
|
114 |
|
|
#print "Your job identifier is: ", taskId, queue
|
115 |
|
|
map={ job[ 'name' ] : jobId }
|
116 |
|
|
return map, taskId, queue
|
117 |
slacapra |
1.1 |
|
118 |
spiga |
1.4 |
|
119 |
spiga |
1.6 |
def decode (self, job, task=None, requirements='' , config ='', service='' ):
    """
    prepare file for submission

    Builds a batch script in the task output directory and returns the
    qsub argument string (stdin/stdout/stderr redirection, working
    directory, user requirements, job name) ending with the script path.

    NOTE: despite the default, task must actually be provided — the
    output directory and task name are read from it.
    """
    txt = "#batch script for SGE jobs\n"
    txt += "MYTMPDIR=$TMP/$JOB_NAME\n"
    txt += "mkdir -p $MYTMPDIR \n"
    txt += "cd $MYTMPDIR\n"
    # Need to copy InputSandBox to WN
    if task:
        for inpFile in task[ 'globalSandbox' ].split(','):
            txt += "cp "+inpFile+" . \n"

    ## Now the actual wrapper
    args = job[ 'arguments' ]
    exe = job[ 'executable' ]
    txt += "./"+os.path.basename(exe)+" "+args+"\n"

    ## And finally copy back the output
    outputDir = task['outputDirectory']
    for outFile in job['outputFiles']:
        txt += "cp "+outFile+" "+outputDir+"/. \n"

    txt += "cd $SGE_O_HOME\n"
    txt += "rm -rf $MYTMPDIR\n"

    arg = ""
    if job[ 'standardInput' ] != '':
        arg += ' -i %s ' % job[ 'standardInput' ]

    # delete old log files as SGE will append to the file
    if os.path.exists(job[ 'standardOutput' ]):
        os.remove(job[ 'standardOutput' ])
    if os.path.exists(job[ 'standardError' ]):
        os.remove(job[ 'standardError' ])

    arg += ' -o %s ' % job[ 'standardOutput' ]
    arg += ' -e %s ' % job[ 'standardError' ]

    # run the job from the output directory
    arg += '-wd '+outputDir+ ' '
    # blindly append user requirements
    arg += requirements
    arg += '-N '+task[ 'name' ]+' '

    # create job script; mode must be plain 'w' — the original 'wc' is
    # not a valid open() mode (fails on Python 3 / non-glibc platforms)
    f = open(outputDir+'/batchscript', 'w')
    try:
        f.write("#!/bin/sh\n")
        f.write(txt)
    finally:
        f.close()

    # and finally the wrapper
    arg += outputDir+"/batchscript"
    return arg
|
185 |
|
|
|
186 |
|
|
|
187 |
|
|
##########################################################################
|
188 |
|
|
def query(self, obj, service='', objType='node') :
    """
    query status and eventually other scheduler related information

    Updates obj.jobs' runningJob attributes in place; does not return.
    Raises SchedulerError for a non-Task argument or an unknown objType.
    """
    # the object passed is a Task:
    # check whether parent id are provided, make a list of ids
    # and check the status
    if type(obj) != Task :
        # unknown object type
        raise SchedulerError('wrong argument type', str( type(obj) ))

    schedIds = []

    # query performed through single job ids
    if objType == 'node' :
        for job in obj.jobs :
            if self.valid( job.runningJob ) and \
                   job.runningJob['status'] != 'SD':
                schedIds.append( job.runningJob['schedulerId'] )
        jobAttributes = self.queryLocal( schedIds, objType )

    # query performed through a bulk id
    elif objType == 'parent' :
        for job in obj.jobs :
            if self.valid( job.runningJob ) \
                   and job.runningJob['status'] != 'SD' \
                   and job.runningJob['schedulerParentId'] not in schedIds:
                schedIds.append( job.runningJob['schedulerParentId'] )
        jobAttributes = self.queryLocal( schedIds, objType )

    else:
        # previously fell through to a NameError on jobAttributes;
        # fail explicitly instead
        raise SchedulerError('unknown objType', str( objType ))

    # status association
    for job in obj.jobs :
        try:
            valuesMap = jobAttributes[ job.runningJob['schedulerId'] ]
        except Exception:
            # job not in the query result (or invalid runningJob): skip
            continue
        for key, value in valuesMap.items() :
            job.runningJob[key] = value
|
228 |
slacapra |
1.1 |
|
229 |
spiga |
1.3 |
|
230 |
slacapra |
1.1 |
|
231 |
spiga |
1.6 |
def queryLocal(self, schedIdList, objType='node' ) :

    """
    query status and eventually other scheduler related information
    It may use single 'node' scheduler id or bulk id for association

    return jobAttributes

    where jobAttributes is a map of the format:
       jobAttributes[ schedId :
                      [ key : val ]
                    ]
    where key can be any parameter of the Job object and at least status
    """
    ret_map = {}
    user = os.getlogin()

    # full qstat line: id ... user  state ... queue@host
    rFull = re.compile(r"(\d+) .* " + user + r" \W+(\w+) .* (\S+)@(\w+)")
    # pending job line: no queue@host assigned yet
    rNoHost = re.compile(r"(\d+) .* " + user + r" \W+(\w+) ")

    out, ret = self.ExecuteCommand('qstat -u ' + user)

    for line in out.split('\n'):
        jobId = None
        status = None
        queue = None
        host = None
        mFull = rFull.search(line)
        if mFull:
            jobId, status, queue, host = mFull.groups()
        else:
            mNoHost = rNoHost.search(line)
            if mNoHost:
                jobId, status = mNoHost.groups()
        if jobId and jobId in schedIdList:
            attrs = {}
            attrs[ 'status' ] = self.statusMap[status]
            attrs[ 'statusScheduler' ] = status
            if host:
                attrs[ 'destination' ] = host
            ret_map[jobId] = attrs

    # any requested job that qstat no longer lists is assumed finished
    for jobId in schedIdList:
        jobId = jobId.strip()
        if jobId not in ret_map:
            ret_map[jobId] = { 'status' : self.statusMap['DONE'],
                               'statusScheduler' : 'DONE' }

    return ret_map
|
296 |
|
|
|
297 |
slacapra |
1.1 |
|
298 |
spiga |
1.6 |
def getOutput( self, obj, outdir):
    """
    retrieve output or just put it in the destination directory

    does not return
    """
    # with a user defined output directory the output ends up in the
    # wrong location, so move it to the requested directory here
    if type(obj) != Task :
        return
    oldoutdir = obj[ 'outputDirectory' ]
    if outdir == oldoutdir:
        return
    for job in obj.jobs:
        jobid = job[ 'id' ]
        if not self.valid( job.runningJob ):
            continue
        for outFile in job['outputFiles']:
            command = "mv "+oldoutdir+"/"+outFile+" "+outdir+"/. \n"
            out, ret = self.ExecuteCommand(command)
            # mv is silent on success; any output means failure
            if out != "":
                raise SchedulerError('unable to move file', out)
|
330 |
|
|
|
331 |
spiga |
1.2 |
|
332 |
spiga |
1.6 |
def kill( self, obj ):
|
333 |
|
|
"""
|
334 |
|
|
kill the job instance
|
335 |
slacapra |
1.1 |
|
336 |
spiga |
1.6 |
does not return
|
337 |
|
|
"""
|
338 |
|
|
r = re.compile("has registered the job (\d+) for deletion")
|
339 |
|
|
rFinished = re.compile("Job <(\d+)>: Job has already finished")
|
340 |
|
|
# for jobid in schedIdList:
|
341 |
|
|
for job in obj.jobs:
|
342 |
|
|
if not self.valid( job.runningJob ):
|
343 |
|
|
continue
|
344 |
|
|
jobid = str( job.runningJob[ 'schedulerId' ] ).strip()
|
345 |
|
|
cmd='qdel '+str(jobid)
|
346 |
|
|
out, ret = self.ExecuteCommand(cmd)
|
347 |
|
|
#print "kill:"+out
|
348 |
|
|
mKilled= r.search(out)
|
349 |
|
|
if not mKilled:
|
350 |
|
|
raise SchedulerError ( "Unable to kill job "+jobid+" . Reason: ", out )
|
351 |
|
|
pass
|
352 |
|
|
pass
|
353 |
slacapra |
1.1 |
|
354 |
spiga |
1.6 |
def postMortem ( self, schedIdList, outfile, service ) :
|
355 |
spiga |
1.3 |
"""
|
356 |
spiga |
1.6 |
execute any post mortem command such as logging-info
|
357 |
|
|
and write it in outfile
|
358 |
ewv |
1.5 |
"""
|
359 |
|
|
|