ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.65
Committed: Mon Dec 8 17:53:32 2008 UTC (16 years, 4 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.64: +10 -5 lines
Log Message:
SiteDB names for CE and gLite

File Contents

# Content
1 """
2 CRAB interface to BossLite gLite Scheduler
3 """
4
5 __revision__ = "$Id: SchedulerGlite.py,v 1.64 2008/11/08 11:57:03 spiga Exp $"
6 __version__ = "$Revision: 1.64 $"
7
8 from SchedulerGrid import SchedulerGrid
9 from crab_logger import Logger
10 from crab_exceptions import *
11 from crab_util import *
12 from GliteConfig import *
13 import EdgLoggingInfo
14 import common
15 from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
16
17 import os, sys, time
18
class SchedulerGlite(SchedulerGrid):
    """
    CRAB interface to the BossLite gLite scheduler.

    Builds the JDL fragments (requirements and scheduler parameters) for
    each job and the shell function executed by the job wrapper before
    exiting.
    """

    def __init__(self, name="GLITE"):
        """
        Initialise the base grid scheduler under the given name and set
        the output sandbox size limit.
        """
        SchedulerGrid.__init__(self, name)

        # Limit compared, in func_exit() (see wsExitFunc), against the size
        # of the packed output tarball as reported by `ls`.
        self.OSBsize = 55000000

    def configure(self, cfg_params):
        """
        Run the common grid configuration, then set the expression used
        as unique job identifier in the job environment.
        """
        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = '$GLITE_WMS_JOBID'

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use
        with real scheduler

        The dictionary has the keys 'service', 'config' and 'skipWMSAuth'.
        As a side effect 'EDG.rb' is set to 'CERN' in cfg_params when it is
        missing, and the three values are stored on the instance.
        """
        self.rb_param_file = ''
        # `in` replaces dict.has_key(); cfg_params is expected to behave
        # like a dictionary (it is also queried through .get() below).
        if 'EDG.rb' not in cfg_params:
            cfg_params['EDG.rb'] = 'CERN'
        self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')
        self.skipWMSAuth = cfg_params.get("EDG.skipwmsauth", 1)
        return {
            'service': self.wms_service,
            'config': self.rb_param_file,
            'skipWMSAuth': self.skipWMSAuth,
        }

    def rb_configure(self, RB):
        """
        Return the configuration obtained from GliteConfig for the given
        resource broker, or None when RB is empty or no configuration is
        returned.
        """
        if not RB:
            return None
        # Any falsy result of config() is normalised to None.
        return GliteConfig(RB).config() or None

    def ce_list(self):
        """
        Returns string with requirement CE related

        The return value is a tuple (requirement, white list, black list)
        where the two lists are comma-separated strings of the entries
        returned by the CE black/white list parser.
        """
        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list, common.logger)
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            allowed = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                       for ce in ce_white_list]
            if len(allowed) == 1:
                req += " && (" + allowed[0] + ") "
            elif len(allowed) > 1:
                # White-listed CEs are alternatives: OR the single terms
                # inside one parenthesised group.
                req += " && ( " + " || ".join(["(" + term + ") " for term in allowed]) + ") "

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            banned = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                      for ce in ce_black_list]
            if banned:
                # Every black-listed CE must be excluded: AND the negations.
                req += " && (" + '&&'.join(banned) + ") "

        # requirement added to skip gliteCE
        req += '&& (!RegExp("blah", other.GlueCEUniqueId))'

        return req, ','.join(ce_white_list), ','.join(ce_black_list)

    def se_list(self, dest):
        """
        Returns string with requirement SE related

        dest is filtered through findSites_(); the surviving hosts are
        OR-ed together. An empty string is returned when no host is left.
        """
        hostList = self.findSites_(dest)
        members = [' Member("' + host + '" , other.GlueCESEBindGroupSEUniqueID) '
                   for host in hostList]
        if not members:
            return ''
        return " && (" + '||'.join(members) + ") "

    def jdlParam(self):
        """
        Returns the additional JDL parameters, one per line, each stripped
        of surrounding whitespace and terminated by ';'.

        A trailing empty entry of self.EDG_addJdlParam is dropped (the
        attribute itself is updated).
        """
        req = ''
        if self.EDG_addJdlParam:
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            req = ''.join([p.strip() + ';\n' for p in self.EDG_addJdlParam])
        return req

    def specific_req(self):
        """
        Returns string with specific requirements

        Every clause is prefixed with ' && ' because sched_parameter()
        appends the result directly after the job type requirement.
        """
        req = ''
        if self.EDG_clock_time:
            # str() lets numeric limits be used as well as strings.
            req += ' && other.GlueCEPolicyMaxWallClockTime>=' + str(self.EDG_clock_time)

        if self.EDG_cpu_time:
            req += ' &&  other.GlueCEPolicyMaxCPUTime>=' + str(self.EDG_cpu_time)

        return req

    def sched_parameter(self, i, task):
        """
        Returns string with requirements and scheduler-specific parameters

        i is the 1-based index of the job inside task.jobs.
        """
        dest = task.jobs[i-1]['dlsDestination']

        requirements = task['jobType'] + self.specific_req() + \
                       self.se_list(dest) + self.ce_list()[0]

        sched_param = 'Requirements = ' + requirements + ';\n'
        if self.EDG_addJdlParam:
            sched_param += self.jdlParam()
        sched_param += 'MyProxyServer = "' + self.proxyServer + '";\n'
        sched_param += 'VirtualOrganisation = "' + self.VO + '";\n'
        sched_param += 'RetryCount = ' + str(self.EDG_retry_count) + ';\n'
        sched_param += 'ShallowRetryCount = ' + str(self.EDG_shallow_retry_count) + ';\n'

        return sched_param

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info
        """
        loggingInfo = EdgLoggingInfo.EdgLoggingInfo()
        return loggingInfo.decodeReason(file)

    def findSites_(self, sites):
        """
        Filter sites through the black list and then the white list of
        self.blackWhiteListParser and return what is left.

        An empty list is returned when the first entry of sites is the
        empty string. The white list is consulted only if something
        survived the black list.
        """
        if len(sites) > 0 and sites[0] == "":
            return []

        replicas = self.blackWhiteListParser.checkBlackList(sites)
        if len(replicas) != 0:
            replicas = self.blackWhiteListParser.checkWhiteList(replicas)
        return replicas

    def wsExitFunc(self):
        """
        Return the text of the func_exit() shell function that the job
        wrapper runs before exiting.

        After the common part supplied by wsExitFunc_common(), the function
        packs the output files once to measure the tarball size and
        compares it with self.OSBsize. When the limit is exceeded the exit
        code is set to 70000 and only the CMSSW stdout/stderr files are
        packed in the final tarball.
        """
        txt = '\n'

        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()
        ### specific Glite check for OSB
        # Trial tarball: created only to read its size, then removed.
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit=' + str(self.OSBsize) + ' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        # Final tarball: full output if within the limit, logs only otherwise.
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' exit $job_exit_code\n'

        txt += '}\n'
        return txt