ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.7 by slacapra, Mon Jul 25 14:31:24 2005 UTC vs.
Revision 1.15 by fanzago, Wed Oct 5 16:23:32 2005 UTC

# Line 9 | Line 9 | import os, sys, tempfile
9   class SchedulerEdg(Scheduler):
10      def __init__(self):
11          Scheduler.__init__(self,"EDG")
12 +        self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
13 +                      "children_hist","children_num","children_states","condorId","condor_jdl", \
14 +                      "cpuTime","destination", "done_code","exit_code","expectFrom", \
15 +                      "expectUpdate","globusId","jdl","jobId","jobtype", \
16 +                      "lastUpdateTime","localId","location", "matched_jdl","network_server", \
17 +                      "owner","parent_job", "reason","resubmitted","rsl","seed",\
18 +                      "stateEnterTime","stateEnterTimes","subjob_failed", \
19 +                      "user tags" , "status" , "status_code","hierarchy"]
20          return
21  
22      def configure(self, cfg_params):
23  
16        try: self.edg_ui_cfg = cfg_params["EDG.rb_config"]
17        except KeyError: self.edg_ui_cfg = ''
18
24          try: self.edg_config = cfg_params["EDG.config"]
25          except KeyError: self.edg_config = ''
26  
# Line 31 | Line 36 | class SchedulerEdg(Scheduler):
36          try: self.EDG_retry_count = cfg_params['EDG.retry_count']
37          except KeyError: self.EDG_retry_count = ''
38  
39 <        try:
40 <            self.VO = cfg_params['EDG.virtual_organization']
36 <        except KeyError:
37 <            msg = 'EDG.virtual_organization is mandatory.'
38 <            raise CrabException(msg)
39 >        try: self.VO = cfg_params['EDG.virtual_organization']
40 >        except KeyError: self.VO = 'cms'
41  
42 <        
43 <        #self.scripts_dir = common.bin_dir + '/scripts'
44 <        #self.cmd_prefix = 'edg'
45 <        #if common.LCG_version == '0' : self.cmd_prefix = 'dg'
46 <
47 <        # Add EDG_WL_LOCATION to the python path
48 <
49 <        try:
50 <            path = os.environ['EDG_WL_LOCATION']
51 <        except:
52 <            msg = "Error: the EDG_WL_LOCATION variable is not set."
53 <            raise CrabException(msg)
54 <
55 <        libPath=os.path.join(path, "lib")
56 <        sys.path.append(libPath)
57 <        libPath=os.path.join(path, "lib", "python")
58 <        sys.path.append(libPath)
42 >        try: self.return_data = cfg_params['USER.return_data']
43 >        except KeyError: self.return_data = ''
44 >
45 >        try:
46 >            self.copy_data = cfg_params["USER.copy_data"]
47 >            try:
48 >                self.SE = cfg_params['USER.storage_element']
49 >                self.SE_PATH = cfg_params['USER.storage_path']
50 >            except KeyError:
51 >                msg = "Error. The [USER] section does not have 'storage_element'"
52 >                msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
53 >                common.logger.message(msg)
54 >                raise CrabException(msg)
55 >        except KeyError: self.copy_data = ''
56 >
57 >        try:
58 >            self.register_data = cfg_params["USER.register_data"]
59 >            try:
60 >                 self.LFN = cfg_params['USER.lfn_dir']
61 >            except KeyError:
62 >                msg = "Error. The [USER] section does not have 'lfn_dir' value"
63 >                msg = msg + " it's necessary for RLS registration"
64 >                common.logger.message(msg)
65 >                raise CrabException(msg)
66 >        except KeyError: self.register_data= ''
67 >
68 >        try: self.EDG_requirements = cfg_params['EDG.requirements']
69 >        except KeyError: self.EDG_requirements = ''
70 >                                                                                                                                                            
71 >        try: self.EDG_retry_count = cfg_params['EDG.retry_count']
72 >        except KeyError: self.EDG_retry_count = ''
73 >                                                                                                                                                            
74 >        try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
75 >        except KeyError: self.EDG_clock_time= ''
76 >                                                                                                                                                            
77 >        try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
78 >        except KeyError: self.EDG_cpu_time = ''
79 >
80 > #        # Add EDG_WL_LOCATION to the python path
81 > #
82 > #        try:
83 > #           path = os.environ['EDG_WL_LOCATION']
84 > #       except:
85 > #           msg = "Error: the EDG_WL_LOCATION variable is not set."
86 > #           raise CrabException(msg)
87 > #
88 > #       libPath=os.path.join(path, "lib")
89 > #       sys.path.append(libPath)
90 > #       libPath=os.path.join(path, "lib", "python")
91 > #       sys.path.append(libPath)
92  
93          self.checkProxy_()
94          return
95      
96 +
97 +    def sched_parameter(self):
98 +        """
99 +        Returns file with scheduler-specific parameters
100 +        """
101 +      
102 +        if (self.edg_config and self.edg_config_vo != ''):
103 +            self.param='sched_param.clad'
104 +            param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
105 +            param_file.write('RBconfig = "'+self.edg_config+'";\n')  
106 +            param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
107 +            param_file.close()  
108 +            return 1
109 +        else:
110 +            return 0
111 +
112      def wsSetupEnvironment(self):
113          """
114          Returns part of a job script which does scheduler-specific work.
115          """
116 <        txt = '\n'
116 >
117 >        txt = ''
118 >        if self.copy_data:
119 >           if self.SE:
120 >              txt += 'export SE='+self.SE+'\n'
121 >              txt += 'echo "SE = $SE"\n'
122 >           if self.SE_PATH:
123 >              if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
124 >              txt += 'export SE_PATH='+self.SE_PATH+'\n'
125 >              txt += 'echo "SE_PATH = $SE_PATH"\n'
126 >                                                                                                                                                            
127 >        if self.register_data:
128 >           if self.VO:
129 >              txt += 'export VO='+self.VO+'\n'
130 >           if self.LFN:
131 >              txt += 'export LFN='+self.LFN+'\n'
132 >              txt += '\n'
133          txt += 'CloseCEs=`edg-brokerinfo getCE`\n'
134          txt += 'echo "CloseCEs = $CloseCEs"\n'
135          txt += 'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
136          txt += 'echo "CE = $CE"\n'
137          return txt
138  
139 +    def wsCopyOutput(self):
140 +        """
141 +        Write a CopyResults part of a job script, e.g.
142 +        to copy produced output into a storage element.
143 +        """
144 +        txt = ''
145 +        if self.copy_data:
146 +           copy = 'globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file'
147 +           txt += '#\n'
148 +           txt += '#   Copy output to SE = $SE\n'
149 +           txt += '#\n'
150 +           txt += 'if [ $executable_exit_status -eq 0 ]; then\n'
151 +           txt += '  for out_file in $file_list ; do\n'
152 +           txt += '    echo "Trying to copy output file to $SE "\n'
153 +           txt += '    echo "'+copy+'"\n'
154 +           txt += '    '+copy+' 2>&1\n'
155 +           txt += '    copy_exit_status=$?\n'
156 +           txt += '    echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
157 +           txt += '    echo "STAGE_OUT = $copy_exit_status"\n'
158 +           txt += '    if [ $copy_exit_status -ne 0 ]; then \n'
159 +           txt += '       echo "Problems with SE= $SE" \n'
160 +           txt += '    else \n'
161 +           txt += '       echo "output copied into $SE/$SE_PATH directory"\n'
162 +           txt += '    fi \n'
163 +           txt += '  done\n'
164 +           txt += 'fi \n'
165 +        return txt
166 +
167 +    def wsRegisterOutput(self):
168 +        """
169 +        Returns part of a job script which does scheduler-specific work.
170 +        """
171 +
172 +        txt = ''
173 +        if self.register_data:
174 +           txt += '#\n'
175 +           txt += '#  Register output to RLS\n'
176 +           txt += '#\n'
177 +           txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
178 +           txt += '   for out_file in $file_list ; do\n'
179 +           txt += '      echo "Trying to register the output file into RLS"\n'
180 +           txt += '      echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
181 +           txt += '      lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
182 +           txt += '      register_exit_status=$?\n'
183 +           txt += '      echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
184 +           txt += '      echo "STAGE_OUT = $register_exit_status"\n'
185 +           txt += '      if [ $register_exit_status -ne 0 ]; then \n'
186 +           txt += '         echo "Problems with the registration to RLS" \n'
187 +           txt += '         echo "Try with srm protocol" \n'
188 +           txt += '         echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
189 +           txt += '         lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
190 +           txt += '         register_exit_status=$?\n'
191 +           txt += '         echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
192 +           txt += '         echo "STAGE_OUT = $register_exit_status"\n'
193 +           txt += '         if [ $register_exit_status -ne 0 ]; then \n'
194 +           txt += '            echo "Problems with the registration into RLS" \n'
195 +           txt += '         fi \n'
196 +           txt += '      else \n'
197 +           txt += '         echo "output registered to RLS"\n'
198 +           txt += '      fi \n'
199 +           txt += '   done\n'
200 +           txt += 'elif [[ $executable_exit_status -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
201 +           txt += '   echo "Trying to copy output file to CloseSE"\n'
202 +           txt += '   CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
203 +           txt += '   for out_file in $file_list ; do\n'
204 +           txt += '      echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
205 +           txt += '      lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
206 +           txt += '      register_exit_status=$?\n'
207 +           txt += '      echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
208 +           txt += '      echo "STAGE_OUT = $register_exit_status"\n'
209 +           txt += '      if [ $register_exit_status -ne 0 ]; then \n'
210 +           txt += '         echo "Problems with CloseSE" \n'
211 +           txt += '      else \n'
212 +           txt += '         echo "The program was successfully executed"\n'
213 +           txt += '         echo "SE = $CLOSE_SE"\n'
214 +           txt += '         echo "LFN for the file is LFN=${LFN}/$out_file"\n'
215 +           txt += '      fi \n'
216 +           txt += '   done\n'
217 +           txt += 'else\n'
218 +           txt += '   echo "Problem with the executable"\n'
219 +           txt += 'fi \n'
220 +        return txt
221 +        #####################
222 +
223      def loggingInfo(self, nj):
224          """
225          retrieve the logging info from logging and bookkeeping and return it
# Line 112 | Line 263 | class SchedulerEdg(Scheduler):
263          Match=0
264          for line in out:
265              line = line.strip()
115            #print line
266              if reComment.match( line ):
267                  next = 0
268                  continue
# Line 187 | Line 337 | class SchedulerEdg(Scheduler):
337              pass
338          return jid
339  
340 +    def getExitStatus(self, id):
341 +        return self.getStatusAttribute_(id, 'exit_code')
342 +
343      def queryStatus(self, id):
344 +        return self.getStatusAttribute_(id, 'status')
345 +
346 +    def queryDest(self, id):  
347 +        return self.getStatusAttribute_(id, 'destination')
348 +
349 +
350 +    def getStatusAttribute_(self, id, attr):
351          """ Query a status of the job with id """
352 <        cmd0 = 'edg-job-status '
353 <        cmd = cmd0 + id
354 <        cmd_out = runCommand(cmd)
355 <        if cmd_out == None:
356 <            common.logger.message('Error. No output from `'+cmd+'`')
357 <            return None
358 <        # parse output
359 <        status_prefix = 'Current Status:'
360 <        status_index = string.find(cmd_out, status_prefix)
361 <        if status_index == -1:
362 <            common.logger.message('Error. Bad output of `'+cmd0+'`:\n'+cmd_out)
352 >
353 >        hstates = {}
354 >        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
355 >        # Bypass edg-job-status interfacing directly to C++ API
356 >        # Job attribute vector to retrieve status without edg-job-status
357 >        level = 0
358 >        # Instance of the Status class provided by LB API
359 >        jobStat = Status()
360 >        st = 0
361 >        jobStat.getStatus(id, level)
362 >        err, apiMsg = jobStat.get_error()
363 >        if err:
364 >            print 'Error caught', apiMsg
365 >            common.log.message(apiMsg)
366              return None
367 <        status = cmd_out[(status_index+len(status_prefix)):]
368 <        nl = string.find(status,'\n')
369 <        status = string.strip(status[0:nl])
370 <        return status
367 >        else:
368 >            for i in range(len(self.states)):
369 >                # Fill an hash table with all information retrieved from LB API
370 >                hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
371 >            result = jobStat.loadStatus(st)[ self.states.index(attr) ]
372 >            return result
373  
374      def queryDetailedStatus(self, id):
375          """ Query a detailed status of the job with id """
# Line 247 | Line 412 | class SchedulerEdg(Scheduler):
412          if timeleft < 1:  ok=0
413  
414          if ok==0:
415 <            msg = 'No valid proxy found !\n'
416 <            msg += "Please do 'grid-proxy-init'."
417 <            raise CrabException(msg)
415 >            print "No valid proxy found !\n"
416 >            print "Creating a user proxy with default length of 100h\n"
417 >            msg = "Unable to create a valid proxy!\n"
418 >            if os.system("grid-proxy-init -valid 100:00"):
419 >                raise CrabException(msg)
420          return
421      
422 <    def createJDL(self, nj):
422 >    def createSchScript(self, nj):
423          """
424          Create a JDL-file for EDG.
425          """
426  
427          job = common.job_list[nj]
428          jbt = job.type()
262 #        jbt.loadJobInfo()
429          inp_sandbox = jbt.inputSandbox(nj)
430          out_sandbox = jbt.outputSandbox(nj)
431 <        inp_storage_subdir = ''#jbt.inputStorageSubdir()
431 >        inp_storage_subdir = ''
432          
433          title = '# This JDL was generated by '+\
434                  common.prog_name+' (version '+common.prog_version_str+')\n'
435          jt_string = ''
436 +
437 +
438          
439          SPL = inp_storage_subdir
440          if ( SPL and SPL[-1] != '/' ) : SPL = SPL + '/'
# Line 279 | Line 447 | class SchedulerEdg(Scheduler):
447          jdl.write('Executable = "' + os.path.basename(script) +'";\n')
448          jdl.write(jt_string)
449  
450 +        ### only one .sh  JDL has arguments:
451 +        firstEvent = common.jobDB.firstEvent(nj)
452 +        maxEvents = common.jobDB.maxEvents(nj)
453 +        jdl.write('Arguments = "' + str(nj+1)+' '+str(firstEvent)+' '+str(maxEvents)+'";\n')
454 +
455          inp_box = 'InputSandbox = { '
456          inp_box = inp_box + '"' + script + '",'
457  
# Line 302 | Line 475 | class SchedulerEdg(Scheduler):
475  
476          jdl.write('StdOutput     = "' + job.stdout() + '";\n')
477          jdl.write('StdError      = "' + job.stderr() + '";\n')
478 <
479 <        #if common.flag_return_data :
480 <        #    for fl in job.outputDataFiles():
481 <        #        out_box = out_box + ' "' + fl + '",'
482 <        #        pass
483 <        #    pass
484 <
485 <        out_box = 'OutputSandbox = { '
486 <        if out_sandbox != None:
487 <            for fl in out_sandbox:
488 <                out_box = out_box + ' "' + fl + '",'
478 >        
479 >        
480 >        if job.stdout() == job.stderr():
481 >          out_box = 'OutputSandbox = { "' + \
482 >                    job.stdout() + '", ".BrokerInfo",'
483 >        else:
484 >          out_box = 'OutputSandbox = { "' + \
485 >                    job.stdout() + '", "' + \
486 >                    job.stderr() + '", ".BrokerInfo",'
487 >
488 >        if self.return_data :
489 >            if out_sandbox != None:
490 >                for fl in out_sandbox:
491 >                    out_box = out_box + ' "' + fl + '",'
492 >                    pass
493                  pass
494              pass
495 <
495 >                                                                                                                                                            
496          if out_box[-1] == ',' : out_box = out_box[:-1]
497          out_box = out_box + ' };'
498          jdl.write(out_box+'\n')
499  
500 <        # If CloseCE is used ...
324 <        #if common.flag_usecloseCE and job.inputDataFiles():
325 <        #    indata = 'InputData = { '
326 <        #    for fl in job.inputDataFiles():
327 <        #       indata = indata + ' "lfn:' + SPL + fl + '",'
328 <        #    if indata[-1] == ',' : indata = indata[:-1]
329 <        #    indata = indata + ' };'
330 <        #    jdl.write(indata+'\n')
331 <        #    jdl.write('DataAccessProtocol = { "gsiftp" };\n')
332 <
500 >        ### if at least a CE exists ...
501          if common.analisys_common_info['sites']:
502 <           if common.analisys_common_info['sw_version']:
503 <
504 <             req='Requirements = '
505 <         ### First ORCA version
506 <             req=req + 'Member("VO-cms-' + \
507 <                 common.analisys_common_info['sw_version'] + \
508 <                 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
509 <         ## then sites
510 <             if len(common.analisys_common_info['sites'])>0:
511 <               req = req + ' && ('
512 <             for i in range(len(common.analisys_common_info['sites'])):
513 <                req = req + 'other.GlueCEInfoHostName == "' \
514 <                      + common.analisys_common_info['sites'][i] + '"'
515 <                if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
516 <                    req = req + ' || '
517 <             req = req + ')'
518 <         ## then user requirement
519 <             if self.EDG_requirements:
520 <               req = req +  ' && ' + self.EDG_requirements
521 <             req = req + ';\n'
522 <        jdl.write(req)
523 <
502 >            if common.analisys_common_info['sw_version']:
503 >                req='Requirements = '
504 >                req=req + 'Member("VO-cms-' + \
505 >                     common.analisys_common_info['sw_version'] + \
506 >                     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
507 >            if len(common.analisys_common_info['sites'])>0:
508 >                req = req + ' && ('
509 >                for i in range(len(common.analisys_common_info['sites'])):
510 >                    req = req + 'other.GlueCEInfoHostName == "' \
511 >                         + common.analisys_common_info['sites'][i] + '"'
512 >                    if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
513 >                        req = req + ' || '
514 >            req = req + ')'
515 >
516 >            #### and USER REQUIREMENT
517 >            if self.EDG_requirements:
518 >                req = req +  ' && ' + self.EDG_requirements
519 >            if self.EDG_clock_time:
520 >                req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
521 >            if self.EDG_cpu_time:
522 >                req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
523 >            req = req + ';\n'
524 >            jdl.write(req)
525 >                                                                                                                                                            
526          jdl.write('VirtualOrganisation = "' + self.VO + '";\n')
527  
528          if ( self.EDG_retry_count ):              

Diff Legend

(no marker) Removed lines
+ Added lines
< Changed lines (old text)
> Changed lines (new text)