ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.12
Committed: Mon May 25 17:58:35 2009 UTC (15 years, 11 months ago) by edelmann
Content type: text/x-python
Branch: MAIN
Changes since 1.11: +16 -6 lines
Log Message:
Improved checking of runtimeEnvironments.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible to provide
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
class SchedulerArc(SchedulerGrid):
    """
    CRAB scheduler plug-in for the ARC middleware.

    Builds the xRSL requirement snippets (runtime environments and
    cluster selection) that are handed to BOSS at submission time, and
    provides the ARC-specific part of the job wrapper script.
    """

    def __init__(self, name='ARC'):
        SchedulerGrid.__init__(self, name)
        return

    def envUniqueID(self):
        """
        Return the job identifier string used for ML monitoring.

        The identifier has the form
        'https://<scheduler name>/<SHA-1 hex digest of the task name>/${NJob}';
        '${NJob}' is left literally in the string.
        """
        taskHash = sha.new(common._db.queryTask('name')).hexdigest()
        # Local is deliberately not called 'id' to avoid shadowing the builtin.
        unique_id = 'https://' + self.name() + '/' + taskHash + '/${NJob}'
        msg = 'JobID for ML monitoring is created for ARC scheduler: %s' % unique_id
        common.logger.debug(5, msg)
        return unique_id

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use
        with real scheduler.

        For ARC this is always an empty dictionary; cfg_params is unused.
        """
        return {}

    def configure(self, cfg_params):
        """
        Prepare the environment, then delegate to SchedulerGrid.configure().

        Side effects: may set EDG_WL_LOCATION and X509_USER_PROXY in
        os.environ, and resets self.environment_unique_identifier to None.
        """
        if 'EDG_WL_LOCATION' not in os.environ:
            # This is an ugly hack needed for SchedulerGrid.configure() to
            # work!
            os.environ['EDG_WL_LOCATION'] = ''

        if 'X509_USER_PROXY' not in os.environ:
            # Set X509_USER_PROXY to the default location. We'll do this
            # because in functions called by Scheduler.checkProxy()
            # voms-proxy-info will be called with '-file $X509_USER_PROXY',
            # so if X509_USER_PROXY isn't set, it won't work.
            os.environ['X509_USER_PROXY'] = '/tmp/x509up_u' + str(os.getuid())

        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = None

    def checkProxy(self, minTime=10):
        """
        Function to check the Globus proxy.

        Does nothing if the proxy was already found valid, or if
        self.dontCheckProxy == 1 (in which case the proxy is simply
        marked valid).  Otherwise the credential and its group/role
        attributes are checked and, if either check fails, a manual
        renewal is attempted.

        minTime is converted with int() and passed as 'Time' to
        CredentialAPI.checkCredential().

        Raises CrabException if the manual renewal raises.
        """
        if self.proxyValid:
            return

        ### Just return if asked to do so
        if self.dontCheckProxy == 1:
            self.proxyValid = 1
            return

        CredAPI_config = {'credential': 'Proxy',
                          'myProxySvr': self.proxyServer}
        from ProdCommon.Credential.CredentialAPI import CredentialAPI
        CredAPI = CredentialAPI(CredAPI_config)

        credential_ok = CredAPI.checkCredential(Time=int(minTime)) and \
            CredAPI.checkAttribute(group=self.group, role=self.role)
        if not credential_ok:
            try:
                CredAPI.ManualRenewCredential(group=self.group, role=self.role)
            except Exception:
                # sys.exc_info() is used instead of 'except E, ex' /
                # 'except E as ex' so that this parses on Python 2.4 as
                # well as on later interpreters.
                ex = sys.exc_info()[1]
                raise CrabException(str(ex))

        # cache proxy validity
        self.proxyValid = 1
        return

    def ce_list(self):
        """
        Return a 3-tuple ('', white_list, black_list) for CEs.

        Each list is a comma-separated string built from
        CEBlackWhiteListParser, or None when the parser's list is empty.
        The first element is always the empty string.
        """
        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list,
                                          common.logger)
        wl = ','.join(ceParser.whiteList()) or None
        bl = ','.join(ceParser.blackList()) or None
        return '', wl, bl

    def se_list(self, id, dest):
        """
        Return a 3-tuple ('', se_white, se_black) for SEs.

        The white and black lists are returned exactly as given by
        self.blackWhiteListParser; the 'id' and 'dest' arguments are
        not used here.
        """
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Returns parameter scheduler-specific, to use with BOSS .

        The result is the concatenation of runtimeXrsl() and
        clusterXrsl() for job number i of task.
        """
        return self.runtimeXrsl(i, task) + self.clusterXrsl(i, task)

    def runtimeXrsl(self, i, task):
        """
        Return an xRSL-code snippet with required runtime environments.

        One '(runTimeEnvironment=<tag>)' clause is emitted per entry of
        self.tags(); i and task are not used here.
        """
        return ''.join(["(runTimeEnvironment=%s)" % t for t in self.tags()])

    def clusterXrsl(self, i, task):
        """
        Return an xRSL-code snippet to select a CE ("cluster", in ARC parlance)

        The SEs in task.jobs[i-1]['dlsDestination'] are filtered by the
        SE white list if there is one, otherwise by the SE black list if
        there is one, otherwise taken as they are.  The surviving SEs are
        passed to self.listMatch() and the CEs it returns are rendered as
        '(cluster=ce)', wrapped in '(|...)' when there is more than one.
        An empty CE list yields the empty string.
        """
        se_dls = task.jobs[i-1]['dlsDestination']
        _unused, se_white, se_black = self.se_list(i, se_dls)

        # The white list takes precedence: the black list is consulted
        # only when no white list is set.
        if se_white:
            se_list = [se for se in se_dls if se in se_white]
        elif se_black:
            se_list = [se for se in se_dls if se not in se_black]
        else:
            se_list = list(se_dls)
        # FIXME: Check that se_list contains at least one SE!

        ce_list = self.listMatch(se_list, 'False')

        xrsl = ''.join(['(cluster=%s)' % ce for ce in ce_list])

        # A ce-list with more than one element must be an OR:ed
        # list: (|(cluster=ce1)(cluster=ce2)...)
        if len(ce_list) > 1:
            xrsl = '(|' + xrsl + ')'

        # FIXME: If ce_list == [] ==> xrsl = "" ==> we'll submit
        # "anywhere", which is completely contrary behaviour to what we want!
        # ce_list == [] means there were _no_ CE in ce_infoSys that
        # survived the white- and black-list filter, so we shouldn't submit
        # at all!

        return xrsl

