ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.89
Committed: Tue Feb 12 09:23:21 2008 UTC (17 years, 2 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.88: +22 -13 lines
Log Message:
add more verbose error message in case of problem in submission

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select CRAB jobs for submission and submit them via common.scheduler.

    The constructor decides which jobs are candidates (``self.nj_list``,
    0-based indices into ``common.jobDB``).  ``run()`` groups them by data
    block, submits each group with ``common.scheduler.submit()``, marks the
    submitted jobs as 'S' in the job DB and sends Dashboard reports through
    ``common.apmon``.
    """

    # Job states excluded when building the candidate list in __init__.
    _NOT_SUBMITTABLE = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')
    # Job states run() accepts for submission.
    _SUBMITTABLE = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """Build the list of jobs to submit.

        cfg_params   -- configuration dictionary; 'CMSSW.datasetpath' is
                        required, 'CRAB.server_mode' is optional (default 0).
        parsed_range -- job numbers already parsed from ``val``; only used
                        when ``val`` denotes an explicit list/range.  They are
                        shifted by -1, so presumably 1-based -- TODO confirm
                        with the caller.
        val          -- user request: empty/None or 'all' for every job, a
                        positive integer N for "the first N eligible jobs", or
                        an expression evaluating to a tuple / negative int for
                        an explicit job list.

        Raises CrabException when ``val`` is not one of the accepted forms.
        """
        self.cfg_params = cfg_params

        nsjobs, chosenJobsList = self._parseRequest(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))

        nJobs = common.jobDB.nJobs()
        common.logger.debug(5, 'Total jobs ' + str(nJobs))
        datasetpath = self.cfg_params['CMSSW.datasetpath']

        if chosenJobsList is not None:
            candidates = chosenJobsList
        else:
            candidates = range(nJobs)

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        # Without an input dataset the cleaned site list may be empty.
        noDataset = datasetpath in ("None", None)
        nj_list = []
        jobSkippedInSubmission = []  # 1-based numbers, for the user message
        for nj in candidates:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(
                common.jobDB.destination(nj))
            if cleanedBlackWhiteList != '' or noDataset:
                if common.jobDB.status(nj) in self._NOT_SUBMITTABLE:
                    continue
                nj_list.append(nj)
            else:
                jobSkippedInSubmission.append(nj + 1)
            # stop as soon as the requested number of jobs has been collected
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit ' + str(nsjobs) +
                                  ' jobs, but only ' + str(len(nj_list)) +
                                  ' left: submitting those')
        if jobSkippedInSubmission:
            skipped = ",".join([str(j) for j in jobSkippedInSubmission])
            common.logger.message("Jobs: " + skipped +
                                  "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name() == 'CONDOR_G':
            # checksum of the cfg file; part of the Dashboard job id in run()
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))

    def _parseRequest(self, val, parsed_range):
        """Translate the user request into (nsjobs, chosenJobsList).

        nsjobs is -1 for "no limit".  chosenJobsList is None unless an
        explicit job list was requested, in which case it holds 0-based
        indices.  Raises CrabException for unsupported requests.
        """
        if not val or val == 'all':
            return -1, None

        badOption = ('Bad submission option <' + str(val) + '>\n' +
                     ' Must be an integer or "all"\n' +
                     ' Generic range is not allowed')
        # SECURITY NOTE(review): eval() executes the raw user string.  It is
        # kept because list/range syntax relies on it, but it should be
        # replaced by a real parser.  Evaluated once instead of per test.
        try:
            request = eval(val)
        except Exception:
            raise CrabException(badOption)

        if type(request) is int and request > 0:
            return request, None
        # tuples ("1,3,5") and negative ints (e.g. "1-3") denote a job list
        if type(request) is tuple or (type(request) is int and request < 0):
            chosenJobsList = [i - 1 for i in parsed_range]
            return len(chosenJobsList), chosenJobsList
        raise CrabException(badOption)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        totalCreatedJobs = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                totalCreatedJobs += 1
        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # Read the shared apmon key/value file once; it is merged into the
        # pre-submission report and into every per-job report below.
        apmonParams = self._readApmonParams()

        # submit pre DashBoard information
        params = {'jobId': 'TaskMeta'}
        params.update(apmonParams)
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

        # The BOSS declare step is performed here only in server mode.
        # NOTE(review): server mode is recognised only by the magic value
        # 9999 of CRAB.server_mode -- confirm this is intended.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declare()  # Add for BOSS4
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        njs = 0
        try:
            groups = self._groupJobsByBlock()

            # Progress bar only when not debugging.
            term = None
            if not common.logger.debugLevel():
                term = TerminalController()

            if self.UseServer == 0:
                subType = 'Direct'
            else:
                subType = 'Server'

            nGroups = len(groups)
            for ii, group in enumerate(groups):
                common.logger.debug(1, 'Submitting jobs ' + str(group[1]))
                pbar = None
                if not common.logger.debugLevel():
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(group[1])) + ' jobs')
                    except Exception:
                        pbar = None  # progress bar is cosmetic only

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]

                if pbar:
                    pbar.update(float(ii + 1) / float(nGroups), 'please wait')

                # Pair each returned BOSS id with its scheduler job id.
                for bossId, jid in zip(bjidLista, jidLista):
                    # assumes BOSS ids are jobDB index + 1 (as elsewhere here)
                    tmpNj = bossId - 1
                    common.logger.debug(5, "Submitted job # " + repr(bossId))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                    njs += 1

                    params = self._dashboardParams(bossId, jid, subType, apmonParams)
                    common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
                    common.apmon.sendToML(params)
        except:
            # Deliberately broad: whatever interrupts submission, report it
            # and persist the jobs already marked as submitted.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % len(self.nj_list)
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()
        return

    def _groupJobsByBlock(self):
        """Check every job of self.nj_list and group the submittable ones.

        Returns a list of ``[block, [bossId, ...]]`` entries (the shape passed
        to common.scheduler.submit()), one per run of consecutive accepted
        jobs sharing the same block.  Jobs with an unsuitable status or with
        no compatible site are reported and left out, without affecting the
        jobs already collected.
        """
        groups = []
        lastBlock = object()  # sentinel: never equal to a real block
        match = None
        for nj in self.nj_list:
            st = common.jobDB.status(nj)
            if st not in self._SUBMITTABLE:
                msg = "Job # %d not submitted: status %s" % (nj + 1, crabJobStatusToString(st))
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            # list-match only when the block changes; reuse the result otherwise
            same = (currBlock == lastBlock)
            if not same:
                if common.scheduler.name() != "CONDOR_G":
                    whiteL, blackL = self._ceWhiteBlackLists()
                    match = common.scheduler.listMatch(nj, currBlock, whiteL, blackL)
                else:
                    match = "1"
                lastBlock = currBlock
            else:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                self.submissionError()
                continue

            found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if same:
                common.logger.debug(1, found)
            else:
                common.logger.message(found)

            bossId = common.jobDB.bossId(nj)
            if groups and groups[-1][0] == currBlock:
                groups[-1][1].append(bossId)
            else:
                groups.append([currBlock, [bossId]])
        return groups

    def _ceWhiteBlackLists(self):
        """Return (whiteList, blackList) of CE names for the list-match.

        The lists come from 'EDG.ce_white_list' / 'EDG.ce_black_list'
        (comma-separated) and are only filled when CRAB.scheduler contains
        "glite"; otherwise both are empty.
        """
        if self.cfg_params['CRAB.scheduler'].find("glite") == -1:
            return [], []
        return (self._splitCeList('EDG.ce_white_list'),
                self._splitCeList('EDG.ce_black_list'))

    def _splitCeList(self, key):
        """Split config value ``key`` on ',' into stripped, non-empty names."""
        raw = self.cfg_params.get(key)
        if raw is None:
            return []
        return [ce.strip() for ce in raw.split(",") if ce.strip()]

    def _readApmonParams(self):
        """Read the 'key:value' lines of the apmon file in the share dir.

        Only the first ':' separates key and value, so values containing
        ':' (e.g. URLs) are kept intact.  Values are stripped; lines without
        ':' are ignored.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                parts = line.split(':', 1)
                if len(parts) == 2:
                    params[parts[0]] = parts[1].strip()
        finally:
            fl.close()
        return params

    def _dashboardParams(self, bossId, jid, subType, apmonParams):
        """Build the Dashboard/MonALISA report for one submitted job.

        The job id format depends on the scheduler (CONDOR_G, lsf/caf, or the
        default EDG-style id).  ``apmonParams`` entries override the per-job
        keys, as in the original per-job file read.
        """
        tmpNj = bossId - 1
        localId = ''
        schedName = common.scheduler.name()
        if schedName == 'CONDOR_G':
            # OLI: JobID treatment, special for Condor-G scheduler
            rb = 'OSG'
            jobId = str(bossId) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        elif schedName == 'lsf' or schedName == 'caf':
            jobId = ("https://" + schedName + ":/" + jid + "-" +
                     common.taskDB.dict('taskId').replace("_", "-") + "-" + str(bossId))
            common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
            rb = schedName
            localId = jid
        else:
            jobId = str(bossId) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)
            # broker host taken from the second ':'-field of the job id
            rb = jid.split(':')[1].replace('//', '')

        destination = common.jobDB.destination(tmpNj)
        if len(destination) <= 2:
            targetSE = ",".join(destination)
        else:
            targetSE = str(len(destination)) + '_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': bossId,
                  'SubmissionType': subType,
                  'TargetSE': targetSE,
                  'localId': localId}
        common.logger.debug(5, str(params))
        params.update(apmonParams)
        return params

    def submissionError(self):
        """Log the submission requirements as a hint when submission is incomplete."""
        msg = 'Submission performed using the Requirements: \n'
        msg += common.taskDB.dict("jobtype") + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
        for key, label in (('EDG.se_white_list', 'SE White List'),
                           ('EDG.se_black_list', 'SE Black List'),
                           ('EDG.ce_white_list', 'CE White List'),
                           ('EDG.ce_black_list', 'CE Black List')):
            if key in self.cfg_params:
                msg += label + ': ' + self.cfg_params[key] + '\n'
        msg += '(Hint: please check if ' + common.taskDB.dict("jobtype") + ' is available at the Sites)\n'

        common.logger.message(msg)
        return