ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerSge.py
Revision: 1.6
Committed: Fri Mar 20 15:39:33 2009 UTC (16 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_1_pre1
Changes since 1.5: +331 -95 lines
Log Message:
changes from Friederike

File Contents

# User Rev Content
1 spiga 1.6 #!/usr/bin/env python
2     """
3     basic SGE CLI interaction class
4     """
5    
6     __revision__ = "$Id: SchedulerSge.py,v 1.5 2009/01/20 18:49:45 gcodispo Exp $"
7     __version__ = "$Revision: 1.5 $"
8    
9     import re, os
10    
11     from ProdCommon.BossLite.Scheduler.SchedulerInterface import SchedulerInterface
12     from ProdCommon.BossLite.Common.Exceptions import SchedulerError
13     from ProdCommon.BossLite.DbObjects.Job import Job
14     from ProdCommon.BossLite.DbObjects.Task import Task
15     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
16    
17     class SchedulerSge (SchedulerInterface) :
18     """
19     basic class to handle sge jobs
20     """
21     def __init__( self, **args):
22    
23     # call super class init method
24     super(SchedulerSge, self).__init__(**args)
25     self.statusMap = {
26     'd':'K',
27     'E':'DA',
28     'h':'R',
29     'r':'R',
30     'R':'R',
31     's':'R',
32     'S':'R',
33     't':'SS',
34     'T':'R',
35     'w':'R',
36     'qw':'R',
37     'Eqw':'K',
38     'DONE':'SD'
39     }
40    
41    
42     def checkUserProxy( self, cert='' ):
43     return
44 slacapra 1.1
45 spiga 1.6 def jobDescription ( self, obj, requirements='', config='', service = '' ):
46     """
47     retrieve scheduler specific job description
48     return it as a string
49     """
50     args=''
51     if type(obj) == RunningJob or type(obj) == Job:
52     return self.decode(obj, requirements)
53     elif type(obj) == Task :
54     task = obj
55     for job in task.getJobs() :
56     args += self.decode(job, task, requirements)+' \n'
57     return args
58 slacapra 1.1
59    
60 spiga 1.6 def submit ( self, obj, requirements='', config='', service = '' ) :
61     """
62     set up submission parameters and submit
63 slacapra 1.1
64 spiga 1.6 return jobAttributes, bulkId, service
65 slacapra 1.1
66 spiga 1.6 - jobAttributs is a map of the format
67     jobAttributes[ 'name' : 'schedulerId' ]
68     - bulkId is an eventual bulk submission identifier
69     - service is a endpoit to connect withs (such as the WMS)
70     """
71 ewv 1.5
72 spiga 1.6 #print "config"+config
73     if type(obj) == RunningJob or type(obj) == Job:
74     return self.submitJob(obj, requirements)
75     elif type(obj) == Task :
76     return self.submitTask (obj, requirements )
77    
78     def submitTask ( self, task, requirements=''):
79     ret_map={}
80     for job in task.getJobs() :
81     map, taskId, queue = self.submitJob(job, task, requirements)
82     ret_map.update(map)
83    
84     return ret_map, taskId, queue
85    
86     def submitJob ( self, job, task=None, requirements=''):
87     """Need to copy the inputsandbox to WN before submitting a job"""
88    
89     arg = self.decode(job, task, requirements )
90    
91     command = "qsub " + arg
92     #print command + "\n"
93     out, ret = self.ExecuteCommand(command)
94     #print "out:" + out + "\n"
95     r = re.compile("Your job (\d+) .* has been submitted")
96    
97     m= r.search(out)
98     if m is not None:
99     jobId =m.group(1)
100     command = "qstat -j " + jobId
101     #out, ret = self.ExecuteCommand(command)
102     #print "out:" + out + "\n"
103     #queue = m.group(2)
104     queue = "all"
105     else:
106     #rNot = re.compile("Job not submitted.*<(\w+)>")
107     #m= rNot.search(out)
108     #if m is not None:
109     # print m
110     # print "Job NOT submitted"
111     # print out
112     raise SchedulerError('error', out)
113     taskId = None
114     #print "Your job identifier is: ", taskId, queue
115     map={ job[ 'name' ] : jobId }
116     return map, taskId, queue
117 slacapra 1.1
118 spiga 1.4
119 spiga 1.6 def decode (self, job, task=None, requirements='' , config ='', service='' ):
120 slacapra 1.1 """
121 spiga 1.6 prepare file for submission
122 slacapra 1.1 """
123    
124 spiga 1.6 txt = "#batch script for SGE jobs\n"
125     txt += "MYTMPDIR=$TMP/$JOB_NAME\n"
126     txt += "mkdir -p $MYTMPDIR \n"
127     txt += "cd $MYTMPDIR\n"
128     # Need to copy InputSandBox to WN
129     if task:
130     subDir=task[ 'startDirectory' ]
131     for inpFile in task[ 'globalSandbox' ].split(','):
132     #txt += "cp "+subDir+inpFile+" . \n"
133     txt += "cp "+inpFile+" . \n"
134     ## Job specific ISB
135     #for inpFile in job[ 'inputFiles' ]:
136     # if inpFile != '': txt += self.cpCmd+" "+self.rfioSer+"/"+inpFile+" . \n"
137    
138     ## Now the actual wrapper
139     args = job[ 'arguments' ]
140     exe = job[ 'executable' ]
141     txt += "./"+os.path.basename(exe)+" "+args+"\n"
142    
143     ## And finally copy back the output
144     outputDir=task['outputDirectory']
145     for outFile in job['outputFiles']:
146     #print "outputFile:"+outFile
147     txt += "cp "+outFile+" "+outputDir+"/. \n"
148    
149     txt += "cd $SGE_O_HOME\n"
150     txt += "rm -rf $MYTMPDIR\n"
151     arg = ""
152     if job[ 'standardInput' ] != '':
153     arg += ' -i %s ' % job[ 'standardInput' ]
154    
155     #delete old log files as SGE will append to the file
156     if os.path.exists(job[ 'standardOutput' ]):
157     os.remove(job[ 'standardOutput' ])
158    
159     if os.path.exists(job[ 'standardError' ]):
160     os.remove(job[ 'standardError' ])
161    
162     arg += ' -o %s ' % job[ 'standardOutput' ]
163     arg += ' -e %s ' % job[ 'standardError' ]
164    
165     # jobrundir = outputDir
166     # jobrundir += "/%s" % job['id']
167     # if not os.path.exists(jobrundir):
168     # os.mkdir(jobrundir)
169    
170     arg +='-wd '+outputDir+ ' '
171     # txt += "rm -rf "+jobrundir+"\n"
172     # blindly append user requirements
173     arg += requirements
174     arg += '-N '+task[ 'name' ]+' '
175     #create job script
176     f = open(outputDir+'/batchscript', 'wc')
177     f.write("#!/bin/sh\n")
178     f.write(txt)
179     f.close()
180    
181     # and finally the wrapper
182     #arg += ' %s ' % txt
183     arg += outputDir+"/batchscript"
184     return arg
185    
186    
187     ##########################################################################
188     def query(self, obj, service='', objType='node') :
189 slacapra 1.1 """
190 spiga 1.6 query status and eventually other scheduler related information
191 slacapra 1.1 """
192    
193 spiga 1.6 # the object passed is a Task:
194     # check whether parent id are provided, make a list of ids
195     # and check the status
196     if type(obj) == Task :
197     schedIds = []
198    
199     # query performed through single job ids
200     if objType == 'node' :
201     for job in obj.jobs :
202     if self.valid( job.runningJob ) and \
203     job.runningJob['status'] != 'SD':
204     schedIds.append( job.runningJob['schedulerId'] )
205     jobAttributes = self.queryLocal( schedIds, objType )
206    
207     # query performed through a bulk id
208     elif objType == 'parent' :
209     for job in obj.jobs :
210     if self.valid( job.runningJob ) \
211     and job.runningJob['status'] != 'SD' \
212     and job.runningJob['schedulerParentId'] not in schedIds:
213     schedIds.append( job.runningJob['schedulerParentId'] )
214     jobAttributes = self.queryLocal( schedIds, objType )
215    
216     # status association
217     for job in obj.jobs :
218     try:
219     valuesMap = jobAttributes[ job.runningJob['schedulerId'] ]
220     except:
221     continue
222     for key, value in valuesMap.iteritems() :
223     job.runningJob[key] = value
224    
225     # unknown object type
226     else:
227     raise SchedulerError('wrong argument type', str( type(obj) ))
228 slacapra 1.1
229 spiga 1.3
230 slacapra 1.1
231 spiga 1.6 def queryLocal(self, schedIdList, objType='node' ) :
232 slacapra 1.1
233     """
234 spiga 1.6 query status and eventually other scheduler related information
235     It may use single 'node' scheduler id or bulk id for association
236    
237     return jobAttributes
238    
239     where jobAttributes is a map of the format:
240     jobAttributes[ schedId :
241     [ key : val ]
242     ]
243     where key can be any parameter of the Job object and at least status
244    
245 slacapra 1.1 """
246 spiga 1.6 ret_map={}
247     #print schedIdList, service, objType
248     r = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) .* (\S+)@(\w+)")
249     rnohost = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) ")
250     cmd='qstat -u '+os.getlogin()
251     #print cmd
252     out, ret = self.ExecuteCommand(cmd)
253     #print "<"+out+">"
254     for line in out.split('\n'):
255     #print line
256     queue=None
257     host=None
258     id=None
259     st=None
260     mfull= r.search(line)
261     if (mfull):
262     #print "matched"
263     id,st,queue,host=mfull.groups()
264     else:
265     mnohost = rnohost.search(line)
266     if (mnohost):
267     id,st = mnohost.groups()
268     pass
269     pass
270     #print "got id %s" % id
271     if(id) and (id in schedIdList):
272     map={}
273     map[ 'status' ] = self.statusMap[st]
274     map[ 'statusScheduler' ] = st
275     if (host): map[ 'destination' ] = host
276     ret_map[id]=map
277     #print "set state to "+map['statusScheduler']
278     pass
279     pass
280    
281     #set all missing jobs to done
282     for jobid in schedIdList:
283     jobid = jobid.strip()
284     if not jobid in ret_map:
285     #print "job "+jobid+" not found in qstat list setting to DONE"
286     id = jobid
287     st = "DONE"
288     map={}
289     map[ 'status' ] = self.statusMap[st]
290     map[ 'statusScheduler' ]= st
291     ret_map[id]=map
292     pass
293     pass
294    
295     return ret_map
296    
297 slacapra 1.1
298 spiga 1.6 def getOutput( self, obj, outdir):
299     """
300     retrieve output or just put it in the destination directory
301 slacapra 1.1
302 spiga 1.6 does not return
303     """
304     #output ends up in the wrong location with a user defined
305     #output directory...Thus we have to move it to the correct
306     #directory here....
307     #print "SchedulerSGE:getOutput called!"
308    
309     if type(obj) == Task :
310     oldoutdir=obj[ 'outputDirectory' ]
311     if(outdir != oldoutdir):
312     for job in obj.jobs:
313     jobid = job[ 'id' ];
314     #print "job:"+str(jobid)
315     if self.valid( job.runningJob ):
316     #print "is valid"
317     for outFile in job['outputFiles']:
318     #print "outputFile:"+outFile
319     command = "mv "+oldoutdir+"/"+outFile+" "+outdir+"/. \n"
320     #print command
321     out, ret = self.ExecuteCommand(command)
322     if (out!=""):
323     raise SchedulerError('unable to move file', out)
324     #raise SchedulerError("unable to move file "+oldoutdir+"/"+outFile+" ",out)
325     pass
326     pass
327     pass
328     pass
329     pass
330    
331 spiga 1.2
332 spiga 1.6 def kill( self, obj ):
333     """
334     kill the job instance
335 slacapra 1.1
336 spiga 1.6 does not return
337     """
338     r = re.compile("has registered the job (\d+) for deletion")
339     rFinished = re.compile("Job <(\d+)>: Job has already finished")
340     # for jobid in schedIdList:
341     for job in obj.jobs:
342     if not self.valid( job.runningJob ):
343     continue
344     jobid = str( job.runningJob[ 'schedulerId' ] ).strip()
345     cmd='qdel '+str(jobid)
346     out, ret = self.ExecuteCommand(cmd)
347     #print "kill:"+out
348     mKilled= r.search(out)
349     if not mKilled:
350     raise SchedulerError ( "Unable to kill job "+jobid+" . Reason: ", out )
351     pass
352     pass
353 slacapra 1.1
354 spiga 1.6 def postMortem ( self, schedIdList, outfile, service ) :
355 spiga 1.3 """
356 spiga 1.6 execute any post mortem command such as logging-info
357     and write it in outfile
358 ewv 1.5 """
359