ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.86
Committed: Mon Feb 4 14:41:58 2008 UTC (17 years, 2 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.85: +10 -5 lines
Log Message:
fix to report info correctly to dashboard

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select and submit the jobs of a CRAB task.

    The constructor works out which jobs the user asked for and which of them
    are eligible, storing their 0-based indices in ``self.nj_list``.
    ``run()`` submits them through ``common.scheduler`` and reports each
    submission to the dashboard through ``common.apmon``.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """Build ``self.nj_list``, the 0-based indices of the jobs to submit.

        cfg_params   -- configuration dictionary; 'CMSSW.datasetpath' is
                        required, 'CRAB.server_mode' is optional (default 0).
        parsed_range -- 1-based job numbers, used when ``val`` is a job list.
        val          -- the user request: empty or 'all' (every job), a
                        positive integer N (the first N eligible jobs), or an
                        expression evaluating to a tuple or a negative integer
                        (a job list/range, taken from ``parsed_range``).
        Raises CrabException when ``val`` is none of the above.
        """
        self.cfg_params = cfg_params

        nsjobs, chosenJobsList = self._parseRequest(val, parsed_range)
        common.logger.debug(5, 'nsjobs ' + str(nsjobs))
        common.logger.debug(5, 'Total jobs ' + str(common.jobDB.nJobs()))

        datasetpath = self.cfg_params['CMSSW.datasetpath']
        if chosenJobsList is None:
            candidates = range(common.jobDB.nJobs())
        else:
            candidates = chosenJobsList

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        nj_list = []
        jobSkippedInSubmission = []  # 1-based job numbers, for the user message
        for nj in candidates:
            cleaned = self.blackWhiteListParser.cleanForBlackWhiteList(
                common.jobDB.destination(nj))
            # The hosting-site check is bypassed when datasetpath is None/"None".
            if cleaned == '' and datasetpath != "None" and datasetpath is not None:
                jobSkippedInSubmission.append(nj + 1)
                continue
            if common.jobDB.status(nj) in ['R', 'S', 'K', 'Y', 'A', 'D', 'Z']:
                continue
            nj_list.append(nj)
            # a positive request N stops at the first N eligible jobs
            if nsjobs > 0 and nsjobs == len(nj_list):
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit ' + str(nsjobs) +
                                  ' jobs, but only ' + str(len(nj_list)) +
                                  ' left: submitting those')
        if jobSkippedInSubmission:
            mess = ''.join([str(j) + ',' for j in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess +
                                  "\n skipped because no sites are hosting this data\n")
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name() == 'CONDOR_G':
            # checksum of the cfg file; it becomes part of the dashboard job id
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))
        return

    def _parseRequest(self, val, parsed_range):
        """Interpret the user request ``val``.

        Returns (nsjobs, chosenJobsList): nsjobs is -1 for "no limit";
        chosenJobsList is None unless an explicit job list was given, in which
        case it holds the 0-based indices derived from ``parsed_range``.
        Raises CrabException for an invalid request.
        """
        nsjobs = -1
        chosenJobsList = None
        if not val or val == 'all':
            return nsjobs, chosenJobsList

        badMsg = 'Bad submission option <' + str(val) + '>\n'
        badMsg += ' Must be an integer or "all".'
        badMsg += ' Generic range is not allowed.'

        # SECURITY NOTE(review): eval() runs arbitrary Python taken from the
        # command line. It is kept for compatibility (e.g. "1-3" evaluates to a
        # negative int, "2,5" to a tuple) but is now evaluated only once.
        try:
            request = eval(val)
        except Exception:
            raise CrabException(badMsg)

        if type(request) is int and request > 0:
            nsjobs = request
        elif type(request) is tuple or (type(request) is int and request < 0):
            chosenJobsList = [i - 1 for i in parsed_range]
            nsjobs = len(chosenJobsList)
        else:
            raise CrabException(badMsg)
        return nsjobs, chosenJobsList

    def run(self):
        """Submit the jobs in ``self.nj_list``.

        Accepted jobs are grouped by data block, and each group goes to
        ``common.scheduler.submit`` in one call. Every submitted job is marked
        'S' in the job DB, given its scheduler job id and the task id, and is
        reported to the dashboard. Any error during submission is logged and
        stops further submission. The job DB is always saved afterwards.
        """
        import sys
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        if self._countCreatedJobs() == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # task-level (pre-submission) dashboard report
        params = {'jobId': 'TaskMeta'}
        params.update(self._readDashboardInfo())
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

        # In server mode the BOSS declare step is performed here.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declare()  # Add for BOSS4
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        njs = 0
        try:
            try:
                groups = self._groupJobsForSubmission()

                showProgress = not common.logger.debugLevel()
                if showProgress:
                    term = TerminalController()
                if self.UseServer == 0:
                    subType = 'Direct'
                else:
                    subType = 'Server'

                nGroups = len(groups)
                for ii, group in enumerate(groups):
                    common.logger.debug(1, 'Submitting jobs ' + str(group[1]))
                    pbar = None
                    if showProgress:
                        try:
                            pbar = ProgressBar(term, 'Submitting ' + str(len(group[1])) + ' jobs')
                        except Exception:
                            pbar = None

                    jidLista, bjidLista = common.scheduler.submit(group)
                    bjidLista = [int(b) for b in bjidLista]

                    if pbar:
                        pbar.update(float(ii + 1) / float(nGroups), 'please wait')

                    # read after submit(), once per group rather than once per job
                    dashInfo = self._readDashboardInfo()
                    for jj, jid in zip(bjidLista, jidLista):
                        tmpNj = jj - 1  # job DB index of the job with BOSS id jj
                        common.logger.debug(5, "Submitted job # " + repr(jj))
                        common.jobDB.setStatus(tmpNj, 'S')
                        common.jobDB.setJobId(tmpNj, jid)
                        common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                        njs += 1
                        self._reportToDashboard(jj, jid, subType, dashInfo)
            except Exception:
                exctype, value = sys.exc_info()[:2]
                common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
        finally:
            # record whatever was already submitted, even after an error
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))

        self._reportSummary(njs)
        return

    def _countCreatedJobs(self):
        """Return how many jobs in the job DB have status 'C' or 'RC'."""
        created = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                created += 1
        return created

    def _readDashboardInfo(self):
        """Return the key/value pairs stored in the task's apmon info file.

        The file is ``common.apmon.fName`` in the share directory. Each line
        is split on its FIRST ':' only, so values may themselves contain ':'
        (e.g. URLs). Values are stripped, and lines without a ':' are ignored.
        """
        info = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                fields = line.split(':', 1)
                if len(fields) < 2:
                    continue
                info[fields[0]] = fields[1].strip()
        finally:
            fl.close()
        return info

    def _ceList(self, key):
        """Return the stripped, non-empty entries of the comma-separated
        configuration value ``key``; an empty list when it is missing/None."""
        value = self.cfg_params.get(key)
        if value is None:
            return []
        return [ce.strip() for ce in value.split(',') if ce.strip()]

    def _listMatch(self, nj, block):
        """Return the list-match result for job ``nj`` of data block ``block``.

        CONDOR_G skips the list-match and always gets "1". For glite
        schedulers, the CE white/black lists from the configuration are passed
        to ``common.scheduler.listMatch``.
        """
        if common.scheduler.name() == "CONDOR_G":
            return "1"
        # SL TODO: to be moved in blackWhiteListParser
        whiteL = []
        blackL = []
        if self.cfg_params['CRAB.scheduler'].find("glite") != -1:
            whiteL = self._ceList('EDG.ce_white_list')
            blackL = self._ceList('EDG.ce_black_list')
        return common.scheduler.listMatch(nj, block, whiteL, blackL)

    def _groupJobsForSubmission(self):
        """Group the submittable jobs of ``self.nj_list`` by data block.

        Returns a list of ``[block, [bossId, ...]]``. Consecutive accepted
        jobs sharing a block form one group, and every group is returned, so
        no accepted job is dropped and no job ends up under another block.
        Jobs with an unsuitable status, or without a compatible site, are
        reported and left out.
        """
        groups = []
        noBlock = object()  # sentinel: the first accepted job always gets a list-match
        lastBlock = noBlock
        match = None
        for nj in self.nj_list:
            st = common.jobDB.status(nj)
            # NOTE(review): 'K' and 'A' are accepted here, but __init__ never
            # puts jobs with those statuses into nj_list.
            if st not in ['C', 'K', 'A', 'RC']:
                msg = "Job # %d not submitted: status %s" % (nj + 1, crabJobStatusToString(st))
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            same = (currBlock == lastBlock)
            if same:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
            else:
                # list-match only when the block changes; the result is reused
                # for the following jobs of the same block
                match = self._listMatch(nj, currBlock)
                lastBlock = currBlock

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue

            found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if same:
                common.logger.debug(1, found)
            else:
                common.logger.message(found)

            bossId = common.jobDB.bossId(nj)
            if groups and groups[-1][0] == currBlock:
                groups[-1][1].append(bossId)
            else:
                groups.append([currBlock, [bossId]])
        return groups

    def _reportToDashboard(self, jj, jid, subType, dashInfo):
        """Send the dashboard report for one submitted job.

        jj       -- BOSS id returned by the submission (job index + 1)
        jid      -- scheduler job id returned by the submission
        subType  -- 'Direct' or 'Server'
        dashInfo -- values read from the apmon info file; they are merged in
                    last, so they override the per-job keys built here.
        """
        schedName = common.scheduler.name()
        localId = ''
        if schedName == 'CONDOR_G':
            # OLI: JobID treatment, special for Condor-G scheduler
            rb = 'OSG'
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        elif schedName == 'lsf' or schedName == 'caf':
            jobId = ("https://" + schedName + ":/" + jid + "-" +
                     common.taskDB.dict('taskId') + "-" + str(jj))
            common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
            rb = schedName
            localId = jid
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)
            rb = jid.split(':')[1].replace('//', '')

        destination = common.jobDB.destination(jj - 1)
        if len(destination) <= 2:
            targetSE = ','.join(destination)
        else:
            targetSE = str(len(destination)) + '_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'SubmissionType': subType,
                  'TargetSE': targetSE,
                  'localId': localId}
        common.logger.debug(5, str(params))
        params.update(dashInfo)
        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
        common.apmon.sendToML(params)

    def _reportSummary(self, njs):
        """Log how many of the requested jobs were submitted. If fewer than
        requested, also log the requirements that were used, as a hint."""
        nRequested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted' % njs
        if njs != nRequested:
            msg += ' (from %d requested).' % nRequested
        else:
            msg += '.'
        common.logger.message(msg)

        if njs >= nRequested:
            return
        jobtype = common.taskDB.dict("jobtype")
        msg = 'Submission performed using the Requirements: \n'
        msg += jobtype + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
        for key, label in [('EDG.se_white_list', 'SE White List: '),
                           ('EDG.se_black_list', 'SE Black List: '),
                           ('EDG.ce_white_list', 'CE White List: '),
                           ('EDG.ce_black_list', 'CE Black List: ')]:
            if key in self.cfg_params:
                msg += label + self.cfg_params[key] + '\n'
        msg += '(Hint: please check if ' + jobtype + ' is available at the Sites)\n'
        common.logger.message(msg)