ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.13
Committed: Sat Jan 5 16:09:36 2013 UTC (12 years, 3 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1
Changes since 1.12: +1 -1 lines
Log Message:
default max RSS 2000 MB consistent with factory setting, see https://savannah.cern.ch/bugs/index.php?99657

File Contents

# Content
1 """
2 Implements the Remote Glidein scheduler
3 """
4
from SchedulerGrid import SchedulerGrid
from crab_exceptions import CrabException
from crab_util import runCommand
from ServerConfig import *
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
import Scram

import common
import os
import socket
import re
import commands
import time

# FUTURE: for python 2.4 & 2.6
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
23
class SchedulerRemoteglidein(SchedulerGrid) :
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend; the created JDL will not work
    on vanilla local condor.
    Naming convention: methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        SchedulerGrid.__init__(self,"REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        # Output sandbox size limit, checked by the job wrapper (wsExitFunc).
        self.OSBsize = 50*1000*1000 # 50 MB

        self.environment_unique_identifier = None
        # Local date (yymmdd) of this CRAB invocation; used in the
        # Glidein_MonitorID and handed to Boss via realSchedParams.
        self.submissionDay = time.strftime("%y%m%d",time.localtime())

        return


    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        Reads GRID.*, USER.space_token and CMSSW.datasetpath from
        cfg_params, checks the proxy and verifies the Grid identity
        has not changed since the task was created.

        Raises CrabException if CMSSW.datasetpath is missing, if a CE
        black/white list is configured (not supported here), or if the
        current proxy DN/FQAN differs from the recorded one.
        """

        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid=0
        self.dontCheckProxy=int(cfg_params.get("GRID.dont_check_proxy",'0'))
        self.space_token = cfg_params.get("USER.space_token",None)
        self.proxyServer= 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization','cms')
        self.allowOverflow = cfg_params.get('GRID.allow_overflow', '1')
        # default 2000 MB, consistent with the glidein factory setting
        self.max_rss = cfg_params.get('GRID.max_rss','2000')

        self.checkProxy()

        try:
            datasetPath = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)
        if datasetPath.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = datasetPath
            self.selectNoInput = 0

        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None) :
            msg="BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg+="\n Remove them from crab configuration to proceed."
            msg+="\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

        self._checkGridIdentity()

        return

    def _checkGridIdentity(self):
        """
        Make sure the proxy identity and FQAN have not changed since last time.

        On the first call the output of voms-proxy-info is stored in
        <shareDir>/GridIdentity; later calls compare against that file.
        Raises CrabException on mismatch.
        """
        command = "voms-proxy-info -identity -fqan 2>/dev/null"
        command += " | head -2"
        identity = runCommand(command)
        idfile = common.work_space.shareDir() + "GridIdentity"
        if os.access(idfile, os.F_OK) :
            # identity file exists from previous commands
            f = open(idfile, 'r')
            try:
                idFromFile = f.read()
            finally:
                # try/finally (not `with`): this file still targets python 2.4
                f.close()
        else :
            # first command for this task: record the identity
            f = open(idfile, 'w')
            try:
                f.write(identity)
            finally:
                f.close()
            idFromFile = identity

        if identity != idFromFile:
            msg = "Wrong Grid Credentials:\n%s" % identity
            msg += "\nMake sure you have "
            msg += " DN, FQAN =\n%s" % idFromFile
            raise CrabException(msg)

    def userName(self):
        """ return the user name (proxy identity, stripped) """
        tmp=runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return a per-job identifier of the form
        https://<local host>/<sha1 of task name>/${NJob}.
        ${NJob} is left literal for the job wrapper shell to expand.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i    : 1-based job number; task.jobs[i-1] is the job
        task : task object holding the jobs

        Returns the JDL fragment (';'-separated attributes) and also
        stores it in the task DB under 'jobType'.
        """

        #SB paste from crab SchedulerGlidein

        jobParams = ""

        (self.remoteHost,self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == [''] :
            seDest = self.blackWhiteListParser.expandList("T") # all of SiteDB

        seString=self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # e.g. CMSSW_5_3_7 -> "50307"
        cmsver=re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" %(cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + \
                     '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        if (self.EDG_clock_time):
            glideinTime = "%d" % (int(self.EDG_clock_time)+5) # 5 min to wrapup
            jobParams += '+MaxWallTimeMins = '+ glideinTime + '; '
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*22 - 5) # 22h default in glidein, 5min to wrap

        if self.max_rss :
            # str(): tolerate a numeric value coming from the configuration
            jobParams += 'request_memory = '+str(self.max_rss)+';'

        if str(self.allowOverflow) == "0":
            jobParams += '+CMS_ALLOW_OVERFLOW = False; '

        if self.EDG_addJdlParam:
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            for p in self.EDG_addJdlParam:
                # skip blank entries, which would otherwise emit a bare ';'
                if not p.strip():
                    continue
                jobParams += p.strip()+';\n'

        common._db.updateTask_({'jobType':jobParams})


        return jobParams


    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        # assumes topDir() ends with '/', so [-2] is the task directory name
        taskDir=common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir':shareDir,
                  'jobDir':jobDir,
                  'taskDir':taskDir,
                  'submissionDay':self.submissionDay}

        return params


    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources.
        Matching is left to the glidein frontend, so always report a match.
        """

        return [True]


    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason


    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit.

        The generated shell function func_exit() renames condor's
        stdout/err for the output sandbox, checks the sandbox size
        against self.OSBsize (exit code 70000 if exceeded, in which case
        only stdout/stderr are shipped), updates the job report and exits.
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()
        txt += '#Check for stdout/err in new location as of condor 7.7\n'
        txt += '    if [ -s _condor_stdout ]; then\n'
        txt += '        echo "Found _condor_stdout/err, rename for OSB"\n'
        txt += '        cp -pfv _condor_stdout CMSSW_${NJob}.stdout\n'
        txt += '        cp -pfv _condor_stderr CMSSW_${NJob}.stderr\n'
        txt += '    fi\n'
        # build the tarball once just to measure its size
        txt += '    tar zcvf ${out_files}.tgz  ${final_list}\n'
        txt += '    tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += '    rm ${out_files}.tgz\n'
        txt += '    size=`expr $tmp_size`\n'
        txt += '    echo "Total Output dimension: $size"\n'
        txt += '    limit='+str(self.OSBsize) +' \n'
        txt += '    echo "WARNING: output files size limit is set to: $limit"\n'
        txt += '    if [ "$limit" -lt "$size" ]; then\n'
        txt += '        exceed=1\n'
        txt += '        job_exit_code=70000\n'
        txt += '        echo "Output Sanbox too big. Produced output is lost "\n'
        txt += '    else\n'
        txt += '        exceed=0\n'
        txt += '        echo "Total Output dimension $size is fine."\n'
        txt += '    fi\n'

        txt += '    echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += '    echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
        txt += '    if [ $exceed -ne 1 ]; then\n'
        txt += '        tar zcvf ${out_files}.tgz  ${final_list}\n'
        txt += '    else\n'
        txt += '        tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += '    fi\n'
        txt += '    python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += '    exit $job_exit_code\n'
        txt += '}\n'

        return txt


    def sched_fix_parameter(self):
        """
        Store the user JDL requirements (if any) in the task DB as
        'commonRequirements'. Returns nothing.
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements':req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):
        """
        Return (remoteHost, remoteUserHost) for submission.

        The host already stored in the task DB ('serverName') wins;
        otherwise it comes from CRAB.submit_host or the default server
        list, and is saved back to the DB.

        NOTE: the `task` argument is ignored; the task is always re-read
        from common._db.

        Raises CrabException if no host can be determined.
        """

        dbTask = common._db.getTask()

        if dbTask['serverName']:
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost=str(dbTask['serverName'])
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
            if '@' in remoteUserHost:
                remoteHost = remoteUserHost.split('@')[1]
            else:
                remoteHost = remoteUserHost
        else:
            if 'CRAB.submit_host' in self.cfg_params:
                # get a remote submission host from crab config file
                srvCfg=ServerConfig(self.cfg_params['CRAB.submit_host']).config()
                remoteHost=srvCfg['serverName']
                common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
            else:
                # pick from Available Servers List
                srvCfg=ServerConfig('default').config()
                remoteHost = srvCfg['serverName']
                common.logger.info("remotehost from Avail.List = %s" % remoteHost)

            # no remote user name is prepended: user@host == host here
            remoteUserHost = remoteHost

            common._db.updateTask_({'serverName':remoteUserHost})

        # checked for both branches: a DB value like "user@" yields no host
        if not remoteHost:
            raise CrabException('FATAL ERROR: remoteHost not defined')

        return (remoteHost, remoteUserHost)