ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.103
Committed: Wed Mar 26 11:56:27 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.102: +2 -2 lines
Log Message:
fix for blocks of job management

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that selects the jobs of the current task that still have to be
    submitted and hands them to common.scheduler, one block of jobs per
    distinct 'dlsDestination' value.
    """

    # Run-job status flags for which a job is left out of the submission list.
    _NOT_SUBMITTABLE = ['SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A', 'D', 'Z']

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build self.nj_list, the list of job ids that run() will submit.

        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is read
                        here, the 'EDG.*_list' keys in submissionError()
        parsed_range -- list of job ids used when val is 'range' or when val
                        evaluates to a tuple or to a negative integer
        val          -- submission option string: empty (no limit), 'all',
                        'range', or an expression evaluating to a positive
                        integer (maximum number of jobs to submit)

        Raises CrabException when val is none of the accepted forms.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs = -1
        chosenJobsList = None
        if val:
            if val == 'range':  # for Resubmitter
                chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                # NOTE(review): eval() on a user supplied option string is
                # unsafe if the option can come from an untrusted source.
                # It is kept because range expressions such as "1-3" or
                # "1,2" are recognised through their evaluated type.
                # Evaluate once; a malformed option is reported below as a
                # CrabException rather than a raw SyntaxError/NameError.
                try:
                    request = eval(val)
                except Exception:
                    request = None

                if type(request) is int and request > 0:
                    # positive number
                    nsjobs = request
                elif type(request) is tuple or (type(request) is int and request < 0):
                    chosenJobsList = parsed_range
                    nsjobs = len(chosenJobsList)
                else:
                    msg = 'Bad submission option <'+str(val)+'>\n'
                    msg += ' Must be an integer or "all"'
                    msg += ' Generic range is not allowed"'
                    raise CrabException(msg)

        common.logger.debug(5, 'nsjobs '+str(nsjobs))
        # jobs selected for submission
        nj_list = []
        common.logger.debug(5, 'Total jobs '+str(common._db.nJobs()))
        jobSetForSubmission = 0
        jobSkippedInSubmission = []
        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            # The original assigned to a misspelled name ('datasetPath'),
            # so the "no dataset" case below could never trigger.
            datasetpath = None

        tmp_jList = common._db.nJobs('list')
        if chosenJobsList is not None:
            tmp_jList = chosenJobsList

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)
        for nj, jobId in enumerate(tmp_jList):
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                # a dataset is requested but no destination is left
                jobSkippedInSubmission.append(jobId)
            elif jStatus[nj] in self._NOT_SUBMITTABLE:
                continue
            else:
                jobSetForSubmission += 1
                nj_list.append(jobId)
            # stop as soon as the requested number of jobs is collected
            if nsjobs > 0 and nsjobs == jobSetForSubmission:
                break

        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if jobSkippedInSubmission:
            mess = ''.join([str(jobs) + "," for jobs in jobSkippedInSubmission])
            common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
            self.submissionError()

        # submit N from last submitted job
        common.logger.debug(5, 'nj_list '+str(nj_list))

        if common.scheduler.name().upper() == 'CONDOR_G':
            # create hash of cfg file
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Jobs are grouped per distinct destination; each group is matched
        against the scheduler (skipped for CONDOR_G) and, when a match is
        found, submitted as one block. Jobs that come back with a non-empty
        'schedulerId' are flagged with status 'S'. submissionError() is
        called when fewer jobs than requested were submitted.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        jList = common._db.nJobs('list')
        st = common._db.queryRunJob('status', jList)
        totalCreatedJobs = 0
        for nj in range(len(jList)):
            if st[nj] in ['C', 'RC']:
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        self._sendPreSubmissionReport()

        # one destination and one list of job ids per block, kept aligned
        block_dests, sub_jobs = self._groupJobsByDestination()
        # the first job of every block is the one used for the match
        jobs_to_match = [block[0] for block in sub_jobs]

        task = common._db.getTask()
        ce_lists = common.scheduler.ce_list()
        ce_white_list = ce_lists[1]
        ce_black_list = ce_lists[2]
        tags_tmp = task['jobType'].split('"')
        tags = [str(tags_tmp[1]), str(tags_tmp[3])]
        is_condor_g = common.scheduler.name().upper() == "CONDOR_G"

        matched = []   # indexes (into sub_jobs) of the blocks to submit
        Requi = []     # scheduler parameters, one entry per block
        for sel, id_job in enumerate(jobs_to_match):
            Requi.append(common.scheduler.sched_parameter(id_job, task))
            if is_condor_g:
                match = "1"
            else:
                cleanedList = None
                if len(block_dests[sel]) != 0:
                    cleanedList = self.blackWhiteListParser.cleanForBlackWhiteList(block_dests[sel], 'list')
                match = common.scheduler.listMatch(tags, cleanedList, ce_white_list, ce_black_list)
            if match:
                common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(sub_jobs[sel]))
                self.submissionError()

        ### Progress Bar indicator, deactivate for debug
        show_progress = not common.logger.debugLevel()
        if show_progress:
            term = TerminalController()

        # total over all blocks; also defined when nothing matched
        njs = 0
        if matched:
            common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
            for ii in matched:
                common.logger.debug(1, 'Submitting jobs '+str(sub_jobs[ii]))
                common.scheduler.submit(sub_jobs[ii], Requi[ii])
                if show_progress:
                    # the progress bar is best effort only
                    try:
                        pbar = ProgressBar(term, 'Submitting '+str(len(sub_jobs[ii]))+' jobs')
                    except Exception:
                        pbar = None
                    if pbar:
                        pbar.update(float(ii+1)/float(len(sub_jobs)), 'please wait')

                ### check if the submission succeeded. Maybe not needed or at least simplified
                sched_Id = common._db.queryRunJob('schedulerId', sub_jobs[ii])
                listId = []
                run_jobToSave = {'status': 'S'}
                for j, jobId in enumerate(sub_jobs[ii]):
                    # a non-empty scheduler id is taken as "submitted"
                    if str(sched_Id[j]) != '':
                        listId.append(jobId)
                        common.logger.debug(5, "Submitted job # "+ str(jobId))
                njs += len(listId)
                common._db.updateRunJob_(listId, run_jobToSave)  ## New BL--DS
                # TODO: the per-job DashBoard report (jobId, sid, broker,
                # bossId, SubmissionType, TargetSE, localId sent through
                # common.apmon.sendToML) used to live here and is disabled.
        else:
            common.logger.message("The whole task doesn't found compatible site ")

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        msg = '\nTotal of %d jobs submitted'%njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).'%(len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()
        return

    def _sendPreSubmissionReport(self):
        """
        Send the 'TaskMeta' report to the DashBoard through common.apmon.

        The 'key:value' lines of the apmon file in the share directory are
        added to the report; lines that do not split into exactly two
        fields on ':' are ignored.
        """
        params = {'jobId': 'TaskMeta'}

        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                fields = line.split(':')
                if len(fields) != 2:  # Not in the right format
                    continue
                params[fields[0]] = fields[1].strip()
        finally:
            # close the file even when reading fails
            fl.close()

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: '+str(params))
        common.apmon.sendToML(params)

    def _groupJobsByDestination(self):
        """
        Split self.nj_list into blocks sharing the same 'dlsDestination'.

        Returns (block_dests, sub_jobs): two lists of equal length where
        sub_jobs[k] holds the job ids of self.nj_list found at destination
        block_dests[k]. Destinations without any selected job are dropped
        from both lists so that the indexes always stay aligned.
        """
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        block_dests = []
        sub_jobs = []
        for distDest in distinct_dests:
            jobs_at_dest = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            block = [i for i in self.nj_list if i in jobs_at_dest]
            if block:
                block_dests.append(distDest)
                sub_jobs.append(block)
        return block_dests, sub_jobs

    def submissionError(self):
        """
        Log a message listing the SE/CE white and black lists found in
        self.cfg_params, followed by a hint about whitelisting.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL: also report the job type and its code version
        requirement_labels = (
            ('EDG.se_white_list', 'SE White List: '),
            ('EDG.se_black_list', 'SE Black List: '),
            ('EDG.ce_white_list', 'CE White List: '),
            ('EDG.ce_black_list', 'CE Black List: '),
        )
        for key, label in requirement_labels:
            if key in self.cfg_params:
                msg += label+self.cfg_params[key]+'\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'

        common.logger.message(msg)

        return