ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.55
Committed: Tue May 20 11:28:49 2008 UTC (16 years, 11 months ago) by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_1_pre3, CRAB_2_2_1_pre2, CRAB_2_2_1_pre1
Changes since 1.54: +3 -1 lines
Log Message:
add skipwmsauth option

File Contents

# Content
1 from SchedulerGrid import SchedulerGrid
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 from GliteConfig import *
6 import EdgLoggingInfo
7 import common
8
9 import os, sys, time
10
class SchedulerGlite(SchedulerGrid):
    """
    gLite flavour of SchedulerGrid: builds the JDL requirement strings and
    scheduler parameters, and the shell exit function written into the job
    wrapper.
    """

    def __init__(self, name="GLITE"):
        SchedulerGrid.__init__(self, name)

        # Limit emitted into func_exit() (see wsExitFunc), where it is
        # compared with the size of the output tarball as reported by `ls`.
        self.OSBsize = 55000000

    def configure(self, cfg_params):
        """
        Run the SchedulerGrid configuration, check the proxy and set
        environment_unique_identifier to 'GLITE_WMS_JOBID'.
        """
        SchedulerGrid.configure(self, cfg_params)
        self.checkProxy()
        self.environment_unique_identifier = 'GLITE_WMS_JOBID'

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use
        with real scheduler

        Keys: 'service' (EDG.wms_service, default ''), 'config' (result of
        rb_configure for EDG.rb, or '' when EDG.rb is not set) and
        'skipWMSAuth' (EDG.skipwmsauth, default 0).
        """
        self.rb_param_file = ''
        # 'in' replaces the deprecated has_key(); assumes cfg_params is a
        # dict (or supports the 'in' operator) -- TODO confirm.
        if 'EDG.rb' in cfg_params:
            self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
        self.wms_service = cfg_params.get("EDG.wms_service", '')
        # NOTE(review): the value is passed through as found in cfg_params;
        # if it is a string such as '0' it is still truthy -- verify how the
        # consumer of 'skipWMSAuth' interprets it.
        self.skipWMSAuth = cfg_params.get("EDG.skipwmsauth", 0)
        return {
            'service': self.wms_service,
            'config': self.rb_param_file,
            'skipWMSAuth': self.skipWMSAuth,
        }

    def rb_configure(self, RB):
        """
        Return GliteConfig(RB).config(), or None when RB is empty or the
        configuration lookup yields a false value.
        """
        if not RB:
            return None
        glite_config = GliteConfig(RB).config()
        if glite_config:
            return glite_config
        return None

    def ce_list(self):
        """
        Returns a tuple (req, white_list, black_list) where req is the
        requirement string CE related and the two lists are
        EDG_ce_white_list and EDG_ce_black_list as stored on the instance.

        Every fragment of req starts with '&&' so that it can be appended
        to a preceding requirement expression.
        """
        req = ''

        if self.EDG_ce_white_list:
            # str.strip() is used instead of string.strip(): the string
            # module is not imported by this file.
            allowed = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                       for ce in self.EDG_ce_white_list]
            if len(allowed) == 1:
                req += " && (" + allowed[0] + ") "
            else:
                # white-listed CEs are alternatives: OR them together
                alternatives = ["(" + expr + ") " for expr in allowed]
                req += " && ( " + " || ".join(alternatives) + ") "

        if self.EDG_ce_black_list:
            # every black-listed CE has to be excluded: AND the negations
            banned = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                      for ce in self.EDG_ce_black_list]
            req += " && (" + '&&'.join(banned) + ") "

        # requirement added to skip gliteCE
        req += '&& (!RegExp("blah", other.GlueCEUniqueId))'

        return req, self.EDG_ce_white_list, self.EDG_ce_black_list

    def se_list(self, id, dest):
        """
        Returns string with requirement SE related: the hosts returned by
        findSites_(id, dest) OR-ed together and prefixed with ' && ', or ''
        when no host is left.
        """
        members = [' Member("' + host + '" , other.GlueCESEBindGroupSEUniqueID) '
                   for host in self.findSites_(id, dest)]
        if not members:
            return ''
        return " && (" + '||'.join(members) + ") "

    def jdlParam(self):
        """
        Returns the entries of EDG_addJdlParam, stripped and each followed
        by ';' and a newline, as a single string ('' when there are none).

        A trailing empty entry is removed from self.EDG_addJdlParam first
        (the attribute itself is updated).
        """
        if not self.EDG_addJdlParam:
            return ''
        if self.EDG_addJdlParam[-1] == '':
            self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
        return ''.join([param.strip() + ';\n' for param in self.EDG_addJdlParam])

    def specific_req(self):
        """
        Returns string with specific requirements (wall clock and CPU time
        limits), '' when neither EDG_clock_time nor EDG_cpu_time is set.

        Each fragment starts with ' && ' because sched_parameter appends the
        result to the job type requirements.
        """
        # The original guarded each ' && ' with a comparison against ' '
        # that could never be false; the separator is therefore always
        # emitted and the produced text is unchanged.
        req = ''
        if self.EDG_clock_time:
            req += ' && ' + 'other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time
        if self.EDG_cpu_time:
            req += ' && ' + ' other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time
        return req

    def sched_fix_parameter(self):
        """
        Compute the requirement string shared by the whole task (job type
        requirements plus EDG_requirements) and store it in the task under
        the 'jobType' key. Nothing is returned.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        req = job.type().getRequirements()

        if self.EDG_requirements:
            # Join with '&&' only when there is a left-hand side: an
            # unconditional ' && ' leaves a dangling operator at the start
            # of the expression when the job type has no requirements.
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        common._db.updateTask_({'jobType': req})  ## DS--BL

    def sched_parameter(self, i, task):
        """
        Returns string with requirements and scheduler-specific parameters
        for job number i (1-based index into task.jobs) of task.
        """
        dest = task.jobs[i - 1]['dlsDestination']  ## DS--BL

        requirements = task['jobType'] + self.specific_req() + \
            self.se_list(i, dest) + self.ce_list()[0]
        # Every appended fragment starts with '&&'; when the task-level
        # requirement is empty the expression would begin with a dangling
        # operator, so drop it.
        trimmed = requirements.lstrip()
        if trimmed.startswith('&&'):
            requirements = trimmed[2:].lstrip()

        sched_param = 'Requirements = ' + requirements + ';\n'  ## BL--DS
        if self.EDG_addJdlParam:
            sched_param += self.jdlParam()  ## BL--DS
        sched_param += 'MyProxyServer = "' + self.proxyServer + '";\n'
        sched_param += 'VirtualOrganisation = "' + self.VO + '";\n'
        sched_param += 'RetryCount = ' + str(self.EDG_retry_count) + ';\n'
        sched_param += 'ShallowRetryCount = ' + str(self.EDG_shallow_retry_count) + ';\n'

        return sched_param

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info
        (the value of EdgLoggingInfo.decodeReason for file).
        """
        return EdgLoggingInfo.EdgLoggingInfo().decodeReason(file)

    def findSites_(self, n, sites):
        """
        Filter sites through the black list and, if anything is left,
        through the white list of blackWhiteListParser; n is forwarded to
        both checks. Returns [] when the first entry of sites is ''.
        """
        if len(sites) > 0 and sites[0] == "":
            return []
        ##Addedd Daniele
        replicas = self.blackWhiteListParser.checkBlackList(sites, n)
        if len(replicas) != 0:
            replicas = self.blackWhiteListParser.checkWhiteList(replicas, n)
        return replicas

    def wsExitFunc(self):
        """
        Returns the text of the shell function func_exit() for the job
        wrapper: the common part from wsExitFunc_common() followed by the
        gLite specific check of the output size against OSBsize.
        """
        pieces = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',
            'func_exit() { \n',
            self.wsExitFunc_common(),
            ### specific Glite check for OSB
            # build the tarball once only to measure it, then remove it
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            " tmp_size=`ls -gGrta ${out_files}.tgz | awk '{ print $3 }'`\n",
            ' rm ${out_files}.tgz\n',
            ' size=`expr $tmp_size`\n',
            ' echo "Total Output dimension: $size"\n',
            ' limit=' + str(self.OSBsize) + ' \n',
            ' echo "WARNING: output files size limit is set to: $limit"\n',
            ' if [ "$limit" -lt "$size" ]; then\n',
            ' exceed=1\n',
            ' job_exit_code=70000\n',
            ' echo "Output Sanbox too big. Produced output is lost "\n',
            ' else\n',
            ' exceed=0\n',
            ' echo "Total Output dimension $size is fine."\n',
            ' fi\n',
            ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
            ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            # over the limit: ship only stdout/stderr instead of the outputs
            ' if [ $exceed -ne 1 ]; then\n',
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' else\n',
            ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
            ' fi\n',
            ' exit $job_exit_code\n',
            '}\n',
        ]
        return ''.join(pieces)

    def userName(self):
        """ return the user name """
        # NOTE(review): if runCommand can return None on failure, strip()
        # raises AttributeError here -- confirm against runCommand.
        identity = runCommand("voms-proxy-info -identity")
        return identity.strip()