ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.90 by slacapra, Fri Oct 6 15:50:17 2006 UTC vs.
Revision 1.106.2.1 by slacapra, Wed Dec 6 10:33:05 2006 UTC

# Line 18 | Line 18 | class SchedulerEdg(Scheduler):
18                        "owner","parent_job", "reason","resubmitted","rsl","seed",\
19                        "stateEnterTime","stateEnterTimes","subjob_failed", \
20                        "user tags" , "status" , "status_code","hierarchy"]
21 +        
22          return
23  
24      def configure(self, cfg_params):
25  
26          try:
27 <            RB = cfg_params["EDG.rb"]
28 <            edgConfig = EdgConfig(RB)
28 <            self.edg_config = edgConfig.config()
29 <            self.edg_config_vo = edgConfig.configVO()
27 >            RB=cfg_params["EDG.rb"]
28 >            self.rb_param_file=self.rb_configure(RB)
29          except KeyError:
30 <            self.edg_config = ''
31 <            self.edg_config_vo = ''
33 <
30 >            self.rb_param_file=''
31 >            pass
32          try:
33              self.proxyServer = cfg_params["EDG.proxy_server"]
34          except KeyError:
# Line 38 | Line 36 | class SchedulerEdg(Scheduler):
36          common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
37  
38          try:
39 +            self.group = cfg_params["EDG.group"]
40 +        except KeyError:
41 +            self.group = None
42 +            
43 +        try:
44              self.role = cfg_params["EDG.role"]
45          except KeyError:
46              self.role = None
# Line 45 | Line 48 | class SchedulerEdg(Scheduler):
48          try: self.LCG_version = cfg_params["EDG.lcg_version"]
49          except KeyError: self.LCG_version = '2'
50  
48        try: self.EDG_requirements = cfg_params['EDG.requirements']
49        except KeyError: self.EDG_requirements = ''
50
51        try: self.EDG_retry_count = cfg_params['EDG.retry_count']
52        except KeyError: self.EDG_retry_count = ''
53
51          try:
52              self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
56            #print "self.EDG_ce_black_list = ", self.EDG_ce_black_list
53          except KeyError:
54              self.EDG_ce_black_list  = ''
55  
56          try:
57              self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
62            #print "self.EDG_ce_white_list = ", self.EDG_ce_white_list
58          except KeyError: self.EDG_ce_white_list = ''
59  
60          try: self.VO = cfg_params['EDG.virtual_organization']
61          except KeyError: self.VO = 'cms'
62  
63 +        try: self.copy_input_data = cfg_params["USER.copy_input_data"]
64 +        except KeyError: self.copy_input_data = 0
65 +
66          try: self.return_data = cfg_params['USER.return_data']
67 <        except KeyError: self.return_data = 1
67 >        except KeyError: self.return_data = 0
68  
69          try:
70              self.copy_data = cfg_params["USER.copy_data"]
# Line 128 | Line 126 | class SchedulerEdg(Scheduler):
126  
127          try: self.EDG_requirements = cfg_params['EDG.requirements']
128          except KeyError: self.EDG_requirements = ''
129 <                                                                                                                                                            
129 >
130 >        try: self.EDG_addJdlParam = string.split(cfg_params['EDG.additional_jdl_parameters'],',')
131 >        except KeyError: self.EDG_addJdlParam = []
132 >
133          try: self.EDG_retry_count = cfg_params['EDG.retry_count']
134          except KeyError: self.EDG_retry_count = ''
135 <                                                                                                                                                            
135 >
136 >        try: self.EDG_shallow_retry_count= cfg_params['EDG.shallow_retry_count']
137 >        except KeyError: self.EDG_shallow_retry_count = ''
138 >
139          try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
140          except KeyError: self.EDG_clock_time= ''
141 <                                                                                                                                                            
141 >
142          try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
143          except KeyError: self.EDG_cpu_time = ''
144  
# Line 164 | Line 168 | class SchedulerEdg(Scheduler):
168          try: self.schedulerName = cfg_params['CRAB.scheduler']
169          except KeyError: self.scheduler = ''
170  
171 +        try: self.dontCheckProxy=cfg_params["EDG.dont_check_proxy"]
172 +        except KeyError: self.dontCheckProxy = 0
173 +
174          return
175      
176  
177 +    def rb_configure(self, RB):
178 +        self.edg_config = ''
179 +        self.edg_config_vo = ''
180 +        self.rb_param_file = ''
181 +
182 +        edgConfig = EdgConfig(RB)
183 +        self.edg_config = edgConfig.config()
184 +        self.edg_config_vo = edgConfig.configVO()
185 +
186 +        if (self.edg_config and self.edg_config_vo != ''):
187 +            self.rb_param_file = 'RBconfig = "'+self.edg_config+'";\nRBconfigVO = "'+self.edg_config_vo+'";'
188 +            #print "rb_param_file = ", self.rb_param_file
189 +        return self.rb_param_file
190 +      
191 +
192      def sched_parameter(self):
193          """
194          Returns file with requirements and scheduler-specific parameters
# Line 175 | Line 197 | class SchedulerEdg(Scheduler):
197          job = common.job_list[index]
198          jbt = job.type()
199          
200 <        lastDest=''
200 >        lastBlock=-1
201          first = []
180        last  = []
202          for n in range(common.jobDB.nJobs()):
203 <            currDest=common.jobDB.destination(n)
204 <            if (currDest!=lastDest):
205 <                lastDest = currDest
203 >            currBlock=common.jobDB.block(n)
204 >            if (currBlock!=lastBlock):
205 >                lastBlock = currBlock
206                  first.append(n)
186                if n != 0:last.append(n-1)
187        if len(first)>len(last) :last.append(common.jobDB.nJobs())
207    
208          req = ''
209          req = req + jbt.getRequirements()
# Line 194 | Line 213 | class SchedulerEdg(Scheduler):
213                  req = req + self.EDG_requirements
214              else:
215                  req = req +  ' && ' + self.EDG_requirements
216 +
217          if self.EDG_ce_white_list:
218              ce_white_list = string.split(self.EDG_ce_white_list,',')
219              for i in range(len(ce_white_list)):
220                  if i == 0:
221                      if (req == ' '):
222 <                        req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
222 >                        req = req + '((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
223                      else:
224 <                        req = req +  ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
224 >                        req = req +  ' && ((RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
225                      pass
226                  else:
227 <                    req = req +  ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
227 >                    req = req +  ' || (RegExp("' +  string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
228              req = req + ')'
229          
230          if self.EDG_ce_black_list:
231              ce_black_list = string.split(self.EDG_ce_black_list,',')
232              for ce in ce_black_list:
233                  if (req == ' '):
234 <                    req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
234 >                    req = req + '(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
235                  else:
236 <                    req = req +  ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
236 >                    req = req +  ' && (!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
237                  pass
238          if self.EDG_clock_time:
239              if (req == ' '):
# Line 228 | Line 248 | class SchedulerEdg(Scheduler):
248                  req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
249                  
250          for i in range(len(first)): # Add loop DS
251 +            groupReq = req
252              self.param='sched_param_'+str(i)+'.clad'
253              param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
254  
255              itr4=self.findSites_(first[i])
256 <            if (itr4 != []):
257 <                req1=''  
258 <                for arg in itr4:
238 <                    req1 = req + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
239 <            param_file.write('Requirements = '+req1 +';\n')  
256 >            for arg in itr4:
257 >                groupReq = groupReq + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
258 >            param_file.write('Requirements = '+groupReq +';\n')  
259    
260 <            if (self.edg_config and self.edg_config_vo != ''):
261 <                param_file.write('RBconfig = "'+self.edg_config+'";\n')  
262 <                param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
260 >            if (self.rb_param_file != ''):
261 >                param_file.write(self.rb_param_file)  
262 >
263 >            if len(self.EDG_addJdlParam):
264 >                for p in self.EDG_addJdlParam:
265 >                    param_file.write(p)
266  
267              param_file.close()  
268  
# Line 379 | Line 401 | class SchedulerEdg(Scheduler):
401          Copy input data from SE to WN    
402          """
403          txt = ''
404 +        if not self.copy_input_data: return txt
405  
406          ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
407          txt += 'if [ $middleware == OSG ]; then\n'
# Line 438 | Line 461 | class SchedulerEdg(Scheduler):
461             txt += '        export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
462             txt += '        echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
463             txt += '    fi \n'
464 +
465             txt += '    for out_file in $file_list ; do\n'
466 <           txt += '        echo "Trying to copy output file to $SE using lcg-cp"\n'
467 <           if common.logger.debugLevel() >= 5:
468 <               txt += '        echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
469 <               txt += '        exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
470 <           else:
471 <               txt += '        echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
472 <               txt += '        exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
466 >           txt += '        echo "Trying to copy output file to $SE using srmcp"\n'
467 >           txt += '        echo "mkdir -p $HOME/.srmconfig"\n'
468 >           txt += '        mkdir -p $HOME/.srmconfig\n'
469 >           txt += '        if [ $middleware == LCG ]; then\n'
470 >           txt += '           echo "srmcp -retry_num 3 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
471 >           txt += '           exitstring=`srmcp -retry_num 3 -retry_timeout 480000 file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
472 >           txt += '        elif [ $middleware == OSG ]; then\n'
473 >           txt += '           echo "srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
474 >           txt += '           exitstring=`srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
475 >           txt += '        fi \n'
476             txt += '        copy_exit_status=$?\n'
477 <           txt += '        echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
477 >           txt += '        echo "COPY_EXIT_STATUS for srmcp = $copy_exit_status"\n'
478             txt += '        echo "STAGE_OUT = $copy_exit_status"\n'
479 +
480             txt += '        if [ $copy_exit_status -ne 0 ]; then\n'
481             txt += '            echo "Possible problem with SE = $SE"\n'
482             txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
483             txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
484 <           txt += '            echo "lcg-cp failed.  For verbose lcg-cp output, use command line option -debug 5."\n'
485 <           txt += '            echo "lcg-cp failed, attempting srmcp"\n'
486 <           txt += '            echo "mkdir -p $HOME/.srmconfig"\n'
487 <           txt += '            mkdir -p $HOME/.srmconfig\n'
488 <           txt += '            if [ $middleware == LCG ]; then\n'
489 <           txt += '               echo "srmcp -retry_num 5 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
490 <           txt += '               exitstring=`srmcp -retry_num 5 -retry_timeout 480000 file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
463 <           txt += '            elif [ $middleware == OSG ]; then\n'
464 <           txt += '               echo "srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
465 <           txt += '               exitstring=`srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
466 <           txt += '            fi \n'
484 >           txt += '            echo "srmcp failed, attempting lcg-cp."\n'
485 >           if common.logger.debugLevel() >= 5:
486 >               txt += '            echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
487 >               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
488 >           else:              
489 >               txt += '            echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
490 >               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
491             txt += '            copy_exit_status=$?\n'
492 <           txt += '            echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
492 >           txt += '            echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
493             txt += '            echo "STAGE_OUT = $copy_exit_status"\n'
494 +
495             txt += '            if [ $copy_exit_status -ne 0 ]; then\n'
496             txt += '               echo "Problems with SE = $SE"\n'
497             txt += '               echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
498             txt += '               echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
499 <           txt += '               echo "lcg-cp and srm failed"\n'
475 <           txt += '               echo "If storage_path in your config file contains a ? you may need a \? instead."\n'
499 >           txt += '               echo "srmcp and lcg-cp and failed!"\n'
500             txt += '            else\n'
501             txt += '               echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
502             txt += '               echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
503             txt += '               echo "output copied into $SE/$SE_PATH directory"\n'
504             txt += '               echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
505 <           txt += '               echo "srmcp succeeded"\n'
505 >           txt += '               echo "lcg-cp succeeded"\n'
506             txt += '            fi\n'
507             txt += '        else\n'
508             txt += '            echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
509             txt += '            echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
510             txt += '            echo "output copied into $SE/$SE_PATH directory"\n'
511             txt += '            echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
512 <           txt += '            echo "lcg-cp succeeded"\n'
512 >           txt += '            echo "srmcp succeeded"\n'
513             txt += '         fi\n'
514             txt += '     done\n'
515          return txt
# Line 560 | Line 584 | class SchedulerEdg(Scheduler):
584          """
585          self.checkProxy()
586          cmd = 'edg-job-get-logging-info -v 2 ' + id
563        #cmd_out = os.popen(cmd)
587          cmd_out = runCommand(cmd)
588          return cmd_out
589  
567    def getExitStatus(self, id):
568        return self.getStatusAttribute_(id, 'exit_code')
569
570    def queryStatus(self, id):
571        return self.getStatusAttribute_(id, 'status')
572
573    def queryDest(self, id):  
574        return self.getStatusAttribute_(id, 'destination')
575
576
577    def getStatusAttribute_(self, id, attr):
578        """ Query a status of the job with id """
579
580        self.checkProxy()
581        hstates = {}
582        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
583        # Bypass edg-job-status interfacing directly to C++ API
584        # Job attribute vector to retrieve status without edg-job-status
585        level = 0
586        # Instance of the Status class provided by LB API
587        jobStat = Status()
588        st = 0
589        jobStat.getStatus(id, level)
590        err, apiMsg = jobStat.get_error()
591        if err:
592            common.logger.debug(5,'Error caught' + apiMsg)
593            return None
594        else:
595            for i in range(len(self.states)):
596                # Fill an hash table with all information retrieved from LB API
597                hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
598            result = jobStat.loadStatus(st)[self.states.index(attr)]
599            return result
600
590      def queryDetailedStatus(self, id):
591          """ Query a detailed status of the job with id """
592          cmd = 'edg-job-status '+id
# Line 620 | Line 609 | class SchedulerEdg(Scheduler):
609          return itr4
610  
611      def createXMLSchScript(self, nj, argsList):
623   # def createXMLSchScript(self, nj):
612        
613          """
614          Create a XML-file for BOSS4.
# Line 658 | Line 646 | class SchedulerEdg(Scheduler):
646          req=' '
647          req = req + jbt.getRequirements()
648  
661
662        #sites = common.jobDB.destination(nj)
663        #if len(sites)>0 and sites[0]!="Any":
664        #    req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
665        #req = req    
666    
649          if self.EDG_requirements:
650              if (req == ' '):
651                  req = req + self.EDG_requirements
# Line 706 | Line 688 | class SchedulerEdg(Scheduler):
688              to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
689              pass
690  
691 +        if ( self.EDG_shallow_retry_count ):              
692 +            to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
693 +            pass
694 +
695          to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
696          to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
697  
698 <                #TaskName  
698 >        #TaskName  
699          dir = string.split(common.work_space.topDir(), '/')
700          taskName = dir[len(dir)-2]
701  
# Line 756 | Line 742 | class SchedulerEdg(Scheduler):
742          xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
743          xml.write(jt_string)
744      
759          
760        ### only one .sh  JDL has arguments:
761        ### Fabio
762 #        xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "EDG") +'"\n')
745          xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
746          xml.write('<program_types> crabjob </program_types>\n')
747          inp_box = script + ','
# Line 774 | Line 756 | class SchedulerEdg(Scheduler):
756                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
757                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
758                    os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
759 <                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
759 >                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + ','+\
760 >                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'parseCrabFjr.py')
761  
762          if (not jbt.additional_inbox_files == []):
763              inp_box = inp_box + ','
# Line 812 | Line 795 | class SchedulerEdg(Scheduler):
795          INDY
796          something similar should be also done for infiles (if it makes sense!)
797          """
798 +        # Stuff to be returned _always_ via sandbox
799 +        for fl in jbt.output_file_sandbox:
800 +            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
801 +            pass
802 +        pass
803 +
804 +        # via sandbox iif required return_data
805          if int(self.return_data) == 1:
806              for fl in jbt.output_file:
807                  out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
# Line 840 | Line 830 | class SchedulerEdg(Scheduler):
830          Function to check the Globus proxy.
831          """
832          if (self.proxyValid): return
833 +
834 +        ### Just return if asked to do so
835 +        if (self.dontCheckProxy):
836 +            self.proxyValid=1
837 +            return
838 +
839          timeleft = -999
840          minTimeLeft=10*3600 # in seconds
841  
# Line 860 | Line 856 | class SchedulerEdg(Scheduler):
856          pass
857  
858          if mustRenew:
859 <            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
860 <            cmd = 'voms-proxy-init -voms '+self.VO+' -valid 96:00'
859 >            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
860 >            cmd = 'voms-proxy-init -voms '+self.VO
861 >            if self.group:
862 >                cmd += ':/'+self.VO+'/'+self.group
863              if self.role:
864 <                cmd = 'voms-proxy-init -voms '+self.VO+':/'+self.VO+'/role='+self.role+' -valid 96:00'
864 >                cmd += '/role='+self.role
865 >            cmd += ' -valid 192:00'
866              try:
867                  # SL as above: damn it!
868 +                common.logger.debug(10,cmd)
869                  out = os.system(cmd)
870                  if (out>0): raise CrabException("Unable to create a valid proxy!\n")
871              except:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines