
Comparing COMP/CRAB/python/SchedulerSge.py (file contents):
Revision 1.6 by spiga, Fri Mar 20 15:39:33 2009 UTC vs.
Revision 1.7 by spiga, Tue Apr 7 08:03:31 2009 UTC

# Line 1 | Line 1
1 < #!/usr/bin/env python
2 < """
3 < basic SGE CLI interaction class
4 < """
5 <
6 < __revision__ = "$Id$"
7 < __version__ = "$Revision$"
8 <
9 < import re, os
10 <
11 < from ProdCommon.BossLite.Scheduler.SchedulerInterface import SchedulerInterface
12 < from ProdCommon.BossLite.Common.Exceptions import SchedulerError
13 < from ProdCommon.BossLite.DbObjects.Job import Job
14 < from ProdCommon.BossLite.DbObjects.Task import Task
15 < from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
16 <
17 < class SchedulerSge (SchedulerInterface) :
18 <    """
19 <    basic class to handle sge jobs
20 <    """
21 <    def __init__( self, **args):
22 <
23 <        # call super class init method
24 <        super(SchedulerSge, self).__init__(**args)
25 <        self.statusMap = {
26 <            'd':'K',
27 <            'E':'DA',
28 <            'h':'R',
29 <            'r':'R',
30 <            'R':'R',
31 <            's':'R',
32 <            'S':'R',
33 <            't':'SS',
34 <            'T':'R',
35 <            'w':'R',
36 <            'qw':'R',
37 <            'Eqw':'K',
38 <            'DONE':'SD'
39 <            }
40 <  
41 <        
42 <    def checkUserProxy( self, cert='' ):
1 > from Scheduler import Scheduler
2 > from SchedulerLocal import SchedulerLocal
3 > from crab_exceptions import *
4 > from crab_util import *
5 > from crab_logger import Logger
6 > import common
7 >
8 > import os,string
9 >
10 > #
11 > #  Naming convention:
12 > #  methods starting with 'ws' are responsible for providing the
13 > #  corresponding part of the job script ('ws' stands for 'write script').
14 > #
15 > # Author: Hartmut Stadie <stadie@mail.desy.de> Inst. f. Experimentalphysik; Universitaet Hamburg
16 > #
17 >
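To illustrate the 'ws' naming convention described in the comment above, here is a minimal sketch of such a method; the class name, method name and script text are hypothetical and not part of this revision (the real examples in this file are wsExitFunc and wsCopyOutput further down).

    # Hypothetical illustration of the 'ws' convention: each ws* method returns
    # the text of one piece of the job wrapper script as a plain string.
    class SketchScheduler:
        def wsMakeScratchDir(self):
            txt  = '# create and enter a per-job scratch area\n'
            txt += 'mkdir -p $TMP/$JOB_NAME && cd $TMP/$JOB_NAME\n'
            return txt

    print SketchScheduler().wsMakeScratchDir()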
18 > class SchedulerSge(SchedulerLocal) :
19 >
20 >    def __init__(self):
21 >        Scheduler.__init__(self,"SGE")
22 >        self.datasetPath   = None
23 >        self.selectNoInput = None
24          return
25  
26 <    def jobDescription ( self, obj, requirements='', config='', service = '' ):
27 <        """
47 <        retrieve scheduler specific job description
48 <        return it as a string
49 <        """
50 <        args=''
51 <        if type(obj) == RunningJob or type(obj) == Job:
52 <            return self.decode(obj, requirements)
53 <        elif type(obj) == Task :
54 <            task = obj
55 <            for job in task.getJobs() :
56 <                args += self.decode(job, task, requirements)+'  \n'
57 <            return args
58 <
26 >    def configure(self, cfg_params):
27 >        SchedulerLocal.configure(self, cfg_params)
28  
29 <    def submit ( self, obj, requirements='', config='', service = '' ) :
30 <        """
31 <        set up submission parameters and submit
29 >        try:
30 >            tmp =  cfg_params['CMSSW.datasetpath']
31 >            if tmp.lower() == 'none':
32 >                self.datasetPath = None
33 >                self.selectNoInput = 1
34 >            else:
35 >                self.datasetPath = tmp
36 >                self.selectNoInput = 0
37 >        except KeyError:
38 >            msg = "Error: datasetpath not defined "
39 >            raise CrabException(msg)
40 >
41 >        self.return_data = cfg_params.get('USER.return_data', 0)
42 >        self.copy_data   = cfg_params.get("USER.copy_data", 0)
43 >
44 >        if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
45 >            msg = 'Error: return_data and copy_data cannot be set both to 0\n'
46 >            msg = msg + 'Please modify your crab.cfg file\n'
47 >            raise CrabException(msg)
48 >
49 >        if ( int(self.return_data) == 1 and int(self.copy_data) == 1 ):
50 >            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
51 >            msg = msg + 'Please modify your crab.cfg file\n'
52 >            raise CrabException(msg)
53 >
54 >        if ( int(self.copy_data) == 0 and int(self.publish_data) == 1 ):
55 >            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
56 >            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
57 >            common.logger.message(msg)
58 >            raise CrabException(msg)
59 >
60 >        if int(self.copy_data) == 1:
61 >            self.SE = cfg_params.get('USER.storage_element', None)
62 >            if not self.SE:
63 >                msg = "Error. The [USER] section has no 'storage_element'"
64 >                common.logger.message(msg)
65 >                raise CrabException(msg)
66 >
67 >            self.proxyValid = 0
68 >            self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy",0))
69 >            self.proxyServer = cfg_params.get("EDG.proxy_server",'myproxy.cern.ch')
70 >            common.logger.debug(5,'Setting myproxy server to ' + self.proxyServer)
71 >
72 >            self.group = cfg_params.get("EDG.group", None)
73 >            self.role  = cfg_params.get("EDG.role", None)
74 >            self.VO    = cfg_params.get('EDG.virtual_organization', 'cms')
75 >
76 >            self.checkProxy()
77 >
78 >        self.role = None
79 >
80 >        self.pool = cfg_params.get('USER.storage_pool',None)
81 > #        self.cpu = cfg_params.get('USER.cpu',172800)
82 > #        self.vmem = cfg_params.get('USER.vmem',2)
83 >        return
84  
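The configure() method above draws everything from the crab.cfg-derived cfg_params dictionary. A minimal sketch of the keys it consults, with illustrative values only; the remaining scheduler state (publish_data, queue, res, ...) is assumed to come from SchedulerLocal.configure and the rest of CRAB.

    # Illustrative values only -- the keys SchedulerSge.configure() reads.
    cfg_params = {
        'CMSSW.datasetpath': 'none',   # mandatory; 'none' selects the no-input mode
        'USER.return_data' : 1,        # exactly one of return_data / copy_data must be 1
        'USER.copy_data'   : 0,
        'USER.storage_pool': None,     # picked up later by wsCopyOutput()
        # with copy_data = 1, 'USER.storage_element' and the EDG.* proxy
        # settings become relevant as well
    }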
85 <        return jobAttributes, bulkId, service
85 >    def envUniqueID(self):
86 >        id = "https://"+common.scheduler.name()+":/${JOB_ID}-"+ \
87 >            string.replace(common._db.queryTask('name'),"_","-")
88 >        return id
89  
90 <        - jobAttributes is a map of the format
67 <              jobAttributes[ 'name' : 'schedulerId' ]
68 <        - bulkId is an optional bulk submission identifier
69 <        - service is an endpoint to connect to (such as the WMS)
90 >    def realSchedParams(self,cfg_params):
91          """
92 +        Return a dictionary of scheduler-specific parameters, to use
93 +        with the real scheduler
94 +        """
95 +        params = {}
96 +        return  params
97  
98 <        #print "config"+config
73 <        if type(obj) == RunningJob or type(obj) == Job:
74 <            return self.submitJob(obj, requirements)
75 <        elif type(obj) == Task :
76 <            return self.submitTask (obj, requirements )
77 <
78 <    def submitTask ( self, task, requirements=''):
79 <        ret_map={}
80 <        for job in task.getJobs() :
81 <            map, taskId, queue = self.submitJob(job, task, requirements)
82 <            ret_map.update(map)
83 <
84 <        return ret_map, taskId, queue
85 <
86 <    def submitJob ( self, job, task=None, requirements=''):
87 <        """Need to copy the inputsandbox to WN before submitting a job"""
88 <        
89 <        arg = self.decode(job, task, requirements )
90 <
91 <        command = "qsub " + arg
92 <        #print command + "\n"
93 <        out, ret = self.ExecuteCommand(command)
94 <        #print "out:" + out + "\n"
95 <        r = re.compile("Your job (\d+) .* has been submitted")
96 <
97 <        m= r.search(out)
98 <        if m is not None:
99 <            jobId =m.group(1)
100 <            command = "qstat -j " +  jobId
101 <            #out, ret = self.ExecuteCommand(command)
102 <            #print "out:" + out + "\n"
103 <            #queue = m.group(2)
104 <            queue = "all"
105 <        else:
106 <            #rNot = re.compile("Job not submitted.*<(\w+)>")
107 <            #m= rNot.search(out)
108 <            #if m is not None:
109 <            #    print m
110 <            #    print "Job NOT submitted"
111 <            #    print out
112 <            raise SchedulerError('error', out)
113 <        taskId = None
114 <        #print "Your job identifier is: ", taskId, queue
115 <        map={ job[ 'name' ] : jobId }
116 <        return map, taskId, queue
117 <
118 <
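The removed submitJob() above keys on qsub's acknowledgment message to recover the scheduler job id. A minimal sketch of that parsing step, with a hypothetical qsub output line and job id:

    import re

    # Hypothetical acknowledgment line; the pattern is the one used in submitJob().
    out = 'Your job 3141592 ("batchscript") has been submitted'
    m = re.compile("Your job (\d+) .* has been submitted").search(out)
    if m is not None:
        jobId = m.group(1)    # '3141592'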
119 <    def decode (self, job, task=None, requirements='' , config ='', service='' ):
98 >    def sched_parameter(self,i,task):
99          """
100 <        prepare file for submission
100 >        Returns scheduler-specific parameters, to use with BOSS.
101          """
102 +        index = int(common._db.nJobs()) - 1
103 +        sched_param= ''
104 +
105 +        for i in range(index): # Add loop DS
106 +            sched_param= ''
107 +            if (self.queue):
108 +                sched_param += '-q '+self.queue +' '
109 +                if (self.res): sched_param += ' -l '+self.res +' '
110 +            pass
111 +
112 +        #default is request 2G memory and 48 hours CPU time
113 +        #sched_param += ' -V -l h_vmem=2G -l h_cpu=172800 '
114 + #        sched_param += ' -V -l h_vmem='
115 + #        sched_param += self.vmem.__str__()
116 + #        sched_param += 'G -l h_cpu='
117 + #        sched_param += self.cpu.__str__()
118 + #        sched_param += ' '
119  
120 <        txt = "#batch script for SGE jobs\n"
125 <        txt += "MYTMPDIR=$TMP/$JOB_NAME\n"
126 <        txt += "mkdir -p $MYTMPDIR \n"
127 <        txt += "cd $MYTMPDIR\n"
128 <        # Need to copy InputSandBox to WN
129 <        if task:
130 <            subDir=task[ 'startDirectory' ]
131 <            for inpFile in task[ 'globalSandbox' ].split(','):
132 <                #txt += "cp "+subDir+inpFile+" . \n"
133 <                txt += "cp "+inpFile+" . \n"
134 <        ## Job specific ISB
135 <        #for inpFile in job[ 'inputFiles' ]:
136 <        #    if inpFile != '': txt += self.cpCmd+" "+self.rfioSer+"/"+inpFile+" . \n"
137 <
138 <        ## Now the actual wrapper
139 <        args = job[ 'arguments' ]
140 <        exe = job[ 'executable' ]
141 <        txt += "./"+os.path.basename(exe)+" "+args+"\n"
142 <        
143 <        ## And finally copy back the output
144 <        outputDir=task['outputDirectory']
145 <        for outFile in job['outputFiles']:
146 <            #print "outputFile:"+outFile
147 <            txt += "cp "+outFile+" "+outputDir+"/. \n"
148 <
149 <        txt += "cd $SGE_O_HOME\n"
150 <        txt += "rm -rf $MYTMPDIR\n"
151 <        arg = ""
152 <        if job[ 'standardInput' ] != '':
153 <            arg += ' -i %s ' % job[ 'standardInput' ]
154 <            
155 <        #delete old log files as SGE will append to the file    
156 <        if os.path.exists(job[ 'standardOutput' ]):
157 <            os.remove(job[ 'standardOutput' ])
158 <            
159 <        if os.path.exists(job[ 'standardError' ]):
160 <            os.remove(job[ 'standardError' ])
161 <            
162 <        arg += ' -o %s ' % job[ 'standardOutput' ]
163 <        arg += ' -e %s ' % job[ 'standardError' ]
164 <
165 < #       jobrundir = outputDir
166 < #       jobrundir += "/%s" % job['id']
167 < #       if not os.path.exists(jobrundir):
168 < #           os.mkdir(jobrundir)
169 <            
170 <        arg +='-wd '+outputDir+ ' '
171 < #       txt += "rm -rf "+jobrundir+"\n"
172 <        # blindly append user requirements
173 <        arg += requirements
174 <        arg += '-N '+task[ 'name' ]+' '
175 <        #create job script
176 <        f = open(outputDir+'/batchscript', 'wc')
177 <        f.write("#!/bin/sh\n")
178 <        f.write(txt)
179 <        f.close()
180 <        
181 <        # and finally the wrapper
182 <        #arg +=  '  %s ' % txt
183 <        arg += outputDir+"/batchscript"
184 <        return arg
120 >        return sched_param
121  
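The removed decode() above assembled the full qsub argument string (-i/-o/-e, -wd, -N and the generated batchscript), whereas the new sched_parameter() only returns the queue and resource options that BOSS then passes to SGE. A minimal sketch of that option string; the queue and resource values are hypothetical, with h_vmem/h_cpu mirroring the commented-out defaults above (2 GB memory, 48 hours CPU time):

    # Illustrative only: hypothetical queue and resource list.
    queue = 'cms.q'
    res   = 'h_vmem=2G,h_cpu=172800'
    sched_param = '-q ' + queue + '  -l ' + res + ' '
    # BOSS would then submit roughly as:
    #   qsub -q cms.q -l h_vmem=2G,h_cpu=172800 ... wrapper.sh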
122 +    def loggingInfo(self, id):
123 +        """ return logging info about job nj """
124 +        print "Warning: SchedulerSge::loggingInfo not implemented!"
125 +        return ""
126  
127 <    ##########################################################################
188 <    def query(self, obj, service='', objType='node') :
127 >    def wsExitFunc(self):
128          """
190        query status and possibly other scheduler-related information
129          """
130 +        txt = '\n'
131  
132 <        # the object passed is a Task:
133 <        #   check whether parent id are provided, make a list of ids
134 <        #     and check the status
196 <        if type(obj) == Task :
197 <            schedIds = []
198 <
199 <            # query performed through single job ids
200 <            if objType == 'node' :
201 <                for job in obj.jobs :
202 <                    if self.valid( job.runningJob ) and \
203 <                           job.runningJob['status'] != 'SD':
204 <                        schedIds.append( job.runningJob['schedulerId'] )
205 <                jobAttributes = self.queryLocal( schedIds, objType )
206 <
207 <            # query performed through a bulk id
208 <            elif objType == 'parent' :
209 <                for job in obj.jobs :
210 <                    if self.valid( job.runningJob ) \
211 <                      and job.runningJob['status'] != 'SD' \
212 <                      and job.runningJob['schedulerParentId'] not in schedIds:
213 <                        schedIds.append( job.runningJob['schedulerParentId'] )
214 <                jobAttributes = self.queryLocal( schedIds, objType )
215 <
216 <            # status association
217 <            for job in obj.jobs :
218 <                try:
219 <                    valuesMap = jobAttributes[ job.runningJob['schedulerId'] ]
220 <                except:
221 <                    continue
222 <                for key, value in valuesMap.iteritems() :
223 <                    job.runningJob[key] = value
224 <
225 <        # unknown object type
226 <        else:
227 <            raise SchedulerError('wrong argument type', str( type(obj) ))
132 >        txt += '#\n'
133 >        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
134 >        txt += '#\n\n'
135  
136 +        txt += 'func_exit() { \n'
137 +        txt += self.wsExitFunc_common()
138  
139 +        txt += '    cp ${SGE_STDOUT_PATH} CMSSW_${NJob}.stdout \n'
140 +        txt += '    cp ${SGE_STDERR_PATH} CMSSW_${NJob}.stderr \n'
141 +        txt += '    tar zcvf ${out_files}.tgz  ${filesToCheck}\n'
142 +        txt += '    exit $job_exit_code\n'
143 +        txt += '}\n'
144  
145 <    def queryLocal(self, schedIdList, objType='node' ) :
145 >        return txt
146  
147 +    def listMatch(self, dest, full):
148          """
234        query status and possibly other scheduler-related information
235        It may use single 'node' scheduler id or bulk id for association
236        
237        return jobAttributes
238
239        where jobAttributes is a map of the format:
240           jobAttributes[ schedId :
241                                    [ key : val ]
242                        ]
243           where key can be any parameter of the Job object, including at least the status
244                        
149          """
150 <        ret_map={}
151 <        #print schedIdList, service, objType
152 <        r = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) .* (\S+)@(\w+)")
153 <        rnohost = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) ")
154 <        cmd='qstat -u '+os.getlogin()
251 <        #print cmd
252 <        out, ret = self.ExecuteCommand(cmd)
253 <        #print "<"+out+">"
254 <        for line in out.split('\n'):
255 <            #print line
256 <            queue=None
257 <            host=None
258 <            id=None
259 <            st=None
260 <            mfull= r.search(line)
261 <            if (mfull):
262 <                #print "matched"
263 <                id,st,queue,host=mfull.groups()
264 <            else:
265 <                mnohost = rnohost.search(line)
266 <                if (mnohost):
267 <                    id,st = mnohost.groups()
268 <                    pass
269 <                pass
270 <            #print "got id %s" % id
271 <            if(id) and (id in schedIdList):
272 <                map={}
273 <                map[ 'status' ] = self.statusMap[st]
274 <                map[ 'statusScheduler' ] = st
275 <                if (host): map[ 'destination' ] = host
276 <                ret_map[id]=map
277 <                #print "set state to "+map['statusScheduler']
278 <                pass
279 <            pass
280 <        
281 <        #set all missing jobs to done
282 <        for jobid in schedIdList:
283 <            jobid = jobid.strip()
284 <            if not jobid in ret_map:
285 <                #print "job "+jobid+" not found in qstat list setting to DONE"
286 <                id = jobid
287 <                st = "DONE"
288 <                map={}
289 <                map[ 'status' ] = self.statusMap[st]
290 <                map[ 'statusScheduler' ]= st
291 <                ret_map[id]=map
292 <                pass
293 <            pass
294 <        
295 <        return ret_map
296 <    
150 >        #if len(dest)!=0:
151 >        sites = [self.blackWhiteListParser.cleanForBlackWhiteList(dest,'list')]
152 >        #else:
153 >        #    sites = [str(getLocalDomain(self))]
154 >        return sites
155  
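The removed queryLocal() above works by parsing plain 'qstat -u <user>' output with two regular expressions and then translating the raw SGE state letter through statusMap. A minimal sketch of the full pattern applied to one hypothetical qstat line:

    import os, re

    # Hypothetical qstat line for the current user (job id, state 'r', queue@host).
    line = "1234567 0.55500 batchscript %s     r     04/07/2009 08:03:31 all.q@node042    1" % os.getlogin()
    r = re.compile("(\d+) .* " + os.getlogin() + " \W+(\w+) .* (\S+)@(\w+)")
    m = r.search(line)
    if m:
        jobid, state, queue, host = m.groups()   # '1234567', 'r', 'all.q', 'node042'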
156 <    def getOutput( self, obj, outdir):
157 <        """
158 <        retrieve output or just put it in the destination directory
301 <
302 <        does not return
303 <        """
304 <        #output ends up in the wrong location with a user defined
305 <        #output directory...Thus we have to move it to the correct
306 <        #directory here....
307 <        #print "SchedulerSGE:getOutput called!"
308 <            
309 <        if type(obj) == Task :
310 <            oldoutdir=obj[ 'outputDirectory' ]
311 <            if(outdir != oldoutdir):
312 <                for job in obj.jobs:
313 <                    jobid = job[ 'id' ];
314 <                    #print "job:"+str(jobid)
315 <                    if self.valid( job.runningJob ):
316 <                        #print "is valid"                      
317 <                        for outFile in job['outputFiles']:
318 <                            #print "outputFile:"+outFile
319 <                            command = "mv "+oldoutdir+"/"+outFile+" "+outdir+"/. \n"
320 <                            #print command
321 <                            out, ret = self.ExecuteCommand(command)
322 <                            if (out!=""):
323 <                                raise SchedulerError('unable to move file', out)
324 <                                #raise SchedulerError("unable to move file "+oldoutdir+"/"+outFile+" ",out)
325 <                            pass
326 <                        pass
327 <                    pass
328 <                pass
329 <            pass
330 <        
156 >    def wsCopyOutput(self):
157 >        txt=self.wsCopyOutput_comm(self.pool)
158 >        return txt
159  
160 <    def kill( self, obj ):
161 <        """
334 <        kill the job instance
160 >    def userName(self):
161 >        """ return the user name """
162  
163 <        does not return
164 <        """
165 <        r = re.compile("has registered the job (\d+) for deletion")
166 <        rFinished = re.compile("Job <(\d+)>: Job has already finished")
167 <        # for jobid in schedIdList:
168 <        for job in obj.jobs:
342 <            if not self.valid( job.runningJob ):
343 <                continue
344 <            jobid = str( job.runningJob[ 'schedulerId' ] ).strip()
345 <            cmd='qdel '+str(jobid)
346 <            out, ret = self.ExecuteCommand(cmd)
347 <            #print "kill:"+out
348 <            mKilled= r.search(out)
349 <            if not mKilled:
350 <                raise SchedulerError ( "Unable to kill job "+jobid+" . Reason: ", out )
351 <            pass
352 <        pass
163 >        ## hack for german naf
164 >        import pwd,getpass
165 >        tmp=pwd.getpwnam(getpass.getuser())[4]
166 >        tmp=tmp.rstrip(',')
167 >        tmp=tmp.rstrip(',')
168 >        tmp=tmp.rstrip(',')
169  
354    def postMortem ( self, schedIdList, outfile, service ) :
355        """
356        execute any post mortem command such as logging-info
357        and write it in outfile
358        """
170  
171 +        return "/CN="+tmp.strip()
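userName() above relies on a NAF-specific convention: it assumes the passwd GECOS field (index 4 of the pwd entry) holds the user's grid certificate common name, possibly padded with trailing commas. A minimal sketch of that lookup; the example GECOS value in the comment is hypothetical:

    import pwd, getpass

    # GECOS field of the current user, e.g. "Jane Doe,,," on a typical Linux box.
    gecos = pwd.getpwnam(getpass.getuser())[4]
    subject = "/CN=" + gecos.rstrip(',').strip()   # what userName() effectively returns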

Diff Legend

(no marker)  Removed lines
+  Added lines
<  Changed lines (old revision, 1.6)
>  Changed lines (new revision, 1.7)