ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.135
Committed: Thu Oct 30 16:25:24 2008 UTC (16 years, 6 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre3, CRAB_2_4_2_pre2
Changes since 1.134: +5 -3 lines
Log Message:
Don't pass cfg_params to SiteScreening

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """Select and submit the created jobs of the current CRAB task.

    The constructor works out which job ids should be submitted (from the
    user request, the SE white/black lists and the jobs' status) and stores
    them in self.nj_list.  run() then groups those jobs by destination-site
    list, checks resources for each group through common.scheduler.listMatch
    and submits every matched group through common.scheduler.submit,
    reporting to the dashboard (ML) before and after submission.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """Build self.nj_list, the ids of the jobs to be submitted.

        cfg_params   -- configuration dictionary (CMSSW.* / EDG.* keys are read)
        parsed_range -- list of job ids already parsed from the user request
        val          -- raw user request string:
                        false value -> every submittable job,
                        'range'     -> the jobs in parsed_range (Resubmitter),
                        'all'       -> every submittable job,
                        positive integer (e.g. '5') -> at most that many jobs,
                        expression evaluating to a tuple or negative integer
                        (presumably job lists/ranges such as '1,4,7' or '1-3')
                        -> the jobs in parsed_range.
        Raises CrabException for any other request.
        """
        self.cfg_params = cfg_params

        # ---- interpret the user request ----
        nsjobs = -1              # max number of jobs requested; -1 means no limit
        chosenJobsList = None    # explicit job ids requested; None means any job
        if val:
            if val == 'range':   # for Resubmitter
                chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                request = self._evalRequest(val)
                if type(request) is int and request > 0:
                    # positive number: submit that many jobs
                    nsjobs = request
                elif type(request) is tuple or (type(request) is int and request < 0):
                    # job list / range: the ids were already expanded into parsed_range
                    chosenJobsList = parsed_range
                    nsjobs = len(chosenJobsList)
                else:
                    msg = 'Bad submission option <'+str(val)+'>\n'
                    msg += ' Must be an integer or "all"'
                    msg += ' Generic range is not allowed"'
                    raise CrabException(msg)

        common.logger.debug(5,'nsjobs '+str(nsjobs))

        # ---- collect candidate jobs ----
        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5,'Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        if chosenJobsList is not None:
            candidates = chosenJobsList
        else:
            candidates = self.complete_List

        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        seWhiteList = cfg_params.get('EDG.se_white_list',[])
        seBlackList = cfg_params.get('EDG.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

        nj_list = []
        jobSkippedInSubmission = []
        for job in common._db.getTask(candidates).jobs:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                # the job reads data but no allowed SE is left for it
                jobSkippedInSubmission.append(job['id'])
                continue
            if not self._isSubmittable(job):
                continue
            nj_list.append(job['id'])
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+\
                                  str(len(nj_list))+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            # keeps the historical trailing comma after the last id
            mess = ','.join([str(jobId) for jobId in jobSkippedInSubmission]) + ','
            common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug(5,'nj_list '+str(nj_list))
        self.nj_list = nj_list
        return

    def _evalRequest(self, val):
        """Evaluate the user request string once; return None if it cannot be evaluated.

        NOTE(review): val comes straight from the command line.  It is
        evaluated with an empty builtins namespace so that plain literal and
        arithmetic expressions ('5', '1-3', '1,4,7') still work while
        reducing (not eliminating) the risk of running arbitrary code.
        Evaluation errors (e.g. 'abc') are turned into None so the caller
        reports a CrabException instead of leaking NameError/SyntaxError.
        """
        try:
            return eval(val, {'__builtins__': {}}, {})
        except Exception:
            return None

    def _isSubmittable(self, job):
        """Return True if job is created ('C'/'RC') and not yet handed to the scheduler.

        Shared by __init__ (job selection) and checkIfCreate (pre-submission
        check) so both apply the same criterion.
        """
        return job.runningJob['status'] in ['C','RC'] and \
               job.runningJob['statusScheduler'] in ['Created', None]

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list.

        Nothing is done when checkIfCreate() reports that no job is ready.
        Otherwise the dashboard pre-report is sent, resources are matched,
        the matched groups are submitted, and submissionError() is called
        if fewer jobs than requested (or none at all) were submitted.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        msg = '\nTotal of %d jobs submitted'%njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).'%(len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()

    def checkIfCreate(self):
        """Return 0 if the task has at least one submittable job, else 1.

        When no job qualifies the user is told to create the jobs first.
        """
        totalCreatedJobs = 0
        for job in common._db.getTask().jobs:
            if self._isSubmittable(job):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """Group self.nj_list by destination-site list and check resources.

        Fills self.sub_jobs with one list of job ids per distinct
        'dlsDestination' value among self.nj_list.  Returns (matched, task):
        matched is the list of indices into self.sub_jobs whose destination
        list got a non-empty answer from common.scheduler.listMatch, and task
        is the whole task as returned by common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### list of distinct destination-site lists
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        self.sub_jobs = []   # list of job-id lists to submit, one per destination group
        group_dests = []     # destination list of each entry in self.sub_jobs (kept aligned)
        for distDest in distinct_dests:
            destJobs = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in destJobs]
            if len(sub_jobs_temp) > 0:
                self.sub_jobs.append(sub_jobs_temp)
                group_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            id_job = self.sub_jobs[sel][0]   # first job stands for its whole group
            match = common.scheduler.listMatch(group_dests[sel], False)
            if len(match) > 0:
                common.logger.message("Found compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """Submit every matched group of jobs and return how many were submitted.

        matched -- indices into self.sub_jobs, as returned by performMatch()
        task    -- task object passed through to common.scheduler.submit
        A job counts as submitted when the db holds a non-empty scheduler id
        for it afterwards; such jobs get status 'S' in the db and are reported
        to the dashboard.  Raises CrabException("Job not submitted") if the
        scheduler raises CrabException.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        useBar = not common.logger.debugLevel()
        if useBar:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            jobs = self.sub_jobs[ii]
            common.logger.debug(1,'Submitting jobs '+str(jobs))

            try:
                common.scheduler.submit(jobs, task)
            except CrabException:
                raise CrabException("Job not submitted")

            pbar = None
            if useBar:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(jobs))+' jobs')
                except Exception:
                    # the progress bar is cosmetic: run without it if the terminal can't draw it
                    pbar = None
            if pbar:
                pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

            ### check whether the submission succeeded: a job is submitted once it has a scheduler id
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            run_jobToSave = {'status' :'S'}
            listId = []
            listRunField = []
            for jobId, schedId in zip(jobs, sched_Id):
                # str(None) is 'None', so None must be excluded explicitly
                if schedId is not None and str(schedId) != '':
                    listId.append(jobId)
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5,"Submitted job # "+ str(jobId))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.SendMLpost(jobs)

        return njs

    # correctly spelled alias; the historical name is kept for existing callers
    performSubmission = perfromSubmission

    def submissionError(self):
        """Tell the user which site requirements were used, as a hint after a failed/partial submission."""
        msg = 'Submission performed using the Requirements: \n'
        # TODO: also report the job type and code version once they are available from the task db
        for key, label in [('EDG.se_white_list', 'SE White List'),
                           ('EDG.se_black_list', 'SE Black List'),
                           ('EDG.ce_white_list', 'CE White List'),
                           ('EDG.ce_black_list', 'CE Black List')]:
            if self.cfg_params.has_key(key):
                msg += label+': '+str(self.cfg_params[key])+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)
        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information common to all reports of this task.

        Also sets self.datasetPath (None when configured as 'none') and
        self.executable (default 'cmsRun') as a side effect.
        """
        taskId = uniqueTaskName(common._db.queryTask('name'))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization','cms')

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME',''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER',''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable }

        return params

    def SendMLpre(self):
        """
        Send the task-level pre-submission report (jobId 'TaskMeta') to ML.
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'
        common.apmon.sendToML(params)
        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
        return

    def SendMLpost(self, allList):
        """
        Send one post-submission report per job in allList to ML.

        The dashboard job id format depends on the scheduler: CONDOR_G/GLIDEIN,
        LSF/CAF, and everything else (gLite-style) are built differently.
        """
        task = common._db.getTask(allList)

        params = {}
        params.update(self.collect_MLInfo())

        Sub_Type = 'Direct'
        # loop invariants: the scheduler does not change while reporting one block
        schedName = common.scheduler.name()
        schedUpper = schedName.upper()
        isOSG = schedUpper in ['CONDOR_G','GLIDEIN']
        taskHash = None
        if isOSG:
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()

        for job in task.jobs:
            jj = job['jobId']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if isOSG:
                rb = 'OSG'
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
                common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif schedUpper in ['LSF', 'CAF']:
                jobId = "https://"+schedName+":/"+jid+"-"+str(task['name']).replace("_","-")
                common.logger.debug(5,'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # list up to two target SEs explicitly, otherwise only their count
            dlsDest = job['dlsDestination']
            if len(dlsDest) in (1, 2):
                T_SE = ','.join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId' : localId})

            common.logger.debug(5,'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
338 spiga 1.112
339