ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRcondor.py
Revision: 1.12
Committed: Thu Aug 23 13:25:47 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, HEAD
Changes since 1.11: +98 -13 lines
Log Message:
no env.var anymore, use AvailServerList

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the vanilla (local) Remote Condor scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8 belforte 1.12 from ServerConfig import *
9 belforte 1.2 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
10 belforte 1.10 import Scram
11 belforte 1.2
12 belforte 1.1
13     import common
14     import os
15     import socket
16 belforte 1.9 import re
17 belforte 1.12 import commands
18 belforte 1.1
19     # FUTURE: for python 2.4 & 2.6
20     try:
21     from hashlib import sha1
22     except:
23     from sha import sha as sha1
24    
25     class SchedulerRcondor(SchedulerGrid) :
26     """
27 belforte 1.12 Class to implement the vanilla remote Condor scheduler
28 belforte 1.1 Naming convention: Methods starting with 'ws' provide
29     the corresponding part of the job script
30     ('ws' stands for 'write script').
31     """
32    
def __init__(self):
    """
    Register this scheduler with SchedulerGrid under the name
    "RCONDOR" and give the per-task attributes their initial values.
    """
    SchedulerGrid.__init__(self, "RCONDOR")

    # size limit written into the job script by wsExitFunc()
    self.OSBsize = 50 * 1000 * 1000  # 50 MB

    # not known yet; datasetPath and selectNoInput are set by configure()
    self.datasetPath = None
    self.selectNoInput = None
    self.environment_unique_identifier = None
43    
44    
def configure(self, cfg_params):
    """
    Configure the scheduler with the config settings from the user.

    Runs the common SchedulerGrid configuration, reads the proxy and
    VO related GRID/USER settings, calls checkProxy(), and records
    whether the task reads an input dataset (CMSSW.datasetpath).

    Raises CrabException when CMSSW.datasetpath is missing, or when
    GRID.ce_black_list / GRID.ce_white_list is set.
    """
    # The submission host is not chosen here any more (the old
    # RCONDOR_HOST / RCONDOR_USER env.var logic is gone):
    # see pickRcondorSubmissionHost().

    SchedulerGrid.configure(self, cfg_params)

    # proxy / VO settings
    self.proxyValid = 0
    self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
    self.space_token = cfg_params.get("USER.space_token", None)
    self.proxyServer = 'myproxy.cern.ch'
    self.group = cfg_params.get("GRID.group", None)
    self.role = cfg_params.get("GRID.role", None)
    self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

    self.checkProxy()

    # the dataset path is mandatory; the literal 'none' (any case)
    # means the task has no input dataset
    try:
        requestedDataset = cfg_params['CMSSW.datasetpath']
    except KeyError:
        raise CrabException("Error: datasetpath not defined ")

    if requestedDataset.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = requestedDataset
        self.selectNoInput = 0

    # CE lists are refused outright instead of being silently ignored
    for ceListKey in ('GRID.ce_black_list', 'GRID.ce_white_list'):
        if cfg_params.get(ceListKey, None):
            # NOTE(review): the text says RGLIDEIN although this class
            # registers itself as "RCONDOR" -- confirm which is intended
            raise CrabException(
                "BEWARE: scheduler RGLIDEIN ignores CE black/white lists."
                "\n Remove them from crab configuration to proceed."
                "\n Use GRID.se_white_list and/or GRID.se_black_list instead")

    return
115    
def userName(self):
    """
    return the user name: the output of "voms-proxy-info -identity"
    (stderr discarded) with surrounding whitespace removed
    """
    return runCommand("voms-proxy-info -identity 2>/dev/null").strip()
120    
def envUniqueID(self):
    """
    Build the identifier string
        https://<local host name>/<sha1 hex digest of the task name>/${NJob}
    The trailing ${NJob} is returned literally, it is not expanded here.
    """
    taskDigest = sha1(common._db.queryTask('name')).hexdigest()
    localHost = socket.gethostname()
    return "https://" + localHost + '/' + taskDigest + "/${NJob}"
