ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.92 by slacapra, Fri Oct 6 16:02:29 2006 UTC vs.
Revision 1.106.2.1 by slacapra, Wed Dec 6 10:33:05 2006 UTC

# Line 18 | Line 18 | class SchedulerEdg(Scheduler):
18                        "owner","parent_job", "reason","resubmitted","rsl","seed",\
19                        "stateEnterTime","stateEnterTimes","subjob_failed", \
20                        "user tags" , "status" , "status_code","hierarchy"]
21 +        
22          return
23  
24      def configure(self, cfg_params):
25  
26          try:
27 <            RB = cfg_params["EDG.rb"]
28 <            edgConfig = EdgConfig(RB)
28 <            self.edg_config = edgConfig.config()
29 <            self.edg_config_vo = edgConfig.configVO()
27 >            RB=cfg_params["EDG.rb"]
28 >            self.rb_param_file=self.rb_configure(RB)
29          except KeyError:
30 <            self.edg_config = ''
31 <            self.edg_config_vo = ''
33 <
30 >            self.rb_param_file=''
31 >            pass
32          try:
33              self.proxyServer = cfg_params["EDG.proxy_server"]
34          except KeyError:
# Line 50 | Line 48 | class SchedulerEdg(Scheduler):
48          try: self.LCG_version = cfg_params["EDG.lcg_version"]
49          except KeyError: self.LCG_version = '2'
50  
53        try: self.EDG_requirements = cfg_params['EDG.requirements']
54        except KeyError: self.EDG_requirements = ''
55
56        try: self.EDG_retry_count = cfg_params['EDG.retry_count']
57        except KeyError: self.EDG_retry_count = ''
58
51          try:
52              self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
61            #print "self.EDG_ce_black_list = ", self.EDG_ce_black_list
53          except KeyError:
54              self.EDG_ce_black_list  = ''
55  
56          try:
57              self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
67            #print "self.EDG_ce_white_list = ", self.EDG_ce_white_list
58          except KeyError: self.EDG_ce_white_list = ''
59  
60          try: self.VO = cfg_params['EDG.virtual_organization']
61          except KeyError: self.VO = 'cms'
62  
63 +        try: self.copy_input_data = cfg_params["USER.copy_input_data"]
64 +        except KeyError: self.copy_input_data = 0
65 +
66          try: self.return_data = cfg_params['USER.return_data']
67          except KeyError: self.return_data = 0
68  
# Line 133 | Line 126 | class SchedulerEdg(Scheduler):
126  
127          try: self.EDG_requirements = cfg_params['EDG.requirements']
128          except KeyError: self.EDG_requirements = ''
129 <                                                                                                                                                            
129 >
130 >        try: self.EDG_addJdlParam = string.split(cfg_params['EDG.additional_jdl_parameters'],',')
131 >        except KeyError: self.EDG_addJdlParam = []
132 >
133          try: self.EDG_retry_count = cfg_params['EDG.retry_count']
134          except KeyError: self.EDG_retry_count = ''
135 <                                                                                                                                                            
135 >
136 >        try: self.EDG_shallow_retry_count= cfg_params['EDG.shallow_retry_count']
137 >        except KeyError: self.EDG_shallow_retry_count = ''
138 >
139          try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
140          except KeyError: self.EDG_clock_time= ''
141 <                                                                                                                                                            
141 >
142          try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
143          except KeyError: self.EDG_cpu_time = ''
144  
# Line 169 | Line 168 | class SchedulerEdg(Scheduler):
168          try: self.schedulerName = cfg_params['CRAB.scheduler']
169          except KeyError: self.scheduler = ''
170  
171 +        try: self.dontCheckProxy=cfg_params["EDG.dont_check_proxy"]
172 +        except KeyError: self.dontCheckProxy = 0
173 +
174          return
175      
176  
177 +    def rb_configure(self, RB):
178 +        self.edg_config = ''
179 +        self.edg_config_vo = ''
180 +        self.rb_param_file = ''
181 +
182 +        edgConfig = EdgConfig(RB)
183 +        self.edg_config = edgConfig.config()
184 +        self.edg_config_vo = edgConfig.configVO()
185 +
186 +        if (self.edg_config and self.edg_config_vo != ''):
187 +            self.rb_param_file = 'RBconfig = "'+self.edg_config+'";\nRBconfigVO = "'+self.edg_config_vo+'";'
188 +            #print "rb_param_file = ", self.rb_param_file
189 +        return self.rb_param_file
190 +      
191 +
192      def sched_parameter(self):
193          """
194          Returns file with requirements and scheduler-specific parameters
# Line 180 | Line 197 | class SchedulerEdg(Scheduler):
197          job = common.job_list[index]
198          jbt = job.type()
199          
200 <        lastDest=''
200 >        lastBlock=-1
201          first = []
185        last  = []
202          for n in range(common.jobDB.nJobs()):
203 <            currDest=common.jobDB.destination(n)
204 <            if (currDest!=lastDest):
205 <                lastDest = currDest
203 >            currBlock=common.jobDB.block(n)
204 >            if (currBlock!=lastBlock):
205 >                lastBlock = currBlock
206                  first.append(n)
191                if n != 0:last.append(n-1)
192        if len(first)>len(last) :last.append(common.jobDB.nJobs())
207    
208          req = ''
209          req = req + jbt.getRequirements()
# Line 199 | Line 213 | class SchedulerEdg(Scheduler):
213                  req = req + self.EDG_requirements
214              else:
215                  req = req +  ' && ' + self.EDG_requirements
216 +
217          if self.EDG_ce_white_list:
218              ce_white_list = string.split(self.EDG_ce_white_list,',')
219              for i in range(len(ce_white_list)):
220                  if i == 0:
221                      if (req == ' '):
222 <                        req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
222 >                        req = req + '((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
223                      else:
224 <                        req = req +  ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
224 >                        req = req +  ' && ((RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
225                      pass
226                  else:
227 <                    req = req +  ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
227 >                    req = req +  ' || (RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
228              req = req + ')'
229          
230          if self.EDG_ce_black_list:
231              ce_black_list = string.split(self.EDG_ce_black_list,',')
232              for ce in ce_black_list:
233                  if (req == ' '):
234 <                    req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
234 >                    req = req + '(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
235                  else:
236 <                    req = req +  ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
236 >                    req = req +  ' && (!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
237                  pass
238          if self.EDG_clock_time:
239              if (req == ' '):
# Line 233 | Line 248 | class SchedulerEdg(Scheduler):
248                  req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
249                  
250          for i in range(len(first)): # Add loop DS
251 +            groupReq = req
252              self.param='sched_param_'+str(i)+'.clad'
253              param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
254  
255              itr4=self.findSites_(first[i])
256 <            if (itr4 != []):
257 <                req1=''  
258 <                for arg in itr4:
243 <                    req1 = req + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
244 <            param_file.write('Requirements = '+req1 +';\n')  
256 >            for arg in itr4:
257 >                groupReq = groupReq + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
258 >            param_file.write('Requirements = '+groupReq +';\n')  
259    
260 <            if (self.edg_config and self.edg_config_vo != ''):
261 <                param_file.write('RBconfig = "'+self.edg_config+'";\n')  
262 <                param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
260 >            if (self.rb_param_file != ''):
261 >                param_file.write(self.rb_param_file)  
262 >
263 >            if len(self.EDG_addJdlParam):
264 >                for p in self.EDG_addJdlParam:
265 >                    param_file.write(p)
266  
267              param_file.close()  
268  
# Line 384 | Line 401 | class SchedulerEdg(Scheduler):
401          Copy input data from SE to WN    
402          """
403          txt = ''
404 +        if not self.copy_input_data: return txt
405  
406          ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
407          txt += 'if [ $middleware == OSG ]; then\n'
# Line 463 | Line 481 | class SchedulerEdg(Scheduler):
481             txt += '            echo "Possible problem with SE = $SE"\n'
482             txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
483             txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
484 <           txt += '            echo "srm failed."\n'
467 <           txt += '            echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
468 <           txt += '            echo "STAGE_OUT = $copy_exit_status"\n'
469 <           txt += '            echo "Trying to copy output file to $SE using lcg-cp"\n'
470 <           txt += '            echo "srmcp failed, attempting lcgcp"\n'
484 >           txt += '            echo "srmcp failed, attempting lcg-cp."\n'
485             if common.logger.debugLevel() >= 5:
486                 txt += '            echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
487                 txt += '            exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
# Line 482 | Line 496 | class SchedulerEdg(Scheduler):
496             txt += '               echo "Problems with SE = $SE"\n'
497             txt += '               echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
498             txt += '               echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
499 <           txt += '               echo "lcg-cp and srm failed"\n'
486 <           txt += '               echo "If storage_path in your config file contains a ? you may need a \? instead."\n'
499 >           txt += '               echo "srmcp and lcg-cp and failed!"\n'
500             txt += '            else\n'
501             txt += '               echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
502             txt += '               echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
# Line 571 | Line 584 | class SchedulerEdg(Scheduler):
584          """
585          self.checkProxy()
586          cmd = 'edg-job-get-logging-info -v 2 ' + id
574        #cmd_out = os.popen(cmd)
587          cmd_out = runCommand(cmd)
588          return cmd_out
589  
578    def getExitStatus(self, id):
579        return self.getStatusAttribute_(id, 'exit_code')
580
581    def queryStatus(self, id):
582        return self.getStatusAttribute_(id, 'status')
583
584    def queryDest(self, id):  
585        return self.getStatusAttribute_(id, 'destination')
586
587
588    def getStatusAttribute_(self, id, attr):
589        """ Query a status of the job with id """
590
591        self.checkProxy()
592        hstates = {}
593        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
594        # Bypass edg-job-status interfacing directly to C++ API
595        # Job attribute vector to retrieve status without edg-job-status
596        level = 0
597        # Instance of the Status class provided by LB API
598        jobStat = Status()
599        st = 0
600        jobStat.getStatus(id, level)
601        err, apiMsg = jobStat.get_error()
602        if err:
603            common.logger.debug(5,'Error caught' + apiMsg)
604            return None
605        else:
606            for i in range(len(self.states)):
607                # Fill an hash table with all information retrieved from LB API
608                hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
609            result = jobStat.loadStatus(st)[self.states.index(attr)]
610            return result
611
590      def queryDetailedStatus(self, id):
591          """ Query a detailed status of the job with id """
592          cmd = 'edg-job-status '+id
# Line 631 | Line 609 | class SchedulerEdg(Scheduler):
609          return itr4
610  
611      def createXMLSchScript(self, nj, argsList):
634   # def createXMLSchScript(self, nj):
612        
613          """
614          Create a XML-file for BOSS4.
# Line 669 | Line 646 | class SchedulerEdg(Scheduler):
646          req=' '
647          req = req + jbt.getRequirements()
648  
672
673        #sites = common.jobDB.destination(nj)
674        #if len(sites)>0 and sites[0]!="Any":
675        #    req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
676        #req = req    
677    
649          if self.EDG_requirements:
650              if (req == ' '):
651                  req = req + self.EDG_requirements
# Line 717 | Line 688 | class SchedulerEdg(Scheduler):
688              to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
689              pass
690  
691 +        if ( self.EDG_shallow_retry_count ):              
692 +            to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
693 +            pass
694 +
695          to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
696          to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
697  
698 <                #TaskName  
698 >        #TaskName  
699          dir = string.split(common.work_space.topDir(), '/')
700          taskName = dir[len(dir)-2]
701  
# Line 767 | Line 742 | class SchedulerEdg(Scheduler):
742          xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
743          xml.write(jt_string)
744      
770          
771        ### only one .sh  JDL has arguments:
772        ### Fabio
773 #        xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "EDG") +'"\n')
745          xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
746          xml.write('<program_types> crabjob </program_types>\n')
747          inp_box = script + ','
# Line 785 | Line 756 | class SchedulerEdg(Scheduler):
756                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
757                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
758                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
759 <                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
759 >                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + ','+\
760 >                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'parseCrabFjr.py')
761  
762          if (not jbt.additional_inbox_files == []):
763              inp_box = inp_box + ','
# Line 823 | Line 795 | class SchedulerEdg(Scheduler):
795          INDY
796          something similar should be also done for infiles (if it makes sense!)
797          """
798 +        # Stuff to be returned _always_ via sandbox
799 +        for fl in jbt.output_file_sandbox:
800 +            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
801 +            pass
802 +        pass
803 +
804 +        # via sandbox iif required return_data
805          if int(self.return_data) == 1:
806              for fl in jbt.output_file:
807                  out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
# Line 851 | Line 830 | class SchedulerEdg(Scheduler):
830          Function to check the Globus proxy.
831          """
832          if (self.proxyValid): return
833 +
834 +        ### Just return if asked to do so
835 +        if (self.dontCheckProxy):
836 +            self.proxyValid=1
837 +            return
838 +
839          timeleft = -999
840          minTimeLeft=10*3600 # in seconds
841  
# Line 871 | Line 856 | class SchedulerEdg(Scheduler):
856          pass
857  
858          if mustRenew:
859 <            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
859 >            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
860              cmd = 'voms-proxy-init -voms '+self.VO
861              if self.group:
862                  cmd += ':/'+self.VO+'/'+self.group
863              if self.role:
864                  cmd += '/role='+self.role
865 <            cmd += ' -valid 96:00'
865 >            cmd += ' -valid 192:00'
866              try:
867                  # SL as above: damn it!
868                  common.logger.debug(10,cmd)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines