ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.5
Committed: Wed Oct 10 21:10:20 2012 UTC (12 years, 6 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_3
Changes since 1.4: +24 -0 lines
Log Message:
make sure VOMS FQAN is not changing for remoteGlidein, fixes https://savannah.cern.ch/bugs/?98099

File Contents

# Content
1 """
2 Implements the Remote Glidein scheduler
3 """
4
from SchedulerGrid import SchedulerGrid
from crab_exceptions import CrabException
from crab_util import runCommand
from ServerConfig import *
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
import Scram

import common
import os
import socket
import re
import time
import commands
17
# FUTURE: for python 2.4 & 2.6
# hashlib appeared in python 2.5; fall back to the old sha module on 2.4.
# Catch only ImportError: a bare except would also hide unrelated errors
# (KeyboardInterrupt, SystemExit, typos inside hashlib, ...).
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
23
24 class SchedulerRemoteglidein(SchedulerGrid) :
25 """
26 Class to implement the vanilla remote Condor scheduler
for a glidein frontend; the created JDL will not work
on vanilla local condor.
29 Naming convention: Methods starting with 'ws' provide
30 the corresponding part of the job script
31 ('ws' stands for 'write script').
32 """
33
34 def __init__(self):
35 SchedulerGrid.__init__(self,"REMOTEGLIDEIN")
36
37 self.datasetPath = None
38 self.selectNoInput = None
39 self.OSBsize = 50*1000*1000 # 50 MB
40
41 self.environment_unique_identifier = None
42 self.submissionDay = time.strftime("%y%m%d",time.localtime())
43
44 return
45
46
    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user
        """

        SchedulerGrid.configure(self, cfg_params)

        # proxy/grid settings read from the [GRID] and [USER] config sections
        self.proxyValid=0
        self.dontCheckProxy=int(cfg_params.get("GRID.dont_check_proxy",0))
        self.space_token = cfg_params.get("USER.space_token",None)
        self.proxyServer= 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization','cms')

        # make sure a valid proxy exists before inspecting its identity below
        self.checkProxy()


        # datasetpath is mandatory; the literal string 'none' means "no input"
        try:
            tmp = cfg_params['CMSSW.datasetpath']
            if tmp.lower() == 'none':
                self.datasetPath = None
                self.selectNoInput = 1
            else:
                self.datasetPath = tmp
                self.selectNoInput = 0
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        # CE black/white lists are not honoured by remoteGlidein: fail early
        # instead of silently ignoring the user's setting
        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None) :
            msg="BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg+="\n Remove them from crab configuration to proceed."
            msg+="\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)


        # make sure proxy FQAN has not changed since last time
        # (DN + first FQAN line; see https://savannah.cern.ch/bugs/?98099)
        command = "voms-proxy-info -identity -fqan 2>/dev/null"
        command += " | head -2"
        identity = runCommand(command)
        idfile = common.work_space.shareDir() + "GridIdentity"
        if os.access(idfile, os.F_OK) :
            # identity file exists from previous commands
            f=open(idfile, 'r')
            idFromFile=f.read()
            f.close()
        else :
            # create it
            f=open(idfile, 'w')
            f.write(identity)
            f.close()
            idFromFile = identity

        # identity must match what was recorded at task-creation time
        if identity != idFromFile:
            msg = "Wrong Grid Credentials:\n%s" % identity
            msg += "\nMake sure you have "
            msg += " DN, FQAN =\n%s" % idFromFile
            raise CrabException(msg)

        return
109
110 def userName(self):
111 """ return the user name """
112 tmp=runCommand("voms-proxy-info -identity 2>/dev/null")
113 return tmp.strip()
114
115 def envUniqueID(self):
116 taskHash = sha1(common._db.queryTask('name')).hexdigest()
117 id = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
118 return id
119
    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py
        """

        #SB paste from crab ScheduerGlidein

        jobParams = ""

        (self.remoteHost,self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        # destination SEs for job i (jobs are 1-based); [''] means no
        # constraint, i.e. any site
        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == [''] :
            seDest = self.blackWhiteListParser.expandList("T") # all of SiteDB

        seString=self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        # requested CMSSW release and scram architecture, advertised as
        # classad attributes for glidein matchmaking
        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # e.g. CMSSW_5_3_2 -> "50302": numeric form for release comparisons
        cmsver=re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" %(cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        # unique monitoring id: schedd host + submission day + condor ids
        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + \
                '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        # wall-clock limit: user's GRID.max_wall_clock_time or 24h default
        if (self.EDG_clock_time):
            jobParams += '+MaxWallTimeMins = '+self.EDG_clock_time+'; '
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType':jobParams})


        return jobParams
165
166
167 def realSchedParams(self, cfg_params):
168 """
169 Return dictionary with specific parameters, to use with real scheduler
170 is called when scheduler is initialized in Boss, i.e. at each crab command
171 """
172 #SB this method is used to pass directory names to Boss Scheduler
173 # via params dictionary
174
175 jobDir = common.work_space.jobDir()
176 taskDir=common.work_space.topDir().split('/')[-2]
177 shareDir = common.work_space.shareDir()
178
179 params = {'shareDir':shareDir,
180 'jobDir':jobDir,
181 'taskDir':taskDir,
182 'submissionDay':self.submissionDay}
183
184 return params
185
186
187 def listMatch(self, seList, full):
188 """
189 Check the compatibility of available resources
190 """
191
192 return [True]
193
194
195 def decodeLogInfo(self, fileName):
196 """
197 Parse logging info file and return main info
198 """
199
200 import CondorGLoggingInfo
201 loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
202 reason = loggingInfo.decodeReason(fileName)
203 return reason
204
205
206 # def wsCopyOutput(self):
207 # """
208 # Write a CopyResults part of a job script, e.g.
209 # to copy produced output into a storage element.
210 # """
211 # txt = self.wsCopyOutput()
212 # return txt
213
214
215 def wsExitFunc(self):
216 """
217 Returns the part of the job script which runs prior to exit
218 """
219
220 txt = '\n'
221 txt += '#\n'
222 txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
223 txt += '#\n\n'
224
225 txt += 'func_exit() { \n'
226 txt += self.wsExitFunc_common()
227
228 txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
229 txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
230 txt += ' rm ${out_files}.tgz\n'
231 txt += ' size=`expr $tmp_size`\n'
232 txt += ' echo "Total Output dimension: $size"\n'
233 txt += ' limit='+str(self.OSBsize) +' \n'
234 txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
235 txt += ' if [ "$limit" -lt "$size" ]; then\n'
236 txt += ' exceed=1\n'
237 txt += ' job_exit_code=70000\n'
238 txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
239 txt += ' else\n'
240 txt += ' exceed=0\n'
241 txt += ' echo "Total Output dimension $size is fine."\n'
242 txt += ' fi\n'
243
244 txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
245 txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
246 txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
247 txt += ' if [ $exceed -ne 1 ]; then\n'
248 txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
249 txt += ' else\n'
250 txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
251 txt += ' fi\n'
252 txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'
253
254 txt += ' exit $job_exit_code\n'
255 txt += '}\n'
256
257 return txt
258
259
260 def sched_fix_parameter(self):
261 """
262 Returns string with requirements and scheduler-specific parameters
263 """
264
265 if self.EDG_requirements:
266 req = self.EDG_requirements
267 taskReq = {'commonRequirements':req}
268 common._db.updateTask_(taskReq)
269
270 def pickRemoteSubmissionHost(self, task):
271
272 task = common._db.getTask()
273
274 if task['serverName']!=None and task['serverName']!="":
275 # remoteHost is already defined and stored for this task
276 # so pick it from DB
277 # cast to string to avoid issues with unicode :-(
278 remoteUserHost=str(task['serverName'])
279 common.logger.info("serverName from Task DB is %s" %
280 remoteUserHost)
281 if '@' in remoteUserHost:
282 remoteHost = remoteUserHost.split('@')[1]
283 else:
284 remoteHost = remoteUserHost
285 else:
286 if self.cfg_params.has_key('CRAB.submit_host'):
287 # get a remote submission host from crab config file
288 srvCfg=ServerConfig(self.cfg_params['CRAB.submit_host']).config()
289 remoteHost=srvCfg['serverName']
290 common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
291 else:
292 # pick from Available Servers List
293 srvCfg=ServerConfig('default').config()
294 remoteHost = srvCfg['serverName']
295 common.logger.info("remotehost from Avail.List = %s" % remoteHost)
296
297 if not remoteHost:
298 raise CrabException('FATAL ERROR: remoteHost not defined')
299
300 #common.logger.info("try to find out username for remote Host via uberftp ...")
301 #command="uberftp %s pwd|grep User|awk '{print $3}'" % remoteHost
302 #(status, output) = commands.getstatusoutput(command)
303 #if status == 0:
304 # remoteUser = output
305 # common.logger.info("remoteUser set to %s" % remoteUser)
306 # if remoteUser==None:
307 # raise CrabException('FATAL ERROR: REMOTE USER not defined')
308
309 #remoteUserHost = remoteUser + '@' + remoteHost
310 remoteUserHost = remoteHost
311
312 common._db.updateTask_({'serverName':remoteUserHost})
313
314 return (remoteHost, remoteUserHost)