ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.137
Committed: Tue Nov 11 15:08:21 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2
Changes since 1.136: +3 -0 lines
Log Message:
notify user about default T1 BL in case of no resources found

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
11     class Submitter(Actor):
def __init__(self, cfg_params, parsed_range, val):
    """
    Select the jobs that are going to be submitted.

    cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is required,
                    'EDG.se_white_list' / 'EDG.se_black_list' are optional.
    parsed_range -- explicit list of job ids, used when val is 'range' or
                    when val evaluates to a tuple or a negative integer.
    val          -- submission option: empty/None or 'all' (every job),
                    'range', or an expression evaluating to a positive
                    integer (maximum number of jobs to pick).

    On return self.nj_list holds the ids of the jobs to submit. Also sets
    self.cfg_params, self.complete_List and self.blackWhiteListParser.

    Raises CrabException when val cannot be interpreted.
    """
    self.cfg_params = cfg_params

    # get user request
    nsjobs = -1
    chosenJobsList = None
    if val:
        if val == 'range':  # for Resubmitter
            chosenJobsList = parsed_range
        elif val == 'all':
            pass
        else:
            badOption = 'Bad submission option <'+str(val)+'>\n'
            badOption += ' Must be an integer or "all"'
            badOption += ' Generic range is not allowed"'
            # NOTE(review): eval() on the raw option string executes arbitrary
            # expressions; it is kept because a tuple or negative result is
            # what selects parsed_range (presumably a range such as "1-3"
            # evaluates to a negative integer -- confirm with the caller).
            # It is now evaluated once, and any failure is reported as a bad
            # option instead of leaking NameError/SyntaxError.
            try:
                requested = eval(val)
            except Exception:
                raise CrabException(badOption)
            if type(requested) is int and requested > 0:
                # positive number
                nsjobs = requested
            elif type(requested) is tuple or (type(requested) is int and requested < 0):
                chosenJobsList = parsed_range
                nsjobs = len(chosenJobsList)
            else:
                raise CrabException(badOption)

    common.logger.debug(5, 'nsjobs '+str(nsjobs))
    # total jobs
    nj_list = []
    # get the first not already submitted
    self.complete_List = common._db.nJobs('list')
    common.logger.debug(5, 'Total jobs '+str(len(self.complete_List)))
    jobSetForSubmission = 0
    jobSkippedInSubmission = []
    datasetpath = self.cfg_params['CMSSW.datasetpath']
    if datasetpath.lower() == 'none':
        datasetpath = None
    tmp_jList = self.complete_List
    if chosenJobsList is not None:
        tmp_jList = chosenJobsList
    # build job list
    from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
    seWhiteList = cfg_params.get('EDG.se_white_list', [])
    seBlackList = cfg_params.get('EDG.se_black_list', [])
    self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
    for job in common._db.getTask(tmp_jList).jobs:
        cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
        if (cleanedBlackWhiteList != '') or (datasetpath is None):
            if (job.runningJob['status'] in ['C', 'RC'] and
                    job.runningJob['statusScheduler'] in ['Created', None]):
                jobSetForSubmission += 1
                nj_list.append(job['id'])
            else:
                # not in a submittable state: ignore it silently
                continue
        else:
            # destination list is empty after black/white list filtering
            jobSkippedInSubmission.append(job['id'])
        # stop as soon as the requested number of jobs has been collected
        if nsjobs > 0 and nsjobs == jobSetForSubmission:
            break
    if nsjobs > jobSetForSubmission:
        common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+
                              str(jobSetForSubmission)+' left: submitting those')
    if len(jobSkippedInSubmission) > 0:
        # every id is followed by a comma, including the last one
        mess = "".join([str(jobId) + "," for jobId in jobSkippedInSubmission])
        common.logger.message("Jobs: " + str(mess) + "\n skipped because no sites are hosting this data\n")
        self.submissionError()
    # submit N from last submitted job
    common.logger.debug(5, 'nj_list '+str(nj_list))

    self.nj_list = nj_list
    return
85 ewv 1.92
def run(self):
    """
    Entry point: match and submit the jobs listed in self.nj_list.

    Does nothing further when checkIfCreate() reports a non-zero code.
    Otherwise it sends the pre-submission report, runs the match and the
    submission, logs the elapsed time and a summary, and calls
    submissionError() when fewer jobs than requested were submitted or
    when nothing was requested.
    """
    common.logger.debug(5, "Submitter::run() called")

    start = time.time()

    # NOTE(review): the listing lost its indentation; the report below is
    # taken to belong to the "check == 0" branch because njs is only
    # bound there.
    if self.checkIfCreate() != 0:
        return

    self.SendMLpre()

    list_matched, task = self.performMatch()
    njs = self.perfromSubmission(list_matched, task)

    elapsed = time.time() - start
    common.logger.debug(1, "Submission Time: " + str(elapsed))
    common.logger.write("Submission time :" + str(elapsed))

    requested = len(self.nj_list)
    msg = '\nTotal of %d jobs submitted' % njs
    if njs == requested:
        msg += '.'
    else:
        msg += ' (from %d requested).' % requested
    common.logger.message(msg)

    if requested == 0 or njs < requested:
        self.submissionError()
