ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.130
Committed: Mon Jul 28 15:47:17 2008 UTC (16 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.129: +2 -2 lines
Log Message:
Use identifier for CondorG/Glidein closer to Local schedulers

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """
    Select, match and submit the jobs of the current task.

    The constructor decides which jobs are eligible (stored in
    ``self.nj_list``); ``run()`` matches them against the available
    resources, submits them through ``common.scheduler`` and reports the
    submission to the DashBoard through ``common.apmon``.
    """

    # Run-job statuses for which a job is NOT (re)submitted.
    # NOTE(review): presumably the "already submitted / running / done /
    # killed" BossLite states -- confirm against the status table.
    _NOT_SUBMITTABLE = frozenset(['SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A',
                                  'D', 'Z', 'E', 'UE', 'SSE', 'KK'])

    # Run-job statuses counted as "created" by checkIfCreate().
    _CREATED = frozenset(['C', 'RC'])

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build the list of job ids to submit (self.nj_list).

        @param cfg_params: configuration dictionary; 'CMSSW.datasetpath' is
                           read (the string 'none' means no input dataset).
        @param parsed_range: job ids already parsed from the user's range
                             option; used for 'range', tuple and negative
                             integer options.
        @param val: submission option: 'range', 'all', a positive integer N
                    (submit at most N eligible jobs) or a range expression.
        @raise CrabException: if val is not an accepted option.
        """
        self.cfg_params = cfg_params

        nsjobs, chosenJobsList = self._parseSubmitOption(val, parsed_range)
        common.logger.debug(5, 'nsjobs '+str(nsjobs))

        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5, 'Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        tmp_jList = self.complete_List
        if chosenJobsList is not None:
            tmp_jList = chosenJobsList

        from BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)

        nj_list = []
        jobSkippedInSubmission = []
        for jobId, dest, status in zip(tmp_jList, dlsDest, jStatus):
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(dest)
            # A job is a candidate if some allowed SE hosts its data, or if
            # the task reads no input dataset at all.
            if cleaned != '' or datasetpath is None:
                if status not in self._NOT_SUBMITTABLE:
                    nj_list.append(jobId)
            else:
                jobSkippedInSubmission.append(jobId)
            # Stop as soon as the requested number of jobs has been selected.
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        jobSetForSubmission = len(nj_list)
        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if jobSkippedInSubmission:
            mess = ",".join([str(j) for j in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug(5, 'nj_list '+str(nj_list))
        self.nj_list = nj_list
        return

    def _parseSubmitOption(self, val, parsed_range):
        """
        Translate the user submission option into (nsjobs, chosenJobsList).

        nsjobs == -1 means "no limit"; chosenJobsList None means "consider
        every job of the task".

        @raise CrabException: if val is not an accepted option.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList
        if val == 'range':  # for Resubmitter
            return nsjobs, parsed_range

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all".\n'
        msg += ' Generic range is not allowed'

        # NOTE(review): eval() on a user-supplied string. The existing
        # semantics rely on it: "1,3,5" evaluates to a tuple and "2-5" to a
        # negative int, both meaning "use parsed_range". Evaluated once
        # here; a dedicated parser should replace it.
        try:
            value = eval(val)
        except Exception:
            raise CrabException(msg)

        if type(value) is int and value > 0:
            # positive number: submit at most this many jobs
            nsjobs = value
        elif type(value) is tuple or (type(value) is int and value < 0):
            chosenJobsList = parsed_range
            nsjobs = len(chosenJobsList)
        else:
            raise CrabException(msg)
        return nsjobs, chosenJobsList

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list.

        Does nothing beyond a log message if no job has been created yet.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        elapsed = time.time() - start
        common.logger.debug(1, "Submission Time: "+str(elapsed))
        common.logger.write("Submission time :"+str(elapsed))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted' % njs
        if njs != requested:
            msg += ' (from %d requested).' % requested
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Check that at least one job of the task is in a created state.

        @return: 0 if at least one job has status 'C' or 'RC', 1 otherwise
                 (after logging a hint to the user).
        """
        jList = common._db.nJobs('list')
        statuses = common._db.queryRunJob('status', jList)
        totalCreatedJobs = len([s for s in statuses if s in self._CREATED])

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by destination site list and check each group
        against the scheduler's resources.

        Sets self.sub_jobs to a list of job-id lists, one per distinct
        dlsDestination that has at least one job in self.nj_list.

        @return: (matched, task) where matched holds the indices into
                 self.sub_jobs of the groups for which
                 common.scheduler.listMatch() returned a non-empty result,
                 and task is common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### list of distinct destination site lists
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        # self.sub_jobs[k] and match_dests[k] always describe the same group;
        # keeping them in lockstep avoids matching a block against the wrong
        # destination when some destination contributes no job.
        self.sub_jobs = []
        match_dests = []
        for distDest in distinct_dests:
            destJobs = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            destJobSet = set(destJobs)
            block = [i for i in self.nj_list if i in destJobSet]
            if block:
                self.sub_jobs.append(block)
                match_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel, (dest, block) in enumerate(zip(match_dests, self.sub_jobs)):
            match = common.scheduler.listMatch(dest, False)
            if len(match) > 0:
                common.logger.message("Found compatible site(s) for job "+str(block[0]))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(block))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit every matched block of jobs and record which ones succeeded.

        @param matched: indices into self.sub_jobs (as from performMatch()).
        @param task: task object handed to common.scheduler.submit().
        @return: number of jobs that obtained a non-empty scheduler id.
        @raise CrabException: if the scheduler fails to submit a block.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        showProgress = not common.logger.debugLevel()
        term = None
        if showProgress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            jobs = self.sub_jobs[ii]
            common.logger.debug(1, 'Submitting jobs '+str(jobs))

            try:
                common.scheduler.submit(jobs, task)
            except CrabException:
                # Keep the scheduler's reason instead of discarding it.
                import sys
                reason = sys.exc_info()[1]
                raise CrabException("Job not submitted: "+str(reason))

            if showProgress:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(jobs))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)), 'please wait')

            # A job counts as submitted only if it obtained a scheduler id.
            # None is rejected explicitly: str(None) == 'None' would
            # otherwise pass the emptiness test.
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            listId = []
            listRunField = []
            for jobId, schedId in zip(jobs, sched_Id):
                if schedId is None or str(schedId) == '':
                    continue
                listId.append(jobId)
                # fresh dict per job so entries never alias each other
                listRunField.append({'status': 'S'})
                common.logger.debug(5, "Submitted job # "+str(jobId))
                njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.SendMLpost(jobs)

        return njs

    # Correctly spelled alias; the historical name is kept for callers.
    performSubmission = perfromSubmission

    def submissionError(self):
        """
        Log the SE/CE black/white lists in effect, as a hint to the user
        when submission is incomplete.
        """
        ### TODO_ DS--BL: also report the job type / code version and hint
        ### whether that version is available at the sites.
        msg = 'Submission performed using the Requirements: \n'
        for key, label in (('EDG.se_white_list', 'SE White List'),
                           ('EDG.se_black_list', 'SE Black List'),
                           ('EDG.ce_white_list', 'CE White List'),
                           ('EDG.ce_black_list', 'CE Black List')):
            if key in self.cfg_params:
                msg += label+': '+self.cfg_params[key]+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)
        return

    def _taskIdFromName(self, name):
        """Return the task name with its last '_'-separated field removed."""
        return "_".join(str(name).split('_')[:-1])

    def _targetSE(self, dlsDest):
        """
        Format the target storage elements for the DashBoard: the SE names
        comma-joined when there are one or two, otherwise '<count>_Selected_SE'.
        """
        if 1 <= len(dlsDest) <= 2:
            return ','.join([str(se) for se in dlsDest])
        return str(len(dlsDest))+'_Selected_SE'

    def collect_MLInfo(self):
        """
        Prepare DashBoard information common to all jobs of the task.

        Side effect: sets self.datasetPath (None when configured as 'none')
        and self.executable (default 'cmsRun').

        @return: dict of DashBoard parameters.
        """
        taskId = self._taskIdFromName(common._db.queryTask('name'))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME', ''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER', ''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable}
        return params

    def SendMLpre(self):
        """
        Send the task-level ('TaskMeta') pre-submission report to the
        DashBoard.
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'
        common.apmon.sendToML(params)
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: '+str(params))
        return

    def SendMLpost(self, allList):
        """
        Send one post-submission DashBoard report per job in allList.

        The job identifier format depends on the scheduler: CONDOR_G/GLIDEIN
        use a hash of the task name, LSF/CAF embed the local scheduler id,
        everything else uses '<jobId>_<schedulerId>'.

        @param allList: ids of the jobs just submitted.
        """
        task = common._db.getTask(allList)

        params = dict(self.collect_MLInfo())
        taskId = self._taskIdFromName(task['name'])

        schedName = common.scheduler.name()
        schedUpper = schedName.upper()
        isCondor = schedUpper in ('CONDOR_G', 'GLIDEIN')
        isLocal = schedUpper in ('LSF', 'CAF')
        taskHash = None
        if isCondor:
            # loop-invariant: computed once instead of once per job
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()

        Sub_Type = 'Direct'
        for job in task.jobs:
            jj = job['jobId']
            jid = str(job.runningJob['schedulerId'])
            localId = ''
            if isCondor:
                rb = 'OSG'
                jobId = str(jj) + '_https://' + schedName + '/' + taskHash + '/' + str(jj)
                common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif isLocal:
                jobId = "https://"+schedName+":/"+jid+"-"+taskId.replace("_", "-")
                common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler:'+jobId)
                rb = str(job.runningJob['service'])

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': self._targetSE(job['dlsDestination']),
                           'localId': localId})

            common.logger.debug(5, 'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
340 spiga 1.112
341