ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.78
Committed: Thu Oct 9 11:18:49 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.77: +13 -13 lines
Log Message:
propagate to cmscp the srm version... default is always v2

File Contents

# User Rev Content
"""
Base class for all grid schedulers.

SchedulerGrid extends Scheduler with the configuration parsing, job-wrapper
script fragments (environment setup, output stage-out) and VOMS/MyProxy
proxy checks shared by the grid back-ends.
"""

# CVS keyword strings (presumably expanded by CVS at checkout; do not edit by hand).
__revision__ = "$Id: SchedulerGrid.py,v 1.76 2008/10/03 11:18:23 spiga Exp $"
__version__ = "$Revision: 1.76 $"
7 ewv 1.69
8 gutsche 1.1 from Scheduler import Scheduler
9     from crab_logger import Logger
10     from crab_exceptions import *
11     from crab_util import *
12 ewv 1.64 from BlackWhiteListParser import SEBlackWhiteListParser
13 gutsche 1.1 import common
14 spiga 1.70 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
15 spiga 1.28 from JobList import JobList
16 gutsche 1.1
17     import os, sys, time
18    
class SchedulerGrid(Scheduler):
    """
    Common base class for grid schedulers.

    It does four things:
    - Reads the EDG.* and USER.* options shared by the grid back-ends.
    - Builds the scheduler-specific fragments of the job wrapper script
      (environment setup and output stage-out).
    - Makes sure a suitable VOMS proxy exists.
    - Makes sure a MyProxy delegation exists.

    Concrete schedulers re-implement hooks such as rb_configure().
    """

    def __init__(self, name):
        Scheduler.__init__(self, name)
        # Names of job-status attributes known to this scheduler family
        # (presumably the fields reported by the grid status query - confirm
        # against the code that consumes self.states).
        self.states = ["Acl", "cancelReason", "cancelling", "ce_node", "children",
                       "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
                       "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
                       "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
                       "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
                       "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
                       "stateEnterTime", "stateEnterTimes", "subjob_failed",
                       "user tags", "status", "status_code", "hierarchy"]

    def _cfgFlag(self, value):
        """
        Interpret a boolean configuration option.

        Returns False for None/False/0 and for the strings '', '0', 'false',
        'no', 'off' (case-insensitive, surrounding blanks ignored), True for
        anything else.
        """
        if not value:
            return False
        return str(value).strip().lower() not in ('', '0', 'false', 'no', 'off')

    def configure(self, cfg_params):
        """
        Read the scheduler options from cfg_params and validate them.

        Besides storing the options as attributes, this does three things:
        - Appends $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python to
          sys.path.
        - Computes the unique task id.
        - Runs checkProxy().

        Raises CrabException when any of these hold:
        - copy_data = 1 without USER.storage_element.
        - return_data and copy_data are both 0 or both 1.
        - publish_data = 1 without copy_data = 1.
        - EDG_WL_LOCATION is not set.
        """
        self.cfg_params = cfg_params
        Scheduler.configure(self, cfg_params)

        # init BlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("EDG.group", None)
        self.role = cfg_params.get("EDG.role", None)

        # Comma-separated CE lists are turned into Python lists (left as-is when unset/empty).
        self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
        if self.EDG_ce_black_list:
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',')

        self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

        # Output handling: exactly one of return_data / copy_data must be 1.
        self.return_data = cfg_params.get('USER.return_data', 0)
        self.publish_data = cfg_params.get("USER.publish_data", 0)
        self.copy_data = cfg_params.get("USER.copy_data", 0)

        if int(self.copy_data) == 1:
            self.SE = cfg_params.get('USER.storage_element', None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'"
                common.logger.message(msg)
                raise CrabException(msg)

        if int(self.return_data) == 0 and int(self.copy_data) == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.return_data) == 1 and int(self.copy_data) == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.copy_data) == 0 and int(self.publish_data) == 1:
            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        self.EDG_requirements = cfg_params.get('EDG.requirements', None)

        # Additional JDL parameters are ';'-separated.
        self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
        if self.EDG_addJdlParam:
            self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

        self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)
        self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)
        self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

        # SRM protocol version passed on to cmscp.py in the stage-out step.
        self.srm_version = cfg_params.get("USER.srm_version", 'srmv2')

        # Config values are presumably strings (the other flags above go
        # through int()), so a plain truth test would treat "0" as enabled.
        self.debug_wrapper = self._cfgFlag(cfg_params.get('USER.debug_wrapper', False))
        self.debugWrap = ''
        if self.debug_wrapper:
            self.debugWrap = '--debug'

        # Add EDG_WL_LOCATION to the python path
        if 'EDG_WL_LOCATION' not in os.environ:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)
        path = os.environ['EDG_WL_LOCATION']
        sys.path.append(os.path.join(path, "lib"))
        sys.path.append(os.path.join(path, "lib", "python"))

        self._taskId = uniqueTaskName(common._db.queryTask('name'))
        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')

        self.checkProxy()

    def rb_configure(self, RB):
        """
        Return a requirement to be added to the JDL to select a specific RB/WMS,
        or None if RB is None.
        To be re-implemented in concrete schedulers; this base version always
        returns None.
        """
        return None

    def sched_fix_parameter(self):
        """
        Build the task requirement string and store it in the task DB.

        The job-type requirements of the last job are combined with
        EDG.requirements using ' && '. The result is stored under the
        'jobType' key.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        req = ''
        req = req + jbt.getRequirements()

        if self.EDG_requirements:
            # Join with '&&' only when there already is a requirement; an
            # empty/blank one would otherwise yield a malformed ' && ...'.
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        taskReq = {'jobType': req}
        common._db.updateTask_(taskReq)

    def listMatch(self, dest, full):
        """
        Return the distinct CE hosts matching dest.

        Each CE returned by Scheduler.listMatch is cut at its first ':'.
        Duplicates are dropped and the first-seen order is kept. The result
        is also logged, tagged as 'full' or 'fast' matching.
        """
        ces = Scheduler.listMatch(self, dest, full)
        sites = []
        seen = set()
        for ce in ces:
            site = ce.split(":")[0]
            if site not in seen:
                seen.add(site)
                sites.append(site)
        if full:
            matching = 'full'
        else:
            matching = 'fast'
        common.logger.write("list of available site ( " + str(matching) + " matching ) : " + str(sites))
        return sites

    def wsSetupEnvironment(self):
        """
        Return the part of the job wrapper script that does the
        scheduler-specific environment setup.

        The generated shell code does the following:
        - Starts the wrapper timers.
        - Takes the job number from the first wrapper argument.
        - Writes the monitoring ids to $RUNTIME_AREA/$repo.
        - Detects the grid flavour (OSG or LCG) and sets CE accordingly.

        Raises CrabException if environment_unique_identifier is not set.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        if not self.environment_unique_identifier:
            raise CrabException('environment_unique_identifier not set')

        # start with wrapper timing
        txt = 'export TIME_WRAP_INI=`date +%s` \n'
        txt += 'export TIME_STAGEOUT=-2 \n\n'
        txt += '# ' + self.name() + ' specific stuff\n'
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}; export NJob\n"

        txt += "out_files=out_files_${NJob}; export out_files\n"
        txt += "echo $out_files\n"
        txt += jbt.outList()

        # Monitoring identifiers, also recorded in the job report file.
        txt += 'MonitorJobID=${NJob}_' + self.environment_unique_identifier + '\n'
        txt += 'SyncGridJobId=' + self.environment_unique_identifier + '\n'
        txt += 'MonitorID=' + self._taskId + '\n'
        txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'

        # Grid flavour: OSG if $OSG_APP is set, LCG if $VO_CMS_SW_DIR is set,
        # otherwise the job exits with code 10030.
        txt += 'echo ">>> GridFlavour discovery: " \n'
        txt += 'if [ $OSG_APP ]; then \n'
        txt += ' middleware=OSG \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
        txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
        txt += ' else\n'
        txt += ' echo "not reporting SyncCE";\n'
        txt += ' fi\n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
        txt += ' middleware=LCG \n'
        txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'else \n'
        txt += ' echo "ERROR ==> GridFlavour not identified" \n'
        txt += ' job_exit_code=10030 \n'
        txt += ' func_exit \n'
        txt += 'fi \n'

        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '\n\n'

        # CE name: from glite-brokerinfo on LCG, from $OSG_JOB_CONTACT on OSG.
        txt += 'export VO=' + self.VO + '\n'
        txt += 'if [ $middleware == LCG ]; then\n'
        txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
        txt += ' echo "CloseCEs = $CloseCEs"\n'
        txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += ' echo "CE = $CE"\n'
        txt += 'elif [ $middleware == OSG ]; then \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        txt += " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n"
        txt += ' else \n'
        txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
        txt += ' job_exit_code=10099\n'
        txt += ' func_exit\n'
        txt += ' fi \n'
        txt += 'fi \n'

        return txt

    def wsCopyOutput(self):
        """
        Return the CopyResults part of the job wrapper script.

        When copy_data = 1, the generated code copies the produced output to
        the storage element returned by PhEDExDatasvcInfo.getEndpoint(). It
        does so with cmscp.py and records the stage-out time. When
        copy_data != 1, it only sets TIME_STAGEOUT to the placeholder -1.
        """
        txt = '\n'

        txt += '#\n'
        txt += '# COPY OUTPUT FILE TO SE\n'
        txt += '#\n\n'

        if int(self.copy_data) == 1:
            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

            txt += 'export SE=' + SE + '\n'
            txt += 'echo "SE = $SE"\n'
            txt += 'export SE_PATH=' + SE_PATH + '\n'
            txt += 'echo "SE_PATH = $SE_PATH"\n'
            txt += 'export LFNBaseName=' + lfn + '\n'
            txt += 'echo "LFNBaseName = $LFNBaseName"\n'
            txt += 'export USER=' + user + '\n'
            txt += 'echo "USER = $USER"\n'
            txt += 'export endpoint=' + endpoint + '\n'
            txt += 'echo "endpoint = $endpoint"\n'

            txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
            txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
            txt += 'copy_exit_status=0\n'
            # The same cmscp command line is echoed and then executed.
            cmscp_cmd = ('python cmscp.py --destination $endpoint --inputFileList $file_list'
                         ' --middleware $middleware --srm_version=' + self.srm_version +
                         ' ' + self.debugWrap)
            txt += 'echo "' + cmscp_cmd + '"\n'
            txt += cmscp_cmd + '\n'
            if self.debug_wrapper:
                # In debug mode always show the SE interaction log.
                txt += '########### details of SE interaction\n'
                txt += 'cat .SEinteraction.log\n'
                txt += '########### \n'
            # cmscpReport.sh is expected to define StageOutExitStatus.
            txt += 'source cmscpReport.sh\n'
            txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
            txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
            txt += ' copy_exit_status=$StageOutExitStatus \n'
            if not self.debug_wrapper:
                # Otherwise show the SE interaction log only on failure.
                txt += ' ########### details of SE interaction\n'
                txt += ' cat .SEinteraction.log\n'
                txt += ' ########### \n'
            txt += ' job_exit_code=$StageOutExitStatus\n'
            txt += 'fi\n'
            txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
            txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
        else:
            # set stageout timing to a fake value
            txt += 'export TIME_STAGEOUT=-1 \n'
        return txt

    def checkProxy(self, deep=0):
        """
        Make sure a usable VOMS proxy and a MyProxy delegation exist.

        deep -- 0 requires at least 10 hours of remaining local proxy
                lifetime; any other value requires 100 hours.

        The local proxy is (re)created with voms-proxy-init in three cases:
        - No proxy is reported.
        - Its lifetime is below the threshold.
        - Its first FQAN does not start with /<VO>/[<group>][/Role=<role>].

        The MyProxy credential on self.proxyServer is (re)delegated when it
        is absent or has less than 4 days left. Success is cached in
        self.proxyValid. With EDG.dont_check_proxy = 1 all checks are
        skipped.

        Raises CrabException when the proxy cannot be created or delegated.
        """
        if self.proxyValid:
            return

        ### Just return if asked to do so
        if self.dontCheckProxy == 1:
            self.proxyValid = 1
            return

        if deep == 0:
            minTimeLeft = 10 * 3600   # in seconds
        else:
            minTimeLeft = 100 * 3600  # in seconds

        mustRenew = 0
        timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
        ## if no valid proxy
        if timeLeftLocal is None:
            mustRenew = 1
        else:
            ## if valid check how long; unparsable output is treated as no proxy
            try:
                if int(timeLeftLocal) < minTimeLeft:
                    mustRenew = 1
            except ValueError:
                mustRenew = 1

        ## check first attribute
        att = runCommand('voms-proxy-info -fqan 2>/dev/null | head -1')
        prefix = "/%s/" % self.VO
        if self.group:
            prefix += self.group
        # NOTE(review): with a role but no group the expected prefix becomes
        # '/<VO>//Role=...'; confirm whether that combination is supported.
        if self.role:
            prefix += "/Role=%s" % self.role
        ## you always have at least /cms/Role=NULL/Capability=NULL
        # runCommand may return None (see the -timeleft check above): treat as a mismatch.
        if not (att or '').startswith(prefix):
            if not mustRenew:
                common.logger.message("Valid proxy found, but with wrong VO group/role.\n")
            mustRenew = 1

        if mustRenew:
            common.logger.message("No valid proxy found or remaining time of validity of already existing proxy shorter than %d hours!\n Creating a user proxy with default length of 192h\n" % (minTimeLeft // 3600))
            cmd = 'voms-proxy-init -voms ' + self.VO
            if self.group:
                cmd += ':/' + self.VO + '/' + self.group
            if self.role:
                cmd += '/role=' + self.role
            cmd += ' -valid 192:00'
            common.logger.debug(10, cmd)
            try:
                out = os.system(cmd)
            except Exception:
                raise CrabException("Unable to create a valid proxy!\n")
            if out != 0:
                raise CrabException("Unable to create a valid proxy!\n")

        ## now I do have a voms proxy valid, and I check the myproxy server
        renewProxy = 0
        cmd = 'myproxy-info -d -s ' + self.proxyServer
        # NOTE(review): the extra runCommand arguments presumably set a 20 s timeout - confirm in crab_util.
        cmd_out = runCommand(cmd, 0, 20)
        if not cmd_out:
            common.logger.message('No credential delegated to myproxy server ' + self.proxyServer + ' will do now')
            renewProxy = 1
        else:
            ## minimum time: 4 days
            minTime = 4 * 24 * 3600
            ## regex to extract the right information
            myproxyRE = re.compile("timeleft: (?P<hours>[\\d]*):(?P<minutes>[\\d]*):(?P<seconds>[\\d]*)")
            for row in cmd_out.split("\n"):
                g = myproxyRE.search(row)
                if not g:
                    continue
                hours = g.group("hours")
                minutes = g.group("minutes")
                seconds = g.group("seconds")
                timeleft = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
                if timeleft < minTime:
                    renewProxy = 1
                    common.logger.message('Your proxy will expire in:\n\t' + hours + ' hours ' + minutes + ' minutes ' + seconds + ' seconds\n')
                    common.logger.message('Need to renew it:')

        # if not, create one.
        if renewProxy:
            cmd = 'myproxy-init -d -n -s ' + self.proxyServer
            out = os.system(cmd)
            if out != 0:
                raise CrabException("Unable to delegate the proxy to myproxyserver " + self.proxyServer + " !\n")

        # cache proxy validity
        self.proxyValid = 1

    def userName(self):
        """
        Return the identity of the current proxy (voms-proxy-info -identity),
        stripped of surrounding whitespace.

        Raises CrabException if the identity cannot be obtained.
        """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        if tmp is None:
            raise CrabException("Unable to get the proxy identity from voms-proxy-info\n")
        return tmp.strip()

    def configOpt_(self):
        """
        Return the UI command-line options selecting the EDG configuration
        files (' -c <cfg> ' and ' --config-vo <cfg_vo> '), or ' ' if none.

        NOTE(review): self.edg_config / self.edg_config_vo are not set in this
        class; presumably a subclass defines them - confirm.
        """
        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
        return edg_ui_cfg_opt

    def tags(self):
        """
        Return the first two double-quoted substrings of the task's
        'jobType' requirement string, as a two-element list of str.

        Raises IndexError if the string has fewer than two quoted parts.
        """
        task = common._db.getTask()
        tags_tmp = task['jobType'].split('"')
        tags = [str(tags_tmp[1]), str(tags_tmp[3])]
        return tags