ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLsf.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerLsf.py (file contents):
Revision 1.1 by slacapra, Fri Jan 4 17:30:56 2008 UTC vs.
Revision 1.14 by spiga, Wed Jun 11 10:03:07 2008 UTC

# Line 1 | Line 1
1   from Scheduler import Scheduler
2 + from SchedulerLocal import SchedulerLocal
3   from crab_exceptions import *
4 + from crab_util import *
5   from crab_logger import Logger
6   import common
7  
# Line 11 | Line 13 | import os,string
13   #  corresponding part of the job script ('ws' stands for 'write script').
14   #
15  
16 < class SchedulerLsf(Scheduler) :
16 > class SchedulerLsf(SchedulerLocal) :
17  
18      def __init__(self):
19          Scheduler.__init__(self,"LSF")
20 <
20 >        
21          return
22  
23      def configure(self, cfg_params):
24 <
25 <        self.jobtypeName = cfg_params['CRAB.jobtype']
26 <
27 <        self.lsf_queue = None
26 <        if (cfg_params.has_key('LSF.queue')): self.lsf_queue = cfg_params['LSF.queue']
27 <
28 <        self.lsf_res = None
29 <        if (cfg_params.has_key('LSF.resource')): self.lsf_res = cfg_params['LSF.resource']
30 <
31 <        self._taskId = common.taskDB.dict('taskId')
32 <
33 <        self.return_data = 1
34 <
35 <        ## Get local domain name
36 <        import socket
37 <        tmp=socket.gethostname()
38 <        dot=string.find(tmp,'.')
39 <        if (dot==-1):
40 <            msg='Unkown domain name. Cannot use local scheduler'
41 <            raise CrabException(msg)
42 <        localDomainName = string.split(tmp,'.',1)[-1]
43 <        cfg_params['EDG.se_white_list']=localDomainName
44 <        common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
24 >        SchedulerLocal.configure(self, cfg_params)
25 >        self.outputDir = cfg_params.get('USER.outputdir' ,common.work_space.resDir())
26 >        self.environment_unique_identifier = "https://"+common.scheduler.name()+":/${LSB_BATCH_JID}-"+ \
27 >            string.replace(common._db.queryTask('name'),"_","-")
28  
29          return
30  
31 <
49 <    def sched_parameter(self):
31 >    def realSchedParams(self,cfg_params):
32          """
33 <        Returns parameter scheduler-specific, to use with BOSS .
33 >        Return dictionary with specific parameters, to use
34 >        with real scheduler  
35          """
36 <        index = int(common.jobDB.nJobs()) - 1
37 <        job = common.job_list[index]
38 <        jbt = job.type()
56 <
57 <        lastBlock=-1
58 <        first = []
59 <        for n in range(common.jobDB.nJobs()):
60 <            currBlock=common.jobDB.block(n)
61 <            if (currBlock!=lastBlock):
62 <                lastBlock = currBlock
63 <                first.append(n)
64 <
65 <        req = ''
66 <        req = req + jbt.getRequirements()
67 <
68 <        for i in range(len(first)): # Add loop DS
69 <            groupReq = req
70 <            self.param='sched_param_'+str(i)+'.clad'
71 <            param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
72 <
73 <            if (self.lsf_queue):
74 <                param_file.write('queue = '+self.lsf_queue +';\n')
75 <                if (self.lsf_res): param_file.write('requirement = '+self.lsf_res +';\n')
76 <            pass
77 <
78 <            param_file.close()
79 <        pass
80 <
81 <        return
36 >        ### used by the BossLite script
37 >        self.cpCmd  =  cfg_params.get(self.name().upper()+'.cp_command','cp')
38 >        self.rfioName =  cfg_params.get(self.name().upper()+'.rfio_server','')
39  
40 <    def userName(self):
41 <        """ return the user name """
42 <        import pwd,getpass
43 <        tmp=pwd.getpwnam(getpass.getuser())[4]
87 <        return tmp.strip()
40 >        params = { 'cpCmd'  : self.cpCmd, \
41 >                   'rfipName' : self.rfioName
42 >                 }
43 >        return  params
44  
45 <    def wsSetupEnvironment(self):
45 >    def sched_parameter(self,i,task):
46          """
47 <        Returns part of a job script which does scheduler-specific work.
47 >        Returns parameter scheduler-specific, to use with BOSS .
48          """
49 <        txt = '# LSF specific stuff\n'
50 <        txt += '# strip arguments\n'
95 <        txt += 'echo "strip arguments"\n'
96 <        txt += 'args=("$@")\n'
97 <        txt += 'nargs=$#\n'
98 <        txt += 'shift $nargs\n'
99 <        txt += "# job number (first parameter for job wrapper)\n"
100 <        txt += "NJob=${args[0]}\n"
101 <
102 <        txt += 'MonitorJobID=`echo ${NJob}_$LSB_BATCH_JID`\n'
103 <        txt += 'SyncGridJobId=`echo $LSB_BATCH_JID`\n'
104 <        txt += 'MonitorID=`echo ' + self._taskId + '`\n'
105 <
106 <        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
107 <        txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
108 <        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
49 >        index = int(common._db.nJobs()) - 1
50 >        sched_param= ''
51  
52 <        txt += 'middleware=LSF \n'
52 >        for i in range(index): # Add loop DS
53  
54 <        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
55 <
56 <        txt += '\n\n'
57 <
58 <        return txt
54 >            sched_param= ''
55 >            if (self.queue):
56 >                sched_param += '-q '+self.queue +' '
57 >                if (self.res): sched_param += ' -R '+self.res +' '
58 >            pass
59  
60 <    def createXMLSchScript(self, nj, argsList):
60 >        sched_param+='-cwd '+ str(self.outputDir)  + ' '
61 >        return sched_param
62  
63 <        """
64 <        Create a XML-file for BOSS4.
65 <        """
63 >    def loggingInfo(self, id):
64 >        """ return logging info about the job with the given id """
65 >        cmd = 'bjobs -l ' + id
66 >        cmd_out = runCommand(cmd)
67 >        return cmd_out
68  
69 +    def wsExitFunc(self):
70          """
125        INDY
126        [begin] FIX-ME:
127        I would pass jobType instead of job
71          """
72 <        index = nj - 1
130 <        job = common.job_list[index]
131 <        jbt = job.type()
132 <        inp_sandbox = jbt.inputSandbox(index)
133 <        #out_sandbox = jbt.outputSandbox(index)
134 <        """
135 <        [end] FIX-ME
136 <        """
137 <
138 <
139 <        title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
140 <        jt_string = ''
141 <
142 <        xml_fname = str(self.jobtypeName)+'.xml'
143 <        xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
144 <
145 <        #TaskName
146 <        dir = string.split(common.work_space.topDir(), '/')
147 <        taskName = dir[len(dir)-2]
72 >        txt = '\n'
73  
74 <        to_write = ''
74 >        txt += '#\n'
75 >        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
76 >        txt += '#\n\n'
77  
78 <        req=' '
79 <        req = req + jbt.getRequirements()
78 >        txt += 'func_exit() { \n'
79 >        txt += self.wsExitFunc_common()
80  
81 <        #TaskName
82 <        dir = string.split(common.work_space.topDir(), '/')
83 <        taskName = dir[len(dir)-2]
84 <
85 <        xml.write(str(title))
159 <
160 <        #First check the X509_USER_PROXY. If not there use the default
161 <        xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '>\n')
162 <
163 <        xml.write(jt_string)
164 <
165 <        if (to_write != ''):
166 <            xml.write('<extraTags\n')
167 <            xml.write(to_write)
168 <            xml.write('/>\n')
169 <            pass
170 <
171 <        xml.write('<iterator>\n')
172 <        xml.write('\t<iteratorRule name="ITR1">\n')
173 <        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
174 <        xml.write('\t</iteratorRule>\n')
175 <        xml.write('\t<iteratorRule name="ITR2">\n')
176 <        for arg in argsList:
177 <            xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
178 <            pass
179 <        xml.write('\t</iteratorRule>\n')
180 <        #print jobList
181 <        xml.write('\t<iteratorRule name="ITR3">\n')
182 <        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
183 <        xml.write('\t</iteratorRule>\n')
184 <
185 <        xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.name())+'">\n')
186 <       # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
187 <        xml.write(jt_string)
188 <
189 <        #executable
190 <
191 <        script = job.scriptFilename()
192 <        xml.write('<program>\n')
193 <        xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
194 <        xml.write(jt_string)
195 <
196 <        xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
197 <        xml.write('<program_types> crabjob </program_types>\n')
198 <        inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
199 <
200 <        if inp_sandbox != None:
201 <            for fl in inp_sandbox:
202 <                inp_box = inp_box + '' + fl + ','
203 <                pass
204 <            pass
205 <
206 <        if inp_box[-1] == ',' : inp_box = inp_box[:-1]
207 <        inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
208 <        xml.write(inp_box)
209 <
210 <        base = jbt.name()
211 <        stdout = base + '__ITR3_.stdout'
212 <        stderr = base + '__ITR3_.stderr'
213 <
214 <        xml.write('<stderr> ' + stderr + '</stderr>\n')
215 <        xml.write('<stdout> ' + stdout + '</stdout>\n')
216 <
217 <
218 <        out_box = stdout + ',' + \
219 <                  stderr + ',.BrokerInfo,'
220 <
221 <        # Stuff to be returned _always_ via sandbox
222 <        for fl in jbt.output_file_sandbox:
223 <            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
224 <            pass
225 <        pass
226 <
227 <        # via sandbox iif required return_data
228 <        if int(self.return_data) == 1:
229 <            for fl in jbt.output_file:
230 <                out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
231 <                pass
232 <            pass
233 <
234 <        if out_box[-1] == ',' : out_box = out_box[:-1]
235 <        out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
236 <        xml.write(out_box)
237 <
238 <        xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
239 <
240 <        xml.write('</program>\n')
241 <        xml.write('</chain>\n')
242 <
243 <        xml.write('</iterator>\n')
244 <        xml.write('</task>\n')
245 <
246 <        xml.close()
81 >        txt += '    cp *.${LSB_BATCH_JID}.out CMSSW_${NJob}.stdout \n'
82 >        txt += '    cp *.${LSB_BATCH_JID}.err CMSSW_${NJob}.stderr \n'
83 >        txt += '    tar zcvf ${out_files}.tgz  ${filesToCheck}\n'
84 >        txt += '    exit $job_exit_code\n'
85 >        txt += '}\n'
86  
87 +        return txt
88  
249        return

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines