ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.84
Committed: Mon Jan 14 12:45:52 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.83: +74 -7 lines
Log Message:
Move all Submitter initialization from crab.py to the Submitter.py constructor

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select and submit the jobs of a CRAB task.

    The constructor works out which jobs the user asked for and which of
    them are eligible; run() performs the site matching, the grouped
    submission through common.scheduler and the DashBoard reporting.
    """

    # Status codes that exclude a job from the candidate list built in __init__.
    _NOT_SELECTABLE = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')
    # Status codes run() accepts for submission; other jobs are reported and skipped.
    _SUBMITTABLE = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """Build the list of jobs to submit (self.nj_list).

        cfg_params   -- configuration dictionary of the task.
        parsed_range -- 1-based job numbers already parsed by the caller;
                        used when val denotes a range or a list of jobs.
        val          -- user request: empty/None or 'all' for every job, a
                        positive integer N to select at most N eligible jobs,
                        or a range/list expression (see _parseSubmissionRequest).

        Raises CrabException if val cannot be interpreted.
        """
        self.cfg_params = cfg_params

        nsjobs, chosenJobsList = self._parseSubmissionRequest(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))

        self.nj_list = self._selectJobs(nsjobs, chosenJobsList)
        common.logger.debug(5, 'nj_list ' + str(self.nj_list))

        if common.scheduler.boss_scheduler_name == 'condor_g':
            # hash of the cfg file; it becomes part of the DashBoard job id
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))
        return

    def _parseSubmissionRequest(self, val, parsed_range):
        """Interpret the user request.

        Returns (nsjobs, chosenJobsList): nsjobs is the maximum number of
        jobs to select (-1 means no limit) and chosenJobsList holds the
        explicitly requested 0-based job indices, or None.
        Raises CrabException for anything that cannot be classified.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList

        msg = 'Bad submission option <' + str(val) + '>\n'
        msg += ' Must be an integer or "all"\n'
        msg += ' Generic range is not allowed'

        # NOTE(review): eval() executes arbitrary Python from the command line.
        # It is only used to classify the request: "N" -> int, "1,3,5" -> tuple,
        # "2-5" -> negative int (2 minus 5). The tuple / negative-int cases are
        # taken to mean "use parsed_range". Beware "5-2" evaluates to +3 and is
        # therefore read as "submit 3 jobs".
        try:
            request = eval(val)
        except Exception:
            raise CrabException(msg)

        if type(request) is int and request > 0:
            nsjobs = request
        elif type(request) is tuple or (type(request) is int and request < 0):
            # parsed_range is 1-based, job indices are 0-based
            chosenJobsList = [i - 1 for i in parsed_range]
            nsjobs = len(chosenJobsList)
        else:
            raise CrabException(msg)
        return nsjobs, chosenJobsList

    def _selectJobs(self, nsjobs, chosenJobsList):
        """Return the 0-based indices of the jobs eligible for submission.

        A candidate (every job, or chosenJobsList when given) is eligible when
        its destination survives the black/white list cleaning (or the task
        has no input dataset) and its status is not in _NOT_SELECTABLE.
        At most nsjobs jobs are returned when nsjobs > 0.
        Also sets self.blackWhiteListParser.
        """
        common.logger.debug(5, 'Total jobs ' + str(common.jobDB.nJobs()))
        datasetpath = self.cfg_params['CMSSW.datasetpath']
        noDataset = (datasetpath == "None") or (datasetpath is None)

        if chosenJobsList is not None:
            candidates = chosenJobsList
        else:
            candidates = range(common.jobDB.nJobs())

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        selected = []
        skipped = []
        for nj in candidates:
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(nj))
            if cleaned == '' and not noDataset:
                # no allowed site hosts the input data of this job
                skipped.append(nj + 1)
                continue
            if common.jobDB.status(nj) in self._NOT_SELECTABLE:
                continue
            selected.append(nj)
            if nsjobs > 0 and len(selected) == nsjobs:
                break

        if nsjobs > len(selected):
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(len(selected)) + ' left: submitting those')
        if skipped:
            common.logger.message("Jobs: " + ",".join([str(j) for j in skipped]) + "\n skipped because no sites are hosting this data\n")
        return selected

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Sends a task-level DashBoard report, submits the matched jobs grouped
        by block, records status 'S', job id and task id for every submitted
        job, sends one DashBoard report per job and saves the job DB.
        """
        import sys
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        anyCreated = False
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                anyCreated = True
                break
        if not anyCreated:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information; the file content is reused for every job report
        dashParams = self._readDashboardParams()
        params = {'jobId': 'TaskMeta'}
        params.update(dashParams)
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

        # The BOSS declare step is performed here only when CRAB.server_mode == 9999.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declareJob_()
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        # To distinguish if job is direct or through the server
        if self.UseServer == 0:
            subType = 'Direct'
        else:
            subType = 'Server'

        njs = 0
        try:
            groups = self._buildSubmissionGroups()

            # Progress Bar indicator, deactivated in debug mode
            showProgress = not common.logger.debugLevel()
            if showProgress:
                term = TerminalController()

            for idx, group in enumerate(groups):
                common.logger.debug(1, 'Submitting jobs ' + str(group[1]))
                pbar = None
                if showProgress:
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(group[1])) + ' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]

                if pbar:
                    pbar.update(float(idx + 1) / float(len(groups)), 'please wait')

                # jidLista[i] is the scheduler id of the job whose BOSS id is bjidLista[i]
                for jid, jj in zip(jidLista, bjidLista):
                    tmpNj = jj - 1  # BOSS ids are 1-based job numbers
                    common.logger.debug(5, "Submitted job # " + repr(jj))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                    njs += 1
                    self._reportJobToDashboard(jj, jid, subType, dashParams)
        except:
            # Deliberately catch everything (including interruptions) so that
            # the jobs already submitted are recorded by the save below.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % len(self.nj_list)
        else:
            msg += '.'
        common.logger.message(msg)

        # add some more verbose message in case submission is not complete
        if njs < len(self.nj_list):
            self._reportRequirements()
        return

    def _readDashboardParams(self):
        """Return the 'key:value' pairs of the apmon file in the share dir.

        Only the first ':' separates key and value, so values may contain
        colons; lines without ':' are ignored. The file is always closed.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl:
                if ':' not in line:
                    continue
                key, value = line.split(':', 1)
                params[key] = value.strip()
        finally:
            fl.close()
        return params

    def _ceListFromCfg(self, key):
        """Return the non-empty, stripped, comma-separated entries of cfg_params[key] ([] if unset or empty)."""
        value = self.cfg_params.get(key)
        if not value:
            return []
        return [ce.strip() for ce in value.split(',') if ce.strip()]

    def _buildSubmissionGroups(self):
        """Match sites for the jobs of self.nj_list and group them by block.

        Returns a list of [block, [bossId, ...]] pairs, the argument format
        passed to common.scheduler.submit(). Consecutive matched jobs (in
        self.nj_list order) sharing a block go in one group. Jobs whose status
        is not in _SUBMITTABLE, or with no compatible site, are reported and
        left out without breaking the grouping of the others.
        """
        # white/black CE lists are only passed to the list-match for glite schedulers
        whiteL = []
        blackL = []
        if self.cfg_params.get('CRAB.scheduler', '').find("glite") != -1:
            whiteL = self._ceListFromCfg('EDG.ce_white_list')
            blackL = self._ceListFromCfg('EDG.ce_black_list')

        groups = []
        group = []
        groupBlock = None
        lastBlock = None  # None (not a block value) forces a list-match for the first job
        match = None
        for nj in self.nj_list:
            st = common.jobDB.status(nj)
            if st not in self._SUBMITTABLE:
                common.logger.message("Job # %d not submitted: status %s" % (nj + 1, crabJobStatusToString(st)))
                continue

            currBlock = common.jobDB.block(nj)
            same = (currBlock == lastBlock)
            if same:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
            else:
                # perform the list-match only when the block changes
                if common.scheduler.boss_scheduler_name != "condor_g":
                    match = common.scheduler.listMatch(nj, currBlock, whiteL, blackL)
                else:
                    match = "1"
                lastBlock = currBlock

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue

            if same:
                common.logger.debug(1, "Found " + str(match) + " compatible site(s) for job " + str(nj + 1))
            else:
                common.logger.message("Found " + str(match) + " compatible site(s) for job " + str(nj + 1))

            if group and currBlock != groupBlock:
                groups.append([groupBlock, group])
                group = []
            groupBlock = currBlock
            group.append(common.jobDB.bossId(nj))

        # flush the last open group, whatever happened to the trailing jobs
        if group:
            groups.append([groupBlock, group])
        return groups

    def _reportJobToDashboard(self, jj, jid, subType, dashParams):
        """Send the per-job submission report to DashBoard via common.apmon.

        jj is the job's BOSS id (1-based job number), jid the scheduler job id
        returned by the submission, subType 'Direct' or 'Server', dashParams
        the apmon file content (its keys override the per-job ones).
        """
        tmpNj = jj - 1
        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        if jid.find(":") != -1:
            # second ':'-separated field of the job id without '//'
            rb = jid.split(':')[1].replace('//', '')
        else:
            rb = 'OSG'

        destination = common.jobDB.destination(tmpNj)
        if len(destination) <= 2:
            targetSE = ",".join(destination)
        else:
            targetSE = str(len(destination)) + '_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'SubmissionType': subType,
                  'TargetSE': targetSE}
        common.logger.debug(5, str(params))
        params.update(dashParams)
        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
        common.apmon.sendToML(params)

    def _reportRequirements(self):
        """Print the requirements used for submission (called when not all jobs were submitted)."""
        jobtype = common.taskDB.dict("jobtype")
        msg = 'Submission performed using the Requirements: \n'
        msg += jobtype + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
        for label, key in (('SE White List: ', 'EDG.se_white_list'),
                           ('SE Black List: ', 'EDG.se_black_list'),
                           ('CE White List: ', 'EDG.ce_white_list'),
                           ('CE Black List: ', 'EDG.ce_black_list')):
            try:
                msg += label + self.cfg_params[key] + '\n'
            except KeyError:
                pass
        msg += '(Hint: please check if ' + jobtype + ' is available at the Sites)\n'
        common.logger.message(msg)