1 |
ewv |
1.69 |
"""
|
2 |
|
|
Base class for all grid schedulers
|
3 |
|
|
"""
|
4 |
|
|
|
5 |
spiga |
1.110 |
__revision__ = "$Id: SchedulerGrid.py,v 1.109 2009/05/31 13:19:41 spiga Exp $"
|
6 |
|
|
__version__ = "$Revision: 1.109 $"
|
7 |
ewv |
1.69 |
|
8 |
gutsche |
1.1 |
from Scheduler import Scheduler
|
9 |
|
|
from crab_exceptions import *
|
10 |
|
|
from crab_util import *
|
11 |
ewv |
1.81 |
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
|
12 |
gutsche |
1.1 |
import common
|
13 |
spiga |
1.70 |
from PhEDExDatasvcInfo import PhEDExDatasvcInfo
|
14 |
spiga |
1.28 |
from JobList import JobList
|
15 |
gutsche |
1.1 |
|
16 |
|
|
import os, sys, time
|
17 |
|
|
|
18 |
|
|
class SchedulerGrid(Scheduler):
|
19 |
slacapra |
1.8 |
|
20 |
|
|
def __init__(self, name):
    """
    Initialise the base Scheduler with *name* and store the fixed list
    of field names in ``self.states``.
    """
    Scheduler.__init__(self, name)
    # One name per line; the order is the same as it has always been.
    self.states = [
        "Acl",
        "cancelReason",
        "cancelling",
        "ce_node",
        "children",
        "children_hist",
        "children_num",
        "children_states",
        "condorId",
        "condor_jdl",
        "cpuTime",
        "destination",
        "done_code",
        "exit_code",
        "expectFrom",
        "expectUpdate",
        "globusId",
        "jdl",
        "jobId",
        "jobtype",
        "lastUpdateTime",
        "localId",
        "location",
        "matched_jdl",
        "network_server",
        "owner",
        "parent_job",
        "reason",
        "resubmitted",
        "rsl",
        "seed",
        "stateEnterTime",
        "stateEnterTimes",
        "subjob_failed",
        "user tags",
        "status",
        "status_code",
        "hierarchy",
    ]
|
31 |
slacapra |
1.12 |
|
32 |
slacapra |
1.8 |
def configure(self, cfg_params):
    """
    Read the CRAB/GRID settings from *cfg_params* and prepare the scheduler.

    cfg_params is used through ``.get(key, default)`` only. The method:
      * stores the job type / scheduler names and calls Scheduler.configure;
      * reads proxy, group/role, VO, requirement, retry and time settings;
      * builds the CE black list (user list plus the default list, unless
        GRID.remove_default_blacklist is 1) and the CE white list;
      * appends $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python to
        sys.path;
      * calls self.checkProxy().

    Raises CrabException when the EDG_WL_LOCATION environment variable
    is not set.
    """
    self.cfg_params = cfg_params
    self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
    self.schedulerName = cfg_params.get('CRAB.scheduler', '')
    Scheduler.configure(self, cfg_params)

    self.proxyValid = 0
    self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))

    self.proxyServer = cfg_params.get("GRID.proxy_server", 'myproxy.cern.ch')
    common.logger.debug('Setting myproxy server to ' + self.proxyServer)

    self.group = cfg_params.get("GRID.group", None)
    self.role = cfg_params.get("GRID.role", None)

    # Convert the flag once instead of at every test below.
    removeT1bL = int(cfg_params.get("GRID.remove_default_blacklist", 0))

    T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw",
             "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
             "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr", "cclcgceli04.in2p3.fr",
             "pic.es", "cnaf", "cern.ch"]
    if removeT1bL == 1:
        T1_BL = []

    self.EDG_ce_black_list = cfg_params.get('GRID.ce_black_list', None)
    if self.EDG_ce_black_list:
        # User entries first, then the (possibly emptied) default list.
        self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
    elif removeT1bL == 0:
        # No user list: fall back to the defaults. For any other flag
        # value the attribute keeps whatever cfg_params returned.
        self.EDG_ce_black_list = T1_BL

    self.EDG_ce_white_list = cfg_params.get('GRID.ce_white_list', None)
    if self.EDG_ce_white_list:
        self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

    self.VO = cfg_params.get('GRID.virtual_organization', 'cms')
    self.EDG_requirements = cfg_params.get('GRID.requirements', None)
    self.EDG_addJdlParam = cfg_params.get('GRID.additional_jdl_parameters', None)
    if self.EDG_addJdlParam:
        self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

    self.EDG_retry_count = cfg_params.get('GRID.retry_count', 0)
    self.EDG_shallow_retry_count = cfg_params.get('GRID.shallow_retry_count', -1)
    self.EDG_clock_time = cfg_params.get('GRID.max_wall_clock_time', None)

    # Default minimum CPU time to >= 130 minutes
    self.EDG_cpu_time = cfg_params.get('GRID.max_cpu_time', '130')

    # Add EDG_WL_LOCATION to the python path
    if 'EDG_WL_LOCATION' not in os.environ:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)
    path = os.environ['EDG_WL_LOCATION']

    for libPath in (os.path.join(path, "lib"),
                    os.path.join(path, "lib", "python")):
        # Avoid piling up duplicates when configure() runs more than once.
        if libPath not in sys.path:
            sys.path.append(libPath)

    self.checkProxy()
    return
