ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.109
Committed: Tue Apr 8 09:45:37 2008 UTC (17 years ago) by farinafa
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre5
Changes since 1.108: +3 -1 lines
Log Message:
Fix for listMatch params order

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Select the jobs of the current task that can be submitted and submit them
    through common.scheduler, grouped by their dlsDestination.
    """

    # Job statuses for which __init__ does not put a job in the submission
    # list.  (The lifecycle meaning of each code is defined outside this file.)
    _NOT_SUBMITTABLE = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A', 'D', 'Z')
    # Job statuses that run() counts as "created" jobs.
    _CREATED = ('C', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build the list of job ids to submit and store it in self.nj_list.

        cfg_params   -- configuration mapping; 'CMSSW.datasetpath' is read
                        here, the 'EDG.*' white/black lists by
                        submissionError().
        parsed_range -- job ids chosen by the user; used when val is 'range'
                        or evaluates to a tuple or a negative int.
        val          -- submission option string (may be empty/None):
                          'range'            -> submit parsed_range
                                                (used by Resubmitter)
                          'all'              -> submit every submittable job
                          positive integer N -> submit at most N jobs
                          expression evaluating to a tuple or a negative
                          int                -> submit parsed_range
                        NOTE(review): the last form presumably covers inputs
                        such as '1,3' (a tuple) or '2-5' (evaluates to -3);
                        confirm with the caller that builds parsed_range.

        Raises CrabException for any other value of val.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs = -1            # max number of jobs to submit; <= 0 means no limit
        chosenJobsList = None  # explicit job ids to consider; None means all jobs
        if val:
            if val == 'range':  # for Resubmitter
                chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                # SECURITY NOTE: eval() executes arbitrary Python taken from
                # the submission option.  This is only tolerable because the
                # option comes from the local user's own command line.  It is
                # evaluated once, and any evaluation failure is reported as a
                # bad option rather than leaking a NameError/SyntaxError.
                try:
                    evaluated = eval(val)
                except Exception:
                    evaluated = None
                if type(evaluated) is int and evaluated > 0:
                    # positive number
                    nsjobs = evaluated
                elif type(evaluated) is tuple or \
                        (type(evaluated) is int and evaluated < 0):
                    chosenJobsList = parsed_range
                    nsjobs = len(chosenJobsList)
                else:
                    msg = 'Bad submission option <'+str(val)+'>\n'
                    msg += ' Must be an integer or "all"\n'
                    msg += ' Generic range is not allowed'
                    raise CrabException(msg)

        common.logger.debug(5,'nsjobs '+str(nsjobs))
        common.logger.debug(5,'Total jobs '+str(common._db.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if datasetpath.lower() == 'none':
            datasetpath = None

        if chosenJobsList is not None:
            tmp_jList = chosenJobsList
        else:
            tmp_jList = common._db.nJobs('list')

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)

        nj_list = []                 # job ids selected for submission
        jobSkippedInSubmission = []  # job ids left without any site after cleaning
        for jobId, dest, status in zip(tmp_jList, dlsDest, jStatus):
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dest)
            # When datasetpath is None a job is never skipped for lacking
            # allowed sites ("Matty's fix").
            if cleanedBlackWhiteList == '' and datasetpath is not None:
                jobSkippedInSubmission.append(jobId)
            elif status not in self._NOT_SUBMITTABLE:
                nj_list.append(jobId)
                # stop once the requested number of jobs has been collected
                if nsjobs > 0 and len(nj_list) == nsjobs:
                    break

        jobSetForSubmission = len(nj_list)
        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if jobSkippedInSubmission:
            mess = ','.join([str(jobs) for jobs in jobSkippedInSubmission])
            common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        # submit N from last submitted job
        common.logger.debug(5,'nj_list '+str(nj_list))

        if common.scheduler.name().upper() == 'CONDOR_G':
            # create hash of cfg file
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        return

    def run(self):
        """
        The main method of the class: submit the jobs in self.nj_list.

        Steps:
          1. Abort if no job of the task has a "created" status.
          2. Send the task-level DashBoard report.
          3. Group the jobs by dlsDestination.
          4. Match one representative job per group against the scheduler
             (skipped for CONDOR_G).
          5. Submit every matched group in one scheduler call.
        Jobs that received a scheduler id get status 'S' in common._db.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        jList = common._db.nJobs('list')
        st = common._db.queryRunJob('status', jList)
        totalCreatedJobs = len([s for s in st if s in self._CREATED])
        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        self._sendPreSubmissionReport()

        sub_jobs, sub_dests, jobs_to_match = self._groupJobsByDestination()
        matched, Requi = self._matchGroups(sub_jobs, sub_dests, jobs_to_match)

        njs = 0  # number of jobs that received a scheduler id
        if matched:
            njs = self._submitGroups(matched, sub_jobs, Requi)
        else:
            common.logger.message("The whole task doesn't found compatible site ")

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        msg = '\nTotal of %d jobs submitted'%njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).'%(len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()
        return

    def _sendPreSubmissionReport(self):
        """
        Send the task-level ('TaskMeta') DashBoard report through common.apmon.

        Extra parameters are read from the apmon file in the share directory
        as 'key:value' lines. A line that does not split into exactly two
        fields on ':' is ignored.
        """
        params = {'jobId':'TaskMeta'}

        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl:
                try:
                    key, val = line.split(':')
                except ValueError:  # Not in the right format
                    continue
                params[key] = val.strip()
        finally:
            # close the file even if reading fails
            fl.close()

        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
        common.apmon.sendToML(params)

    def _groupJobsByDestination(self):
        """
        Group the job ids of self.nj_list by their dlsDestination.

        Returns three index-aligned lists:
          sub_jobs      -- job ids of each group, in self.nj_list order
          sub_dests     -- the destination of each group
          jobs_to_match -- the first job id of each group, used as the
                           group's representative for matching
        A destination with no job in self.nj_list produces no group. This
        keeps the three lists aligned.
        """
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        sub_jobs = []
        sub_dests = []
        jobs_to_match = []
        for distDest in distinct_dests:
            dest_jobs = common._db.queryAttrJob({'dlsDestination':distDest},'jobId')
            group = [i for i in self.nj_list if i in dest_jobs]
            if group:
                sub_jobs.append(group)
                sub_dests.append(distDest)
                jobs_to_match.append(group[0])
        return sub_jobs, sub_dests, jobs_to_match

    def _matchGroups(self, sub_jobs, sub_dests, jobs_to_match):
        """
        Decide which job groups have at least one compatible site.

        Returns (matched, Requi):
          matched -- indices of the groups that can be submitted
          Requi   -- the scheduler requirements built for each group, one
                     entry per group (matched or not), same indexing as
                     sub_jobs
        Calls submissionError() for every group that has no compatible site.
        """
        task = common._db.getTask()
        isCondorG = common.scheduler.name().upper() == "CONDOR_G"

        ce_white_list = []
        ce_black_list = []
        tags = ''
        if not isCondorG:
            ce_lists = common.scheduler.ce_list()
            ce_white_list = ce_lists[1]
            ce_black_list = ce_lists[2]
            tags_tmp = task['jobType'].split('"')
            tags = [str(tags_tmp[1]), str(tags_tmp[3])]

        matched = []
        Requi = []
        for sel, id_job in enumerate(jobs_to_match):
            Requi.append(common.scheduler.sched_parameter(id_job, task))
            if isCondorG:
                # no list-match is performed for CONDOR_G
                match = "1"
            else:
                cleanedList = None
                if len(sub_dests[sel]) != 0:
                    cleanedList = self.blackWhiteListParser.cleanForBlackWhiteList(sub_dests[sel], 'list')
                match = common.scheduler.listMatch(tags, cleanedList, ce_black_list, ce_white_list)
            if match:
                common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(sub_jobs[sel]))
                self.submissionError()
        return matched, Requi

    def _submitGroups(self, matched, sub_jobs, Requi):
        """
        Submit every matched job group and record which jobs were submitted.

        Returns the number of jobs whose scheduler id is non-empty after
        submission; those jobs get status 'S' in common._db.
        """
        ### Progress Bar indicator, deactivated for debug
        showProgress = not common.logger.debugLevel()
        term = None
        if showProgress:
            term = TerminalController()

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        njs = 0
        for done, ii in enumerate(matched):
            jobs = sub_jobs[ii]
            common.logger.debug(1,'Submitting jobs '+str(jobs))
            common.scheduler.submit(jobs, Requi[ii])

            if showProgress:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(jobs))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    # fraction of the matched groups submitted so far
                    pbar.update(float(done + 1)/float(len(matched)),'please wait')

            # A job counts as submitted when it has a non-empty scheduler id.
            sched_Id = common._db.queryRunJob('schedulerId', jobs)
            run_jobToSave = {'status' :'S'}
            listId = []
            listRunField = []
            for jobId, sid in zip(jobs, sched_Id):
                if str(sid) != '':
                    listId.append(jobId)
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5,"Submitted job # "+ str(jobId))
            common._db.updateRunJob_(listId, listRunField) ## New BL--DS
            njs += len(listId)

            # TODO: the per-job DashBoard (sendToML) report that used to be
            # sent here is disabled; the previous implementation is in the
            # CVS history of this file.
        return njs

    def submissionError(self):
        """
        Log the site white/black lists used for submission. This helps when
        the submission is incomplete.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL: also report the job type and its code version, with
        ### a hint to check that they are available at the sites.
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'

        common.logger.message(msg)
        return
289     #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
290     if self.cfg_params.has_key('EDG.se_white_list'):
291     msg += 'SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
292     if self.cfg_params.has_key('EDG.se_black_list'):
293     msg += 'SE Black List: '+self.cfg_params['EDG.se_black_list']+'\n'
294     if self.cfg_params.has_key('EDG.ce_white_list'):
295     msg += 'CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
296     if self.cfg_params.has_key('EDG.ce_black_list'):
297     msg += 'CE Black List: '+self.cfg_params['EDG.ce_black_list']+'\n'
298     msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
299    
300 slacapra 1.89 common.logger.message(msg)
301 ewv 1.92
302    
303 nsmirnov 1.1 return