ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRcondor.py
Revision: 1.11
Committed: Sun Aug 19 17:28:19 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
Changes since 1.10: +7 -2 lines
Log Message:
warn user and stop if using ce b/w list

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the vanilla (local) Remote Condor scheduler
3     """
4    
5     from SchedulerGrid import SchedulerGrid
6     from crab_exceptions import CrabException
7     from crab_util import runCommand
8 belforte 1.2 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
9 belforte 1.10 import Scram
10 belforte 1.2
11 belforte 1.1
12     import common
13     import os
14     import socket
15 belforte 1.9 import re
16 belforte 1.1
# hashlib exists from Python 2.5 on; Python 2.4 only ships the legacy
# ``sha`` module, so fall back to it when hashlib cannot be imported.
# Only ImportError is caught so that unrelated failures are not hidden.
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1
22    
class SchedulerRcondor(SchedulerGrid):
    """
    Scheduler plug-in registered with SchedulerGrid under the name "RCONDOR".

    The Condor host is taken from the RCONDOR_HOST environment variable.
    Naming convention: methods starting with 'ws' provide the corresponding
    part of the job script ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Initialise the base scheduler and read the host from the environment.

        Raises CrabException when RCONDOR_HOST is unset or empty.
        """
        SchedulerGrid.__init__(self, "RCONDOR")
        self.rcondorHost = os.getenv('RCONDOR_HOST')
        # An empty value is as unusable as a missing one: the host name is
        # embedded in the monitor URL and passed on in realSchedParams().
        if not self.rcondorHost:
            raise CrabException('FATAL ERROR: env.var RCONDOR_HOST not defined')
        self.datasetPath = None
        self.selectNoInput = None
        self.OSBsize = 50*1000*1000  # 50 MB

        self.environment_unique_identifier = None

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user.

        cfg_params is a dictionary-like object keyed by 'SECTION.option'.

        Raises CrabException when 'CMSSW.datasetpath' is missing, or when a
        CE black/white list is present in the configuration.
        """
        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)
        self.proxyServer = 'myproxy.cern.ch'
        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        # Keep the try body down to the one lookup that can raise KeyError.
        try:
            datasetPath = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        # The literal string 'none' (any case) means "job without input".
        if datasetPath.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = datasetPath
            self.selectNoInput = 0

        # CE lists are not honoured here: stop instead of silently
        # ignoring what the user asked for.
        if cfg_params.get('GRID.ce_black_list', None) or \
           cfg_params.get('GRID.ce_white_list', None):
            msg = "BEWARE: scheduler RGLIDEIN ignores CE black/white lists."
            msg += "\n Remove them from crab configuration to proceed."
            msg += "\n Use GRID.se_white_list and/or GRID.se_black_list instead"
            raise CrabException(msg)

        self.checkProxy()

    def userName(self):
        """ return the user name (output of voms-proxy-info -identity) """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return an identifier built from the local host name and the SHA-1
        hex digest of the task name; '${NJob}' is left in as literal text.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time

        i is the 1-based job number, task the object whose 'jobs' list is
        indexed with i-1. The resulting string is also stored in the task
        DB under 'jobType'.

        Raises CrabException when the CMSSW version reported by Scram does
        not have at least three numeric fields after the first '_'.
        """
        #SB paste from crab ScheduerGlidein

        jobParams = ""

        seDest = task.jobs[i-1]['dlsDestination']

        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        scram = Scram.Scram(None)
        cmsVersion = scram.getSWVersion()
        scramArch = scram.getArch()

        # e.g. CMSSW_5_3_2 -> "50302": first field as is, the next two
        # zero-padded to two digits.
        cmsver = cmsVersion.split('_')
        try:
            numericCmsVersion = "%s%.2d%.2d" % (cmsver[1], int(cmsver[2]), int(cmsver[3]))
        except (IndexError, ValueError):
            # Report an unexpected version string as a CRAB error rather
            # than letting a bare IndexError/ValueError escape.
            raise CrabException('FATAL ERROR: unexpected CMSSW version format: %s' % cmsVersion)

        jobParams += '+DESIRED_CMSVersion ="' +cmsVersion+'";'
        jobParams += '+DESIRED_CMSVersionNr ="' +numericCmsVersion+'";'
        jobParams += '+DESIRED_CMSScramArch ="' +scramArch+'";'

        myschedName = self.rcondorHost
        jobParams += '+Glidein_MonitorID = "https://'+ myschedName + '//$(Cluster).$(Process)"; '

        if self.EDG_clock_time:
            # %s gives the same text as concatenation for a string value
            # and also accepts a number.
            jobParams += '+MaxWallTimeMins = %s; ' % self.EDG_clock_time
        else:
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType': jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command
        """
        #SB this method is used to pass directory names to Boss Scheduler
        # via params dictionary

        jobDir = common.work_space.jobDir()
        # Second-to-last '/'-separated component of topDir; presumably
        # topDir ends with '/', making the last component empty - TODO confirm.
        taskDir = common.work_space.topDir().split('/')[-2]
        shareDir = common.work_space.shareDir()
        #SBtmpDir = common.work_space.tmpDir()

        params = {'rcondorHost': self.rcondorHost,
                  'shareDir': shareDir,
                  #SB'tmpDir':tmpDir,
                  'jobDir': jobDir,
                  'taskDir': taskDir}

        return params

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources

        No check is done here: always returns [True], whatever the arguments.
        """
        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        (the value of CondorGLoggingInfo.decodeReason for fileName).
        """
        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

    # NOTE: disabled override kept for reference; as written it would call
    # itself recursively.
    # def wsCopyOutput(self):
    #     """
    #     Write a CopyResults part of a job script, e.g.
    #     to copy produced output into a storage element.
    #     """
    #     txt = self.wsCopyOutput()
    #     return txt

    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit

        The generated shell function 'func_exit' starts with the text from
        wsExitFunc_common(), then checks the size of the output archive
        against self.OSBsize and packs either the full output or only the
        job stdout/stderr.
        """
        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # Build the archive once only to measure its size, then remove it.
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit='+str(self.OSBsize) +' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        # Over the limit: flag it and force exit code 70000.
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        # Final archive: everything if within the limit, else only the logs.
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt

    def sched_fix_parameter(self):
        """
        Store the requirements in the task DB.

        When self.EDG_requirements is set, it is written to the task under
        the 'commonRequirements' key. Nothing is returned.
        """
        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)
240    
241