ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.73
Committed: Thu Jan 14 10:24:17 2010 UTC (15 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.72: +3 -2 lines
Log Message:
set job exit code value after checking OSB size

File Contents

# Content
1 """
2 CRAB interface to BossLite gLite Scheduler
3 """
4
5 __revision__ = "$Id: SchedulerGlite.py,v 1.72 2010/01/13 18:40:41 spiga Exp $"
6 __version__ = "$Revision: 1.72 $"
7
8 from SchedulerGrid import SchedulerGrid
9 from crab_exceptions import *
10 from crab_util import *
11 from GliteConfig import *
12 import EdgLoggingInfo
13 import common
14 from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
15
16 import os, sys, time
17
18 class SchedulerGlite(SchedulerGrid):
19 def __init__(self, name="GLITE"):
20 SchedulerGrid.__init__(self,name)
21
22 self.OSBsize = 55000000
23
24 def configure(self,cfg_params):
25 SchedulerGrid.configure(self, cfg_params)
26 self.environment_unique_identifier = '$GLITE_WMS_JOBID'
27
28 def realSchedParams(self,cfg_params):
29 """
30 Return dictionary with specific parameters, to use
31 with real scheduler
32 """
33 self.rb_param_file=''
34 if (not cfg_params.has_key('GRID.rb')):
35 cfg_params['GRID.rb']='CERN'
36 self.rb_param_file=common.scheduler.rb_configure(cfg_params.get("GRID.rb"))
37 self.wms_service=cfg_params.get("GRID.wms_service",'')
38 self.skipWMSAuth=cfg_params.get("GRID.skipwmsauth",1)
39 params = { 'service' : self.wms_service, \
40 'config' : self.rb_param_file, \
41 'skipWMSAuth' : self.skipWMSAuth
42 }
43 return params
44
45
46 def rb_configure(self, RB):
47 ## 25-Jun-2009 SL: patch to use Cream enabled WMS
48 if ( self.cfg_params.get('GRID.use_cream',None) ):
49 RB='CREAM'
50 if not RB: return None
51 glite_config = None
52 rb_param_file = None
53
54 gliteConfig = GliteConfig(RB)
55 glite_config = gliteConfig.config()
56
57 if (glite_config ):
58 rb_param_file = glite_config
59 return rb_param_file
60
61 def ce_list(self):
62 """
63 Returns string with requirement CE related
64 """
65 ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
66 self.EDG_ce_black_list, common.logger())
67 req = ''
68 ce_white_list = []
69 ce_black_list = []
70 if self.EDG_ce_white_list:
71 ce_white_list = ceParser.whiteList()
72 tmpCe=[]
73 concString = '&&'
74 for ce in ce_white_list:
75 tmpCe.append('RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId)')
76 if len(tmpCe) == 1:
77 req += " && (" + concString.join(tmpCe) + ") "
78 elif len(tmpCe) > 1:
79 firstCE = 0
80 for reqTemp in tmpCe:
81 if firstCE == 0:
82 req += " && ( (" + reqTemp + ") "
83 firstCE = 1
84 elif firstCE > 0:
85 req += " || (" + reqTemp + ") "
86 if firstCE > 0:
87 req += ") "
88
89 if self.EDG_ce_black_list:
90 ce_black_list = ceParser.blackList()
91 tmpCe=[]
92 concString = '&&'
93 for ce in ce_black_list:
94 tmpCe.append('(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))')
95 if len(tmpCe): req += " && (" + concString.join(tmpCe) + ") "
96
97 # requirement added to skip gliteCE
98 # not more needed
99 # req += '&& (!RegExp("blah", other.GlueCEUniqueId))'
100 retWL = ','.join(ce_white_list)
101 retBL = ','.join(ce_black_list)
102 if not retWL:
103 retWL = None
104 if not retBL:
105 retBL = None
106
107 return req, retWL, retBL
108
109 def se_list(self, dest):
110 """
111 Returns string with requirement SE related
112 """
113 hostList=self.findSites_(dest)
114 req=''
115 reqtmp=[]
116 concString = '||'
117
118 for arg in hostList:
119 reqtmp.append(' Member("'+arg+'" , other.GlueCESEBindGroupSEUniqueID) ')
120
121 if len(reqtmp): req += " && (" + concString.join(reqtmp) + ") "
122
123 return req
124
125 def jdlParam(self):
126 """
127 Returns
128 """
129 req=''
130 if self.EDG_addJdlParam:
131 if self.EDG_addJdlParam[-1] == '': self.EDG_addJdlParam= self.EDG_addJdlParam[:-1]
132 for p in self.EDG_addJdlParam:
133 req+=string.strip(p)+';\n'
134 return req
135
136 def specific_req(self):
137 """
138 Returns string with specific requirements
139 """
140 req=''
141 if self.EDG_clock_time:
142 if (not req == ' '): req = req + ' && '
143 req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
144
145 if self.EDG_cpu_time:
146 if (not req == ' '): req = req + ' && '
147 req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
148
149 return req
150
151 def sched_parameter(self,i,task):
152 """
153 Returns string with requirements and scheduler-specific parameters
154 """
155 dest= task.jobs[i-1]['dlsDestination']
156
157 req=''
158 req +=task['jobType']
159
160 sched_param=''
161 sched_param+='Requirements = ' + req +self.specific_req() + self.se_list(dest) +\
162 self.ce_list()[0] +';\n'
163 if self.EDG_addJdlParam: sched_param+=self.jdlParam()
164 sched_param+='MyProxyServer = "' + self.proxyServer + '";\n'
165 sched_param+='VirtualOrganisation = "' + self.VO + '";\n'
166 sched_param+='RetryCount = '+str(self.EDG_retry_count)+';\n'
167 sched_param+='ShallowRetryCount = '+str(self.EDG_shallow_retry_count)+';\n'
168
169 return sched_param
170
171 def decodeLogInfo(self, file):
172 """
173 Parse logging info file and return main info
174 """
175 loggingInfo = EdgLoggingInfo.EdgLoggingInfo()
176 reason = loggingInfo.decodeReason(file)
177 return reason
178
179 def findSites_(self, sites):
180 itr4 =[]
181 if len(sites)>0 and sites[0]=="":
182 return itr4
183 if sites != [""]:
184 replicas = self.blackWhiteListParser.checkBlackList(sites)
185 if len(replicas)!=0:
186 replicas = self.blackWhiteListParser.checkWhiteList(replicas)
187
188 itr4 = replicas
189 return itr4
190
191 def delegateProxy(self):
192 self.boss().delegateProxy()
193 return
194
195 def wsExitFunc(self):
196 """
197 """
198 txt = '\n'
199
200 txt += '#\n'
201 txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
202 txt += '#\n\n'
203
204 txt += 'func_exit() { \n'
205 txt += self.wsExitFunc_common()
206 ### specific Glite check for OSB
207 txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
208 txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
209 txt += ' rm ${out_files}.tgz\n'
210 txt += ' size=`expr $tmp_size`\n'
211 txt += ' echo "Total Output dimension: $size"\n'
212 txt += ' limit='+str(self.OSBsize) +' \n'
213 txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
214 txt += ' if [ "$limit" -lt "$size" ]; then\n'
215 txt += ' exceed=1\n'
216 txt += ' job_exit_code=70000\n'
217 txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
218 txt += ' else\n'
219 txt += ' exceed=0\n'
220 txt += ' echo "Total Output dimension $size is fine."\n'
221 txt += ' fi\n'
222
223 txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
224 txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
225 txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
226 txt += ' if [ $exceed -ne 1 ]; then\n'
227 txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
228 txt += ' else\n'
229 txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
230 txt += ' fi\n'
231 txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'
232 txt += ' exit $job_exit_code\n'
233
234 txt += '}\n'
235 return txt