ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.18
Committed: Mon Apr 22 15:17:37 2013 UTC (12 years ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2
Changes since 1.17: +0 -1 lines
Log Message:
remove unused import

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the Remote Glidein scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8 belforte 1.17 from crab_util import gethnUserNameFromSiteDB
9 belforte 1.1 from ServerConfig import *
10     import Scram
11    
12     import common
13     import os
14     import socket
15     import re
16     import commands
17    
# FUTURE: for python 2.4 & 2.6
# hashlib is preferred; the legacy `sha` module is the fallback when hashlib
# cannot be imported.  Only ImportError triggers the fallback so that any
# other failure is not silently hidden.
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
23    
class SchedulerRemoteglidein(SchedulerGrid):
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend, the created JDL will not work
    on vanilla local condor.
    Naming convention:  Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Initialise the base scheduler under the name "REMOTEGLIDEIN"
        and set the default values of the instance attributes.
        """
        # `time` is not imported explicitly in the visible part of this
        # module (it presumably arrives through `from ServerConfig import *`);
        # import it here so this method does not depend on that.
        import time

        SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        self.OSBsize = 50*1000*1000  # 50 MB

        self.environment_unique_identifier = None
        # day of submission as YYMMDD, used later in Glidein_MonitorID
        self.submissionDay = time.strftime("%y%m%d", time.localtime())

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user

        Raises CrabException when
          - USER.ssh_control_persist is not 'yes', 'no' or a number
          - CMSSW.datasetpath is missing
          - GRID.ce_black_list or GRID.ce_white_list is set
          - the current proxy identity differs from the one stored
            in the task share directory
        """

        # this block needs to be before the call to SchedulerGrid.configure
        # because that calls SchedulerRemoteglidein in turn and
        # sshControlPersist needs to be defined then :-(
        self.sshControlPersist = cfg_params.get('USER.ssh_control_persist', '3600')
        persist = self.sshControlPersist.lower()
        if persist not in ("no", "yes") and not self.sshControlPersist.isdigit():
            msg = "Error: invalid value '%s' for USER.ssh_control_persist " % \
                  self.sshControlPersist
            raise CrabException(msg)

        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", '0'))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')
        self.allowOverflow = cfg_params.get('GRID.allow_overflow', '1')
        self.max_rss = cfg_params.get('GRID.max_rss', '2000')

        self.checkProxy()

        # keep the try body minimal: only the lookup can raise KeyError
        try:
            tmp = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)
        if tmp.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0

        if cfg_params.get('GRID.ce_black_list', None) or \
               cfg_params.get('GRID.ce_white_list', None):
            msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

        # make sure proxy FQAN has not changed since last time
        command = "voms-proxy-info -identity -fqan 2>/dev/null"
        command += " | head -2"
        identity = runCommand(command)
        # NOTE(review): if runCommand can return None on failure, the
        # write below would raise TypeError -- confirm against crab_util.
        idfile = common.work_space.shareDir() + "GridIdentity"
        # try/finally (not `with`) because this module still targets
        # python 2.4; it guarantees the handle is closed on I/O errors.
        if os.access(idfile, os.F_OK):
            # identity file exists from previous commands
            f = open(idfile, 'r')
            try:
                idFromFile = f.read()
            finally:
                f.close()
        else:
            # create it
            f = open(idfile, 'w')
            try:
                f.write(identity)
            finally:
                f.close()
            idFromFile = identity

        if identity != idFromFile:
            msg = "Wrong Grid Credentials:\n%s" % identity
            msg += "\nMake sure you have "
            msg += " DN, FQAN =\n%s" % idFromFile
            raise CrabException(msg)

    #
    def envUniqueID(self):
        """
        Return "https://<local hostname>/<sha1 hex digest of the task
        name>/${NJob}"; the ${NJob} placeholder is left literally in
        the string.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        # local is not called `id`, to avoid shadowing the builtin
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i    : 1-based job index (task.jobs[i-1] is read)
        task : task object providing the jobs list

        The resulting string is also stored in the task DB under
        the key 'jobType'.
        """

        #SB paste from crab SchedulerGlidein

        jobParams = ""

        (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # NOTE(review): assumes cmsVersion is PREFIX_a_b_c[...] with
        # numeric b and c; int() raises ValueError otherwise.
        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        userName = gethnUserNameFromSiteDB()
        jobParams += '+AccountingGroup ="' + userName+'";'

        myscheddName = self.remoteHost

        jobParams += '+Glidein_MonitorID = "https://'+ myscheddName + \
                     '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            glideinTime = "%d" % (int(self.EDG_clock_time)+20)  # 20 min to wrapup
            jobParams += '+MaxWallTimeMins = '+ glideinTime + '; '
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (21*60+55)  # 21:55h  (unit = min)

        if self.max_rss:
            jobParams += 'request_memory = '+self.max_rss+';'

        if self.allowOverflow == "0":
            jobParams += '+CMS_ALLOW_OVERFLOW = False; '

        if self.EDG_addJdlParam:
            # drop a trailing empty entry before appending the extra lines
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            for p in self.EDG_addJdlParam:
                jobParams += p.strip()+';\n'

        common._db.updateTask_({'jobType': jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command

        cfg_params is accepted but not read here.
        """
        #SB this method is used to pass information to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        # second-to-last component of the '/'-separated top directory path
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir': shareDir,
                  'jobDir': jobDir,
                  'taskDir': taskDir,
                  'submissionDay': self.submissionDay,
                  'sshControlPersist': self.sshControlPersist}

        return params

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources

        Both arguments are ignored; always returns [True].
        """

        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        (the result of CondorGLoggingInfo.decodeReason on fileName)
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

#    def wsCopyOutput(self):
#        """
#        Write a CopyResults part of a job script, e.g.
#        to copy produced output into a storage element.
#        """
#        txt = self.wsCopyOutput()
#        return txt

    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit:
        a shell function func_exit wrapping the text returned by
        self.wsExitFunc_common() (not defined in this class body).
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()
        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt

    def sched_fix_parameter(self):
        """
        Store the requirements (self.EDG_requirements), when set, in the
        task DB under the key 'commonRequirements'.
        Nothing is returned.
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):
        """
        Determine the remote submission host for this task and return
        the tuple (remoteHost, remoteUserHost), where remoteHost is the
        part of remoteUserHost after '@' when one is present.

        The host is taken, in order of precedence, from
          1. 'serverName' already stored in the task DB
          2. CRAB.submit_host in the crab configuration
          3. the 'default' ServerConfig
        and is (re)written to the task DB as 'serverName'.

        The task argument is not used: the task is always re-read
        from the DB.

        Raises CrabException when no host could be determined.
        """

        task = common._db.getTask()
        storedServer = task['serverName']

        if storedServer is not None and storedServer != "":
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost = str(storedServer)
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
        elif 'CRAB.submit_host' in self.cfg_params:
            # get a remote submission host from crab config file
            srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
            remoteUserHost = srvCfg['serverName']
            common.logger.info("remotehost from crab.cfg = %s" % remoteUserHost)
        else:
            # pick from Available Servers List
            srvCfg = ServerConfig('default').config()
            remoteUserHost = srvCfg['serverName']
            common.logger.info("remotehost from Avail.List = %s" % remoteUserHost)

        if not remoteUserHost:
            raise CrabException('FATAL ERROR: remoteHost not defined')

        if '@' in remoteUserHost:
            remoteHost = remoteUserHost.split('@')[1]
        else:
            remoteHost = remoteUserHost

        common._db.updateTask_({'serverName': remoteUserHost})

        return (remoteHost, remoteUserHost)