ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.147
Committed: Tue May 26 17:19:39 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre3
Changes since 1.146: +4 -4 lines
Log Message:
common.logger.debugLevel not more there

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 ewv 1.139 import socket
9 ewv 1.140 import Scram
10 slacapra 1.60 from ProgressBar import ProgressBar
11     from TerminalController import TerminalController
12 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits the created jobs of the current task.

    The constructor selects the job ids to submit (``self.nj_list``);
    ``run()`` then groups them by destination, asks the scheduler for
    compatible sites, submits every matched group and reports the outcome
    to the job database and to the monitoring service.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """
        Select the jobs that have to be submitted.

        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is
                        mandatory, the 'GRID.se_*_list' keys are optional.
        parsed_range -- list of job ids, used when val is 'range' or is a
                        range expression.
        val          -- submission option, see _parseSubmitOption().

        Raises CrabException when val is not a supported option.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseSubmitOption(val, parsed_range)
        common.logger.debug('nsjobs '+str(nsjobs))

        # total jobs
        self.complete_List = common._db.nJobs('list')
        common.logger.debug('Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        # an explicit job list from the user wins over "all the jobs"
        if chosenJobsList is not None:
            candidates = chosenJobsList
        else:
            candidates = self.complete_List

        # build job list
        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        seWhiteList = cfg_params.get('GRID.se_white_list', [])
        seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

        nj_list = []
        jobSkippedInSubmission = []
        for job in common._db.getTask(candidates).jobs:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                # a dataset is requested but the cleaned destination is empty
                jobSkippedInSubmission.append(job['id'])
                continue
            if job.runningJob['status'] in ['C', 'RC'] and \
               job.runningJob['statusScheduler'] in ['Created', None]:
                nj_list.append(job['id'])
                # stop as soon as the requested number of jobs is collected
                if nsjobs > 0 and nsjobs == len(nj_list):
                    break

        if nsjobs > len(nj_list):
            common.logger.info('asking to submit '+str(nsjobs)+' jobs, but only '+
                               str(len(nj_list))+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            mess = ''.join([str(job_id) + "," for job_id in jobSkippedInSubmission])
            common.logger.info("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        # submit N from last submitted job
        common.logger.debug('nj_list '+str(nj_list))

        self.nj_list = nj_list
        self.scram = Scram.Scram(cfg_params)
        return

    def _parseSubmitOption(self, val, parsed_range):
        """
        Translate the submission option into (nsjobs, chosenJobsList).

        nsjobs is -1 when no limit applies; chosenJobsList is None when no
        explicit job list was requested.  Accepted values of val:
          - empty / None or 'all' : no limit, no explicit list
          - 'range'               : explicit list = parsed_range, no limit
          - positive integer      : limit = that integer
          - expression evaluating to a tuple or to a negative integer
            (e.g. '1,2,3' or '1-3'): explicit list = parsed_range and
            limit = len(parsed_range)

        Raises CrabException for anything else, including text that cannot
        be evaluated at all.
        """
        if not val:
            return -1, None
        if val == 'range':  # for Resubmitter
            return -1, parsed_range
        if val == 'all':
            return -1, None

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all"'
        msg += ' Generic range is not allowed"'

        # NOTE(security): eval() runs arbitrary code taken from the user
        # option.  It is kept because ranges such as '1-3' are recognised
        # through their evaluated value, but it is called exactly once and
        # a failure is reported as a bad option instead of leaking out.
        try:
            requested = eval(val)
        except Exception:
            raise CrabException(msg)

        # exact type tests on purpose: booleans must not count as integers
        if type(requested) is int and requested > 0:
            return requested, None
        if type(requested) is tuple or (type(requested) is int and requested < 0):
            return len(parsed_range), parsed_range
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug("Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            # nothing has been created yet: checkIfCreate already said so
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug("Submission Time: "+str(stop - start))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted' % njs
        if njs != requested:
            msg += ' (from %d requested).' % requested
        else:
            msg += '.'
        common.logger.info(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Return 0 when the task holds at least one job with status 'C' or
        'RC' and scheduler status 'Created', 1 otherwise (after logging a
        message).
        """
        # NOTE(review): __init__ also accepts statusScheduler None, this
        # check does not -- confirm whether the difference is intended.
        task = common._db.getTask()
        created = [job for job in task.jobs
                   if job.runningJob['status'] in ['C', 'RC']
                   and job.runningJob['statusScheduler'] == 'Created']

        if len(created) == 0:
            common.logger.info("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by destination and check each group against the
        scheduler.

        Fills self.sub_jobs with one list of job ids per distinct
        destination and returns (matched, task): matched holds the indexes
        into self.sub_jobs for which the scheduler found at least one
        compatible site, task is the task read from the database.
        """
        common.logger.info("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []   # list of jobs Id list to submit
        block_dests = []     # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            jobs_at_dest = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            block = [job_id for job_id in self.nj_list if job_id in jobs_at_dest]
            # destinations are recorded only together with their jobs, so
            # both lists stay aligned even if a destination has no job
            if len(block) > 0:
                self.sub_jobs.append(block)
                block_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel, block in enumerate(self.sub_jobs):
            match = common.scheduler.listMatch(block_dests[sel], False)
            if len(match) > 0:
                # the first job of the block stands for the whole block
                common.logger.info("Found compatible site(s) for job "+str(block[0]))
                matched.append(sel)
            else:
                common.logger.info("No compatible site found, will not submit jobs "+str(block))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit the blocks of self.sub_jobs listed in matched.

        (The method name is misspelt; it is kept for the existing callers.)

        matched -- indexes into self.sub_jobs, as returned by performMatch
        task    -- task object handed to the scheduler

        Returns the number of jobs for which a scheduler id was recorded.
        Raises CrabException("Job not submitted") when the scheduler fails.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        show_progress = (common.debugLevel == 0)
        term = None
        if show_progress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.info("The whole task doesn't found compatible site ")
            return njs

        common.logger.info(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            block = self.sub_jobs[ii]
            common.logger.debug('Submitting jobs '+str(block))

            try:
                common.scheduler.submit(block, task)
            except CrabException:
                raise CrabException("Job not submitted")

            if show_progress:
                # the bar is cosmetic: go on without it if it cannot be built
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(block))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)), 'please wait')

            ### check the if the submission succeded Maybe not needed or at least simplified
            sched_Id = common._db.queryRunJob('schedulerId', block)
            listId = []
            listRunField = []
            run_jobToSave = {'status': 'S'}
            for j, job_id in enumerate(block):
                # a job counts as submitted once it has a scheduler id
                if str(sched_Id[j]) != '':
                    listId.append(job_id)
                    listRunField.append(run_jobToSave)
                    common.logger.debug("Submitted job # "+str(job_id))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.stateChange(listId, "SubSuccess")
            self.SendMLpost(block)

        return njs

    def submissionError(self):
        """
        Log the site requirements in use, as a hint for the user when the
        submission is not complete.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL
        #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
        #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
        site_lists = (('GRID.se_white_list', 'SE White List'),
                      ('GRID.se_black_list', 'SE Black List'),
                      ('GRID.ce_white_list', 'CE White List'),
                      ('GRID.ce_black_list', 'CE Black List'))
        for key, label in site_lists:
            if key in self.cfg_params:
                msg += '\t' + label + ': ' + str(self.cfg_params[key]) + '\n'
        removeDefBL = self.cfg_params.get('GRID.remove_default_blacklist', 0)
        # compare as text: the fallback is the integer 0, while a value
        # present in cfg_params is matched against the string '0'
        if str(removeDefBL) == '0':
            msg += '\tNote: All CMS T1s are BlackListed by default \n'
        msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        msg += '\tPlease check if :\n'
        msg += '\t\t -- the dataset is available at this site!\n'
        msg += '\t\t -- the CMSSW version is available at this site!)\n'
        common.logger.info(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information

        Returns the dictionary of task-level parameters; as a side effect
        sets self.datasetPath and self.executable.
        """
        taskId = common._db.queryTask('name')
        gridName = common.scheduler.userName().strip()
        common.logger.debug("GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('GRID.virtual_organization', 'cms')

        params = {'tool': common.prog_name,
                  'SubmissionType': 'direct',
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME', ''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'ApplicationVersion': self.scram.getSWVersion(),
                  'taskType': taskType,
                  'vo': VO,
                  'CMSUser': getUserName(),
                  'user': getUserName(),
                  'taskId': str(taskId),
                  'datasetFull': self.datasetPath,
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug('Submission DashBoard Pre-Submission report: '+str(params))

        return

    def SendMLpost(self, allList):
        """
        Send post-submission info to ML

        allList -- ids of the jobs to report; one report is sent per job,
                   made of the task-level parameters plus the job-specific
                   ones.
        """
        task = common._db.getTask(allList)

        # private copy of the task-level parameters, updated job by job
        params = dict(self.collect_MLInfo())

        Sub_Type = 'Direct'
        sched_name = common.scheduler.name()
        sched_kind = sched_name.upper()
        for job in task.jobs:
            jj = job['jobId']
            jobId = ''
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if sched_kind in ['CONDOR_G', 'GLIDEIN']:
                rb = 'OSG'
                taskHash = sha.new(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://' + sched_name + '/' + taskHash + '/' + str(jj)
                common.logger.debug('JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif sched_kind in ['LSF', 'CAF']:
                jobId = str(jj) + "_https://" + sched_name + ":/" + jid + "-" + str(task['name']).replace("_", "-")
                common.logger.debug('JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = sched_name
                localId = jid
            elif sched_kind in ['CONDOR']:
                taskHash = sha.new(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://' + socket.gethostname() + '/' + taskHash + '/' + str(jj)
                common.logger.debug('JobID for ML monitoring is created for CONDOR scheduler:'+jobId)
                rb = sched_name
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug('JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # up to two destinations are spelt out, more are only counted
            dlsDest = job['dlsDestination']
            if len(dlsDest) == 1:
                T_SE = str(dlsDest[0])
            elif len(dlsDest) == 2:
                T_SE = str(dlsDest[0])+','+str(dlsDest[1])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId': localId})

            common.logger.debug('Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return

    def stateChange(self, subrange, action):
        """
        _stateChange_

        Record action as the 'state' of every job id in subrange.
        """
        common.logger.debug("Updating [%s] state " % str(subrange))
        # one update record per job, in the same order as the job ids
        state_records = [{'state': action}] * len(subrange)
        common._db.updateRunJob_(subrange, state_records)
364