ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerRcondor.py
Revision: 1.4
Committed: Thu Jul 26 23:03:38 2012 UTC (12 years, 9 months ago) by belforte
Content type: text/x-python
Branch: MAIN
Changes since 1.3: +3 -1 lines
Log Message:
if no input dataset, start with all sites and apply B/W list

File Contents

# User Rev Content
1 belforte 1.1 """
2     Implements the vanilla (local) Remote Condor scheduler
3     """
4    
import os
import socket
import sys

# FUTURE: for python 2.4 & 2.6
try:
    from hashlib import sha1
except:
    from sha import sha as sha1

import common
from SchedulerGrid import SchedulerGrid
from crab_exceptions import CrabException
from crab_util import runCommand
#from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
22    
class SchedulerRcondor(SchedulerGrid):
    """
    Class to implement the vanilla (local) Condor scheduler
    Naming convention: Methods starting with 'ws' provide
    the corresponding part of the job script
    ('ws' stands for 'write script').
    """

    def __init__(self):
        """
        Register the scheduler under the name "RCONDOR" and set defaults.
        """
        SchedulerGrid.__init__(self, "RCONDOR")
        self.datasetPath = None
        self.selectNoInput = None
        # Size limit (bytes) written into the job wrapper by wsExitFunc().
        self.OSBsize = 50*1000*1000  # 50 MB
        self.environment_unique_identifier = None
        return

    def configure(self, cfg_params):
        """
        Configure the scheduler with the config settings from the user

        cfg_params is used as a mapping of "SECTION.key" names to values.
        'CMSSW.datasetpath' is mandatory; the literal value 'none' (any
        case) means the task has no input dataset.

        Raises CrabException when 'CMSSW.datasetpath' is not defined.
        """

        SchedulerGrid.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))
        self.space_token = cfg_params.get("USER.space_token", None)

        # Any failure while fetching the server name falls back to the default.
        # NOTE(review): Downloader is not imported in the visible part of this
        # file; if it is not in scope the resulting NameError is caught below
        # and the default server is always used -- confirm and add the import.
        try:
            self.proxyServer = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf")
            self.proxyServer = self.proxyServer.strip()
            # strip() never returns None, so test for an empty result instead.
            if not self.proxyServer:
                raise CrabException("myproxy_server.conf retrieved but empty")
        except Exception:
            # sys.exc_info() avoids the "except X, e" / "except X as e" syntax
            # split, so this block parses on both Python 2 and Python 3.
            err = sys.exc_info()[1]
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(err)
            self.proxyServer = 'myproxy.cern.ch'

        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)
        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        try:
            tmp = cfg_params['CMSSW.datasetpath']
        except KeyError:
            msg = "Error: datasetpath not defined "
            raise CrabException(msg)

        if tmp.lower() == 'none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0

        self.checkProxy()

        return

    def userName(self):
        """ return the user name """
        # The identity is whatever voms-proxy-info prints; stderr is discarded.
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def envUniqueID(self):
        """
        Return an identifier string built from the local host name and the
        SHA-1 hex digest of the task name. The trailing "${NJob}" is left
        as literal text in the returned string.
        """
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        uniqueId = "https://" + socket.gethostname() + '/' + taskHash + "/${NJob}"
        return uniqueId

    def sched_parameter(self, i, task):
        """
        Return scheduler-specific parameters. Used at crab -submit time

        i is the 1-based job index into task.jobs. The generated string is
        also stored in the task database under the 'jobType' key.
        """

        #SB paste from crab ScheduerGlidein

        jobParams = ""

        seDest = task.jobs[i-1]['dlsDestination']

        # No input dataset: start from all sites, then apply the B/W list.
        if seDest == ['']:
            seDest = self.blackWhiteListParser.expandList("T")  # all of SiteDB

        seString = self.blackWhiteListParser.cleanForBlackWhiteList(seDest)

        jobParams += '+DESIRED_SEs = "'+seString+'"; '

        if self.EDG_clock_time:
            jobParams += '+MaxWallTimeMins = '+self.EDG_clock_time+'; '
        else:
            # Default wall time: 24 hours, expressed in minutes.
            jobParams += '+MaxWallTimeMins = %d; ' % (60*24)

        common._db.updateTask_({'jobType': jobParams})

        return jobParams

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use with real scheduler
        is called when scheduler is initialized in Boss, i.e. at each crab command

        The dictionary has the keys 'tmpDir' and 'jobDir'. cfg_params is
        not used here.
        """

        jobDir = common.work_space.jobDir()

        # Task directory name = second-to-last '/'-separated piece of topDir().
        taskDir = common.work_space.topDir().split('/')[-2]
        # NOTE(review): os.getenv returns None when HOME or RCONDOR_HOST is
        # unset, which would put the literal text "None" into this path --
        # confirm the environment is validated elsewhere.
        rcondorDir = '%s/.rcondor/%s/mount/' % (os.getenv('HOME'), os.getenv('RCONDOR_HOST'))
        tmpDir = os.path.join(rcondorDir, taskDir)
        tmpDir = os.path.join(tmpDir, 'condor_temp')

        params = {'tmpDir': tmpDir,
                  'jobDir': jobDir}

        return params

    def listMatch(self, seList, full):
        """
        Check the compatibility of available resources

        Both arguments are ignored; the result is always [True].
        """

        return [True]

    def decodeLogInfo(self, fileName):
        """
        Parse logging info file and return main info
        """

        import CondorGLoggingInfo
        loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
        reason = loggingInfo.decodeReason(fileName)
        return reason

#    def wsCopyOutput(self):
#        """
#        Write a CopyResults part of a job script, e.g.
#        to copy produced output into a storage element.
#        """
#        txt = self.wsCopyOutput()
#        return txt

    def wsExitFunc(self):
        """
        Returns the part of the job script which runs prior to exit

        The generated shell function func_exit() compares the size of the
        packed output with self.OSBsize; when the limit is exceeded it sets
        job_exit_code to 70000 and packs only the job stdout/stderr files.
        """

        txt = '\n'
        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        #txt += ' cp ${out_files}.tgz $_CONDOR_SCRATCH_DIR/\n'
        #txt += ' cp CMSSW_$NJob.stdout $_CONDOR_SCRATCH_DIR/\n'
        #txt += ' cp CMSSW_$NJob.stderr $_CONDOR_SCRATCH_DIR/\n'
        #txt += ' cp Watchdog_$NJob.log.gz $_CONDOR_SCRATCH_DIR/\n'
        #txt += ' cp crab_fjr_$NJob.xml $_CONDOR_SCRATCH_DIR/\n'

        ### specific Glite check for OSB
        # A throw-away tarball is built only to measure the output size.
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n'
        txt += ' rm ${out_files}.tgz\n'
        txt += ' size=`expr $tmp_size`\n'
        txt += ' echo "Total Output dimension: $size"\n'
        txt += ' limit='+str(self.OSBsize) +' \n'
        txt += ' echo "WARNING: output files size limit is set to: $limit"\n'
        txt += ' if [ "$limit" -lt "$size" ]; then\n'
        txt += ' exceed=1\n'
        txt += ' job_exit_code=70000\n'
        txt += ' echo "Output Sanbox too big. Produced output is lost "\n'
        txt += ' else\n'
        txt += ' exceed=0\n'
        txt += ' echo "Total Output dimension $size is fine."\n'
        txt += ' fi\n'

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code \n'

        txt += ' exit $job_exit_code\n'
        txt += '}\n'

        return txt

    def sched_fix_parameter(self):
        """
        Store the user requirements, if any, in the task database under
        the 'commonRequirements' key. Nothing is returned.
        """

        if self.EDG_requirements:
            req = self.EDG_requirements
            taskReq = {'commonRequirements': req}
            common._db.updateTask_(taskReq)
