ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.9
Committed: Tue Apr 28 12:19:41 2009 UTC (16 years ago) by edelmann
Content type: text/x-python
Branch: MAIN
Changes since 1.8: +36 -47 lines
Log Message:
python/SchedulerArc.py: Rationalized sched_parameter a bit.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible to provide
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
21     class SchedulerArc(SchedulerGrid):
def __init__(self, name='ARC'):
    """
    Set up the scheduler by delegating to SchedulerGrid.__init__ with
    the given scheduler name ('ARC' unless the caller overrides it).
    """
    SchedulerGrid.__init__(self, name)
25    
def envUniqueID(self):
    """
    Build, log and return the job identifier used for ML monitoring.

    The identifier is 'https://' + scheduler name + '/' + hex digest
    (from the sha module) of the task name + the literal text
    '/${NJob}'.
    """
    taskName = common._db.queryTask('name')
    digest = sha.new(taskName).hexdigest()
    # 'uniqueId' rather than 'id' so the builtin id() is not shadowed.
    uniqueId = ''.join(['https://', self.name(), '/', digest, '/${NJob}'])
    common.logger.debug(
        5, 'JobID for ML monitoring is created for ARC scheduler: %s' % uniqueId)
    return uniqueId
32    
33 edelmann 1.9
def realSchedParams(self, cfg_params):
    """
    Return the dictionary of parameters specific to the real scheduler.

    This implementation does not look at cfg_params and always hands
    back a new, empty dictionary.
    """
    params = {}
    return params
40    
41    
def configure(self, cfg_params):
    """
    Configure the scheduler from cfg_params.

    Makes sure the EDG_WL_LOCATION environment variable exists, runs
    SchedulerGrid.configure() and then resets
    environment_unique_identifier to None.
    """
    # dict.has_key() is deprecated (and gone in Python 3); the 'in'
    # operator performs the same membership test.
    if 'EDG_WL_LOCATION' not in os.environ:
        # This is an ugly hack needed for SchedulerGrid.configure() to
        # work!
        os.environ['EDG_WL_LOCATION'] = ''

    SchedulerGrid.configure(self, cfg_params)
    self.environment_unique_identifier = None
51 edelmann 1.1
52    
def ce_list(self):
    """
    Return the tuple ('', whitelist, blacklist) for computing elements.

    Both lists are obtained from a CEBlackWhiteListParser built from
    EDG_ce_white_list and EDG_ce_black_list; each one is returned as a
    comma-separated string, or as None when it is empty.
    """
    parser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                    self.EDG_ce_black_list, common.logger)
    white = ','.join(parser.whiteList())
    black = ','.join(parser.blackList())
    return '', white or None, black or None
59 edelmann 1.1
60    
def se_list(self, id, dest):
    """
    Return the tuple ('', whitelist, blacklist) for storage elements,
    as reported by self.blackWhiteListParser.

    The id and dest arguments are accepted but not used here.
    """
    parser = self.blackWhiteListParser
    return '', parser.whiteList(), parser.blackList()
65    
66    
def sched_parameter(self, i, task):
    """
    Return the scheduler-specific parameter string to use with BOSS.

    The result is the runtime-environment xRSL from runtimeXrsl()
    immediately followed by the cluster-selection xRSL from
    clusterXrsl(), both computed for job i of task.
    """
    runtime = self.runtimeXrsl(i, task)
    cluster = self.clusterXrsl(i, task)
    return runtime + cluster
72    
73    
def runtimeXrsl(self, i, task):
    """
    Return an xRSL-code snippet with required runtime environments.

    The snippet always starts with the APPS/HEP/CMSSW-PA runtime
    environment.  task['jobType'] is then split on '&&', and for each
    clause that starts with Member("<name>", ...RunTimeEnvironment
    one more (runTimeEnvironment="<name>") term is appended.  The job
    index i is not used.
    """
    # Imported locally so this method does not depend on 're' leaking
    # into the module namespace through a star-import.
    import re

    xrsl = "(runTimeEnvironment=\"APPS/HEP/CMSSW-PA\")"
    for clause in task['jobType'].split('&&'):
        # Raw strings keep the '\(' escapes intact without relying on
        # Python passing unknown escape sequences through unchanged.
        # re.match() anchors at the start of the clause, so a clause
        # that begins with whitespace is not picked up.
        # NOTE(review): confirm that skipping such clauses is intended.
        if re.match(r'Member\(".*", .*RunTimeEnvironment', clause):
            # Drop the 'Member(' prefix, then everything from the first
            # ', ' onwards, leaving only the quoted environment name.
            rte = re.sub(r", .*", "", re.sub(r"Member\(", "", clause))
            xrsl += "(runTimeEnvironment=%s)" % rte
    return xrsl
