ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PostMortem.py
Revision: 1.12
Committed: Mon Nov 20 18:44:09 2006 UTC (18 years, 5 months ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.11: +163 -49 lines
Log Message:
List of jobs is now taken from Boss DB

File Contents

# User Rev Content
1 slacapra 1.1 from Actor import *
2 gutsche 1.6 from crab_util import *
3 slacapra 1.1 import common
4 corvo 1.12 from ApmonIf import ApmonIf
5     import Statistic
6     #from random import random
7     import time
8     from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 slacapra 1.1
class Submitter(Actor):
    """
    Actor that submits the jobs listed in nj_list through common.scheduler.

    For every submitted job the job DB (common.jobDB) is updated and a
    report is sent to the monitoring service through cfg_params['apmon'].
    """

    # Job statuses (as returned by common.jobDB.status) from which a job
    # is accepted for submission; any other status is reported and skipped.
    _SUBMITTABLE_STATUSES = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, nj_list):
        """
        cfg_params -- configuration dictionary; run() reads its 'apmon'
                      and 'taskId' entries.
        nj_list    -- job numbers (0-based indices into common.jobDB)
                      requested for submission.
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        if common.scheduler.boss_scheduler_name == 'condor_g':
            # create hash of cfg file
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Jobs are grouped by block, each group is handed to
        common.scheduler.submit(), and every job id returned is recorded
        in the job DB and reported to the monitoring service. Any error
        raised while submitting is logged, the job DB is saved, and the
        summary is still printed.
        """
        import sys  # local import: the handler below must not depend on a star import

        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        if not self._hasCreatedJobs():
            common.logger.message("No jobs to be submitted: first create them")
            return

        # The monitoring file is parsed once and reused for every report.
        taskParams = self._readMonitoringParams()

        # submit pre DashBoard information
        params = {'jobId': 'TaskMeta'}
        params.update(taskParams)
        common.logger.debug(5, 'Submission DashBoard Pre-Submission report: ' + str(params))
        self.cfg_params['apmon'].sendToML(params)

        #########
        # Loop over jobs
        njs = 0
        try:
            for group in self._groupJobsByBlock():
                common.logger.debug(1, 'Submitting jobs ' + str(group[1]))

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]  # cast all bjidLista to int

                # Loop over the ids returned from the group submission,
                # pairing each scheduler id with its boss id by position.
                for jid, jj in zip(jidLista, bjidLista):
                    common.logger.debug(5, "Submitted job # " + str(jj))
                    prevStatus = self._markSubmitted(jj - 1, jid)
                    njs += 1
                    self._reportSubmission(jj, jid, prevStatus, taskParams)
        except:
            # Deliberately broad: whatever interrupts the submission, the
            # jobs already marked as submitted must be saved to the job DB.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)
        return

    def _hasCreatedJobs(self):
        """Return True if at least one job in the job DB has status 'C' or 'RC'."""
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                return True
        return False

    def _readMonitoringParams(self):
        """
        Parse the 'key:value' lines of the apmon file in the share directory.

        Returns a dictionary mapping the text before the first ':' to the
        stripped text of the second ':'-separated field. Lines without a
        ':' (e.g. blank lines) are skipped instead of raising IndexError.
        """
        fName = common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName
        params = {}
        fl = open(fName, 'r')
        try:
            for line in fl.readlines():
                val = line.split(':')
                if len(val) < 2:
                    continue
                params[val[0]] = val[1].strip()
        finally:
            # close the file even if parsing fails
            fl.close()
        return params

    def _groupJobsByBlock(self):
        """
        Select the jobs of self.nj_list that can be submitted and group them.

        Returns a list of [block, [bossId, ...]] entries, one per run of
        consecutive accepted jobs sharing the same block. A job is skipped
        (with a message) when its status is not submittable or when no
        compatible site is found for its block.
        """
        groups = []
        lastBlock = object()  # sentinel: never equal to a real block value
        match = None
        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in self._SUBMITTABLE_STATUSES:
                long_st = crabJobStatusToString(st)
                msg = "Job # %d not submitted: status %s" % (nj + 1, long_st)
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            same = (currBlock == lastBlock)
            # SL perform listmatch only if block has changed
            if same:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
            else:
                if common.scheduler.boss_scheduler_name != "condor_g":
                    match = common.scheduler.listMatch(nj, currBlock)
                else:
                    match = "1"
                lastBlock = currBlock

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue

            found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if same:
                common.logger.debug(1, found)
            else:
                common.logger.message(found)

            # Open a new group whenever the block differs from the one of
            # the previously accepted job. Grouping on accepted jobs (not
            # on a look-ahead into the job DB) means a skipped job can
            # never cause an already filled group to be dropped.
            if not groups or groups[-1][0] != currBlock:
                groups.append([currBlock, []])
            groups[-1][1].append(common.jobDB.bossId(nj))
        return groups

    def _markSubmitted(self, nj, jid):
        """
        Record in the job DB that job nj was submitted with scheduler id jid.

        Returns the status the job had before it was set to 'S'.
        """
        prevStatus = common.jobDB.status(nj)
        common.jobDB.setStatus(nj, 'S')
        common.jobDB.setJobId(nj, jid)
        common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
        return prevStatus

    def _reportSubmission(self, jj, jid, prevStatus, taskParams):
        """
        Send the statistic and DashBoard reports for one submitted job.

        jj         -- boss id of the job (job number + 1).
        jid        -- scheduler id returned by the submission.
        prevStatus -- status of the job before it was submitted.
        taskParams -- parameters read from the apmon file; they are merged
                      over the per-job parameters.
        """
        import sys  # local import: used only to describe a failed report

        tmpNj = jj - 1

        ##### DashBoard report #####################
        try:
            resFlag = 0
            if prevStatus == 'RC':
                resFlag = 2
            Statistic.Monitor('submit', resFlag, jid, '-----', 'dest')
        except Exception:
            # Best effort: a failing statistic report must not stop the
            # submission, but leave a trace of it in the debug log.
            common.logger.debug(5, "Statistic.Monitor failed: %s" % (sys.exc_info()[1],))

        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(jj) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(jj) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        # The broker is the second ':'-separated field of the scheduler id
        # with any '//' removed; ids without ':' are reported as 'OSG'.
        if jid.find(":") != -1:
            rb = jid.split(':')[1]
            rb = rb.replace('//', '')
        else:
            rb = 'OSG'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': jj,
                  'TargetSE': ",".join(common.jobDB.destination(tmpNj))}
        params.update(taskParams)

        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))

        self.cfg_params['apmon'].sendToML(params)