root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.7
Committed: Mon Nov 5 19:50:10 2012 UTC by belforte
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +6 -0 lines
Log Message:
allow additional_jdl_parameters. see https://savannah.cern.ch/bugs/index.php?98534
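
As a quick note on this change: crab.cfg can now carry extra classad lines that are appended to the generated JDL. A minimal sketch, assuming the option is the usual CRAB2 additional_jdl_parameters key with semicolon-separated entries, as in the other CRAB2 grid schedulers and the bug report title; the exact section/key name is not shown on this page:

    [GRID]
    additional_jdl_parameters = +JobPrio = 10; +AccountingGroup = "group_cms.username"

Each non-empty entry is stripped and appended to the job parameters in sched_parameter() below.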

File Contents

"""
Implements the Remote Glidein scheduler
"""

from SchedulerGrid import SchedulerGrid
from crab_exceptions import CrabException
from crab_util import runCommand
from ServerConfig import *
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
import Scram

import common
import os
import socket
import re
import commands
import time   # needed for the submissionDay timestamp in __init__

# FUTURE: for python 2.4 & 2.6
try:
    from hashlib import sha1
except:
    from sha import sha as sha1

class SchedulerRemoteglidein(SchedulerGrid) :
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend; the created JDL will not work
    on vanilla local condor.
    Naming convention: methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        self.OSBsize = 50*1000*1000  # 50 MB

        self.environment_unique_identifier = None
        self.submissionDay = time.strftime("%y%m%d", time.localtime())

        return


    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user
        """

        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')
        self.allowOverflow = cfg_params.get('GRID.allow_overflow', 1)

        self.checkProxy()


        try:
            tmp = cfg_params['CMSSW.datasetpath']
            if tmp.lower() == 'none':
                self.datasetPath = None
                self.selectNoInput = 1
            else:
                self.datasetPath = tmp
                self.selectNoInput = 0
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None) :
            msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)


        # make sure proxy FQAN has not changed since last time
        command = "voms-proxy-info -identity -fqan 2>/dev/null"
        command += " | head -2"
        identity = runCommand(command)
        idfile = common.work_space.shareDir() + "GridIdentity"
        if os.access(idfile, os.F_OK) :
            # identity file exists from previous commands
            f = open(idfile, 'r')
            idFromFile = f.read()
            f.close()
        else :
            # create it
            f = open(idfile, 'w')
            f.write(identity)
            f.close()
            idFromFile = identity

        if identity != idFromFile:
            msg = "Wrong Grid Credentials:\n%s" % identity
            msg += "\nMake sure you use the same DN, FQAN as when the task"
            msg += " was created:\n%s" % idFromFile
            raise CrabException(msg)

        return

    def userName(self):
        """ return the user name """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """ return a unique job identifier built from the submission host, a hash of the task name and the job number """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        id = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return id

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py
        """

        #SB paste from crab SchedulerGlidein

        jobParams = ""

        (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == [''] :
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "' + seString + '"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))
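        # e.g. cmsVersion "CMSSW_5_3_11" maps to numericCmsVersion "50311"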

        jobParams += '+DESIRED_CMSVersion ="' + cmsVersion + '";'
        jobParams += '+DESIRED_CMSVersionNr ="' + numericCmsVersion + '";'
        jobParams += '+DESIRED_CMSScramArch ="' + scramArch + '";'

        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://' + myscheddName + \
                     '//' + self.submissionDay + '//$(Cluster).$(Process)"; '

        if (self.EDG_clock_time):
            jobParams += '+MaxWallTimeMins = ' + self.EDG_clock_time + '; '
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        if self.allowOverflow == "0":
            jobParams += '+CMS_ALLOW_OVERFLOW = False; '

        # user-supplied additional JDL parameters, see https://savannah.cern.ch/bugs/index.php?98534
        if self.EDG_addJdlParam:
            if self.EDG_addJdlParam[-1] == '':
                self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]
            for p in self.EDG_addJdlParam:
                jobParams += p.strip() + ';\n'

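        # Illustrative example of the string assembled above (host and site names are made up):
        #   +DESIRED_SEs = "srm-a.example.org,srm-b.example.org"; +DESIRED_CMSVersion ="CMSSW_5_3_11";
        #   +DESIRED_CMSVersionNr ="50311";+DESIRED_CMSScramArch ="slc5_amd64_gcc462";
        #   +Glidein_MonitorID = "https://myschedd.example.org//121105//$(Cluster).$(Process)"; +MaxWallTimeMins = 1440;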
        common._db.updateTask_({'jobType':jobParams})


        return jobParams


    def realSchedParams(self, cfg_params):
        """
        Return a dictionary of scheduler-specific parameters to use with the real
        scheduler; called when the scheduler is initialized in Boss, i.e. at each crab command
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()

        params = {'shareDir': shareDir,
                  'jobDir': jobDir,
                  'taskDir': taskDir,
                  'submissionDay': self.submissionDay}

        return params


    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources
        """

        return [True]


    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason


#    def wsCopyOutput(self):
#        """
#        Write a CopyResults part of a job script, e.g.
#        to copy produced output into a storage element.
#        """
#        txt = self.wsCopyOutput()
#        return txt


    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        txt += '    tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += '    tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += '    rm ${out_files}.tgz\n'
        txt += '    size=`expr $tmp_size`\n'
        txt += '    echo "Total Output dimension: $size"\n'
        txt += '    limit=' + str(self.OSBsize) + ' \n'
        txt += '    echo "WARNING: output files size limit is set to: $limit"\n'
        txt += '    if [ "$limit" -lt "$size" ]; then\n'
        txt += '        exceed=1\n'
        txt += '        job_exit_code=70000\n'
        txt += '        echo "Output Sandbox too big. Produced output is lost "\n'
        txt += '    else\n'
        txt += '        exceed=0\n'
        txt += '        echo "Total Output dimension $size is fine."\n'
        txt += '    fi\n'

        txt += '    echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += '    echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
        txt += '    if [ $exceed -ne 1 ]; then\n'
        txt += '        tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += '    else\n'
        txt += '        tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += '    fi\n'
        txt += '    python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += '    exit $job_exit_code\n'
        txt += '}\n'

        return txt


    def sched_fix_parameter(self):
        """
        Store the user requirements string in the task DB as commonRequirements
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):

        task = common._db.getTask()

        if task['serverName'] != None and task['serverName'] != "":
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost = str(task['serverName'])
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
            if '@' in remoteUserHost:
                remoteHost = remoteUserHost.split('@')[1]
            else:
                remoteHost = remoteUserHost
        else:
            if self.cfg_params.has_key('CRAB.submit_host'):
                # get a remote submission host from crab config file
                srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
                remoteHost = srvCfg['serverName']
                common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
            else:
                # pick from Available Servers List
                srvCfg = ServerConfig('default').config()
                remoteHost = srvCfg['serverName']
                common.logger.info("remotehost from Avail.List = %s" % remoteHost)

            if not remoteHost:
                raise CrabException('FATAL ERROR: remoteHost not defined')

            #common.logger.info("try to find out username for remote Host via uberftp ...")
            #command = "uberftp %s pwd|grep User|awk '{print $3}'" % remoteHost
            #(status, output) = commands.getstatusoutput(command)
            #if status == 0:
            #    remoteUser = output
            #    common.logger.info("remoteUser set to %s" % remoteUser)
            #    if remoteUser == None:
            #        raise CrabException('FATAL ERROR: REMOTE USER not defined')

            #remoteUserHost = remoteUser + '@' + remoteHost
            remoteUserHost = remoteHost

        common._db.updateTask_({'serverName':remoteUserHost})

        return (remoteHost, remoteUserHost)
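
For completeness, a task ends up on this scheduler through the user configuration; a minimal crab.cfg sketch, assuming the standard CRAB2 syntax (the scheduler name string below is an assumption based on general CRAB2 usage, not taken from this file):

    [CRAB]
    jobtype   = cmssw
    scheduler = remoteGlidein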