ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.83
Committed: Fri Jan 4 17:30:56 2008 UTC (17 years, 4 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre2, CRAB_2_1_0_pre1
Changes since 1.82: +5 -7 lines
Log Message:
Add support for LSF/CAF direct submission
Re-establish a correct inheritance pattern for the Scheduler* classes
Start to remove some unneeded try:/except: statements and replace them with appropriate if:/else: checks
Erase any use of cfg_params as a common block (especially in conjunction with apmon) and replace it with the use of the task DB
Several minor cleanups

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 slacapra 1.60 #from random import random
6 corvo 1.30 import time
7 slacapra 1.60 from ProgressBar import ProgressBar
8     from TerminalController import TerminalController
9 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits the jobs listed in nj_list through common.scheduler,
    records the outcome in common.jobDB and reports every submission to the
    dashboard through common.apmon.
    """

    def __init__(self, cfg_params, nj_list):
        """
        cfg_params -- configuration mapping; this class reads the keys
                      'CRAB.server_mode', 'CRAB.scheduler' and the
                      'EDG.*_white_list' / 'EDG.*_black_list' entries.
        nj_list    -- job indices (as understood by common.jobDB) to submit.
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        # The checksum of the cfg file is only used to build the monitoring
        # job id when the scheduler is condor_g (see _dashboardParams).
        if common.scheduler.boss_scheduler_name == 'condor_g':
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        # 0 unless 'CRAB.server_mode' is configured.
        self.UseServer = 0
        try:
            self.UseServer = int(self.cfg_params['CRAB.server_mode'])
        except KeyError:
            pass

        return

    def _readApmonParams(self):
        """
        Return the 'key:value' lines of the apmon file in the share
        directory as a dictionary.

        Only the first ':' separates key and value, so values that contain
        a ':' themselves are kept whole; lines without any ':' are skipped.
        The file is closed even if reading fails.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'r')
        try:
            for line in fl.readlines():
                fields = line.split(':', 1)
                if len(fields) < 2:
                    continue
                params[fields[0]] = fields[1].strip()
        finally:
            fl.close()
        return params

    def _ceList(self, key):
        """
        Return the stripped, non-empty entries of the comma separated
        configuration value stored under key ([] if unset or None).
        """
        try:
            raw = self.cfg_params[key]
        except KeyError:
            return []
        if raw is None:
            return []
        return [ce.strip() for ce in raw.split(',') if ce.strip()]

    def _ceWhiteBlackLists(self):
        """
        Return (white list, black list) of CEs to pass to the list-match.
        Both are empty unless the configured scheduler name contains 'glite'.
        """
        if self.cfg_params['CRAB.scheduler'].find("glite") == -1:
            return [], []
        return (self._ceList('EDG.ce_white_list'),
                self._ceList('EDG.ce_black_list'))

    def _groupJobsByBlock(self):
        """
        Select the submittable jobs of self.nj_list and return them as a
        list of [block, [bossId, ...]] groups, one group per run of
        consecutive selected jobs sharing the same block.

        A job is selected when its status is C, K, A or RC and the
        list-match for its block found at least one site.
        """
        groups = []
        pending = []            # boss ids of the group being filled
        pendingBlock = None     # block the pending group belongs to
        lastBlock = object()    # sentinel: never equal to a real block
        match = None

        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in ('C', 'K', 'A', 'RC'):
                long_st = crabJobStatusToString(st)
                msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            # perform the list-match only if the block has changed
            if currBlock != lastBlock:
                same = 0
                if common.scheduler.boss_scheduler_name != "condor_g":
                    whiteL, blackL = self._ceWhiteBlackLists()
                    match = common.scheduler.listMatch(nj, currBlock, whiteL, blackL)
                else:
                    match = "1"
                lastBlock = currBlock
            else:
                common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")
                same = 1

            if not match:
                common.logger.message("No compatible site found, will not submit job "+str(nj+1))
                continue

            if not same:
                common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1))
            else:
                common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1))

            # Close the pending group as soon as a job of another block is
            # selected.  Deciding this from the selected jobs themselves
            # (rather than from job nj+1) keeps groups correct when jobs are
            # skipped or nj_list is not contiguous.
            if pending and currBlock != pendingBlock:
                groups.append([pendingBlock, pending])
                pending = []
            pendingBlock = currBlock
            pending.append(common.jobDB.bossId(nj))

        # Never lose the last group, even if the last jobs were skipped.
        if pending:
            groups.append([pendingBlock, pending])
        return groups

    def _dashboardParams(self, jj, jid, apmonParams):
        """
        Build the dashboard report for one submitted job.

        jj          -- boss id of the job (int); jj - 1 is used as jobDB index.
        jid         -- scheduler id returned by the submission (string).
        apmonParams -- entries read from the apmon file; they are merged last
                       and therefore win over the per-job entries.
        """
        tmpNj = jj - 1

        ## To distinguish if job is direct or through the server
        if self.UseServer == 0:
            Sub_Type = 'Direct'
        else:
            Sub_Type = 'Server'

        # JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)

        # The broker is taken from the second ':' separated field of the
        # scheduler id, when there is one.
        if jid.find(":") != -1:
            rb = jid.split(':')[1].replace('//', '')
        else:
            rb = 'OSG'

        destination = common.jobDB.destination(tmpNj)
        if len(destination) <= 2:
            T_SE = ",".join(destination)
        else:
            T_SE = str(len(destination))+'_Selected_SE'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'SubmissionType': Sub_Type,
                  'TargetSE': T_SE,}
        common.logger.debug(5,str(params))

        params.update(apmonParams)
        common.logger.debug(5,'Submission DashBoard report: '+str(params))
        return params

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        totalCreatedJobs = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information
        params = {'jobId':'TaskMeta'}
        params.update(self._readApmonParams())
        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
        common.apmon.sendToML(params)

        # The boss declare step is performed here only for this value of
        # UseServer.
        # NOTE(review): 9999 looks like a placeholder that keeps this branch
        # switched off for ordinary server_mode values -- confirm.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5,'Declaring jobs to BOSS')
                common.scheduler.declareJob_()
            else:
                common.logger.debug(5,'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        njs = 0
        try:
            groups = self._groupJobsByBlock()

            ### Progress Bar indicator, deactivate for debug
            showProgress = not common.logger.debugLevel()
            if showProgress:
                term = TerminalController()

            nGroups = len(groups)
            for ii, group in enumerate(groups):
                bossIds = group[1]
                common.logger.debug(1,'Submitting jobs '+str(bossIds))
                pbar = None
                if showProgress:
                    # The progress bar is optional: go on without it if it
                    # cannot be created.
                    try:
                        pbar = ProgressBar(term, 'Submitting '+str(len(bossIds))+' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(bjid) for bjid in bjidLista]

                if pbar:
                    pbar.update(float(ii+1)/float(nGroups),'please wait')

                # Read the apmon file once per submitted group instead of
                # once per job.
                apmonParams = self._readApmonParams()

                # Scheduler ids and boss ids are returned in parallel lists.
                for jid, jj in zip(jidLista, bjidLista):
                    tmpNj = jj - 1
                    common.logger.debug(5,"Submitted job # "+ repr(jj))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, common.taskDB.dict('taskId'))
                    njs += 1

                    ##### DashBoard report #####################
                    common.apmon.sendToML(self._dashboardParams(jj, jid, apmonParams))

        except:
            # Deliberately broad: whatever interrupts the submission, report
            # it and save the job DB so that the jobs already submitted are
            # not lost, then carry on with the summary below.
            exctype, value = sys.exc_info()[:2]
            sys.stdout.write("Type: %s Value: %s\n"%(exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: "+str(stop - start))
        common.logger.write("Submission time :"+str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted'%njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).'%(len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)

        ## add some more verbose message in case submission is not complete
        if njs < len(self.nj_list):
            msg = 'Submission performed using the Requirements: \n'
            msg += common.taskDB.dict("jobtype")+' version: '+common.taskDB.dict("codeVersion")+'\n'
            # Only the lists that are actually configured are reported.
            for label, key in (('SE White List: ', 'EDG.se_white_list'),
                               ('SE Black List: ', 'EDG.se_black_list'),
                               ('CE White List: ', 'EDG.ce_white_list'),
                               ('CE Black List: ', 'EDG.ce_black_list')):
                try:
                    msg += label+self.cfg_params[key]+'\n'
                except KeyError:
                    pass
            msg += '(Hint: please check if '+common.taskDB.dict("jobtype")+' is available at the Sites)\n'

            common.logger.message(msg)

        return