ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.67
Committed: Wed Oct 18 07:37:08 2006 UTC (18 years, 6 months ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.66: +10 -5 lines
Log Message:
Fix for wrong Boss-Grid id association

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 slacapra 1.60 #from random import random
7 corvo 1.30 import time
8 slacapra 1.60 from ProgressBar import ProgressBar
9     from TerminalController import TerminalController
10 nsmirnov 1.1
class Submitter(Actor):
    """Submit the created CRAB jobs of a task through BOSS.

    Selected jobs are grouped by data block so that the scheduler
    list-match and the BOSS submission call are performed once per block.
    """

    def __init__(self, cfg_params, nj_list):
        """Store the configuration and the jobs to submit.

        cfg_params -- configuration dictionary; run() reads 'taskId' and
                      'apmon' from it and writes 'sid' and 'rb' into it.
        nj_list    -- job indices (as used by common.jobDB) to submit.
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        if common.scheduler.boss_scheduler_name == 'condor_g':
            # Condor-G monitoring job IDs embed a checksum of the cfg file.
            self.hash = makeCksum(common.work_space.cfgFileName())
        else:
            self.hash = ''

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        import sys

        common.logger.debug(5, "Submitter::run() called")

        start = time.time()
        if self._countCreatedJobs() == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        njs = 0
        try:
            groups, bossToJob, prevStatus = self._groupJobsByBlock()

            ### Progress Bar indicator, deactivate for debug
            if not common.logger.debugLevel():
                term = TerminalController()

            for group in groups:
                common.logger.message('Submitting jobs ' + str(group[1]))
                pbar = None
                if not common.logger.debugLevel():
                    try:
                        pbar = ProgressBar(term, 'Submitting ' + str(len(self.nj_list)) + ' jobs')
                    except Exception:
                        pbar = None

                jidLista, bjidLista = common.scheduler.submit(group)
                bjidLista = [int(b) for b in bjidLista]

                # Walk the returned ids by position so each BOSS id is paired
                # with its grid id and the progress fraction stays within [0, 1].
                for pos, bossId in enumerate(bjidLista):
                    # Map the BOSS id back to its job index; ids not built by
                    # _groupJobsByBlock fall back to the old bossId == nj+1 rule.
                    nj = bossToJob.get(bossId, bossId - 1)
                    jid = jidLista[pos]
                    common.logger.debug(1, "Submitted job # " + str(nj + 1))
                    common.jobDB.setStatus(nj, 'S')
                    common.jobDB.setJobId(nj, jid)
                    common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
                    njs += 1
                    if pbar:
                        pbar.update(float(pos + 1) / float(len(jidLista)), 'please wait')

                    self._reportSubmission(nj, jid, prevStatus.get(nj))

        except:
            # Deliberate catch-all boundary (including interrupts): report the
            # problem and save the job DB so jobs submitted so far are recorded.
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission time :" + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)
        return

    def _countCreatedJobs(self):
        """Return the number of jobs in the job DB with status 'C' or 'RC'."""
        total = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                total += 1
        return total

    def _groupJobsByBlock(self):
        """Select the submittable jobs of self.nj_list and group them by block.

        A job is selected when its status is C, K, A or RC and the scheduler
        finds at least one compatible site for its block (list-match is run
        only when the block differs from the previous job's block).

        Returns (groups, bossToJob, prevStatus):
          groups     -- list of [block, ids]; ids is a comma-terminated string
                        of BOSS ids (e.g. "3,4,"), since BOSS cannot take a
                        list. Consecutive selected jobs of the same block
                        share one group; every selected job is in a group.
          bossToJob  -- int(BOSS id) -> job index, for the selected jobs.
          prevStatus -- job index -> status before submission.
        """
        groups = []
        bossToJob = {}
        prevStatus = {}
        noBlock = object()  # sentinel: no block has been list-matched yet
        lastBlock = noBlock
        match = None

        for nj in self.nj_list:
            # first check that status of the job is suitable for submission
            st = common.jobDB.status(nj)
            if st not in ('C', 'K', 'A', 'RC'):
                long_st = crabJobStatusToString(st)
                msg = "Job # %d not submitted: status %s" % (nj + 1, long_st)
                common.logger.message(msg)
                continue

            currBlock = common.jobDB.block(nj)
            # SL perform listmatch only if block has changed
            same = (lastBlock is not noBlock) and (currBlock == lastBlock)
            if not same:
                if common.scheduler.boss_scheduler_name != "condor_g":
                    match = common.scheduler.listMatch(nj, currBlock)
                else:
                    match = "1"
                lastBlock = currBlock
            else:
                common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")

            if not match:
                common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                continue
            if not same:
                common.logger.message("Found " + str(match) + " compatible site(s) for job " + str(nj + 1))
            else:
                common.logger.debug(1, "Found " + str(match) + " compatible site(s) for job " + str(nj + 1))

            bossId = common.jobDB.bossId(nj)
            if groups and groups[-1][0] == currBlock:
                groups[-1][1] = groups[-1][1] + str(bossId) + ','
            else:
                groups.append([currBlock, str(bossId) + ','])
            prevStatus[nj] = st
            try:
                bossToJob[int(bossId)] = nj
            except (TypeError, ValueError):
                # Non-numeric BOSS id: run() falls back to the bossId-1 rule.
                pass

        return groups, bossToJob, prevStatus

    def _reportSubmission(self, nj, jid, prevStatus):
        """Report one submitted job to Statistic and to the ML dashboard.

        nj         -- job index of the submitted job.
        jid        -- scheduler job id returned by the submission.
        prevStatus -- job status before submission; 'RC' reports flag 2,
                      anything else flag 0.

        Side effects: sets self.cfg_params['sid'] and self.cfg_params['rb'].
        """
        if prevStatus == 'RC':
            resFlag = 2
        else:
            resFlag = 0

        try:
            Statistic.Monitor('submit', resFlag, jid, '-----')
        except Exception:
            # Statistics are best effort; they must not stop the submission.
            pass

        apmonParams = self._readApmonParams()
        self.cfg_params['sid'] = jid

        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            jobId = str(nj + 1) + '_' + self.hash + '_' + self.cfg_params['sid']
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(nj + 1) + '_' + self.cfg_params['sid']
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        # The broker is the host part of the grid job id; ids without ':'
        # are reported as 'OSG'.
        if jid.find(":") != -1:
            rb = jid.split(':')[1]
            self.cfg_params['rb'] = rb.replace('//', '')
        else:
            self.cfg_params['rb'] = 'OSG'

        params = {'jobId': jobId,
                  'sid': self.cfg_params['sid'],
                  'broker': self.cfg_params['rb'],
                  'bossId': common.jobDB.bossId(nj),
                  'TargetCE': ",".join(common.jobDB.destination(nj))}
        # Values from the apmon file override the computed ones.
        params.update(apmonParams)

        common.logger.debug(5, 'Submission DashBoard report: ' + str(params))

        self.cfg_params['apmon'].sendToML(params)

    def _readApmonParams(self):
        """Return the 'key: value' pairs of the apmon file in the share dir.

        Only the first ':' separates key and value, so values may contain
        ':'. Lines without ':' (e.g. blank lines) are skipped. The file is
        always closed.
        """
        params = {}
        fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
        try:
            for line in fl.readlines():
                if ':' not in line:
                    continue
                key, value = line.split(':', 1)
                params[key] = value.strip()
        finally:
            fl.close()
        return params