ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.134
Committed: Sun Oct 26 12:41:27 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre1
Changes since 1.133: +12 -14 lines
Log Message:
use BLite object once possible

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 ewv 1.128 import sha
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
11     class Submitter(Actor):
def __init__(self, cfg_params, parsed_range, val):
    """
    Work out which jobs have to be submitted and store their ids in
    self.nj_list.

    cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is read here
                    and the whole mapping is passed to SEBlackWhiteListParser
    parsed_range -- list of job ids prepared by the caller; used when val is
                    'range' or evaluates to a tuple or a negative integer
    val          -- submission option: empty, 'all', 'range', or a string
                    that evaluates to an integer or a tuple

    Raises CrabException when val is none of the accepted forms.
    """
    self.cfg_params = cfg_params

    # get user request
    nsjobs = -1
    chosenJobsList = None
    if val:
        if val == 'range':  # for Resubmitter
            chosenJobsList = parsed_range
        elif val == 'all':
            pass
        else:
            msg = 'Bad submission option <'+str(val)+'>\n'
            msg += ' Must be an integer or "all"'
            msg += ' Generic range is not allowed"'
            # NOTE(review): eval() on a user supplied option is unsafe; it is
            # kept because the branches below rely on its result type
            # (presumably '1-5' -> negative int, '1,3' -> tuple -- confirm).
            # It is evaluated once only, and anything that cannot be
            # evaluated is reported as a bad option instead of leaking a
            # NameError/SyntaxError to the user.
            try:
                requested = eval(val)
            except Exception:
                raise CrabException(msg)
            if type(requested) is int and requested > 0:
                # positive number
                nsjobs = requested
            elif type(requested) is tuple or \
                 (type(requested) is int and requested < 0):
                chosenJobsList = parsed_range
                nsjobs = len(chosenJobsList)
            else:
                raise CrabException(msg)

    common.logger.debug(5,'nsjobs '+str(nsjobs))
    # total jobs
    nj_list = []
    # get the first not already submitted
    self.complete_List = common._db.nJobs('list')
    common.logger.debug(5,'Total jobs '+str(len(self.complete_List)))
    jobSetForSubmission = 0
    jobSkippedInSubmission = []
    datasetpath = self.cfg_params['CMSSW.datasetpath']
    if datasetpath.lower() == 'none':
        datasetpath = None
    tmp_jList = self.complete_List
    if chosenJobsList is not None:
        tmp_jList = chosenJobsList
    # build job list
    from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
    self.blackWhiteListParser = SEBlackWhiteListParser(self.cfg_params,common.logger)
    for job in common._db.getTask(tmp_jList).jobs:
        cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(job['dlsDestination'])
        if (cleanedBlackWhiteList != '') or (datasetpath is None):
            if ( job.runningJob['status'] in ['C','RC'] and \
                 job.runningJob['statusScheduler'] in ['Created',None]):
                jobSetForSubmission += 1
                nj_list.append(job['id'])
            else:
                # not in a submittable state: neither selected nor reported
                continue
        else:
            # nothing left of the destination after black/white listing
            jobSkippedInSubmission.append(job['id'])
        # stop as soon as the requested number of jobs has been collected
        if nsjobs > 0 and nsjobs == jobSetForSubmission:
            break
    if nsjobs > jobSetForSubmission:
        common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+\
                              str(jobSetForSubmission)+' left: submitting those')
    if len(jobSkippedInSubmission) > 0:
        # every id is followed by a comma, including the last one
        mess = "".join([str(skipped) + "," for skipped in jobSkippedInSubmission])
        common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
        self.submissionError()
    # submit N from last submitted job
    common.logger.debug(5,'nj_list '+str(nj_list))

    self.nj_list = nj_list
    return
83 ewv 1.92
def run(self):
    """
    Entry point of the class: submit the jobs listed in self.nj_list and
    log how many of them were actually submitted.
    """
    common.logger.debug(5, "Submitter::run() called")

    t_begin = time.time()

    # checkIfCreate() answers 0 only when there is something to submit
    if self.checkIfCreate() != 0:
        return

    self.SendMLpre()

    matched_blocks, task = self.performMatch()
    n_submitted = self.perfromSubmission(matched_blocks, task)

    elapsed = time.time() - t_begin
    common.logger.debug(1, "Submission Time: "+str(elapsed))
    common.logger.write("Submission time :"+str(elapsed))

    n_requested = len(self.nj_list)
    summary = '\nTotal of %d jobs submitted'%n_submitted
    if n_submitted == n_requested:
        summary += '.'
    else:
        summary += ' (from %d requested).'%(n_requested)
    common.logger.message(summary)

    # nothing requested, or fewer submitted than requested: print the hints
    if n_requested == 0 or n_submitted < n_requested:
        self.submissionError()
