ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.94
Committed: Fri Mar 7 09:27:51 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.93: +134 -185 lines
Log Message:
 First version which submits jobs using blite. Not yet fully working: minor problem with JDL arguments. Submission does not take a range yet.

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select the jobs of the task to submit and submit them to the scheduler.

    The constructor turns the user's submission request into self.nj_list,
    a list of job ids (0-based job index + 1). run() groups those ids by
    their 'dlsDestination' and passes each group to common.scheduler.submit().
    """

    # Job statuses (as returned by common._db.queryRunJob('status', ...)) for
    # which a job is not selected for submission again.
    _notSubmittableStates = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')

    def __init__(self, cfg_params, parsed_range, val):
        """Build self.nj_list from the user's request.

        cfg_params   -- configuration dictionary; 'CMSSW.datasetpath' is read
                        (KeyError if absent), 'CRAB.server_mode' is optional
                        (default 0).
        parsed_range -- job numbers already parsed from the user's range;
                        used when val is 'range' or evaluates to a tuple or a
                        negative integer.
        val          -- submission option: empty/None or 'all' (all jobs),
                        'range' (Resubmitter), a positive integer N (the first
                        N submittable jobs), or an expression evaluating to a
                        tuple or a negative integer (the jobs of parsed_range).

        Raises CrabException for any other value of val.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs, chosenJobsList = self._parseRequest(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))
        common.logger.debug(5, 'Total jobs ' + str(common._db.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if chosenJobsList is None:
            candidates = range(common._db.nJobs())
        else:
            candidates = chosenJobsList

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        nj_list = []                 # job ids (index + 1) selected for submission
        jobSkippedInSubmission = []  # job ids left without any allowed site
        for nj in candidates:
            jobs = [nj]
            cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(
                common._db.queryJob('dlsDestination', jobs))
            # With no input dataset (datasetpath "None"/None) the site check
            # is skipped.
            hasSite = (cleanedBlackWhiteList != '' or
                       datasetpath == "None" or datasetpath is None)
            if not hasSite:
                jobSkippedInSubmission.append(nj + 1)
                continue
            # queryRunJob returns one value per requested job (run() indexes
            # it the same way), so compare the single status, not the list.
            if common._db.queryRunJob('status', jobs)[0] in self._notSubmittableStates:
                continue
            nj_list.append(nj + 1)  ## +1: job index -> jobId (BL--DS)
            if nsjobs > 0 and len(nj_list) == nsjobs:
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(len(nj_list)) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = ','.join([str(jobId) for jobId in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name().upper() == 'CONDOR_G':
            # create hash of cfg file
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))
        return

    def _parseRequest(self, val, parsed_range):
        """Return (nsjobs, chosenJobsList) for the submission option val.

        nsjobs is the number of jobs to submit (-1 means no limit) and
        chosenJobsList the job indices to consider (None means all jobs).
        Raises CrabException if val is not a supported form.
        """
        if not val or val == 'all':
            return -1, None
        if val == 'range':  # for Resubmitter
            return -1, parsed_range

        msg = 'Bad submission option <' + str(val) + '>\n'
        msg += ' Must be an integer or "all"'
        msg += ' Generic range is not allowed'
        # NOTE(review): eval() executes whatever the user typed on the command
        # line. It is kept because lists/ranges are recognised from the
        # evaluated value, e.g. '1,3' -> (1, 3) and '2-5' -> -3 (negative int).
        try:
            request = eval(val)
        except Exception:
            raise CrabException(msg)

        if type(request) is int and request > 0:
            # positive number: submit at most `request` jobs
            return request, None
        if type(request) is tuple or (type(request) is int and request < 0):
            # explicit list/range: user job numbers -> 0-based job indices
            chosenJobsList = [i - 1 for i in parsed_range]
            return len(chosenJobsList), chosenJobsList
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Jobs are grouped by 'dlsDestination' and each group is submitted with
        one common.scheduler.submit() call; jobs that received a scheduler id
        are then marked with status 'S'. Nothing is submitted when no job of
        the task has status 'C' or 'RC'.
        """
        common.logger.debug(5, "Submitter::run() called")
        start = time.time()

        hasCreatedJobs = 0
        for nj in range(common._db.nJobs()):
            if common._db.queryRunJob('status', [nj])[0] in ['C', 'RC']:
                hasCreatedJobs = 1
                break
        if not hasCreatedJobs:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        self._sendPreSubmissionReport()

        sub_jobs = self._groupJobsByDestination()

        # Indices (into sub_jobs) of the groups that have a compatible site.
        matched = []
        for sel in range(len(sub_jobs)):
            id_job = sub_jobs[sel][0]
            # TODO: site matching is disabled in this revision (it used to be
            # common.scheduler.listMatch(id_job) for non CONDOR_G schedulers),
            # so every group is reported as having one compatible site.
            match = "1"
            if match:
                common.logger.message("Found " + str(match) + " compatible site(s) for job " + str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs " + str(sub_jobs[sel]))
                self.submissionError()

        ### Progress Bar indicator, deactivate for debug
        term = None
        if not common.logger.debugLevel():
            term = TerminalController()

        njs = 0  # jobs submitted over all groups
        if matched:
            common.logger.message(str(len(matched)) + " blocks of jobs will be submitted")
            for ii in matched:
                group = sub_jobs[ii]
                common.logger.debug(1, 'Submitting jobs ' + str(group))
                pbar = None
                if not common.logger.debugLevel():
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(group)) + ' jobs')
                    except Exception:
                        pbar = None

                common.scheduler.submit(group)

                if pbar:
                    pbar.update(float(ii + 1) / float(len(sub_jobs)), 'please wait')
                njs += self._markSubmitted(group)
        else:
            common.logger.message("The whole task doesn't found compatible site ")

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).\n' % len(self.nj_list)
        else:
            msg += '.\n'
        for key, label in (('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')):
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'

        common.logger.message(msg)
        return

    def _sendPreSubmissionReport(self):
        """Send the 'TaskMeta' DashBoard record through common.apmon.sendToML.

        Extra parameters are read from the "key:value" lines of the file
        common.apmon.fName in the share directory; other lines are ignored.
        """
        params = {'jobId': 'TaskMeta'}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                try:
                    key, val = line.split(':')
                except ValueError:  # Not in the right format
                    continue
                params[key] = val.strip()
        finally:
            fl.close()

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

    def _groupJobsByDestination(self):
        """Split self.nj_list into one list of job ids per distinct dlsDestination.

        Each group keeps the order of self.nj_list; destinations with no job in
        self.nj_list produce no group.
        """
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)
        sub_jobs = []  # list of job-id lists, one per destination
        for distDest in distinct_dests:
            # Compare against THIS destination's jobs (not the first one's).
            destJobs = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            group = [jobId for jobId in self.nj_list if jobId in destJobs]
            if group:
                sub_jobs.append(group)
        return sub_jobs

    def _markSubmitted(self, group):
        """Set status 'S' on every job of group that received a scheduler id.

        group -- job ids that were passed to common.scheduler.submit().
        Returns the number of jobs marked as submitted.
        """
        # NOTE(review): the status queries elsewhere in this class pass 0-based
        # indices while group holds index + 1 ids; confirm which convention
        # queryRunJob expects for 'schedulerId'.
        schedulerIds = common._db.queryRunJob('schedulerId', group)
        run_jobToSave = {'status': 'S'}
        submitted = 0
        for jobId, schedulerId in zip(group, schedulerIds):
            if schedulerId != '':
                common._db.updateRunJob_(jobId, run_jobToSave)  ## New BL--DS
                common.logger.debug(5, "Submitted job # " + str(jobId))
                submitted += 1
        # TODO: the per-job DashBoard report (jobId, sid, broker, TargetSE,
        # localId sent with common.apmon.sendToML) is disabled in this
        # revision; its previous implementation is in the file history.
        return submitted
277 slacapra 1.89 if self.cfg_params.has_key('EDG.se_white_list'):
278     msg += 'SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
279     if self.cfg_params.has_key('EDG.se_black_list'):
280     msg += 'SE Black List: '+self.cfg_params['EDG.se_black_list']+'\n'
281     if self.cfg_params.has_key('EDG.ce_white_list'):
282     msg += 'CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
283     if self.cfg_params.has_key('EDG.ce_black_list'):
284     msg += 'CE Black List: '+self.cfg_params['EDG.ce_black_list']+'\n'
285 slacapra 1.90 msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
286 slacapra 1.89
287     common.logger.message(msg)
288 ewv 1.92
289    
290 nsmirnov 1.1 return