1 |
from Scheduler import Scheduler
|
2 |
from crab_exceptions import *
|
3 |
from crab_util import getLocalDomain
|
4 |
import common
|
5 |
from PhEDExDatasvcInfo import PhEDExDatasvcInfo
|
6 |
|
7 |
import os,string
|
8 |
|
9 |
# Base class for all local schedulers
|
10 |
|
11 |
class SchedulerLocal(Scheduler):
    """
    Base class for all local schedulers.

    Holds the configuration handling and the job-wrapper (shell script)
    fragments shared by the local schedulers.
    """

    def configure(self, cfg_params):
        """
        Store cfg_params and read the scheduler settings from it.

        Sets self.queue and self.res from '<NAME>.queue' and
        '<NAME>.resource' (NAME is the upper-cased self.name(); both
        default to None), takes self.environment_unique_identifier from
        '<name>.env_id' when that key is present, and fills in
        'GRID.se_white_list' with the local domain name when it is not
        already set. In that last case cfg_params is modified in place.
        """
        self.environment_unique_identifier = None
        self.cfg_params = cfg_params
        Scheduler.configure(self, cfg_params)
        self.jobtypeName = cfg_params['CRAB.jobtype']

        # queue/resource keys are prefixed with the upper-cased scheduler name
        name = self.name().upper()
        self.queue = cfg_params.get(name + '.queue', None)
        self.res = cfg_params.get(name + '.resource', None)

        # minimal padding time for jobs. For local schedulers is disabled.
        # Added for alignment purpose only (and for test) with Grid schedulers
        self.minimal_job_duration = 0

        # NOTE(review): unlike queue/resource, this key uses self.name() as
        # returned (not upper-cased) -- confirm that this is intended.
        env_id_key = self.name() + '.env_id'
        if env_id_key in cfg_params:
            self.environment_unique_identifier = cfg_params[env_id_key]

        ## is this ok?
        localDomainName = getLocalDomain(self)
        if 'GRID.se_white_list' not in cfg_params:
            cfg_params['GRID.se_white_list'] = localDomainName
            common.logger.info("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
        else:
            common.logger.info("Your se_white_list is set to "+str(cfg_params['GRID.se_white_list'])+": only local dataset will be considered")
        return

    def userName(self):
        """ return the user name """
        import pwd,getpass
        # index 4 of the passwd entry is the GECOS (comment) field
        gecos = pwd.getpwnam(getpass.getuser())[4]
        return "/CN=" + gecos.strip()

    def envUniqueID(self):
        """
        Return the environment unique identifier to embed in the wrapper.

        This base implementation returns None. wsSetupEnvironment() calls
        it when no env_id was configured and rejects a None result.
        """
        return

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        Raises CrabException when no environment unique identifier is
        configured and envUniqueID() either fails or returns None.
        """
        taskId = common._db.queryTask('name')
        if not self.environment_unique_identifier:
            try:
                self.environment_unique_identifier = self.envUniqueID()
            except Exception:
                raise CrabException('environment_unique_identifier not set')
            # A None identifier would otherwise surface as a TypeError in the
            # string concatenation below rather than as a CrabException.
            if self.environment_unique_identifier is None:
                raise CrabException('environment_unique_identifier not set')
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()

        script = [
            # start with wrapper timing
            'export TIME_WRAP_INI=`date +%s` \n',
            'export TIME_STAGEOUT=-2 \n\n',

            '# '+self.name()+' specific stuff\n',
            '# strip arguments\n',
            'echo "strip arguments"\n',
            'args=("$@")\n',
            'nargs=$#\n',
            'shift $nargs\n',
            "# job number (first parameter for job wrapper)\n",
            "NJob=${args[0]}; export NJob\n",
            "NResub=${args[1]}; export NResub\n",
            "NRand=`getRandSeed`; export NRand\n",
            # OutUniqueID ends up as <NJob>_<NResub>_<NRand>
            'OutUniqueID=_$NRand\n',
            'OutUniqueID=_$NResub$OutUniqueID\n',
            'OutUniqueID=$NJob$OutUniqueID; export OutUniqueID\n',

            "out_files=out_files_${NJob}; export out_files\n",
            "echo $out_files\n",
            jbt.outList(),

            'SyncGridJobId=`echo '+self.environment_unique_identifier+'`\n',
            'MonitorJobID=`echo ${NJob}_${SyncGridJobId}`\n',
            'MonitorID=`echo ' + taskId + '`\n',

            'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
            'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n',
            'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
            'echo "SyncCE='+self.name()+'.`hostname -d`" | tee -a $RUNTIME_AREA/$repo \n',

            'middleware='+self.name().upper()+' \n',

            'dumpStatus $RUNTIME_AREA/$repo \n',

            'InputSandBox=${args[3]}\n',

            '\n\n',
        ]
        return ''.join(script)

    def wsCopyOutput_comm(self, pool=None):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        pool: when set (and not the string 'None') it is exported in the
              script as STAGE_SVCCLASS.

        Returns the script fragment as a string. When self.copy_data is
        not 1 the fragment only sets TIME_STAGEOUT=-1.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()
        script = ['\n']

        if int(self.copy_data) != 1:
            script.append('export TIME_STAGEOUT=-1 \n')
            return ''.join(script)

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
        # Single-argument form: same output as the old print statement
        # ("endpoint = " followed by a separating space) and also valid
        # syntax on Python 3.
        print("endpoint =  %s" % (endpoint,))

        ########################################################
        ##################### FEDE FOR CAF #####################
        # On CAF the protocol is chosen from the endpoint scheme and
        # SE_PATH is rewritten to the path part of the endpoint.
        cmscp_args = ''
        if common.scheduler.name().upper() == 'CAF':
            if 'root:' in endpoint:
                SE_PATH = '/' + endpoint.split('//')[2]
                cmscp_args += '--protocol xrootd '
            elif 'rfio:' in endpoint:
                if 'path=' in endpoint:
                    SE_PATH = endpoint.split('path=')[1]
                else:
                    SE_PATH = endpoint
                cmscp_args += '--protocol rfio '
        ########################################################
        ########################################################

        if self.check_RemoteDir == 1:
            self.checkRemoteDir(endpoint, jbt.outList('list'))

        script.extend([
            '#\n',
            '# COPY OUTPUT FILE TO '+SE_PATH+ '\n',
            '#\n\n',

            'export SE='+SE+'\n',
            'echo "SE = $SE"\n',
            'export SE_PATH='+SE_PATH+'\n',
            'echo "SE_PATH = $SE_PATH"\n',
            'export LFNBaseName='+lfn+'\n',
            'echo "LFNBaseName = $LFNBaseName"\n',
            'export USER='+user+'\n',
            'echo "USER = $USER"\n',
            'export endpoint='+endpoint+'\n',
            'echo "endpoint = $endpoint"\n',
        ])

        if (pool) and (pool != 'None'):
            script.append('export STAGE_SVCCLASS='+str(pool)+'\n')

        script.extend([
            'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n',
            'export TIME_STAGEOUT_INI=`date +%s` \n',
            'copy_exit_status=0\n',
        ])

        # appended to (not replacing) the optional CAF --protocol argument
        cmscp_args += ' --destination $endpoint --inputFileList $file_list'
        cmscp_args +=' --middleware $middleware --se_name $SE --for_lfn $LFNBaseName %s %s '%(self.loc_stage_out,self.debugWrap)
        script.append('echo "python cmscp.py %s "\n'%cmscp_args)
        script.append('python cmscp.py %s \n'%cmscp_args)

        debug_on = (self.debug_wrapper == 1)

        # In debug mode the SE interaction log is always dumped.
        if debug_on:
            script.extend([
                'echo "########### details of SE interaction"\n',
                'if [ -f .SEinteraction.log ] ;then\n',
                ' cat .SEinteraction.log\n',
                'else\n',
                ' echo ".SEinteraction.log file not found"\n',
                'fi\n',
                'echo "#####################################"\n',
            ])

        script.extend([
            'if [ -f $RUNTIME_AREA/resultCopyFile ] ;then\n',
            ' cat $RUNTIME_AREA/resultCopyFile\n',
            ' pwd\n',
            'else\n',
            ### to avoid some 70500 error ....
            ' echo "ERROR ==> $RUNTIME_AREA/resultCopyFile file not found. Problem during the stageout"\n',
            ' echo "RUNTIME_AREA content: "\n',
            ' ls $RUNTIME_AREA \n',
            ' job_exit_code=60318\n',
            ' func_exit \n',
            'fi\n',

            'if [ -f ${RUNTIME_AREA}/cmscpReport.sh ] ;then\n',
            ' echo "-------- cat ${RUNTIME_AREA}/cmscpReport.sh "\n',
            ' cat ${RUNTIME_AREA}/cmscpReport.sh\n',
            ' echo "-------- end of ${RUNTIME_AREA}/cmscpReport.sh "\n',
            ' source ${RUNTIME_AREA}/cmscpReport.sh\n',
            ' source_result=$? \n',
            ' if [ $source_result -ne 0 ]; then\n',
            ' echo "problem with the source of cmscpReport.sh file"\n',
            ' StageOutExitStatus=60307\n',
            ' fi\n',
            'else\n',
            ' echo "cmscpReport.sh file not found"\n',
            ' StageOutExitStatus=60307\n',
            'fi\n',
            'if [ $StageOutExitStatus -ne 0 ]; then\n',
            ' echo "Problem copying file to $SE $SE_PATH"\n',
            ' copy_exit_status=$StageOutExitStatus \n',
        ])

        # Outside debug mode the SE interaction log is dumped only inside
        # the failure branch opened just above.
        if not debug_on:
            script.extend([
                'if [ -f .SEinteraction.log ] ;then\n',
                ' echo "########## contents of SE interaction"\n',
                ' cat .SEinteraction.log\n',
                ' echo "#####################################"\n',
                'else\n',
                ' echo ".SEinteraction.log file not found"\n',
                'fi\n',
            ])

        script.extend([
            ' job_exit_code=$StageOutExitStatus\n',
            'fi\n',
            'export TIME_STAGEOUT_END=`date +%s` \n',
            'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n',
        ])
        return ''.join(script)
|