ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.69
Committed: Wed Oct 18 17:54:16 2006 UTC (18 years, 6 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_4_0
Changes since 1.68: +37 -49 lines
Log Message:
fix submission

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 slacapra 1.60 #from random import random
7 corvo 1.30 import time
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits the requested, already created jobs through
    common.scheduler and records the outcome in common.jobDB.

    cfg_params -- configuration dictionary; this class reads the keys
                  'taskId' and 'apmon' from it
    nj_list    -- list of job indices (as used by common.jobDB) to submit
    """

    # Job statuses from which a job may be submitted.
    # NOTE(review): presumably Created / Killed / Aborted / Re-Created --
    # confirm against crabJobStatusToString.
    _SUBMITTABLE_STATUSES = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, nj_list):
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        # Only for the condor_g scheduler a checksum of the cfg file is
        # embedded in the monitoring job id (see _reportSubmission).
        if common.scheduler.boss_scheduler_name == 'condor_g':
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Jobs are grouped by block, each group is handed to
        common.scheduler.submit(), and every job id returned is stored in
        common.jobDB and reported to the dashboard.  Any failure stops the
        submission loop but the job DB is still saved and a summary of the
        number of submitted jobs is logged.
        """
        import sys  # local import: the file only gets `sys` via a star import

        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        if not self._hasCreatedJobs():
            common.logger.message("No jobs to be submitted: first create them")
            return

        njs = 0  # number of jobs registered as submitted so far
        try:
            groups, statusBefore = self._selectJobGroups()

            ### Progress Bar indicator, deactivate for debug
            showProgress = not common.logger.debugLevel()
            term = None
            if showProgress:
                term = TerminalController()

            nGroups = len(groups)
            for ii, group in enumerate(groups):
                bossIds = group[1]
                common.logger.debug(1, 'Submitting jobs ' + str(bossIds))

                pbar = None
                if showProgress:
                    # The progress bar is cosmetic: carry on without it if
                    # it cannot be created.
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(bossIds)) + ' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]  # cast all boss ids to int

                if pbar:
                    pbar.update(float(ii + 1) / float(nGroups), 'please wait')

                # The two returned lists are parallel: pair them directly
                # instead of searching the index of every boss id.
                for bossId, jid in zip(bjidLista, jidLista):
                    # boss ids are used as 1-based job numbers, the job DB
                    # is indexed from 0
                    tmpNj = bossId - 1
                    common.logger.debug(5, "Submitted job # " + repr(tmpNj + 1))
                    common.jobDB.setStatus(tmpNj, 'S')
                    common.jobDB.setJobId(tmpNj, jid)
                    common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId'])
                    njs += 1

                    self._reportSubmission(tmpNj, jid, statusBefore.get(tmpNj))

        except:
            # Deliberately catch everything (including user interruption):
            # whatever stops the loop, the job DB must be saved so that the
            # jobs already submitted are not lost.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)
        return

    def _hasCreatedJobs(self):
        """Return True if at least one job in the job DB has status 'C' or 'RC'."""
        # NOTE(review): 'K' and 'A' jobs are accepted for submission by
        # _selectJobGroups but are not counted here -- confirm this is intended.
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                return True
        return False

    def _selectJobGroups(self):
        """
        Pick the submittable jobs of self.nj_list and group them by block.

        Returns a tuple (groups, statusBefore):
        groups       -- list of [block, [bossId, ...]] entries, one entry per
                        run of accepted jobs sharing the same block, in the
                        order of self.nj_list
        statusBefore -- dictionary mapping the job index of every accepted
                        job to the status it had before submission
        """
        groups = []
        statusBefore = {}
        lastBlock = -1
        match = None

        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in self._SUBMITTABLE_STATUSES:
                long_st = crabJobStatusToString(st)
                msg = "Job # %d not submitted: status %s" % (nj + 1, long_st)
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            sameBlock = (currBlock == lastBlock)
            # SL perform listmatch only if block has changed
            if sameBlock:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
            else:
                if common.scheduler.boss_scheduler_name != "condor_g":
                    match = common.scheduler.listMatch(nj, currBlock)
                else:
                    match = "1"
                lastBlock = currBlock

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue

            found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
            if sameBlock:
                common.logger.debug(1, found)
            else:
                common.logger.message(found)

            # Extend the open group while the block stays the same, start a
            # new one otherwise.  Deciding on the job actually accepted
            # (rather than peeking at job nj+1 in the DB) keeps every group
            # to a single block and never leaves the last group unsubmitted
            # when the final requested job is skipped.
            bossId = common.jobDB.bossId(nj)
            if groups and groups[-1][0] == currBlock:
                groups[-1][1].append(bossId)
            else:
                groups.append([currBlock, [bossId]])
            statusBefore[nj] = st

        return groups, statusBefore

    def _reportSubmission(self, nj, jid, prevStatus):
        """
        Report the submission of one job to Statistic and to the dashboard.

        nj         -- job index in common.jobDB
        jid        -- scheduler job id returned by the submission
        prevStatus -- status the job had before submission (None if unknown);
                      'RC' is reported with a different flag
        """
        import sys  # local import: the file only gets `sys` via a star import

        ##### DashBoard report #####################
        try:
            resFlag = 0
            if prevStatus == 'RC':
                resFlag = 2
            Statistic.Monitor('submit', resFlag, jid, '-----')
        except Exception:
            # Best effort: a failing statistic report must not stop the
            # submission bookkeeping, but leave a trace for debugging.
            common.logger.debug(1, "Statistic report failed for job " + str(nj + 1)
                                + ": " + str(sys.exc_info()[1]))

        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(nj + 1) + '_' + self.hash + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(nj + 1) + '_' + jid
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        # The broker is the second ':'-separated field of the job id with
        # '//' removed; ids without ':' are reported as 'OSG'.
        if jid.find(":") != -1:
            rb = jid.split(':')[1]
            rb = rb.replace('//', '')
        else:
            rb = 'OSG'

        params = {'jobId': jobId,
                  'sid': jid,
                  'broker': rb,
                  'bossId': common.jobDB.bossId(nj),
                  'TargetCE': ",".join(common.jobDB.destination(nj))}

        # Merge the "key:value" lines of the apmon parameter file.
        fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
        try:
            for line in fl.readlines():
                val = line.split(':')
                if len(val) < 2:
                    # blank or malformed line: nothing to merge
                    continue
                params[val[0]] = val[1].strip()
        finally:
            # close the file even if parsing fails
            fl.close()

        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))

        self.cfg_params['apmon'].sendToML(params)
        return