115    
116    
def checkIfCreate(self):
    """
    Tell whether the task holds at least one job ready for submission.

    A job counts when its running status is 'C' or 'RC' and its scheduler
    status is 'Created'. Returns 0 when such a job exists; otherwise logs
    a message and returns 1.
    """
    createdJobs = 0
    for job in common._db.getTask().jobs:
        running = job.runningJob
        if running['status'] in ['C', 'RC'] and running['statusScheduler'] == 'Created':
            createdJobs += 1

    if createdJobs == 0:
        common.logger.message("No jobs to be submitted: first create them")
        return 1
    return 0
131 ewv 1.92
132 gutsche 1.70
def performMatch(self):
    """
    Group the selected jobs by destination and ask the scheduler to match
    each group.

    Fills self.sub_jobs with one list of job ids per distinct
    'dlsDestination' value that has at least one job in self.nj_list.

    Returns (matched, task): matched is the list of indices into
    self.sub_jobs for which common.scheduler.listMatch() returned a
    non-empty result, and task is the object returned by
    common._db.getTask().
    """
    common.logger.message("Checking available resources...")
    ### define here the list of distinct destinations sites list
    distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

    ### define here the list of jobs Id for each distinct list of sites
    self.sub_jobs = []   # list of jobs Id list to submit
    group_dests = []     # group_dests[k] is the destination of self.sub_jobs[k]
    for distDest in distinct_dests:
        dest_jobs = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
        group = [job_id for job_id in self.nj_list if job_id in dest_jobs]
        # Store the destination together with its group, so the two lists
        # cannot drift apart when a destination has no selected job.
        if len(group) > 0:
            self.sub_jobs.append(group)
            group_dests.append(distDest)

    task = common._db.getTask()

    matched = []
    for sel in range(len(self.sub_jobs)):
        group = self.sub_jobs[sel]
        match = common.scheduler.listMatch(group_dests[sel], False)
        if len(match) > 0:
            # the first job of the group is the one named in the message
            common.logger.message("Found compatible site(s) for job "+str(group[0]))
            matched.append(sel)
        else:
            common.logger.message("No compatible site found, will not submit jobs "+str(group))
            self.submissionError()

    return matched, task
171 spiga 1.112
def perfromSubmission(self, matched, task):
    """
    Submit every matched block of jobs and record the submitted ones.

    matched -- indices into self.sub_jobs of the blocks to submit.
    task    -- task object handed to common.scheduler.submit().

    For each block the jobs are submitted, then every job whose
    'schedulerId' (as returned by common._db.queryRunJob) is not an empty
    string gets its run status updated to 'S' and is counted. SendMLpost()
    is called once per block.

    Returns the number of jobs counted as submitted (0 when matched is
    empty). Raises CrabException("Job not submitted") when the scheduler
    raises a CrabException.
    """
    njs = 0

    ### Progress Bar indicator, deactivate for debug
    show_progress = not common.logger.debugLevel()
    term = None
    if show_progress:
        term = TerminalController()

    if len(matched) == 0:
        common.logger.message("The whole task doesn't found compatible site ")
        return njs

    common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
    for ii in matched:
        block = self.sub_jobs[ii]
        common.logger.debug(1, 'Submitting jobs '+str(block))

        try:
            common.scheduler.submit(block, task)
        except CrabException:
            raise CrabException("Job not submitted")

        if show_progress:
            # The progress bar is best effort: if it cannot be built,
            # submission goes on without it.
            try:
                pbar = ProgressBar(term, 'Submitting '+str(len(block))+' jobs')
            except Exception:
                pbar = None
            if pbar:
                pbar.update(float(ii+1)/float(len(self.sub_jobs)), 'please wait')

        ### check the if the submission succeded Maybe not needed or at least simplified
        sched_Id = common._db.queryRunJob('schedulerId', block)
        listId = []
        run_jobToSave = {'status': 'S'}
        listRunField = []
        for j, job_id in enumerate(block):
            # an empty scheduler id means this job was not accepted
            if str(sched_Id[j]) != '':
                listId.append(job_id)
                listRunField.append(run_jobToSave)
                common.logger.debug(5, "Submitted job # " + str(job_id))
                njs += 1
        common._db.updateRunJob_(listId, listRunField)
        self.SendMLpost(block)

    return njs
