ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.73
Committed: Mon Sep 22 17:00:44 2008 UTC (16 years, 7 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_pre2
Changes since 1.72: +3 -3 lines
Log Message:
fixed a typo

File Contents

# User Rev Content
"""
Common base for the CRAB grid schedulers.

Concrete grid schedulers specialise the SchedulerGrid class of this module.
"""

__version__ = "$Revision: 1.72 $"
__revision__ = "$Id: SchedulerGrid.py,v 1.72 2008/09/22 16:57:29 fanzago Exp $"

from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
from BlackWhiteListParser import SEBlackWhiteListParser
import common
from PhEDExDatasvcInfo import PhEDExDatasvcInfo
from JobList import JobList

# Standard-library modules are imported after the wildcard imports above so
# that a same-named export of crab_exceptions/crab_util cannot shadow them.
import os
import re
import sys
import time

class SchedulerGrid(Scheduler):
    """
    Base class for all grid schedulers.

    Reads the [EDG]/[USER] options of the CRAB configuration, makes sure a
    usable VOMS proxy and MyProxy delegation exist, and generates the
    grid-specific fragments of the job wrapper script (environment/CE
    discovery and stage-out of the output files).
    """

    def __init__(self, name):
        Scheduler.__init__(self, name)
        # Names of the job attributes handled for grid jobs (their consumers
        # are outside this file).
        self.states = ["Acl", "cancelReason", "cancelling", "ce_node", "children",
                       "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
                       "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
                       "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
                       "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
                       "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
                       "stateEnterTime", "stateEnterTimes", "subjob_failed",
                       "user tags", "status", "status_code", "hierarchy"]

    @staticmethod
    def _flagIsSet(value):
        """
        Interpret an on/off configuration value such as USER.debug_wrapper.

        Values read from crab.cfg are presumably strings (the other options in
        configure() are int()-converted), so "0"/"false"/"no" must count as
        disabled instead of being truthy non-empty strings.
        """
        if value is None:
            return False
        return str(value).strip().lower() not in ('', '0', 'false', 'no', 'none')

    def configure(self, cfg_params):
        """
        Read and validate the scheduler options, then check the user proxy.

        cfg_params: dict-like object keyed by 'SECTION.option' strings.
        Raises CrabException when copy_data is on but USER.storage_element is
        missing, when return_data/copy_data/publish_data are inconsistent, or
        when EDG_WL_LOCATION is not set in the environment (and whatever
        checkProxy() raises).
        """
        self.cfg_params = cfg_params
        Scheduler.configure(self, cfg_params)

        # init BlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
        common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

        self.group = cfg_params.get("EDG.group", None)
        self.role = cfg_params.get("EDG.role", None)

        # Comma-separated CE lists become Python lists (None when unset).
        self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
        if self.EDG_ce_black_list:
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',')

        self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

        self.return_data = cfg_params.get('USER.return_data', 0)
        self.publish_data = cfg_params.get("USER.publish_data", 0)
        self.copy_data = cfg_params.get("USER.copy_data", 0)
        if int(self.copy_data) == 1:
            self.SE = cfg_params.get('USER.storage_element', None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'"
                common.logger.message(msg)
                raise CrabException(msg)

        # Exactly one of return_data / copy_data must be enabled.
        if int(self.return_data) == 0 and int(self.copy_data) == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if int(self.return_data) == 1 and int(self.copy_data) == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        # Publication needs the output to have been copied to an SE.
        if int(self.copy_data) == 0 and int(self.publish_data) == 1:
            msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
            msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
            common.logger.message(msg)
            raise CrabException(msg)

        self.EDG_requirements = cfg_params.get('EDG.requirements', None)

        # Additional JDL parameters are ';'-separated.
        self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
        if self.EDG_addJdlParam:
            self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

        self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)
        self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)
        self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

        # Parsed as a flag so that "debug_wrapper = 0" really disables it.
        self.debug_wrapper = self._flagIsSet(cfg_params.get('USER.debug_wrapper', False))
        self.debugWrap = ''
        if self.debug_wrapper:
            self.debugWrap = '--debug'

        # Add EDG_WL_LOCATION to the python path
        if 'EDG_WL_LOCATION' not in os.environ:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)
        path = os.environ['EDG_WL_LOCATION']
        sys.path.append(os.path.join(path, "lib"))
        sys.path.append(os.path.join(path, "lib", "python"))

        self._taskId = uniqueTaskName(common._db.queryTask('name'))
        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')

        self.checkProxy()

    def rb_configure(self, RB):
        """
        Return a requirement to be add to Jdl to select a specific RB/WMS:
        return None if RB=None
        To be re-implemented in concrete scheduler
        """
        return None

    def sched_fix_parameter(self):
        """
        Store the task requirements in the task DB under 'jobType'.

        The requirements come from the job type of the last job in
        common.job_list, AND-ed with EDG.requirements when that is set.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        req = jbt.getRequirements()

        if self.EDG_requirements:
            # Only add the conjunction when there is something to join to;
            # otherwise the result would start with a dangling '&&'.
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        taskReq = {'jobType': req}
        common._db.updateTask_(taskReq)

    def listMatch(self, dest, full):
        """
        Return the distinct site names matching *dest*.

        Site names are the CE identifiers returned by Scheduler.listMatch
        with everything from the first ':' stripped, in first-seen order.
        """
        ces = Scheduler.listMatch(self, dest, full)
        sites = []
        seen = set()
        for ce in ces:
            site = ce.split(":")[0]
            if site not in seen:
                seen.add(site)
                sites.append(site)
        matching = 'fast'
        if full:
            matching = 'full'
        common.logger.write("list of available site ( " + str(matching) + " matching ) : " + str(sites))
        return sites

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        The generated shell code records wrapper timing, extracts the job
        number from the wrapper arguments, writes monitoring IDs to
        $RUNTIME_AREA/$repo, detects the middleware (OSG or LCG) and sets CE.
        Raises CrabException if environment_unique_identifier is not set.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index - 1]
        jbt = job.type()
        if not self.environment_unique_identifier:
            raise CrabException('environment_unique_identifier not set')

        # start with wrapper timing
        txt = 'export TIME_WRAP_INI=`date +%s` \n'
        txt += 'export TIME_STAGEOUT=-2 \n\n'
        txt += '# ' + self.name() + ' specific stuff\n'
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}; export NJob\n"

        txt += "out_files=out_files_${NJob}; export out_files\n"
        txt += "echo $out_files\n"
        txt += jbt.outList()

        # Monitoring identifiers, also dumped to the job report file.
        txt += 'MonitorJobID=${NJob}_' + self.environment_unique_identifier + '\n'
        txt += 'SyncGridJobId=' + self.environment_unique_identifier + '\n'
        txt += 'MonitorID=' + self._taskId + '\n'
        txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'

        # Middleware detection from the worker-node environment.
        txt += 'echo ">>> GridFlavour discovery: " \n'
        txt += 'if [ $OSG_APP ]; then \n'
        txt += ' middleware=OSG \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
        txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
        txt += ' else\n'
        txt += ' echo "not reporting SyncCE";\n'
        txt += ' fi\n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
        txt += ' middleware=LCG \n'
        txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'else \n'
        txt += ' echo "ERROR ==> GridFlavour not identified" \n'
        txt += ' job_exit_code=10030 \n'
        txt += ' func_exit \n'
        txt += 'fi \n'

        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '\n\n'

        # CE name, derived differently for each middleware.
        txt += 'export VO=' + self.VO + '\n'
        txt += 'if [ $middleware == LCG ]; then\n'
        txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
        txt += ' echo "CloseCEs = $CloseCEs"\n'
        txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += ' echo "CE = $CE"\n'
        txt += 'elif [ $middleware == OSG ]; then \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        # '\\/' is the two characters backslash-slash in the emitted shell.
        txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n'
        txt += ' else \n'
        txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
        txt += ' job_exit_code=10099\n'
        txt += ' func_exit\n'
        txt += ' fi \n'
        txt += 'fi \n'

        return txt

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        With copy_data == 1 the script exports the stage-out endpoint obtained
        from PhEDExDatasvcInfo, runs cmscp.py and sources cmscpReport.sh;
        otherwise it only sets TIME_STAGEOUT to the placeholder -1.
        """
        txt = '\n'

        txt += '#\n'
        txt += '# COPY OUTPUT FILE TO SE\n'
        txt += '#\n\n'

        if int(self.copy_data) == 1:
            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

            txt += 'export SE=' + SE + '\n'
            txt += 'echo "SE = $SE"\n'
            txt += 'export SE_PATH=' + SE_PATH + '\n'
            txt += 'echo "SE_PATH = $SE_PATH"\n'
            txt += 'export LFNBaseName=' + lfn + '\n'
            txt += 'echo "LFNBaseName = $LFNBaseName"\n'
            txt += 'export USER=' + user + '\n'
            txt += 'echo "USER = $USER"\n'
            txt += 'export endpoint=' + endpoint + '\n'
            txt += 'echo "endpoint = $endpoint"\n'

            txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
            txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
            txt += 'copy_exit_status=0\n'
            txt += 'python cmscp.py --dest $endpoint --inputFileList $file_list --middleware $middleware ' + self.debugWrap + '\n'
            # cmscpReport.sh is expected to define StageOutExitStatus.
            txt += 'source cmscpReport.sh\n'
            if self.debug_wrapper:
                txt += '########### details of SE interaction\n'
                txt += 'cat .SEinteraction.log\n'
                txt += '########### \n'
            txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
            txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
            txt += ' copy_exit_status=$StageOutExitStatus \n'
            txt += ' job_exit_code=$StageOutExitStatus\n'
            txt += 'fi\n'
            txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
            txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
        else:
            # set stageout timing to a fake value
            txt += 'export TIME_STAGEOUT=-1 \n'
        return txt

    def _fqanPattern(self):
        """
        Return the regex that the proxy's first FQAN must match for the
        configured VO, EDG.group and EDG.role.

        Examples (VO 'cms'): no group/role -> r'^/cms(/|$)';
        role 'x' only -> r'^/cms/Role=x(/|$)' (no empty '//' component).
        The trailing '(/|$)' stops '/cms/grp' from also matching '/cms/grp2'.
        """
        reg = "^/" + re.escape(self.VO)
        if self.group:
            reg += "/" + re.escape(self.group)
        if self.role:
            reg += "/Role=" + re.escape(self.role)
        return reg + "(/|$)"

    def checkProxy(self, deep=0):
        """
        Function to check the Globus proxy.

        deep: 0 requires at least 10 hours of local VOMS proxy lifetime, any
              other value requires 100 hours.

        A new proxy (192h) is created with voms-proxy-init when there is none,
        when its lifetime is below the threshold, or when its first FQAN does
        not match the configured VO/group/role. The credential is then
        (re)delegated to self.proxyServer with myproxy-init when none is
        stored there or it has less than 4 days left.
        The outcome is cached in self.proxyValid; EDG.dont_check_proxy = 1
        skips all checks.
        Raises CrabException if voms-proxy-init or myproxy-init fails.
        """
        if self.proxyValid:
            return

        ### Just return if asked to do so
        if self.dontCheckProxy == 1:
            self.proxyValid = 1
            return

        if deep == 0:
            minTimeLeft = 10 * 3600  # in seconds
        else:
            minTimeLeft = 100 * 3600  # in seconds

        mustRenew = 0
        timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
        if timeLeftLocal is None:
            ## no valid proxy
            mustRenew = 1
        else:
            ## valid: check how long
            try:
                if int(timeLeftLocal) < minTimeLeft:
                    mustRenew = 1
            except ValueError:
                # Unparsable output: treat it as no usable proxy.
                mustRenew = 1

        ## check first attribute
        ## you always have at least /cms/Role=NULL/Capability=NULL
        att = runCommand('voms-proxy-info -fqan 2>/dev/null | head -1')
        if not re.compile(self._fqanPattern()).search(att or ''):
            if not mustRenew:
                common.logger.message("Valid proxy found, but with wrong VO group/role.\n")
            mustRenew = 1

        if mustRenew:
            common.logger.message("No valid proxy found or remaining time of validity of already existing proxy shorter than "
                                  + str(minTimeLeft // 3600)
                                  + " hours!\n Creating a user proxy with default length of 192h\n")
            cmd = 'voms-proxy-init -voms ' + self.VO
            # The ':/<vo>' part is needed whenever a group OR a role is
            # requested, otherwise a role alone would give '-voms cms/role=x'.
            if self.group or self.role:
                cmd += ':/' + self.VO
                if self.group:
                    cmd += '/' + self.group
                if self.role:
                    cmd += '/role=' + self.role
            cmd += ' -valid 192:00'
            common.logger.debug(10, cmd)
            try:
                out = os.system(cmd)
            except Exception:
                raise CrabException("Unable to create a valid proxy!\n")
            if out != 0:
                raise CrabException("Unable to create a valid proxy!\n")

        ## now I do have a voms proxy valid, and I check the myproxy server
        renewProxy = 0
        cmd = 'myproxy-info -d -s ' + self.proxyServer
        cmd_out = runCommand(cmd, 0, 20)
        if not cmd_out:
            common.logger.message('No credential delegated to myproxy server ' + self.proxyServer + ' will do now')
            renewProxy = 1
        else:
            ## minimum time: 4 days
            minTime = 4 * 24 * 3600
            ## regex to extract the right information
            myproxyRE = re.compile(r"timeleft: (?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)")
            for row in cmd_out.split("\n"):
                g = myproxyRE.search(row)
                if not g:
                    continue
                hours = g.group("hours")
                minutes = g.group("minutes")
                seconds = g.group("seconds")
                timeleft = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
                if timeleft < minTime:
                    renewProxy = 1
                    common.logger.message('Your proxy will expire in:\n\t' + hours + ' hours ' + minutes + ' minutes ' + seconds + ' seconds\n')
                    common.logger.message('Need to renew it:')

        # if not, create one.
        if renewProxy:
            cmd = 'myproxy-init -d -n -s ' + self.proxyServer
            out = os.system(cmd)
            if out != 0:
                raise CrabException("Unable to delegate the proxy to myproxyserver " + self.proxyServer + " !\n")

        # cache proxy validity
        self.proxyValid = 1

    def userName(self):
        """
        Return the user name: the stripped output of
        'voms-proxy-info -identity'.

        Raises CrabException if the command produced no output (None).
        """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        if tmp is None:
            raise CrabException("Unable to read the identity of the current proxy (voms-proxy-info -identity)\n")
        return tmp.strip()

    def configOpt_(self):
        """
        Build the '-c <config>' / '--config-vo <vo config>' options for edg
        UI commands from self.edg_config / self.edg_config_vo.

        Those attributes are not set in this class (presumably by a
        subclass), so missing ones are treated as unset.
        """
        edg_config = getattr(self, 'edg_config', None)
        edg_config_vo = getattr(self, 'edg_config_vo', None)
        edg_ui_cfg_opt = ' '
        if edg_config:
            edg_ui_cfg_opt = ' -c ' + edg_config + ' '
        if edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + edg_config_vo + ' '
        return edg_ui_cfg_opt

    def tags(self):
        """
        Return the first two double-quoted substrings of the task's
        'jobType' requirement string, as a two-element list.

        Raises IndexError if the string holds fewer than two quoted values.
        """
        task = common._db.getTask()
        tags_tmp = task['jobType'].split('"')
        tags = [str(tags_tmp[1]), str(tags_tmp[3])]
        return tags