125    
def sched_parameter(self, i, task):
    """
    Return scheduler-specific parameters. Used at crab -submit time
    by $CRABPYTHON/Scheduler.py

    The job is looked up as task.jobs[i-1].  The returned string is a
    sequence of "+Name = value;" settings; it is also stored in the
    task DB under 'jobType'.  As a side effect self.rcondorHost and
    self.rcondorUserHost are (re)set from pickRcondorSubmissionHost().
    """
    #SB pasted from crab SchedulerGlidein

    pieces = []

    (self.rcondorHost, self.rcondorUserHost) = \
        self.pickRcondorSubmissionHost(task)

    # storage elements: a destination of [''] means "no preference",
    # replaced by the expansion of "T" (all of SiteDB)
    destinations = task.jobs[i-1]['dlsDestination']
    if destinations == ['']:
        destinations = self.blackWhiteListParser.expandList("T")
    seString = self.blackWhiteListParser.cleanForBlackWhiteList(destinations)
    pieces.append('+DESIRED_SEs = "' + seString + '"; ')

    # software version and architecture, as reported by scram
    scram = Scram.Scram(None)
    cmsVersion = scram.getSWVersion()
    scramArch = scram.getArch()

    # numeric form: second '_'-separated field, then the third and
    # fourth fields zero-padded to two digits each
    fields = re.split('_', cmsVersion)
    numericCmsVersion = "%s%.2d%.2d" % (fields[1], int(fields[2]), int(fields[3]))

    pieces.append('+DESIRED_CMSVersion ="' + cmsVersion + '";')
    pieces.append('+DESIRED_CMSVersionNr ="' + numericCmsVersion + '";')
    pieces.append('+DESIRED_CMSScramArch ="' + scramArch + '";')

    # monitoring id is based on the submission host
    scheddName = self.rcondorHost
    pieces.append('+Glidein_MonitorID = "https://' + scheddName
                  + '//$(Cluster).$(Process)"; ')

    # wall time limit: user setting if any, else 24 hours
    if not self.EDG_clock_time:
        pieces.append('+MaxWallTimeMins = %d; ' % (60*24))
    else:
        pieces.append('+MaxWallTimeMins = ' + self.EDG_clock_time + '; ')

    jobParams = "".join(pieces)
    common._db.updateTask_({'jobType': jobParams})

    return jobParams
170 belforte 1.1
171    
def realSchedParams(self, cfg_params):
    """
    Return dictionary with specific parameters, to use with real scheduler
    is called when scheduler is initialized in Boss, i.e. at each crab command

    The dictionary carries three directory names taken from
    common.work_space: 'shareDir', 'jobDir' and 'taskDir'.
    cfg_params is not used.
    """
    #SB the params dictionary is how directory names reach the
    #   Boss Scheduler

    workSpace = common.work_space

    jobDir = workSpace.jobDir()
    # next-to-last '/'-separated component of the top directory
    # (presumably topDir() ends with '/', making this the task
    # directory name -- TODO confirm)
    taskDir = workSpace.topDir().split('/')[-2]
    shareDir = workSpace.shareDir()

    return {'shareDir': shareDir,
            'jobDir': jobDir,
            'taskDir': taskDir}
189    
190    
def listMatch(self, seList, full):
    """
    Check the compatibility of available resources

    No matching is actually done: seList and full are ignored and
    the result is always the one-element list [True].
    """
    alwaysCompatible = [True]
    return alwaysCompatible
197    
198    
def decodeLogInfo(self, fileName):
    """
    Parse logging info file and return main info

    The file name is handed to CondorGLoggingInfo.decodeReason() and
    whatever that returns is passed back to the caller.
    """
    # imported here, only when a log actually has to be decoded
    from CondorGLoggingInfo import CondorGLoggingInfo

    return CondorGLoggingInfo().decodeReason(fileName)
