ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.47
Committed: Sun May 28 02:27:17 2006 UTC (18 years, 11 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_2_1, CRAB_1_2_0, CRAB_1_2_0_pre9, CRAB_1_2_0_pre8, CRAB_1_2_0_pre7, post_cmssw_integration_20060527
Branch point for: CRAB_BOSS4
Changes since 1.46: +7 -3 lines
Log Message:
Integrate CMSSW: remove number of events per job from ML reporting

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 corvo 1.27 from random import random
7 corvo 1.30 import time
8 nsmirnov 1.1
9     class Submitter(Actor):
10 corvo 1.40 def __init__(self, cfg_params, nj_list):
11 nsmirnov 1.1 self.cfg_params = cfg_params
12 nsmirnov 1.4 self.nj_list = nj_list
13 corvo 1.35
14 slacapra 1.25 try:
15     self.ML = int(cfg_params['USER.activate_monalisa'])
16     except KeyError:
17     self.ML = 0
18     pass
19 corvo 1.30
20 nsmirnov 1.1 return
21    
22     def run(self):
23 nsmirnov 1.2 """
24     The main method of the class.
25     """
26     common.logger.debug(5, "Submitter::run() called")
27 slacapra 1.24
28 gutsche 1.42 totalCreatedJobs= 0
29 corvo 1.37 listCE = ""
30 corvo 1.33 start = time.time()
31 slacapra 1.24 for nj in range(common.jobDB.nJobs()):
32 spiga 1.29 if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
33 slacapra 1.24 pass
34    
35     if (totalCreatedJobs==0):
36     common.logger.message("No jobs to be submitted: first create them")
37 slacapra 1.28 return
38 slacapra 1.24
39 slacapra 1.7 firstJob=self.nj_list[0]
40     match = common.scheduler.listMatch(firstJob)
41 slacapra 1.8 if match:
42 slacapra 1.43 common.logger.message("Found "+str(match)+" compatible site(s)")
43 slacapra 1.8 else:
44 slacapra 1.28 raise CrabException("No compatible site found!")
45 fanzago 1.16 #########
46 nsmirnov 1.2 # Loop over jobs
47 nsmirnov 1.6 njs = 0
48 corvo 1.15 try:
49 corvo 1.17 for nj in self.nj_list:
50     st = common.jobDB.status(nj)
51 corvo 1.21 # print "nj = ", nj
52     # print "st = ", st
53 corvo 1.17 if st != 'C' and st != 'K' and st != 'A' and st != 'RC':
54     long_st = crabJobStatusToString(st)
55 slacapra 1.19 msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
56 slacapra 1.18 common.logger.message(msg)
57 corvo 1.17 continue
58    
59     common.logger.message("Submitting job # "+`(nj+1)`)
60     jid = common.scheduler.submit(nj)
61    
62     common.jobDB.setStatus(nj, 'S')
63     common.jobDB.setJobId(nj, jid)
64 corvo 1.33 common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
65 corvo 1.17 njs += 1
66    
67     ############################################
68    
69     if st == 'C':
70     resFlag = 0
71     elif st == 'RC':
72     resFlag = 2
73     else:
74     resFlag = 0
75     pass
76 spiga 1.26
77 spiga 1.38 try:
78     Statistic.Monitor('submit',resFlag,jid,'-----')
79     except:
80     pass
81 corvo 1.17
82 corvo 1.44 fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
83     self.cfg_params['sid'] = jid
84 gutsche 1.47 #### FF: per il momento commentiamo nevtJob che non c'e' piu' nel jobdb
85     #nevtJob = common.jobDB.maxEvents(nj)
86 gutsche 1.46
87     # OLI: JobID treatment, special for Condor-G scheduler
88     jobId = ''
89     if common.scheduler.boss_scheduler_name == 'condor_g':
90     # create hash of cfg file
91     hash = makeCksum(common.work_space.cfgFileName())
92     jobId = str(nj + 1) + '_' + hash + '_' + self.cfg_params['sid']
93     common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
94     else:
95     jobId = str(nj + 1) + '_' + self.cfg_params['sid']
96     common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)
97    
98 corvo 1.44 if ( jid.find(":") != -1 ) :
99     rb = jid.split(':')[1]
100     self.cfg_params['rb'] = rb.replace('//', '')
101     else :
102     self.cfg_params['rb'] = 'OSG'
103 gutsche 1.46
104 gutsche 1.47 #### FF: per il momento commentiamo nevtJob che non c'e' piu' nel jobdb
105     #params = {'nevtJob': nevtJob, 'jobId': jobId, 'sid': self.cfg_params['sid'], \
106     # 'broker': self.cfg_params['rb'], 'bossId': common.jobDB.bossId(nj)}
107     params = {'jobId': jobId, 'sid': self.cfg_params['sid'], \
108 corvo 1.44 'broker': self.cfg_params['rb'], 'bossId': common.jobDB.bossId(nj)}
109     for i in fl.readlines():
110     val = i.split(':')
111     params[val[0]] = string.strip(val[1])
112 corvo 1.45 self.cfg_params['apmon'].sendToML(params)
113 corvo 1.17 except:
114 corvo 1.27 exctype, value = sys.exc_info()[:2]
115 corvo 1.30 print "Type: %s Value: %s"%(exctype, value)
116 corvo 1.27 common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
117 corvo 1.17 common.jobDB.save()
118 corvo 1.30
119     stop = time.time()
120 gutsche 1.47 common.logger.debug(1, "Submission Time: "+str(stop - start))
121 fanzago 1.32 #print "Submission Time: %d "%(stop - start)
122 corvo 1.17 common.jobDB.save()
123 corvo 1.15
124 nsmirnov 1.6 msg = '\nTotal of %d jobs submitted'%njs
125     if njs != len(self.nj_list) :
126     msg += ' (from %d requested).'%(len(self.nj_list))
127     pass
128     else:
129     msg += '.'
130     pass
131 nsmirnov 1.2 common.logger.message(msg)
132 nsmirnov 1.1 return