|
87 |
slacapra |
1.8 |
|
88 |
|
|
def rb_configure(self, RB):
    """
    Return a requirement to be added to the JDL to select a specific
    RB/WMS, or None if RB is None.

    This base version ignores RB and always returns None; it is meant
    to be re-implemented in the concrete scheduler.
    """
    return None
|
95 |
ewv |
1.24 |
|
96 |
|
|
def sched_fix_parameter(self):
    """
    Combine the job-type requirements with the user-supplied
    self.EDG_requirements and store the result as the task's 'jobType'
    through common._db.updateTask_().

    The requirements are taken from the type of the job at position
    nJobs()-1 in common.job_list. Nothing is returned.
    """
    index = int(common._db.nJobs())
    job = common.job_list[index - 1]
    req = job.type().getRequirements()

    if self.EDG_requirements:
        # Join with '&&' only when there is something to join to. The
        # previous test compared against ' ' while an empty requirement
        # is '', which produced a string starting with ' && '.
        if req.strip():
            req = req + ' && '
        req = req + self.EDG_requirements

    taskReq = {'jobType': req}
    common._db.updateTask_(taskReq)
|
113 |
gutsche |
1.1 |
|
114 |
spiga |
1.50 |
def listMatch(self, dest, full):
    """
    Ask the base Scheduler for the matching CEs and return the distinct
    site names (the text before the first ':' of each CE), in order of
    first appearance. The resulting list is also written to the debug log.
    """
    if full == True:
        mode = 'full'
    else:
        mode = 'fast'

    unique_sites = []
    for entry in Scheduler.listMatch(self, dest, full):
        host = entry.split(":")[0]
        if host in unique_sites:
            continue
        unique_sites.append(host)

    common.logger.debug("list of available site ( " + str(mode) + " matching ) : " + str(unique_sites))
    return unique_sites
