ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.113
Committed: Tue Aug 18 15:00:37 2009 UTC (15 years, 8 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1
Branch point for: CRAB_2_6_X_br
Changes since 1.112: +4 -8 lines
Log Message:
Move EDG_requirements and EDG_addJdlParam to Scheduler base class, enable for Condor

File Contents

# Content
1 """
2 Base class for all grid schedulers
3 """
4
# CVS keyword strings ($Id$ / $Revision$) identifying this module's revision.
__revision__ = "$Id: SchedulerGrid.py,v 1.112 2009/07/09 19:14:30 ewv Exp $"
__version__ = "$Revision: 1.112 $"
7
8 from Scheduler import Scheduler
9 from crab_exceptions import *
10 from crab_util import *
11 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
12 import common
13 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
14 from JobList import JobList
15
16 import os, sys, time
17
class SchedulerGrid(Scheduler):
    """
    Base class for all grid schedulers.

    Holds the grid-related configuration (proxy server, VO, CE black/white
    lists, retry counts, time limits) and produces the scheduler-specific
    fragments of the job wrapper script.
    """

    def __init__(self, name):
        """
        Initialise the base Scheduler and the list of known job state keys.

        name -- scheduler name, forwarded to Scheduler.__init__
        """
        Scheduler.__init__(self, name)
        self.states = [
            "Acl", "cancelReason", "cancelling", "ce_node", "children",
            "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
            "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
            "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
            "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
            "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
            "stateEnterTime", "stateEnterTimes", "subjob_failed",
            "user tags", "status", "status_code", "hierarchy",
        ]

    def configure(self, cfg_params):
        """
        Read the CRAB.* and GRID.* options from cfg_params, extend sys.path
        with the EDG_WL_LOCATION library directories and call checkProxy().

        cfg_params -- mapping of configuration keys to values

        Raises CrabException if the EDG_WL_LOCATION environment variable
        is not set.
        """
        self.cfg_params = cfg_params
        self.jobtypeName = cfg_params.get('CRAB.jobtype', '')
        self.schedulerName = cfg_params.get('CRAB.scheduler', '')
        Scheduler.configure(self, cfg_params)

        self.proxyValid = 0
        self.dontCheckProxy = int(cfg_params.get("GRID.dont_check_proxy", 0))

        self.proxyServer = cfg_params.get("GRID.proxy_server", 'myproxy.cern.ch')
        common.logger.debug('Setting myproxy server to '+self.proxyServer)

        self.group = cfg_params.get("GRID.group", None)
        self.role = cfg_params.get("GRID.role", None)

        removeT1bL = int(cfg_params.get("GRID.remove_default_blacklist", 0))

        # Default CE black list; dropped when the user asks for it explicitly.
        T1_BL = ["fnal.gov", "gridka.de", "w-ce01.grid.sinica.edu.tw",
                 "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",
                 "gridpp.rl.ac.uk", "cclcgceli03.in2p3.fr", "cclcgceli04.in2p3.fr",
                 "pic.es", "cnaf", "cern.ch"]
        if removeT1bL == 1:
            T1_BL = []

        self.EDG_ce_black_list = cfg_params.get('GRID.ce_black_list', None)
        if self.EDG_ce_black_list:
            # User entries come first, followed by the (possibly empty) defaults.
            self.EDG_ce_black_list = self.EDG_ce_black_list.split(',') + T1_BL
        elif removeT1bL == 0:
            self.EDG_ce_black_list = T1_BL

        self.EDG_ce_white_list = cfg_params.get('GRID.ce_white_list', None)
        if self.EDG_ce_white_list:
            self.EDG_ce_white_list = self.EDG_ce_white_list.split(',')

        self.VO = cfg_params.get('GRID.virtual_organization', 'cms')

        self.EDG_retry_count = cfg_params.get('GRID.retry_count', 0)
        self.EDG_shallow_retry_count = cfg_params.get('GRID.shallow_retry_count', -1)
        self.EDG_clock_time = cfg_params.get('GRID.max_wall_clock_time', None)

        # Default minimum CPU time to >= 130 minutes
        self.EDG_cpu_time = cfg_params.get('GRID.max_cpu_time', '130')

        # Add EDG_WL_LOCATION to the python path
        if 'EDG_WL_LOCATION' not in os.environ:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)
        path = os.environ['EDG_WL_LOCATION']

        # Only append entries that are missing, so that calling configure()
        # more than once does not pile up duplicates in sys.path.
        for libPath in (os.path.join(path, "lib"),
                        os.path.join(path, "lib", "python")):
            if libPath not in sys.path:
                sys.path.append(libPath)

        self.checkProxy()

    def rb_configure(self, RB):
        """
        Return a requirement to be add to Jdl to select a specific RB/WMS:
        return None if RB=None
        To be re-implemented in concrete scheduler
        """
        return None

    def sched_fix_parameter(self):
        """
        Build the requirements string (job-type requirements, and'ed with
        self.EDG_requirements when that is set) and store it in the task
        under the 'jobType' key.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()
        req = '' + jbt.getRequirements()

        if self.EDG_requirements:
            # Join with ' && ' only when there is something to join with;
            # an empty or blank job-type requirement must not leave a
            # dangling leading operator in the expression.
            if req.strip():
                req = req + ' && '
            req = req + self.EDG_requirements

        taskReq = {'jobType': req}
        common._db.updateTask_(taskReq)

    def listMatch(self, dest, full):
        """
        Return the distinct site names (the part before the first ':' of
        each entry returned by Scheduler.listMatch), in first-seen order.

        dest, full -- forwarded unchanged to Scheduler.listMatch; full is
                      also used to label the debug message
        """
        ces = Scheduler.listMatch(self, dest, full)
        sites = []
        for ce in ces:
            site = ce.split(":")[0]
            if site not in sites:
                sites.append(site)
        matching = 'fast'
        if full == True:
            matching = 'full'
        common.logger.debug("list of available site ( "+str(matching) +" matching ) : "+str(sites))
        return sites

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        The fragment records wrapper timing, extracts the job number from
        the wrapper arguments, writes the monitoring identifiers to
        $RUNTIME_AREA/$repo, detects the grid flavour (OSG, ARC or LCG)
        and sets the CE name.

        Raises CrabException if no environment unique identifier is set
        and self.envUniqueID() fails.
        """
        taskId = common._db.queryTask('name')
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()
        if not self.environment_unique_identifier:
            try:
                self.environment_unique_identifier = self.envUniqueID()
            except Exception:
                # Exception (not a bare except) so that KeyboardInterrupt and
                # SystemExit are not turned into a configuration error.
                raise CrabException('environment_unique_identifier not set')

        # start with wrapper timing
        txt = 'export TIME_WRAP_INI=`date +%s` \n'
        txt += 'export TIME_STAGEOUT=-2 \n\n'
        txt += '# '+self.name()+' specific stuff\n'
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}; export NJob\n"

        txt += "out_files=out_files_${NJob}; export out_files\n"
        txt += "echo $out_files\n"
        txt += jbt.outList()

        # Monitoring identifiers: glidein-provided when available, otherwise
        # derived from the scheduler's environment unique identifier.
        txt += 'if [ $JobRunCount ] && [ `expr $JobRunCount - 1` -gt 0 ] && [ $Glidein_MonitorID ]; then \n'
        txt += ' attempt=`expr $JobRunCount - 1` \n'
        txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}__${attempt}\n'
        txt += ' SyncGridJobId=${Glidein_MonitorID}__${attempt}\n'
        txt += 'elif [ $Glidein_MonitorID ]; then \n'
        txt += ' MonitorJobID=${NJob}_${Glidein_MonitorID}\n'
        txt += ' SyncGridJobId=${Glidein_MonitorID}\n'
        txt += 'else \n'
        txt += ' MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n'
        txt += ' SyncGridJobId='+self.environment_unique_identifier+'\n'
        txt += 'fi\n'
        txt += 'MonitorID='+taskId+'\n'
        txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'

        txt += 'echo ">>> GridFlavour discovery: " \n'
        txt += 'if [ $OSG_APP ]; then \n'
        txt += ' middleware=OSG \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
        txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
        txt += ' else\n'
        txt += ' echo "not reporting SyncCE";\n'
        txt += ' fi\n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += ' echo "source OSG GRID setup script" \n'
        txt += ' source $OSG_GRID/setup.sh \n'
        # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR,
        # because the latter is defined for ARC too
        txt += 'elif [ $NORDUGRID_CE ]; then \n'
        txt += ' middleware=ARC \n'
        txt += ' echo "SyncCE=$NORDUGRID_CE" >> $RUNTIME_AREA/$repo \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
        txt += ' middleware=LCG \n'
        txt += ' if [ $GLIDEIN_Gatekeeper ]; then \n'
        txt += ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n'
        txt += ' else \n'
        txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
        txt += ' fi \n'
        txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'else \n'
        txt += ' echo "ERROR ==> GridFlavour not identified" \n'
        txt += ' job_exit_code=10030 \n'
        txt += ' func_exit \n'
        txt += 'fi \n'

        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '\n\n'

        txt += 'export VO='+self.VO+'\n'
        txt += 'if [ $middleware == LCG ]; then\n'
        txt += ' if [ $GLIDEIN_Gatekeeper ]; then\n'
        txt += ' CloseCEs=$GLIDEIN_Gatekeeper \n'
        txt += ' else\n'
        txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
        txt += ' fi\n'
        txt += ' echo "CloseCEs = $CloseCEs"\n'
        txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += ' echo "CE = $CE"\n'
        txt += 'elif [ $middleware == OSG ]; then \n'
        txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
        # '\\/' yields a literal backslash followed by '/', as in the
        # original '\/' spelling, without relying on an invalid escape.
        txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n'
        txt += ' else \n'
        txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
        txt += ' job_exit_code=10099\n'
        txt += ' func_exit\n'
        txt += ' fi \n'
        txt += 'elif [ $middleware == ARC ]; then \n'
        txt += ' echo "CE = $NORDUGRID_CE"\n'
        txt += 'fi \n'

        return txt

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        When self.copy_data is 1 the fragment exports the stage-out
        destination obtained from PhEDExDatasvcInfo, runs cmscp.py and
        evaluates cmscpReport.sh; otherwise it only sets TIME_STAGEOUT=-1.
        """
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()

        txt = '\n'

        txt += '#\n'
        txt += '# COPY OUTPUT FILE TO SE\n'
        txt += '#\n\n'

        if int(self.copy_data) == 1:
            stageout = PhEDExDatasvcInfo(self.cfg_params)
            endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
            if self.check_RemoteDir == 1:
                self.checkRemoteDir(endpoint, jbt.outList('list'))
            txt += 'export SE='+SE+'\n'
            txt += 'echo "SE = $SE"\n'
            txt += 'export SE_PATH='+SE_PATH+'\n'
            txt += 'echo "SE_PATH = $SE_PATH"\n'
            txt += 'export LFNBaseName='+lfn+'\n'
            txt += 'echo "LFNBaseName = $LFNBaseName"\n'
            txt += 'export USER='+user+'\n'
            txt += 'echo "USER = $USER"\n'
            txt += 'export endpoint='+endpoint+'\n'
            txt += 'echo "endpoint = $endpoint"\n'

            txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
            txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
            txt += 'copy_exit_status=0\n'
            cmscp_args = ' --destination $endpoint --inputFileList $file_list'
            cmscp_args += ' --middleware $middleware --lfn $LFNBaseName %s %s ' % (self.loc_stage_out, self.debugWrap)
            txt += 'echo "python cmscp.py %s "\n' % cmscp_args
            txt += 'python cmscp.py %s \n' % cmscp_args
            if self.debug_wrapper == 1:
                # In debug mode the SE interaction log is always printed.
                txt += 'echo "which lcg-ls"\n'
                txt += 'which lcg-ls\n'
                txt += 'echo "########### details of SE interaction"\n'
                txt += 'if [ -f .SEinteraction.log ] ;then\n'
                txt += ' cat .SEinteraction.log\n'
                txt += 'else\n'
                txt += ' echo ".SEinteraction.log file not found"\n'
                txt += 'fi\n'
                txt += 'echo "#####################################"\n'

            txt += 'if [ -f cmscpReport.sh ] ;then\n'
            txt += ' cat cmscpReport.sh\n'
            txt += ' source cmscpReport.sh\n'
            txt += 'else\n'
            txt += ' echo "cmscpReport.sh file not found"\n'
            txt += ' StageOutExitStatus=60307\n'
            txt += 'fi\n'
            txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
            txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
            txt += ' copy_exit_status=$StageOutExitStatus \n'
            if not self.debug_wrapper == 1:
                # Outside debug mode the log is printed only on failure.
                txt += 'if [ -f .SEinteraction.log ] ;then\n'
                txt += ' echo "########## contents of SE interaction"\n'
                txt += ' cat .SEinteraction.log\n'
                txt += ' echo "#####################################"\n'
                txt += 'else\n'
                txt += ' echo ".SEinteraction.log file not found"\n'
                txt += 'fi\n'
            txt += ' job_exit_code=$StageOutExitStatus\n'
            txt += 'fi\n'
            txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
            txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
        else:
            # set stageout timing to a fake value
            txt += 'export TIME_STAGEOUT=-1 \n'
        return txt

    def userName(self):
        """ return the user name """
        tmp = runCommand("voms-proxy-info -identity 2>/dev/null")
        return tmp.strip()

    def configOpt_(self):
        """
        Return the UI configuration options string: ' -c <edg_config> '
        and ' --config-vo <edg_config_vo> ' for whichever of the two
        attributes is set, or a single blank when neither is.
        """
        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
        return edg_ui_cfg_opt

    def tags(self):
        """
        Return the first two double-quoted substrings of the task's
        'jobType' field as a two-element list of str.

        Raises IndexError if the field holds fewer than two quoted
        substrings.
        """
        task = common._db.getTask()
        tags_tmp = task['jobType'].split('"')
        tags = [str(tags_tmp[1]), str(tags_tmp[3])]
        return tags
324