ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.43
Committed: Tue Mar 28 11:04:01 2006 UTC (19 years, 1 month ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.42: +1 -1 lines
Log Message:
cosmetics

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 corvo 1.27 from random import random
7 corvo 1.30 import time
8 nsmirnov 1.1
9     class Submitter(Actor):
10 corvo 1.35 # marco
11 corvo 1.40 # def __init__(self, cfg_params, nj_list, job_type):
12     def __init__(self, cfg_params, nj_list):
13 nsmirnov 1.1 self.cfg_params = cfg_params
14 nsmirnov 1.4 self.nj_list = nj_list
15 corvo 1.40 # self.job_type = job_type
16 corvo 1.35
17 slacapra 1.25 try:
18     self.ML = int(cfg_params['USER.activate_monalisa'])
19     except KeyError:
20     self.ML = 0
21     pass
22 corvo 1.30
23 nsmirnov 1.1 return
24    
25     def run(self):
26 nsmirnov 1.2 """
27     The main method of the class.
28     """
29     common.logger.debug(5, "Submitter::run() called")
30 slacapra 1.24
31 gutsche 1.42 totalCreatedJobs= 0
32 corvo 1.35 # marco. Common job type parameters to be sent to ML and Daniele's Monitor
33 corvo 1.40 # jobtype_p = self.job_type.getParams()
34 corvo 1.37 listCE = ""
35 corvo 1.33 start = time.time()
36 slacapra 1.24 for nj in range(common.jobDB.nJobs()):
37 spiga 1.29 if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
38 slacapra 1.24 pass
39    
40     if (totalCreatedJobs==0):
41     common.logger.message("No jobs to be submitted: first create them")
42 slacapra 1.28 return
43 slacapra 1.24
44 slacapra 1.7 firstJob=self.nj_list[0]
45     match = common.scheduler.listMatch(firstJob)
46 slacapra 1.8 if match:
47 slacapra 1.43 common.logger.message("Found "+str(match)+" compatible site(s)")
48 slacapra 1.8 else:
49 slacapra 1.28 raise CrabException("No compatible site found!")
50 fanzago 1.16 #########
51 nsmirnov 1.2 # Loop over jobs
52 nsmirnov 1.6 njs = 0
53 corvo 1.15 try:
54 corvo 1.17 for nj in self.nj_list:
55     st = common.jobDB.status(nj)
56 corvo 1.21 # print "nj = ", nj
57     # print "st = ", st
58 corvo 1.17 if st != 'C' and st != 'K' and st != 'A' and st != 'RC':
59     long_st = crabJobStatusToString(st)
60 slacapra 1.19 msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
61 slacapra 1.18 common.logger.message(msg)
62 corvo 1.17 continue
63    
64     common.logger.message("Submitting job # "+`(nj+1)`)
65     jid = common.scheduler.submit(nj)
66    
67     common.jobDB.setStatus(nj, 'S')
68     common.jobDB.setJobId(nj, jid)
69 corvo 1.33 common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
70 corvo 1.17 njs += 1
71    
72     ############################################
73    
74     if st == 'C':
75     resFlag = 0
76     elif st == 'RC':
77     resFlag = 2
78     else:
79     resFlag = 0
80     pass
81 spiga 1.26
82 spiga 1.38 try:
83     Statistic.Monitor('submit',resFlag,jid,'-----')
84     except:
85     pass
86 corvo 1.17
87 slacapra 1.25 if (self.ML==1):
88 slacapra 1.22 try:
89 spiga 1.26 #List of parameters to be sent to ML monitor system
90 corvo 1.35 # Marco. Should be better to put it in the SchedulerEdg/gLite class
91 gutsche 1.42 # if self.job_type.name() == "ORCA" or self.job_type.name() == 'ORCA_DBSDLS' or self.job_type.name() == 'ORCA_COMMON':
92 corvo 1.40 # listCE = ','.join(common.analisys_common_info['sites'])
93 gutsche 1.42 self.cfg_params['GridName'] = runCommand("voms-proxy-info -identity")
94 fanzago 1.32 common.logger.debug(5, "GRIDNAME: "+self.cfg_params['GridName'])
95 corvo 1.35 self.cfg_params['jobId'] = str(nj + 1)
96 corvo 1.30 self.cfg_params['sid'] = jid
97 slacapra 1.25 nevtJob = common.jobDB.maxEvents(nj)
98     taskType = 'analysis'
99 gutsche 1.42 taskId = self.cfg_params['user'] + '_' + string.split(common.work_space.topDir(),'/')[-2]
100     if ( jid.find(":") != -1 ) :
101     rb = jid.split(':')[1]
102     self.cfg_params['rb'] = rb.replace('//', '')
103     else :
104     self.cfg_params['rb'] = 'OSG'
105 corvo 1.35
106 corvo 1.40 params = {'jobId': str(nj + 1) + '_' + self.cfg_params['sid'] ,'taskId': taskId, 'sid': self.cfg_params['sid'], \
107 corvo 1.35 'nevtJob': nevtJob, 'tool': common.prog_name, 'tool_ui': os.environ['HOSTNAME'], \
108 corvo 1.30 'scheduler': self.cfg_params['CRAB.scheduler'], 'GridName': self.cfg_params['GridName'], 'taskType': taskType, \
109 corvo 1.35 'vo': self.cfg_params['EDG.virtual_organization'], 'broker': self.cfg_params['rb'], 'user': self.cfg_params['user'], 'TargetCE': listCE, 'bossId': common.jobDB.bossId(nj)}
110 corvo 1.40 # for i in jobtype_p.iterkeys():
111     # params[i] = jobtype_p[i]
112 corvo 1.35 # for j, k in params.iteritems():
113     # print "Values: %s %s"%(j, k)
114 corvo 1.30 self.cfg_params['apmon'].fillDict(params)
115     self.cfg_params['apmon'].sendToML()
116 slacapra 1.25 except:
117 corvo 1.27 exctype, value = sys.exc_info()[:2]
118 spiga 1.38 common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
119 slacapra 1.25 pass
120     pass # use ML
121 corvo 1.17 except:
122 corvo 1.27 exctype, value = sys.exc_info()[:2]
123 corvo 1.30 print "Type: %s Value: %s"%(exctype, value)
124 corvo 1.27 common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
125 corvo 1.17 common.jobDB.save()
126 corvo 1.30
127     stop = time.time()
128 fanzago 1.32 common.logger.debug(5, "Submission Time: "+str(stop - start))
129     #print "Submission Time: %d "%(stop - start)
130 corvo 1.17 common.jobDB.save()
131 corvo 1.15
132 nsmirnov 1.6 msg = '\nTotal of %d jobs submitted'%njs
133     if njs != len(self.nj_list) :
134     msg += ' (from %d requested).'%(len(self.nj_list))
135     pass
136     else:
137     msg += '.'
138     pass
139 nsmirnov 1.2 common.logger.message(msg)
140 nsmirnov 1.1 return