ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.179
Committed: Fri Aug 5 15:36:10 2011 UTC (13 years, 8 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_9_pre2
Changes since 1.178: +12 -1 lines
Log Message:
try to remove the crab working_dir if the task has problem during creation step, savannah bug 85243

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.139 import socket
8 ewv 1.140 import Scram
9 slacapra 1.60 from ProgressBar import ProgressBar
10     from TerminalController import TerminalController
11 spiga 1.175 try:
12     from hashlib import sha1
13     except:
14     from sha import sha as sha1
15    
16 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits the jobs of the current task directly from the client.

    run() is the entry point: it builds the list of jobs to submit, checks
    that there is something to submit, matches every group of jobs against
    the available resources, submits the matched groups and reports to the
    monitoring (ML / DashBoard) before and after the submission.
    """

    # Largest number of jobs the client submits in one go when
    # self.limitJobs is true.
    MAX_CLIENT_JOBS = 500

    def __init__(self, cfg_params, parsed_range, val):
        """
        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is required
        parsed_range -- list of job ids, used when val selects explicit jobs
        val          -- submission option: 'range' (used by the Resubmitter),
                        'all', a positive integer N (submit at most N jobs),
                        or an expression evaluating to a tuple or to a
                        negative integer (submit the jobs in parsed_range)

        Raises CrabException when val is none of the above.
        """
        self.cfg_params = cfg_params
        self.limitJobs = True
        # get user request
        self.nsjobs = -1
        self.chosenJobsList = None
        if val:
            if val == 'range':  # for Resubmitter
                self.chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                # NOTE(security): val is evaluated with eval().  It is kept
                # because an option such as "1-3" is recognised by evaluating
                # to a negative integer, but it must never be fed untrusted
                # input.  It is evaluated once, and anything that cannot be
                # evaluated is reported as a bad option instead of escaping
                # as NameError/SyntaxError.
                try:
                    requested = eval(val)
                except Exception:
                    requested = None
                if type(requested) is int and requested > 0:
                    # positive number
                    self.nsjobs = requested
                elif type(requested) is tuple or \
                        (type(requested) is int and requested < 0):
                    self.chosenJobsList = parsed_range
                    self.nsjobs = len(self.chosenJobsList)
                else:
                    msg = 'Bad submission option <'+str(val)+'>\n'
                    msg += ' Must be an integer or "all"'
                    msg += ' Generic range is not allowed"'
                    raise CrabException(msg)
        self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
        self.seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.scram = Scram.Scram(cfg_params)
        return

    #wmbs
    def BuildJobList(self, type=0):
        """
        Fill self.nj_list with the ids of the jobs to submit.

        With type == 1 the list is simply the user's chosen jobs (or empty).
        Otherwise the jobs (the chosen ones, or all of them) are scanned in
        order: a job is selected when its destination survives the SE
        black/white list (or there is no dataset) and its state is 'Created';
        jobs whose destination is completely filtered out are reported as
        skipped.  The scan stops once self.nsjobs jobs have been selected.

        Raises CrabException, after trying to remove the working directory,
        when self.limitJobs is set and more than MAX_CLIENT_JOBS jobs were
        selected.
        """
        # total jobs
        nj_list = []
        self.complete_List = common._db.nJobs('list')
        if type == 1:
            self.nj_list = []
            if self.chosenJobsList:
                self.nj_list = self.chosenJobsList
            return
        # build job list
        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, self.seBlackList, common.logger())
        common.logger.debug('nsjobs '+str(self.nsjobs))
        # get the first not already submitted
        common.logger.debug('Total jobs '+str(len(self.complete_List)))

        jobSetForSubmission = 0
        jobSkippedInSubmission = []
        tmp_jList = self.complete_List
        if self.chosenJobsList is not None:
            tmp_jList = self.chosenJobsList
        for job in common._db.getTask(tmp_jList).jobs:
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
            if (cleanedBlackWhiteList != '') or (self.datasetPath is None):
                if job.runningJob['state'] != 'Created':
                    # not in a submittable state: ignore it
                    continue
                jobSetForSubmission += 1
                nj_list.append(job['id'])
            else:
                jobSkippedInSubmission.append(job['id'])
            if self.nsjobs > 0 and self.nsjobs == jobSetForSubmission:
                break
        if self.nsjobs > jobSetForSubmission:
            common.logger.info('asking to submit '+str(self.nsjobs)+' jobs, but only '+
                               str(jobSetForSubmission)+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            # every id is followed by a comma, the last one included
            mess = "".join([str(skipped) + "," for skipped in jobSkippedInSubmission])
            common.logger.info("Jobs: " +str(mess) + "\n\tskipped because no sites are hosting this data\n")
            self.submissionError()
        # submit N from last submitted job
        common.logger.debug('nj_list '+str(nj_list))
        self.nj_list = nj_list
        if self.limitJobs and len(self.nj_list) > self.MAX_CLIENT_JOBS:
            # Savannah bug 85243: the task cannot be used, so try to remove
            # the working directory that was created for it.
            msg = "The CRAB client will not submit more than %d jobs.\n" % self.MAX_CLIENT_JOBS
            msg += "      Use the server mode or submit your jobs in smaller groups"
            import shutil
            top_dir = common.work_space._top_dir
            msg += '\n'
            msg += '      --->> Removing the working_dir ' + top_dir + ' \n'
            try:
                shutil.rmtree(top_dir)
            except OSError:
                msg += '      Warning: problems removing the working_dir ' + top_dir + ' \n'
                msg += '      Please remove it by hand.'
            raise CrabException(msg)
        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug("Submitter::run() called")

        start = time.time()

        self.BuildJobList()

        if self.checkIfCreate() != 0:
            # nothing to submit; checkIfCreate already told the user why
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug("Submission Time: "+str(stop - start))

        requested = len(self.nj_list)
        msg = 'Total of %d jobs submitted' % njs
        if njs != requested:
            msg += ' (from %d requested).' % requested
        else:
            msg += '.'
        common.logger.info(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    #wmbs
    def checkIfCreate(self, type=0):
        """
        Return 0 when there is something to submit, 1 otherwise.

        With type == 1 and a task without jobs, the task is refused only
        when its 'jobType' is 'Submitted'.  In every other case at least one
        job must be in state 'Created'.
        """
        code = 0
        task = common._db.getTask()
        if type == 1 and len(task.jobs) == 0:
            if task['jobType'] == 'Submitted':
                common.logger.info("No Request to be submitted: first create it.\n")
                code = 1
        else:
            totalCreatedJobs = 0
            for job in task.jobs:
                if job.runningJob['state'] == 'Created':
                    totalCreatedJobs += 1

            if totalCreatedJobs == 0:
                common.logger.info("No jobs to be submitted: first create them")
                code = 1
        return code

    def performMatch(self):
        """
        Group the jobs of self.nj_list by destination and match each group.

        Fills self.sub_jobs with one list of job ids per distinct destination
        and asks the scheduler, once per group, whether a compatible site
        exists.  Returns (matched, task): matched holds the indexes into
        self.sub_jobs of the groups that can be submitted, task is the task
        object read from the DB.
        """
        common.logger.info("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []    # list of jobs Id list to submit
        jobs_to_match = []    # first job Id of every group, used in messages
        dests_to_match = []   # destination of every group
        # The three lists are filled together so that one index always
        # addresses the same group, even if a destination turns out to hold
        # none of the requested jobs.
        for distDest in distinct_dests:
            jobs_at_dest = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in jobs_at_dest]
            if len(sub_jobs_temp) > 0:
                self.sub_jobs.append(sub_jobs_temp)
                jobs_to_match.append(sub_jobs_temp[0])
                dests_to_match.append(distDest)

        matched = []
        task = common._db.getTask()
        for sel, id_job in enumerate(jobs_to_match):
            match = common.scheduler.listMatch(dests_to_match[sel], False)
            if len(match) > 0:
                common.logger.info("Found compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.info("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit the groups of self.sub_jobs whose indexes are in matched.

        After each group is submitted, the jobs that got a non-empty
        scheduler id are marked as submitted in the DB and reported to ML.
        Returns the number of jobs actually submitted.  Raises CrabException
        when the scheduler refuses a group.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        show_progress = (common.debugLevel == 0)
        if show_progress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.info("The whole task doesn't found compatible site ")
            return njs

        common.logger.info(str(len(matched))+" blocks of jobs will be submitted")
        common.logger.debug("Delegating proxy ")
        try:
            common.scheduler.delegateProxy()
        except CrabException:
            # best effort: a failed delegation does not stop the submission
            common.logger.debug("Proxy delegation failed ")

        for ii in matched:
            block = self.sub_jobs[ii]
            common.logger.debug('Submitting jobs '+str(block))

            # fix arguments for unique naming of the output
            common._db.updateResubAttribs(block)

            try:
                common.scheduler.submit(block, task)
            except CrabException:
                raise CrabException("Job not submitted")

            if show_progress:
                # the progress bar is cosmetic: go on without it if it
                # cannot be created
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(block))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)), 'please wait')

            ### check the if the submission succeded Maybe not needed or at least simplified
            sched_Id = common._db.queryRunJob('schedulerId', block)
            listId = []
            run_jobToSave = {'status': 'S'}
            listRunField = []
            for job_id, scheduler_id in zip(block, sched_Id):
                if str(scheduler_id) != '':
                    listId.append(job_id)
                    listRunField.append(run_jobToSave)
                    common.logger.debug("Submitted job # "+ str(job_id))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.stateChange(listId, "SubSuccess")
            self.SendMLpost(block)

        return njs

    def submissionError(self):
        """
        Log the requirements used for the submission together with some
        hints, to help the user when the submission is not complete.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL
        site_lists = (('SE White List', 'GRID.se_white_list'),
                      ('SE Black List', 'GRID.se_black_list'),
                      ('CE White List', 'GRID.ce_white_list'),
                      ('CE Black List', 'GRID.ce_black_list'))
        for label, key in site_lists:
            if key in self.cfg_params:
                msg += '\t' + label + ': ' + self.cfg_params[key] + '\n'
        removeDefBL = self.cfg_params.get('GRID.remove_default_blacklist', 0)
        # compare as strings: the configured value is text, the default is
        # the integer 0, and the note must be shown in both cases
        if str(removeDefBL) == '0':
            msg += '\tNote:  All CMS T1s are BlackListed by default \n'
        msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        msg += '\tPlease check if:\n'
        msg += '\t\t -- the dataset is available at this site\n'
        msg += '\t\t -- the CMSSW version is available at this site\n'
        msg += '\t\t -- grid submission to CERN & FNAL CAFs is not allowed)\n'
        msg += '\tPlease also look at the Site Status Page for CMS sites,\n'
        msg += '\t  to check if the sites hosting your data are ok\n'
        msg += '\t  http://dashb-ssb.cern.ch/dashboard/request.py/siteviewhome\n'
        common.logger.info(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information

        Returns the dictionary of task-level parameters sent to ML.  As a
        side effect sets self.executable from 'CMSSW.executable'.
        """
        taskId = common._db.queryTask('name')
        gridName = common.scheduler.userName().strip()
        common.logger.debug("GRIDNAME: %s "%gridName)
        # taskType can be set by the user (savannah 76950)
        taskType = self.cfg_params.get('USER.tasktype', 'analysis')

        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('GRID.virtual_organization', 'cms')

        params = {'tool': common.prog_name,
                  'SubmissionType': 'direct',
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME', ''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'ApplicationVersion': self.scram.getSWVersion(),
                  'taskType': taskType,
                  'vo': VO,
                  'CMSUser': getUserName(),
                  'user': getUserName(),
                  'taskId': str(taskId),
                  'datasetFull': self.datasetPath,
                  'resubmitter': 'user',
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML
        """
        params = self.collect_MLInfo()

        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug('Submission DashBoard Pre-Submission report: %s'%str(params))

        return

    def SendMLpost(self, allList):
        """
        Send post-submission info to ML

        allList -- ids of the jobs just submitted.  One report is sent per
        job; the layout of the monitoring job id depends on the scheduler.
        """
        task = common._db.getTask(allList)

        # work on a copy: the per-job fields are added on top of it
        params = dict(self.collect_MLInfo())

        msg = ''
        Sub_Type = 'Direct'
        # the scheduler name does not change from job to job
        sched_name = common.scheduler.name()
        sched_upper = sched_name.upper()
        # hash of the task name, computed at most once and only for the
        # schedulers that need it
        taskHash = None
        for job in task.jobs:
            jj = job['jobId']
            jobId = ''
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if sched_upper == 'CONDOR_G':
                rb = 'OSG'
                if taskHash is None:
                    taskHash = sha1(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://' + sched_name + '/' + taskHash + '/' + str(jj)
                msg += ('JobID for ML monitoring is created for CONDOR_G scheduler: %s \n'%str(jobId))
            elif sched_upper == 'GLIDEIN':
                rb = sched_name
                jobId = str(jj) + '_https://' + str(jid)
                msg += ('JobID for ML monitoring is created for GLIDEIN scheduler: %s \n'%str(jobId))
            elif sched_upper in ['LSF', 'CAF', 'PBS']:
                jobId = str(jj) + "_https://" + sched_upper + ":/" + jid + "-" + str(task['name']).replace("_", "-")
                msg += ('JobID for ML monitoring is created for %s scheduler: %s\n'%(sched_upper, str(jobId)))
                rb = sched_name
                localId = jid
            elif sched_upper == 'CONDOR':
                if taskHash is None:
                    taskHash = sha1(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://' + socket.gethostname() + '/' + taskHash + '/' + str(jj)
                rb = sched_name
                msg += ('JobID for ML monitoring is created for CONDOR scheduler: %s\n'%str(jobId))
            elif sched_upper == 'ARC':
                jobId = str(jj) + '_' + str(jid)
                msg += ('JobID for ML monitoring is created for ARC scheduler: %s\n'%str(jobId))
                rb = 'ARC'
            else:
                jobId = str(jj) + '_' + str(jid)
                msg += ('JobID for ML monitoring is created for gLite scheduler %s\n'%str(jobId))
                rb = str(job.runningJob['service'])

            # up to two destinations are listed, more are only counted
            dlsDest = job['dlsDestination']
            if len(dlsDest) == 1:
                T_SE = str(dlsDest[0])
            elif len(dlsDest) == 2:
                T_SE = str(dlsDest[0])+','+str(dlsDest[1])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId': localId})

            msg += ('Submission DashBoard report: %s\n'%str(params))
            common.apmon.sendToML(params)
        # NOTE(review): level 9 looks like "one below DEBUG" -- confirm
        # against the logger in use
        common.logger.log(10-1, msg)
        return
402 spiga 1.112
403