ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.110
Committed: Fri Jun 5 14:35:13 2009 UTC (15 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre7
Changes since 1.109: +11 -6 lines
Log Message:
change from Sanjay

File Contents

# User Rev Content
1 ewv 1.69 """
2     Base class for all grid schedulers
3     """
4    
5 spiga 1.110 __revision__ = "$Id: SchedulerGrid.py,v 1.109 2009/05/31 13:19:41 spiga Exp $"
6     __version__ = "$Revision: 1.109 $"
7 ewv 1.69
8 gutsche 1.1 from Scheduler import Scheduler
9     from crab_exceptions import *
10     from crab_util import *
11 ewv 1.81 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
12 gutsche 1.1 import common
13 spiga 1.70 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
14 spiga 1.28 from JobList import JobList
15 gutsche 1.1
16     import os, sys, time
17    
18     class SchedulerGrid(Scheduler):
19 slacapra 1.8
20     def __init__(self, name):
21     Scheduler.__init__(self,name)
22 gutsche 1.1 self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
23     "children_hist","children_num","children_states","condorId","condor_jdl", \
24     "cpuTime","destination", "done_code","exit_code","expectFrom", \
25     "expectUpdate","globusId","jdl","jobId","jobtype", \
26     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
27     "owner","parent_job", "reason","resubmitted","rsl","seed",\
28     "stateEnterTime","stateEnterTimes","subjob_failed", \
29     "user tags" , "status" , "status_code","hierarchy"]
30     return
31 slacapra 1.12
32 slacapra 1.8 def configure(self, cfg_params):
33 spiga 1.70 self.cfg_params = cfg_params
34 spiga 1.98 self.jobtypeName = cfg_params.get('CRAB.jobtype','')
35     self.schedulerName = cfg_params.get('CRAB.scheduler','')
36 slacapra 1.8 Scheduler.configure(self,cfg_params)
37    
38     self.proxyValid=0
39 spiga 1.105 self.dontCheckProxy=int(cfg_params.get("GRID.dont_check_proxy",0))
40 slacapra 1.8
41 spiga 1.105 self.proxyServer = cfg_params.get("GRID.proxy_server",'myproxy.cern.ch')
42 spiga 1.104 common.logger.debug('Setting myproxy server to '+self.proxyServer)
43 slacapra 1.8
44 spiga 1.105 self.group = cfg_params.get("GRID.group", None)
45     self.role = cfg_params.get("GRID.role", None)
46 slacapra 1.8
47 spiga 1.105 removeT1bL = cfg_params.get("GRID.remove_default_blacklist", 0 )
48 ewv 1.86
49 spiga 1.88 T1_BL = ["fnal.gov", "gridka.de" ,"w-ce01.grid.sinica.edu.tw", "w-ce02.grid.sinica.edu.tw", "lcg00125.grid.sinica.edu.tw",\
50 fanzago 1.107 "gridpp.rl.ac.uk" , "cclcgceli03.in2p3.fr","cclcgceli04.in2p3.fr" , "pic.es", "cnaf", "cern.ch"]
51 spiga 1.90 if int(removeT1bL) == 1:
52     T1_BL = []
53 spiga 1.105 self.EDG_ce_black_list = cfg_params.get('GRID.ce_black_list',None)
54 spiga 1.106 if (self.EDG_ce_black_list):
55     self.EDG_ce_black_list = string.split(self.EDG_ce_black_list,',') + T1_BL
56 spiga 1.83 else :
57 spiga 1.90 if int(removeT1bL) == 0: self.EDG_ce_black_list = T1_BL
58 spiga 1.105 self.EDG_ce_white_list = cfg_params.get('GRID.ce_white_list',None)
59 slacapra 1.8 if (self.EDG_ce_white_list): self.EDG_ce_white_list = string.split(self.EDG_ce_white_list,',')
60 ewv 1.86
61 spiga 1.105 self.VO = cfg_params.get('GRID.virtual_organization','cms')
62     self.EDG_requirements = cfg_params.get('GRID.requirements',None)
63     self.EDG_addJdlParam = cfg_params.get('GRID.additional_jdl_parameters',None)
64 slacapra 1.8
65     if (self.EDG_addJdlParam): self.EDG_addJdlParam = string.split(self.EDG_addJdlParam,';')
66    
67 spiga 1.105 self.EDG_retry_count = cfg_params.get('GRID.retry_count',0)
68     self.EDG_shallow_retry_count= cfg_params.get('GRID.shallow_retry_count',-1)
69     self.EDG_clock_time = cfg_params.get('GRID.max_wall_clock_time',None)
70 slacapra 1.8
71 afanfani 1.23 # Default minimum CPU time to >= 130 minutes
72 spiga 1.105 self.EDG_cpu_time = cfg_params.get('GRID.max_cpu_time', '130')
73 gutsche 1.1
74     # Add EDG_WL_LOCATION to the python path
75 slacapra 1.8 if not os.environ.has_key('EDG_WL_LOCATION'):
76 gutsche 1.1 msg = "Error: the EDG_WL_LOCATION variable is not set."
77 ewv 1.27 raise CrabException(msg)
78     path = os.environ['EDG_WL_LOCATION']
79 gutsche 1.1
80 ewv 1.27 libPath=os.path.join(path, "lib")
81     sys.path.append(libPath)
82     libPath=os.path.join(path, "lib", "python")
83     sys.path.append(libPath)
84 ewv 1.24
85 spiga 1.49 self.checkProxy()
86 gutsche 1.1 return
87 slacapra 1.8
88     def rb_configure(self, RB):
89     """
90 ewv 1.16 Return a requirement to be add to Jdl to select a specific RB/WMS:
91 slacapra 1.8 return None if RB=None
92     To be re-implemented in concrete scheduler
93     """
94     return None
95 ewv 1.24
96     def sched_fix_parameter(self):
97 ewv 1.69 """
98     Returns string with requirements and scheduler-specific parameters
99     """
100     index = int(common._db.nJobs())
101     job = common.job_list[index-1]
102     jbt = job.type()
103     req = ''
104     req = req + jbt.getRequirements()
105    
106     if self.EDG_requirements:
107     if (not req == ' '):
108     req = req + ' && '
109     req = req + self.EDG_requirements
110    
111     taskReq = {'jobType':req}
112     common._db.updateTask_(taskReq)
113 gutsche 1.1
114 spiga 1.50 def listMatch(self, dest, full):
115 ewv 1.58 matching='fast'
116 spiga 1.50 ces=Scheduler.listMatch(self, dest, full)
117 slacapra 1.35 sites=[]
118     for ce in ces:
119     site=ce.split(":")[0]
120     if site not in sites:
121     sites.append(site)
122     pass
123 ewv 1.58 if full == True: matching='full'
124 spiga 1.104 common.logger.debug("list of available site ( "+str(matching) +" matching ) : "+str(sites))
125 slacapra 1.35 return sites
126    
127    
128 gutsche 1.1 def wsSetupEnvironment(self):
129     """
130     Returns part of a job script which does scheduler-specific work.
131     """
132 spiga 1.99 taskId =common._db.queryTask('name')
133 spiga 1.28 index = int(common._db.nJobs())
134     job = common.job_list[index-1]
135     jbt = job.type()
136 slacapra 1.8 if not self.environment_unique_identifier:
137 ewv 1.101 try :
138     self.environment_unique_identifier = self.envUniqueID()
139     except :
140     raise CrabException('environment_unique_identifier not set')
141 slacapra 1.8
142 ewv 1.58 # start with wrapper timing
143 farinafa 1.55 txt = 'export TIME_WRAP_INI=`date +%s` \n'
144 farinafa 1.61 txt += 'export TIME_STAGEOUT=-2 \n\n'
145 farinafa 1.53 txt += '# '+self.name()+' specific stuff\n'
146 slacapra 1.8 txt += '# strip arguments\n'
147     txt += 'echo "strip arguments"\n'
148     txt += 'args=("$@")\n'
149     txt += 'nargs=$#\n'
150     txt += 'shift $nargs\n'
151     txt += "# job number (first parameter for job wrapper)\n"
152 ewv 1.16 txt += "NJob=${args[0]}; export NJob\n"
153 slacapra 1.8
154 spiga 1.29 txt += "out_files=out_files_${NJob}; export out_files\n"
155     txt += "echo $out_files\n"
156 spiga 1.28 txt += jbt.outList()
157 spiga 1.29
158 ewv 1.65 txt += 'MonitorJobID=${NJob}_'+self.environment_unique_identifier+'\n'
159     txt += 'SyncGridJobId='+self.environment_unique_identifier+'\n'
160 spiga 1.89 txt += 'MonitorID='+taskId+'\n'
161 fanzago 1.17 txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
162     txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
163     txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'
164 slacapra 1.8
165 fanzago 1.17 txt += 'echo ">>> GridFlavour discovery: " \n'
166 slacapra 1.8 txt += 'if [ $OSG_APP ]; then \n'
167     txt += ' middleware=OSG \n'
168     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
169     txt += ' SyncCE="$OSG_JOB_CONTACT"; \n'
170 fanzago 1.17 txt += ' echo "SyncCE=$SyncCE" >> $RUNTIME_AREA/$repo ;\n'
171 slacapra 1.8 txt += ' else\n'
172     txt += ' echo "not reporting SyncCE";\n'
173     txt += ' fi\n';
174 fanzago 1.17 txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
175 ewv 1.80 txt += ' echo "source OSG GRID setup script" \n'
176     txt += ' source $OSG_GRID/setup.sh \n'
177 ewv 1.108 txt += 'elif [ $NORDUGRID_CE ]; then \n' # We look for $NORDUGRID_CE before $VO_CMS_SW_DIR,
178 edelmann 1.102 txt += ' middleware=ARC \n' # because the latter is defined for ARC too
179     txt += ' echo "SyncCE=$NORDUGRID_CE" >> $RUNTIME_AREA/$repo \n'
180     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
181 slacapra 1.8 txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
182 spiga 1.109 txt += ' middleware=LCG \n'
183     txt += ' if [ $GLIDEIN_Gatekeeper ]; then \n'
184     txt += ' echo "SyncCE=`echo $GLIDEIN_Gatekeeper | sed -e s/:2119//`" >> $RUNTIME_AREA/$repo \n'
185     txt += ' else \n'
186     txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
187     txt += ' fi \n'
188 fanzago 1.17 txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
189 gutsche 1.1 txt += 'else \n'
190 fanzago 1.17 txt += ' echo "ERROR ==> GridFlavour not identified" \n'
191     txt += ' job_exit_code=10030 \n'
192     txt += ' func_exit \n'
193 slacapra 1.8 txt += 'fi \n'
194    
195     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
196 gutsche 1.1 txt += '\n\n'
197    
198 spiga 1.110
199 ewv 1.37 txt += 'export VO='+self.VO+'\n'
200     txt += 'if [ $middleware == LCG ]; then\n'
201 spiga 1.110 txt += ' if [ $GLIDEIN_Gatekeeper ]; then\n'
202     txt += ' CloseCEs=$GLIDEIN_Gatekeeper \n'
203     txt += ' else\n'
204     txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
205     txt += ' fi\n'
206     txt += ' echo "CloseCEs = $CloseCEs"\n'
207     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
208     txt += ' echo "CE = $CE"\n'
209 ewv 1.37 txt += 'elif [ $middleware == OSG ]; then \n'
210     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
211     txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
212     txt += ' else \n'
213     txt += ' echo "ERROR ==> OSG mode in setting CE name from OSG_JOB_CONTACT" \n'
214     txt += ' job_exit_code=10099\n'
215     txt += ' func_exit\n'
216     txt += ' fi \n'
217 edelmann 1.102 txt += 'elif [ $middleware == ARC ]; then \n'
218     txt += ' echo "CE = $NORDUGRID_CE"\n'
219 ewv 1.37 txt += 'fi \n'
220    
221 gutsche 1.1 return txt
222    
223     def wsCopyOutput(self):
224     """
225     Write a CopyResults part of a job script, e.g.
226     to copy produced output into a storage element.
227     """
228 spiga 1.87 index = int(common._db.nJobs())
229     job = common.job_list[index-1]
230     jbt = job.type()
231    
232 slacapra 1.8 txt = '\n'
233 gutsche 1.1
234 spiga 1.18 txt += '#\n'
235     txt += '# COPY OUTPUT FILE TO SE\n'
236     txt += '#\n\n'
237 gutsche 1.1
238 slacapra 1.8 if int(self.copy_data) == 1:
239 spiga 1.70 stageout = PhEDExDatasvcInfo(self.cfg_params)
240     endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
241 ewv 1.101 if self.check_RemoteDir == 1 :
242 spiga 1.87 self.checkRemoteDir(endpoint,jbt.outList('list') )
243 spiga 1.70 txt += 'export SE='+SE+'\n'
244     txt += 'echo "SE = $SE"\n'
245 slacapra 1.8 txt += 'export SE_PATH='+SE_PATH+'\n'
246     txt += 'echo "SE_PATH = $SE_PATH"\n'
247 spiga 1.70 txt += 'export LFNBaseName='+lfn+'\n'
248 fanzago 1.73 txt += 'echo "LFNBaseName = $LFNBaseName"\n'
249 spiga 1.70 txt += 'export USER='+user+'\n'
250 fanzago 1.72 txt += 'echo "USER = $USER"\n'
251 spiga 1.70 txt += 'export endpoint='+endpoint+'\n'
252 fanzago 1.72 txt += 'echo "endpoint = $endpoint"\n'
253 ewv 1.79
254 spiga 1.70 txt += 'echo ">>> Copy output files from WN = `hostname` to $SE_PATH :"\n'
255 farinafa 1.56 txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
256 fanzago 1.17 txt += 'copy_exit_status=0\n'
257 spiga 1.100 cmscp_args = ' --destination $endpoint --inputFileList $file_list'
258     cmscp_args +=' --middleware $middleware --lfn $LFNBaseName %s %s '%(self.loc_stage_out,self.debugWrap)
259 spiga 1.98 txt += 'echo "python cmscp.py %s "\n'%cmscp_args
260     txt += 'python cmscp.py %s \n'%cmscp_args
261 spiga 1.92 if self.debug_wrapper==1:
262 ewv 1.80 txt += 'echo "which lcg-ls"\n'
263     txt += 'which lcg-ls\n'
264 fanzago 1.94 txt += 'echo "########### details of SE interaction"\n'
265 spiga 1.91 txt += 'if [ -f .SEinteraction.log ] ;then\n'
266     txt += ' cat .SEinteraction.log\n'
267     txt += 'else\n'
268     txt += ' echo ".SEinteraction.log file not found"\n'
269 spiga 1.92 txt += 'fi\n'
270 spiga 1.97 txt += 'echo "#####################################"\n'
271 fanzago 1.94
272 spiga 1.92 txt += 'if [ -f cmscpReport.sh ] ;then\n'
273 fanzago 1.94 txt += ' cat cmscpReport.sh\n'
274 spiga 1.92 txt += ' source cmscpReport.sh\n'
275     txt += 'else\n'
276 ewv 1.101 txt += ' echo "cmscpReport.sh file not found"\n'
277 spiga 1.92 txt += ' StageOutExitStatus=60307\n'
278     txt += 'fi\n'
279 spiga 1.70 txt += 'if [ $StageOutExitStatus -ne 0 ]; then\n'
280     txt += ' echo "Problem copying file to $SE $SE_PATH"\n'
281     txt += ' copy_exit_status=$StageOutExitStatus \n'
282 spiga 1.92 if not self.debug_wrapper==1:
283 spiga 1.91 txt += 'if [ -f .SEinteraction.log ] ;then\n'
284 fanzago 1.95 txt += ' echo "########## contents of SE interaction"\n'
285 ewv 1.79 txt += ' cat .SEinteraction.log\n'
286 fanzago 1.95 txt += ' echo "#####################################"\n'
287 spiga 1.91 txt += 'else\n'
288     txt += ' echo ".SEinteraction.log file not found"\n'
289 spiga 1.92 txt += 'fi\n'
290 spiga 1.70 txt += ' job_exit_code=$StageOutExitStatus\n'
291 slacapra 1.8 txt += 'fi\n'
292 farinafa 1.56 txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
293 farinafa 1.55 txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'
294 farinafa 1.53 else:
295     # set stageout timing to a fake value
296     txt += 'export TIME_STAGEOUT=-1 \n'
297 gutsche 1.1 return txt
298    
299 slacapra 1.8 def userName(self):
300     """ return the user name """
301 ewv 1.68 tmp=runCommand("voms-proxy-info -identity 2>/dev/null")
302 slacapra 1.8 return tmp.strip()
303    
304 gutsche 1.1 def configOpt_(self):
305     edg_ui_cfg_opt = ' '
306     if self.edg_config:
307 slacapra 1.8 edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
308     if self.edg_config_vo:
309     edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
310 gutsche 1.1 return edg_ui_cfg_opt
311 slacapra 1.8
312    
313    
314 slacapra 1.34 def tags(self):
315     task=common._db.getTask()
316 ewv 1.37 tags_tmp=string.split(task['jobType'],'"')
317 slacapra 1.34 tags=[str(tags_tmp[1]),str(tags_tmp[3])]
318     return tags
319 ewv 1.37