84    
85    
def clusterXrsl(self, i, task):
    """
    Return an xRSL-code snippet to select a CE ("cluster", in ARC parlance)

    The storage elements listed under 'dlsDestination' for job i
    (task.jobs is indexed with i-1) are filtered through the SE white
    list if there is one, otherwise through the SE black list if there
    is one, otherwise kept as they are.  The survivors are passed to
    self.listMatch() and the CEs it returns are rendered as
    (cluster=...) terms.
    """
    se_dls = task.jobs[i-1]['dlsDestination']
    se_white, se_black = self.se_list(i, se_dls)[1:]

    if se_white:
        candidates = [se for se in se_dls if se in se_white]
    elif se_black:
        candidates = [se for se in se_dls if se not in se_black]
    else:
        candidates = [se for se in se_dls]
    # FIXME: Check that the candidate list contains at least one SE!

    clusters = self.listMatch(candidates, 'False')

    xrsl = ''.join(['(cluster=%s)' % ce for ce in clusters])

    # A ce-list with more than one element must be an OR:ed
    # list: (|(cluster=ce1)(cluster=ce2)...)
    if len(clusters) > 1:
        xrsl = '(|' + xrsl + ')'

    # FIXME: If no CE was returned ==> xrsl = "" ==> we'll submit
    # "anywhere", which is completely contrary behaviour to what we want!
    # An empty CE list means there were _no_ CE in ce_infoSys that
    # survived the white- and black-list filter, so we shouldn't submit
    # at all!

    return xrsl
124 edelmann 1.1
125 edelmann 1.8
126 edelmann 1.7 # def wsInitialEnvironment(self):
127     # return ''
128 edelmann 1.1
129 edelmann 1.8
def wsExitFunc(self):
    """
    Returns part of a job script which does scheduler-specific
    output checks and management.

    The returned text defines the shell function func_exit(): the
    common part supplied by self.wsExitFunc_common(), followed by
    ARC-specific lines that report the exit code, pack the output
    files into a tarball and exit with $job_exit_code.
    """
    pieces = [
        '\n',
        '#\n',
        '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
        '#\n\n',
        'func_exit() { \n',
        self.wsExitFunc_common(),
        # Remove ".BrokerInfo" that the code generated by
        # self.wsExitFunc_common() adds to $final_list. (This is an ugly
        # hack -- the "good" solution would be to add ARC-knowledge to
        # self.wsExitFunc_common())
        " final_list=${final_list%.BrokerInfo}\n",
        ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
        ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' if [ $exceed -ne 1 ]; then\n',
        ' tar zcvf ${out_files}.tgz ${final_list}\n',
        ' else\n',
        ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
        ' fi\n',
        ' exit $job_exit_code\n',
        '}\n',
    ]
    return ''.join(pieces)
161    
162    
def tags(self):
    """
    Return the tags string for this scheduler; here it is always the
    empty string.
    """
    noTags = ''
    return noTags
165    
166 edelmann 1.8
def submit(self, list, task):
    """
    Submit to scheduler a list of jobs.

    The requirement string is computed once, from the first job in
    list, and reused for every sub-collection that bulkControl()
    produces; each sub-collection is then submitted through BOSS.
    When list is empty a message is logged and nothing is submitted.
    """
    if not list:
        common.logger.message("No sites where to submit jobs")
        # Nothing to submit: stop here instead of falling through to
        # list[0] below, which would raise IndexError.
        return

    req = str(self.sched_parameter(list[0], task))

    ### reduce collection size...if needed
    for sub_list in bulkControl(self, list):
        self.boss().submit(task['id'], sub_list, req)
179 edelmann 1.9
180 edelmann 1.1
def queryEverything(self, taskid):
    """
    Query needed info of all jobs with specified boss taskid.

    The call is forwarded to the object returned by self.boss() and
    its result is returned unchanged.
    """
    boss = self.boss()
    return boss.queryEverything(taskid)
186    
187    
def cancel(self, ids):
    """
    Cancel the job(s) with ids (a list of id's).

    The request is forwarded to the cancel() method of the _boss
    attribute; nothing is returned.
    """
    # NOTE(review): the sibling methods reach BOSS through self.boss()
    # while this one uses self._boss directly -- presumably the same
    # object; confirm before unifying.
    self._boss.cancel(ids)
194    
195 edelmann 1.9
def decodeLogInfo(self, file):
    """
    Parse logging info file and return main info.

    This implementation does no parsing: the file argument is ignored
    and None is returned.
    """
    return None
201    
202 edelmann 1.9
def writeJDL(self, list, task):
    """
    Materialize JDL for a list of jobs.

    The requirement string is derived from the first job in list; one
    JDL is then requested from BOSS for each sub-collection produced by
    bulkControl(), and the results are returned as a list in that order.
    """
    # FIXME: Is this function being used?
    requirements = str(self.sched_parameter(list[0], task))
    return [self.boss().writeJDL(task['id'], chunk, requirements)
            for chunk in bulkControl(self, list)]