#    def wsInitialEnvironment(self):
#        return ''

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management.

        The returned text defines the shell function func_exit(), whose
        body starts with the output of self.wsExitFunc_common().
        """
        pieces = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',
            'func_exit() { \n',
            self.wsExitFunc_common(),
            # Remove ".BrokerInfo" that the code generated by
            # self.wsExitFunc_common() adds to $final_list. (This is an ugly
            # hack -- the "good" solution would be to add ARC-knowledge to
            # self.wsExitFunc_common())
            " final_list=${final_list%.BrokerInfo}\n",
            ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
            ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' if [ $exceed -ne 1 ]; then\n',
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' else\n',
            ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
            ' fi\n',
            ' exit $job_exit_code\n',
            '}\n',
        ]
        return ''.join(pieces)

    def tags(self):
        """
        Return the list of runtime environment tags for the current task.

        The list always starts with "APPS/HEP/CMSSW-PA".  The task's
        'jobType' string is split on '&&', and for every piece that
        matches 'Member("...", ...RunTimeEnvironment' the quoted first
        argument (with the double quotes removed) is appended.
        """
        # Imported locally so this method does not depend on 're' leaking
        # into the module namespace through one of the star imports.
        import re

        task = common._db.getTask()
        tags = ["APPS/HEP/CMSSW-PA"]
        for s in task['jobType'].split('&&'):
            if re.match(r'Member\(".*", .*RunTimeEnvironment', s):
                # Drop the leading 'Member(' and everything from the first
                # ', ' onwards, leaving only the quoted tag name.
                rte = re.sub(", .*", "", re.sub(r"Member\(", "", s))
                tags.append(rte.replace('"', ''))
        return tags

    def submit(self, list, task):
        """
        submit to scheduler a list of jobs

        list is the list of job numbers; the scheduler requirements are
        computed once, from its first element, and reused for every
        sub-list returned by bulkControl().  If list is empty a message
        is logged and nothing is submitted.
        """
        if not list:
            common.logger.message("No sites where to submit jobs")
            # Nothing to submit: stop here, list[0] below would raise
            # IndexError on an empty list.
            return

        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        for sub_list in bulkControl(self, list):
            self.boss().submit(task['id'], sub_list, req)
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's)
        """
        # NOTE(review): the other methods go through self.boss();
        # presumably that returns this same object -- confirm before
        # unifying.
        self._boss.cancel(ids)
        return

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info

        Currently a stub: the file is not read and None is returned.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs

        Returns a list with one entry per sub-list produced by
        bulkControl(), each entry being what boss().writeJDL() returned.
        """
        # FIXME: Is this function being used?
        # NOTE(review): list[0] raises IndexError when list is empty.
        req = str(self.sched_parameter(list[0], task))
        jdl = []
        for sub_list in bulkControl(self, list):
            jdl.append(self.boss().writeJDL(task['id'], sub_list, req))
        return jdl