ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.55
Committed: Fri Sep 29 13:20:52 2006 UTC (18 years, 7 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_4_0_pre1, CRAB_1_3_0
Changes since 1.54: +1 -0 lines
Log Message:
add some timing

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 corvo 1.27 from random import random
7 corvo 1.30 import time
8 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits jobs to the configured scheduler.

    For every job index in nj_list whose status allows submission, run()
    checks that at least one compatible site exists, submits the job through
    common.scheduler, records the new state in common.jobDB and reports the
    submission to Statistic and to the ApMon/ML interface stored in
    cfg_params['apmon'].
    """

    # Statuses for which run() will attempt a submission.
    _SUBMITTABLE = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, nj_list):
        """
        cfg_params -- configuration dictionary. 'USER.activate_monalisa' is
                      read here (0 when the key is absent); run() reads
                      'taskId' and 'apmon' and writes 'sid' and 'rb'.
        nj_list    -- indices of the jobs to submit, as used by common.jobDB
                      (messages show them as index + 1).
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list

        try:
            self.ML = int(cfg_params['USER.activate_monalisa'])
        except KeyError:
            self.ML = 0

        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Jobs whose status is not one of _SUBMITTABLE, or for which no
        compatible site is found, are skipped with a message.  Any exception
        raised while submitting is logged and stops the loop; the job
        database is saved in every case and a summary message is logged.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        # Nothing to do unless at least one job is in a created state.
        totalCreatedJobs = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                totalCreatedJobs += 1

        if totalCreatedJobs == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        #########
        # Loop over jobs
        njs = 0
        try:
            lastDest = ''
            match = None
            # Becomes true once 'match' has been computed at least once, so
            # that 'match' is never read before being assigned even when the
            # first destination happens to equal the initial lastDest.
            haveMatch = 0
            for nj in self.nj_list:
                same = 0
                # first check that status of the job is suitable for submission
                st = common.jobDB.status(nj)
                if st not in self._SUBMITTABLE:
                    long_st = crabJobStatusToString(st)
                    msg = "Job # %d not submitted: status %s" % (nj + 1, long_st)
                    common.logger.message(msg)
                    continue

                # SL perform listmatch only if destination has changed
                currDest = common.jobDB.destination(nj)
                if not haveMatch or currDest != lastDest:
                    if common.scheduler.boss_scheduler_name != "condor_g":
                        match = common.scheduler.listMatch(nj)
                    else:
                        match = "1"
                    lastDest = currDest
                    haveMatch = 1
                else:
                    common.logger.debug(1, "Sites for job " + str(nj + 1) + " the same as previous job")
                    same = 1

                if not match:
                    common.logger.message("No compatible site found, will not submit job " + str(nj + 1))
                    continue

                found = "Found " + str(match) + " compatible site(s) for job " + str(nj + 1)
                if not same:
                    common.logger.message(found)
                else:
                    common.logger.debug(1, found)

                common.logger.message("Submitting job # " + str(nj + 1))
                jid = common.scheduler.submit(nj)

                common.jobDB.setStatus(nj, 'S')
                common.jobDB.setJobId(nj, jid)
                common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
                njs += 1

                ############################################

                # Flag passed to Statistic: 2 for status 'RC', 0 otherwise.
                if st == 'RC':
                    resFlag = 2
                else:
                    resFlag = 0

                # Statistics are best-effort: a failure here must not stop
                # the submission, but it is no longer swallowed silently.
                try:
                    Statistic.Monitor('submit', resFlag, jid, '-----')
                except Exception:
                    exctype, value = sys.exc_info()[:2]
                    common.logger.debug(1, "Statistic.Monitor failed: %s %s" % (exctype, value))

                self._sendToML(nj, jid)
        except:
            # Deliberately broad: whatever interrupts the loop, report it and
            # persist the state of the jobs already submitted.
            exctype, value = sys.exc_info()[:2]
            sys.stdout.write("Type: %s Value: %s\n" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))
            common.jobDB.save()

        stop = time.time()
        common.logger.debug(1, "Submission Time: " + str(stop - start))
        common.logger.write("Submission Time: " + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % (len(self.nj_list))
        else:
            msg += '.'
        common.logger.message(msg)
        return

    def _readApmonParams(self):
        """
        Read the 'key:value' lines of the apmon file in the share directory.

        Returns a dictionary mapping the text before the first ':' to the
        stripped text between the first and second ':'.  Lines without a ':'
        (e.g. blank lines) are ignored.  The file is always closed.
        """
        fileName = common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName
        fileParams = {}
        fl = open(fileName, 'r')
        try:
            for line in fl.readlines():
                val = line.split(':')
                if len(val) < 2:
                    continue
                fileParams[val[0]] = val[1].strip()
        finally:
            fl.close()
        return fileParams

    def _sendToML(self, nj, jid):
        """
        Send the submission parameters of job nj (scheduler id jid) through
        cfg_params['apmon'].sendToML().

        Side effects: sets cfg_params['sid'] to jid and cfg_params['rb'] to
        the second ':'-separated field of jid with '//' removed, or to 'OSG'
        when jid contains no ':'.
        """
        # Read the file first so that a missing file fails before any
        # cfg_params entry is modified.
        fileParams = self._readApmonParams()
        self.cfg_params['sid'] = jid

        # nevtJob is no longer available from the jobDB, so it is not sent.

        # OLI: JobID treatment, special for Condor-G scheduler
        if common.scheduler.boss_scheduler_name == 'condor_g':
            # create hash of cfg file
            cfgHash = makeCksum(common.work_space.cfgFileName())
            jobId = str(nj + 1) + '_' + cfgHash + '_' + self.cfg_params['sid']
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        else:
            jobId = str(nj + 1) + '_' + self.cfg_params['sid']
            common.logger.debug(5, 'JobID for ML monitoring is created for EDG scheduler' + jobId)

        if jid.find(":") != -1:
            rb = jid.split(':')[1]
            self.cfg_params['rb'] = rb.replace('//', '')
        else:
            self.cfg_params['rb'] = 'OSG'

        params = {'jobId': jobId,
                  'sid': self.cfg_params['sid'],
                  'broker': self.cfg_params['rb'],
                  'bossId': common.jobDB.bossId(nj),
                  'TargetCE': ",".join(common.jobDB.destination(nj))}

        # Entries from the apmon file override the defaults above.
        params.update(fileParams)
        self.cfg_params['apmon'].sendToML(params)