ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.85
Committed: Fri Jan 18 18:41:04 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre3
Changes since 1.84: +16 -14 lines
Log Message:
Redesign the inheritance tree of the Scheduler classes, and remove the SchedulerBoss one.
Introduce SchedulerGrid and SchedulerLocal as base classes for the Grid and local schedulers, respectively.
All interactions with BOSS are done via the Boss class, whose (unique) instance is owned by Scheduler.
These changes are done to reduce code duplication.

Plus other minor modifications and cosmetics, as usual.

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that selects the jobs to submit and hands them to common.scheduler.

    The selection is done once, in the constructor, and stored in
    self.nj_list (0-based job indexes).  run() then groups those jobs by
    block, submits each group, records the result in common.jobDB and sends
    a report per job through common.apmon.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build the list of jobs to be submitted.

        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' must be
                        present, 'CRAB.server_mode' is optional (default 0).
        parsed_range -- 1-based job numbers; only used when val evaluates to
                        a tuple or to a negative integer.
        val          -- submission option: empty or 'all' (every job), a
                        positive integer (submit at most that many jobs), or
                        an expression evaluating to a tuple / negative
                        integer (submit the jobs listed in parsed_range).

        Raises CrabException if val is none of the above.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs = -1
        chosenJobsList = None
        if val and val != 'all':
            # NOTE(review): eval() on a command-line supplied string; kept
            # because the accepted grammar (e.g. "1,2,3" or "1-3") relies on
            # it, but a dedicated parser would be safer.  It is evaluated
            # once, and any failure is reported as a bad option instead of
            # leaking a NameError/SyntaxError to the caller.
            try:
                request = eval(val)
            except Exception:
                request = None

            if type(request) is int and request > 0:
                # positive number: upper limit on the number of jobs
                nsjobs = request
            elif type(request) is tuple or (type(request) is int and request < 0):
                # explicit list/range of jobs: convert to 0-based indexes
                chosenJobsList = [i - 1 for i in parsed_range]
                nsjobs = len(chosenJobsList)
            else:
                msg = 'Bad submission option <'+str(val)+'>\n'
                msg += ' Must be an integer or "all"'
                msg += ' Generic range is not allowed"'
                raise CrabException(msg)

        common.logger.debug(5,'nsjobs '+str(nsjobs))
        common.logger.debug(5,'Total jobs '+str(common.jobDB.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']

        # candidate jobs: the user's list if given, otherwise every job
        if chosenJobsList is not None:
            candidates = chosenJobsList
        else:
            candidates = range(common.jobDB.nJobs())

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        nonSubmittable = ['R','S','K','Y','A','D','Z']
        nj_list = []
        jobSkippedInSubmission = []
        for nj in candidates:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(
                common.jobDB.destination(nj))
            hasSite = (cleanedBlackWhiteList != '')
            noDataset = (datasetpath == "None") or (datasetpath is None)
            if not (hasSite or noDataset):
                # nothing left after black/white list cleaning: report later
                jobSkippedInSubmission.append(nj+1)
            elif common.jobDB.status(nj) in nonSubmittable:
                continue
            else:
                nj_list.append(nj)
            # stop as soon as the requested number of jobs has been collected
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(len(nj_list))+' left: submitting those')
        if jobSkippedInSubmission:
            mess = ",".join([str(job) for job in jobSkippedInSubmission])
            common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
        common.logger.debug(5,'nj_list '+str(nj_list))

        if common.scheduler.name() == 'CONDOR_G':
            # create hash of cfg file (used in run() to build the job id)
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list

        self.UseServer = int(self.cfg_params.get('CRAB.server_mode',0))

    def _readApmonParams(self):
        """
        Return the 'key:value' pairs stored in the apmon file of the share
        directory as a dictionary (values stripped of whitespace).
        """
        fileParams = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                fields = line.split(':')
                if len(fields) < 2:
                    # not a key:value line (e.g. blank line): ignore it
                    continue
                # NOTE(review): as before, only the text between the first
                # and the second ':' is kept; confirm that values never
                # contain ':' themselves.
                fileParams[fields[0]] = fields[1].strip()
        finally:
            fl.close()
        return fileParams

    def _splitCfgList(self, key):
        """Return the non-empty, stripped items of the comma separated cfg value 'key'."""
        value = self.cfg_params.get(key)
        if not value:
            return []
        return [item.strip() for item in value.split(",") if item.strip()]

    def _ceLists(self):
        """
        Return (white, black) CE lists for the list-match.

        Both are empty unless 'CRAB.scheduler' contains "glite".
        """
        ### SL TODO to be moved in blackWhiteListParser
        if self.cfg_params['CRAB.scheduler'].find("glite") == -1:
            return [], []
        return (self._splitCfgList('EDG.ce_white_list'),
                self._splitCfgList('EDG.ce_black_list'))

    def _groupJobsByBlock(self):
        """
        Return [[block, [bossId, ...]], ...] for the jobs of self.nj_list
        that can be submitted.

        Jobs whose status is not C, K, A or RC, or for which no compatible
        site is found, are reported and left out.  Consecutive submittable
        jobs that share a block end up in the same group; a group is never
        left pending and never inherits jobs of a different block, even when
        some jobs in between are skipped.
        """
        groups = []
        # unique sentinel: the first job always triggers a list-match
        lastBlock = object()
        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in ('C', 'K', 'A', 'RC'):
                long_st = crabJobStatusToString(st)
                msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            blockChanged = (currBlock != lastBlock)
            # SL perform listmatch only if block has changed
            if blockChanged:
                if common.scheduler.name() != "CONDOR_G":
                    whiteL, blackL = self._ceLists()
                    match = common.scheduler.listMatch(nj, currBlock, whiteL, blackL)
                else:
                    match = "1"
                lastBlock = currBlock
            else:
                common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")

            if not match:
                common.logger.message("No compatible site found, will not submit job "+str(nj+1))
                continue

            found = "Found "+str(match)+" compatible site(s) for job "+str(nj+1)
            if blockChanged:
                common.logger.message(found)
            else:
                common.logger.debug(1, found)

            bossId = common.jobDB.bossId(nj)
            if groups and groups[-1][0] == currBlock:
                groups[-1][1].append(bossId)
            else:
                groups.append([currBlock, [bossId]])
        return groups

    def _sendJobReport(self, jj, jid, subType):
        """
        Send the DashBoard report for one submitted job.

        jj      -- integer id returned by the scheduler for the job
                   (its jobDB index is jj-1)
        jid     -- scheduler job id string returned by the submission
        subType -- 'Direct' or 'Server'
        """
        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.name() == 'CONDOR_G':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)

        if jid.find(":") != -1:
            rb = jid.split(':')[1].replace('//', '')
        else:
            rb = 'OSG'

        destination = common.jobDB.destination(jj - 1)
        if len(destination) <= 2:
            T_SE = ",".join(destination)
        else:
            T_SE = str(len(destination))+'_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'SubmissionType': subType,
                  'TargetSE': T_SE,}
        common.logger.debug(5,str(params))

        # entries of the apmon file take precedence over the ones above
        params.update(self._readApmonParams())

        common.logger.debug(5,'Submission DashBoard report: '+str(params))

        common.apmon.sendToML(params)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Returns None.  Any exception raised while grouping or submitting is
        caught, reported through common.logger and the job DB is saved; the
        final summary then reflects the jobs submitted up to that point.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        totalCreatedJobs = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        params = {'jobId':'TaskMeta'}
        params.update(self._readApmonParams())

        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))

        common.apmon.sendToML(params)

        # modified to support server mode
        # The boss declare step is performed here
        # only if crab is used server mode
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared( common.taskDB.dict('projectName') ):
                common.logger.debug(5,'Declaring jobs to BOSS')
                common.scheduler.declare()   #Add for BOSS4
            else:
                common.logger.debug(5,'Jobs already declared into BOSS')
            # NOTE(review): nesting reconstructed from a listing without
            # indentation; confirm these saves belong to the server-mode branch.
            common.jobDB.save()
            common.taskDB.save()

        # To distinguish if job is direct or through the server
        if self.UseServer == 0:
            Sub_Type = 'Direct'
        else:
            Sub_Type = 'Server'

        #########
        # Loop over jobs
        njs = 0
        try:
            list_of_list = self._groupJobsByBlock()

            ### Progress Bar indicator, deactivate for debug
            showProgress = not common.logger.debugLevel()
            if showProgress:
                term = TerminalController()

            for ii, group in enumerate(list_of_list):
                common.logger.debug(1,'Submitting jobs '+str(group[1]))
                pbar = None
                if showProgress:
                    # the progress bar is optional: go on without it on failure
                    try:
                        pbar = ProgressBar(term, 'Submitting '+str(len(group[1]))+' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(bjid) for bjid in bjidLista]   # cast all bjidLista to int

                if pbar:
                    pbar.update(float(ii+1)/float(len(list_of_list)),'please wait')

                # loop over the ids returned from the group submission
                for jj, jid in zip(bjidLista, jidLista):
                    tmpNj = jj - 1

                    common.logger.debug(5,"Submitted job # "+ str(jj))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))

                    njs += 1

                    ##### DashBoard report #####################
                    self._sendJobReport(jj, jid, Sub_Type)

        except:
            # deliberate catch-all: whatever goes wrong, report it and make
            # sure the status of the jobs already submitted is saved
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s"%(exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted'%njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).'%(len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        ## add some more verbose message in case submission is not complete
        if njs < len(self.nj_list):
            msg = 'Submission performed using the Requirements: \n'
            msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
            if 'EDG.se_white_list' in self.cfg_params:
                msg += 'SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            if 'EDG.se_black_list' in self.cfg_params:
                msg += 'SE Black List: '+self.cfg_params['EDG.se_black_list']+'\n'
            if 'EDG.ce_white_list' in self.cfg_params:
                msg += 'CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            if 'EDG.ce_black_list' in self.cfg_params:
                msg += 'CE Black List: '+self.cfg_params['EDG.ce_black_list']+'\n'
            msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'

            common.logger.message(msg)