5 |
|
from SchedulerGrid import SchedulerGrid |
6 |
|
from crab_exceptions import CrabException |
7 |
|
from crab_util import runCommand |
8 |
< |
#from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser |
8 |
> |
from ServerConfig import * |
9 |
|
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser |
10 |
< |
|
10 |
> |
import Scram |
11 |
|
|
12 |
|
|
13 |
|
import common |
14 |
|
import os |
15 |
|
import socket |
16 |
+ |
import re |
17 |
+ |
import commands |
18 |
|
|
19 |
|
# FUTURE: for python 2.4 & 2.6 |
20 |
|
try: |
24 |
|
|
25 |
|
class SchedulerRcondor(SchedulerGrid) : |
26 |
|
""" |
27 |
< |
Class to implement the vanilla (local) Condor scheduler |
27 |
> |
Class to implement the vanilla remote Condor scheduler |
28 |
|
Naming convention: Methods starting with 'ws' provide |
29 |
|
the corresponding part of the job script |
30 |
|
('ws' stands for 'write script'). |
32 |
|
|
33 |
|
def __init__(self):
    """Set up the remote-Condor scheduler with empty task state."""
    SchedulerGrid.__init__(self, "RCONDOR")

    # Dataset selection is unknown until configure() runs.
    self.selectNoInput = None
    self.datasetPath = None

    # Output sandbox size cap: 50 MB.
    self.OSBsize = 50 * 1000 * 1000

    self.environment_unique_identifier = None
    return
43 |
|
|
44 |
|
|
46 |
|
""" |
47 |
|
Configure the scheduler with the config settings from the user |
48 |
|
""" |
49 |
+ |
|
50 |
+ |
# task = common._db.getTask() |
51 |
+ |
# #print task.__dict__ |
52 |
+ |
# |
53 |
+ |
# if task['serverName']!=None and task['serverName']!="": |
54 |
+ |
# # cast to string to avoid issues with unicode :-( |
55 |
+ |
# self.rcondorUserHost=str(task['serverName']) |
56 |
+ |
# common.logger.info("serverName from Task DB is %s" % |
57 |
+ |
# self.rcondorUserHost) |
58 |
+ |
# else : |
59 |
+ |
# # get an rcondor host from config and save |
60 |
+ |
# common.logger.info("no serverName in Task DB, use env.var.") |
61 |
+ |
# |
62 |
+ |
# self.rcondorHost = os.getenv('RCONDOR_HOST') |
63 |
+ |
# if not self.rcondorHost : |
64 |
+ |
# raise CrabException('FATAL ERROR: env.var RCONDOR_HOST not defined') |
65 |
+ |
# self.rcondorUser = os.getenv('RCONDOR_USER') |
66 |
+ |
# if not self.rcondorUser : |
67 |
+ |
# common.logger.info("$RCONDOR_USER not defined, try to find out via uberftp ...") |
68 |
+ |
# command="uberftp $RCONDOR_HOST pwd|grep User|awk '{print $3}'" |
69 |
+ |
# (status, output) = commands.getstatusoutput(command) |
70 |
+ |
# if status == 0: |
71 |
+ |
# self.rcondorUser = output |
72 |
+ |
# common.logger.info("rcondorUser set to %s" % self.rcondorUser) |
73 |
+ |
# if self.rcondorUser==None: |
74 |
+ |
# raise CrabException('FATAL ERROR: RCONDOR_USER not defined') |
75 |
+ |
# |
76 |
+ |
# self.rcondorUserHost = self.rcondorUser + '@' + self.rcondorHost |
77 |
+ |
# |
78 |
+ |
# print "will set server name to : ", self.rcondorUserHost |
79 |
+ |
# common._db.updateTask_({'serverName':self.rcondorUserHost}) |
80 |
+ |
# print "ok" |
81 |
|
|
82 |
|
SchedulerGrid.configure(self, cfg_params) |
83 |
|
|
84 |
|
self.proxyValid=0 |
85 |
|
self.dontCheckProxy=int(cfg_params.get("GRID.dont_check_proxy",0)) |
86 |
|
self.space_token = cfg_params.get("USER.space_token",None) |
87 |
< |
try: |
52 |
< |
self.proxyServer = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf") |
53 |
< |
self.proxyServer = self.proxyServer.strip() |
54 |
< |
if self.proxyServer is None: |
55 |
< |
raise CrabException("myproxy_server.conf retrieved but empty") |
56 |
< |
except Exception, e: |
57 |
< |
common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch") |
58 |
< |
common.logger.debug(e) |
59 |
< |
self.proxyServer= 'myproxy.cern.ch' |
87 |
> |
self.proxyServer= 'myproxy.cern.ch' |
88 |
|
self.group = cfg_params.get("GRID.group", None) |
89 |
|
self.role = cfg_params.get("GRID.role", None) |
90 |
|
self.VO = cfg_params.get('GRID.virtual_organization','cms') |
91 |
|
|
92 |
+ |
self.checkProxy() |
93 |
+ |
|
94 |
+ |
|
95 |
|
try: |
96 |
|
tmp = cfg_params['CMSSW.datasetpath'] |
97 |
|
if tmp.lower() == 'none': |
104 |
|
msg = "Error: datasetpath not defined " |
105 |
|
raise CrabException(msg) |
106 |
|
|
107 |
< |
self.checkProxy() |
107 |
> |
if cfg_params.get('GRID.ce_black_list', None) or \ |
108 |
> |
cfg_params.get('GRID.ce_white_list', None) : |
109 |
> |
msg="BEWARE: scheduler RGLIDEIN ignores CE black/white lists." |
110 |
> |
msg+="\n Remove them from crab configuration to proceed." |
111 |
> |
msg+="\n Use GRID.se_white_list and/or GRID.se_black_list instead" |
112 |
> |
raise CrabException(msg) |
113 |
|
|
114 |
|
return |
115 |
|
|
126 |
|
def sched_parameter(self, i, task): |
127 |
|
""" |
128 |
|
Return scheduler-specific parameters. Used at crab -submit time |
129 |
+ |
by $CRABPYTHON/Scheduler.py |
130 |
|
""" |
131 |
|
|
132 |
|
#SB paste from crab ScheduerGlidein |
133 |
|
|
134 |
|
jobParams = "" |
135 |
|
|
136 |
+ |
(self.rcondorHost,self.rcondorUserHost) = self.pickRcondorSubmissionHost(task) |
137 |
+ |
|
138 |
|
seDest = task.jobs[i-1]['dlsDestination'] |
139 |
|
|
140 |
|
if seDest == [''] : |
144 |
|
|
145 |
|
jobParams += '+DESIRED_SEs = "'+seString+'"; ' |
146 |
|
|
147 |
+ |
scram = Scram.Scram(None) |
148 |
+ |
cmsVersion = scram.getSWVersion() |
149 |
+ |
scramArch = scram.getArch() |
150 |
+ |
|
151 |
+ |
cmsver=re.split('_', cmsVersion) |
152 |
+ |
numericCmsVersion = "%s%.2d%.2d" %(cmsver[1], int(cmsver[2]), int(cmsver[3])) |
153 |
+ |
|
154 |
+ |
jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";' |
155 |
+ |
jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";' |
156 |
+ |
jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";' |
157 |
+ |
|
158 |
+ |
myscheddName = self.rcondorHost |
159 |
+ |
jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + '//$(Cluster).$(Process)"; ' |
160 |
+ |
|
161 |
|
if (self.EDG_clock_time): |
162 |
|
jobParams += '+MaxWallTimeMins = '+self.EDG_clock_time+'; ' |
163 |
|
else: |
174 |
|
Return dictionary with specific parameters, to use with real scheduler |
175 |
|
is called when scheduler is initialized in Boss, i.e. at each crab command |
176 |
|
""" |
177 |
+ |
#SB this method is used to pass directory names to Boss Scheduler |
178 |
+ |
# via params dictionary |
179 |
|
|
125 |
– |
tmpDir = os.path.join(common.work_space.shareDir(),'.condor_temp') |
126 |
– |
tmpDir = os.path.join(common.work_space.shareDir(),'.condor_temp') |
180 |
|
jobDir = common.work_space.jobDir() |
128 |
– |
|
181 |
|
taskDir=common.work_space.topDir().split('/')[-2] |
182 |
< |
rcondorDir ='%s/.rcondor/%s/mount/' % (os.getenv('HOME'),os.getenv('RCONDOR_HOST')) |
131 |
< |
tmpDir = os.path.join(rcondorDir,taskDir) |
132 |
< |
tmpDir = os.path.join(tmpDir,'condor_temp') |
182 |
> |
shareDir = common.work_space.shareDir() |
183 |
|
|
184 |
< |
params = {'tmpDir':tmpDir, |
185 |
< |
'jobDir':jobDir} |
184 |
> |
params = {'shareDir':shareDir, |
185 |
> |
'jobDir':jobDir, |
186 |
> |
'taskDir':taskDir} |
187 |
|
|
188 |
|
return params |
189 |
|
|
229 |
|
txt += 'func_exit() { \n' |
230 |
|
txt += self.wsExitFunc_common() |
231 |
|
|
181 |
– |
#txt += ' cp ${out_files}.tgz $_CONDOR_SCRATCH_DIR/\n' |
182 |
– |
#txt += ' cp CMSSW_$NJob.stdout $_CONDOR_SCRATCH_DIR/\n' |
183 |
– |
#txt += ' cp CMSSW_$NJob.stderr $_CONDOR_SCRATCH_DIR/\n' |
184 |
– |
#txt += ' cp Watchdog_$NJob.log.gz $_CONDOR_SCRATCH_DIR/\n' |
185 |
– |
#txt += ' cp crab_fjr_$NJob.xml $_CONDOR_SCRATCH_DIR/\n' |
186 |
– |
|
187 |
– |
|
188 |
– |
### specific Glite check for OSB |
232 |
|
txt += ' tar zcvf ${out_files}.tgz ${final_list}\n' |
233 |
|
txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n' |
234 |
|
txt += ' rm ${out_files}.tgz\n' |
271 |
|
taskReq = {'commonRequirements':req} |
272 |
|
common._db.updateTask_(taskReq) |
273 |
|
|
274 |
+ |
def pickRcondorSubmissionHost(self, task): |
275 |
+ |
|
276 |
+ |
task = common._db.getTask() |
277 |
|
|
278 |
< |
# presa di brutto da SchedulerGrid.py |
279 |
< |
""" |
280 |
< |
|
281 |
< |
def wsSetupEnvironment(self): |
278 |
> |
if task['serverName']!=None and task['serverName']!="": |
279 |
> |
# rcondorHost is already defined and stored for this task |
280 |
> |
# so pick it from DB |
281 |
> |
# cast to string to avoid issues with unicode :-( |
282 |
> |
rcondorUserHost=str(task['serverName']) |
283 |
> |
common.logger.info("serverName from Task DB is %s" % |
284 |
> |
rcondorUserHost) |
285 |
> |
if '@' in rcondorUserHost: |
286 |
> |
rcondorHost = rcondorUserHost.split('@')[1] |
287 |
> |
else: |
288 |
> |
rcondorHost = rcondorUserHost |
289 |
> |
else: |
290 |
> |
if self.cfg_params.has_key('CRAB.submit_host'): |
291 |
> |
# get an rcondor host from crab config file |
292 |
> |
srvCfg=ServerConfig(self.cfg_params['CRAB.submit_host']).config() |
293 |
> |
rcondorHost=srvCfg['serverName'] |
294 |
> |
common.logger.info("rcondorhost from crab.cfg = %s" % rcondorHost) |
295 |
> |
else: |
296 |
> |
# pick from Available Servers List |
297 |
> |
srvCfg=ServerConfig('default').config() |
298 |
> |
print srvCfg |
299 |
> |
rcondorHost = srvCfg['serverName'] |
300 |
> |
common.logger.info("rcondorhost from Avail.List = %s" % rcondorHost) |
301 |
> |
|
302 |
> |
if not rcondorHost: |
303 |
> |
raise CrabException('FATAL ERROR: condorHost not defined') |
304 |
> |
# fall back to env. |
305 |
> |
#common.logger.info("no serverName in Task DB, use env.var.") |
306 |
> |
#rcondorHost = os.getenv('RCONDOR_HOST') |
307 |
> |
#if not rcondorHost : |
308 |
> |
# raise CrabException('FATAL ERROR: env.var RCONDOR_HOST not defined') |
309 |
> |
|
310 |
> |
#rcondorUser = os.getenv('RCONDOR_USER') |
311 |
> |
#if not rcondorUser : |
312 |
> |
common.logger.info("try to find out RCONDOR_USER via uberftp ...") |
313 |
> |
command="uberftp %s pwd|grep User|awk '{print $3}'" % rcondorHost |
314 |
> |
(status, output) = commands.getstatusoutput(command) |
315 |
> |
if status == 0: |
316 |
> |
rcondorUser = output |
317 |
> |
common.logger.info("rcondorUser set to %s" % rcondorUser) |
318 |
> |
if rcondorUser==None: |
319 |
> |
raise CrabException('FATAL ERROR: RCONDOR_USER not defined') |
320 |
|
|
321 |
< |
#Returns part of a job script which does scheduler-specific work. |
321 |
> |
rcondorUserHost = rcondorUser + '@' + rcondorHost |
322 |
|
|
323 |
< |
taskId =common._db.queryTask('name') |
324 |
< |
index = int(common._db.nJobs()) |
241 |
< |
job = common.job_list[index-1] |
242 |
< |
jbt = job.type() |
243 |
< |
if not self.environment_unique_identifier: |
244 |
< |
try : |
245 |
< |
self.environment_unique_identifier = self.envUniqueID() |
246 |
< |
except : |
247 |
< |
raise CrabException('environment_unique_identifier not set') |
248 |
< |
|
249 |
< |
# start with wrapper timing |
250 |
< |
txt = 'export TIME_WRAP_INI=`date +%s` \n' |
251 |
< |
txt += 'export TIME_STAGEOUT=-2 \n\n' |
252 |
< |
txt += '# '+self.name()+' specific stuff\n' |
253 |
< |
txt += '# strip arguments\n' |
254 |
< |
txt += 'echo "strip arguments"\n' |
255 |
< |
txt += 'args=("$@")\n' |
256 |
< |
txt += 'nargs=$#\n' |
257 |
< |
txt += 'shift $nargs\n' |
258 |
< |
txt += "# job number (first parameter for job wrapper)\n" |
259 |
< |
txt += "NJob=${args[0]}; export NJob\n" |
260 |
< |
txt += "NResub=${args[1]}; export NResub\n" |
261 |
< |
txt += "NRand=`getRandSeed`; export NRand\n" |
262 |
< |
# append random code |
263 |
< |
txt += 'OutUniqueID=_$NRand\n' |
264 |
< |
txt += 'OutUniqueID=_$NResub$OutUniqueID\n' |
265 |
< |
txt += 'OutUniqueID=$NJob$OutUniqueID; export OutUniqueID\n' |
266 |
< |
txt += 'CRAB_UNIQUE_JOB_ID=%s_${OutUniqueID}; export CRAB_UNIQUE_JOB_ID\n' % taskId |
267 |
< |
txt += 'echo env var CRAB_UNIQUE_JOB_ID set to: ${CRAB_UNIQUE_JOB_ID}\n' |
268 |
< |
# if we want to prepend |
269 |
< |
#txt += 'OutUniqueID=_$NResub\n' |
270 |
< |
#txt += 'OutUniqueID=_$NJob$OutUniqueID\n' |
271 |
< |
#txt += 'OutUniqueID=$NRand$OutUniqueID; export OutUniqueID\n' |
272 |
< |
|
273 |
< |
txt += "out_files=out_files_${NJob}; export out_files\n" |
274 |
< |
txt += "echo $out_files\n" |
275 |
< |
txt += jbt.outList() |
276 |
< |
# txt += 'if [ $JobRunCount ] && [ `expr $JobRunCount - 1` -gt 0 ] && [ $Glidein_MonitorID ]; then \n' |
277 |
< |
txt += 'if [ $Glidein_MonitorID ]; then \n' |
278 |
< |
# txt += ' attempt=`expr $JobRunCount - 1` \n' |
279 |
< |
# txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}__${attempt}\n' |
280 |
< |
# txt += ' SyncGridJobId=${Glidein_MonitorID}__${attempt}\n' |
281 |
< |
txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}\n' |
282 |
< |
txt += ' SyncGridJobId=${Glidein_MonitorID}\n' |
283 |
< |
txt += 'else \n' |
284 |
< |
txt += ' MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n' |
285 |
< |
txt += ' SyncGridJobId='+self.environment_unique_identifier+'\n' |
286 |
< |
txt += 'fi\n' |
287 |
< |
txt += 'MonitorID='+taskId+'\n' |
288 |
< |
txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n' |
289 |
< |
txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n' |
290 |
< |
txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n' |
291 |
< |
|
292 |
< |
txt += 'echo ">>> GridFlavour discovery: " \n' |
293 |
< |
txt += 'if [ $OSG_GRID ]; then \n' |
294 |
< |
txt += ' middleware=OSG \n' |
295 |
< |
txt += ' if [ $OSG_JOB_CONTACT ]; then \n' |
296 |
< |
txt += ' SyncCE="$OSG_JOB_CONTACT"; \n' |
297 |
< |
txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n' |
298 |
< |
txt += ' else\n' |
299 |
< |
txt += ' echo "not reporting SyncCE";\n' |
300 |
< |
txt += ' fi\n'; |
301 |
< |
txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n' |
302 |
< |
txt += ' echo "source OSG GRID setup script" \n' |
303 |
< |
txt += ' source $OSG_GRID/setup.sh \n' |
304 |
< |
txt += 'elif [ $NORDUGRID_CE ]; then \n' # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR, |
305 |
< |
txt += ' middleware=ARC \n' # because the latter is defined for ARC too |
306 |
< |
txt += ' echo "SyncCE=${NORDUGRID_CE}:2811/nordugrid-GE-${QUEUE:-queue}" >> $RUNTIME_AREA/$repo \n' |
307 |
< |
txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n' |
308 |
< |
txt += 'elif [ $VO_CMS_SW_DIR ]; then \n' |
309 |
< |
txt += ' middleware=LCG \n' |
310 |
< |
txt += ' if [ $GLIDEIN_Gatekeeper ]; then \n' |
311 |
< |
txt += ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n' |
312 |
< |
txt += ' else \n' |
313 |
< |
txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n' |
314 |
< |
txt += ' fi \n' |
315 |
< |
txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n' |
316 |
< |
txt += 'else \n' |
317 |
< |
txt += ' echo "ERROR ==> GridFlavour not identified" \n' |
318 |
< |
txt += ' job_exit_code=10030 \n' |
319 |
< |
txt += ' func_exit \n' |
320 |
< |
txt += 'fi \n' |
321 |
< |
|
322 |
< |
txt += 'dumpStatus $RUNTIME_AREA/$repo \n' |
323 |
< |
txt += '\n\n' |
324 |
< |
|
325 |
< |
|
326 |
< |
txt += 'export VO='+self.VO+'\n' |
327 |
< |
txt += 'if [ $middleware == LCG ]; then\n' |
328 |
< |
txt += ' if [ $GLIDEIN_Gatekeeper ]; then\n' |
329 |
< |
txt += ' CloseCEs=$GLIDEIN_Gatekeeper \n' |
330 |
< |
txt += ' else\n' |
331 |
< |
txt += ' CloseCEs=`glite-brokerinfo getCE`\n' |
332 |
< |
txt += ' fi\n' |
333 |
< |
txt += ' echo "CloseCEs = $CloseCEs"\n' |
334 |
< |
txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n' |
335 |
< |
txt += ' echo "CE = $CE"\n' |
336 |
< |
txt += 'elif [ $middleware == OSG ]; then \n' |
337 |
< |
txt += ' if [ $OSG_JOB_CONTACT ]; then \n' |
338 |
< |
txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n' |
339 |
< |
txt += ' else \n' |
340 |
< |
txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n' |
341 |
< |
txt += ' job_exit_code=10099\n' |
342 |
< |
txt += ' func_exit\n' |
343 |
< |
txt += ' fi \n' |
344 |
< |
txt += 'elif [ $middleware == ARC ]; then \n' |
345 |
< |
txt += ' echo "CE = $NORDUGRID_CE"\n' |
346 |
< |
txt += 'fi \n' |
323 |
> |
print "will set server name to : ", rcondorUserHost |
324 |
> |
common._db.updateTask_({'serverName':rcondorUserHost}) |
325 |
|
|
326 |
< |
return txt |
349 |
< |
""" |
326 |
> |
return (rcondorHost, rcondorUserHost) |