ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.133
Committed: Fri Oct 24 16:02:48 2008 UTC (16 years, 6 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.132: +2 -2 lines
Log Message:
Transition to WMCore SiteScreening and SiteDB

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """
    Select the jobs of a task that can be submitted, check that the
    scheduler finds compatible sites for them, submit them, and report the
    submission to the Dashboard through common.apmon.
    """

    # Run-job statuses for which a job is NOT picked for submission.
    _notSubmittableStatus = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A',
                             'D', 'Z', 'E', 'UE', 'SSE', 'KK')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Decide which jobs will be submitted and store their ids in self.nj_list.

        cfg_params   -- configuration dictionary ('CMSSW.datasetpath' is read)
        parsed_range -- job ids already parsed by the caller from the user's
                        request; used for 'range' and for list/range requests
        val          -- raw submission request: 'range' (used by Resubmitter),
                        'all', a positive integer N (submit at most N jobs),
                        or a job list/range expression
        Raises CrabException when val cannot be interpreted.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseRequest(parsed_range, val)
        common.logger.debug(5,'nsjobs '+str(nsjobs))

        # all the jobs of the task
        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5,'Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        candidates = self.complete_List
        if chosenJobsList is not None:
            candidates = chosenJobsList

        nj_list, jobSkippedInSubmission = self._selectJobs(candidates, nsjobs, datasetpath)

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(len(nj_list))+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            skippedIds = ','.join([str(job) for job in jobSkippedInSubmission])
            common.logger.message("Jobs: " + skippedIds + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug(5,'nj_list '+str(nj_list))
        self.nj_list = nj_list
        return

    def _parseRequest(self, parsed_range, val):
        """
        Translate the submission request into (nsjobs, chosenJobsList).

        nsjobs is the maximum number of jobs to submit (-1: no limit) and
        chosenJobsList the explicit list of candidate jobs (None: all jobs
        of the task). Raises CrabException for an unusable request.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList
        if val == 'range':  # for Resubmitter
            return nsjobs, parsed_range

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all".\n'
        msg += ' Generic range is not allowed'
        # NOTE(review): eval() executes arbitrary user-supplied text. It is
        # kept because the request syntax relies on it: "1,3,5" evaluates to
        # a tuple and "1-5" to a negative int, both presumably meaning "use
        # parsed_range". Replace with a real parser.
        try:
            request = eval(val)
        except Exception:
            # unparsable input must surface as the documented CrabException
            raise CrabException(msg)

        if type(request) is int and request > 0:
            # positive number: submit at most that many jobs
            nsjobs = request
        elif type(request) is tuple or (type(request) is int and request < 0):
            chosenJobsList = parsed_range
            nsjobs = len(chosenJobsList)
        else:
            raise CrabException(msg)
        return nsjobs, chosenJobsList

    def _selectJobs(self, candidates, nsjobs, datasetpath):
        """
        Pick the submittable jobs among candidates.

        A job is kept when its destination survives the SE black/white list
        (or the task reads no dataset) and its status is not in
        _notSubmittableStatus. Selection stops as soon as nsjobs jobs are
        kept (nsjobs <= 0 means no limit). Returns (selected, skipped), where
        skipped are the jobs dropped because no allowed site hosts their data.
        Also sets self.blackWhiteListParser.
        """
        from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.cfg_params,common.logger)
        dlsDest = common._db.queryJob('dlsDestination', candidates)
        jStatus = common._db.queryRunJob('status', candidates)

        selected = []
        skipped = []
        for nj, jobId in enumerate(candidates):
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                skipped.append(jobId)
            elif jStatus[nj] not in self._notSubmittableStatus:
                selected.append(jobId)
                if nsjobs > 0 and nsjobs == len(selected):
                    break
        return selected, skipped

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list.

        Does nothing (apart from a message) if no job has been created yet.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        # nothing can be submitted before the jobs have been created
        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted'%njs
        if njs != requested:
            msg += ' (from %d requested).'%requested
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Check that at least one job of the task has been created.

        Returns 0 when some job has status 'C' or 'RC'; otherwise tells the
        user to create the jobs first and returns 1.
        """
        jList = common._db.nJobs('list')
        statuses = common._db.queryRunJob('status',jList)
        totalCreatedJobs = len([st for st in statuses if st in ('C', 'RC')])

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by destination and check each group for sites.

        Sets self.sub_jobs to a list of job-id lists, one per distinct
        'dlsDestination'. Returns (matched, task): matched holds the indices
        into self.sub_jobs of the groups for which the scheduler found at
        least one compatible site; task is common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### distinct destination-site lists among the jobs to submit
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)

        ### job ids of self.nj_list for each distinct destination; the
        ### destinations are kept aligned with self.sub_jobs so that a
        ### destination without requested jobs cannot shift the pairing
        self.sub_jobs = []   # list of job-id lists to submit
        groupDests = []      # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            jobsAtDest = set(common._db.queryAttrJob({'dlsDestination':distDest},'jobId'))
            groupJobs = [i for i in self.nj_list if i in jobsAtDest]
            if len(groupJobs) > 0:
                self.sub_jobs.append(groupJobs)
                groupDests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            match = common.scheduler.listMatch(groupDests[sel], False)
            if len(match) > 0:
                common.logger.message("Found compatible site(s) for job "+str(self.sub_jobs[sel][0]))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self,matched,task):
        """
        Submit the groups self.sub_jobs[ii] for every index ii in matched.

        Jobs that received a scheduler id are marked as submitted ('S') in
        the DB and reported to the Dashboard. (The method name keeps its
        historical spelling because callers use it.) Returns the number of
        submitted jobs. Raises CrabException("Job not submitted") when the
        scheduler rejects a group.
        """
        njs = 0

        ### Progress Bar indicator, deactivated in debug mode
        showProgress = not common.logger.debugLevel()
        if showProgress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            jobs = self.sub_jobs[ii]
            common.logger.debug(1,'Submitting jobs '+str(jobs))

            try:
                common.scheduler.submit(jobs,task)
            except CrabException:
                raise CrabException("Job not submitted")

            if showProgress:
                # the progress bar is cosmetic: never let it break submission
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(jobs))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

            ### a job counts as submitted once it has a scheduler id
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            run_jobToSave = {'status' :'S'}
            listId = []
            listRunField = []
            for j in range(len(jobs)):
                if str(sched_Id[j]) != '':
                    listId.append(jobs[j])
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5,"Submitted job # "+ str(jobs[j]))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.SendMLpost(jobs)

        return njs

    def submissionError(self):
        """
        Print the SE/CE black and white lists in use, to help the user
        understand why the submission is not complete.
        """
        # TODO_ DS--BL: also report the job type and version that the sites
        # must provide.
        msg = 'Submission performed using the Requirements: \n'
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if self.cfg_params.has_key(key):
                msg += label+self.cfg_params[key]+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare the DashBoard information shared by all reports of the task.

        Also sets self.datasetPath (None when 'CMSSW.datasetpath' is 'none')
        and self.executable. Returns a new dictionary.
        """
        taskId = uniqueTaskName(common._db.queryTask('name'))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization','cms')

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME',''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER',''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable }

        return params

    def SendMLpre(self):
        """
        Send the task-level (pre-submission) report to the DashBoard.
        """
        params = self.collect_MLInfo()

        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))

        return

    def SendMLpost(self,allList):
        """
        Send one post-submission DashBoard report for each job in allList.

        The job identifier format depends on the scheduler: CONDOR_G/GLIDEIN
        (OSG), LSF/CAF (local batch), anything else (gLite-style ids).
        """
        task = common._db.getTask(allList)

        baseParams = dict(self.collect_MLInfo())

        schedName = common.scheduler.name()
        schedKind = schedName.upper()
        Sub_Type = 'Direct'
        taskHash = None  # computed once, only if a CONDOR_G job needs it
        for job in task.jobs:
            jj = job['jobId']
            jid = str(job.runningJob['schedulerId'])
            localId = ''
            if schedKind in ('CONDOR_G', 'GLIDEIN'):
                rb = 'OSG'
                if taskHash is None:
                    taskHash = sha.new(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
                common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif schedKind in ('LSF', 'CAF'):
                jobId = "https://"+schedName+":/"+jid+"-"+str(task['name']).replace("_","-")
                common.logger.debug(5,'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # name up to two target SEs explicitly, otherwise just count them
            dlsDest = job['dlsDestination']
            if len(dlsDest) in (1, 2):
                T_SE = ','.join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            infos = {'jobId': jobId,
                     'sid': jid,
                     'broker': rb,
                     'bossId': jj,
                     'SubmissionType': Sub_Type,
                     'TargetSE': T_SE,
                     'localId' : localId}

            # fresh dictionary per job, so reports never share mutable state
            params = baseParams.copy()
            params.update(infos)

            common.logger.debug(5,'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
325     'sid': jid, \
326     'broker': rb, \
327     'bossId': jj, \
328     'SubmissionType': Sub_Type, \
329     'TargetSE': T_SE, \
330     'localId' : localId}
331    
332     for k,v in infos.iteritems():
333     params[k] = v
334    
335     common.logger.debug(5,'Submission DashBoard report: '+str(params))
336     common.apmon.sendToML(params)
337 nsmirnov 1.1 return
338 spiga 1.112
339