113    
114    
def checkIfCreate(self):
    """
    Tell whether the task holds at least one job with status 'C' or 'RC'
    whose scheduler status is 'Created'.

    Returns 0 when such a job exists; otherwise logs a message and
    returns 1.
    """
    created_jobs = [job for job in common._db.getTask().jobs
                    if job.runningJob['status'] in ['C','RC']
                    and job.runningJob['statusScheduler'] == 'Created']

    if len(created_jobs) == 0:
        common.logger.message("No jobs to be submitted: first create them")
        return 1
    return 0
129 ewv 1.92
130 gutsche 1.70
def performMatch(self):
    """
    Group the selected jobs by destination and ask the scheduler which
    groups have a compatible site.

    Fills self.sub_jobs with one list of job ids per destination that has
    at least one job in self.nj_list.

    Returns (matched, task): matched is the list of indices into
    self.sub_jobs for which common.scheduler.listMatch returned a
    non-empty result, task is the object returned by common._db.getTask().
    """
    common.logger.message("Checking available resources...")
    ### define here the list of distinct destinations sites list
    distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)

    ### define here the list of jobs Id for each distinct list of sites
    self.sub_jobs = []   # list of jobs Id list to submit
    group_dests = []     # destination of each entry of self.sub_jobs
    for distDest in distinct_dests:
        jobs_at_dest = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
        group = [job_id for job_id in self.nj_list if job_id in jobs_at_dest]
        if len(group) > 0:
            # Record the destination next to its job group: a destination
            # without selected jobs must not shift the pairing between
            # groups and destinations used in the matching loop below.
            self.sub_jobs.append(group)
            group_dests.append(distDest)

    task = common._db.getTask()

    matched = []
    for sel in range(len(self.sub_jobs)):
        # the first job of the group is the one named in the log message
        id_job = self.sub_jobs[sel][0]
        match = common.scheduler.listMatch(group_dests[sel], False)
        if len(match) > 0:
            common.logger.message("Found compatible site(s) for job "+str(id_job))
            matched.append(sel)
        else:
            common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
            self.submissionError()

    return matched , task
169 spiga 1.112
def perfromSubmission(self,matched,task):
    """
    Submit every matched block of jobs and mark the submitted ones in the DB.

    matched -- indices into self.sub_jobs of the blocks to submit
    task    -- task object, passed unchanged to common.scheduler.submit

    Returns the number of jobs for which a non-empty scheduler id is found
    after submission.  Raises CrabException("Job not submitted") when the
    scheduler submit call raises CrabException.
    """
    njs = 0

    ### Progress Bar indicator, deactivate for debug
    # Queried once so that the creation and the use of `term` can never
    # disagree.
    show_progress = not common.logger.debugLevel()
    if show_progress:
        term = TerminalController()

    if len(matched) == 0:
        common.logger.message("The whole task doesn't found compatible site ")
        return njs

    common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
    for ii in matched:
        block = self.sub_jobs[ii]
        common.logger.debug(1,'Submitting jobs '+str(block))

        try:
            common.scheduler.submit(block,task)
        except CrabException:
            raise CrabException("Job not submitted")

        if show_progress:
            # The progress bar is best effort: carry on without it if it
            # cannot be created.
            try:
                pbar = ProgressBar(term, 'Submitting '+str(len(block))+' jobs')
            except Exception:
                pbar = None
            if pbar:
                pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

        ### check the if the submission succeded Maybe not needed or at least simplified
        sched_Id = common._db.queryRunJob('schedulerId', block)
        listId = []
        listRunField = []
        for j, job_id in enumerate(block):
            # a job counts as submitted once it has a scheduler id
            if str(sched_Id[j]) != '':
                listId.append(job_id)
                # one dict per job, so the entries are independent objects
                listRunField.append({'status' :'S'})
                common.logger.debug(5,"Submitted job # "+ str(job_id))
                njs += 1
        common._db.updateRunJob_(listId, listRunField)
        self.SendMLpost(block)

    return njs
