ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.86
Committed: Thu Jan 17 15:15:03 2013 UTC (12 years, 3 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, HEAD
Changes since 1.85: +2 -28 lines
Log Message:
move OSB size check to Scheduler.py https://savannah.cern.ch/bugs/index.php?95466

File Contents

# Content
1 """
2 CRAB interface to BossLite gLite Scheduler
3 """
4
# Version-control keyword strings ($Id$ / $Revision$ format) identifying
# the revision of this file.
__revision__ = "$Id: SchedulerGlite.py,v 1.85 2012/06/14 15:50:46 belforte Exp $"
__version__ = "$Revision: 1.85 $"
7
8 from SchedulerGrid import SchedulerGrid
9 from crab_exceptions import *
10 from crab_util import *
11 import EdgLoggingInfo
12 import common
13 from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
14
15 import os, sys, time
16
class SchedulerGlite(SchedulerGrid):
    """
    CRAB interface to the BossLite gLite scheduler.

    Extends SchedulerGrid with the gLite specific pieces: download of the
    WMS configuration file, construction of the JDL requirement strings
    and of the scheduler parameters, and the job wrapper exit function.
    """

    def __init__(self, name="GLITE"):
        """
        Initialise the base scheduler and set the gLite defaults.
        """
        SchedulerGrid.__init__(self, name)

        self.EDG_retry_count = 0
        self.EDG_shallow_retry_count = -1
        self.OSBsize = 55 * 1000 * 1000  # 55MB

    def configure(self, cfg_params):
        """
        Run the base class configuration, then set the name of the
        environment variable used as unique job identifier.
        """
        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = '$GLITE_WMS_JOBID'

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use
        with real scheduler

        Side effects: sets cfg_params['GRID.rb'] to 'CERN' when the key is
        missing, and stores rb_param_file, wms_service and skipWMSAuth on
        the instance.
        """
        self.rb_param_file = ''
        if not cfg_params.has_key('GRID.rb'):
            cfg_params['GRID.rb'] = 'CERN'
        self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("GRID.rb"))
        self.wms_service = cfg_params.get("GRID.wms_service", '')
        self.skipWMSAuth = cfg_params.get("GRID.skipwmsauth", 1)
        params = {
            'service': self.wms_service,
            'config': self.rb_param_file,
            'skipWMSAuth': self.skipWMSAuth,
        }
        return params

    def rb_configure(self, RB):
        """
        Download the WMS configuration file for the given RB.

        RB is the name used to build the file name 'glite_wms_<RB>.conf';
        it is overridden with 'CREAM' when 'GRID.use_cream' is set in
        self.cfg_params.

        Returns the value given by Downloader.filePath() for that file,
        or None when RB is empty or no file was obtained.

        Raises CrabException when the download fails with an
        httplib.HTTPException.
        """
        url = 'http://cmsdoc.cern.ch/cms/LCG/crab/config/'
        from Downloader import Downloader
        import httplib
        common.logger.debug('Downloading config files for WMS: ' + url)
        ## 25-Jun-2009 SL: patch to use Cream enabled WMS
        if self.cfg_params.get('GRID.use_cream', None):
            RB = 'CREAM'
        if not RB:
            return None
        configFileName = 'glite_wms_' + str(RB) + '.conf'

        results = Downloader(url)
        try:
            gliteConfig = results.filePath(configFileName)
        except httplib.HTTPException:
            # sys.exc_info() is used to fetch the exception so that the
            # handler does not depend on version specific except syntax.
            ex = sys.exc_info()[1]
            # One %s per value: file name and failure reason.
            raise CrabException("Problem getting RB config file: %s, reason: %s"
                                % (configFileName, ex))

        if gliteConfig:
            return gliteConfig
        return None

    def ce_list(self):
        """
        Returns string with requirement CE related

        The return value is a tuple (req, retWL, retBL): req is the JDL
        requirement fragment (empty when no list applies), retWL and
        retBL are the comma separated parsed white and black lists, or
        None when the corresponding list is empty.
        """
        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list, common.logger())
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            allowed = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                       for ce in ce_white_list]
            if len(allowed) == 1:
                req += " && (" + allowed[0] + ") "
            elif len(allowed) > 1:
                # Any of the white listed CEs is acceptable: OR them
                # together inside one parenthesised group.
                alternatives = ["(" + expr + ") " for expr in allowed]
                req += " && ( " + " || ".join(alternatives) + ") "

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            # Every black listed CE must be excluded: AND the negations.
            banned = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                      for ce in ce_black_list]
            if banned:
                req += " && (" + '&&'.join(banned) + ") "

        # requirement added to skip gliteCE
        # not more needed
        # req += '&& (!RegExp("blah", other.GlueCEUniqueId))'
        retWL = ','.join(ce_white_list) or None
        retBL = ','.join(ce_black_list) or None

        return req, retWL, retBL

    def se_list(self, dest):
        """
        Returns string with requirement SE related

        dest is filtered through findSites_(); the surviving hosts are
        OR-ed together. An empty string is returned when none is left.
        """
        hostList = self.findSites_(dest)
        members = [' Member("' + host + '" , other.GlueCESEBindGroupSEUniqueID) '
                   for host in hostList]
        if members:
            return " && (" + '||'.join(members) + ") "
        return ''

    def jdlParam(self):
        """
        Returns string with the additional JDL parameters

        Each entry of self.EDG_addJdlParam is stripped and terminated
        with ';' and a newline. A trailing empty entry is dropped from
        self.EDG_addJdlParam itself.
        """
        req = ''
        if self.EDG_addJdlParam:
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            req = ''.join([p.strip() + ';\n' for p in self.EDG_addJdlParam])
        return req

    def specific_req(self):
        """
        Returns string with specific requirements

        Every clause carries a leading ' && ' because sched_parameter()
        appends the result to an already started requirement expression.
        """
        req = ''
        if self.EDG_clock_time:
            req += ' && other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time

        if self.EDG_cpu_time:
            req += ' &&  other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time

        return req

    def sched_parameter(self, i, task):
        """
        Returns string with requirements and scheduler-specific parameters

        i is the 1-based index of the job inside task.jobs.
        Raises CrabException when task['jobType'] cannot be read because
        the task information is wrong or missing.
        """
        dest = task.jobs[i - 1]['dlsDestination']

        req = ''
        ####### FEDE FOR BUG 73010 ############
        try:
            req += task['jobType']
        except TypeError:
            msg = "Error: wrong or missing task info. Your created task can not be submitted. Please check your configuration file and create the task again. \n "
            raise CrabException(msg)
        #######################################

        sched_param = 'Requirements = ' + req + self.specific_req() \
                      + self.se_list(dest) + self.ce_list()[0] + ';\n'
        if self.EDG_addJdlParam:
            sched_param += self.jdlParam()
        sched_param += 'MyProxyServer = "' + self.proxyServer + '";\n'
        sched_param += 'VirtualOrganisation = "' + self.VO + '";\n'
        sched_param += 'RetryCount = ' + str(self.EDG_retry_count) + ';\n'
        sched_param += 'DefaultNodeRetryCount = ' + str(self.EDG_retry_count) + ';\n'
        sched_param += 'ShallowRetryCount = ' + str(self.EDG_shallow_retry_count) + ';\n'
        sched_param += 'DefaultNodeShallowRetryCount = ' + str(self.EDG_shallow_retry_count) + ';\n'

        return sched_param

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info
        """
        loggingInfo = EdgLoggingInfo.EdgLoggingInfo()
        return loggingInfo.decodeReason(file)

    def findSites_(self, sites):
        """
        Filter sites through the black list and then the white list.

        Returns an empty list when the first entry of sites is an empty
        string. The white list is only consulted when something is left
        after the black list check.
        """
        if len(sites) > 0 and sites[0] == "":
            return []

        replicas = self.blackWhiteListParser.checkBlackList(sites)
        if len(replicas) != 0:
            replicas = self.blackWhiteListParser.checkWhiteList(replicas)
        return replicas

    def delegateProxy(self):
        """
        Ask the boss interface to delegate the proxy.
        """
        self.boss().delegateProxy()
        return

    def wsExitFunc(self):
        """
        Returns the shell text defining func_exit() for the job wrapper

        The function body is the text from wsExitFunc_common() followed
        by an exit with $job_exit_code.
        """
        pieces = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',
            'func_exit() { \n',
            self.wsExitFunc_common(),
            ' exit $job_exit_code\n',
            '}\n',
        ]
        return ''.join(pieces)

    def listMatch(self, dest, full):
        """
        Returns the sites matching the job requirements

        When the boss scheduler is 'SchedulerGLite' the match is done
        through the scheduler session using the requirements of the first
        job of the current task; otherwise the base class implementation
        is used. full only affects the wording of the debug message here.
        """
        if self.boss().schedulerConfig['name'] == 'SchedulerGLite':
            taskId = common._db.getTask()
            req = str(self.sched_parameter(1, taskId))
            sites = self.boss().schedSession().matchResources(taskId, requirements=req)
        else:
            sites = SchedulerGrid.listMatch(self, dest, full)

        matching = 'fast'
        if full == True:
            matching = 'full'
        common.logger.debug("list of available site ( " + str(matching) + " matching ) : " + str(sites))

        return sites