ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.22
Committed: Tue Sep 3 13:57:24 2013 UTC (11 years, 7 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, HEAD
Changes since 1.21: +3 -1 lines
Log Message:
make sure unicode from SiteDB is cast to string: https://savannah.cern.ch/bugs/index.php?102436

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the Remote Glidein scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8 belforte 1.17 from crab_util import gethnUserNameFromSiteDB
9 belforte 1.1 from ServerConfig import *
10     import Scram
11    
12     import common
13     import os
14     import socket
15     import re
16     import commands
17    
# FUTURE: needed as long as both python 2.4 and 2.6 are supported.
# Prefer hashlib and fall back to the legacy sha module when hashlib
# cannot be imported.
try:
    from hashlib import sha1
except ImportError:
    # Only a failed import may trigger the fallback; any other error
    # must surface instead of being silently swallowed.
    from sha import sha as sha1
23    
24     class SchedulerRemoteglidein(SchedulerGrid) :
25     """
26     Class to implement the vanilla remote Condor scheduler
27     for a glidein frontend, the created JDL will not work
28     on vanilla local condor.
29     Naming convention: Methods starting with 'ws' provide
30     the corresponding part of the job script
31     ('ws' stands for 'write script').
32     """
33    
def __init__(self):
    """
    Initialise the scheduler under the name "REMOTEGLIDEIN".

    datasetPath and selectNoInput start as None and are given their
    real values by configure(). submissionDay is fixed here, once per
    instance, from the local date.
    """
    # Imported locally so this method does not rely on 'time' leaking
    # into the module namespace through 'from ServerConfig import *'.
    import time

    SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

    self.datasetPath = None
    self.selectNoInput = None
    self.OSBsize = 50*1000*1000  # 50 MB

    self.environment_unique_identifier = None
    # local date formatted as YYMMDD
    self.submissionDay = time.strftime("%y%m%d", time.localtime())

    return
45    
46    
def configure(self, cfg_params):
    """
    Configure the scheduler with the config settings from the user.

    cfg_params: mapping from 'SECTION.name' configuration keys to their
                (string) values.

    Raises CrabException when
      - USER.ssh_control_persist is neither "yes", "no" nor a number,
      - CMSSW.datasetpath is not defined,
      - GRID.ce_black_list or GRID.ce_white_list is set,
      - the current grid identity differs from the one stored in the
        task's GridIdentity file.
    """

    # This needs to happen before the call to SchedulerGrid.configure,
    # because that calls SchedulerRemoteglidein in turn and
    # sshControlPersist needs to be defined by then :-(
    self.sshControlPersist = cfg_params.get('USER.ssh_control_persist', '3600')
    persist = self.sshControlPersist.lower()
    if not (persist == "no" or persist == "yes" or
            self.sshControlPersist.isdigit()):
        msg = "Error: invalid value '%s' for USER.ssh_control_persist " % \
              self.sshControlPersist
        raise CrabException(msg)

    SchedulerGrid.configure(self, cfg_params)

    self.proxyValid = 0
    self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", '0'))
    self.space_token = cfg_params.get("USER.space_token", None)
    self.proxyServer = 'myproxy.cern.ch'
    self.group = cfg_params.get("GRID.group", None)
    self.role = cfg_params.get("GRID.role", None)
    self.VO = cfg_params.get('GRID.virtual_organization', 'cms')
    self.allowOverflow = cfg_params.get('GRID.allow_overflow', '1')
    self.max_rss = cfg_params.get('GRID.max_rss', '2000')

    self.checkProxy()

    # Only the lookup itself can legitimately raise KeyError, so keep
    # the try body down to that single statement.
    try:
        datasetPath = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # the literal value 'none' (any case) means: job without input dataset
    if datasetPath.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = datasetPath
        self.selectNoInput = 0

    if cfg_params.get('GRID.ce_black_list', None) or \
       cfg_params.get('GRID.ce_white_list', None):
        msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
        msg += "\n Remove them from crab configuration to proceed."
        msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
        raise CrabException(msg)

    # make sure proxy FQAN has not changed since last time
    command = "voms-proxy-info -identity -fqan 2>/dev/null"
    command += " | head -2"
    identity = runCommand(command)
    idfile = common.work_space.shareDir() + "GridIdentity"
    if os.access(idfile, os.F_OK):
        # identity file exists from previous commands
        f = open(idfile, 'r')
        try:
            idFromFile = f.read()
        finally:
            # close the handle even if the read fails
            f.close()
    else:
        # first command for this task: create the file
        f = open(idfile, 'w')
        try:
            f.write(identity)
        finally:
            # close the handle even if the write fails
            f.close()
        idFromFile = identity

    if identity != idFromFile:
        msg = "Wrong Grid Credentials:\n%s" % identity
        msg += "\nMake sure you have "
        msg += " DN, FQAN =\n%s" % idFromFile
        raise CrabException(msg)

    return
124    
125 belforte 1.17 #
def envUniqueID(self):
    """
    Build the unique identifier template for the jobs of this task:

        https://<local host name>/<sha1 hex digest>/${NJob}

    The digest is computed over the value the task DB returns for
    'name'. The '${NJob}' placeholder is returned literally.
    """
    taskName = common._db.queryTask('name')
    digest = sha1(taskName).hexdigest()
    return "https://" + socket.gethostname() + '/' + digest + "/${NJob}"
130    
def sched_parameter(self, i, task):
    """
    Build the scheduler-specific parameter string for one job. Used at
    crab -submit time by $CRABPYTHON/Scheduler.py

    i    : job number; the job description is read from task.jobs[i-1]
    task : task object; passed on to pickRemoteSubmissionHost and used
           to look up the 'dlsDestination' of the job

    Side effects: sets self.remoteHost and self.remoteUserHost, may
    strip a trailing empty entry from self.EDG_addJdlParam, and stores
    the resulting string in the task DB under 'jobType'.

    Returns the parameter string.
    """

    #SB paste from crab SchedulerGlidein

    (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

    destinations = task.jobs[i-1]['dlsDestination']
    if destinations == ['']:
        destinations = self.blackWhiteListParser.expandList("T")  # all of SiteDB

    # beware SiteDB V2 API: explicitly cast to string in case it is unicode
    seString = str(self.blackWhiteListParser.cleanForBlackWhiteList(destinations))

    # the parameter string is assembled piecewise and joined at the end
    pieces = ['+DESIRED_SEs = "' + seString + '"; ']

    scram = Scram.Scram(None)
    cmsVersion = scram.getSWVersion()
    scramArch = scram.getArch()

    # numeric version: field 1 of the '_'-separated version string as is,
    # fields 2 and 3 as integers zero-padded to two digits
    fields = re.split('_', cmsVersion)
    numericCmsVersion = "%s%.2d%.2d" % (fields[1], int(fields[2]), int(fields[3]))

    # str() protects against datasetPath being None
    pieces.append('+DESIRED_CMSDataset ="' + str(self.datasetPath) + '";')
    pieces.append('+DESIRED_CMSVersion ="' + cmsVersion + '";')
    pieces.append('+DESIRED_CMSVersionNr ="' + numericCmsVersion + '";')
    pieces.append('+DESIRED_CMSScramArch ="' + scramArch + '";')

    userName = gethnUserNameFromSiteDB()
    pieces.append('+AccountingGroup ="' + userName + '";')

    pieces.append('+Glidein_MonitorID = "https://' + self.remoteHost +
                  '//' + self.submissionDay + '//$(Cluster).$(Process)"; ')

    if self.EDG_clock_time:
        wallMinutes = int(self.EDG_clock_time) + 20  # 20 min to wrapup
        pieces.append('+MaxWallTimeMins = ' + ("%d" % wallMinutes) + '; ')
    else:
        pieces.append('+MaxWallTimeMins = %d; ' % (21*60+55))  # 21:55h (unit = min)

    if self.max_rss:
        pieces.append('request_memory = ' + self.max_rss + ';')

    if self.allowOverflow == "0":
        pieces.append('+CMS_ALLOW_OVERFLOW = False; ')

    if self.EDG_addJdlParam:
        # drop a trailing empty entry before emitting the user's extras
        if self.EDG_addJdlParam[-1] == '':
            self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
        for extra in self.EDG_addJdlParam:
            pieces.append(extra.strip() + ';\n')

    jobParams = "".join(pieces)

    common._db.updateTask_({'jobType': jobParams})

    return jobParams
198    
199    
def realSchedParams(self, cfg_params):
    """
    Return the dictionary of parameters handed to the real scheduler.
    Called when the scheduler is initialized in Boss, i.e. at each
    crab command.

    cfg_params is accepted but not read here.

    Keys of the result: 'shareDir', 'jobDir', 'taskDir',
    'submissionDay' and 'sshControlPersist'.
    """
    #SB this method is used to pass information to the Boss Scheduler
    # via the params dictionary

    workSpace = common.work_space
    jobDir = workSpace.jobDir()
    # taskDir is the second-to-last '/'-separated piece of the top directory
    taskDir = workSpace.topDir().split('/')[-2]
    shareDir = workSpace.shareDir()

    params = {}
    params['shareDir'] = shareDir
    params['jobDir'] = jobDir
    params['taskDir'] = taskDir
    params['submissionDay'] = self.submissionDay
    params['sshControlPersist'] = self.sshControlPersist
    return params
219    
220    
def listMatch(self, seList, full):
    """
    Check the compatibility of available resources.

    Neither seList nor full is inspected: the answer is always a
    one-element list holding True.
    """
    matches = [True]
    return matches
227    
228    
def decodeLogInfo(self, fileName):
    """
    Parse logging info file and return main info.

    fileName is handed to CondorGLoggingInfo.decodeReason and the
    value it produces is returned unchanged.
    """
    # imported on demand: nothing else in this class needs the module
    from CondorGLoggingInfo import CondorGLoggingInfo as LoggingInfo

    return LoggingInfo().decodeReason(fileName)
238    
239    
240     # def wsCopyOutput(self):
241     # """
242     # Write a CopyResults part of a job script, e.g.
243     # to copy produced output into a storage element.
244     # """
245     # txt = self.wsCopyOutput()
246     # return txt
247    
248    
def wsExitFunc(self):
    """
    Returns the part of the job script which runs prior to exit.

    The text is a comment banner followed by the shell function
    func_exit, whose body is whatever wsExitFunc_common() returns,
    followed by an exit with $job_exit_code.
    """
    banner = '\n#\n# EXECUTE THIS FUNCTION BEFORE EXIT \n#\n\n'
    pieces = [
        banner,
        'func_exit() { \n',
        self.wsExitFunc_common(),
        ' exit $job_exit_code\n',
        '}\n',
    ]
    return ''.join(pieces)
265    
266    
def sched_fix_parameter(self):
    """
    Store the user's requirements in the task DB.

    When self.EDG_requirements is set, it is written to the task DB
    under 'commonRequirements'; otherwise nothing happens.
    Nothing is returned.
    """
    if not self.EDG_requirements:
        return

    common._db.updateTask_({'commonRequirements': self.EDG_requirements})
276    
def pickRemoteSubmissionHost(self, task):
    """
    Select the remote submission host for the current task.

    A non-empty 'serverName' already stored in the task DB wins.
    Otherwise the name comes from the server configuration selected by
    CRAB.submit_host in the crab configuration or, when that key is
    absent, from the 'default' server configuration. The selected value
    is written back to the task DB under 'serverName'.

    task : ignored, the task is always re-read from the DB; the
           parameter is kept so that existing callers keep working.

    Returns the tuple (remoteHost, remoteUserHost). remoteHost is the
    part of remoteUserHost following the '@', or remoteUserHost itself
    when it contains no '@'.

    Raises CrabException when no host name could be determined.
    """

    task = common._db.getTask()
    storedName = task['serverName']

    if storedName is not None and storedName != "":
        # remoteHost is already defined and stored for this task
        # so pick it from DB
        # cast to string to avoid issues with unicode :-(
        remoteUserHost = str(storedName)
        common.logger.info("serverName from Task DB is %s" %
                           remoteUserHost)
    else:
        if 'CRAB.submit_host' in self.cfg_params:
            # get a remote submission host from crab config file
            configName = self.cfg_params['CRAB.submit_host']
            logFormat = "remotehost from crab.cfg = %s"
        else:
            # pick from Available Servers List
            configName = 'default'
            logFormat = "remotehost from Avail.List = %s"
        srvCfg = ServerConfig(configName).config()
        remoteUserHost = srvCfg['serverName']
        common.logger.info(logFormat % remoteUserHost)

    if not remoteUserHost:
        raise CrabException('FATAL ERROR: remoteHost not defined')

    if '@' in remoteUserHost:
        remoteHost = remoteUserHost.split('@')[1]
    else:
        remoteHost = remoteUserHost

    common._db.updateTask_({'serverName': remoteUserHost})

    return (remoteHost, remoteUserHost)