ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.92
Committed: Thu Feb 14 00:17:21 2008 UTC (17 years, 2 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.91: +39 -36 lines
Log Message:
Fix exceptions for Condor_G

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select the jobs of a task that can be submitted and submit them.

    The constructor decides which jobs are eligible (self.nj_list); run()
    list-matches them, submits them grouped by block, records the returned
    scheduler ids in the job DB and sends Dashboard (ApMon) reports.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """Build the list of jobs to submit (self.nj_list).

        cfg_params   -- configuration dictionary; this class reads
                        'CMSSW.datasetpath', 'CRAB.server_mode',
                        'CRAB.scheduler' and the EDG SE/CE white/black lists.
        parsed_range -- job numbers parsed from the user's range option
                        (see _parseRequest for how they are interpreted).
        val          -- raw submission option: 'range', 'all', a positive
                        integer N, or a list/range expression.

        Raises CrabException when val is not a valid submission option.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseRequest(parsed_range, val)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))
        common.logger.debug(5, 'Total jobs ' + str(common.jobDB.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if chosenJobsList is None:
            candidates = range(common.jobDB.nJobs())
        else:
            candidates = chosenJobsList

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        # Job statuses that are never put in the submission list.
        excludedStates = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')
        nj_list = []
        jobSkippedInSubmission = []  # 1-based job numbers, for the message
        for nj in candidates:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj))
            # When no dataset is used (datasetpath 'None'/None) the site check
            # is skipped ("Matty's fix"); otherwise the cleaned destination
            # must be non-empty (presumably: some site survives the lists).
            hasSites = (cleanedBlackWhiteList != '') or (datasetpath == "None") or (datasetpath is None)
            if not hasSites:
                jobSkippedInSubmission.append(nj + 1)
                continue
            if common.jobDB.status(nj) in excludedStates:
                continue
            nj_list.append(nj)
            # stop as soon as the requested number of jobs has been collected
            if nsjobs > 0 and len(nj_list) == nsjobs:
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(len(nj_list)) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = ",".join([str(job) for job in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name() == 'CONDOR_G':
            # hash of the cfg file, used to build the Condor-G Dashboard job id
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))
        return

    def _parseRequest(self, parsed_range, val):
        """Translate the submission option into (nsjobs, chosenJobsList).

        nsjobs is the maximum number of jobs to submit (-1: no limit).
        chosenJobsList is the explicit list of job indices to consider, or
        None to consider every job in the job DB.

        - empty/None or 'all': no limit, all jobs.
        - 'range' (used by the Resubmitter): parsed_range is used as given
          (assumed to already hold 0-based indices -- TODO confirm).
        - positive integer N: submit at most N jobs.
        - tuple or negative integer (e.g. "1,3" or "1-3", which eval() turns
          into a tuple / a negative int): parsed_range converted from 1-based
          job numbers to 0-based indices.

        Raises CrabException for anything else, including malformed input.
        """
        nsjobs = -1
        if not val or val == 'all':
            return nsjobs, None
        if val == 'range':  # for Resubmitter
            return nsjobs, parsed_range

        # NOTE(review): eval() is only used to classify the option syntax but
        # executes arbitrary expressions; val must only ever come from the
        # local user's command line, never from untrusted input.
        try:
            request = eval(val)
        except Exception:
            # malformed option (SyntaxError, NameError, ...): reported below
            request = None

        if type(request) is int and request > 0:
            # positive number
            return request, None
        if type(request) is tuple or (type(request) is int and request < 0):
            chosenJobsList = [i - 1 for i in parsed_range]
            return len(chosenJobsList), chosenJobsList

        msg = 'Bad submission option <' + str(val) + '>\n'
        msg += ' Must be an integer or "all".\n'
        msg += ' Generic range is not allowed'
        raise CrabException(msg)

    def run(self):
        """Submit the jobs selected in the constructor (self.nj_list).

        Sends a task-level Dashboard report, submits the eligible jobs with
        one scheduler.submit() call per block group, marks every returned job
        as 'S' with its scheduler id in the job DB and sends a per-job
        Dashboard report. The job DB is saved even if submission is
        interrupted; other errors are logged and the summary still printed.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        hasCreatedJobs = False
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                hasCreatedJobs = True
                break
        if not hasCreatedJobs:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        params = {'jobId': 'TaskMeta'}
        params.update(self._readApmonParams())
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

        # The BOSS declare step is performed here only in server mode 9999.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declare()  # Add for BOSS4
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        # Loop over jobs
        njs = 0
        # Nested try: try/except/finally in one statement needs Python >= 2.5.
        try:
            try:
                groups = self._buildSubmissionGroups()
                nGroups = len(groups)
                # Progress Bar indicator, deactivated in debug mode
                showProgress = not common.logger.debugLevel()
                if showProgress:
                    term = TerminalController()

                for index, group in enumerate(groups):
                    bossIds = group[1]
                    common.logger.debug(1, 'Submitting jobs ' + str(bossIds))
                    pbar = None
                    if showProgress:
                        try:
                            pbar = ProgressBar(term, 'Submitting ' + str(len(bossIds)) + ' jobs')
                        except Exception:
                            # e.g. terminal without the needed capabilities
                            pbar = None

                    jidLista, bjidLista = common.scheduler.submit(group)
                    if pbar:
                        pbar.update(float(index + 1) / float(nGroups), 'please wait')

                    # submit() returns parallel lists of scheduler ids and BOSS ids
                    for jid, bossId in zip(jidLista, bjidLista):
                        bossId = int(bossId)
                        # assumes BOSS id == job index + 1 -- TODO confirm
                        tmpNj = bossId - 1
                        common.logger.debug(5, "Submitted job # " + str(bossId))
                        common.jobDB.setStatus(tmpNj, 'S')
                        common.jobDB.setJobId(tmpNj, jid)
                        common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                        njs += 1
                        # monitoring is best effort: never blocks bookkeeping
                        self._reportJobToDashboard(tmpNj, bossId, jid)
            except Exception:
                exctype, value = sys.exc_info()[:2]
                print("Type: %s Value: %s" % (exctype, value))
                common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
        finally:
            # persist what was recorded so far, also on KeyboardInterrupt
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % len(self.nj_list)
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()
        return

    def _buildSubmissionGroups(self):
        """Check and list-match the jobs in self.nj_list and group them.

        Returns a list of [block, [bossId, ...]] pairs, as passed to
        common.scheduler.submit(). Consecutive accepted jobs with the same
        block share one group; jobs with an unsuitable status or no
        compatible site are reported and left out.
        """
        submittableStates = ('C', 'K', 'A', 'RC')
        groups = []
        currentGroup = None
        lastBlock = object()  # sentinel: differs from every real block
        match = None
        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in submittableStates:
                msg = "Job # %d not submitted: status %s" % (nj + 1, crabJobStatusToString(st))
                common.logger.message(msg)
                continue

            block = common.jobDB.block(nj)
            # perform the list-match only if the block has changed
            same = (block == lastBlock)
            if same:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
            else:
                match = self._listMatch(nj, block)
                lastBlock = block

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                self.submissionError()
                continue

            found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if same:
                common.logger.debug(1, found)
            else:
                common.logger.message(found)

            # Group by the accepted job's own block (not by job nj+1, which
            # may not be in nj_list or may be skipped).
            if currentGroup is None or currentGroup[0] != block:
                currentGroup = [block, []]
                groups.append(currentGroup)
            currentGroup[1].append(common.jobDB.bossId(nj))
        return groups

    def _listMatch(self, nj, block):
        """Return the scheduler's list-match result for job nj in block.

        For Condor-G no list-match is done and "1" is returned. For glite
        schedulers the EDG CE white/black lists are passed to listMatch.
        """
        if common.scheduler.name().upper() == "CONDOR_G":
            return "1"
        whiteL = []
        blackL = []
        ### SL TODO to be moved in blackWhiteListParser
        if self.cfg_params['CRAB.scheduler'].find("glite") != -1:
            whiteL = self._ceList('EDG.ce_white_list')
            blackL = self._ceList('EDG.ce_black_list')
        return common.scheduler.listMatch(nj, block, whiteL, blackL)

    def _ceList(self, key):
        """Return the non-empty, stripped comma-separated entries of cfg key."""
        value = self.cfg_params.get(key)
        if not value:
            return []
        return [ce.strip() for ce in value.split(",") if ce.strip()]

    def _readApmonParams(self):
        """Read "key:value" pairs from the ApMon file in the share directory.

        Values are stripped, keys are kept as written. Lines that do not
        split into exactly two fields on ':' are skipped.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                try:
                    key, value = line.split(':')
                except ValueError:  # Not in the right format
                    continue
                params[key] = value.strip()
        finally:
            fl.close()
        return params

    def _reportJobToDashboard(self, nj, bossId, jid):
        """Send the submission report of one job to the Dashboard via ApMon.

        nj     -- job index in the job DB
        bossId -- BOSS id of the job (int)
        jid    -- scheduler job id returned by submit()

        Errors are logged and swallowed so that a monitoring problem cannot
        stop the remaining submitted jobs from being recorded.
        """
        try:
            # To distinguish if job is direct or through the server
            if self.UseServer == 0:
                subType = 'Direct'
            else:
                subType = 'Server'

            # JobID treatment, special for Condor-G and LSF/CAF schedulers
            schedName = common.scheduler.name()
            localId = ''
            if schedName == 'CONDOR_G':
                rb = 'OSG'
                jobId = str(bossId) + '_' + self.hash + '_' + jid
                common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
            elif schedName == 'lsf' or schedName == 'caf':
                taskTag = common.taskDB.dict('taskId').replace("_", "-")
                jobId = "https://" + schedName + ":/" + jid + "-" + taskTag + "-" + str(bossId)
                common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(bossId) + '_' + jid
                common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)
                # broker host taken from the second ':'-separated field of jid
                rb = jid.split(':')[1].replace('//', '')

            destination = common.jobDB.destination(nj)
            if len(destination) <= 2:
                targetSE = ",".join(destination)
            else:
                targetSE = str(len(destination)) + '_Selected_SE'

            params = {'jobId': jobId,
                      'sid': jid,
                      'broker': rb,
                      'bossId': bossId,
                      'SubmissionType': subType,
                      'TargetSE': targetSE,
                      'localId': localId}
            common.logger.debug(5, str(params))

            params.update(self._readApmonParams())
            common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
            common.apmon.sendToML(params)
        except Exception:
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Submitter::run Dashboard report failed for job %s: %s %s" % (bossId, exctype, value))

    def submissionError(self):
        """Log the requirements used for submission, to help diagnose failures.

        Reports the job type and code version from the task DB and any SE/CE
        white/black lists present in the configuration.
        """
        jobtype = common.taskDB.dict("jobtype")
        msg = 'Submission performed using the Requirements: \n'
        msg += jobtype + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
        msg += '(Hint: please check if ' + jobtype + ' is available at the Sites)\n'
        listLabels = (('EDG.se_white_list', 'SE White List: '),
                      ('EDG.se_black_list', 'SE Black List: '),
                      ('EDG.ce_white_list', 'CE White List: '),
                      ('EDG.ce_black_list', 'CE Black List: '))
        for key, label in listLabels:
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'

        common.logger.message(msg)
        return