217 spiga 1.99
def submissionError(self):
    """
    Log the SE/CE white and black lists found in self.cfg_params, as a
    hint on why the submission was not complete.
    """
    ## add some more verbose message in case submission is not complete
    msg = 'Submission performed using the Requirements: \n'
    ### TODO_ DS--BL
    #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
    #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
    # (configuration key, label) in the order they are reported
    reported_lists = [('EDG.se_white_list', 'SE White List: '),
                      ('EDG.se_black_list', 'SE Black List: '),
                      ('EDG.ce_white_list', 'CE White List: '),
                      ('EDG.ce_black_list', 'CE Black List: ')]
    for key, label in reported_lists:
        # `in` replaces the deprecated dict.has_key()
        if key in self.cfg_params:
            msg += label+self.cfg_params[key]+'\n'
    msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
    common.logger.message(msg)

    return
236 spiga 1.99
def collect_MLInfo(self):
    """
    Build the dictionary of task-level values reported to the DashBoard.

    As a side effect stores self.datasetPath (None when the configured
    value is 'none' in any letter case) and self.executable.
    """
    task_id = uniqueTaskName(common._db.queryTask('name'))
    grid_name = common.scheduler.userName().strip()
    common.logger.debug(5, "GRIDNAME: "+grid_name)

    self.datasetPath = self.cfg_params['CMSSW.datasetpath']
    if self.datasetPath.lower() == 'none':
        self.datasetPath = None
    self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
    virtual_org = self.cfg_params.get('EDG.virtual_organization','cms')

    report = {}
    report['tool'] = common.prog_name
    report['JSToolVersion'] = common.prog_version_str
    report['tool_ui'] = os.environ.get('HOSTNAME','')
    report['scheduler'] = common.scheduler.name()
    report['GridName'] = grid_name
    report['taskType'] = 'analysis'
    report['vo'] = virtual_org
    report['user'] = os.environ.get('USER','')
    report['taskId'] = task_id
    report['datasetFull'] = self.datasetPath
    report['exe'] = self.executable

    return report
266 ewv 1.128
def SendMLpre(self):
    """
    Send the pre-submission record to ML: the values from
    collect_MLInfo() with 'jobId' set to 'TaskMeta'.
    """
    report = self.collect_MLInfo()
    report['jobId'] = 'TaskMeta'

    common.apmon.sendToML(report)

    common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(report))
280 ewv 1.92
def SendMLpost(self,allList):
    """
    Send one post-submission record per job to ML.

    allList -- job ids, passed to common._db.getTask to load the jobs

    Each record is the dictionary from collect_MLInfo() extended with the
    job specific keys 'jobId', 'sid', 'broker', 'bossId', 'SubmissionType',
    'TargetSE' and 'localId'.
    """
    task = common._db.getTask(allList)

    params = {}
    params.update(self.collect_MLInfo())

    Sub_Type = 'Direct'
    # The scheduler name is looked up once, not once per job (assumed not
    # to change while this method runs).
    sched_name = common.scheduler.name()
    sched_kind = sched_name.upper()
    taskHash = None  # computed on first use, only for CONDOR_G/GLIDEIN

    for job in task.jobs:
        jj = job['jobId']
        localId = ''
        jid = str(job.runningJob['schedulerId'])
        if sched_kind in ['CONDOR_G','GLIDEIN']:
            rb = 'OSG'
            if taskHash is None:
                taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(jj) + '_https://' + sched_name + '/' + taskHash + '/' + str(jj)
            common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
        elif sched_kind in ['LSF', 'CAF']:
            jobId = "https://"+sched_name+":/"+jid+"-"+str(task['name']).replace("_","-")
            common.logger.debug(5,'JobID for ML monitoring is created for LSF scheduler:'+jobId)
            rb = sched_name
            localId = jid
        else:
            jobId = str(jj) + '_' + str(jid)
            common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler'+jobId)
            rb = str(job.runningJob['service'])

        # one or two destinations are listed by name, anything else is
        # reported as a count
        dlsDest = job['dlsDestination']
        if len(dlsDest) in (1, 2):
            T_SE = ','.join([str(dest) for dest in dlsDest])
        else:
            T_SE = str(len(dlsDest))+'_Selected_SE'

        params.update({'jobId': jobId,
                       'sid': jid,
                       'broker': rb,
                       'bossId': jj,
                       'SubmissionType': Sub_Type,
                       'TargetSE': T_SE,
                       'localId' : localId})

        common.logger.debug(5,'Submission DashBoard report: '+str(params))
        common.apmon.sendToML(params)
    return
336 spiga 1.112
337