1 |
"""
|
2 |
Base class for all grid schedulers
|
3 |
"""
|
4 |
|
5 |
__revision__ = "$Id: SchedulerGrid.py,v 1.112 2009/07/09 19:14:30 ewv Exp $"
|
6 |
__version__ = "$Revision: 1.112 $"
|
7 |
|
8 |
from Scheduler import Scheduler
|
9 |
from crab_exceptions import *
|
10 |
from crab_util import *
|
11 |
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
|
12 |
import common
|
13 |
from PhEDExDatasvcInfo import PhEDExDatasvcInfo
|
14 |
from JobList import JobList
|
15 |
|
16 |
import os, sys, time
|
17 |
|
18 |
class SchedulerGrid(Scheduler):
    """
    Base class shared by the grid schedulers.

    It reads the CRAB.* / GRID.* entries of the configuration dictionary
    and produces the scheduler-specific pieces of the job wrapper script
    (environment setup and output stage-out).
    """

    def __init__(self, name):
        """
        Initialise the base Scheduler with 'name' and define self.states,
        the list of job status attribute names.
        """
        Scheduler.__init__(self, name)
        self.states = [
            "Acl", "cancelReason", "cancelling", "ce_node", "children",
            "children_hist", "children_num", "children_states", "condorId",
            "condor_jdl",
            "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
            "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
            "lastUpdateTime", "localId", "location", "matched_jdl",
            "network_server",
            "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
            "stateEnterTime", "stateEnterTimes", "subjob_failed",
            "user tags", "status", "status_code", "hierarchy",
        ]
        return

    def configure(self, cfg_params):
        """
        Read the grid related settings from cfg_params (a dictionary-like
        object queried with .get()) and store them on the instance.

        Besides calling Scheduler.configure, this sets the proxy options,
        VOMS group/role, CE black/white lists, VO name, retry counts and
        wall-clock/CPU time limits, appends the EDG_WL_LOCATION library
        directories to sys.path and finally calls self.checkProxy().

        Raises CrabException if the EDG_WL_LOCATION environment variable
        is not set.
        """
        self.cfg_params = cfg_params
        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')
        Scheduler.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("GRID.proxy_server", 'myproxy.cern.ch')
        common.logger.debug('Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)

        # Convert the flag once; it is tested twice below.
        removeT1bL = int(cfg_params.get("GRID.remove_default_blacklist", 0))

        # Default CE blacklist, dropped when remove_default_blacklist is 1.
        T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw",
                 "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
                 "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr",
                 "cclcgceli04.in2p3.fr", "pic.es", "cnaf", "cern.ch"]
        if removeT1bL == 1:
            T1_BL = []

        self.EDG_ce_black_list = cfg_params.get('GRID.ce_black_list', None)
        if self.EDG_ce_black_list:
            # User entries (comma separated) come first, defaults after.
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
        elif removeT1bL == 0:
            # No user blacklist: fall back to the defaults.  For any other
            # flag value the attribute keeps whatever cfg_params returned.
            self.EDG_ce_black_list = T1_BL

        self.EDG_ce_white_list = cfg_params.get('GRID.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        self.EDG_retry_count = cfg_params.get('GRID.retry_count', 0)
        self.EDG_shallow_retry_count = cfg_params.get('GRID.shallow_retry_count', -1)
        self.EDG_clock_time = cfg_params.get('GRID.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('GRID.max_cpu_time', '130')

        # Add EDG_WL_LOCATION to the python path
        if 'EDG_WL_LOCATION' not in os.environ:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)
        path = os.environ['EDG_WL_LOCATION']

        libPath = os.path.join(path, "lib")
        sys.path.append(libPath)
        libPath = os.path.join(path, "lib", "python")
        sys.path.append(libPath)

        self.checkProxy()
        return

    def rb_configure(self, RB):
        """
        Hook returning the requirement to add to the Jdl in order to
        select a specific RB/WMS.

        This base implementation ignores RB and always returns None;
        concrete schedulers are expected to re-implement it.
        """
        return None

    def sched_fix_parameter(self):
        """
        Build the requirement string (job type requirements, optionally
        followed by self.EDG_requirements) and store it in the task
        database under the 'jobType' key.  Nothing is returned.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        req = '' + jbt.getRequirements()

        if self.EDG_requirements:
            # Only join with ' && ' when the job type contributed a real
            # requirement.  The previous test compared against ' ' and so
            # produced a dangling leading ' && ' for an empty string.
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        taskReq = {'jobType': req}
        common._db.updateTask_(taskReq)

    def listMatch(self, dest, full):
        """
        Return the distinct site names, in first-seen order, taken from
        the CEs reported by Scheduler.listMatch(dest, full).  The site
        name is the part of each CE string before the first ':'.
        """
        ces = Scheduler.listMatch(self, dest, full)
        sites = []
        for ce in ces:
            site = ce.split(":")[0]
            if site not in sites:
                sites.append(site)

        # Label used only in the debug message below.
        matching = 'fast'
        if full == True:
            matching = 'full'
        common.logger.debug("list of available site ( "+str(matching) +" matching ) : "+str(sites))
        return sites

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work:
        wrapper timing variables, argument stripping, monitoring
        identifiers, grid flavour (OSG / ARC / LCG) discovery and the CE
        name.

        Raises CrabException if self.environment_unique_identifier is not
        set and self.envUniqueID() fails.
        """
        taskId = common._db.queryTask('name')
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        if not self.environment_unique_identifier:
            try:
                self.environment_unique_identifier = self.envUniqueID()
            except Exception:
                raise CrabException('environment_unique_identifier not set')

        # start with wrapper timing
        txt = 'export TIME_WRAP_INI=`date +%s` \n'
        txt += 'export TIME_STAGEOUT=-2 \n\n'
        txt += '# '+self.name()+' specific stuff\n'
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}; export NJob\n"

        txt += "out_files=out_files_${NJob}; export out_files\n"
        txt += "echo $out_files\n"
        txt += jbt.outList()

        # Monitoring identifiers: glidein values when available, otherwise
        # the scheduler's environment_unique_identifier.
        txt += 'if [ $JobRunCount ] && [ `expr $JobRunCount - 1` -gt 0 ] && [ $Glidein_MonitorID ]; then \n'
        txt += ' attempt=`expr $JobRunCount - 1` \n'
        txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}__${attempt}\n'
        txt += ' SyncGridJobId=${Glidein_MonitorID}__${attempt}\n'
        txt += 'elif [ $Glidein_MonitorID ]; then \n'
        txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}\n'
        txt += ' SyncGridJobId=${Glidein_MonitorID}\n'
        txt += 'else \n'
        txt += ' MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n'
        txt += ' SyncGridJobId='+self.environment_unique_identifier+'\n'
        txt += 'fi\n'
        txt += 'MonitorID='+taskId+'\n'
        txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'

        txt += 'echo ">>> GridFlavour discovery: " \n'
        txt += 'if [ $OSG_APP ]; then \n'
        txt += ' middleware=OSG \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
        txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
        txt += ' else\n'
        txt += ' echo "not reporting SyncCE";\n'
        txt += ' fi\n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += ' echo "source OSG GRID setup script" \n'
        txt += ' source $OSG_GRID/setup.sh \n'
        txt += 'elif [ $NORDUGRID_CE ]; then \n' # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR,
        txt += ' middleware=ARC \n' # because the latter is defined for ARC too
        txt += ' echo "SyncCE=$NORDUGRID_CE" >> $RUNTIME_AREA/$repo \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
        txt += ' middleware=LCG \n'
        txt += ' if [ $GLIDEIN_Gatekeeper ]; then \n'
        txt += ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n'
        txt += ' else \n'
        txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
        txt += ' fi \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'else \n'
        txt += ' echo "ERROR ==> GridFlavour not identified" \n'
        txt += ' job_exit_code=10030 \n'
        txt += ' func_exit \n'
        txt += 'fi \n'

        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '\n\n'

        # CE name, derived differently for each middleware flavour.
        txt += 'export VO='+self.VO+'\n'
        txt += 'if [ $middleware == LCG ]; then\n'
        txt += ' if [ $GLIDEIN_Gatekeeper ]; then\n'
        txt += ' CloseCEs=$GLIDEIN_Gatekeeper \n'
        txt += ' else\n'
        txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
        txt += ' fi\n'
        txt += ' echo "CloseCEs = $CloseCEs"\n'
        txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += ' echo "CE = $CE"\n'
        txt += 'elif [ $middleware == OSG ]; then \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        # '\\/' emits a literal backslash followed by '/', as before.
        txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n'
        txt += ' else \n'
        txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
        txt += ' job_exit_code=10099\n'
        txt += ' func_exit\n'
        txt += ' fi \n'
        txt += 'elif [ $middleware == ARC ]; then \n'
        txt += ' echo "CE = $NORDUGRID_CE"\n'
        txt += 'fi \n'

        return txt

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        When self.copy_data is 1 the returned text exports the stage-out
        variables obtained from PhEDExDatasvcInfo, runs cmscp.py and
        evaluates cmscpReport.sh; otherwise it only sets TIME_STAGEOUT
        to -1.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()

        txt = '\n'

        txt += '#\n'
        txt += '# COPY OUTPUT FILE TO SE\n'
        txt += '#\n\n'

        # NOTE(review): the extents of the conditional blocks below were
        # reconstructed from the if/fi balance of the generated shell
        # text -- confirm against the upstream file.
        if int(self.copy_data) == 1:
            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
            if self.check_RemoteDir == 1 :
                self.checkRemoteDir(endpoint,jbt.outList('list') )
            txt += 'export SE='+SE+'\n'
            txt += 'echo "SE = $SE"\n'
            txt += 'export SE_PATH='+SE_PATH+'\n'
            txt += 'echo "SE_PATH = $SE_PATH"\n'
            txt += 'export LFNBaseName='+lfn+'\n'
            txt += 'echo "LFNBaseName = $LFNBaseName"\n'
            txt += 'export USER='+user+'\n'
            txt += 'echo "USER = $USER"\n'
            txt += 'export endpoint='+endpoint+'\n'
            txt += 'echo "endpoint = $endpoint"\n'

            txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
            txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
            txt += 'copy_exit_status=0\n'
            cmscp_args = ' --destination $endpoint --inputFileList $file_list'
            cmscp_args +=' --middleware $middleware --lfn $LFNBaseName %s %s '%(self.loc_stage_out,self.debugWrap)
            txt += 'echo "python cmscp.py %s "\n'%cmscp_args
            txt += 'python cmscp.py %s \n'%cmscp_args
            if self.debug_wrapper==1:
                # In debug mode the SE interaction log is always printed.
                txt += 'echo "which lcg-ls"\n'
                txt += 'which lcg-ls\n'
                txt += 'echo "########### details of SE interaction"\n'
                txt += 'if [ -f .SEinteraction.log ] ;then\n'
                txt += ' cat .SEinteraction.log\n'
                txt += 'else\n'
                txt += ' echo ".SEinteraction.log file not found"\n'
                txt += 'fi\n'
                txt += 'echo "#####################################"\n'

            txt += 'if [ -f cmscpReport.sh ] ;then\n'
            txt += ' cat cmscpReport.sh\n'
            txt += ' source cmscpReport.sh\n'
            txt += 'else\n'
            txt += ' echo "cmscpReport.sh file not found"\n'
            txt += ' StageOutExitStatus=60307\n'
            txt += 'fi\n'
            txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
            txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
            txt += ' copy_exit_status=$StageOutExitStatus \n'
            if not self.debug_wrapper==1:
                # Outside debug mode the log is printed only on failure.
                txt += 'if [ -f .SEinteraction.log ] ;then\n'
                txt += ' echo "########## contents of SE interaction"\n'
                txt += ' cat .SEinteraction.log\n'
                txt += ' echo "#####################################"\n'
                txt += 'else\n'
                txt += ' echo ".SEinteraction.log file not found"\n'
                txt += 'fi\n'
            txt += ' job_exit_code=$StageOutExitStatus\n'
            txt += 'fi\n'
            txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
            txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
        else:
            # set stageout timing to a fake value
            txt += 'export TIME_STAGEOUT=-1 \n'
        return txt

    def userName(self):
        """
        Return the user name: the output of 'voms-proxy-info -identity'
        (stderr discarded) with surrounding whitespace removed.
        """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def configOpt_(self):
        """
        Return the command line fragment selecting the UI configuration:
        ' -c <edg_config> ' and/or ' --config-vo <edg_config_vo> ', or a
        single space when neither attribute is set.
        """
        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        # NOTE(review): treated as independent of the test above; the
        # nesting could not be determined from the source -- confirm.
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
        return edg_ui_cfg_opt

    def tags(self):
        """
        Split the task's 'jobType' string on double quotes and return the
        second and fourth pieces as a two-element list of str.
        """
        task = common._db.getTask()
        tags_tmp = task['jobType'].split('"')
        tags = [str(tags_tmp[1]), str(tags_tmp[3])]
        return tags
|
324 |
|