208    
209    
210     # def wsCopyOutput(self):
211     # """
212     # Write a CopyResults part of a job script, e.g.
213     # to copy produced output into a storage element.
214     # """
215     # txt = self.wsCopyOutput()
216     # return txt
217    
218    
def wsExitFunc(self):
    """
    Returns the part of the job script which runs prior to exit

    The text defines the shell function func_exit().  After the lines
    supplied by wsExitFunc_common() it:
      - tars the output files once just to measure the archive size,
      - compares that size with self.OSBsize; when the limit is
        exceeded job_exit_code is set to 70000,
      - records the exit code and calls dumpStatus,
      - builds the final archive (only the CMSSW stdout/stderr when
        the limit was exceeded),
      - runs fillCrabFjr.py with the exit code and exits with it.
    """
    scriptLines = [
        # banner
        '\n',
        '#\n',
        '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
        '#\n\n',

        'func_exit() { \n',
        self.wsExitFunc_common(),

        # measure the size of the would-be output archive
        ' tar zcvf ${out_files}.tgz ${final_list}\n',
        ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n',
        ' rm ${out_files}.tgz\n',
        ' size=`expr $tmp_size`\n',
        ' echo "Total Output dimension: $size"\n',

        # compare with the output sandbox limit
        ' limit=' + str(self.OSBsize) + ' \n',
        ' echo "WARNING: output files size limit is set to: $limit"\n',
        ' if [ "$limit" -lt "$size" ]; then\n',
        ' exceed=1\n',
        ' job_exit_code=70000\n',
        ' echo "Output Sanbox too big. Produced output is lost "\n',
        ' else\n',
        ' exceed=0\n',
        ' echo "Total Output dimension $size is fine."\n',
        ' fi\n',

        # report the exit code, then pack what will be returned
        ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
        ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' if [ $exceed -ne 1 ]; then\n',
        ' tar zcvf ${out_files}.tgz ${final_list}\n',
        ' else\n',
        ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
        ' fi\n',
        ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n',

        ' exit $job_exit_code\n',
        '}\n',
    ]

    return ''.join(scriptLines)
262    
263    
def sched_fix_parameter(self):
    """
    Save the requirements held in self.EDG_requirements, if there
    are any, in the task DB under 'commonRequirements'.

    Nothing is returned.
    """
    requirements = self.EDG_requirements
    if not requirements:
        # nothing to store
        return

    common._db.updateTask_({'commonRequirements': requirements})
273    
def pickRcondorSubmissionHost(self, task):
    """
    Return the pair (rcondorHost, rcondorUserHost) to submit to.

    If the task DB already holds a non-empty serverName, that value is
    used as rcondorUserHost, and rcondorHost is what follows the '@'
    (the whole value when there is no '@').

    Otherwise the host is the 'serverName' of the ServerConfig entry
    named by CRAB.submit_host in the crab configuration or, when that
    key is absent, of ServerConfig('default').  The remote user name
    is then asked to that host via uberftp, and rcondorUserHost is
    "<user>@<host>".

    In both cases rcondorUserHost is written to the task DB as
    serverName.

    The task argument is ignored: the task is always re-read from the
    DB.

    Raises CrabException when no host can be found, or when the remote
    user name cannot be determined (uberftp failed or printed nothing).
    """
    task = common._db.getTask()

    storedName = task['serverName']
    if storedName is not None and storedName != "":
        # rcondorHost is already defined and stored for this task
        # so pick it from DB
        # cast to string to avoid issues with unicode :-(
        rcondorUserHost = str(storedName)
        common.logger.info("serverName from Task DB is %s" %
                           rcondorUserHost)
        if '@' in rcondorUserHost:
            rcondorHost = rcondorUserHost.split('@')[1]
        else:
            rcondorHost = rcondorUserHost
    else:
        if 'CRAB.submit_host' in self.cfg_params:
            # get an rcondor host from crab config file
            srvCfg = ServerConfig(self.cfg_params['CRAB.submit_host']).config()
            rcondorHost = srvCfg['serverName']
            common.logger.info("rcondorhost from crab.cfg = %s" % rcondorHost)
        else:
            # pick from Available Servers List
            srvCfg = ServerConfig('default').config()
            print(srvCfg)
            rcondorHost = srvCfg['serverName']
            common.logger.info("rcondorhost from Avail.List = %s" % rcondorHost)

        if not rcondorHost:
            raise CrabException('FATAL ERROR: condorHost not defined')

        common.logger.info("try to find out RCONDOR_USER via uberftp ...")
        command = "uberftp %s pwd|grep User|awk '{print $3}'" % rcondorHost
        (status, output) = commands.getstatusoutput(command)

        # The status of a shell pipeline is the one of its last command
        # (awk here), so a failing uberftp typically shows up as
        # status 0 with empty output: both cases must be rejected.
        rcondorUser = None
        if status == 0:
            rcondorUser = output.strip()
        if not rcondorUser:
            raise CrabException('FATAL ERROR: RCONDOR_USER not defined')
        common.logger.info("rcondorUser set to %s" % rcondorUser)

        rcondorUserHost = rcondorUser + '@' + rcondorHost

    # NOTE(review): indentation was lost in the listing this was
    # recovered from; the DB update is assumed to run on both paths
    # (rewriting an unchanged value is harmless) -- confirm.
    print("will set server name to :  " + rcondorUserHost)
    common._db.updateTask_({'serverName': rcondorUserHost})

    return (rcondorHost, rcondorUserHost)