ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.14
Committed: Thu Jan 17 14:46:45 2013 UTC (12 years, 3 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3
Changes since 1.13: +0 -32 lines
Log Message:
move OSB size check to Scheduler.py https://savannah.cern.ch/bugs/index.php?95466

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the Remote Glidein scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8     from ServerConfig import *
9     from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
10     import Scram
11    
12     import common
13     import os
14     import socket
15     import re
16     import commands
17    
18     # FUTURE: for python 2.4 & 2.6
19     try:
20     from hashlib import sha1
21     except:
22     from sha import sha as sha1
23    
class SchedulerRemoteglidein(SchedulerGrid):
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend, the created JDL will not work
    on vanilla local condor.
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Initialise the base scheduler under the name "REMOTEGLIDEIN" and
        set the default values of the attributes used by this class.
        """
        # Local import: the module's import section has no explicit
        # 'import time' (presumably it used to arrive through a star
        # import -- TODO confirm), so make the dependency explicit here.
        import time

        SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        self.OSBsize = 50*1000*1000  # 50 MB

        self.environment_unique_identifier = None
        # Day of submission as YYMMDD in local time; used in the
        # Glidein_MonitorID JDL attribute and in realSchedParams().
        self.submissionDay = time.strftime("%y%m%d", time.localtime())

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user

        cfg_params: mapping of 'SECTION.option' keys to configuration values.

        Raises CrabException if CMSSW.datasetpath is not defined, if a CE
        black/white list is configured, or if the current grid identity
        differs from the one stored for this task.
        """

        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", '0'))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')
        # Both values are kept as strings: they are compared with / pasted
        # into JDL text in sched_parameter().
        self.allowOverflow = cfg_params.get('GRID.allow_overflow', '1')
        self.max_rss = cfg_params.get('GRID.max_rss', '2000')

        self.checkProxy()

        # Only the lookup itself can raise KeyError, so keep the try minimal.
        try:
            tmp = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        if tmp.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0

        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None):
            msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

        self._checkGridIdentity()

    def _checkGridIdentity(self):
        """
        Make sure the proxy identity/FQAN has not changed since last time.

        The output of voms-proxy-info is compared with the content of the
        'GridIdentity' file in the share directory; the file is created
        with the current identity when it does not exist yet.

        Raises CrabException when the two differ.
        """
        command = "voms-proxy-info -identity -fqan 2>/dev/null"
        command += " | head -2"
        identity = runCommand(command)

        idfile = common.work_space.shareDir() + "GridIdentity"
        if os.access(idfile, os.F_OK):
            # identity file exists from previous commands
            f = open(idfile, 'r')
            try:
                idFromFile = f.read()
            finally:
                # try/finally instead of 'with': the file targets python 2.4
                f.close()
        else:
            # create it
            f = open(idfile, 'w')
            try:
                f.write(identity)
            finally:
                f.close()
            idFromFile = identity

        if identity != idFromFile:
            msg = "Wrong Grid Credentials:\n%s" % identity
            msg += "\nMake sure you have "
            msg += " DN, FQAN =\n%s" % idFromFile
            raise CrabException(msg)

    def userName(self):
        """ return the user name """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return the unique-identifier template for the jobs of this task:
        "https://<local host name>/<sha1 of the task name>/${NJob}".
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        # '${NJob}' is emitted literally; it is not expanded here.
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i:    job number; the job is looked up as task.jobs[i-1]
        task: task object providing the 'jobs' list

        Returns the JDL fragment as a string. As side effects it sets
        self.remoteHost / self.remoteUserHost and stores the fragment in
        the task DB under 'jobType'.
        """

        #SB paste from crab SchedulerGlidein

        jobParams = ""

        (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # Fields 1..3 of the '_'-separated version are packed as
        # <f1><f2, 2 digits><f3, 2 digits>; fields 2 and 3 must be integers.
        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' + cmsVersion + '";'
        jobParams += '+DESIRED_CMSVersionNr ="' + numericCmsVersion + '";'
        jobParams += '+DESIRED_CMSScramArch ="' + scramArch + '";'

        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://' + myscheddName + \
                     '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            glideinTime = "%d" % (int(self.EDG_clock_time)+5)  # 5 min to wrapup
            jobParams += '+MaxWallTimeMins = ' + glideinTime + '; '
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*22 - 5)  # 22h default in glidein, 5min to wrap

        if self.max_rss:
            jobParams += 'request_memory = ' + self.max_rss + ';'

        if self.allowOverflow == "0":
            jobParams += '+CMS_ALLOW_OVERFLOW = False; '

        if self.EDG_addJdlParam:
            # drop a trailing empty entry before emitting the extra lines
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            for p in self.EDG_addJdlParam:
                jobParams += p.strip() + ';\n'

        common._db.updateTask_({'jobType': jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command

        cfg_params: not used by this implementation.
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        # second-to-last '/'-separated component of the top directory
        # (presumably topDir() ends with '/' -- TODO confirm)
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir': shareDir,
                  'jobDir': jobDir,
                  'taskDir': taskDir,
                  'submissionDay': self.submissionDay}

        return params

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources

        This implementation performs no check and always returns [True];
        both arguments are ignored.
        """

        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info

        fileName: path handed to CondorGLoggingInfo.decodeReason()
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

    # NOTE: a wsCopyOutput() override was sketched here but is disabled
    # (its body only called self.wsCopyOutput() again).

    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()
        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt

    def sched_fix_parameter(self):
        """
        Store the user requirements (self.EDG_requirements), when set, in
        the task DB under 'commonRequirements'. Returns nothing.
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):
        """
        Select the remote submission host for this task.

        The host stored in the task DB ('serverName') is used when present;
        otherwise it is taken from the server configuration named by
        CRAB.submit_host, or from the 'default' server configuration, and
        then stored in the task DB.

        task: ignored, the task is always re-read from the DB.

        Returns the tuple (remoteHost, remoteUserHost).
        Raises CrabException when no host can be determined.
        """

        task = common._db.getTask()
        storedServer = task['serverName']

        if storedServer is not None and storedServer != "":
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost = str(storedServer)
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
            if '@' in remoteUserHost:
                remoteHost = remoteUserHost.split('@')[1]
            else:
                remoteHost = remoteUserHost
        else:
            # NOTE(review): the source listing lost its indentation; the
            # rest of this branch (check, assignment, DB update) is assumed
            # to belong to the 'else' -- confirm against the repository.
            if 'CRAB.submit_host' in self.cfg_params:
                # get a remote submission host from crab config file
                srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
                remoteHost = srvCfg['serverName']
                common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
            else:
                # pick from Available Servers List
                srvCfg = ServerConfig('default').config()
                remoteHost = srvCfg['serverName']
                common.logger.info("remotehost from Avail.List = %s" % remoteHost)

            if not remoteHost:
                raise CrabException('FATAL ERROR: remoteHost not defined')

            # Discovery of the remote user name via uberftp is disabled,
            # so the host name alone is stored as serverName.
            remoteUserHost = remoteHost

            common._db.updateTask_({'serverName': remoteUserHost})

        return (remoteHost, remoteUserHost)