ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.50
Committed: Mon May 28 15:32:23 2007 UTC (17 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_0_pre9, CRAB_1_5_4, CRAB_1_5_4_pre2, CRAB_1_5_4_pre1, CRAB_2_0_0_pre7, CRAB_2_0_0_pre6, CRAB_1_5_3, CRAB_1_5_3_pre5, CRAB_1_5_3_pre4, CRAB_2_0_0_pre5, CRAB_1_5_3_pre3, configure, CRAB_2_0_0_pre4, CRAB_1_5_3_pre2, CRAB_1_5_3_pre1, CRAB_2_0_0_pre3, CRAB_2_0_0_pre2, CRAB_2_0_0_pre1
Branch point for: CRAB_1_5_4_SLC3_start
Changes since 1.49: +13 -2 lines
Log Message:
Minor temporary workaround to support server_mode = 1: when server_mode = 1, the BOSS job declaration is moved from the creation step to the submission step.

File Contents

# Content
1 from Actor import Actor
2 from WorkSpace import WorkSpace
3 from JobList import JobList
4 from JobDB import JobDB
5 from ScriptWriter import ScriptWriter
6 from Scheduler import Scheduler
7 from crab_logger import Logger
8 from crab_exceptions import *
9 from crab_util import *
10 import common
11
12 import os, string, math
13 import time
14
class Creator(Actor):
    """
    Actor that creates the jobs of a task.

    The constructor builds the job-type object, records monitoring and
    dashboard information, and fixes how many jobs are to be created;
    run() then performs the per-job creation work.
    """

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        Build the job-type object and collect the task-level information.

        job_type_name -- name of the job type; it selects the module
                         'cms_<lowercased name>' and the class
                         '<Capitalized name>' (see createJobTypeObject)
        cfg_params    -- configuration mapping; it is kept by reference
        ncjobs        -- number of jobs to create, or the string 'all'
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0
        self.jobParamsList = []

        self.createJobTypeObject(ncjobs)
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.total_njobs = self.job_type.numberOfJobs()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Set number of jobs to be created
        # ---------------------------------> Boss4: changed, "all" by default
        # 'all' is tested first so that the string is never order-compared
        # with an integer; any request above the total is capped to the total.
        self.ncjobs = ncjobs
        if ncjobs == 'all':
            self.ncjobs = self.total_njobs
        elif ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs

        # This is code for proto-monitoring
        self._saveMonitoringCode()

        # Server mode flag: 0 unless 'CRAB.server_mode' is configured
        self.UseServer = 0
        try:
            self.UseServer = int(self.cfg_params['CRAB.server_mode'])
        except KeyError:
            pass

        # This is code for dashboard
        # First checkProxy
        common.scheduler.checkProxy()
        self._writeDashboardParams()

        # Set/Save Job Type name
        self.job_type_name = common.taskDB.dict("jobtype")

        common.logger.debug(5, "Creator constructor finished")
        return

    def _datasetPathField(self, index, default):
        """
        Return the index-th '/'-separated field of 'CMSSW.datasetpath',
        or default when the parameter is missing or has too few fields.
        """
        try:
            return self.cfg_params['CMSSW.datasetpath'].split("/")[index]
        except (KeyError, IndexError, AttributeError):
            return default

    def _saveMonitoringCode(self):
        """
        For CMSSW job types, append job type, number of jobs and dataset
        names to the "CODE" entry of the task DB.
        """
        code = common.taskDB.dict("CODE")
        if self.job_type.name() != 'CMSSW':
            return

        self.primaryDataset = self._datasetPathField(1, 'None')
        self.ProcessedDataset = self._datasetPathField(3, ' ')
        fields = [code,
                  str(self.job_type.name()),
                  str(self.ncjobs),
                  str(self.primaryDataset),
                  str(self.ProcessedDataset)]
        common.taskDB.setDict("CODE", '::'.join(fields))
        return

    def _writeDashboardParams(self):
        """
        Write the 'key:value' dashboard parameters to the apmon file in the
        share directory.

        This is best effort: any error is logged and otherwise ignored, but
        interpreter-exit and keyboard-interrupt requests are propagated.
        """
        try:
            fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'w')
            try:
                self.cfg_params['GridName'] = runCommand("voms-proxy-info -identity")
                common.logger.debug(5, "GRIDNAME: "+self.cfg_params['GridName'])
                try:
                    VO = self.cfg_params['EDG.virtual_organization']
                except KeyError:
                    VO = 'cms'

                params = {'tool': common.prog_name,
                          'JSToolVersion': common.prog_version_str,
                          'tool_ui': os.environ['HOSTNAME'],
                          'scheduler': self.cfg_params['CRAB.scheduler'],
                          'GridName': self.cfg_params['GridName'].strip(),
                          'taskType': 'analysis',
                          'vo': VO,
                          'user': self.cfg_params['user']}
                # Job-type specific parameters are added to (and may
                # override) the generic ones.
                jtParam = self.job_type.getParams()
                for key in jtParam:
                    params[key] = jtParam[key].strip()
                for key, value in params.items():
                    fl.write(key + ':' + value + '\n')
            finally:
                # Close the file even when collecting the parameters fails.
                fl.close()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))
        return

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use
        """
        self.job_type.split(self.jobParamsList)
        return

    def nJobs(self):
        """
        Return the total number of jobs reported by the job type.
        """
        return self.total_njobs

    def createJobTypeObject(self, ncjobs):
        """
        Import the job-type class and instantiate it into self.job_type.

        The class '<Capitalized job_type_name>' is looked up in the module
        'cms_<lowercased job_type_name>' and called with (cfg_params, ncjobs).
        Raises CrabException when the module cannot be imported or the class
        is not found in it.
        """
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() is used instead of a version-specific
            # "except ... , e" / "except ... as e" clause.
            import sys
            e = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(e)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params, ncjobs)
        return

    def jobType(self):
        """
        Return the job-type object built by createJobTypeObject.
        """
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Walks over all jobs and, for each job whose DB status is 'X',
        prepares its steering cards and script and records its status,
        sandboxes, task id and block in the job DB, stopping once
        self.ncjobs jobs have been created. It then builds the scheduler
        XML script, declares the jobs (unless in server mode), saves both
        DBs and reports how many jobs were created.
        """
        import stat

        common.logger.debug(5, "Creator::run() called")
        start = time.time()

        # Instantiate ScriptWriter
        script_writer = ScriptWriter('crab_template.sh')

        # rwxr--r-- (octal 744) for the generated job script
        script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

        # Loop over jobs
        argsList = []

        njc = 0
        block = -1 # first block is 0
        lastDest = ''
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.debug(1, "Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)
            # Create script (sh)
            script_writer.modifyTemplateScript()
            os.chmod(common.taskDB.dict("ScriptName"), script_mode)

            # Create XML script and declare related jobs
            argsList.append(str(nj+1)+' '+self.job_type.getJobTypeArguments(nj, self.cfg_params['CRAB.scheduler']))

            common.jobDB.setStatus(nj, 'C')
            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            # check if out!=err
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            # A new block starts whenever the destination differs from the
            # one of the previously created job.
            currDest = common.jobDB.destination(nj)
            if currDest != lastDest:
                block += 1
                lastDest = currDest
            common.jobDB.setBlock(nj, block)
            njc = njc + 1

        ####
        common.scheduler.createXMLSchScript(self.total_njobs, argsList)
        common.logger.message('Creating '+str(self.total_njobs)+' jobs, please wait...')
        # modified to support server mode - DS
        # if crab is used in server mode the client must skip the
        # boss declare step. In this case it is performed at server level
        # just before the submission step
        if self.UseServer == 0:
            common.scheduler.declareJob_() #Add for BOSS4

        stop = time.time()
        common.logger.debug(2, "Creation Time: "+str(stop - start))
        common.logger.write("Creation Time: "+str(stop - start))

        common.jobDB.save()
        common.taskDB.save()
        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)

        return