ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.10
Committed: Tue Apr 28 13:10:43 2009 UTC (16 years ago) by edelmann
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre2, CRAB_2_6_0_pre1
Changes since 1.9: +7 -0 lines
Log Message:
python/SchedulerArc.py: Set X509_USER_PROXY, if it isn't set already.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible for providing the
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
class SchedulerArc(SchedulerGrid):
    """
    Scheduler back-end named 'ARC'.

    Builds the xRSL snippets (runtime environments and cluster selection)
    that are handed to BOSS as scheduler parameters, and forwards
    submit/query/cancel operations to the BOSS object returned by
    self.boss().
    """

    def __init__(self, name='ARC'):
        """
        Initialise the underlying SchedulerGrid with the scheduler name.
        """
        SchedulerGrid.__init__(self, name)
        return

    def envUniqueID(self):
        """
        Return the job identifier string used for ML monitoring.

        The identifier has the form
        'https://<scheduler name>/<SHA-1 hex digest of the task name>/${NJob}';
        '${NJob}' is left as a literal placeholder in the returned string.
        """
        taskHash = sha.new(common._db.queryTask('name')).hexdigest()
        unique_id = 'https://' + self.name() + '/' + taskHash + '/${NJob}'
        msg = 'JobID for ML monitoring is created for ARC scheduler: %s' % unique_id
        common.logger.debug(5, msg)
        return unique_id

    def realSchedParams(self, cfg_params):
        """
        Return dictionary with specific parameters, to use
        with real scheduler.  Always empty for this scheduler;
        cfg_params is not consulted.
        """
        return {}

    def configure(self, cfg_params):
        """
        Prepare the environment, then run SchedulerGrid.configure().

        Afterwards self.environment_unique_identifier is reset to None.
        """
        if 'EDG_WL_LOCATION' not in os.environ:
            # This is an ugly hack needed for SchedulerGrid.configure() to
            # work!
            os.environ['EDG_WL_LOCATION'] = ''

        if 'X509_USER_PROXY' not in os.environ:
            # Set X509_USER_PROXY to the default location. We'll do this
            # because in functions called by Scheduler.checkProxy()
            # voms-proxy-info will be called with '-file $X509_USER_PROXY',
            # so if X509_USER_PROXY isn't set, it won't work.
            os.environ['X509_USER_PROXY'] = '/tmp/x509up_u' + str(os.getuid())

        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = None

    def ce_list(self):
        """
        Return ('', white, black) for the CE white/black lists.

        white and black are the comma-joined entries produced by a
        CEBlackWhiteListParser built from self.EDG_ce_white_list and
        self.EDG_ce_black_list, or None when the respective list is empty.
        """
        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list,
                                          common.logger)
        wl = ','.join(ceParser.whiteList()) or None
        bl = ','.join(ceParser.blackList()) or None
        return '', wl, bl

    def se_list(self, id, dest):
        """
        Return ('', white, black) for the SE white/black lists, as given by
        self.blackWhiteListParser.  The id and dest arguments are not used.
        """
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Returns parameter scheduler-specific, to use with BOSS:
        the runtime-environment xRSL followed by the cluster xRSL.
        """
        return self.runtimeXrsl(i, task) + self.clusterXrsl(i, task)

    def runtimeXrsl(self, i, task):
        """
        Return an xRSL-code snippet with required runtime environments.

        The snippet always starts with the APPS/HEP/CMSSW-PA environment.
        task['jobType'] is split on '&&'; every fragment that starts with
        'Member("...", ...RunTimeEnvironment' contributes one more
        (runTimeEnvironment=...) clause, taken from the first argument of
        that Member(...) expression.  The argument i is not used.
        """
        # Imported here so the method does not depend on 're' happening to
        # be pulled in by one of the module's star imports.
        import re

        xrsl = "(runTimeEnvironment=\"APPS/HEP/CMSSW-PA\")"
        for s in task['jobType'].split('&&'):
            if re.match(r'Member\(".*", .*RunTimeEnvironment', s):
                # Drop the leading 'Member(' and everything from the first
                # ', ' onwards, leaving only the quoted first argument.
                rte = re.sub(r", .*", "", re.sub(r"Member\(", "", s))
                xrsl += "(runTimeEnvironment=%s)" % rte
        return xrsl

    def clusterXrsl(self, i, task):
        """
        Return an xRSL-code snippet to select a CE ("cluster", in ARC parlance).

        The SEs in task.jobs[i-1]['dlsDestination'] are filtered by the SE
        white list if there is one, otherwise by the SE black list if there
        is one, otherwise left unfiltered.  The surviving SEs are passed to
        self.listMatch(), and the CEs it returns become (cluster=...)
        clauses; more than one clause is wrapped in an OR: (|(...)(...)).
        """
        se_dls = task.jobs[i-1]['dlsDestination']
        blah, se_white, se_black = self.se_list(i, se_dls)

        if se_white:
            se_list = [se for se in se_dls if se in se_white]
        elif se_black:
            se_list = [se for se in se_dls if se not in se_black]
        else:
            se_list = [se for se in se_dls]
        # FIXME: Check that se_list contains at least one SE!

        ce_list = self.listMatch(se_list, 'False')

        xrsl = ''.join(['(cluster=%s)' % ce for ce in ce_list])

        # A ce-list with more than one element must be an OR:ed
        # list: (|(cluster=ce1)(cluster=ce2)...)
        if len(ce_list) > 1:
            xrsl = '(|' + xrsl + ')'

        # FIXME: If ce_list == [] ==> xrsl = "" ==> we'll submit
        # "anywhere", which is completely contrary behaviour to what we want!
        # ce_list == [] means there were _no_ CE in ce_infoSys that
        # survived the white- and black-list filter, so we shouldn't submit
        # at all!

        return xrsl

    # def wsInitialEnvironment(self):
    #     return ''

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management: the shell function func_exit().
        """
        parts = [
            '\n',
            '#\n',
            '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
            '#\n\n',
            'func_exit() { \n',
            self.wsExitFunc_common(),
            # Remove ".BrokerInfo" that the code generated by
            # self.wsExitFunc_common() adds to $final_list. (This is an ugly
            # hack -- the "good" solution would be to add ARC-knowledge to
            # self.wsExitFunc_common())
            " final_list=${final_list%.BrokerInfo}\n",
            ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
            ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' if [ $exceed -ne 1 ]; then\n',
            ' tar zcvf ${out_files}.tgz ${final_list}\n',
            ' else\n',
            ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
            ' fi\n',
            ' exit $job_exit_code\n',
            '}\n',
        ]
        return ''.join(parts)

    def tags(self):
        """
        Return the scheduler tags; always the empty string here.
        """
        return ''

    def submit(self, list, task):
        """
        submit to scheduler a list of jobs

        The scheduler parameters are computed once, from the first job in
        the list, and reused for every sub-list produced by bulkControl().
        An empty list is reported through the logger and nothing is
        submitted.
        """
        if not len(list):
            common.logger.message("No sites where to submit jobs")
            # Nothing to submit; list[0] below would raise IndexError.
            return

        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        new_list = bulkControl(self, list)

        for sub_list in new_list:
            self.boss().submit(task['id'], sub_list, req)
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's)
        """
        # Go through the boss() accessor, like the other methods of this
        # class do.
        self.boss().cancel(ids)
        return

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info.
        Not implemented for this scheduler: always returns None.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs

        Returns a list with one entry per sub-list produced by
        bulkControl(), each being the result of boss().writeJDL().
        An empty job list yields an empty result.
        """
        # FIXME: Is this function being used?
        if not len(list):
            # list[0] below would raise IndexError on an empty list.
            return []

        req = str(self.sched_parameter(list[0], task))
        new_list = bulkControl(self, list)
        jdl = []
        for sub_list in new_list:
            tmp_jdl = self.boss().writeJDL(task['id'], sub_list, req)
            jdl.append(tmp_jdl)
        return jdl
221     return jdl