|
126 |
|
|
|
127 |
|
|
|
128 |
gutsche |
1.1 |
def wsSetupEnvironment(self):
    """
    Returns part of a job script which does scheduler-specific work.

    The returned shell text starts the wrapper timing, strips the
    wrapper arguments, sets NJob, writes the monitoring identifiers to
    $RUNTIME_AREA/$repo, works out the middleware flavour (OSG, ARC or
    LCG) and sets the CE name accordingly.

    Raises CrabException when self.environment_unique_identifier is
    empty and self.envUniqueID() fails.
    """
    taskId = common._db.queryTask('name')
    index = int(common._db.nJobs())
    job = common.job_list[index - 1]
    jbt = job.type()
    if not self.environment_unique_identifier:
        try:
            self.environment_unique_identifier = self.envUniqueID()
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer turned into a CrabException.
            raise CrabException('environment_unique_identifier not set')

    script = [
        # start with wrapper timing
        'export TIME_WRAP_INI=`date +%s` \n',
        'export TIME_STAGEOUT=-2 \n\n',
        '# ' + self.name() + ' specific stuff\n',
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        "# job number (first parameter for job wrapper)\n",
        "NJob=${args[0]}; export NJob\n",

        "out_files=out_files_${NJob}; export out_files\n",
        "echo $out_files\n",
        jbt.outList(),

        'MonitorJobID=${NJob}_' + self.environment_unique_identifier + '\n',
        'SyncGridJobId=' + self.environment_unique_identifier + '\n',
        'MonitorID=' + taskId + '\n',
        'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n',
        'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n',

        'echo ">>> GridFlavour discovery: " \n',
        'if [ $OSG_APP ]; then \n',
        ' middleware=OSG \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        ' SyncCE="$OSG_JOB_CONTACT"; \n',
        ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n',
        ' else\n',
        ' echo "not reporting SyncCE";\n',
        ' fi\n',
        ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "source OSG GRID setup script" \n',
        ' source $OSG_GRID/setup.sh \n',
        # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR,
        # because the latter is defined for ARC too
        'elif [ $NORDUGRID_CE ]; then \n',
        ' middleware=ARC \n',
        ' echo "SyncCE=$NORDUGRID_CE" >> $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
        'elif [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' if [ $GLIDEIN_Gatekeeper ]; then \n',
        ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n',
        ' else \n',
        ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n',
        ' fi \n',
        ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
        'else \n',
        ' echo "ERROR ==> GridFlavour not identified" \n',
        ' job_exit_code=10030 \n',
        ' func_exit \n',
        'fi \n',

        'dumpStatus $RUNTIME_AREA/$repo \n',
        '\n\n',

        'export VO=' + self.VO + '\n',
        'if [ $middleware == LCG ]; then\n',
        ' if [ $GLIDEIN_Gatekeeper ]; then\n',
        ' CloseCEs=$GLIDEIN_Gatekeeper \n',
        ' else\n',
        ' CloseCEs=`glite-brokerinfo getCE`\n',
        ' fi\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        # The backslash is doubled so the emitted text still contains
        # '\/' without relying on an invalid escape sequence.
        " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
        ' else \n',
        ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n',
        ' job_exit_code=10099\n',
        ' func_exit\n',
        ' fi \n',
        'elif [ $middleware == ARC ]; then \n',
        ' echo "CE = $NORDUGRID_CE"\n',
        'fi \n',
    ]

    return ''.join(script)
