ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.114
Committed: Sun Apr 20 12:04:23 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.113: +1 -1 lines
Log Message:
If inheritance is correctly implemented, double range parsing is not needed. Added some checks plus minor fixes.

File Contents

# User Rev Content
# The star imports come first on purpose: the explicit module imports below
# must win if a star import happens to export a name such as `os` or `time`.
from Actor import *
from crab_util import *
import common
from ApmonIf import ApmonIf
#from random import random
import os
import time
from ProgressBar import ProgressBar
from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that selects the jobs of the current task to be submitted and
    hands them, group by group, to the configured scheduler.

    Attributes set by this class:
      cfg_params           -- configuration dictionary given to the constructor
      blackWhiteListParser -- BlackWhiteListParser built from cfg_params
      nj_list              -- ids of the jobs selected for submission
      sub_jobs             -- list of job id lists (one per destination group),
                              filled by performMatch()
      datasetPath, executable -- filled by collect_MLInfo()
    """

    # Run-job statuses for which __init__ leaves a job out of the submission list.
    _NOT_SUBMITTABLE_STATUS = ['SS','SU','SR','R','S','K','Y','A','D','Z']

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build self.nj_list, the list of job ids to submit.

        cfg_params   -- configuration dictionary; 'CMSSW.datasetpath' must be present
        parsed_range -- job id list, used when val is 'range' or evaluates to
                        a tuple or a negative integer
        val          -- submission option: empty/None or 'all' (no limit),
                        'range', or a string evaluating to a positive integer
                        (maximum number of jobs to submit)

        Raises CrabException if val is not one of the accepted forms.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseSubmitOption(val, parsed_range)
        common.logger.debug(5,'nsjobs '+str(nsjobs))

        # get the first not already submitted
        common.logger.debug(5,'Total jobs '+str(common._db.nJobs()))
        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None
        tmp_jList = common._db.nJobs('list')
        if chosenJobsList is not None:
            tmp_jList = chosenJobsList

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination',tmp_jList)
        jStatus = common._db.queryRunJob('status',tmp_jList)

        nj_list = []
        jobSkippedInSubmission = []
        for nj, jobId in enumerate(tmp_jList):
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                # no destination survives the black/white lists for this job
                jobSkippedInSubmission.append(jobId)
            elif jStatus[nj] not in self._NOT_SUBMITTABLE_STATUS:
                nj_list.append(jobId)
            # stop as soon as the requested number of jobs has been collected
            if nsjobs > 0 and nsjobs == len(nj_list):
                break
        jobSetForSubmission = len(nj_list)

        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if jobSkippedInSubmission:
            # every id is followed by a comma, including the last one
            mess = "".join([str(jobId) + "," for jobId in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        # submit N from last submitted job
        common.logger.debug(5,'nj_list '+str(nj_list))

        self.nj_list = nj_list
        return

    def _parseSubmitOption(self, val, parsed_range):
        """
        Translate the submission option into (nsjobs, chosenJobsList).

        nsjobs is the maximum number of jobs to submit (-1 means no limit);
        chosenJobsList is the explicit list of job ids, or None.
        Raises CrabException for an option that cannot be understood.
        """
        if not val or val == 'all':
            return -1, None
        if val == 'range':  # for Resubmitter
            return -1, parsed_range

        # NOTE(review): eval() runs a user-supplied option string as Python code.
        # It is kept because the accepted forms rely on evaluation, but the
        # option is now evaluated once, and anything that does not evaluate
        # is reported as a bad option instead of escaping as NameError or
        # SyntaxError.
        try:
            requested = eval(val)
        except Exception:
            requested = None

        if type(requested) is int and requested > 0:
            # positive number
            return requested, None
        # presumably a negative integer comes from a range such as '1-3' -- TODO confirm
        if type(requested) is tuple or (type(requested) is int and requested < 0):
            return len(parsed_range), parsed_range

        msg = 'Bad submission option <'+str(val)+'>\n'
        msg += ' Must be an integer or "all"'
        msg += ' Generic range is not allowed"'
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        # nothing to do when no job is ready to be submitted
        if self.checkIfCreate() != 0:
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        requested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted'%njs
        if njs != requested:
            msg += ' (from %d requested).'%(requested)
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < requested or requested == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Check that at least one job of the task has run status 'C' or 'RC'.

        Returns 0 when such a job exists, otherwise logs a message and
        returns 1.
        """
        jList = common._db.nJobs('list')
        st = common._db.queryRunJob('status',jList)
        for nj in range(len(jList)):
            if st[nj] in ['C','RC']:
                return 0

        common.logger.message("No jobs to be submitted: first create them")
        return 1

    def performMatch(self):
        """
        Group the selected jobs by destination and ask the scheduler, once
        per group, whether compatible sites exist.

        Fills self.sub_jobs with one list of job ids per destination that
        hosts at least one job of self.nj_list.

        Returns (matched, task): matched is the list of indexes into
        self.sub_jobs for which the scheduler found at least one site, task
        is the object returned by common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []   # list of jobs Id list to submit
        group_dests = []     # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            dest_jobs = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
            sub_jobs_temp = [i for i in self.nj_list if i in dest_jobs]
            if sub_jobs_temp:
                # Keep group and destination in lockstep: a destination with
                # no selected job must not shift the indexes used below.
                self.sub_jobs.append(sub_jobs_temp)
                group_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            match = common.scheduler.listMatch(group_dests[sel])
            if len(match) > 0:
                # the first job of the group is the one named in the message
                common.logger.message("Found "+str(len(match))+" compatible site(s) for job "+str(self.sub_jobs[sel][0]))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched , task

    def perfromSubmission(self,matched,task):
        """
        Submit every matched group of jobs and record the outcome.

        matched -- indexes into self.sub_jobs, as returned by performMatch()
        task    -- task object handed to common.scheduler.submit()

        For each group, the jobs that have a non-empty 'schedulerId' after the
        submission get their run status set to 'S', and the group is
        reported through SendMLpost().

        Returns the number of jobs counted as submitted.
        Raises CrabException("Job not submitted") if the scheduler fails.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        show_progress = not common.logger.debugLevel()
        if show_progress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            group = self.sub_jobs[ii]
            common.logger.debug(1,'Submitting jobs '+str(group))

            try:
                common.scheduler.submit(group,task)
            except CrabException:
                raise CrabException("Job not submitted")

            if show_progress:
                # the progress bar is cosmetic: go on without it if it cannot be built
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(group))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)),'please wait')

            ### check the if the submission succeded Maybe not needed or at least simplified
            sched_Id = common._db.queryRunJob('schedulerId', group)
            listId = []
            run_jobToSave = {'status' :'S'}
            listRunField = []
            for j, jobId in enumerate(group): # Add loop over SID returned from group submission DS
                if str(sched_Id[j]) != '':
                    listId.append(jobId)
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5,"Submitted job # "+ str(jobId))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField) ## New BL--DS

            self.SendMLpost(group)

        return njs

    def submissionError(self):
        """
        Log the black/white lists found in the configuration, plus a hint,
        to help understand an incomplete submission.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL
        #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
        #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
        reported_lists = [
            ('EDG.se_white_list', 'SE White List: '),
            ('EDG.se_black_list', 'SE Black List: '),
            ('EDG.ce_white_list', 'CE White List: '),
            ('EDG.ce_black_list', 'CE Black List: '),
        ]
        for key, label in reported_lists:
            if key in self.cfg_params:
                msg += label+self.cfg_params[key]+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare DashBoard information.

        Sets self.datasetPath and self.executable from the configuration and
        returns a new dictionary with the task-level monitoring parameters.
        Reads the HOSTNAME and USER environment variables (KeyError if unset).
        """
        # task name without its last '_'-separated field
        taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))
        gridName = common.scheduler.userName().strip()
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if self.datasetPath.lower() == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable','cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization','cms')

        params = {
            'tool': common.prog_name,
            'JSToolVersion': common.prog_version_str,
            'tool_ui': os.environ['HOSTNAME'],
            'scheduler': common.scheduler.name(),
            'GridName': gridName,
            'taskType': taskType,
            'vo': VO,
            'user': os.environ['USER'],
            'taskId': taskId,
            'datasetFull': self.datasetPath,
            #'application', version,
            'exe': self.executable,
        }

        return params

    def SendMLpre(self):
        """
        Send Pre info to ML
        """
        params = self.collect_MLInfo()
        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))

        return

    def SendMLpost(self,allList):
        """
        Send post-submission info to ML, one report per job.

        allList -- job ids passed to common._db.getTask() to load the jobs
                   to report
        """
        task = common._db.getTask(allList)

        # collect_MLInfo() returns a new dictionary; the per-job fields are
        # written over it at every iteration.
        params = dict(self.collect_MLInfo())

        # Task name without its last '_'-separated field, built the same way
        # as in collect_MLInfo(). The split has to be applied to the name
        # itself, not to the result of the join.
        taskId = str("_".join(task['name'].split('_')[:-1]))

        Sub_Type = 'Direct'
        for job in task.jobs:
            jj = job['id']
            jobId = ''
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if common.scheduler.name().upper() == 'CONDOR_G':
                self.hash = makeCksum(common.work_space.cfgFileName())
                rb = 'OSG'
                jobId = str(jj) + '_' + self.hash + '_' + jid
                common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif common.scheduler.name() in ['lsf', 'caf']:
                jobId = "https://"+common.scheduler.name()+":/"+jid+"-"+taskId.replace("_","-")
                common.logger.debug(5,'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = common.scheduler.name()
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # NOTE(review): eval() on a value read back from the task DB.
            # presumably it yields a list of storage element names -- TODO confirm
            dlsDest = eval(job['dlsDestination'])
            if len(dlsDest) <= 2 :
                # join the destinations themselves, not the characters of
                # the list's string representation
                T_SE = ",".join([str(se) for se in dlsDest])
            else :
                T_SE = str(len(dlsDest))+'_Selected_SE'

            infos = {
                'jobId': jobId,
                'sid': jid,
                'broker': rb,
                'bossId': jj,
                'SubmissionType': Sub_Type,
                'TargetSE': T_SE,
                'localId' : localId,
            }
            params.update(infos)

            common.logger.debug(5,'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
343 spiga 1.112
344