ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.102 by slacapra, Tue Oct 17 13:28:03 2006 UTC vs.
Revision 1.108 by slacapra, Wed Dec 6 10:27:30 2006 UTC

# Line 5 | Line 5 | from crab_util import *
5   from EdgConfig import *
6   import common
7  
8 < import os, sys, time
8 > import os, sys, time, gzip
9  
10   class SchedulerEdg(Scheduler):
11      def __init__(self):
# Line 18 | Line 18 | class SchedulerEdg(Scheduler):
18                        "owner","parent_job", "reason","resubmitted","rsl","seed",\
19                        "stateEnterTime","stateEnterTimes","subjob_failed", \
20                        "user tags" , "status" , "status_code","hierarchy"]
21 +        
22          return
23  
24      def configure(self, cfg_params):
# Line 167 | Line 168 | class SchedulerEdg(Scheduler):
168          try: self.schedulerName = cfg_params['CRAB.scheduler']
169          except KeyError: self.scheduler = ''
170  
171 +        try: self.dontCheckProxy=cfg_params["EDG.dont_check_proxy"]
172 +        except KeyError: self.dontCheckProxy = 0
173 +
174          return
175      
176  
# Line 215 | Line 219 | class SchedulerEdg(Scheduler):
219              for i in range(len(ce_white_list)):
220                  if i == 0:
221                      if (req == ' '):
222 <                        req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
222 >                        req = req + '((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
223                      else:
224 <                        req = req +  ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
224 >                        req = req +  ' && ((RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
225                      pass
226                  else:
227 <                    req = req +  ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
227 >                    req = req +  ' || (RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
228              req = req + ')'
229          
230          if self.EDG_ce_black_list:
231              ce_black_list = string.split(self.EDG_ce_black_list,',')
232              for ce in ce_black_list:
233                  if (req == ' '):
234 <                    req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
234 >                    req = req + '(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
235                  else:
236 <                    req = req +  ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
236 >                    req = req +  ' && (!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
237                  pass
238          if self.EDG_clock_time:
239              if (req == ' '):
# Line 244 | Line 248 | class SchedulerEdg(Scheduler):
248                  req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
249                  
250          for i in range(len(first)): # Add loop DS
251 +            groupReq = req
252              self.param='sched_param_'+str(i)+'.clad'
253              param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
254  
255              itr4=self.findSites_(first[i])
256              for arg in itr4:
257 <                req = req + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
258 <            param_file.write('Requirements = '+req +';\n')  
257 >                groupReq = groupReq + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
258 >            param_file.write('Requirements = '+groupReq +';\n')  
259    
260              if (self.rb_param_file != ''):
261                  param_file.write(self.rb_param_file)  
# Line 579 | Line 584 | class SchedulerEdg(Scheduler):
584          """
585          self.checkProxy()
586          cmd = 'edg-job-get-logging-info -v 2 ' + id
582        #cmd_out = os.popen(cmd)
587          cmd_out = runCommand(cmd)
588          return cmd_out
589  
586    def getExitStatus(self, id):
587        return self.getStatusAttribute_(id, 'exit_code')
588
589    def queryStatus(self, id):
590        return self.getStatusAttribute_(id, 'status')
591
592    def queryDest(self, id):  
593        return self.getStatusAttribute_(id, 'destination')
594
595
596    def getStatusAttribute_(self, id, attr):
597        """ Query a status of the job with id """
598
599        self.checkProxy()
600        hstates = {}
601        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
602        # Bypass edg-job-status interfacing directly to C++ API
603        # Job attribute vector to retrieve status without edg-job-status
604        level = 0
605        # Instance of the Status class provided by LB API
606        jobStat = Status()
607        st = 0
608        #print id, level, attr, self.states.index(attr)
609        jobStat.getStatus(id, level)
610        #print jobStat.loadStatus(st)
611        err, apiMsg = jobStat.get_error()
612        if err:
613            common.logger.debug(5,'Error caught' + apiMsg)
614            return None
615        else:
616            for i in range(len(self.states)):
617                # Fill an hash table with all information retrieved from LB API
618                hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
619                #print i, jobStat.loadStatus(st)[i]
620            result = jobStat.loadStatus(st)[self.states.index(attr)]
621            #print str(result)
622            return result
623
590      def queryDetailedStatus(self, id):
591          """ Query a detailed status of the job with id """
592          cmd = 'edg-job-status '+id
# Line 643 | Line 609 | class SchedulerEdg(Scheduler):
609          return itr4
610  
611      def createXMLSchScript(self, nj, argsList):
646   # def createXMLSchScript(self, nj):
612        
613          """
614          Create a XML-file for BOSS4.
# Line 735 | Line 700 | class SchedulerEdg(Scheduler):
700          taskName = dir[len(dir)-2]
701  
702          xml.write(str(title))
703 <        xml.write('<task name="' +str(taskName)+'">\n')
703 >        xml.write('<task name="' +str(taskName)+'" sub_path="' + common.work_space.bossCache() + '">\n')
704          xml.write(jt_string)
705          
706          if (to_write != ''):
# Line 776 | Line 741 | class SchedulerEdg(Scheduler):
741          xml.write('<program>\n')
742          xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
743          xml.write(jt_string)
744 <    
744 >
745          xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
746          xml.write('<program_types> crabjob </program_types>\n')
747 <        inp_box = script + ','
747 >        inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
748  
749          if inp_sandbox != None:
750              for fl in inp_sandbox:
# Line 787 | Line 752 | class SchedulerEdg(Scheduler):
752                  pass
753              pass
754  
790        inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
791                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
792                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
793                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
794                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + ','+\
795                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'parseCrabFjr.py')
796
755          if (not jbt.additional_inbox_files == []):
756              inp_box = inp_box + ','
757              for addFile in jbt.additional_inbox_files:
# Line 830 | Line 788 | class SchedulerEdg(Scheduler):
788          INDY
789          something similar should be also done for infiles (if it makes sense!)
790          """
791 +        # Stuff to be returned _always_ via sandbox
792 +        for fl in jbt.output_file_sandbox:
793 +            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
794 +            pass
795 +        pass
796 +
797 +        # via sandbox iif required return_data
798          if int(self.return_data) == 1:
799              for fl in jbt.output_file:
800                  out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
# Line 858 | Line 823 | class SchedulerEdg(Scheduler):
823          Function to check the Globus proxy.
824          """
825          if (self.proxyValid): return
826 +
827 +        ### Just return if asked to do so
828 +        if (self.dontCheckProxy):
829 +            self.proxyValid=1
830 +            return
831 +
832          timeleft = -999
833          minTimeLeft=10*3600 # in seconds
834  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines