ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRemoteglidein.py
Revision: 1.3
Committed: Mon Sep 17 15:34:25 2012 UTC (12 years, 7 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_3_pre2, CRAB_2_8_3_pre1
Changes since 1.2: +13 -10 lines
Log Message:
no need for uberftp, add submissionDay for schedulerId

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the Remote Glidein scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8     from ServerConfig import *
9     from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
10     import Scram
11    
12     import common
13     import os
14     import socket
15     import re
16     import commands
17    
# FUTURE: for python 2.4 & 2.6
# Prefer hashlib's sha1; fall back to the legacy 'sha' module only when
# hashlib itself cannot be imported. Catching ImportError (rather than
# everything) keeps unrelated failures visible.
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
23    
class SchedulerRemoteglidein(SchedulerGrid):
    """
    Class to implement the vanilla remote Condor scheduler
    for a glidein frontend, the created JDL will not work
    on vanilla local condor.
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Register this scheduler as "REMOTEGLIDEIN" with the base class
        and set the instance defaults.
        """
        SchedulerGrid.__init__(self, "REMOTEGLIDEIN")

        self.datasetPath = None
        self.selectNoInput = None
        # Upper limit compared against the size of the output tarball
        # in the script produced by wsExitFunc().
        self.OSBsize = 50*1000*1000  # 50 MB

        self.environment_unique_identifier = None

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        cfg_params is a dictionary-like object holding the crab
        configuration. Raises CrabException when 'CMSSW.datasetpath' is
        missing or when a CE black/white list is configured.
        """
        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        # Called only after the proxy-related attributes above are set.
        self.checkProxy()

        try:
            datasetpath = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        # The literal string 'none' (any case) means "job has no input".
        if datasetpath.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = datasetpath
            self.selectNoInput = 0

        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None):
            msg = "BEWARE: scheduler REMOTEGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

    def userName(self):
        """ return the user name """
        # NOTE(review): assumes runCommand returns a string; confirm what
        # it returns when voms-proxy-info fails.
        identity = runCommand("voms-proxy-info -identity 2>/dev/null")
        return identity.strip()

    def envUniqueID(self):
        """
        Return an identifier built from the local host name and the SHA1
        hex digest of the task name; the trailing '${NJob}' is left as a
        literal placeholder in the returned string.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        # Local is not called 'id', so the builtin is not shadowed.
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time
        by $CRABPYTHON/Scheduler.py

        i is the 1-based job index into task.jobs. The returned string of
        JDL attributes is also stored in the task DB under 'jobType', and
        self.remoteHost / self.remoteUserHost are set as a side effect.
        """

        #SB paste from crab ScheduerGlidein

        jobParams = ""

        (self.remoteHost, self.remoteUserHost) = self.pickRemoteSubmissionHost(task)

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # Fields 1..3 of the '_'-separated version string are packed into
        # one number, the last two zero-padded to two digits.
        cmsver = re.split('_', cmsVersion)
        numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))

        jobParams += '+DESIRED_CMSVersion ="' + cmsVersion + '";'
        jobParams += '+DESIRED_CMSVersionNr ="' + numericCmsVersion + '";'
        jobParams += '+DESIRED_CMSScramArch ="' + scramArch + '";'

        myscheddName = self.remoteHost
        jobParams += '+Glidein_MonitorID = "https://' + myscheddName + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            # str() keeps the concatenation valid even if the value is
            # held as a number rather than a string.
            jobParams += '+MaxWallTimeMins = ' + str(self.EDG_clock_time) + '; '
        else:
            # Default wall time: 24 hours, expressed in minutes.
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType': jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        # 'time' is not imported at module level in this file; import it
        # here so the call below does not depend on a name leaking in
        # through 'from ServerConfig import *'.
        import time

        jobDir = common.work_space.jobDir()
        # Second-to-last '/'-separated component of the top directory path.
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()
        # Local date formatted as YYMMDD.
        submissionDay = time.strftime("%y%m%d", time.localtime())

        params = {'shareDir': shareDir,
                  'jobDir': jobDir,
                  'taskDir': taskDir,
                  'submissionDay': submissionDay}

        return params

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources

        No actual matching is done here: the arguments are ignored and
        [True] is always returned.
        """

        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

    # def wsCopyOutput(self):
    #     """
    #     Write a CopyResults part of a job script, e.g.
    #     to copy produced output into a storage element.
    #     """
    #     txt = self.wsCopyOutput()
    #     return txt

    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit

        The generated shell function 'func_exit' sizes the output tarball
        against self.OSBsize, sets job_exit_code to 70000 and ships only
        the CMSSW stdout/stderr when the limit is exceeded, records the
        exit code and finally exits with it.
        """

        parts = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',

            'func_exit() { \n',
            self.wsExitFunc_common(),

            # Build the tarball once only to measure its size.
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n',
            ' rm ${out_files}.tgz\n',
            ' size=`expr $tmp_size`\n',
            ' echo "Total Output dimension: $size"\n',
            ' limit=' + str(self.OSBsize) + ' \n',
            ' echo "WARNING: output files size limit is set to: $limit"\n',
            ' if [ "$limit" -lt "$size" ]; then\n',
            ' exceed=1\n',
            ' job_exit_code=70000\n',
            ' echo "Output Sanbox too big. Produced output is lost "\n',
            ' else\n',
            ' exceed=0\n',
            ' echo "Total Output dimension $size is fine."\n',
            ' fi\n',

            ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
            ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' if [ $exceed -ne 1 ]; then\n',
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' else\n',
            ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
            ' fi\n',
            ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n',

            ' exit $job_exit_code\n',
            '}\n',
        ]

        return ''.join(parts)

    def sched_fix_parameter(self):
        """
        Store the requirements string in the task DB.

        When self.EDG_requirements is set it is written to the task DB
        under 'commonRequirements'. Nothing is returned.
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)

    def pickRemoteSubmissionHost(self, task):
        """
        Determine the remote submission host for this task.

        The host is taken, in order of preference, from the 'serverName'
        already stored in the task DB, from the 'CRAB.submit_host' entry
        of the crab configuration, or from the 'default' ServerConfig.
        The result is written back to the task DB as 'serverName'.

        Returns the tuple (remoteHost, remoteUserHost); both elements
        currently hold the same value. Raises CrabException when no host
        could be determined.
        """

        # The task record is always re-read from the DB, so the 'task'
        # argument passed by the caller is not used.
        task = common._db.getTask()

        storedServer = task['serverName']
        if storedServer is not None and storedServer != "":
            # remoteHost is already defined and stored for this task
            # so pick it from DB
            # cast to string to avoid issues with unicode :-(
            remoteUserHost = str(storedServer)
            common.logger.info("serverName from Task DB is %s" %
                               remoteUserHost)
            if '@' in remoteUserHost:
                remoteHost = remoteUserHost.split('@')[1]
            else:
                remoteHost = remoteUserHost
        elif 'CRAB.submit_host' in self.cfg_params:
            # get a remote submission host from crab config file
            srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
            remoteHost = srvCfg['serverName']
            common.logger.info("remotehost from crab.cfg = %s" % remoteHost)
        else:
            # pick from Available Servers List
            srvCfg = ServerConfig('default').config()
            remoteHost = srvCfg['serverName']
            common.logger.info("remotehost from Avail.List = %s" % remoteHost)

        if not remoteHost:
            raise CrabException('FATAL ERROR: remoteHost not defined')

        # The remote user name is no longer looked up via uberftp; the
        # previous implementation is kept below for reference.
        #common.logger.info("try to find out username for remote Host via uberftp ...")
        #command="uberftp %s pwd|grep User|awk '{print $3}'" % remoteHost
        #(status, output) = commands.getstatusoutput(command)
        #if status == 0:
        #    remoteUser = output
        #    common.logger.info("remoteUser set to %s" % remoteUser)
        #    if remoteUser==None:
        #        raise CrabException('FATAL ERROR: REMOTE USER not defined')

        #remoteUserHost = remoteUser + '@' + remoteHost
        remoteUserHost = remoteHost

        common._db.updateTask_({'serverName': remoteUserHost})

        return (remoteHost, remoteUserHost)