ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.82
Committed: Thu Oct 30 16:25:24 2008 UTC (16 years, 6 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre2
Changes since 1.81: +5 -3 lines
Log Message:
Don't pass cfg_params to SiteScreening

File Contents

# User Rev Content
1 ewv 1.69 """
2     Base class for all grid schedulers
3     """
4    
5 ewv 1.82 __revision__ = "$Id: SchedulerGrid.py,v 1.81 2008/10/24 16:02:48 ewv Exp $"
6     __version__ = "$Revision: 1.81 $"
7 ewv 1.69
8 gutsche 1.1 from Scheduler import Scheduler
9     from crab_logger import Logger
10     from crab_exceptions import *
11     from crab_util import *
12 ewv 1.81 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
13 gutsche 1.1 import common
14 spiga 1.70 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
15 spiga 1.28 from JobList import JobList
16 gutsche 1.1
17     import os, sys, time
18    
class SchedulerGrid(Scheduler):
    """
    Base class for all grid schedulers.

    Reads the grid related options from the CRAB configuration, makes sure
    a usable VOMS proxy and a myproxy delegation exist, and produces the
    scheduler specific fragments of the job wrapper script.
    """

    def __init__(self, name):
        """
        Initialise the base scheduler and the list of job state names.
        """
        Scheduler.__init__(self, name)
        self.states = [
            "Acl", "cancelReason", "cancelling", "ce_node", "children",
            "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
            "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
            "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
            "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
            "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
            "stateEnterTime", "stateEnterTimes", "subjob_failed",
            "user tags", "status", "status_code", "hierarchy",
        ]

    def configure(self, cfg_params):
        """
        Read the [EDG], [USER] and [CRAB] options needed by grid schedulers.

        cfg_params is a dictionary-like object keyed by 'SECTION.option'.
        Raises CrabException if the stage-out options are inconsistent or
        if EDG_WL_LOCATION is not set in the environment.  Ends by calling
        checkProxy().
        """
        self.cfg_params = cfg_params
        Scheduler.configure(self, cfg_params)

        # init BlackWhiteListParser
        seWhiteList = cfg_params.get('EDG.se_white_list', [])
        seBlackList = cfg_params.get('EDG.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("EDG.group", None)

        self.role = cfg_params.get("EDG.role", None)

        # CE black/white lists are given as comma separated strings
        self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
        if self.EDG_ce_black_list:
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',')

        self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

        self.return_data = cfg_params.get('USER.return_data', 0)

        self.publish_data = cfg_params.get("USER.publish_data", 0)

        self.copy_data = cfg_params.get("USER.copy_data", 0)
        copyData = int(self.copy_data)
        if copyData == 1:
            self.SE = cfg_params.get('USER.storage_element', None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'"
                common.logger.message(msg)
                raise CrabException(msg)

        returnData = int(self.return_data)
        if returnData == 0 and copyData == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if returnData == 1 and copyData == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if copyData == 0 and int(self.publish_data) == 1:
            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        self.EDG_requirements = cfg_params.get('EDG.requirements', None)

        # additional JDL parameters are given as a ';' separated string
        self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
        if self.EDG_addJdlParam:
            self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

        self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)

        self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)

        self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

        # Numeric values are converted like the other flags above, so that
        # a configured '0' really disables the debug wrapper.  Anything
        # that is not a number keeps its plain truth value.
        debugFlag = cfg_params.get('USER.debug_wrapper', 0)
        try:
            self.debug_wrapper = int(debugFlag)
        except (TypeError, ValueError):
            self.debug_wrapper = debugFlag
        self.debugWrap = ''
        if self.debug_wrapper:
            self.debugWrap = '--debug'

        # Add EDG_WL_LOCATION to the python path
        if 'EDG_WL_LOCATION' not in os.environ:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)
        path = os.environ['EDG_WL_LOCATION']

        sys.path.append(os.path.join(path, "lib"))
        sys.path.append(os.path.join(path, "lib", "python"))

        self._taskId = uniqueTaskName(common._db.queryTask('name'))
        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')

        self.checkProxy()

    def rb_configure(self, RB):
        """
        Return a requirement to be add to Jdl to select a specific RB/WMS:
        return None if RB=None
        To be re-implemented in concrete scheduler
        """
        return None

    def sched_fix_parameter(self):
        """
        Store the job requirements in the task record.

        Combines the requirements of the job type with the user supplied
        EDG.requirements (joined with ' && ') and writes the result to the
        'jobType' field of the task.  Nothing is returned.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        req = jbt.getRequirements()

        if self.EDG_requirements:
            # only join with '&&' when the job type really contributed
            # something, otherwise the expression would start with ' && '
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        common._db.updateTask_({'jobType': req})

    def listMatch(self, dest, full):
        """
        Return the list of distinct sites matching the job requirements.

        The site is the part of each CE name returned by
        Scheduler.listMatch that precedes the first ':'.  The order of
        first appearance is kept.
        """
        ces = Scheduler.listMatch(self, dest, full)
        sites = []
        for ce in ces:
            site = ce.split(":")[0]
            if site not in sites:
                sites.append(site)

        # label used in the log message only
        matching = 'fast'
        if full == True:
            matching = 'full'
        common.logger.write("list of available site ( "+str(matching) +" matching ) : "+str(sites))
        return sites

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        Raises CrabException if self.environment_unique_identifier is not
        set (it is not assigned in this class; presumably the concrete
        scheduler provides it).
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        if not self.environment_unique_identifier:
            raise CrabException('environment_unique_identifier not set')

        script = [
            # start with wrapper timing
            'export TIME_WRAP_INI=`date +%s` \n',
            'export TIME_STAGEOUT=-2 \n\n',
            '# '+self.name()+' specific stuff\n',
            '# strip arguments\n',
            'echo "strip arguments"\n',
            'args=("$@")\n',
            'nargs=$#\n',
            'shift $nargs\n',
            "# job number (first parameter for job wrapper)\n",
            "NJob=${args[0]}; export NJob\n",

            "out_files=out_files_${NJob}; export out_files\n",
            "echo $out_files\n",
            jbt.outList(),

            # identifiers reported to the monitoring
            'MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n',
            'SyncGridJobId='+self.environment_unique_identifier+'\n',
            'MonitorID='+self._taskId+'\n',
            'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n',
            'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n',
            'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n',

            # find out on which middleware the job is running
            'echo ">>> GridFlavour discovery: " \n',
            'if [ $OSG_APP ]; then \n',
            ' middleware=OSG \n',
            ' if [ $OSG_JOB_CONTACT ]; then \n',
            ' SyncCE="$OSG_JOB_CONTACT"; \n',
            ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n',
            ' else\n',
            ' echo "not reporting SyncCE";\n',
            ' fi\n',
            ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
            ' echo "source OSG GRID setup script" \n',
            ' source $OSG_GRID/setup.sh \n',

            'elif [ $VO_CMS_SW_DIR ]; then \n',
            ' middleware=LCG \n',
            ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n',
            ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n',
            'else \n',
            ' echo "ERROR ==> GridFlavour not identified" \n',
            ' job_exit_code=10030 \n',
            ' func_exit \n',
            'fi \n',

            'dumpStatus $RUNTIME_AREA/$repo \n',
            '\n\n',

            # determine the name of the CE
            'export VO='+self.VO+'\n',
            'if [ $middleware == LCG ]; then\n',
            ' CloseCEs=`glite-brokerinfo getCE`\n',
            ' echo "CloseCEs = $CloseCEs"\n',
            ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
            ' echo "CE = $CE"\n',
            'elif [ $middleware == OSG ]; then \n',
            ' if [ $OSG_JOB_CONTACT ]; then \n',
            # the doubled backslash emits the literal '\/' expected by awk
            " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
            ' else \n',
            ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n',
            ' job_exit_code=10099\n',
            ' func_exit\n',
            ' fi \n',
            'fi \n',
        ]
        return ''.join(script)

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        When copy_data is not 1 the fragment only sets TIME_STAGEOUT to -1.
        """
        script = [
            '\n',
            '#\n',
            '# COPY OUTPUT FILE TO SE\n',
            '#\n\n',
        ]

        if int(self.copy_data) != 1:
            # set stageout timing to a fake value
            script.append('export TIME_STAGEOUT=-1 \n')
            return ''.join(script)

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

        script += [
            'export SE='+SE+'\n',
            'echo "SE = $SE"\n',
            'export SE_PATH='+SE_PATH+'\n',
            'echo "SE_PATH = $SE_PATH"\n',
            'export LFNBaseName='+lfn+'\n',
            'echo "LFNBaseName = $LFNBaseName"\n',
            'export USER='+user+'\n',
            'echo "USER = $USER"\n',
            'export endpoint='+endpoint+'\n',
            'echo "endpoint = $endpoint"\n',

            'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n',
            'export TIME_STAGEOUT_INI=`date +%s` \n',
            'copy_exit_status=0\n',
            'echo "python cmscp.py --destination $endpoint --inputFileList $file_list --middleware $middleware '+self.debugWrap+'"\n',
            'python cmscp.py --destination $endpoint --inputFileList $file_list --middleware $middleware '+self.debugWrap+'\n',
        ]

        # NOTE: the '#####' banners have to be quoted, an unquoted word
        # starting with '#' is a comment for the shell and echo would
        # print nothing.
        if self.debug_wrapper:
            script += [
                'echo "which lcg-ls"\n',
                'which lcg-ls\n',
                'echo "########### details of SE interaction"\n',
                'cat .SEinteraction.log\n',
                'echo "########### contents of cmscpReport"\n',
                'cat cmscpReport.sh\n',
                'echo "###########"\n',
            ]

        script += [
            'source cmscpReport.sh\n',
            'if [ $StageOutExitStatus -ne 0 ]; then\n',
            ' echo "Problem copying file to $SE $SE_PATH"\n',
            ' copy_exit_status=$StageOutExitStatus \n',
        ]
        if not self.debug_wrapper:
            # without the debug wrapper the log is shown only on failure
            script += [
                ' echo "########### details of SE interaction"\n',
                ' cat .SEinteraction.log\n',
                ' echo "###########"\n',
            ]
        script += [
            ' job_exit_code=$StageOutExitStatus\n',
            'fi\n',
            'export TIME_STAGEOUT_END=`date +%s` \n',
            'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n',
        ]
        return ''.join(script)

    def checkProxy(self, deep=0):
        """
        Function to check the Globus proxy.

        Creates a new VOMS proxy when there is none, when its remaining
        life time is below the threshold (10 hours, 100 hours if deep is
        not 0) or when its first attribute does not match the configured
        VO/group/role.  Then makes sure a credential is delegated to the
        myproxy server.  The positive result is cached in self.proxyValid.

        Raises CrabException if the proxy cannot be created or delegated.
        """
        # 're' is not imported explicitly at the top of this module
        import re

        if self.proxyValid:
            return

        ### Just return if asked to do so
        if self.dontCheckProxy == 1:
            self.proxyValid = 1
            return

        if deep == 0:
            minTimeLeft = 10*3600  # in seconds
        else:
            minTimeLeft = 100*3600  # in seconds

        mustRenew = 0
        timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
        if timeLeftLocal is None:
            ## if no valid proxy
            mustRenew = 1
        else:
            ## if valid check how long
            try:
                if int(timeLeftLocal) < minTimeLeft:
                    mustRenew = 1
            except ValueError:
                # output is not a number of seconds: treat as no proxy
                mustRenew = 1

        ## check first attribute
        att = runCommand('voms-proxy-info -fqan 2>/dev/null | head -1') or ''
        ## you always have at least /cms/Role=NULL/Capability=NULL
        fqan = "/" + re.escape(self.VO)
        if self.group:
            fqan += "/" + re.escape(self.group)
        if self.role:
            # a role without a group follows the VO directly
            fqan += "/Role=" + re.escape(self.role)
        elif not self.group:
            fqan += "/"
        if not re.match(fqan, att):
            if not mustRenew:
                common.logger.message( "Valid proxy found, but with wrong VO group/role.\n")
            mustRenew = 1

        if mustRenew:
            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than %d hours!\n Creating a user proxy with default length of 192h\n" % (minTimeLeft // 3600))
            cmd = 'voms-proxy-init -voms '+self.VO
            if self.group or self.role:
                cmd += ':/'+self.VO
            if self.group:
                cmd += '/'+self.group
            if self.role:
                cmd += '/role='+self.role
            cmd += ' -valid 192:00'
            common.logger.debug(10,cmd)
            try:
                out = os.system(cmd)
            except OSError:
                out = -1
            if out != 0:
                raise CrabException("Unable to create a valid proxy!\n")

        ## now I do have a voms proxy valid, and I check the myproxy server
        renewProxy = 0
        cmd = 'myproxy-info -d -s '+self.proxyServer
        cmd_out = runCommand(cmd,0,20)
        if not cmd_out:
            common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
            renewProxy = 1
        else:
            ## minimum time: 4 days
            ## NOTE(review): an earlier comment said 5 days while the value
            ## was 4 days -- confirm which one is intended
            minTime = 4 * 24 * 3600
            ## regex to extract the right information
            myproxyRE = re.compile(r"timeleft: (?P<hours>[\d]*):(?P<minutes>[\d]*):(?P<seconds>[\d]*)")
            for row in cmd_out.split("\n"):
                found = myproxyRE.search(row)
                if not found:
                    continue
                hours = found.group("hours")
                minutes = found.group("minutes")
                seconds = found.group("seconds")
                timeleft = int(hours)*3600 + int(minutes)*60 + int(seconds)
                if timeleft < minTime:
                    renewProxy = 1
                    common.logger.message('Your proxy will expire in:\n\t'+hours+' hours '+minutes+' minutes '+seconds+' seconds\n')
                    common.logger.message('Need to renew it:')

        # if not, create one.
        if renewProxy:
            cmd = 'myproxy-init -d -n -s '+self.proxyServer
            out = os.system(cmd)
            if out != 0:
                raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

        # cache proxy validity
        self.proxyValid = 1

    def userName(self):
        """
        return the user name, i.e. the identity reported by voms-proxy-info

        Raises CrabException if the command gives no result.
        """
        identity = runCommand("voms-proxy-info -identity 2>/dev/null")
        if identity is None:
            raise CrabException("Unable to retrieve the user identity from the proxy!\n")
        return identity.strip()

    def configOpt_(self):
        """
        Return the configuration options for the UI commands.

        Uses self.edg_config and self.edg_config_vo, which are not set in
        this class (presumably by the concrete scheduler).  The result is
        a single blank when no configuration file is defined.
        """
        if not self.edg_config:
            return ' '
        edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
        return edg_ui_cfg_opt

    def tags(self):
        """
        Return the first two double-quoted strings of the task 'jobType'.

        Raises IndexError if the field holds fewer than two quoted strings.
        """
        task = common._db.getTask()
        pieces = task['jobType'].split('"')
        # splitting on '"' leaves the quoted strings at the odd positions
        return [str(pieces[1]), str(pieces[3])]
