ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.4
Committed: Tue Oct 2 14:56:52 2012 UTC (12 years, 6 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_3_pre4, CRAB_2_8_3_pre3
Changes since 1.3: +4 -3 lines
Log Message:
make monitoringId consistent in wrapper and crab

File Contents

# Content
1 """
2 Implements the Remote Glidein scheduler
3 """
4
from SchedulerGrid import SchedulerGrid
from crab_exceptions import CrabException
from crab_util import runCommand
from ServerConfig import *
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
import Scram

import common
import os
import socket
import re
import commands
import time

# FUTURE: for python 2.4 & 2.6
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
23
class SchedulerRemoteglidein(SchedulerGrid) :
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend; the created JDL will not work
    on vanilla local condor.
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Initialise the scheduler state that does not depend on the
        user configuration (see configure() for that part).
        """
        SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        # output sandbox size limit in bytes; written into the job
        # wrapper by wsExitFunc() and compared against the tarball size
        self.OSBsize = 50*1000*1000 # 50 MB

        self.environment_unique_identifier = None
        # local submission date as yymmdd; used in Glidein_MonitorID and
        # handed to the real scheduler through realSchedParams()
        self.submissionDay = time.strftime("%y%m%d", time.localtime())


    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        cfg_params: dict-like mapping of crab configuration keys.
        Raises CrabException if CMSSW.datasetpath is missing, or if a
        GRID CE black/white list is given (not supported by this scheduler).
        """
        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        self.checkProxy()

        # only the key lookup can raise KeyError; keep the try minimal
        try:
            datasetPath = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        if datasetPath.lower() == 'none':
            # job runs without input data
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = datasetPath
            self.selectNoInput = 0

        # site selection here is SE based only; refuse CE lists explicitly
        # rather than silently ignoring them
        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None) :
            msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

        return

    def userName(self):
        """ return the user name (proxy identity from voms-proxy-info) """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return the per-job unique identifier template:
        https://<this host>/<sha1 of task name>/${NJob}
        ${NJob} is left unexpanded for the job wrapper to fill in.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i    : 1-based job number (the job is task.jobs[i-1])
        task : task object holding the jobs and their 'dlsDestination'
        Returns the string of '+Attribute = value;' ClassAd additions,
        which is also stored in the task DB as 'jobType'.
        """

        #SB paste from crab SchedulerGlidein

        jobParams = ""

        (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        # storage elements the job may run close to
        seDest = task.jobs[i-1]['dlsDestination']
        if seDest == [''] :
            seDest = self.blackWhiteListParser.expandList("T") # all of SiteDB
        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)
        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        # software release requirements
        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # e.g. CMSSW_5_3_7 -> "50307" (major, then 2-digit minor and patch)
        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + \
                     '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        # '%s' instead of '+' concatenation: same text for a string value,
        # but no TypeError if the wall clock time was stored as a number
        if self.EDG_clock_time:
            jobParams += '+MaxWallTimeMins = %s; ' % self.EDG_clock_time
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType':jobParams})

        return jobParams


    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        # second-to-last '/'-separated component of topDir(); presumably
        # topDir() ends with '/' so this is the task directory name -- TODO confirm
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir':shareDir,
                  'jobDir':jobDir,
                  'taskDir':taskDir,
                  'submissionDay':self.submissionDay}

        return params


    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources.
        No matchmaking is done here: always reports a match.
        """
        return [True]


    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        (delegated to CondorGLoggingInfo.decodeReason).
        """
        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason


    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit.

        The generated shell function func_exit() measures the compressed
        output size against self.OSBsize; if it is exceeded the exit code
        is set to 70000 and only the CMSSW stdout/stderr are packed.
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # build the tarball once just to measure its size
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit='+str(self.OSBsize) +' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        # pack the real output, or only the logs if the limit was exceeded
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt


    def sched_fix_parameter(self):
        """
        Store the user-supplied requirements (self.EDG_requirements), if any,
        in the task DB as 'commonRequirements'. Returns None.
        """
        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements':req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):
        """
        Return (remoteHost, remoteUserHost) for the remote submission schedd.

        If the task DB already has a 'serverName' it is reused; a
        'user@host' value is split to obtain the bare host. Otherwise the host
        is taken from CRAB.submit_host or, failing that, from the 'default'
        entry of the available servers list. It is then validated and stored
        in the task DB as 'serverName'.

        The 'task' argument is not used (the task is always re-read from
        common._db); it is kept for interface compatibility.
        Raises CrabException if no host can be determined.
        """
        dbTask = common._db.getTask()
        serverName = dbTask['serverName']

        if serverName is not None and serverName != "":
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost = str(serverName)
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
            if '@' in remoteUserHost:
                remoteHost = remoteUserHost.split('@')[1]
            else:
                remoteHost = remoteUserHost
            return (remoteHost, remoteUserHost)

        if 'CRAB.submit_host' in self.cfg_params:
            # get a remote submission host from crab config file
            srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
            remoteHost = srvCfg['serverName']
            common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
        else:
            # pick from Available Servers List
            srvCfg = ServerConfig('default').config()
            remoteHost = srvCfg['serverName']
            common.logger.info("remotehost from Avail.List = %s" % remoteHost)

        if not remoteHost:
            raise CrabException('FATAL ERROR: remoteHost not defined')

        # no remote user name is prepended (the uberftp-based user lookup
        # is disabled), so the stored serverName is the bare host name
        remoteUserHost = remoteHost

        common._db.updateTask_({'serverName':remoteUserHost})

        return (remoteHost, remoteUserHost)