ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.77
Committed: Mon May 28 15:32:23 2007 UTC (17 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_3_pre1, CRAB_2_0_0_pre3, CRAB_2_0_0_pre2, CRAB_2_0_0_pre1
Changes since 1.76: +19 -0 lines
Log Message:
A minor temporary workaround to support server_mode = 1: if server_mode = 1, move the BOSS declaration from the job-creation step to the submission step.

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 slacapra 1.60 #from random import random
7 corvo 1.30 import time
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """Submit the created jobs of a CRAB task through the configured scheduler.

    The jobs listed in ``nj_list`` (0-based indices into ``common.jobDB``)
    are checked for a submittable status, list-matched against compatible
    sites once per block (``common.jobDB.block``), grouped into runs of
    consecutive jobs sharing a block and submitted group by group through
    ``common.scheduler.submit``.  Every submitted job is reported to the
    DashBoard through ``cfg_params['apmon'].sendToML``.
    """

    # Status codes accepted for submission (same set the original check used).
    _SUBMITTABLE_STATES = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, nj_list):
        """Store the configuration and the jobs to submit.

        cfg_params -- dict-like CRAB configuration.
        nj_list    -- 0-based indices of the jobs to submit.
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        if common.scheduler.boss_scheduler_name == 'condor_g':
            # checksum of the cfg file, used to build Condor-G monitoring job ids
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        # CRAB.server_mode is optional; 0 when absent
        self.UseServer = 0
        try:
            self.UseServer = int(self.cfg_params['CRAB.server_mode'])
        except KeyError:
            pass
        return

    def _readApmonParams(self):
        """Return the 'key:value' lines of the ApMon parameter file as a dict.

        The file is ``<shareDir>/<cfg_params['apmon'].fName>``.  Each line is
        split on its FIRST colon only, so values that contain ':' are kept
        whole; lines without a colon are skipped instead of raising.
        Values are stripped of surrounding whitespace; keys are kept as-is.
        """
        fileName = common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName
        result = {}
        fl = open(fileName, 'r')
        try:
            for line in fl:
                if ':' not in line:
                    continue
                key, value = line.split(':', 1)
                result[key] = value.strip()
        finally:
            fl.close()
        return result

    def _selectJobs(self):
        """Check the requested jobs and group the submittable ones.

        Returns (groups, prevStatus):
          groups     -- list of [block, [bossId, ...]] for runs of consecutive
                        accepted jobs (in nj_list order) sharing the same block;
                        the last run is always flushed.
          prevStatus -- maps job index -> status before submission, so the
                        DashBoard report can flag resubmissions per job.
        """
        groups = []
        prevStatus = {}
        pendingIds = []
        pendingBlock = None
        lastBlock = -1
        match = None

        for nj in self.nj_list:
            st = common.jobDB.status(nj)
            if st not in self._SUBMITTABLE_STATES:
                long_st = crabJobStatusToString(st)
                common.logger.message("Job # %d not submitted: status %s" % (nj + 1, long_st))
                continue

            currBlock = common.jobDB.block(nj)
            same = (currBlock == lastBlock)
            if not same:
                # list-match only when the block changes; later jobs of the
                # same block reuse the previous result
                if common.scheduler.boss_scheduler_name != "condor_g":
                    match = common.scheduler.listMatch(nj, currBlock)
                else:
                    match = "1"
                lastBlock = currBlock
            else:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue

            foundMsg = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if same:
                common.logger.debug(1, foundMsg)
            else:
                common.logger.message(foundMsg)

            # close the current group when the block changes
            if pendingIds and currBlock != pendingBlock:
                groups.append([pendingBlock, pendingIds])
                pendingIds = []
            pendingBlock = currBlock
            pendingIds.append(common.jobDB.bossId(nj))
            prevStatus[nj] = st

        # flush the last group even if trailing jobs were skipped
        if pendingIds:
            groups.append([pendingBlock, pendingIds])
        return groups, prevStatus

    def _reportSubmittedJob(self, jj, jid, resFlag, apmonParams):
        """Send the per-job submission report to the DashBoard.

        jj          -- BOSS id of the submitted job (int).
        jid         -- scheduler job id string returned by the submission.
        resFlag     -- 2 for a job that was in status 'RC', else 0.
        apmonParams -- task-level parameters from the ApMon file; they are
                       applied last and override same-named job keys, as before.
        """
        # statistics monitoring is best effort
        try:
            Statistic.Monitor('submit', resFlag, jid, '-----', 'dest')
        except Exception:
            pass

        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        # a ':' in the id means the broker host can be extracted from it
        if jid.find(":") != -1:
            rb = jid.split(':')[1].replace('//', '')
        else:
            rb = 'OSG'

        # presumably BOSS id == job index + 1 (same assumption as the caller)
        tmpNj = jj - 1
        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'TargetSE': ",".join(common.jobDB.destination(tmpNj))}
        params.update(apmonParams)

        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))
        self.cfg_params['apmon'].sendToML(params)

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        totalCreatedJobs = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # submit pre DashBoard information; the ApMon file is read once here
        # and reused for every per-job report below
        apmonParams = self._readApmonParams()
        params = {'jobId': 'TaskMeta'}
        params.update(apmonParams)
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        self.cfg_params['apmon'].sendToML(params)

        # modified to support server mode: the BOSS declaration is done here
        # only when crab is used in server mode.
        # NOTE(review): the revision log of this workaround mentions
        # server_mode = 1, yet the guard compares against 9999, so this
        # branch is effectively disabled. Kept as-is — confirm intent.
        if self.UseServer == 9999:
            if not common.scheduler.taskDeclared(common.taskDB.dict('projectName')):
                common.logger.debug(5, 'Declaring jobs to BOSS')
                common.scheduler.declareJob_()  # Add for BOSS4
            else:
                common.logger.debug(5, 'Jobs already declared into BOSS')
            common.jobDB.save()
            common.taskDB.save()

        # Loop over jobs; njs is updated per job so the summary stays correct
        # even if submission aborts part-way
        njs = 0
        try:
            groups, prevStatus = self._selectJobs()

            ### Progress Bar indicator, deactivate for debug
            showBar = not common.logger.debugLevel()
            if showBar:
                term = TerminalController()
            nGroups = len(groups)

            for ii, group in enumerate(groups):
                bossIds = group[1]
                common.logger.debug(1, 'Submitting jobs ' + str(bossIds))
                pbar = None
                if showBar:
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(bossIds)) + ' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]

                if pbar:
                    pbar.update(float(ii + 1) / float(nGroups), 'please wait')

                # ids and BOSS ids are returned position by position
                for jj, jid in zip(bjidLista, jidLista):
                    tmpNj = jj - 1
                    common.logger.debug(5, "Submitted job # " + repr(jj))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId'])
                    njs += 1

                    # flag resubmission using THIS job's pre-submission status
                    resFlag = 0
                    if prevStatus.get(tmpNj) == 'RC':
                        resFlag = 2
                    self._reportSubmittedJob(jj, jid, resFlag, apmonParams)

        except:
            # Deliberately broad: whatever happens (even an interrupt), report
            # it and save the job DB so already-submitted jobs stay recorded.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        nRequested = len(self.nj_list)
        msg = '\nTotal of %d jobs submitted' % njs
        if njs != nRequested:
            msg += ' (from %d requested).' % nRequested
        else:
            msg += '.'
        common.logger.message(msg)

        ## add some more verbose message in case submission is not complete
        if njs < nRequested:
            jobtype = common.taskDB.dict("jobtype")
            msg = 'Submission performed using the Requirements: \n'
            msg += jobtype + ' version: ' + common.taskDB.dict("codeVersion") + '\n'
            for key, label in (('EDG.se_white_list', 'SE White List: '),
                               ('EDG.se_black_list', 'SE Black List: '),
                               ('EDG.ce_white_list', 'CE White List: '),
                               ('EDG.ce_black_list', 'CE Black List: ')):
                try:
                    msg += label + self.cfg_params[key] + '\n'
                except KeyError:
                    pass
            msg += '(Hint: please check if ' + jobtype + ' is available at the Sites)\n'
            common.logger.message(msg)

        return