219 spiga 1.99
def submissionError(self):
    """
    Log the site-selection requirements used for this submission.

    Lists the SE/CE white and black lists present in self.cfg_params, adds
    a note about the default T1 black list unless
    'EDG.remove_default_blacklist' is set to something other than 0, and
    ends with a hint about whitelisting. The text goes to
    common.logger.message().
    """
    ## add some more verbose message in case submission is not complete
    msg = 'Submission performed using the Requirements: \n'
    ### TODO_ DS--BL
    #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
    #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
    site_lists = (('EDG.se_white_list', 'SE White List'),
                  ('EDG.se_black_list', 'SE Black List'),
                  ('EDG.ce_white_list', 'CE White List'),
                  ('EDG.ce_black_list', 'CE Black List'))
    for key, label in site_lists:
        if key in self.cfg_params:
            msg += '\t' + label + ': ' + self.cfg_params[key] + '\n'
    # The fallback is the integer 0 while configured values are compared as
    # strings; go through str() so an unset option also triggers the note.
    removeDefBL = self.cfg_params.get('EDG.remove_default_blacklist', 0)
    if str(removeDefBL) == '0':
        msg += '\tNote: All CMS T1s are BlackListed by default \n'
    msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
    msg += '\tPlease check if :\n'
    msg += '\t\t -- the dataset is available at this site!\n'
    msg += '\t\t -- the CMSSW version is available at this site!)\n'
    common.logger.message(msg)

    return
244 spiga 1.99
def collect_MLInfo(self):
    """
    Build the dictionary of task-level fields sent to the DashBoard.

    Also stores self.datasetPath (None when the configured
    'CMSSW.datasetpath' is 'none' in any letter case) and self.executable
    ('cmsRun' unless 'CMSSW.executable' is configured).
    """
    taskId = uniqueTaskName(common._db.queryTask('name'))
    gridName = common.scheduler.userName().strip()
    common.logger.debug(5, "GRIDNAME: " + gridName)

    datasetPath = self.cfg_params['CMSSW.datasetpath']
    if datasetPath.lower() == 'none':
        datasetPath = None
    self.datasetPath = datasetPath
    self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
    virtualOrg = self.cfg_params.get('EDG.virtual_organization', 'cms')

    params = {}
    params['tool'] = common.prog_name
    params['JSToolVersion'] = common.prog_version_str
    params['tool_ui'] = os.environ.get('HOSTNAME', '')
    params['scheduler'] = common.scheduler.name()
    params['GridName'] = gridName
    params['taskType'] = 'analysis'
    params['vo'] = virtualOrg
    params['user'] = os.environ.get('USER', '')
    params['taskId'] = taskId
    params['datasetFull'] = self.datasetPath
    params['exe'] = self.executable

    return params
274 ewv 1.128
def SendMLpre(self):
    """
    Send the pre-submission report to ML.

    The report is the dictionary from collect_MLInfo() with 'jobId' set
    to 'TaskMeta'.
    """
    report = self.collect_MLInfo()
    report['jobId'] = 'TaskMeta'

    common.apmon.sendToML(report)
    common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(report))

    return
288 ewv 1.92
def SendMLpost(self, allList):
    """
    Send one post-submission report to ML for each job in allList.

    Every report carries the fields from collect_MLInfo() plus the
    per-job ones ('jobId', 'sid', 'broker', 'bossId', 'SubmissionType',
    'TargetSE', 'localId'). How 'jobId' and 'broker' are built depends on
    the upper-cased scheduler name.
    """
    task = common._db.getTask(allList)

    # work on a copy of the task-level fields
    params = dict(self.collect_MLInfo())

    Sub_Type = 'Direct'
    for job in task.jobs:
        jj = job['jobId']
        localId = ''
        jid = str(job.runningJob['schedulerId'])

        if common.scheduler.name().upper() in ('CONDOR_G', 'GLIDEIN'):
            rb = 'OSG'
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(jj) + '_https://' + common.scheduler.name() + '/' + taskHash + '/' + str(jj)
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        elif common.scheduler.name().upper() in ('LSF', 'CAF'):
            dashedName = str(task['name']).replace("_", "-")
            jobId = "https://" + common.scheduler.name() + ":/" + jid + "-" + dashedName
            common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
            rb = common.scheduler.name()
            localId = jid
        else:
            jobId = str(jj) + '_' + str(jid)
            common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler' + jobId)
            rb = str(job.runningJob['service'])

        # one or two destinations are listed by name, more are only counted
        dlsDest = job['dlsDestination']
        nDest = len(dlsDest)
        if nDest == 1 or nDest == 2:
            T_SE = ','.join([str(dlsDest[k]) for k in range(nDest)])
        else:
            T_SE = str(nDest) + '_Selected_SE'

        params.update({'jobId': jobId,
                       'sid': jid,
                       'broker': rb,
                       'bossId': jj,
                       'SubmissionType': Sub_Type,
                       'TargetSE': T_SE,
                       'localId': localId})

        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
        common.apmon.sendToML(params)
    return
344 spiga 1.112
345