ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.91
Committed: Tue Jan 20 17:10:08 2009 UTC (16 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.90: +10 -4 lines
Log Message:
Check if the log file exist before access it

File Contents

# User Rev Content
1 ewv 1.69 """
2     Base class for all grid schedulers
3     """
4    
5 spiga 1.91 __revision__ = "$Id: SchedulerGrid.py,v 1.90 2009/01/07 11:37:28 spiga Exp $"
6     __version__ = "$Revision: 1.90 $"
7 ewv 1.69
8 gutsche 1.1 from Scheduler import Scheduler
9     from crab_logger import Logger
10     from crab_exceptions import *
11     from crab_util import *
12 ewv 1.81 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
13 gutsche 1.1 import common
14 spiga 1.70 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
15 spiga 1.28 from JobList import JobList
16 gutsche 1.1
17     import os, sys, time
18    
19     class SchedulerGrid(Scheduler):
20 slacapra 1.8
def __init__(self, name):
    """
    Set up the grid scheduler.

    Delegates to Scheduler.__init__ with `name`, then stores the list of
    state field names in self.states.
    """
    Scheduler.__init__(self, name)
    self.states = [
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states",
        "condorId", "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location", "matched_jdl",
        "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    ]
    return
32 slacapra 1.12
def configure(self, cfg_params):
    """
    Read the grid-related settings from cfg_params and store them on self.

    cfg_params is only accessed through .get(key, default), with keys in
    the 'EDG.*', 'USER.*' and 'CRAB.*' families.

    Raises CrabException when:
      - copy_data is 1 and USER.storage_element is missing;
      - return_data and copy_data are both 0, or both 1;
      - publish_data is 1 while copy_data is 0;
      - the EDG_WL_LOCATION environment variable is not set.

    Side effects: appends <EDG_WL_LOCATION>/lib and
    <EDG_WL_LOCATION>/lib/python to sys.path, and ends by calling
    self.checkProxy().
    """
    self.cfg_params = cfg_params
    Scheduler.configure(self, cfg_params)

    # init BlackWhiteListParser
    seWhiteList = cfg_params.get('EDG.se_white_list', [])
    seBlackList = cfg_params.get('EDG.se_black_list', [])
    self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

    self.proxyValid = 0
    self.dontCheckProxy = int(cfg_params.get("EDG.dont_check_proxy", 0))

    self.proxyServer = cfg_params.get("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.group = cfg_params.get("EDG.group", None)
    self.role = cfg_params.get("EDG.role", None)

    # Default CE black list; it is dropped entirely when the user sets
    # EDG.remove_default_blacklist = 1.
    removeT1bL = int(cfg_params.get("EDG.remove_default_blacklist", 0))
    T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw",
             "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
             "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr",
             "cclcgceli04.in2p3.fr", "pic.es", "cnaf"]
    if removeT1bL == 1:
        T1_BL = []

    self.EDG_ce_black_list = cfg_params.get('EDG.ce_black_list', None)
    if self.EDG_ce_black_list:
        # User list (comma separated) extended with the default one.
        self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
    elif removeT1bL == 0:
        self.EDG_ce_black_list = T1_BL

    self.EDG_ce_white_list = cfg_params.get('EDG.ce_white_list', None)
    if self.EDG_ce_white_list:
        self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

    self.VO = cfg_params.get('EDG.virtual_organization', 'cms')

    self.return_data = cfg_params.get('USER.return_data', 0)
    self.publish_data = cfg_params.get("USER.publish_data", 0)
    self.copy_data = cfg_params.get("USER.copy_data", 0)

    copy_data = int(self.copy_data)
    if copy_data == 1:
        self.SE = cfg_params.get('USER.storage_element', None)
        if not self.SE:
            msg = "Error. The [USER] section does not have 'storage_element'"
            common.logger.message(msg)
            raise CrabException(msg)

    return_data = int(self.return_data)
    if return_data == 0 and copy_data == 0:
        msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    if return_data == 1 and copy_data == 1:
        msg = 'Error: return_data and copy_data cannot be set both to 1\n'
        msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    if copy_data == 0 and int(self.publish_data) == 1:
        msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_requirements = cfg_params.get('EDG.requirements', None)

    # Additional JDL parameters are given as a ';' separated list.
    self.EDG_addJdlParam = cfg_params.get('EDG.additional_jdl_parameters', None)
    if self.EDG_addJdlParam:
        self.EDG_addJdlParam = self.EDG_addJdlParam.split(';')

    self.EDG_retry_count = cfg_params.get('EDG.retry_count', 0)
    self.EDG_shallow_retry_count = cfg_params.get('EDG.shallow_retry_count', -1)
    self.EDG_clock_time = cfg_params.get('EDG.max_wall_clock_time', None)

    # Default minimum CPU time to >= 130 minutes
    self.EDG_cpu_time = cfg_params.get('EDG.max_cpu_time', '130')

    # The value may be a string (the other flags here are parsed with
    # int()), and a bare truth test would treat "0" or "False" as enabled.
    # Normalise the textual form before deciding.
    debug_flag = str(cfg_params.get('USER.debug_wrapper', False)).strip().lower()
    self.debug_wrapper = debug_flag not in ('', '0', 'false', 'no', 'none')
    self.debugWrap = ''
    if self.debug_wrapper:
        self.debugWrap = '--debug'

    self.check_RemoteDir = int(cfg_params.get('USER.check_user_remote_dir', 0))

    # Add EDG_WL_LOCATION to the python path
    if 'EDG_WL_LOCATION' not in os.environ:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)
    path = os.environ['EDG_WL_LOCATION']

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
    self.schedulerName = cfg_params.get('CRAB.scheduler', '')

    self.checkProxy()
    return
134 slacapra 1.8
def rb_configure(self, RB):
    """
    Hook giving the JDL requirement that selects a specific RB/WMS.

    This base implementation ignores RB and always returns None;
    concrete schedulers are expected to re-implement it.
    """
    return None
142 ewv 1.24
def sched_fix_parameter(self):
    """
    Build the requirements string and store it on the task.

    The string starts from the requirements of the last job's job type
    and, when self.EDG_requirements is set, is extended with it (joined
    by ' && ' only if the job-type part is not blank).  The result is
    written through common._db.updateTask_ under the 'jobType' key.
    Nothing is returned.
    """
    index = int(common._db.nJobs())
    job = common.job_list[index-1]
    jbt = job.type()
    # Concatenation (rather than a plain assignment) keeps rejecting a
    # non-string result from getRequirements() right here.
    req = '' + jbt.getRequirements()

    if self.EDG_requirements:
        # Only insert the operator when there is something to join with:
        # an empty or whitespace-only left side would otherwise produce a
        # dangling leading ' && '.
        if req.strip():
            req = req + ' && '
        req = req + self.EDG_requirements

    taskReq = {'jobType': req}
    common._db.updateTask_(taskReq)
160 gutsche 1.1
def listMatch(self, dest, full):
    """
    Return the distinct site names matched for `dest`.

    Each entry returned by Scheduler.listMatch is cut at the first ':'
    and the leading part is kept once, in first-seen order.  The
    resulting list is also written to the common logger together with
    the matching mode ('full' when `full == True`, otherwise 'fast').
    """
    if full == True:
        mode = 'full'
    else:
        mode = 'fast'

    unique_sites = []
    for entry in Scheduler.listMatch(self, dest, full):
        host = entry.split(":")[0]
        if host in unique_sites:
            continue
        unique_sites.append(host)

    report = "list of available site ( " + str(mode) + " matching ) : " + str(unique_sites)
    common.logger.write(report)
    return unique_sites
173    
174    
def wsSetupEnvironment(self):
    """
    Return the scheduler-specific part of the job wrapper script.

    The generated shell text records wrapper timing, strips the
    arguments, exports the job number and output-file names, writes the
    monitoring identifiers to $RUNTIME_AREA/$repo, discovers the grid
    flavour (OSG or LCG) and sets the CE name accordingly.

    Raises CrabException if self.environment_unique_identifier is not set.
    """
    taskId = uniqueTaskName(common._db.queryTask('name'))
    last_job = common.job_list[int(common._db.nJobs()) - 1]
    jbt = last_job.type()
    if not self.environment_unique_identifier:
        raise CrabException('environment_unique_identifier not set')

    # start with wrapper timing
    txt = ('export TIME_WRAP_INI=`date +%s` \n'
           'export TIME_STAGEOUT=-2 \n\n')
    txt += '# ' + self.name() + ' specific stuff\n'
    txt += ('# strip arguments\n'
            'echo "strip arguments"\n'
            'args=("$@")\n'
            'nargs=$#\n'
            'shift $nargs\n'
            '# job number (first parameter for job wrapper)\n'
            'NJob=${args[0]}; export NJob\n'
            'out_files=out_files_${NJob}; export out_files\n'
            'echo $out_files\n')
    txt += jbt.outList()

    # identifiers used for monitoring, saved to the $repo file
    txt += 'MonitorJobID=${NJob}_' + self.environment_unique_identifier + '\n'
    txt += 'SyncGridJobId=' + self.environment_unique_identifier + '\n'
    txt += 'MonitorID=' + taskId + '\n'
    txt += ('echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
            'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
            'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n')

    # grid flavour discovery
    txt += ('echo ">>> GridFlavour discovery: " \n'
            'if [ $OSG_APP ]; then \n'
            ' middleware=OSG \n'
            ' if [ $OSG_JOB_CONTACT ]; then \n'
            ' SyncCE="$OSG_JOB_CONTACT"; \n'
            ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
            ' else\n'
            ' echo "not reporting SyncCE";\n'
            ' fi\n'
            ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
            ' echo "source OSG GRID setup script" \n'
            ' source $OSG_GRID/setup.sh \n'
            'elif [ $VO_CMS_SW_DIR ]; then \n'
            ' middleware=LCG \n'
            ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
            ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
            'else \n'
            ' echo "ERROR ==> GridFlavour not identified" \n'
            ' job_exit_code=10030 \n'
            ' func_exit \n'
            'fi \n'
            'dumpStatus $RUNTIME_AREA/$repo \n'
            '\n\n')

    # VO and CE name, depending on the middleware found above
    txt += 'export VO=' + self.VO + '\n'
    txt += ('if [ $middleware == LCG ]; then\n'
            ' CloseCEs=`glite-brokerinfo getCE`\n'
            ' echo "CloseCEs = $CloseCEs"\n'
            ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
            ' echo "CE = $CE"\n'
            'elif [ $middleware == OSG ]; then \n'
            ' if [ $OSG_JOB_CONTACT ]; then \n'
            " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n"
            ' else \n'
            ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
            ' job_exit_code=10099\n'
            ' func_exit\n'
            ' fi \n'
            'fi \n')

    return txt
252    
def wsCopyOutput(self):
    """
    Write a CopyResults part of a job script, e.g.
    to copy produced output into a storage element.

    Returns the shell text.  When self.copy_data is 1 the text exports
    the stage-out variables obtained from PhEDExDatasvcInfo, runs
    cmscp.py, sources cmscpReport.sh and propagates a non-zero
    $StageOutExitStatus to copy_exit_status / job_exit_code.  Otherwise
    it only exports TIME_STAGEOUT=-1.

    When self.check_RemoteDir is 1, self.checkRemoteDir is called with
    the endpoint and the job type's output list before the script text
    is generated.
    """
    index = int(common._db.nJobs())
    job = common.job_list[index-1]
    jbt = job.type()

    txt = '\n'
    txt += '#\n'
    txt += '# COPY OUTPUT FILE TO SE\n'
    txt += '#\n\n'

    if int(self.copy_data) != 1:
        # set stageout timing to a fake value
        txt += 'export TIME_STAGEOUT=-1 \n'
        return txt

    # Shell snippet that prints .SEinteraction.log when it exists.  The
    # banner is quoted because an unquoted word starting with '#' begins a
    # shell comment, and the conditional is closed with 'fi' so that the
    # generated script stays syntactically valid.
    se_log_dump = 'echo "########### details of SE interaction"\n'
    se_log_dump += 'if [ -f .SEinteraction.log ] ;then\n'
    se_log_dump += ' cat .SEinteraction.log\n'
    se_log_dump += 'else\n'
    se_log_dump += ' echo ".SEinteraction.log file not found"\n'
    se_log_dump += 'fi\n'

    stageout = PhEDExDatasvcInfo(self.cfg_params)
    endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
    if self.check_RemoteDir == 1:
        self.checkRemoteDir(endpoint, jbt.outList('list'))

    txt += 'export SE=' + SE + '\n'
    txt += 'echo "SE = $SE"\n'
    txt += 'export SE_PATH=' + SE_PATH + '\n'
    txt += 'echo "SE_PATH = $SE_PATH"\n'
    txt += 'export LFNBaseName=' + lfn + '\n'
    txt += 'echo "LFNBaseName = $LFNBaseName"\n'
    txt += 'export USER=' + user + '\n'
    txt += 'echo "USER = $USER"\n'
    txt += 'export endpoint=' + endpoint + '\n'
    txt += 'echo "endpoint = $endpoint"\n'

    txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
    txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
    txt += 'copy_exit_status=0\n'
    txt += 'echo "python cmscp.py --destination $endpoint --inputFileList $file_list --middleware $middleware '+self.debugWrap+'"\n'
    txt += 'python cmscp.py --destination $endpoint --inputFileList $file_list --middleware $middleware '+self.debugWrap+'\n'

    if self.debug_wrapper:
        # In debug mode the SE log and the report are always shown.
        txt += 'echo "which lcg-ls"\n'
        txt += 'which lcg-ls\n'
        txt += se_log_dump
        txt += 'echo "########### contents of cmscpReport"\n'
        txt += 'cat cmscpReport.sh\n'
        txt += 'echo "###########" \n'

    txt += 'source cmscpReport.sh\n'
    txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
    txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
    txt += ' copy_exit_status=$StageOutExitStatus \n'
    if not self.debug_wrapper:
        # Outside debug mode the SE log is shown only on failure.
        txt += se_log_dump
        txt += 'echo "###########" \n'
    txt += ' job_exit_code=$StageOutExitStatus\n'
    txt += 'fi\n'
    txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
    txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
    return txt
321    
def userName(self):
    """
    Return the output of `voms-proxy-info -identity` (stderr discarded),
    with surrounding whitespace stripped.
    """
    return runCommand("voms-proxy-info -identity 2>/dev/null").strip()
326    
def configOpt_(self):
    """
    Build the configuration option string.

    Starts from ' -c <edg_config> ' when self.edg_config is set (a single
    blank otherwise) and appends ' --config-vo <edg_config_vo> ' when
    self.edg_config_vo is set.
    """
    if self.edg_config:
        options = ' -c ' + self.edg_config + ' '
    else:
        options = ' '
    if self.edg_config_vo:
        options += ' --config-vo ' + self.edg_config_vo + ' '
    return options
334 slacapra 1.8
335    
336    
def tags(self):
    """
    Return a two-element list with the second and fourth fields of the
    task's 'jobType' string when it is split on double quotes, each
    converted with str().
    """
    job_type = common._db.getTask()['jobType']
    fields = job_type.split('"')
    return [str(fields[1]), str(fields[3])]
341     return tags
342 ewv 1.37