230    
231    
232     # Taken wholesale from SchedulerGrid.py; kept disabled inside the string literal below
233     """
234    
235     def wsSetupEnvironment(self):
236    
237     #Returns part of a job script which does scheduler-specific work.
238    
239     taskId =common._db.queryTask('name')
240     index = int(common._db.nJobs())
241     job = common.job_list[index-1]
242     jbt = job.type()
243     if not self.environment_unique_identifier:
244     try :
245     self.environment_unique_identifier = self.envUniqueID()
246     except :
247     raise CrabException('environment_unique_identifier not set')
248    
249     # start with wrapper timing
250     txt = 'export TIME_WRAP_INI=`date +%s` \n'
251     txt += 'export TIME_STAGEOUT=-2 \n\n'
252     txt += '# '+self.name()+' specific stuff\n'
253     txt += '# strip arguments\n'
254     txt += 'echo "strip arguments"\n'
255     txt += 'args=("$@")\n'
256     txt += 'nargs=$#\n'
257     txt += 'shift $nargs\n'
258     txt += "# job number (first parameter for job wrapper)\n"
259     txt += "NJob=${args[0]}; export NJob\n"
260     txt += "NResub=${args[1]}; export NResub\n"
261     txt += "NRand=`getRandSeed`; export NRand\n"
262     # append random code
263     txt += 'OutUniqueID=_$NRand\n'
264     txt += 'OutUniqueID=_$NResub$OutUniqueID\n'
265     txt += 'OutUniqueID=$NJob$OutUniqueID; export OutUniqueID\n'
266     txt += 'CRAB_UNIQUE_JOB_ID=%s_${OutUniqueID}; export CRAB_UNIQUE_JOB_ID\n' % taskId
267     txt += 'echo env var CRAB_UNIQUE_JOB_ID set to: ${CRAB_UNIQUE_JOB_ID}\n'
268     # if we want to prepend
269     #txt += 'OutUniqueID=_$NResub\n'
270     #txt += 'OutUniqueID=_$NJob$OutUniqueID\n'
271     #txt += 'OutUniqueID=$NRand$OutUniqueID; export OutUniqueID\n'
272    
273     txt += "out_files=out_files_${NJob}; export out_files\n"
274     txt += "echo $out_files\n"
275     txt += jbt.outList()
276     # txt += 'if [ $JobRunCount ] && [ `expr $JobRunCount - 1` -gt 0 ] && [ $Glidein_MonitorID ]; then \n'
277     txt += 'if [ $Glidein_MonitorID ]; then \n'
278     # txt += ' attempt=`expr $JobRunCount - 1` \n'
279     # txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}__${attempt}\n'
280     # txt += ' SyncGridJobId=${Glidein_MonitorID}__${attempt}\n'
281     txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}\n'
282     txt += ' SyncGridJobId=${Glidein_MonitorID}\n'
283     txt += 'else \n'
284     txt += ' MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n'
285     txt += ' SyncGridJobId='+self.environment_unique_identifier+'\n'
286     txt += 'fi\n'
287     txt += 'MonitorID='+taskId+'\n'
288     txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
289     txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
290     txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'
291    
292     txt += 'echo ">>> GridFlavour discovery: " \n'
293     txt += 'if [ $OSG_GRID ]; then \n'
294     txt += ' middleware=OSG \n'
295     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
296     txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
297     txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
298     txt += ' else\n'
299     txt += ' echo "not reporting SyncCE";\n'
300     txt += ' fi\n';
301     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
302     txt += ' echo "source OSG GRID setup script" \n'
303     txt += ' source $OSG_GRID/setup.sh \n'
304     txt += 'elif [ $NORDUGRID_CE ]; then \n' # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR,
305     txt += ' middleware=ARC \n' # because the latter is defined for ARC too
306     txt += ' echo "SyncCE=${NORDUGRID_CE}:2811/nordugrid-GE-${QUEUE:-queue}" >> $RUNTIME_AREA/$repo \n'
307     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
308     txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
309     txt += ' middleware=LCG \n'
310     txt += ' if [ $GLIDEIN_Gatekeeper ]; then \n'
311     txt += ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n'
312     txt += ' else \n'
313     txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
314     txt += ' fi \n'
315     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
316     txt += 'else \n'
317     txt += ' echo "ERROR ==> GridFlavour not identified" \n'
318     txt += ' job_exit_code=10030 \n'
319     txt += ' func_exit \n'
320     txt += 'fi \n'
321    
322     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
323     txt += '\n\n'
324    
325    
326     txt += 'export VO='+self.VO+'\n'
327     txt += 'if [ $middleware == LCG ]; then\n'
328     txt += ' if [ $GLIDEIN_Gatekeeper ]; then\n'
329     txt += ' CloseCEs=$GLIDEIN_Gatekeeper \n'
330     txt += ' else\n'
331     txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
332     txt += ' fi\n'
333     txt += ' echo "CloseCEs = $CloseCEs"\n'
334     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
335     txt += ' echo "CE = $CE"\n'
336     txt += 'elif [ $middleware == OSG ]; then \n'
337     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
338     txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
339     txt += ' else \n'
340     txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
341     txt += ' job_exit_code=10099\n'
342     txt += ' func_exit\n'
343     txt += ' fi \n'
344     txt += 'elif [ $middleware == ARC ]; then \n'
345     txt += ' echo "CE = $NORDUGRID_CE"\n'
346     txt += 'fi \n'
347    
348     return txt
349     """