|
222 |
|
|
|
223 |
|
|
def wsCopyOutput(self):
    """
    Write a CopyResults part of a job script, e.g.
    to copy produced output into a storage element.

    When int(self.copy_data) is 1 the returned shell text exports the
    stage-out destination obtained from PhEDExDatasvcInfo, runs
    cmscp.py and evaluates cmscpReport.sh; otherwise it only exports
    TIME_STAGEOUT=-1.
    """
    last_job = common.job_list[int(common._db.nJobs()) - 1]
    job_type = last_job.type()

    out = [
        '\n',
        '#\n',
        '# COPY OUTPUT FILE TO SE\n',
        '#\n\n',
    ]

    if int(self.copy_data) != 1:
        # set stageout timing to a fake value
        out.append('export TIME_STAGEOUT=-1 \n')
        return ''.join(out)

    stageout = PhEDExDatasvcInfo(self.cfg_params)
    endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
    if self.check_RemoteDir == 1:
        self.checkRemoteDir(endpoint, job_type.outList('list'))

    out.extend([
        'export SE=' + SE + '\n',
        'echo "SE = $SE"\n',
        'export SE_PATH=' + SE_PATH + '\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'export LFNBaseName=' + lfn + '\n',
        'echo "LFNBaseName = $LFNBaseName"\n',
        'export USER=' + user + '\n',
        'echo "USER = $USER"\n',
        'export endpoint=' + endpoint + '\n',
        'echo "endpoint = $endpoint"\n',
        'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n',
        'export TIME_STAGEOUT_INI=`date +%s` \n',
        'copy_exit_status=0\n',
    ])

    cmscp_args = ' --destination $endpoint --inputFileList $file_list'
    cmscp_args += ' --middleware $middleware --lfn $LFNBaseName %s %s ' % (self.loc_stage_out, self.debugWrap)
    out.append('echo "python cmscp.py %s "\n' % cmscp_args)
    out.append('python cmscp.py %s \n' % cmscp_args)

    debugging = (self.debug_wrapper == 1)

    if debugging:
        # In debug mode the SE interaction log is always dumped.
        out.extend([
            'echo "which lcg-ls"\n',
            'which lcg-ls\n',
            'echo "########### details of SE interaction"\n',
            'if [ -f .SEinteraction.log ] ;then\n',
            ' cat .SEinteraction.log\n',
            'else\n',
            ' echo ".SEinteraction.log file not found"\n',
            'fi\n',
            'echo "#####################################"\n',
        ])

    out.extend([
        'if [ -f cmscpReport.sh ] ;then\n',
        ' cat cmscpReport.sh\n',
        ' source cmscpReport.sh\n',
        'else\n',
        ' echo "cmscpReport.sh file not found"\n',
        ' StageOutExitStatus=60307\n',
        'fi\n',
        'if [ $StageOutExitStatus -ne 0 ]; then\n',
        ' echo "Problem copying file to $SE $SE_PATH"\n',
        ' copy_exit_status=$StageOutExitStatus \n',
    ])

    if not debugging:
        # Outside debug mode the log is dumped only on stage-out failure.
        out.extend([
            'if [ -f .SEinteraction.log ] ;then\n',
            ' echo "########## contents of SE interaction"\n',
            ' cat .SEinteraction.log\n',
            ' echo "#####################################"\n',
            'else\n',
            ' echo ".SEinteraction.log file not found"\n',
            'fi\n',
        ])

    out.extend([
        ' job_exit_code=$StageOutExitStatus\n',
        'fi\n',
        'export TIME_STAGEOUT_END=`date +%s` \n',
        'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n',
    ])
    return ''.join(out)
|
298 |
|
|
|
299 |
slacapra |
1.8 |
def userName(self):
    """
    Return the output of ``voms-proxy-info -identity`` (stderr
    discarded) with surrounding whitespace stripped.
    """
    return runCommand("voms-proxy-info -identity 2>/dev/null").strip()
|
303 |
|
|
|
304 |
gutsche |
1.1 |
def configOpt_(self):
    """
    Build the configuration option string from self.edg_config and
    self.edg_config_vo.

    Returns ' -c <edg_config> ' when edg_config is set (a single blank
    otherwise), followed by ' --config-vo <edg_config_vo> ' when
    edg_config_vo is set.
    """
    if self.edg_config:
        options = ' -c ' + self.edg_config + ' '
    else:
        options = ' '
    if self.edg_config_vo:
        options = options + ' --config-vo ' + self.edg_config_vo + ' '
    return options
|
311 |
slacapra |
1.8 |
|
312 |
|
|
|
313 |
|
|
|
314 |
slacapra |
1.34 |
def tags(self):
    """
    Return, as a two-element list of str, the second and fourth pieces
    obtained by splitting the task's 'jobType' string on '"' (i.e. the
    first two double-quoted values).

    Raises IndexError when the split yields fewer than four pieces.
    """
    task = common._db.getTask()
    # str.split replaces string.split: the 'string' module is never
    # imported by this file and the function form is gone in Python 3.
    pieces = task['jobType'].split('"')
    # Local renamed so that it no longer shadows this method's name.
    quoted = [str(pieces[1]), str(pieces[3])]
    return quoted
|
319 |
ewv |
1.37 |
|