ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.59
Committed: Mon Oct 9 23:26:42 2006 UTC (18 years, 6 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
Changes since 1.58: +1 -1 lines
Log Message:
moved individual output for submission of job to debug level 1. As the output is not printed when the job is submitted but rather when the report to the DashBoard is prepared and sent, does not make sense to have it as a message on the screen (also gets longish for >1000 jobs)

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 corvo 1.27 from random import random
7 corvo 1.30 import time
8 nsmirnov 1.1
9     class Submitter(Actor):
10 corvo 1.40 def __init__(self, cfg_params, nj_list):
11 nsmirnov 1.1 self.cfg_params = cfg_params
12 nsmirnov 1.4 self.nj_list = nj_list
13 gutsche 1.58
14     if common.scheduler.boss_scheduler_name == 'condor_g':
15     # create hash of cfg file
16     self.hash = makeCksum(common.work_space.cfgFileName())
17     else:
18     self.hash = ''
19 corvo 1.30
20 nsmirnov 1.1 return
21    
22     def run(self):
23 nsmirnov 1.2 """
24 slacapra 1.53 The main method of the class: submit jobs in range self.nj_list
25 nsmirnov 1.2 """
26     common.logger.debug(5, "Submitter::run() called")
27 slacapra 1.24
28 gutsche 1.42 totalCreatedJobs= 0
29 corvo 1.33 start = time.time()
30 slacapra 1.24 for nj in range(common.jobDB.nJobs()):
31 spiga 1.29 if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
32 slacapra 1.24 pass
33    
34     if (totalCreatedJobs==0):
35     common.logger.message("No jobs to be submitted: first create them")
36 slacapra 1.28 return
37 fanzago 1.16 #########
38 nsmirnov 1.2 # Loop over jobs
39 nsmirnov 1.6 njs = 0
40 corvo 1.15 try:
41 spiga 1.56 first = []
42     last = []
43 slacapra 1.57 lastBlock=-1
44 corvo 1.17 for nj in self.nj_list:
45 slacapra 1.53 same=0
46 slacapra 1.49 # first check that status of the job is suitable for submission
47 corvo 1.17 st = common.jobDB.status(nj)
48 spiga 1.56 if st != 'C' :# and st != 'K' and st != 'A' and st != 'RC': ## commentato per ora...quindi NON risotomette
49 corvo 1.17 long_st = crabJobStatusToString(st)
50 slacapra 1.19 msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
51 slacapra 1.18 common.logger.message(msg)
52 corvo 1.17 continue
53 slacapra 1.57 currBlock = common.jobDB.block(nj)
54     # SL perform listmatch only if block has changed
55     if (currBlock!=lastBlock):
56 gutsche 1.54 if common.scheduler.boss_scheduler_name != "condor_g" :
57 slacapra 1.57 match = common.scheduler.listMatch(nj, currBlock)
58 gutsche 1.54 else :
59     match = "1"
60 slacapra 1.57 lastBlock = currBlock
61 mkirn 1.50 else:
62 slacapra 1.53 common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")
63     same=1
64 slacapra 1.49 if match:
65 slacapra 1.53 if not same:
66 spiga 1.56 ###################### here create index list start:end
67     first.append(nj)
68     if nj != 0:last.append(nj-1)
69     ######################
70 slacapra 1.53 common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1))
71     else:
72     common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1))
73 slacapra 1.49 else:
74 mkirn 1.50 common.logger.message("No compatible site found, will not submit job "+str(nj+1))
75 slacapra 1.49 continue
76 slacapra 1.57
77 spiga 1.56 ############## TODO improve the follow control .....
78     if len(first)>len(last):
79     if common.jobDB.nJobs() == 1 :
80     last.append(0) # List of last index
81     else:
82     last.append(self.nj_list[len(self.nj_list)-1]) #List of last index
83     else:
84     if first[len(first)-1] > last[len(last)-1]:
85     last.remove(last[len(last)-1])
86     last.append(self.nj_list[len(self.nj_list)-1])
87    
88     for ii in range(len(first)): # Add loop DS
89 gutsche 1.58 common.logger.message('Submitting jobs '+str(first[ii]+1)+' to '+str(last[ii]+1))
90 spiga 1.56 #common.logger.message("Submitting job # "+`(nj+1)`)
91     jidLista = common.scheduler.submit(first[ii],last[ii],ii)
92    
93     ####
94     for jj in range(len(jidLista)): # Add loop over SID returned from group submission DS
95     nj= int(jj+int(first[ii]))
96     jid=jidLista[jj]
97 gutsche 1.59 common.logger.debug(1,"Submitted job # "+`(nj+1)`)
98 spiga 1.56
99     common.jobDB.setStatus(nj, 'S')
100     common.jobDB.setJobId(nj, jid)
101     common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
102     njs += 1
103     ############################################
104    
105     if st == 'C':
106     resFlag = 0
107     elif st == 'RC':
108     resFlag = 2
109     else:
110     resFlag = 0
111     pass
112    
113     try:
114     Statistic.Monitor('submit',resFlag,jid,'-----')
115     except:
116     pass
117    
118     fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
119     self.cfg_params['sid'] = jid
120     #### FF: per il momento commentiamo nevtJob che non c'e' piu' nel jobdb
121     #nevtJob = common.jobDB.maxEvents(nj)
122    
123     # OLI: JobID treatment, special for Condor-G scheduler
124     jobId = ''
125     if common.scheduler.boss_scheduler_name == 'condor_g':
126 gutsche 1.58 jobId = str(nj + 1) + '_' + self.hash + '_' + self.cfg_params['sid']
127 spiga 1.56 common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
128     else:
129     jobId = str(nj + 1) + '_' + self.cfg_params['sid']
130     common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)
131    
132     if ( jid.find(":") != -1 ) :
133     rb = jid.split(':')[1]
134     self.cfg_params['rb'] = rb.replace('//', '')
135     else :
136     self.cfg_params['rb'] = 'OSG'
137    
138     #### FF: per il momento commentiamo nevtJob che non c'e' piu' nel jobdb
139     #params = {'nevtJob': nevtJob, 'jobId': jobId, 'sid': self.cfg_params['sid'], \
140     # 'broker': self.cfg_params['rb'], 'bossId': common.jobDB.bossId(nj)}
141     params = {'jobId': jobId, \
142     'sid': self.cfg_params['sid'], \
143     'broker': self.cfg_params['rb'], \
144     'bossId': common.jobDB.bossId(nj), \
145     'TargetCE': string.join((common.jobDB.destination(nj)),",")}
146    
147     for i in fl.readlines():
148     val = i.split(':')
149     params[val[0]] = string.strip(val[1])
150     self.cfg_params['apmon'].sendToML(params)
151 corvo 1.17
152     except:
153 corvo 1.27 exctype, value = sys.exc_info()[:2]
154 corvo 1.30 print "Type: %s Value: %s"%(exctype, value)
155 corvo 1.27 common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
156 corvo 1.17 common.jobDB.save()
157 corvo 1.30
158     stop = time.time()
159 gutsche 1.47 common.logger.debug(1, "Submission Time: "+str(stop - start))
160 fanzago 1.32 #print "Submission Time: %d "%(stop - start)
161 corvo 1.17 common.jobDB.save()
162 corvo 1.15
163 nsmirnov 1.6 msg = '\nTotal of %d jobs submitted'%njs
164     if njs != len(self.nj_list) :
165     msg += ' (from %d requested).'%(len(self.nj_list))
166     pass
167     else:
168     msg += '.'
169     pass
170 nsmirnov 1.2 common.logger.message(msg)
171 nsmirnov 1.1 return