ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.88
Committed: Mon Feb 4 17:59:37 2008 UTC (17 years, 2 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre4
Changes since 1.87: +1 -1 lines
Log Message:
fixing problem in monitoring with dashboard

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """Select the jobs of a CRAB task to submit and submit them.

    ``__init__`` turns the user's request into ``self.nj_list`` (0-based job
    indices in ``common.jobDB``). ``run`` submits those jobs, grouping
    consecutive matched jobs that share the same data block into one
    ``common.scheduler.submit`` call, and sends a DashBoard report for every
    submitted job through ``common.apmon``.
    """

    # Job statuses excluded when building nj_list in __init__.
    _SKIP_STATUSES = ('R', 'S', 'K', 'Y', 'A', 'D', 'Z')
    # Job statuses accepted for submission in run().
    _SUBMITTABLE_STATUSES = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, parsed_range, val):
        """Build ``self.nj_list``, the 0-based indices of the jobs to submit.

        cfg_params   -- dict-like CRAB configuration.
        parsed_range -- 1-based job numbers; used when ``val`` evaluates to a
                        tuple or a negative integer (explicit list of jobs).
        val          -- user request string: empty/None or 'all' (every
                        eligible job), a positive integer N (the first N
                        eligible jobs), or a tuple / negative integer (the
                        jobs listed in ``parsed_range``).

        Raises CrabException for any other request.
        """
        self.cfg_params = cfg_params

        # nsjobs == -1 means "no limit"; chosenJobsList None means "all jobs".
        nsjobs = -1
        chosenJobsList = None
        if val and val != 'all':
            # NOTE(review): eval() of a user-supplied string; it is only meant
            # to turn e.g. "5", "1,4" or "-3" into an int/tuple.
            try:
                request = eval(val)
            except Exception:
                request = None  # unparsable -> reported as a bad option below
            if type(request) is int and request > 0:
                nsjobs = request
            elif type(request) is tuple or (type(request) is int and request < 0):
                # parsed_range is 1-based, the job DB is 0-based.
                chosenJobsList = [i - 1 for i in parsed_range]
                nsjobs = len(chosenJobsList)
            else:
                msg = 'Bad submission option <' + str(val) + '>\n'
                msg += ' Must be an integer or "all"\n'
                msg += ' Generic range is not allowed'
                raise CrabException(msg)

        common.logger.debug(5, 'nsjobs ' + str(nsjobs))
        common.logger.debug(5, 'Total jobs ' + str(common.jobDB.nJobs()))
        datasetpath = self.cfg_params['CMSSW.datasetpath']
        # Jobs that read no dataset do not need a site hosting their data.
        noInputData = datasetpath is None or datasetpath == "None"

        if chosenJobsList is None:
            candidates = range(common.jobDB.nJobs())
        else:
            candidates = chosenJobsList

        from BlackWhiteListParser import BlackWhiteListParser
        self.blackWhiteListParser = BlackWhiteListParser(self.cfg_params)

        nj_list = []
        jobSkippedInSubmission = []  # 1-based numbers of jobs with no site
        for nj in candidates:
            sites = self.blackWhiteListParser.cleanForBlackWhiteList(
                common.jobDB.destination(nj))
            if sites == '' and not noInputData:
                jobSkippedInSubmission.append(nj + 1)
                continue
            if common.jobDB.status(nj) in self._SKIP_STATUSES:
                continue
            nj_list.append(nj)
            if nsjobs > 0 and len(nj_list) == nsjobs:
                break

        if nsjobs > len(nj_list):
            common.logger.message('asking to submit ' + str(nsjobs) + ' jobs, but only ' + str(len(nj_list)) + ' left: submitting those')
        if jobSkippedInSubmission:
            mess = "".join([str(j) + "," for j in jobSkippedInSubmission])
            common.logger.message("Jobs: " + mess + "\n skipped because no sites are hosting this data\n")
        common.logger.debug(5, 'nj_list ' + str(nj_list))

        if common.scheduler.name() == 'CONDOR_G':
            # Condor-G DashBoard job ids embed a checksum of the cfg file.
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        self.nj_list = nj_list
        self.UseServer = int(self.cfg_params.get('CRAB.server_mode', 0))

    def _readDashboardParams(self):
        """Return the ``key:value`` pairs stored in the apmon file.

        The file is ``common.apmon.fName`` inside the task share directory.
        Only the first ':' separates key from value, so values may contain
        ':' themselves; values are stripped, keys are kept as read. Lines
        without ':' are ignored.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                if ':' not in line:
                    continue
                key, val = line.split(':', 1)
                params[key] = val.strip()
        finally:
            fl.close()
        return params

    def _ceList(self, key):
        """Return the non-empty, stripped CE names in comma-separated cfg_params[key]."""
        value = self.cfg_params.get(key)
        if not value:
            return []
        return [ce.strip() for ce in value.split(",") if ce.strip()]

    def _listMatch(self, nj, block):
        """Return the list-match result for job ``nj`` of data block ``block``.

        The result is logged as the number of compatible sites; a false value
        means no compatible site. Condor-G performs no list-match and always
        yields "1". For gLite schedulers the configured CE white/black lists
        are passed to the scheduler's list-match.
        """
        if common.scheduler.name() == "CONDOR_G":
            return "1"
        whiteL = []
        blackL = []
        if "glite" in self.cfg_params['CRAB.scheduler']:
            whiteL = self._ceList('EDG.ce_white_list')
            blackL = self._ceList('EDG.ce_black_list')
        return common.scheduler.listMatch(nj, block, whiteL, blackL)

    def _dashboardIds(self, jj, jid):
        """Return (jobId, broker, localId) identifying job ``jj`` to DashBoard.

        jj  -- BOSS id of the submitted job.
        jid -- scheduler job id returned by the submission.
        """
        schedName = common.scheduler.name()
        if schedName == 'CONDOR_G':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
            return jobId, 'OSG', ''
        if schedName == 'lsf' or schedName == 'caf':
            taskId = common.taskDB.dict('taskId').replace("_", "-")
            jobId = "https://" + schedName + ":/" + jid + "-" + taskId + "-" + str(jj)
            common.logger.debug(5, 'JobID for ML monitoring is created for LSF scheduler:' + jobId)
            return jobId, schedName, jid
        jobId = str(jj) + '_' + jid
        common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)
        # The broker is the host part of the grid job id ("scheme://host:...").
        broker = jid.split(':')[1].replace('//', '')
        return jobId, broker, ''

    def _reportSubmittedJob(self, jj, jid, subType):
        """Send the DashBoard report for submitted job ``jj`` (scheduler id ``jid``)."""
        jobId, rb, localId = self._dashboardIds(jj, jid)

        destinations = common.jobDB.destination(jj - 1)
        if len(destinations) <= 2:
            T_SE = ",".join(destinations)
        else:
            T_SE = str(len(destinations)) + '_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'SubmissionType': subType,
                  'TargetSE': T_SE,
                  'localId': localId}
        common.logger.debug(5, str(params))
        # Task-level entries from the apmon file override the per-job ones.
        params.update(self._readDashboardParams())
        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
        common.apmon.sendToML(params)

    def run(self):
        """Submit the jobs in ``self.nj_list``.

        Does nothing if no job of the task is in status 'C' or 'RC'.
        Otherwise: sends a task-level DashBoard report, declares the task to
        BOSS when ``self.UseServer == 9999``, list-matches each job, submits
        the matched jobs grouped by data block, marks them 'S' in the job DB
        and reports each one to DashBoard. An exception raised during the
        submission phase is printed/logged and the job DB is saved; it is not
        re-raised.
        """
        import sys

        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        created = [nj for nj in range(common.jobDB.nJobs())
                   if common.jobDB.status(nj) in ('C', 'RC')]
        if not created:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        params = {'jobId': 'TaskMeta'}
        params.update(self._readDashboardParams())
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        common.apmon.sendToML(params)

        # In server mode the BOSS declare step is performed here.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declare()  # Add for BOSS4
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        njs = 0
        try:
            # Consecutive matched jobs sharing a data block form one group,
            # [block, [bossId, ...]], the shape passed to scheduler.submit().
            groups = []
            lastBlock = object()  # sentinel: differs from any real block
            match = None
            for nj in self.nj_list:
                st = common.jobDB.status(nj)
                if st not in self._SUBMITTABLE_STATUSES:
                    msg = "Job # %d not submitted: status %s" % (nj + 1, crabJobStatusToString(st))
                    common.logger.message(msg)
                    continue

                currBlock = common.jobDB.block(nj)
                # List-match only when the block changes; jobs of the same
                # block reuse the previous result.
                same = (currBlock == lastBlock)
                if same:
                    common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
                else:
                    match = self._listMatch(nj, currBlock)
                    lastBlock = currBlock

                if not match:
                    common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                    continue
                if same:
                    common.logger.debug(1, "Found " + str(match) + " compatible site(s) for job " + str(nj + 1))
                else:
                    common.logger.message("Found " + str(match) + " compatible site(s) for job " + str(nj + 1))

                bossId = common.jobDB.bossId(nj)
                if groups and groups[-1][0] == currBlock:
                    groups[-1][1].append(bossId)
                else:
                    groups.append([currBlock, [bossId]])

            # Progress bar indicator, deactivated when debugging.
            showProgress = not common.logger.debugLevel()
            if showProgress:
                term = TerminalController()

            if self.UseServer == 0:
                subType = 'Direct'
            else:
                subType = 'Server'

            for ii, group in enumerate(groups):
                common.logger.debug(1, 'Submitting jobs ' + str(group[1]))
                pbar = None
                if showProgress:
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(group[1])) + ' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)

                if pbar:
                    pbar.update(float(ii + 1) / float(len(groups)), 'please wait')

                for jid, jj in zip(jidLista, bjidLista):
                    jj = int(jj)
                    # The job DB index of BOSS id jj is jj - 1.
                    tmpNj = jj - 1
                    common.logger.debug(5, "Submitted job # " + repr(jj))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                    njs += 1

                    self._reportSubmittedJob(jj, jid, subType)

        except:
            # Best effort: report the failure and keep whatever was submitted
            # so far in the job DB instead of propagating the exception.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        # Add a more verbose message in case submission is not complete.
        if njs < len(self.nj_list):
            msg = 'Submission performed using the Requirements: \n'
            msg += common.taskDB.dict("jobtype") + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
            for key, label in (('EDG.se_white_list', 'SE White List'),
                               ('EDG.se_black_list', 'SE Black List'),
                               ('EDG.ce_white_list', 'CE White List'),
                               ('EDG.ce_black_list', 'CE Black List')):
                if key in self.cfg_params:
                    msg += label + ': ' + self.cfg_params[key] + '\n'
            msg += '(Hint: please check if ' + common.taskDB.dict("jobtype") + ' is available at the Sites)\n'

            common.logger.message(msg)