ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.162
Committed: Thu Sep 3 16:14:55 2009 UTC (15 years, 8 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_0_pre2
Changes since 1.161: +6 -2 lines
Log Message:
Client-side changes from Sanjay for Glidein, deal with multiple submission attempts

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 ewv 1.139 import socket
9 ewv 1.140 import Scram
10 slacapra 1.60 from ProgressBar import ProgressBar
11     from TerminalController import TerminalController
12 nsmirnov 1.1
class Submitter(Actor):
    """
    Submit the created jobs of a CRAB task through common.scheduler.

    BuildJobList() selects the job ids to submit (self.nj_list).
    performMatch() groups them by dlsDestination and asks the scheduler
    for compatible resources. perfromSubmission() submits the matched
    groups, updates the DB and reports each job to the Dashboard
    through common.apmon.
    """

    # Maximum number of jobs the client submits in one go while
    # self.limitJobs is True (bigger requests must use the server mode).
    maxClientJobs = 500

    def __init__(self, cfg_params, parsed_range, val):
        """
        Parse the user's submission request.

        cfg_params   -- task configuration dictionary
        parsed_range -- job ids chosen by the user; used when val is
                        'range' (Resubmitter) or a range-like expression
        val          -- submission option: 'range', 'all', a positive
                        integer N (submit N jobs), or an expression that
                        eval() turns into a tuple or a negative int
                        (presumably ranges such as '1,3' or '2-5')

        Raises CrabException if val cannot be interpreted.
        """
        self.cfg_params = cfg_params
        self.limitJobs = True
        # get user request; nsjobs == -1 means "no limit on the count"
        self.nsjobs = -1
        self.chosenJobsList = None
        if val:
            if val == 'range':  # for Resubmitter
                self.chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                msg = 'Bad submission option <'+str(val)+'>\n'
                msg += ' Must be an integer or "all"\n'
                msg += ' Generic range is not allowed'
                # NOTE(review): val comes from the command line and is
                # passed to eval(); this is what turns '2-5' into a
                # negative int and '1,3' into a tuple. Never feed
                # untrusted input here. Evaluate once, and report a
                # malformed option as a CrabException instead of letting
                # NameError/SyntaxError escape.
                try:
                    request = eval(val)
                except Exception:
                    raise CrabException(msg)
                if type(request) is int and request > 0:
                    # positive number: submit that many jobs
                    self.nsjobs = request
                elif type(request) is tuple or (type(request) is int and request < 0):
                    self.chosenJobsList = parsed_range
                    self.nsjobs = len(self.chosenJobsList)
                else:
                    raise CrabException(msg)

        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
        self.seBlackList = cfg_params.get('GRID.se_black_list',[])
        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.scram = Scram.Scram(cfg_params)
        return

    def BuildJobList(self):
        """
        Select the jobs to submit and store their ids in self.nj_list.

        A job is skipped when the SE white/black lists leave none of its
        dlsDestination sites, unless the task has no input dataset. Only
        jobs whose running state is 'Created' are selected. When a count
        was requested (self.nsjobs > 0), selection stops after that many.

        Raises CrabException if more than maxClientJobs jobs would be
        submitted while self.limitJobs is True.
        """
        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, self.seBlackList, common.logger())
        common.logger.debug('nsjobs '+str(self.nsjobs))

        # get the first not already submitted
        self.complete_List = common._db.nJobs('list')
        common.logger.debug('Total jobs '+str(len(self.complete_List)))

        if self.chosenJobsList is not None:
            candidates = self.chosenJobsList
        else:
            candidates = self.complete_List

        nj_list = []
        jobSkippedInSubmission = []
        for job in common._db.getTask(candidates).jobs:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
            if cleanedBlackWhiteList == '' and self.datasetPath is not None:
                # every site hosting the data is excluded by the lists
                jobSkippedInSubmission.append(job['id'])
            elif job.runningJob['state'] == 'Created':
                nj_list.append(job['id'])
            else:
                continue
            if self.nsjobs > 0 and self.nsjobs == len(nj_list):
                break

        if self.nsjobs > len(nj_list):
            common.logger.info('asking to submit '+str(self.nsjobs)+' jobs, but only '+\
                               str(len(nj_list))+' left: submitting those')
        if jobSkippedInSubmission:
            mess = "".join([str(jobs) + "," for jobs in jobSkippedInSubmission])
            common.logger.info("Jobs: " +str(mess) + "\n\tskipped because no sites are hosting this data\n")
            self.submissionError()

        # submit N from last submitted job
        common.logger.debug('nj_list '+str(nj_list))
        self.nj_list = nj_list
        if self.limitJobs and len(self.nj_list) > self.maxClientJobs:
            msg = "The CRAB client will not submit more than %d jobs.\n" % self.maxClientJobs
            msg += "Use the server mode or submit your jobs in smaller groups"
            raise CrabException(msg)
        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug("Submitter::run() called")

        start = time.time()

        self.BuildJobList()

        if self.checkIfCreate() != 0:
            # nothing in 'Created' state; checkIfCreate already told the user
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug("Submission Time: "+str(stop - start))

        nRequested = len(self.nj_list)
        msg = 'Total of %d jobs submitted' % njs
        if njs != nRequested:
            msg += ' (from %d requested).' % nRequested
        else:
            msg += '.'
        common.logger.info(msg)

        if njs < nRequested or nRequested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Check that the task has at least one job in state 'Created'.

        Returns 0 if there is something to submit. Otherwise it logs a
        hint to create the jobs first and returns 1.
        """
        task = common._db.getTask()
        for job in task.jobs:
            if job.runningJob['state'] == 'Created':
                return 0
        common.logger.info("No jobs to be submitted: first create them")
        return 1

    def performMatch(self):
        """
        Group self.nj_list by data location and check each group for
        compatible resources.

        Sets self.sub_jobs to a list of job-id groups, one per distinct
        dlsDestination that still has jobs in self.nj_list. Returns
        (matched, task): matched holds the indices into self.sub_jobs of
        the groups for which common.scheduler.listMatch found a site, and
        task is the full task read from the DB.
        """
        common.logger.info("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites.
        # self.sub_jobs[k] and group_dests[k] always describe the same
        # group, even if some destination contributes no job.
        self.sub_jobs = []   # list of jobs Id list to submit
        group_dests = []     # destination matching each self.sub_jobs entry
        for distDest in distinct_dests:
            dest_jobs = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
            group = [i for i in self.nj_list if i in dest_jobs]
            if group:
                self.sub_jobs.append(group)
                group_dests.append(distDest)

        matched = []
        task = common._db.getTask()
        for sel, group in enumerate(self.sub_jobs):
            id_job = group[0]   # the first job represents its group
            match = common.scheduler.listMatch(group_dests[sel], False)
            if len(match) > 0:
                common.logger.info("Found compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.info("No compatible site found, will not submit jobs "+str(group))
                self.submissionError()

        return matched, task

    def perfromSubmission(self,matched,task):
        """
        Submit the job groups self.sub_jobs[ii] for every ii in matched.

        matched -- indices into self.sub_jobs (as returned by performMatch)
        task    -- task object handed to common.scheduler.submit

        Jobs that got a scheduler id are stored with status 'S', moved to
        "SubSuccess" and reported through SendMLpost. Returns the number
        of such jobs. Raises CrabException if the scheduler fails to
        submit a group.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        showProgress = (common.debugLevel == 0)
        if showProgress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.info("The whole task doesn't found compatible site ")
            return njs

        common.logger.info(str(len(matched))+" blocks of jobs will be submitted")
        common.logger.debug("Delegating proxy ")
        try:
            common.scheduler.delegateProxy()
        except CrabException:
            # deliberately not fatal: submission is attempted anyway
            common.logger.debug("Proxy delegation failed ")

        for ii in matched:
            group = self.sub_jobs[ii]
            common.logger.debug('Submitting jobs '+str(group))

            try:
                common.scheduler.submit(group,task)
            except CrabException:
                raise CrabException("Job not submitted")

            if showProgress:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(group))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

            ### check if the submission succeeded: a job counts as submitted
            ### when its schedulerId read back from the DB is not empty.
            # NOTE(review): str(None) == 'None', so a None id would also
            # count as submitted -- confirm what queryRunJob returns.
            sched_Id = common._db.queryRunJob('schedulerId', group)
            listId = []
            listRunField = []
            run_jobToSave = {'status' :'S'}
            for j, jobId in enumerate(group):
                if str(sched_Id[j]) != '':
                    listId.append(jobId)
                    listRunField.append(run_jobToSave)
                    common.logger.debug("Submitted job # "+ str(jobId))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.stateChange(listId,"SubSuccess")
            self.SendMLpost(group)

        return njs

    def submissionError(self):
        """
        Log the requirements used for the submission (SE/CE white and
        black lists, default T1 blacklisting) with hints on why jobs may
        not have been submitted.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL: also report the jobtype and code version once the
        ### task DB exposes them again.
        for key, label in (('GRID.se_white_list', 'SE White List'),
                           ('GRID.se_black_list', 'SE Black List'),
                           ('GRID.ce_white_list', 'CE White List'),
                           ('GRID.ce_black_list', 'CE Black List')):
            if key in self.cfg_params:
                msg += '\t' + label + ': ' + self.cfg_params[key] + '\n'
        # The default blacklist is active unless it is explicitly removed.
        # The option is compared as a string, so the fallback must be '0'
        # too: an int 0 never equals '0' and hid this note when unset.
        removeDefBL = str(self.cfg_params.get('GRID.remove_default_blacklist', '0'))
        if removeDefBL == '0':
            msg += '\tNote: All CMS T1s are BlackListed by default \n'
        msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        msg += '\tPlease check if :\n'
        msg += '\t\t -- the dataset is available at this site!\n'
        msg += '\t\t -- the CMSSW version is available at this site!)\n'
        common.logger.info(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information shared by the pre- and
        post-submission reports. Also sets self.executable.
        """
        taskId = common._db.queryTask('name')
        gridName = common.scheduler.userName().strip()
        common.logger.debug("GRIDNAME: %s "%gridName)
        taskType = 'analysis'

        self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
        VO = self.cfg_params.get('GRID.virtual_organization','cms')

        params = {'tool': common.prog_name,
                  'SubmissionType':'direct',
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME',''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'ApplicationVersion': self.scram.getSWVersion(),
                  'taskType': taskType,
                  'vo': VO,
                  'CMSUser': getUserName(),
                  'user': getUserName(),
                  'taskId': str(taskId),
                  'datasetFull': self.datasetPath,
                  'exe': self.executable }

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML: the shared task information with jobId
        'TaskMeta'.
        """
        params = self.collect_MLInfo()

        params['jobId'] ='TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug('Submission DashBoard Pre-Submission report: %s'%str(params))

        return

    def SendMLpost(self,allList):
        """
        Send post-submission info to ML: one report per job in allList.

        The Dashboard job id, broker and local id are built per scheduler
        (CONDOR_G, GLIDEIN, LSF/CAF, CONDOR, ARC, otherwise gLite style).
        """
        task = common._db.getTask(allList)

        params = dict(self.collect_MLInfo())

        schedName = common.scheduler.name()
        schedUpper = schedName.upper()
        taskHash = None
        if schedUpper in ['CONDOR_G', 'CONDOR', 'ARC']:
            # the same for every job of the task: compute it only once
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()

        msg = ''
        Sub_Type = 'Direct'
        for job in task.jobs:
            jj = job['jobId']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if schedUpper == 'CONDOR_G':
                rb = 'OSG'
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
                msg += ('JobID for ML monitoring is created for CONDOR_G scheduler: %s \n'%str(jobId))
            elif schedUpper == 'GLIDEIN':
                rb = schedName
                jobId = str(jj) + '_https://' + str(jid)
                msg += ('JobID for ML monitoring is created for GLIDEIN scheduler: %s \n'%str(jobId))
            elif schedUpper in ['LSF', 'CAF']:
                jobId = str(jj) + "_https://"+schedName+":/"+jid+"-"+str(task['name']).replace("_","-")
                msg += ('JobID for ML monitoring is created for LSF scheduler: %s\n'%str(jobId))
                rb = schedName
                localId = jid
            elif schedUpper == 'CONDOR':
                jobId = str(jj) + '_https://' + socket.gethostname() + '/' + taskHash + '/' + str(jj)
                rb = schedName
                msg += ('JobID for ML monitoring is created for CONDOR scheduler: %s\n'%str(jobId))
            elif schedUpper == 'ARC':
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
                msg += ('JobID for ML monitoring is created for ARC scheduler: %s\n'%str(jobId))
                rb = 'ARC'
            else:
                jobId = str(jj) + '_' + str(jid)
                msg += ('JobID for ML monitoring is created for gLite scheduler %s\n'%str(jobId))
                rb = str(job.runningJob['service'])

            # up to two SEs are listed explicitly, otherwise only their count
            dlsDest = job['dlsDestination']
            if len(dlsDest) in (1, 2):
                T_SE = ','.join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId' : localId})

            msg += ('Submission DashBoard report: %s\n'%str(params))
            common.apmon.sendToML(params)
        common.logger.log(10-1,msg)
        return
368 spiga 1.112
369