ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerGlite.py (file contents):
Revision 1.56 by spiga, Mon May 26 16:57:35 2008 UTC vs.
Revision 1.73 by spiga, Thu Jan 14 10:24:17 2010 UTC

# Line 1 | Line 1
1 + """
2 + CRAB interface to BossLite gLite Scheduler
3 + """
4 +
5 + __revision__ = "$Id$"
6 + __version__ = "$Revision$"
7 +
8   from SchedulerGrid import SchedulerGrid
2 from crab_logger import Logger
9   from crab_exceptions import *
10   from crab_util import *
11   from GliteConfig import *
12   import EdgLoggingInfo
13   import common
14 + from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
15  
16   import os, sys, time
17  
# Line 16 | Line 23 | class SchedulerGlite(SchedulerGrid):
23  
24      def configure(self,cfg_params):
25          SchedulerGrid.configure(self, cfg_params)
26 <        self.checkProxy()
20 <        self.environment_unique_identifier = 'GLITE_WMS_JOBID'
26 >        self.environment_unique_identifier = '$GLITE_WMS_JOBID'
27  
28      def realSchedParams(self,cfg_params):
29          """
30 <        Return dictionary with specific parameters, to use
31 <        with real scheduler  
30 >        Return dictionary with specific parameters, to use
31 >        with real scheduler
32          """
33          self.rb_param_file=''
34 <        if (cfg_params.has_key('EDG.rb')):
35 <            self.rb_param_file=common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
36 <        self.wms_service=cfg_params.get("EDG.wms_service",'')
37 <        self.skipWMSAuth=cfg_params.get("EDG.skipwmsauth",0)
34 >        if (not cfg_params.has_key('GRID.rb')):
35 >            cfg_params['GRID.rb']='CERN'
36 >        self.rb_param_file=common.scheduler.rb_configure(cfg_params.get("GRID.rb"))
37 >        self.wms_service=cfg_params.get("GRID.wms_service",'')
38 >        self.skipWMSAuth=cfg_params.get("GRID.skipwmsauth",1)
39          params = { 'service' : self.wms_service, \
40                     'config' : self.rb_param_file, \
41 <                   'skipWMSAuth' : self.skipWMSAuth
41 >                   'skipWMSAuth' : self.skipWMSAuth
42                   }
43          return  params
44 <      
44 >
45  
46      def rb_configure(self, RB):
47 +        ## 25-Jun-2009 SL: patch to use Cream enabled WMS
48 +        if ( self.cfg_params.get('GRID.use_cream',None) ):
49 +            RB='CREAM'
50          if not RB: return None
51          glite_config = None
52          rb_param_file = None
# Line 52 | Line 62 | class SchedulerGlite(SchedulerGrid):
62          """
63          Returns string with requirement CE related
64          """
65 +        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
66 +                                          self.EDG_ce_black_list, common.logger())
67          req = ''
68 +        ce_white_list = []
69 +        ce_black_list = []
70          if self.EDG_ce_white_list:
71 <            ce_white_list = self.EDG_ce_white_list
71 >            ce_white_list = ceParser.whiteList()
72              tmpCe=[]
73              concString = '&&'
74              for ce in ce_white_list:
# Line 73 | Line 87 | class SchedulerGlite(SchedulerGrid):
87                      req += ") "
88  
89          if self.EDG_ce_black_list:
90 <            ce_black_list = self.EDG_ce_black_list
90 >            ce_black_list = ceParser.blackList()
91              tmpCe=[]
92              concString = '&&'
93              for ce in ce_black_list:
# Line 81 | Line 95 | class SchedulerGlite(SchedulerGrid):
95              if len(tmpCe): req += " && (" + concString.join(tmpCe) + ") "
96  
97          # requirement added to skip gliteCE
98 <        req += '&& (!RegExp("blah", other.GlueCEUniqueId))'
98 >        # not more needed
99 > #       req += '&& (!RegExp("blah", other.GlueCEUniqueId))'
100 >        retWL = ','.join(ce_white_list)
101 >        retBL = ','.join(ce_black_list)
102 >        if not retWL:
103 >            retWL = None
104 >        if not retBL:
105 >            retBL = None
106  
107 <        return req,self.EDG_ce_white_list,self.EDG_ce_black_list
107 >        return req, retWL, retBL
108  
109 <    def se_list(self, id, dest):
109 >    def se_list(self, dest):
110          """
111          Returns string with requirement SE related
112          """
113 <        hostList=self.findSites_(id,dest)
113 >        hostList=self.findSites_(dest)
114          req=''
115          reqtmp=[]
116          concString = '||'
# Line 109 | Line 130 | class SchedulerGlite(SchedulerGrid):
130          if self.EDG_addJdlParam:
131              if self.EDG_addJdlParam[-1] == '': self.EDG_addJdlParam= self.EDG_addJdlParam[:-1]
132              for p in self.EDG_addJdlParam:
133 <                req+=string.strip(p)+';\n'
133 >                req+=string.strip(p)+';\n'
134          return req
135  
136      def specific_req(self):
# Line 127 | Line 148 | class SchedulerGlite(SchedulerGrid):
148  
149          return req
150  
130    def sched_fix_parameter(self):
131        """
132        Returns string with requirements and scheduler-specific parameters
133        """
134        index = int(common._db.nJobs())
135        job = common.job_list[index-1]
136        jbt = job.type()
137        req = ''
138        req = req + jbt.getRequirements()
139
140        if self.EDG_requirements:
141            if (not req == ' '): req = req +  ' && '
142            req = req + self.EDG_requirements
143
144        Task_Req={'jobType':req}
145        common._db.updateTask_(Task_Req)
146
151      def sched_parameter(self,i,task):
152          """
153          Returns string with requirements and scheduler-specific parameters
154          """
155 <        dest=  task.jobs[i-1]['dlsDestination']
155 >        dest=  task.jobs[i-1]['dlsDestination']
156  
157          req=''
158          req +=task['jobType']
159  
160          sched_param=''
161 <        sched_param+='Requirements = ' + req +self.specific_req() + self.se_list(i,dest) +\
162 <                                        self.ce_list()[0] +';\n'
163 <        if self.EDG_addJdlParam: sched_param+=self.jdlParam()
161 >        sched_param+='Requirements = ' + req +self.specific_req() + self.se_list(dest) +\
162 >                                        self.ce_list()[0] +';\n'
163 >        if self.EDG_addJdlParam: sched_param+=self.jdlParam()
164          sched_param+='MyProxyServer = "' + self.proxyServer + '";\n'
165          sched_param+='VirtualOrganisation = "' + self.VO + '";\n'
166          sched_param+='RetryCount = '+str(self.EDG_retry_count)+';\n'
# Line 172 | Line 176 | class SchedulerGlite(SchedulerGrid):
176          reason = loggingInfo.decodeReason(file)
177          return reason
178  
179 <    def findSites_(self, n, sites):
179 >    def findSites_(self, sites):
180          itr4 =[]
181          if len(sites)>0 and sites[0]=="":
182              return itr4
183          if sites != [""]:
184 <            replicas = self.blackWhiteListParser.checkBlackList(sites,n)
184 >            replicas = self.blackWhiteListParser.checkBlackList(sites)
185              if len(replicas)!=0:
186 <                replicas = self.blackWhiteListParser.checkWhiteList(replicas,n)
186 >                replicas = self.blackWhiteListParser.checkWhiteList(replicas)
187  
188              itr4 = replicas
189          return itr4
190  
191 <    
191 >    def delegateProxy(self):
192 >        self.boss().delegateProxy()  
193 >        return
194 >
195      def wsExitFunc(self):
196          """
197          """
# Line 199 | Line 206 | class SchedulerGlite(SchedulerGrid):
206          ### specific Glite check for OSB
207          txt += '    tar zcvf ${out_files}.tgz  ${final_list}\n'
208          txt += '    tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
209 <        txt += '    rm ${out_files}.tgz\n'  
209 >        txt += '    rm ${out_files}.tgz\n'
210          txt += '    size=`expr $tmp_size`\n'
211          txt += '    echo "Total Output dimension: $size"\n'
212 <        txt += '    limit='+str(self.OSBsize) +' \n'  
212 >        txt += '    limit='+str(self.OSBsize) +' \n'
213          txt += '    echo "WARNING: output files size limit is set to: $limit"\n'
214          txt += '    if [ "$limit" -lt "$size" ]; then\n'
215          txt += '        exceed=1\n'
# Line 221 | Line 228 | class SchedulerGlite(SchedulerGrid):
228          txt += '    else\n'
229          txt += '        tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
230          txt += '    fi\n'
231 +        txt += '    python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'
232          txt += '    exit $job_exit_code\n'
233  
234          txt += '}\n'
235          return txt
228
229    def userName(self):
230        """ return the user name """
231        tmp=runCommand("voms-proxy-info -identity")
232        return tmp.strip()

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines