ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRcondor.py
Revision: 1.12
Committed: Thu Aug 23 13:25:47 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, HEAD
Changes since 1.11: +98 -13 lines
Error occurred while calculating annotation data.
Log Message:
no env.var anymore, use AvailServerList

File Contents

# Content
1 """
2 Implements the vanilla (local) Remote Condor scheduler
3 """
4
5 from SchedulerGrid import SchedulerGrid
6 from crab_exceptions import CrabException
7 from crab_util import runCommand
8 from ServerConfig import *
9 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
10 import Scram
11
12
13 import common
14 import os
15 import socket
16 import re
17 import commands
18
19 # FUTURE: for python 2.4 & 2.6
20 try:
21 from hashlib import sha1
22 except:
23 from sha import sha as sha1
24
class SchedulerRcondor(SchedulerGrid) :
    """
    Class to implement the vanilla remote Condor scheduler
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        SchedulerGrid.__init__(self,"RCONDOR")

        # Filled in by configure() from CMSSW.datasetpath
        self.datasetPath = None
        self.selectNoInput = None
        # Output sandbox size limit in bytes; checked by the job wrapper
        # function written in wsExitFunc()
        self.OSBsize = 50*1000*1000 # 50 MB

        self.environment_unique_identifier = None


    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        The remote condor submission host is not chosen here: it is picked
        (and stored in the task DB) by pickRcondorSubmissionHost() at
        submission time.

        Raises CrabException if CMSSW.datasetpath is missing, or if a CE
        black/white list is configured (this scheduler cannot use them).
        """
        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid=0
        self.dontCheckProxy=int(cfg_params.get("GRID.dont_check_proxy",0))
        self.space_token = cfg_params.get("USER.space_token",None)
        self.proxyServer= 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization','cms')

        self.checkProxy()

        try:
            tmp = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)
        # the literal string 'none' (any case) means "no input dataset"
        if tmp.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0

        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None) :
            msg="BEWARE: scheduler RCONDOR ignores CE black/white lists."
            msg+="\n Remove them from crab configuration to proceed."
            msg+="\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

    def userName(self):
        """ return the user name (output of voms-proxy-info -identity) """
        tmp=runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return a per-job identifier string of the form
        "https://<local hostname>/<sha1 of task name>/${NJob}".

        "${NJob}" is left literally in the string; presumably it is expanded
        by the shell in the job script (TODO confirm against callers).
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i    : 1-based job index (task.jobs[i-1] is used)
        task : task object whose jobs carry a 'dlsDestination' list

        Returns a string of ';'-terminated "+Attribute = value" condor
        settings (desired SEs, CMSSW version/arch, monitor id, max wall
        time). Side effects: picks/stores the submission host via
        pickRcondorSubmissionHost(), sets self.rcondorHost and
        self.rcondorUserHost, and stores the returned string in the task
        DB under 'jobType'.
        """
        # Derived from the crab SchedulerGlidein implementation
        (self.rcondorHost, self.rcondorUserHost) = \
            self.pickRcondorSubmissionHost(task)

        jobParams = ""

        seDest = task.jobs[i-1]['dlsDestination']
        if seDest == [''] :
            # empty destination: allow all of SiteDB
            seDest = self.blackWhiteListParser.expandList("T")
        seString=self.blackWhiteListParser.cleanForBlackWhiteList(seDest)
        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # e.g. "CMSSW_5_3_11" -> "50311" (major, then two-digit minor/patch)
        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        myscheddName = self.rcondorHost
        jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            # str() so both string and numeric config values work
            jobParams += '+MaxWallTimeMins = '+str(self.EDG_clock_time)+'; '
        else:
            # default: one day, in minutes
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType':jobParams})

        return jobParams


    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command

        Keys: 'shareDir', 'jobDir' (from common.work_space) and 'taskDir',
        the second-to-last '/'-separated component of topDir() (i.e. the
        task directory name if topDir() ends with '/' - TODO confirm).
        """
        # used to pass directory names to the Boss scheduler via params
        jobDir = common.work_space.jobDir()
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir':shareDir,
                  'jobDir':jobDir,
                  'taskDir':taskDir}

        return params


    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources.

        No match-making is done for this scheduler: always returns [True].
        """
        return [True]


    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info (the reason decoded
        by CondorGLoggingInfo.decodeReason).
        """
        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason


    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit.

        The returned shell text defines func_exit(), which (after the
        common exit part) tars ${final_list} to measure its size; if that
        exceeds self.OSBsize bytes it sets job_exit_code=70000 and ships
        only CMSSW_${NJob}.stdout/.stderr instead of the output files.
        It then records JobExitCode in $RUNTIME_AREA/$repo, runs
        fillCrabFjr.py with the exit code and exits with $job_exit_code.
        """
        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # first tarball is only built to measure the compressed output size
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit='+str(self.OSBsize) +' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        # ship the real output only if it fits, otherwise just stdout/stderr
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt


    def sched_fix_parameter(self):
        """
        Store self.EDG_requirements (if set) in the task DB as
        'commonRequirements'.
        """
        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements':req}
            common._db.updateTask_(taskReq)

    def pickRcondorSubmissionHost(self, task):
        """
        Return (rcondorHost, rcondorUserHost) for the current task.

        If the task DB already holds a serverName ("user@host" or "host")
        it is reused. Otherwise the host is taken from CRAB.submit_host in
        the crab config, or from the 'default' entry of the available
        servers list (both via ServerConfig); the remote user name is then
        discovered with uberftp and "user@host" is stored in the task DB as
        serverName, so later calls reuse it.

        The `task` argument is kept for interface compatibility but is not
        used: the task is re-read from the DB so that a serverName stored
        by a previous call is seen.

        Raises CrabException if no host or no user name can be determined.
        """
        dbTask = common._db.getTask()
        serverName = dbTask['serverName']

        if serverName:
            # rcondorHost is already defined and stored for this task,
            # so pick it from DB.
            # cast to string to avoid issues with unicode :-(
            rcondorUserHost = str(serverName)
            common.logger.info("serverName from Task DB is %s" %
                               rcondorUserHost)
            if '@' in rcondorUserHost:
                rcondorHost = rcondorUserHost.split('@')[1]
            else:
                rcondorHost = rcondorUserHost
            return (rcondorHost, rcondorUserHost)

        if 'CRAB.submit_host' in self.cfg_params:
            # get an rcondor host from crab config file
            srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
            rcondorHost = srvCfg.get('serverName')
            common.logger.info("rcondorhost from crab.cfg = %s" % rcondorHost)
        else:
            # pick from Available Servers List
            srvCfg = ServerConfig('default').config()
            common.logger.debug("server config from Avail.List: %s" % srvCfg)
            rcondorHost = srvCfg.get('serverName')
            common.logger.info("rcondorhost from Avail.List = %s" % rcondorHost)

        if not rcondorHost:
            raise CrabException('FATAL ERROR: condorHost not defined')

        common.logger.info("try to find out RCONDOR_USER via uberftp ...")
        command = "uberftp %s pwd|grep User|awk '{print $3}'" % rcondorHost
        (status, output) = commands.getstatusoutput(command)
        # The pipeline status is awk's, so it can be 0 even when uberftp
        # failed; an empty output must therefore also count as failure.
        rcondorUser = None
        if status == 0:
            rcondorUser = output.strip()
        if not rcondorUser:
            raise CrabException('FATAL ERROR: RCONDOR_USER not defined')
        common.logger.info("rcondorUser set to %s" % rcondorUser)

        rcondorUserHost = rcondorUser + '@' + rcondorHost

        common.logger.info("will set server name to : %s" % rcondorUserHost)
        common._db.updateTask_({'serverName':rcondorUserHost})

        return (rcondorHost, rcondorUserHost)