ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerArc.py
Revision: 1.8
Committed: Mon Apr 27 11:41:15 2009 UTC (16 years ago) by edelmann
Content type: text/x-python
Branch: MAIN
Changes since 1.7: +3 -9 lines
Log Message:
python/SchedulerArc.py: Removed some debugging output.

File Contents

# User Rev Content
1 edelmann 1.1
2     from SchedulerGrid import SchedulerGrid
3     from Scheduler import Scheduler
4     from crab_exceptions import *
5     from Boss import Boss
6     import common
7     import string, time, os
8     from crab_util import *
9     from WMCore.SiteScreening.BlackWhiteListParser import CEBlackWhiteListParser, \
10     SEBlackWhiteListParser
11    
12     import sys
13 edelmann 1.4 import sha # Good for python 2.4, replaced with hashlib in 2.5
14 edelmann 1.1
15     #
16     # Naming convention:
17     # methods starting with 'ws' are responsible to provide
18     # corresponding part of the job script ('ws' stands for 'write script').
19     #
20    
class SchedulerArc(SchedulerGrid):
    """
    CRAB scheduler plugin for ARC, built on top of SchedulerGrid.

    Job submission, querying, cancellation and JDL writing are delegated to
    the BOSS object returned by self.boss(); this class mainly supplies the
    ARC-specific resource requirements and job-script fragments.
    """

    def __init__(self, name='ARC'):
        SchedulerGrid.__init__(self, name)
        return

    def envUniqueID(self):
        """
        Return the job identifier used for ML monitoring.

        The identifier has the form
        'https://<scheduler name>/<sha1 hex digest of task name>/${NJob}'.
        '${NJob}' is a literal shell variable reference, presumably expanded
        later inside the generated job script.
        """
        try:
            from hashlib import sha1
        except ImportError:
            # Python 2.4 has no hashlib; fall back to the deprecated module.
            from sha import new as sha1
        taskHash = sha1(common._db.queryTask('name')).hexdigest()
        jobId = 'https://' + self.name() + '/' + taskHash + '/${NJob}'
        msg = 'JobID for ML monitoring is created for ARC scheduler: %s' % jobId
        common.logger.debug(5, msg)
        return jobId

    def realSchedParams(self, cfg_params):
        """
        Return scheduler-specific parameters; ARC needs none.
        """
        return {}

    def configure(self, cfg_params):
        """
        Configure the scheduler from cfg_params via SchedulerGrid.configure().

        Also resets self.environment_unique_identifier to None afterwards.
        """
        if 'EDG_WL_LOCATION' not in os.environ:
            # Ugly hack: SchedulerGrid.configure() does not work unless
            # EDG_WL_LOCATION is present in the environment.
            os.environ['EDG_WL_LOCATION'] = ''

        SchedulerGrid.configure(self, cfg_params)
        self.environment_unique_identifier = None

    def ce_list(self):
        """
        Build the CE-related requirement string from the CE white/black lists.

        Returns a tuple (req, whiteList, blackList):
          req       -- requirement fragment (each clause prefixed by ' && '),
                       '' when neither list yields any entry;
          whiteList -- comma-joined white-listed CEs, or None if empty;
          blackList -- comma-joined black-listed CEs, or None if empty.
        """
        ceParser = CEBlackWhiteListParser(self.EDG_ce_white_list,
                                          self.EDG_ce_black_list,
                                          common.logger)
        req = ''
        ce_white_list = []
        ce_black_list = []

        if self.EDG_ce_white_list:
            ce_white_list = ceParser.whiteList()
            tmpCe = ['RegExp("' + ce.strip() + '", other.GlueCEUniqueId)'
                     for ce in ce_white_list]
            if len(tmpCe) == 1:
                req += " && ( " + tmpCe[0] + " ) "
            elif len(tmpCe) > 1:
                # White-listed CEs are OR:ed together.
                req += " && ( (" + ") || (".join(tmpCe) + ") )"

        if self.EDG_ce_black_list:
            ce_black_list = ceParser.blackList()
            tmpCe = ['(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                     for ce in ce_black_list]
            if tmpCe:
                # Every black-listed CE must be excluded, hence AND.
                req += " && (" + "&&".join(tmpCe) + ") "

        retWL = ','.join(ce_white_list) or None
        retBL = ','.join(ce_black_list) or None
        return req, retWL, retBL

    def se_list(self, id, dest):
        """
        Return ('', SE white list, SE black list) from the SE list parser.

        The id and dest arguments are accepted for interface compatibility
        but are not used here.
        """
        se_white = self.blackWhiteListParser.whiteList()
        se_black = self.blackWhiteListParser.blackList()
        return '', se_white, se_black

    def sched_parameter(self, i, task):
        """
        Return the scheduler-specific requirement string to pass to BOSS.

        The SEs holding job i's data (task.jobs[i-1]['dlsDestination']) are
        filtered by the SE white list (if set) or otherwise by the SE black
        list. The CEs matching the surviving SEs are turned into a cluster
        filter: '(cluster=ce1)' for one CE, '(|(cluster=ce1)(cluster=ce2)...)'
        for several, and '' when no CE matched.
        """
        se_dls = task.jobs[i-1]['dlsDestination']
        blah, se_white, se_black = self.se_list(i, se_dls)

        allowed_ses = []
        for se in se_dls:
            if se_white:
                # A white list, when present, takes precedence over the
                # black list.
                if se in se_white:
                    allowed_ses.append(se)
            elif se_black:
                if se not in se_black:
                    allowed_ses.append(se)
            else:
                allowed_ses.append(se)
        # FIXME: Check that allowed_ses contains at least one SE!

        ce_list = self.listMatch(allowed_ses, 'False')

        if not ce_list:
            # FIXME: An empty result means _no_ CE survived the white- and
            # black-list filters, yet returning "" means we'll submit
            # "anywhere" -- the opposite of what we want. We shouldn't
            # submit at all in this case.
            return ""

        clusters = ''.join(['(cluster=%s)' % ce for ce in ce_list])
        if len(ce_list) > 1:
            # Several CEs must form an OR:ed list.
            return '(|' + clusters + ')'
        return clusters

    def wsExitFunc(self):
        """
        Returns part of a job script which does scheduler-specific
        output checks and management.

        The generated shell function func_exit() records the job exit code,
        tars the output files (or only the CMSSW stdout/stderr when $exceed
        is 1) into ${out_files}.tgz and exits with $job_exit_code.
        """
        txt = '\n'

        txt += '#\n'
        txt += '# EXECUTE THIS FUNCTION BEFORE EXIT \n'
        txt += '#\n\n'

        txt += 'func_exit() { \n'
        txt += self.wsExitFunc_common()

        # Remove ".BrokerInfo" that the code generated by
        # self.wsExitFunc_common() adds to $final_list. (This is an ugly
        # hack -- the "good" solution would be to add ARC-knowledge to
        # self.wsExitFunc_common())
        txt += " final_list=${final_list%.BrokerInfo}\n"

        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' if [ $exceed -ne 1 ]; then\n'
        txt += ' tar zcvf ${out_files}.tgz ${final_list}\n'
        txt += ' else\n'
        txt += ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n'
        txt += ' fi\n'
        txt += ' exit $job_exit_code\n'
        txt += '}\n'
        return txt

    def tags(self):
        """
        Return scheduler tags; ARC uses none.
        """
        return ''

    def submit(self, list, task):
        """
        Submit a list of jobs to the scheduler through BOSS.

        The requirement string is computed from the first job in the list
        and reused for all jobs; the list is split by bulkControl() into
        sub-lists that are submitted one at a time. An empty list is logged
        and nothing is submitted.
        """
        if not list:
            common.logger.message("No sites where to submit jobs")
            # Nothing to submit; without this return list[0] below would
            # raise IndexError.
            return
        req = str(self.sched_parameter(list[0], task))

        ### reduce collection size...if needed
        new_list = bulkControl(self, list)

        for sub_list in new_list:
            self.boss().submit(task['id'], sub_list, req)
        return

    def queryEverything(self, taskid):
        """
        Query needed info of all jobs with specified boss taskid
        """
        return self.boss().queryEverything(taskid)

    def cancel(self, ids):
        """
        Cancel the job(s) with ids (a list of id's)
        """
        # Go through the same boss() accessor used by submit() and
        # queryEverything() instead of reaching for the private attribute.
        self.boss().cancel(ids)
        return

    def decodeLogInfo(self, file):
        """
        Parse logging info file and return main info.

        Not implemented for ARC: always returns None.
        """
        return

    def writeJDL(self, list, task):
        """
        Materialize JDL for a list of jobs.

        Returns a list with one JDL (as produced by BOSS) per sub-list
        created by bulkControl(); an empty job list yields [].
        """
        if not list:
            # Guard against list[0] raising IndexError on an empty list.
            return []
        req = str(self.sched_parameter(list[0], task))
        new_list = bulkControl(self, list)
        jdl = []
        for sub_list in new_list:
            tmp_jdl = self.boss().writeJDL(task['id'], sub_list, req)
            jdl.append(tmp_jdl)
        return jdl