ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.104
Committed: Wed Mar 26 17:40:57 2008 UTC (17 years, 1 month ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.103: +8 -52 lines
Log Message:
fixed datasetpath written in wrong way

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
10     class Submitter(Actor):
11 slacapra 1.84 def __init__(self, cfg_params, parsed_range, val):
12 nsmirnov 1.1 self.cfg_params = cfg_params
13 slacapra 1.84
14     # get user request
15     nsjobs = -1
16     chosenJobsList = None
17     if val:
18 slacapra 1.91 if val=='range': # for Resubmitter
19     chosenJobsList = parsed_range
20 ewv 1.92 elif val=='all':
21 slacapra 1.84 pass
22     elif (type(eval(val)) is int) and eval(val) > 0:
23     # positive number
24     nsjobs = eval(val)
25     elif (type(eval(val)) is tuple)or( type(eval(val)) is int and eval(val)<0 ) :
26     chosenJobsList = parsed_range
27 ewv 1.92 nsjobs = len(chosenJobsList)
28 slacapra 1.84 else:
29     msg = 'Bad submission option <'+str(val)+'>\n'
30     msg += ' Must be an integer or "all"'
31     msg += ' Generic range is not allowed"'
32     raise CrabException(msg)
33     pass
34 ewv 1.92
35 slacapra 1.84 common.logger.debug(5,'nsjobs '+str(nsjobs))
36     # total jobs
37     nj_list = []
38     # get the first not already submitted
39 spiga 1.94 common.logger.debug(5,'Total jobs '+str(common._db.nJobs()))
40 slacapra 1.84 jobSetForSubmission = 0
41     jobSkippedInSubmission = []
42     datasetpath=self.cfg_params['CMSSW.datasetpath']
43 slacapra 1.97 if string.lower(datasetpath)=='none':
44 fanzago 1.104 datasetpath = None
45 spiga 1.98 tmp_jList = common._db.nJobs('list')
46 slacapra 1.84 if chosenJobsList != None:
47     tmp_jList = chosenJobsList
48     # build job list
49     from BlackWhiteListParser import BlackWhiteListParser
50     self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
51 spiga 1.98 dlsDest=common._db.queryJob('dlsDestination',tmp_jList)
52     jStatus=common._db.queryRunJob('status',tmp_jList)
53     for nj in range(len(tmp_jList)):
54     cleanedBlackWhiteList = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
55 slacapra 1.97 if (cleanedBlackWhiteList != '') or (datasetpath == None): ## Matty's fix
56 spiga 1.100 ##if ( jStatus[nj] not in ['R','S','K','Y','A','D','Z']): ## here the old flags
57 spiga 1.101 if ( jStatus[nj] not in ['SS','SU','SR','R','S','K','Y','A','D','Z']):
58     #nj_list.append(nj+1)## Warning added +1 for jobId BL--DS
59 slacapra 1.84 jobSetForSubmission +=1
60 spiga 1.100 nj_list.append(tmp_jList[nj])## Warning added +1 for jobId BL--DS
61 ewv 1.92 else:
62 slacapra 1.84 continue
63     else :
64 spiga 1.100 jobSkippedInSubmission.append(tmp_jList[nj])
65 slacapra 1.84 if nsjobs >0 and nsjobs == jobSetForSubmission:
66     break
67     pass
68 fanzago 1.104 ##### FEDE #####
69     print "nsjobs = ", nsjobs
70     print "jobSkippedInSubmission = ", jobSkippedInSubmission
71     ################
72    
73 slacapra 1.84 if nsjobs>jobSetForSubmission:
74     common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
75     if len(jobSkippedInSubmission) > 0 :
76     mess =""
77     for jobs in jobSkippedInSubmission:
78     mess += str(jobs) + ","
79     common.logger.message("Jobs: " +str(mess) + "\n skipped because no sites are hosting this data\n")
80 slacapra 1.89 self.submissionError()
81     pass
82 slacapra 1.84 # submit N from last submitted job
83     common.logger.debug(5,'nj_list '+str(nj_list))
84 ewv 1.92
85 ewv 1.93 if common.scheduler.name().upper() == 'CONDOR_G':
86 gutsche 1.58 # create hash of cfg file
87     self.hash = makeCksum(common.work_space.cfgFileName())
88     else:
89     self.hash = ''
90 corvo 1.30
91 slacapra 1.84 self.nj_list = nj_list
92 nsmirnov 1.1 return
93 ewv 1.92
94 nsmirnov 1.1 def run(self):
95 nsmirnov 1.2 """
96 slacapra 1.53 The main method of the class: submit jobs in range self.nj_list
97 nsmirnov 1.2 """
98     common.logger.debug(5, "Submitter::run() called")
99 slacapra 1.24
100 spiga 1.94 totalCreatedJobs = 0
101 corvo 1.33 start = time.time()
102 spiga 1.98 jList=common._db.nJobs('list')
103     st = common._db.queryRunJob('status',jList)
104     for nj in range(len(jList)):
105     if ( st[nj] in ['C','RC']):totalCreatedJobs +=1
106 slacapra 1.24 pass
107    
108     if (totalCreatedJobs==0):
109     common.logger.message("No jobs to be submitted: first create them")
110 slacapra 1.28 return
111 slacapra 1.60
112 gutsche 1.70 # submit pre DashBoard information
113     params = {'jobId':'TaskMeta'}
114 ewv 1.92
115 slacapra 1.83 fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
116 gutsche 1.70 for i in fl.readlines():
117 ewv 1.92 try:
118     key, val = i.split(':')
119     params[key] = string.strip(val)
120     except ValueError: # Not in the right format
121     pass
122 slacapra 1.85 fl.close()
123 gutsche 1.70
124     common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
125 ewv 1.92
126 slacapra 1.83 common.apmon.sendToML(params)
127 gutsche 1.70
128 spiga 1.94 ### define here the list of distinct destinations sites list
129     # distinct_dests = common._db.queryDistJob('dlsDestination')
130     distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId' ,self.nj_list)
131    
132    
133     ### define here the list of jobs Id for each distinct list of sites
134     sub_jobs =[] # list of jobs Id list to submit
135 spiga 1.95 jobs_to_match =[] # list of jobs Id to match
136 spiga 1.94 all_jobs=[]
137     count=0
138 fanzago 1.104 njs=0
139 spiga 1.94 for distDest in distinct_dests:
140     all_jobs.append(common._db.queryAttrJob({'dlsDestination':distDest},'jobId'))
141     sub_jobs_temp=[]
142     for i in self.nj_list:
143 spiga 1.103 if i in all_jobs[count]: sub_jobs_temp.append(i)
144 spiga 1.94 if len(sub_jobs_temp)>0:
145     sub_jobs.append(sub_jobs_temp)
146 spiga 1.95 jobs_to_match.append(sub_jobs[count][0])
147 spiga 1.103 count +=1
148 spiga 1.94 sel=0
149     matched=[]
150 spiga 1.95 Requi=[]
151    
152     task=common._db.getTask()
153 spiga 1.98 ce_white_list=common.scheduler.ce_list()[1]
154     ce_black_list=common.scheduler.ce_list()[2]
155     tags_tmp=string.split(task['jobType'],'"')
156     tags=[str(tags_tmp[1]),str(tags_tmp[3])]
157 spiga 1.95
158     for id_job in jobs_to_match :
159     Requi.append(common.scheduler.sched_parameter(id_job,task))
160 spiga 1.94 if common.scheduler.name().upper() != "CONDOR_G" :
161 spiga 1.98 cleanedList=None
162     if len(distinct_dests[sel])!=0:cleanedList = self.blackWhiteListParser.cleanForBlackWhiteList(distinct_dests[sel],'list')
163     match = common.scheduler.listMatch(tags,cleanedList,ce_white_list,ce_black_list)
164 spiga 1.94 else :
165     match = "1"
166     if match:
167     common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(id_job))
168     matched.append(sel)
169 spiga 1.77 else:
170 spiga 1.94 common.logger.message("No compatible site found, will not submit jobs "+str(sub_jobs[sel]))
171     self.submissionError()
172     sel += 1
173 ewv 1.92
174 spiga 1.94 ### Progress Bar indicator, deactivate for debug
175     if not common.logger.debugLevel() :
176 gutsche 1.65 term = TerminalController()
177 spiga 1.94
178     if len(matched)>0:
179     common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
180     for ii in matched:
181     common.logger.debug(1,'Submitting jobs '+str(sub_jobs[ii]))
182 spiga 1.100 common.scheduler.submit(sub_jobs[ii],Requi[ii])
183 corvo 1.74 if not common.logger.debugLevel() :
184 spiga 1.94 try: pbar = ProgressBar(term, 'Submitting '+str(len(sub_jobs[ii]))+' jobs')
185 corvo 1.74 except: pbar = None
186 spiga 1.94 if not common.logger.debugLevel():
187     if pbar :
188     pbar.update(float(ii+1)/float(len(sub_jobs)),'please wait')
189     ### check the if the submission succeded Maybe not neede
190     if not common.logger.debugLevel():
191     if pbar :
192     pbar.update(float(ii+1)/float(len(sub_jobs)),'please wait')
193     ### check the if the submission succeded Maybe not neede
194     if not common.logger.debugLevel():
195     if pbar :
196     pbar.update(float(ii+1)/float(len(sub_jobs)),'please wait')
197     ### check the if the submission succeded Maybe not neede
198 corvo 1.74 if not common.logger.debugLevel():
199     if pbar :
200 spiga 1.94 pbar.update(float(ii+1)/float(len(sub_jobs)),'please wait')
201 ewv 1.92
202 spiga 1.94 ### check the if the submission succeded Maybe not needed or at least simplified
203 fanzago 1.104 #njs=0
204 spiga 1.102 sched_Id = common._db.queryRunJob('schedulerId', sub_jobs[ii])
205 spiga 1.95 listId=[]
206 spiga 1.94 run_jobToSave = {'status' :'S'}
207 spiga 1.102 for j in range(len(sub_jobs[ii])): # Add loop over SID returned from group submission DS
208     if str(sched_Id[j]) != '':
209 spiga 1.94 #if (st[j]=='S'):
210 spiga 1.102 listId.append(sub_jobs[ii][j])
211     common.logger.debug(5,"Submitted job # "+ str(sub_jobs[ii][j]))
212 spiga 1.94 njs += 1
213 spiga 1.95 common._db.updateRunJob_(listId, run_jobToSave ) ## New BL--DS
214 spiga 1.94 else:
215     common.logger.message("The whole task doesn't found compatible site ")
216 ewv 1.92
217 corvo 1.30 stop = time.time()
218 gutsche 1.47 common.logger.debug(1, "Submission Time: "+str(stop - start))
219 slacapra 1.60 common.logger.write("Submission time :"+str(stop - start))
220 ewv 1.92
221 nsmirnov 1.6 msg = '\nTotal of %d jobs submitted'%njs
222     if njs != len(self.nj_list) :
223 spiga 1.99 msg += ' (from %d requested).'%(len(self.nj_list))
224 nsmirnov 1.6 else:
225 spiga 1.99 msg += '.'
226     common.logger.message(msg)
227    
228     if (njs < len(self.nj_list) or len(self.nj_list)==0):
229     self.submissionError()
230     return
231    
232     def submissionError(self):
233     ## add some more verbose message in case submission is not complete
234     msg = 'Submission performed using the Requirements: \n'
235     ### TODO_ DS--BL
236     #msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
237     #msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'
238     if self.cfg_params.has_key('EDG.se_white_list'):
239     msg += 'SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
240     if self.cfg_params.has_key('EDG.se_black_list'):
241     msg += 'SE Black List: '+self.cfg_params['EDG.se_black_list']+'\n'
242     if self.cfg_params.has_key('EDG.ce_white_list'):
243     msg += 'CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
244     if self.cfg_params.has_key('EDG.ce_black_list'):
245     msg += 'CE Black List: '+self.cfg_params['EDG.ce_black_list']+'\n'
246     msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
247    
248 slacapra 1.89 common.logger.message(msg)
249 ewv 1.92
250    
251 nsmirnov 1.1 return