ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Submitter.py
Revision: 1.34
Committed: Fri Feb 17 16:11:21 2006 UTC (19 years, 2 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.33: +8 -4 lines
Log Message:
some changes for FAMOS and LFC

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import *
2 nsmirnov 1.6 from crab_util import *
3 nsmirnov 1.2 import common
4 corvo 1.9 from ApmonIf import ApmonIf
5 spiga 1.10 import Statistic
6 corvo 1.27 from random import random
7 corvo 1.30 import time
8 nsmirnov 1.1
class Submitter(Actor):
    """
    Actor that submits a list of jobs through ``common.scheduler`` and
    records the outcome in ``common.jobDB``.

    cfg_params -- configuration mapping.  The optional key
                  'USER.activate_monalisa' (an integer) switches the
                  MonALISA (ML) reporting on when it equals 1.
                  run() reads cfg_params['taskId'] but never sets it, so
                  the caller must provide it.  With ML reporting on,
                  cfg_params['apmon'] and the keys sent in the ML record
                  (see _reportToML) must be present as well.
    nj_list    -- sequence of job numbers as understood by common.jobDB
                  (shown to the user as nj + 1).
    """

    # Statuses from which run() submits a job; a job in any other status
    # is reported and skipped.
    _SUBMITTABLE = ('C', 'K', 'A', 'RC')

    def __init__(self, cfg_params, nj_list):
        self.cfg_params = cfg_params
        self.nj_list = nj_list
        try:
            self.ML = int(cfg_params['USER.activate_monalisa'])
        except KeyError:
            # Key not configured: ML reporting stays off.
            self.ML = 0

    def run(self):
        """
        The main method of the class.

        Submits every job of self.nj_list whose status is in _SUBMITTABLE,
        marks it 'S' in common.jobDB together with the scheduler job id and
        the task id, notifies Statistic.Monitor and, when enabled, the ML
        monitor.  The job DB is saved and a summary is logged at the end.

        Returns None.  Returns early (after logging a message) when no job
        in the DB has status 'C' or 'RC', or when self.nj_list is empty.
        Raises CrabException when the scheduler finds no compatible site.
        Any error raised while submitting is logged, not propagated; the
        jobs submitted so far are still saved and counted.
        """
        common.logger.debug(5, "Submitter::run() called")

        start = time.time()

        if self._countCreatedJobs() == 0:
            common.logger.message("No jobs to be submitted: first create them")
            return

        # Guard the nj_list[0] access below against an empty selection.
        if not self.nj_list:
            common.logger.message("No jobs selected for submission")
            return

        # Only the first requested job is used for the site match.
        match = common.scheduler.listMatch(self.nj_list[0])
        if not match:
            raise CrabException("No compatible site found!")
        common.logger.message("Found " + str(match) + " compatible sites")

        # Loop over jobs
        njs = 0
        try:
            for nj in self.nj_list:
                st = common.jobDB.status(nj)
                if st not in self._SUBMITTABLE:
                    long_st = crabJobStatusToString(st)
                    msg = "Job # %d not submitted: status %s" % (nj + 1, long_st)
                    common.logger.message(msg)
                    continue

                common.logger.message("Submitting job # " + repr(nj + 1))
                jid = common.scheduler.submit(nj)

                common.jobDB.setStatus(nj, 'S')
                common.jobDB.setJobId(nj, jid)
                common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
                njs += 1

                # 'RC' jobs are flagged 2, every other status 0.
                # NOTE(review): presumably 2 marks a resubmission -- confirm
                # against Statistic.Monitor.
                if st == 'RC':
                    resFlag = 2
                else:
                    resFlag = 0
                Statistic.Monitor('submit', resFlag, jid, '-----')

                if self.ML == 1:
                    self._reportToML(nj, jid)

            if self.ML == 1:
                self.cfg_params['apmon'].free()
        except:
            # Deliberate catch-all: a failure on one job must not lose the
            # bookkeeping of the jobs already submitted (saved below).
            exctype, value = sys.exc_info()[:2]
            print("Type: %s Value: %s" % (exctype, value))
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))

        stop = time.time()
        common.logger.debug(5, "Submission Time: " + str(stop - start))
        common.jobDB.save()

        msg = '\nTotal of %d jobs submitted' % njs
        if njs != len(self.nj_list):
            msg += ' (from %d requested).' % len(self.nj_list)
        else:
            msg += '.'
        common.logger.message(msg)

    def _countCreatedJobs(self):
        """Return how many jobs in common.jobDB have status 'C' or 'RC'."""
        created = 0
        for nj in range(common.jobDB.nJobs()):
            if common.jobDB.status(nj) in ('C', 'RC'):
                created += 1
        return created

    def _reportToML(self, nj, jid):
        """
        Send the submission record of job nj (scheduler id jid) to the ML
        monitor through self.cfg_params['apmon'].

        Stores 'GridName', 'jobId', 'sid' and 'rb' in self.cfg_params as a
        side effect.  Best effort: any failure is logged and swallowed so
        that monitoring problems never interrupt the submission loop.
        """
        try:
            # Marco. Should be better to put it in the SchedulerEdg/gLite class
            self.cfg_params['GridName'] = runCommand("grid-proxy-info -identity")
            common.logger.debug(5, "GRIDNAME: " + self.cfg_params['GridName'])
            self.cfg_params['jobId'] = str(nj)
            self.cfg_params['sid'] = jid

            # The application name is the basename of the local release
            # area; fall back to LOCALRT when SCRAMRT_LOCALRT is not set.
            try:
                application = os.path.basename(os.environ['SCRAMRT_LOCALRT'])
            except KeyError:
                application = os.path.basename(os.environ['LOCALRT'])

            nevtJob = common.jobDB.maxEvents(nj)
            tool = common.prog_name + '_v1'
            taskType = 'analysis'

            # NOTE(review): assumes jid looks like 'scheme://host:port/...',
            # so the second ':'-separated field is '//host' -- confirm.
            rb = jid.split(':')[1]
            self.cfg_params['rb'] = rb.replace('//', '')

            params = {
                'taskId': self.cfg_params['taskId'],
                'jobId': self.cfg_params['jobId'],
                'sid': self.cfg_params['sid'],
                'application': application,
                'exe': self.cfg_params['ORCA.executable'],
                'nevtJob': nevtJob,
                'tool': tool,
                'scheduler': self.cfg_params['CRAB.scheduler'],
                'GridName': self.cfg_params['GridName'],
                'taskType': taskType,
                'vo': self.cfg_params['EDG.virtual_organization'],
                'broker': self.cfg_params['rb'],
                'user': self.cfg_params['user'],
            }
            self.cfg_params['apmon'].fillDict(params)
            self.cfg_params['apmon'].sendToML()
        except:
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Submitter::run Exception raised: %s %s" % (exctype, value))