ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.118
Committed: Wed Apr 30 07:50:09 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre18, CRAB_2_2_0_pre17, CRAB_2_2_0_pre16, CRAB_2_2_0_pre15
Changes since 1.117: +4 -3 lines
Log Message:
fix for info sent to the dashboard in case of local scheduler

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that selects the jobs of the current task to be submitted (in the
    constructor), then matches them against the available resources and
    submits them through common.scheduler (run()), sending pre- and
    post-submission reports to the DashBoard through common.apmon.
    """

    # runningJob statuses for which a job is NOT selected for submission
    # (same list as the previous implementation)
    _NOT_SUBMITTABLE = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A', 'D', 'Z', 'E')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build self.nj_list, the ids of the jobs to be submitted.

        cfg_params   -- configuration dictionary ('CMSSW.datasetpath' is read;
                        the whole dictionary is handed to BlackWhiteListParser)
        parsed_range -- job ids parsed from the user-supplied range
        val          -- user submission option, see _parseRequest()

        Jobs whose data is hosted by no allowed site are skipped and reported
        to the user; jobs whose status is in _NOT_SUBMITTABLE are ignored.
        Raises CrabException for an invalid submission option.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseRequest(parsed_range, val)
        common.logger.debug(5, 'nsjobs '+str(nsjobs))

        # all the jobs of the task
        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5, 'Total jobs '+str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        if chosenJobsList is not None:
            tmp_jList = chosenJobsList
        else:
            tmp_jList = self.complete_List

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)

        nj_list = []
        jobSkippedInSubmission = []
        for jobId, dest, status in zip(tmp_jList, dlsDest, jStatus):
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dest)
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                # no allowed site hosts the data of this job ## Matty's fix
                jobSkippedInSubmission.append(jobId)
                continue
            if status in self._NOT_SUBMITTABLE:
                continue
            nj_list.append(jobId)
            # stop as soon as the number of requested jobs is reached
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(len(nj_list))+' left: submitting those')
        if jobSkippedInSubmission:
            mess = ",".join([str(job) for job in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug(5, 'nj_list '+str(nj_list))
        self.nj_list = nj_list

    def _parseRequest(self, parsed_range, val):
        """
        Translate the user submission option into (nsjobs, chosenJobsList).

        nsjobs == -1 means "no limit"; chosenJobsList == None means "choose
        among all the jobs of the task".
          - val empty/None or 'all' -> (-1, None)
          - 'range' (used by the Resubmitter) -> (-1, parsed_range)
          - expression evaluating to a positive int -> (that int, None)
          - expression evaluating to a tuple or a negative int (presumably a
            "a,b,c" list or an "a-b" range) -> (len(parsed_range), parsed_range)
        Raises CrabException for anything else.
        """
        if not val or val == 'all':
            return -1, None
        if val == 'range':  # for Resubmitter
            return -1, parsed_range

        # NOTE(review): eval() of a user-supplied string is kept for
        # compatibility with the accepted syntaxes above, but it executes
        # arbitrary expressions; a dedicated parser would be safer.
        try:
            evaluated = eval(val)
        except Exception:
            evaluated = None

        if type(evaluated) is int and evaluated > 0:
            return evaluated, None
        if type(evaluated) is tuple or (type(evaluated) is int and evaluated < 0):
            return len(parsed_range), parsed_range

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all".'
        msg += ' Generic range is not allowed'
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        elapsed = time.time() - start
        common.logger.debug(1, "Submission Time: "+str(elapsed))
        common.logger.write("Submission time :"+str(elapsed))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted' % njs
        if njs != requested:
            msg += ' (from %d requested).' % requested
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Check that the task has jobs ready to be submitted.

        Returns 0 if at least one job has status 'C' or 'RC', otherwise tells
        the user to create the jobs first and returns 1.
        """
        jList = common._db.nJobs('list')
        for status in common._db.queryRunJob('status', jList):
            if status in ('C', 'RC'):
                return 0
        common.logger.message("No jobs to be submitted: first create them")
        return 1

    def performMatch(self):
        """
        Group self.nj_list by distinct dlsDestination and ask the scheduler
        for compatible resources for each group.

        Sets self.sub_jobs to the list of job-id groups.
        Returns (matched, task): matched lists the indices of self.sub_jobs
        whose group found at least one compatible site; task is the object
        returned by common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### list of distinct destination-site lists of the requested jobs
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        ### jobs Id for each distinct list of sites; the destination of each
        ### group is kept alongside so that indices stay aligned even when a
        ### destination has no requested job
        self.sub_jobs = []   # list of jobs Id list to submit
        group_dests = []     # dlsDestination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            jobs_at_dest = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in jobs_at_dest]
            if sub_jobs_temp:
                self.sub_jobs.append(sub_jobs_temp)
                group_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel, (dest, jobs) in enumerate(zip(group_dests, self.sub_jobs)):
            match = common.scheduler.listMatch(dest)
            if len(match) > 0:
                # the first job of the group identifies it in the message
                common.logger.message("Found "+str(len(match))+" compatible site(s) for job "+str(jobs[0]))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(jobs))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit the blocks self.sub_jobs[i] for every index i in `matched`.

        Jobs that obtained a scheduler id are marked with status 'S' in the
        DB and a post-submission DashBoard report is sent for each block.
        Returns the number of jobs that obtained a scheduler id.
        Raises CrabException("Job not submitted") if the scheduler raises
        CrabException.
        """
        njs = 0

        if not matched:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        ### Progress Bar indicator, deactivated when debugging
        term = None
        if not common.logger.debugLevel():
            term = TerminalController()

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for step, ii in enumerate(matched):
            jobs = self.sub_jobs[ii]
            common.logger.debug(1, 'Submitting jobs '+str(jobs))

            try:
                common.scheduler.submit(jobs, task)
            except CrabException:
                raise CrabException("Job not submitted")

            if term is not None:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(jobs))+' jobs')
                except Exception:
                    # the progress bar is cosmetic: never fail because of it
                    pbar = None
                if pbar:
                    pbar.update(float(step+1)/float(len(matched)), 'please wait')

            ### a job counts as submitted when it got a scheduler id
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            listId = []
            listRunField = []
            for jobId, sid in zip(jobs, sched_Id):
                # NOTE(review): None is treated like '' (no id); previously
                # str(None) != '' counted such jobs as submitted
                if sid is not None and str(sid) != '':
                    listId.append(jobId)
                    listRunField.append({'status': 'S'})
                    common.logger.debug(5, "Submitted job # "+str(jobId))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField) ## New BL--DS

            self.SendMLpost(jobs)

        return njs

    # correctly spelled alias, kept alongside the historical name
    performSubmission = perfromSubmission

    def submissionError(self):
        """
        Tell the user which site requirements (SE/CE black and white lists)
        were used, as a hint when the submission is not complete.
        """
        msg = 'Submission performed using the Requirements: \n'
        # TODO(DS--BL): also report jobtype and codeVersion once available
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

    def collect_MLInfo(self):
        """
        Prepare the DashBoard information common to every report of the task.

        Also sets self.datasetPath (None when the dataset is 'none') and
        self.executable. Returns a new dictionary of parameters.
        """
        # task name without its last '_'-separated field
        taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

        hostName = os.environ.get('HOSTNAME')
        if not hostName:
            # HOSTNAME may be missing from the environment (some shells do
            # not export it): ask the OS instead of failing with KeyError
            import socket
            hostName = socket.gethostname()

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': hostName,
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ['USER'],
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML (task-level report with jobId 'TaskMeta').
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: '+str(params))

    def SendMLpost(self, allList):
        """
        Send one post-submission report to the DashBoard for every job of
        the task returned by common._db.getTask(allList).
        """
        task = common._db.getTask(allList)

        # task-level parameters; the per-job keys are overwritten below
        params = dict(self.collect_MLInfo())

        taskId = str("_".join(str(task['name']).split('_')[:-1]))
        schedName = common.scheduler.name()
        isCondorG = schedName.upper() == 'CONDOR_G'
        if isCondorG:
            # the checksum does not depend on the job: compute it once
            self.hash = makeCksum(common.work_space.cfgFileName())

        Sub_Type = 'Direct'
        for job in task.jobs:
            jj = job['id']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if isCondorG:
                rb = 'OSG'
                jobId = str(jj) + '_' + self.hash + '_' + jid
                common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif schedName in ['lsf', 'caf']:
                jobId = "https://"+schedName+":/"+jid+"-"+str(taskId).replace("_", "-")
                common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler:'+jobId)
                rb = str(job.runningJob['service'])

            # join the SE names themselves (not the characters of the list
            # repr); more than two SEs are summarised by their count
            dlsDest = job['dlsDestination'] or []
            if len(dlsDest) <= 2:
                T_SE = ",".join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId': localId})

            common.logger.debug(5, 'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return