ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.95
Committed: Mon Mar 17 14:00:59 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.94: +20 -12 lines
Log Message:
Many changes to integrate BossLite. The creation step is fully implemented and optimized, and submission is working again. Still missing here: support for submitting jobs by range, sending messages to ML, and the listmatch_match check. Currently the requirements can be changed on the fly, as in the past. Status is fully working with BossLite. The exit code is not displayed yet because the new BOSS does not implement RealTime monitoring; this functionality is under development by Federica and still has to be integrated.

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select the jobs of the current task to submit and submit them.

    The constructor works out which jobs are eligible (``self.nj_list``,
    1-based job ids); ``run()`` groups them by data location
    (``dlsDestination``) and submits each group through
    ``common.scheduler``.
    """

    # Run-job status codes for which a job is NOT selected for submission.
    _NOT_SUBMITTABLE_STATUS = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')
    # Run-job status codes that run() counts as "created" jobs.
    _CREATED_STATUS = ('C', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """Build the list of jobs to submit (stored in self.nj_list).

        cfg_params   -- configuration dictionary; reads 'CMSSW.datasetpath'
                        and, optionally, 'CRAB.server_mode'.
        parsed_range -- job ids already parsed from the user request; used
                        for the 'range' option and for explicit lists/ranges.
        val          -- raw submission option: 'range' (used by the
                        Resubmitter), 'all', a positive number of jobs, or a
                        list/range expression.

        Raises CrabException if *val* is not a recognised option.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseSubmissionOption(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))
        common.logger.debug(5, 'Total jobs ' + str(common._db.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        # With no input dataset a job is eligible even without a site list.
        noInputData = datasetpath in ("None", None)

        if chosenJobsList is None:
            candidates = range(common._db.nJobs())
        else:
            candidates = chosenJobsList

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        nj_list = []                 # 1-based ids of the jobs to submit
        jobSkippedInSubmission = []  # 1-based ids with no usable site
        for nj in candidates:
            jobs = [nj]
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(
                common._db.queryJob('dlsDestination', jobs))
            if cleanedBlackWhiteList != '' or noInputData:  ## Matty's fix
                # queryRunJob returns a sequence (run() also indexes it with
                # [0]); compare the single status, not the whole sequence.
                status = common._db.queryRunJob('status', jobs)[0]
                if status not in self._NOT_SUBMITTABLE_STATUS:
                    nj_list.append(nj + 1)  ## +1: DB index -> jobId (BL--DS)
            else:
                jobSkippedInSubmission.append(nj + 1)
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        jobSetForSubmission = len(nj_list)
        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(jobSetForSubmission) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = ','.join([str(j) for j in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name().upper() == 'CONDOR_G':
            # Hash of the cfg file; used to build the DashBoard job id of
            # CONDOR_G jobs (the per-job report is currently disabled).
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))
        return

    def _parseSubmissionOption(self, val, parsed_range):
        """Interpret the submission option *val*.

        Returns (nsjobs, chosenJobsList):
          nsjobs         -- number of jobs requested, or -1 for "no limit";
          chosenJobsList -- None when every job of the task is a candidate,
                            otherwise the list of DB indices to consider.
        Raises CrabException for an unrecognised option.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList
        if val == 'range':  # for Resubmitter
            # NOTE(review): unlike the list branch below, these ids are not
            # shifted by -1; confirm what the Resubmitter passes in.
            return nsjobs, parsed_range

        # SECURITY NOTE: eval() executes the user-supplied option as Python
        # code. It is kept only because the accepted syntax depends on it: a
        # positive integer is a job count, while a tuple ("1,3") or a negative
        # integer (what eval makes of a range such as "2-4") means "use
        # parsed_range". A dedicated parser would be safer.
        try:
            evaluated = eval(val)
        except Exception:
            evaluated = None  # not a valid expression: reported below

        if type(evaluated) is int and evaluated > 0:
            nsjobs = evaluated
        elif type(evaluated) is tuple or (type(evaluated) is int and evaluated < 0):
            # user list/range: convert 1-based job ids to 0-based DB indices
            chosenJobsList = [i - 1 for i in parsed_range]
            nsjobs = len(chosenJobsList)
        else:
            msg = 'Bad submission option <' + str(val) + '>\n'
            msg += ' Must be an integer or "all".\n'
            msg += ' Generic range is not allowed.'
            raise CrabException(msg)
        return nsjobs, chosenJobsList

    def run(self):
        """Submit the jobs in self.nj_list.

        Jobs are grouped by data location; each group is passed to
        common.scheduler.submit() with the requirements computed for its
        first job. Jobs that obtained a scheduler id are marked 'S' in the
        DB, and a summary of the submission is logged.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        if not self._hasCreatedJobs():
            common.logger.message("No jobs to be submitted: first create them")
            return

        self._sendPreSubmissionReport()

        sub_jobs = self._groupJobsByDestination()

        # One set of requirements per group, computed on the group's first job.
        task = common._db.getTask()
        Requi = []
        matched = []  # indices into sub_jobs of the groups to submit
        for sel, group in enumerate(sub_jobs):
            id_job = group[0]
            Requi.append(common.scheduler.sched_parameter(id_job, task))
            # TODO: restore common.scheduler.listMatch(id_job) (disabled during
            # the BossLite migration); until then every group counts as matched.
            match = "1"
            if match:
                common.logger.message("Found " + str(match) + " compatible site(s) for job " + str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs " + str(group))
                self.submissionError()

        ### Progress Bar indicator, deactivate for debug
        term = None
        if not common.logger.debugLevel():
            term = TerminalController()

        njs = 0  # jobs submitted, summed over all groups
        if matched:
            common.logger.message(str(len(matched)) + " blocks of jobs will be submitted")
            for ii in matched:
                njs += self._submitGroup(sub_jobs, ii, Requi[ii], term)
            # TODO: the per-job DashBoard (ML) report was not ported to
            # BossLite yet; see the VCS history for the previous version.
        else:
            common.logger.message("The whole task doesn't found compatible site ")

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).\n' % len(self.nj_list)
        else:
            msg += '.\n'
        common.logger.message(msg)

        # Show the site-selection hints only when something was not submitted.
        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()
        return

    def _hasCreatedJobs(self):
        """Return True if at least one job of the task has a created status."""
        for nj in range(common._db.nJobs()):
            if common._db.queryRunJob('status', [nj])[0] in self._CREATED_STATUS:
                return True
        return False

    def _sendPreSubmissionReport(self):
        """Send the task-level DashBoard information through common.apmon.

        Every line of the apmon file in the share directory that splits into
        exactly two fields on ':' becomes a key/value entry; other lines are
        ignored.
        """
        params = {'jobId': 'TaskMeta'}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                try:
                    key, value = line.split(':')
                except ValueError:  # Not in the right format
                    continue
                params[key] = value.strip()
        finally:
            fl.close()

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

    def _groupJobsByDestination(self):
        """Split self.nj_list into groups of jobs sharing a dlsDestination.

        Returns a list of non-empty lists of job ids; inside each group the
        order of self.nj_list is preserved.
        """
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)
        groups = []
        for distDest in distinct_dests:
            # Jobs of THIS destination (not always the first one's).
            destJobs = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            group = [j for j in self.nj_list if j in destJobs]
            if group:
                groups.append(group)
        return groups

    def _submitGroup(self, sub_jobs, ii, requirements, term):
        """Submit group sub_jobs[ii]; return how many of its jobs were submitted.

        term -- TerminalController for the progress bar, or None to disable it.
        """
        jobs = sub_jobs[ii]
        common.logger.debug(1, 'Submitting jobs ' + str(jobs))
        pbar = None
        if term is not None:
            try:
                pbar = ProgressBar(term, 'Submitting ' + str(len(jobs)) + ' jobs')
            except Exception:  # the progress bar is cosmetic only
                pbar = None
        common.logger.debug(5, 'Requirements: ' + str(requirements))
        common.scheduler.submit(jobs, requirements)
        if pbar:
            pbar.update(float(ii + 1) / float(len(sub_jobs)), 'please wait')
        return self._markSubmitted(jobs)

    def _markSubmitted(self, jobs):
        """Set run-job status 'S' for the jobs of *jobs* that got a scheduler id.

        jobs -- 1-based job ids just passed to the scheduler.
        Returns the number of jobs marked as submitted.
        """
        # Re-read the task after submit(), presumably so that the scheduler
        # ids it assigned are visible here.
        task = common._db.getTask()
        listId = []
        for j in jobs:  # Add loop over SID returned from group submission DS
            if task.jobs[j - 1].runningJob['schedulerId'] != '':
                listId.append(j - 1)
                common.logger.debug(5, "Submitted job # " + str(j))
        common._db.updateRunJob_(listId, {'status': 'S'})  ## New BL--DS
        return len(listId)

    def submissionError(self):
        """Log the site white/black lists in use and a hint about whitelisting.

        Called by __init__ and run() when some requested jobs cannot be
        submitted.
        """
        msg = ''
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)
        return