ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.188
Committed: Fri Dec 14 16:19:20 2012 UTC (12 years, 4 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, HEAD
Changes since 1.187: +1 -1 lines
Log Message:
remove ML message logging https://savannah.cern.ch/bugs/index.php?99330

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5 #from random import random
6 import time
7 import socket
8 import Scram
9 from ProgressBar import ProgressBar
10 from TerminalController import TerminalController
11 try:
12 from hashlib import sha1
13 except:
14 from sha import sha as sha1
15
16
class Submitter(Actor):
    """
    Actor that submits the created jobs of the current task through
    common.scheduler and reports every submission to the Dashboard
    through common.apmon.sendToML.
    """

    # Maximum number of jobs accepted in one client-side submission
    # (bug 85243); larger tasks must use server mode or smaller groups.
    MAX_CLIENT_JOBS = 500

    def __init__(self, cfg_params, parsed_range, val):
        """
        Record the user's submission request.

        cfg_params   -- configuration dictionary
        parsed_range -- job ids already resolved from the user's range; used
                        when val is 'range', a tuple or a negative-int expression
        val          -- raw submission option: empty/None (submit all),
                        'range' (used by the Resubmitter), 'all', a positive
                        integer N (submit N jobs) or a range expression

        Raises CrabException when val is not a recognised option.
        """
        self.cfg_params = cfg_params
        self.limitJobs = True
        # -1 means "no limit on the number of jobs"
        self.nsjobs = -1
        self.chosenJobsList = None
        if val:
            if val == 'range':  # for Resubmitter
                self.chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                # NOTE(review): eval() runs on a user supplied string. It is
                # kept because range options such as '1-5' rely on evaluating
                # to a negative int, which ast.literal_eval rejects.
                try:
                    evaluated = eval(val)
                except Exception:
                    evaluated = None
                if type(evaluated) is int and evaluated > 0:
                    # positive number: submit that many jobs
                    self.nsjobs = evaluated
                elif type(evaluated) is tuple or (type(evaluated) is int and evaluated < 0):
                    # explicit list ('1,3') or range ('1-5' evaluates < 0)
                    self.chosenJobsList = parsed_range
                    self.nsjobs = len(self.chosenJobsList)
                else:
                    msg = 'Bad submission option <' + str(val) + '>\n'
                    msg += ' Must be an integer or "all".\n'
                    msg += ' Generic range is not allowed'
                    raise CrabException(msg)
        self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
        self.seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.scram = Scram.Scram(cfg_params)

    #wmbs
    def BuildJobList(self, type=0):
        """
        Build self.nj_list, the ids of the jobs to submit.

        type -- 1: take self.chosenJobsList as is (empty list if unset);
                0: keep only jobs in state 'Created' whose destination SEs
                   survive the SE black/white lists (or any job when there is
                   no input dataset), stopping after self.nsjobs jobs.

        Raises CrabException when more than MAX_CLIENT_JOBS jobs would be
        submitted while self.limitJobs is set.
        """
        self.complete_List = common._db.nJobs('list')
        if type == 1:
            self.nj_list = []
            if self.chosenJobsList:
                self.nj_list = self.chosenJobsList
            return

        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, self.seBlackList, common.logger())
        common.logger.debug('nsjobs ' + str(self.nsjobs))
        common.logger.debug('Total jobs ' + str(len(self.complete_List)))

        nj_list = []
        jobSkippedInSubmission = []
        candidates = self.complete_List
        if self.chosenJobsList is not None:
            candidates = self.chosenJobsList
        for job in common._db.getTask(candidates).jobs:
            allowedSEs = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
            if allowedSEs != '' or self.datasetPath is None:
                if job.runningJob['state'] != 'Created':
                    continue
                nj_list.append(job['id'])
            else:
                # every SE hosting the data is excluded by the lists
                jobSkippedInSubmission.append(job['id'])
            if self.nsjobs > 0 and self.nsjobs == len(nj_list):
                break

        if self.nsjobs > len(nj_list):
            common.logger.info('asking to submit ' + str(self.nsjobs) + ' jobs, but only ' +
                               str(len(nj_list)) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = "".join([str(j) + "," for j in jobSkippedInSubmission])
            common.logger.info("Jobs: " + mess + "\n\tskipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug('nj_list ' + str(nj_list))
        self.nj_list = nj_list
        if self.limitJobs and len(self.nj_list) > self.MAX_CLIENT_JOBS:
            ###### FEDE FOR BUG 85243 ##############
            msg = "The CRAB client will not submit task with more than %d jobs.\n" % self.MAX_CLIENT_JOBS
            msg += " Use the server mode or submit your jobs in smaller groups"
            raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list.

        Nothing is matched, submitted or reported to the Dashboard when
        checkIfCreate() finds no created jobs.
        """
        common.logger.debug("Submitter::run() called")

        start = time.time()

        self.BuildJobList()

        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug("Submission Time: " + str(stop - start))

        requested = len(self.nj_list)
        msg = 'Total of %d jobs submitted' % njs
        if njs != requested:
            msg += ' (from %d requested).' % requested
        else:
            msg += '.'
        common.logger.info(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    #wmbs
    def checkIfCreate(self, type=0):
        """
        Check that the task has jobs ready to be submitted.

        type -- 1 selects the special case of a task with no jobs at all,
                where only tasks whose jobType is 'Submitted' are rejected.

        Returns 0 when submission may proceed, 1 otherwise (after logging
        the reason).
        """
        code = 0
        task = common._db.getTask()
        if type == 1 and len(task.jobs) == 0:
            if task['jobType'] == 'Submitted':
                common.logger.info("No Request to be submitted: first create it.\n")
                code = 1
        else:
            totalCreatedJobs = sum(1 for job in task.jobs
                                   if job.runningJob['state'] == 'Created')
            if totalCreatedJobs == 0:
                common.logger.info("No jobs to be submitted: first create them")
                code = 1
        return code

    def performMatch(self):
        """
        Group self.nj_list by distinct destination site list and ask the
        scheduler for compatible resources for each group.

        Sets self.sub_jobs to the list of job-id groups. Returns
        (matched, task): matched holds the indices into self.sub_jobs of the
        groups with at least one compatible site; task is the task from
        common._db.getTask().
        """
        common.logger.info("Checking available resources...")
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        # self.sub_jobs[k] and group_dests[k] always describe the same group,
        # so the index stored in `matched` is valid for both.
        self.sub_jobs = []
        group_dests = []
        for distDest in distinct_dests:
            jobsAtDest = set(common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId'))
            group = [i for i in self.nj_list if i in jobsAtDest]
            if group:
                self.sub_jobs.append(group)
                group_dests.append(distDest)

        task = common._db.getTask()
        matched = []
        for sel, (group, dest) in enumerate(zip(self.sub_jobs, group_dests)):
            match = common.scheduler.listMatch(dest, False)
            if match:
                common.logger.info("Found compatible site(s) for job " + str(group[0]))
                matched.append(sel)
            else:
                common.logger.info("No compatible site found, will not submit jobs " + str(group))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit every job group of self.sub_jobs whose index is in `matched`.

        Jobs whose schedulerId is non-empty after submission are stored with
        status 'S' and moved to "SubSuccess"; each submitted group is then
        reported with SendMLpost.

        Returns the number of jobs found submitted.
        Raises CrabException("Job not submitted") if the scheduler fails.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        term = None
        if common.debugLevel == 0:
            term = TerminalController()

        if not matched:
            common.logger.info("The whole task doesn't found compatible site ")
            return njs

        common.logger.info(str(len(matched)) + " blocks of jobs will be submitted")
        common.logger.debug("Delegating proxy ")
        try:
            common.scheduler.delegateProxy()
        except CrabException:
            # best effort: submission is still attempted
            common.logger.debug("Proxy delegation failed ")

        for ii in matched:
            jobs = self.sub_jobs[ii]
            common.logger.debug('Submitting jobs ' + str(jobs))

            # fix arguments for unique naming of the output
            common._db.updateResubAttribs(jobs)

            try:
                common.scheduler.submit(jobs, task)
            except CrabException:
                common.logger.debug('common.scheduler.submit exception. Job(s) possibly not submitted')
                raise CrabException("Job not submitted")

            if common.debugLevel == 0:
                try:
                    pbar = ProgressBar(term, 'Submitting ' + str(len(jobs)) + ' jobs')
                except Exception:
                    # the progress bar is cosmetic (e.g. no usable terminal)
                    pbar = None
                if pbar:
                    pbar.update(float(ii + 1) / float(len(self.sub_jobs)), 'please wait')

            ### check if the submission succeeded
            # NOTE(review): a None schedulerId stringifies to 'None' and is
            # counted as submitted -- confirm what queryRunJob returns.
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            run_jobToSave = {'status': 'S'}
            listId = []
            listRunField = []
            for j, job_id in enumerate(jobs):
                if str(sched_Id[j]) != '':
                    listId.append(job_id)
                    listRunField.append(run_jobToSave)
                    common.logger.debug("Submitted job # " + str(job_id))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.stateChange(listId, "SubSuccess")
            self.SendMLpost(jobs)

        return njs

    def submissionError(self):
        """
        Log the submission requirements (SE/CE black/white lists) and hints
        to help the user understand why jobs were not submitted.
        """
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL
        #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
        #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
        for key, label in (('GRID.se_white_list', 'SE White List'),
                           ('GRID.se_black_list', 'SE Black List'),
                           ('GRID.ce_white_list', 'CE White List'),
                           ('GRID.ce_black_list', 'CE Black List')):
            if key in self.cfg_params:
                msg += '\t' + label + ': ' + str(self.cfg_params[key]) + '\n'
        # The option may be absent (default 0) or a config string; compare
        # as text so the default-blacklist note is shown in both cases.
        removeDefBL = self.cfg_params.get('GRID.remove_default_blacklist', 0)
        if str(removeDefBL).strip() == '0':
            msg += '\tNote: All CMS T1s are BlackListed by default \n'
        msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        msg += '\tPlease check if:\n'
        msg += '\t\t -- the dataset is available at this site\n'
        msg += '\t\t -- the CMSSW version is available at this site\n'
        msg += '\t\t -- grid submission to CERN & FNAL CAFs is not allowed)\n'
        msg += '\tPlease also look at the Site Status Page for CMS sites,\n'
        msg += '\t to check if the sites hosting your data are ok\n'
        msg += '\t http://dashb-ssb.cern.ch/dashboard/request.py/siteviewhome\n'
        common.logger.info(msg)

    def collect_MLInfo(self):
        """
        Prepare DashBoard information common to all submission reports.

        Returns a new dict of task-level monitoring parameters.
        """
        taskId = common._db.queryTask('name')
        gridName = common.scheduler.userName().strip()
        gridScheduler = common.scheduler.name()
        if gridScheduler.upper() == 'REMOTEGLIDEIN':
            gridScheduler = 'GLIDEIN'
        common.logger.debug("GRIDNAME: %s " % gridName)
        #### FEDE for taskType (savannah 76950)
        taskType = self.cfg_params.get('USER.tasktype', 'analysis')

        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('GRID.virtual_organization', 'cms')
        userName = getUserName()

        params = {'tool': common.prog_name,
                  'SubmissionType': 'direct',
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME', ''),
                  'scheduler': gridScheduler,
                  'GridName': gridName,
                  'ApplicationVersion': self.scram.getSWVersion(),
                  'taskType': taskType,
                  'vo': VO,
                  'CMSUser': userName,
                  'user': userName,
                  'taskId': str(taskId),
                  'datasetFull': self.datasetPath,
                  'resubmitter': 'user',
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send the task-level ('TaskMeta') pre-submission report to ML.
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug('Submission DashBoard Pre-Submission report: %s' % str(params))

    def SendMLpost(self, allList):
        """
        Send one post-submission report to ML for every job in allList.

        The Dashboard job id and broker name depend on the scheduler in use.
        """
        task = common._db.getTask(allList)

        params = dict(self.collect_MLInfo())

        # scheduler name and task hash are identical for every job: compute once
        schedName = common.scheduler.name()
        schedUpper = schedName.upper()
        taskHash = None
        if schedUpper in ('CONDOR_G', 'CONDOR'):
            taskHash = sha1(common._db.queryTask('name')).hexdigest()

        for job in task.jobs:
            jj = job['jobId']
            jid = str(job.runningJob['schedulerId'])
            localId = ''
            if schedUpper == 'CONDOR_G':
                rb = 'OSG'
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
            elif schedUpper == 'GLIDEIN':
                rb = schedName
                jobId = str(jj) + '_https://' + jid
            elif schedUpper == 'REMOTEGLIDEIN':
                rb = str(task['serverName'])
                jobId = str(jj) + '_https://' + jid
            elif schedUpper in ('LSF', 'CAF', 'PBS'):
                jobId = (str(jj) + "_https://" + schedUpper + ":/" + jid + "-" +
                         str(task['name']).replace("_", "-"))
                rb = schedName
                localId = jid
            elif schedUpper == 'CONDOR':
                jobId = str(jj) + '_https://' + socket.gethostname() + '/' + taskHash + '/' + str(jj)
                rb = schedName
            elif schedUpper == 'ARC':
                jobId = str(jj) + '_' + jid
                rb = 'ARC'
            else:
                # gLite and any other scheduler
                jobId = str(jj) + '_' + jid
                rb = str(job.runningJob['service'])

            dlsDest = job['dlsDestination']
            if len(dlsDest) in (1, 2):
                T_SE = ','.join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest)) + '_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': 'Direct',
                           'TargetSE': T_SE,
                           'localId': localId})
            common.apmon.sendToML(params)
402