ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.112
Committed: Sun Apr 20 09:34:40 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.111: +160 -101 lines
Log Message:
 Reimplemented dashboard communication, again sending both pre- and post-submission info, plus extensive Submitter code reorganization

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits the already-created jobs of a CRAB task and sends
    pre- and post-submission information to the dashboard (via common.apmon).
    """

    # Job states (as returned by common._db.queryRunJob('status', ...)) for
    # which a job is NOT selected for submission.
    _NOT_SUBMITTABLE = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A', 'D', 'Z')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Select the jobs to be submitted and store their ids in self.nj_list.

        cfg_params   -- CRAB configuration dictionary.
        parsed_range -- explicit list of job ids; used when val is 'range',
                        a tuple expression or a negative integer.
        val          -- user request: 'range', 'all', a positive integer
                        (submit at most that many jobs), a tuple or a negative
                        integer (submit parsed_range). A false value means
                        "all".

        Raises CrabException for an unrecognised submission option.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseRequest(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))

        common.logger.debug(5, 'Total jobs ' + str(common._db.nJobs()))
        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        tmp_jList = common._db.nJobs('list')
        if chosenJobsList is not None:
            tmp_jList = chosenJobsList

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)

        nj_list = []
        jobSkippedInSubmission = []
        for jobId, dest, status in zip(tmp_jList, dlsDest, jStatus):
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(dest)
            # Matty's fix: with no input dataset there is no site constraint.
            if cleaned == '' and datasetpath is not None:
                jobSkippedInSubmission.append(jobId)
                continue
            if status in self._NOT_SUBMITTABLE:
                continue
            nj_list.append(jobId)
            # stop once the requested number of jobs has been collected
            if nsjobs > 0 and len(nj_list) == nsjobs:
                break

        jobSetForSubmission = len(nj_list)
        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(jobSetForSubmission) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = "".join([str(jobs) + "," for jobs in jobSkippedInSubmission])
            common.logger.message("Jobs: " + str(mess) + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        common.logger.debug(5, 'nj_list ' + str(nj_list))
        self.nj_list = nj_list
        return

    def _parseRequest(self, val, parsed_range):
        """
        Translate the user's submission option into (nsjobs, chosenJobsList).

        nsjobs is the maximum number of jobs to submit (-1: no limit).
        chosenJobsList is an explicit list of job ids, or None.
        Raises CrabException if val is not understood.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList
        if val == 'range':  # for Resubmitter
            return nsjobs, parsed_range

        # NOTE(review): eval() of user-supplied text. Only ints and tuples are
        # accepted afterwards, but a stricter parser would be safer.
        try:
            parsed = eval(val)
        except Exception:
            parsed = None

        if type(parsed) is int and parsed > 0:
            # positive number
            nsjobs = parsed
        elif type(parsed) is tuple or (type(parsed) is int and parsed < 0):
            chosenJobsList = parsed_range
            nsjobs = len(chosenJobsList)
        else:
            msg = 'Bad submission option <' + str(val) + '>\n'
            msg += ' Must be an integer or "all".\n'
            msg += ' Generic range is not allowed.'
            raise CrabException(msg)
        return nsjobs, chosenJobsList

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        # nothing to do unless at least one job has been created
        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Return 0 if at least one job of the task is in state 'C' or 'RC',
        otherwise tell the user to create the jobs first and return 1.
        """
        jList = common._db.nJobs('list')
        states = common._db.queryRunJob('status', jList)
        totalCreatedJobs = len([s for s in states if s in ('C', 'RC')])

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by distinct DLS destination and list-match each
        group with the scheduler.

        Sets self.sub_jobs to a list of job-id lists (one per destination
        group) and returns (matched, task). matched holds the indices into
        self.sub_jobs of the groups for which listMatch found at least one
        site. task is the object returned by common._db.getTask().
        """
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []   # list of jobs Id list to submit
        group_dests = []     # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            dest_jobs = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in dest_jobs]
            # keep the two lists aligned: append to both or to neither
            if sub_jobs_temp:
                self.sub_jobs.append(sub_jobs_temp)
                group_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            id_job = self.sub_jobs[sel][0]  # first job represents its group
            match = common.scheduler.listMatch(group_dests[sel])
            if len(match) > 0:
                common.logger.message("Found " + str(len(match)) + " compatible site(s) for job " + str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs " + str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit each matched group of self.sub_jobs.

        Jobs whose scheduler id is non-empty after submission are marked with
        status 'S' in the DB. A post-submission dashboard report is then sent
        for the group.

        matched -- indices into self.sub_jobs, as returned by performMatch.
        task    -- task object passed on to common.scheduler.submit.
        Returns the number of jobs that obtained a scheduler id.
        Raises CrabException if the scheduler submission fails.
        """
        import sys

        njs = 0

        ### Progress Bar indicator, deactivate for debug
        showBar = not common.logger.debugLevel()
        if showBar:
            term = TerminalController()

        if not matched:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched)) + " blocks of jobs will be submitted")
        for ii in matched:
            jobs = self.sub_jobs[ii]
            common.logger.debug(1, 'Submitting jobs ' + str(jobs))

            try:
                common.scheduler.submit(jobs, task)
            except CrabException:
                # keep the scheduler's reason instead of discarding it
                err = sys.exc_info()[1]
                raise CrabException("Job not submitted: " + str(err))

            if showBar:
                try:
                    pbar = ProgressBar(term, 'Submitting ' + str(len(jobs)) + ' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii + 1) / float(len(self.sub_jobs)), 'please wait')

            ### check if the submission succeeded: a job counts as submitted
            ### when it has a non-empty scheduler id
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            run_jobToSave = {'status': 'S'}
            listId = []
            listRunField = []
            for jobId, sid in zip(jobs, sched_Id):
                if str(sid) != '':
                    listId.append(jobId)
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5, "Submitted job # " + str(jobId))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)  ## New BL--DS

            self.SendMLpost(jobs)

        return njs

    def submissionError(self):
        """
        Print the site white/black lists used for the submission, as a hint
        when not all jobs could be submitted.
        """
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL
        #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
        #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare the task-level DashBoard information.

        Returns a new dict on every call. Also sets self.datasetPath (None
        when the dataset path is 'none') and self.executable.
        """
        import os
        import socket

        # task name without its last '_'-separated component
        taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: " + gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

        # HOSTNAME/USER are not always exported (e.g. by csh-like shells)
        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME') or socket.gethostname(),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER', ''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  #'application', version,
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))

        return

    def SendMLpost(self, allList):
        """
        Send one post-submission report to ML for every job of
        common._db.getTask(allList).
        """
        task = common._db.getTask(allList)
        baseParams = self.collect_MLInfo()

        # task name without its last '_'-separated component (same
        # convention as in collect_MLInfo)
        taskId = str("_".join(task['name'].split('_')[:-1]))

        Sub_Type = 'Direct'
        schedName = common.scheduler.name()
        isCondorG = schedName.upper() == 'CONDOR_G'
        if isCondorG:
            self.hash = makeCksum(common.work_space.cfgFileName())

        for job in task.jobs:
            jj = job['id']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if isCondorG:
                rb = 'OSG'
                jobId = str(jj) + '_' + self.hash + '_' + jid
                common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
            elif schedName in ['lsf', 'caf']:
                jobId = "https://" + schedName + ":/" + jid + "-" + taskId.replace("_", "-")
                common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler' + jobId)
                rb = str(job.runningJob['service'])

            # NOTE(review): eval() of a DB string. It is presumably a list
            # literal of SE names; confirm before switching to a safer parser.
            dlsDest = eval(job['dlsDestination'])
            if len(dlsDest) <= 2:
                T_SE = ",".join([str(se) for se in dlsDest])
            else:
                T_SE = str(len(dlsDest)) + '_Selected_SE'

            # fresh dict per job so earlier reports are never mutated
            params = dict(baseParams)
            params.update({'jobId': jobId,
                           'sid': jid,
                           'broker': rb,
                           'bossId': jj,
                           'SubmissionType': Sub_Type,
                           'TargetSE': T_SE,
                           'localId': localId})

            common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
            common.apmon.sendToML(params)
        return
341 spiga 1.112
342