ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.123
Committed: Thu Jun 12 18:46:16 2008 UTC (16 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_0, CRAB_2_3_0_pre6, CRAB_2_3_0_pre1
Branch point for: CRAB_2_3_0_br
Changes since 1.122: +1 -1 lines
Log Message:
 check also TEMPORARY EE status

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Select the jobs of the current task to submit, group them by their
    dlsDestination, check with the scheduler that each group has matching
    resources, submit the matched groups and report the submission to the
    Dashboard through common.apmon.sendToML.
    """

    # Run-job statuses for which a job is NOT (re)submitted.
    _NOT_SUBMITTABLE_STATUS = ('SS', 'SU', 'SR', 'R', 'S', 'K', 'Y', 'A',
                               'D', 'Z', 'E', 'EE', 'SSE')

    def __init__(self, cfg_params, parsed_range, val):
        """
        Build self.nj_list, the list of job ids that run() will submit.

        cfg_params   -- configuration dictionary ('CMSSW.datasetpath' is read)
        parsed_range -- job ids already parsed from the user's range option;
                        used when val is 'range' or a list/range expression
        val          -- submission option: 'range' (used by the Resubmitter),
                        'all', a positive integer N (first N submittable
                        jobs) or a list/range expression such as "1,3" or "2-5"

        Raises CrabException when val cannot be interpreted.
        """
        self.cfg_params = cfg_params

        # get user request
        nsjobs = -1
        chosenJobsList = None
        if val:
            if val == 'range':  # for Resubmitter
                chosenJobsList = parsed_range
            elif val == 'all':
                pass
            else:
                nsjobs, chosenJobsList = self._parseSubmitOption(val, parsed_range)

        common.logger.debug(5, 'nsjobs ' + str(nsjobs))

        # total jobs
        self.complete_List = common._db.nJobs('list')
        common.logger.debug(5, 'Total jobs ' + str(len(self.complete_List)))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if string.lower(datasetpath) == 'none':
            datasetpath = None

        # Candidate jobs: the explicitly chosen ones, otherwise the whole task.
        tmp_jList = self.complete_List
        if chosenJobsList != None:
            tmp_jList = chosenJobsList

        # build job list
        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)
        dlsDest = common._db.queryJob('dlsDestination', tmp_jList)
        jStatus = common._db.queryRunJob('status', tmp_jList)

        nj_list = []
        jobSetForSubmission = 0
        jobSkippedInSubmission = []
        for nj, jobId in enumerate(tmp_jList):
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(dlsDest[nj])
            if cleaned == '' and datasetpath != None:
                # Input data is required but no allowed site hosts it.
                jobSkippedInSubmission.append(jobId)
            elif jStatus[nj] in self._NOT_SUBMITTABLE_STATUS:
                # Already submitted / running / done: nothing to do.
                continue
            else:
                jobSetForSubmission += 1
                nj_list.append(jobId)
            if nsjobs > 0 and nsjobs == jobSetForSubmission:
                break

        if nsjobs > jobSetForSubmission:
            common.logger.message('asking to submit '+str(nsjobs)+' jobs, but only '+str(jobSetForSubmission)+' left: submitting those')
        if len(jobSkippedInSubmission) > 0:
            mess = ",".join([str(job) for job in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
            self.submissionError()
        # submit N from last submitted job
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        self.nj_list = nj_list
        return

    def _parseSubmitOption(self, val, parsed_range):
        """
        Interpret a numeric or list/range submission option.

        Returns a (nsjobs, chosenJobsList) pair:
          - a plain positive integer "N" gives (N, None): submit N jobs;
          - a list ("a,b,...", evaluates to a tuple) or a range expression
            (contains '-', e.g. "a-b") gives (len(parsed_range), parsed_range).

        Raises CrabException for anything else, including unparseable input.
        """
        msg = 'Bad submission option <' + str(val) + '>\n'
        msg += ' Must be an integer or "all".'
        msg += ' Generic range is not allowed.'
        # NOTE(review): eval() executes the user-supplied option as Python
        # code. It is kept because list/range detection relies on it
        # ("1,3" -> tuple, "1-5" -> int), but a real parser would be safer.
        try:
            evaluated = eval(val)
        except Exception:
            raise CrabException(msg)

        isInt = type(evaluated) is int
        # "a-b" evaluates to a - b, which can be positive (e.g. "5-3" -> 2),
        # so the presence of '-' rather than the sign identifies a range.
        hasDash = '-' in str(val)
        if isInt and evaluated > 0 and not hasDash:
            # positive number
            return evaluated, None
        if type(evaluated) is tuple or (isInt and (evaluated < 0 or hasDash)):
            chosenJobsList = parsed_range
            return len(chosenJobsList), chosenJobsList
        raise CrabException(msg)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self.checkIfCreate() != 0:
            # Nothing created yet; checkIfCreate already informed the user.
            return

        self.SendMLpre()

        list_matched, task = self.performMatch()
        njs = self.perfromSubmission(list_matched, task)

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        if njs < len(self.nj_list) or len(self.nj_list) == 0:
            self.submissionError()

    def checkIfCreate(self):
        """
        Return 0 if at least one job of the task has status 'C' or 'RC',
        otherwise tell the user to create jobs first and return 1.
        """
        jList = common._db.nJobs('list')
        st = common._db.queryRunJob('status', jList)
        totalCreatedJobs = 0
        for nj in range(len(jList)):
            if st[nj] in ('C', 'RC'):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return 1
        return 0

    def performMatch(self):
        """
        Group self.nj_list by distinct dlsDestination and ask the scheduler
        for matching resources for each group.

        Sets self.sub_jobs to the list of job-id groups and returns
        (matched, task): matched holds the indexes into self.sub_jobs of the
        groups that found at least one compatible site, and task is
        common._db.getTask().
        """
        common.logger.message("Checking available resources...")
        ### define here the list of distinct destinations sites list
        distinct_dests = common._db.queryDistJob_Attr('dlsDestination', 'jobId', self.nj_list)

        ### define here the list of jobs Id for each distinct list of sites
        self.sub_jobs = []      # list of jobs Id list to submit
        group_dests = []        # destination of each entry of self.sub_jobs
        for distDest in distinct_dests:
            jobsAtDest = common._db.queryAttrJob({'dlsDestination': distDest}, 'jobId')
            group = [i for i in self.nj_list if i in jobsAtDest]
            # Empty groups are dropped, so keep destinations in a parallel
            # list instead of indexing distinct_dests with the group index.
            if len(group) > 0:
                self.sub_jobs.append(group)
                group_dests.append(distDest)

        task = common._db.getTask()

        matched = []
        for sel in range(len(self.sub_jobs)):
            id_job = self.sub_jobs[sel][0]
            match = common.scheduler.listMatch(group_dests[sel], False)
            if len(match) > 0:
                common.logger.message("Found compatible site(s) for job "+str(id_job))
                matched.append(sel)
            else:
                common.logger.message("No compatible site found, will not submit jobs "+str(self.sub_jobs[sel]))
                self.submissionError()

        return matched, task

    def perfromSubmission(self, matched, task):
        """
        Submit every job group of self.sub_jobs whose index is in matched.

        Jobs that received a non-empty schedulerId are marked with status
        'S' in the DB and reported to the Dashboard via SendMLpost.
        Returns the number of such jobs.  Raises CrabException("Job not
        submitted") if the scheduler raises a CrabException.
        """
        njs = 0

        ### Progress Bar indicator, deactivate for debug
        showProgress = not common.logger.debugLevel()
        if showProgress:
            term = TerminalController()

        if len(matched) == 0:
            common.logger.message("The whole task doesn't found compatible site ")
            return njs

        common.logger.message(str(len(matched))+" blocks of jobs will be submitted")
        for ii in matched:
            group = self.sub_jobs[ii]
            common.logger.debug(1, 'Submitting jobs '+str(group))

            try:
                common.scheduler.submit(group, task)
            except CrabException:
                raise CrabException("Job not submitted")

            if showProgress:
                try:
                    pbar = ProgressBar(term, 'Submitting '+str(len(group))+' jobs')
                except Exception:
                    pbar = None
                if pbar:
                    pbar.update(float(ii+1)/float(len(self.sub_jobs)), 'please wait')

            ### check if the submission succeeded: a job counts as submitted
            ### once it has a non-empty scheduler id
            sched_Id = common._db.queryRunJob('schedulerId', group)
            listId = []
            listRunField = []
            run_jobToSave = {'status': 'S'}
            for j in range(len(group)):
                if str(sched_Id[j]) != '':
                    listId.append(group[j])
                    listRunField.append(run_jobToSave)
                    common.logger.debug(5, "Submitted job # "+ str(group[j]))
                    njs += 1
            common._db.updateRunJob_(listId, listRunField)
            self.SendMLpost(group)

        return njs

    # Correctly spelled alias; the historical name is kept for callers.
    performSubmission = perfromSubmission

    def submissionError(self):
        """
        Print the black/white-list requirements used for the submission,
        with a hint, to help the user understand an incomplete submission.
        """
        ## add some more verbose message in case submission is not complete
        msg = 'Submission performed using the Requirements: \n'
        ### TODO_ DS--BL: also report jobtype/codeVersion once available
        for key, label in (('EDG.se_white_list', 'SE White List'),
                           ('EDG.se_black_list', 'SE Black List'),
                           ('EDG.ce_white_list', 'CE White List'),
                           ('EDG.ce_black_list', 'CE Black List')):
            if self.cfg_params.has_key(key):
                msg += label + ': ' + self.cfg_params[key] + '\n'
        msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\nPlease check if the dataset is available at this site!)\n'
        common.logger.message(msg)

        return

    def collect_MLInfo(self):
        """
        Prepare the task-level Dashboard information.

        Returns a new dict; also sets self.datasetPath (None when the
        configured datasetpath is 'none') and self.executable.
        """
        taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))
        gridName = string.strip(common.scheduler.userName())
        common.logger.debug(5, "GRIDNAME: "+gridName)
        taskType = 'analysis'

        self.datasetPath = self.cfg_params['CMSSW.datasetpath']
        if string.lower(self.datasetPath) == 'none':
            self.datasetPath = None
        self.executable = self.cfg_params.get('CMSSW.executable', 'cmsRun')
        VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

        params = {'tool': common.prog_name,
                  'JSToolVersion': common.prog_version_str,
                  'tool_ui': os.environ.get('HOSTNAME', ''),
                  'scheduler': common.scheduler.name(),
                  'GridName': gridName,
                  'taskType': taskType,
                  'vo': VO,
                  'user': os.environ.get('USER', ''),
                  'taskId': taskId,
                  'datasetFull': self.datasetPath,
                  'exe': self.executable}

        return params

    def SendMLpre(self):
        """
        Send the task-level ('TaskMeta') pre-submission report to ML.
        """
        params = self.collect_MLInfo()

        params['jobId'] = 'TaskMeta'

        common.apmon.sendToML(params)

        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: '+str(params))

        return

    def SendMLpost(self, allList):
        """
        Send one post-submission report to ML for each job in allList,
        carrying its scheduler id, broker and target SE(s).
        """
        task = common._db.getTask(allList)

        # collect_MLInfo returns a fresh dict, so it can be updated in place.
        params = self.collect_MLInfo()

        taskId = str("_".join(str(task['name']).split('_')[:-1]))

        Sub_Type = 'Direct'
        schedName = common.scheduler.name()
        isCondorG = schedName.upper() == 'CONDOR_G'
        if isCondorG:
            # Depends only on the task configuration file: compute it once.
            self.hash = makeCksum(common.work_space.cfgFileName())

        for job in task.jobs:
            jj = job['jobId']
            localId = ''
            jid = str(job.runningJob['schedulerId'])
            if isCondorG:
                rb = 'OSG'
                jobId = str(jj) + '_' + self.hash + '_' + jid
                common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
            elif schedName in ['lsf', 'caf']:
                jobId = "https://"+schedName+":/"+jid+"-"+taskId.replace("_", "-")
                common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:'+jobId)
                rb = schedName
                localId = jid
            else:
                jobId = str(jj) + '_' + str(jid)
                common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler'+jobId)
                rb = str(job.runningJob['service'])

            # presumably a list of SE names - TODO confirm against BossLite
            dlsDest = job['dlsDestination']
            if len(dlsDest) <= 2:
                # Join the elements; joining str(dlsDest) would instead
                # comma-separate every character of the list's repr.
                T_SE = ",".join(map(str, dlsDest))
            else:
                T_SE = str(len(dlsDest))+'_Selected_SE'

            infos = {'jobId': jobId,
                     'sid': jid,
                     'broker': rb,
                     'bossId': jj,
                     'SubmissionType': Sub_Type,
                     'TargetSE': T_SE,
                     'localId': localId}

            for k, v in infos.iteritems():
                params[k] = v

            common.logger.debug(5, 'Submission DashBoard report: '+str(params))
            common.apmon.sendToML(params)
        return
338 spiga 1.112
339