ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRcondor.py
Revision: 1.6
Committed: Tue Aug 7 23:05:18 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_2
Changes since 1.5: +28 -1 lines
Log Message:
use submit-1.t2.ucsd.edu and better reporting to ML

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the vanilla (local) Remote Condor scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8 belforte 1.2 #from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
9     from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
10    
11 belforte 1.1
12    
13     import common
14     import os
15     import socket
16    
17     # FUTURE: for python 2.4 & 2.6
18     try:
19     from hashlib import sha1
20     except:
21     from sha import sha as sha1
22    
class SchedulerRcondor(SchedulerGrid):
    """
    Class to implement the Remote Condor (RCONDOR) scheduler.
    The remote submit host is read from the RCONDOR_HOST environment
    variable when the scheduler is created.
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        SchedulerGrid.__init__(self, "RCONDOR")
        # Remote submit host name; None when RCONDOR_HOST is not set.
        self.rcondorHost = os.getenv('RCONDOR_HOST')
        self.datasetPath = None
        self.selectNoInput = None
        # Output sandbox size limit in bytes, enforced by func_exit() in the
        # job script (see wsExitFunc).
        self.OSBsize = 50*1000*1000 # 50 MB

        self.environment_unique_identifier = None

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        Raises CrabException if CMSSW.datasetpath is not defined.
        """
        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = self._myproxyServer()
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        try:
            tmp = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)
        # The literal string 'none' (any case) means "no input dataset".
        if tmp.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0

        self.checkProxy()

    def _myproxyServer(self):
        """
        Return the MyProxy server endpoint published in the CRAB config area.

        Falls back to 'myproxy.cern.ch' on any failure: the module cannot be
        imported, the download fails, or the file content is empty.
        """
        import sys
        try:
            # Imported locally so that a missing module takes the same
            # fallback path as a failed download.
            # NOTE(review): assumes CRAB's Downloader module provides the
            # Downloader class -- confirm.
            from Downloader import Downloader
            server = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf")
            if server is None or not server.strip():
                raise CrabException("myproxy_server.conf retrieved but empty")
            return server.strip()
        except Exception:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            # sys.exc_info() keeps this valid on python 2.4 as well as newer versions
            common.logger.debug(sys.exc_info()[1])
            return 'myproxy.cern.ch'

    def userName(self):
        """ return the user name """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return a per-job identifier URL.

        The URL is built from this machine's host name, the SHA-1 of the
        task name, and the ${NJob} shell variable (expanded in the job script).
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        return "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time.

        i    -- 1-based index of the job in task.jobs
        task -- task whose jobs carry a 'dlsDestination' list of SEs

        The returned string is also stored as the task 'jobType' in the DB.
        Raises CrabException if RCONDOR_HOST was not set.
        """
        # Adapted from the CRAB SchedulerGlidein implementation.
        seDest = task.jobs[i-1]['dlsDestination']

        # No location information: allow every site.
        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T") # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        if self.rcondorHost is None:
            raise CrabException("RCONDOR_HOST environment variable is not set")

        jobParams = '+DESIRED_SEs = "'+seString+'"; '
        jobParams += '+Glidein_MonitorID = "https://'+ self.rcondorHost + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            # %s accepts both string and numeric clock-time values
            jobParams += '+MaxWallTimeMins = %s; ' % self.EDG_clock_time
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType':jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler.
        Called when the scheduler is initialized in Boss, i.e. at each crab command.

        'tmpDir' is <taskDir>/condor_temp under $HOME/.rcondor/<RCONDOR_HOST>/mount/.
        'jobDir' is the work space job directory.
        """
        jobDir = common.work_space.jobDir()

        # presumably topDir() ends with '/', so [-2] is the task directory name
        taskDir = common.work_space.topDir().split('/')[-2]
        rcondorDir = '%s/.rcondor/%s/mount/' % (os.getenv('HOME'), self.rcondorHost)
        tmpDir = os.path.join(rcondorDir, taskDir, 'condor_temp')

        return {'tmpDir':tmpDir, 'jobDir':jobDir}

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources.
        No matching is done here: always reports [True].
        """
        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        """
        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

    def wsExitFunc(self):
        """
        Return the part of the job script that runs prior to exit.

        The generated shell function func_exit() does the following:
        - Tars the output once just to measure its size, and compares that
          size with self.OSBsize.
        - Sets job_exit_code=70000 when the limit is exceeded; in that case
          the final tarball keeps only CMSSW stdout/stderr.
        - Records the exit code in $RUNTIME_AREA/$repo.
        - Updates the framework job report and exits with job_exit_code.
        """
        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # Build the tarball only to measure its size, then discard it.
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit='+str(self.OSBsize) +' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        # Real tarball: full output if within limit, otherwise only the logs.
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt

    def sched_fix_parameter(self):
        """
        Store the user's EDG requirements, if any, as the task
        'commonRequirements' in the DB. Returns None.
        """
        if self.EDG_requirements:
            taskReq = {'commonRequirements':self.EDG_requirements}
            common._db.updateTask_(taskReq)
250