ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.128
Committed: Fri Jul 25 14:37:47 2008 UTC (16 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.127: +58 -57 lines
Log Message:
Fix for Dashboard reporting in CondorG

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that selects the jobs of the current task which can still be
    submitted, checks that matching resources exist for their destinations,
    submits them through common.scheduler and reports each submission to
    the Dashboard through common.apmon.
    """

    # Run-job status codes for which a job is not (re)submitted.
    _NOT_SUBMITTABLE = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A', 'D', 'Z',
                        'E', 'UE', 'SSE', 'KK')

    # Run-job status codes that mark a job as created and ready to submit.
    _CREATED = ('C', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build self.nj_list, the list of job ids that run() will submit.

        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' must be set
        parsed_range -- job ids already parsed by the caller; used when val
                        is 'range', or evaluates to a tuple or to a negative
                        integer
        val          -- submission option: a false value or 'all' (every
                        submittable job), 'range', a positive integer N
                        (submit at most N jobs), or an expression that
                        evaluates to a tuple / negative integer

        Raises CrabException when val is none of the above.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseSubmitOption(val, parsed_range)
        common.logger.debug(5,'nsjobs '+str(nsjobs))

        # total jobs
        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5,'Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        tmp_jList = self.complete_List
        if chosenJobsList is not None:
            tmp_jList = chosenJobsList

        # build job list
        from BlackWhiteListParser import SEBlackWhiteListParser
        self.blackWhiteListParser = SEBlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination',tmp_jList)
        jStatus = common._db.queryRunJob('status',tmp_jList)

        nj_list = []
        jobSetForSubmission = 0
        jobSkippedInSubmission = []
        for nj, job_id in enumerate(tmp_jList):
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
            if cleaned != '' or datasetpath is None:
                if jStatus[nj] in self._NOT_SUBMITTABLE:
                    continue
                jobSetForSubmission += 1
                nj_list.append(job_id)
            else:
                # no destination survives the SE black/white lists
                jobSkippedInSubmission.append(job_id)
            # stop as soon as the requested number of jobs has been collected
            if nsjobs > 0 and nsjobs == jobSetForSubmission:
                break

        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            # every id is followed by a comma, trailing one included
            mess = "".join([str(job_id) + "," for job_id in jobSkippedInSubmission])
            common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        # submit N from last submitted job
        common.logger.debug(5,'nj_list '+str(nj_list))
        self.nj_list = nj_list

    def _parseSubmitOption(self, val, parsed_range):
        """
        Translate the submission option into (nsjobs, chosenJobsList).

        nsjobs is -1 when no limit on the number of jobs applies;
        chosenJobsList is None when all jobs of the task are candidates.
        Raises CrabException for an option that cannot be interpreted.
        """
        if not val or val == 'all':
            return -1, None
        if val == 'range': # for Resubmitter
            return -1, parsed_range

        # NOTE(review): eval() runs on a user-supplied string. It is kept
        # because expressions such as "1-3" are deliberately evaluated (to a
        # negative integer) to detect ranges, but it must never be fed
        # untrusted input. It is evaluated once, and any failure is reported
        # as a bad option instead of leaking a NameError/SyntaxError.
        try:
            parsed = eval(val)
        except Exception:
            parsed = None

        # type() rather than isinstance(): booleans must not count as integers
        if type(parsed) is int and parsed > 0:
            # positive number
            return parsed, None
        if type(parsed) is tuple or (type(parsed) is int and parsed < 0):
            return len(parsed_range), parsed_range

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all"'
        msg += ' Generic range is not allowed"'
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            # nothing has been created yet: checkIfCreate already told the user
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted'%njs
        if njs != requested:
            msg += ' (from %d requested).'%(requested)
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Return 0 if at least one job of the task is in a created status
        ('C' or 'RC'), otherwise log a message and return 1.
        """
        jList = common._db.nJobs('list')
        st = common._db.queryRunJob('status',jList)

        totalCreatedJobs = 0
        for nj in range(len(jList)):
            if st[nj] in self._CREATED:
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by destination and check each group for resources.

        Fills self.sub_jobs with one list of job ids per distinct
        destination. Returns (matched, task): matched holds the indexes into
        self.sub_jobs for which common.scheduler.listMatch returned a
        non-empty result, task is the object returned by common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []    # list of jobs Id list to submit
        dests_to_match = []   # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            dest_jobs = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in dest_jobs]
            if len(sub_jobs_temp) > 0:
                # Both lists grow together, so an index into self.sub_jobs
                # always refers to the right destination, even when some
                # destination contributes no job.
                self.sub_jobs.append(sub_jobs_temp)
                dests_to_match.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            match = common.scheduler.listMatch(dests_to_match[sel], False)
            if len(match) > 0:
                # the first job of the group stands for the whole group
                common.logger.message("Found compatible site(s) for job "+str(self.sub_jobs[sel][0]))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self,matched,task):
        """
        Submit the groups of self.sub_jobs whose indexes are in matched.

        matched -- indexes into self.sub_jobs, as returned by performMatch
        task    -- task object handed to common.scheduler.submit

        Returns the number of jobs that have a non-empty scheduler id after
        submission. Raises CrabException("Job not submitted") when the
        scheduler refuses a group. The misspelt method name is kept because
        it is the public interface.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        show_progress = not common.logger.debugLevel()
        term = None
        if show_progress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            block = self.sub_jobs[ii]
            common.logger.debug(1,'Submitting jobs '+str(block))

            try:
                common.scheduler.submit(block,task)
            except CrabException:
                raise CrabException("Job not submitted")

            if show_progress:
                # the progress bar is best effort: never fail a submission
                # because the terminal cannot draw it
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(block))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

            ### check if the submission succeeded
            # NOTE(review): a job counts as submitted when str(schedulerId)
            # is not empty; whether the DB can return None here (which would
            # also count) is not visible from this file -- confirm.
            sched_Id = common._db.queryRunJob('schedulerId', block)
            listId = []
            listRunField = []
            for j in range(len(block)):
                if str(sched_Id[j]) != '':
                    listId.append(block[j])
                    # one dict per job, so entries are not aliases of each other
                    listRunField.append({'status' :'S'})
                    common.logger.debug(5,"Submitted job # "+ str(block[j]))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.SendMLpost(block)

        return njs

    def submissionError(self):
        """
        Log the SE/CE white and black lists found in the configuration,
        followed by a hint, to explain an incomplete submission.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL: also report job type and code version here
        listed = (('SE White List: ', 'EDG.se_white_list'),
                  ('SE Black List: ', 'EDG.se_black_list'),
                  ('CE White List: ', 'EDG.ce_white_list'),
                  ('CE Black List: ', 'EDG.ce_black_list'))
        for label, key in listed:
            if key in self.cfg_params:
                msg += label+self.cfg_params[key]+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information

        Returns a new dictionary with the task-level parameters. As a side
        effect sets self.datasetPath (None when the configured value is
        'none', case-insensitive) and self.executable.
        """
        taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization','cms')

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME',''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER',''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)
        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))

        return

    def SendMLpost(self,allList):
        """
        Send post-submission info to ML

        allList -- job ids just submitted; one report is sent for each job
                   of the task returned by common._db.getTask(allList)
        """
        task = common._db.getTask(allList)

        params = dict(self.collect_MLInfo())

        taskId = str("_".join(str(task['name']).split('_')[:-1]))

        Sub_Type = 'Direct'
        # the scheduler does not change between jobs: ask for its name once
        schedName = common.scheduler.name()
        schedKind = schedName.upper()
        taskHash = None   # computed on first use, CONDOR_G only

        for job in task.jobs:
            jj = job['jobId']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if schedKind == 'CONDOR_G':
                rb = 'OSG'
                if taskHash is None:
                    taskHash = sha.new(common._db.queryTask('name')).hexdigest()
                jobId = str(jj) + '_https://condorg/' + taskHash + '/' + str(jj)
                common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif schedKind in ['LSF', 'CAF']:
                jobId = "https://"+schedName+":/"+jid+"-"+str(taskId).replace("_","-")
                common.logger.debug(5,'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # one or two destinations are listed by name, otherwise only counted
            dlsDest = job['dlsDestination']
            if len(dlsDest) in (1, 2):
                T_SE = ','.join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId' : localId})

            common.logger.debug(5,'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
341 spiga 1.112
342