ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.14
Committed: Thu Oct 13 17:10:03 2005 UTC (19 years, 6 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_1, CRAB_1_0_0_rc1, CRAB_1_0_0_beta4
Changes since 1.13: +2 -1 lines
Log Message:
mods to run with central bossDB or task-limited bossDB

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from JobDB import JobDB
5     from ScriptWriter import ScriptWriter
6     from Scheduler import Scheduler
7     from crab_logger import Logger
8     from crab_exceptions import *
9     from crab_util import *
10     import common
11    
12 slacapra 1.8 import os, string, math
13 nsmirnov 1.1
14     class Creator(Actor):
15     def __init__(self, job_type_name, cfg_params, ncjobs):
16     self.job_type_name = job_type_name
17     self.job_type = None
18     self.cfg_params = cfg_params
19 nsmirnov 1.2 self.total_njobs = 0
20     self.total_number_of_events = 0
21     self.job_number_of_events = 0
22 slacapra 1.7 self.first_event = 0
23 slacapra 1.14
24     ## SL No. These must not be here!
25 spiga 1.11 self.owner = cfg_params['USER.owner']
26     self.dataset = cfg_params['USER.dataset']
27 nsmirnov 1.1
28     self.createJobTypeObject()
29 nsmirnov 1.3 common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")
30 nsmirnov 1.1
31     self.job_type.prepareSteeringCards()
32 nsmirnov 1.3 common.logger.debug(5, __name__+": Steering cards prepared")
33 nsmirnov 1.1
34 nsmirnov 1.2 self.defineTotalNumberOfJobs_()
35 nsmirnov 1.3 common.logger.debug(5, __name__+": total # of jobs = "+`self.total_njobs`)
36 nsmirnov 1.1
37     # Set number of jobs to be created
38    
39     self.ncjobs = ncjobs
40     if ncjobs == 'all' : self.ncjobs = self.total_njobs
41     if ncjobs > self.total_njobs : self.ncjobs = self.total_njobs
42 spiga 1.11
43     fileCODE1 = open(common.work_space.logDir()+"/.code","a")
44     fileCODE1.write('::'+str(self.ncjobs)+'::'+str(self.dataset)+'::'+str(self.owner))
45     fileCODE1.close()
46    
47 nsmirnov 1.1
48     #TODO: deprecated code, not needed,
49     # will be eliminated when WorkSpace.saveConfiguration()
50     # will be improved.
51     #
52     # Set/Save Job Type name
53    
54     jt_fname = common.work_space.shareDir() + 'jobtype'
55     if os.path.exists(jt_fname):
56     # Read stored job type name
57     jt_file = open(jt_fname, 'r')
58     jt = jt_file.read()
59     if self.job_type_name:
60     if ( jt != self.job_type_name+'\n' ):
61     msg = 'Job Type mismatch: requested <' + self.job_type_name
62     msg += '>, found <' + jt[:-1] + '>.'
63     raise CrabException(msg)
64     pass
65     else:
66     self.job_type_name = jt[:-1]
67     pass
68     jt_file.close()
69     pass
70     else:
71     # Save job type name
72     jt_file = open(jt_fname, 'w')
73     jt_file.write(self.job_type_name+'\n')
74     jt_file.close()
75     pass
76     #end of deprecated code
77    
78 nsmirnov 1.3 common.logger.debug(5, "Creator constructor finished")
79 nsmirnov 1.1 return
80    
81 nsmirnov 1.2 def defineTotalNumberOfJobs_(self):
82     """
83     Calculates the total number of jobs to be created.
84     """
85    
86     try:
87 slacapra 1.8 self.first_event = int(self.cfg_params['USER.first_event'])
88 slacapra 1.7 except KeyError:
89     self.first_event = 0
90 slacapra 1.8 common.logger.debug(1,"First event ot be analyzed: "+str(self.first_event))
91 slacapra 1.7
92 slacapra 1.10 # TODO Could we find a better way to get this number?
93 slacapra 1.8 maxAvailableEvents = int(self.job_type.maxEvents)
94     common.logger.debug(1,"Available events: "+str(maxAvailableEvents))
95 slacapra 1.7
96 slacapra 1.8 # some sanity check
97     if self.first_event>=maxAvailableEvents:
98 slacapra 1.7 raise CrabException('First event is bigger than maximum number of available events!')
99    
100 slacapra 1.8 # the total number of events to be analyzed
101 slacapra 1.7 try:
102 nsmirnov 1.2 n = self.cfg_params['USER.total_number_of_events']
103     if n == 'all': n = '-1'
104 slacapra 1.8 if n == '-1':
105     self.total_number_of_events = (maxAvailableEvents - self.first_event)
106     common.logger.debug(1,"Analysing all available events "+str(self.total_number_of_events))
107     else:
108     if maxAvailableEvents<(int(n)+self.first_event):
109     raise CrabException('(First event + total events)='+str(int(n)+self.first_event)+' is bigger than maximum number of available events '+str(maxAvailableEvents)+' !!')
110     self.total_number_of_events = int(n)
111 slacapra 1.7 except KeyError:
112     common.logger.message("total_number_of_events not defined, set it to maximum available")
113 slacapra 1.8 self.total_number_of_events = (maxAvailableEvents - self.first_event)
114 slacapra 1.7 pass
115 slacapra 1.8 common.logger.message("Total number of events to be analyzed: "+str(self.total_number_of_events))
116    
117 slacapra 1.7
118 slacapra 1.8 # read user directives
119 slacapra 1.7 eventPerJob=0
120     try:
121     eventPerJob = self.cfg_params['USER.job_number_of_events']
122 nsmirnov 1.2 except KeyError:
123     pass
124    
125 slacapra 1.7 jobsPerTask=0
126 nsmirnov 1.2 try:
127 slacapra 1.7 jobsPerTask = int(self.cfg_params['USER.total_number_of_jobs'])
128 nsmirnov 1.2 except KeyError:
129 slacapra 1.7 pass
130    
131     # If both the above set, complain and use event per jobs
132     if eventPerJob>0 and jobsPerTask>0:
133 nsmirnov 1.2 msg = 'Warning. '
134 slacapra 1.7 msg += 'job_number_of_events and total_number_of_jobs are both defined '
135     msg += 'Using job_number_of_events.'
136 nsmirnov 1.2 common.logger.message(msg)
137 slacapra 1.7 jobsPerTask = 0
138     if eventPerJob==0 and jobsPerTask==0:
139     msg = 'Warning. '
140     msg += 'job_number_of_events and total_number_of_jobs are not defined '
141     msg += 'Creating just one job for all events.'
142     common.logger.message(msg)
143     jobsPerTask = 1
144    
145     # first case: events per job defined
146     if eventPerJob>0:
147     n=eventPerJob
148     if n == 'all' or n == '-1' or (int(n)>self.total_number_of_events and self.total_number_of_events>0):
149     common.logger.message("Asking more events than available: set it to maximum available")
150     self.job_number_of_events = self.total_number_of_events
151 slacapra 1.8 self.total_njobs = 1
152 slacapra 1.7 else:
153     self.job_number_of_events = int(n)
154 slacapra 1.8 self.total_njobs = int((self.total_number_of_events-1)/self.job_number_of_events)+1
155 slacapra 1.7 # second case: jobs per task defined
156     elif jobsPerTask>0:
157 slacapra 1.8 common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+" JobPerTask "+str(jobsPerTask))
158     self.job_number_of_events = int(math.floor((self.total_number_of_events)/jobsPerTask))
159     self.total_njobs = jobsPerTask
160 slacapra 1.7 # should not happen...
161     else:
162     raise CrabException('Somthing wrong with splitting')
163    
164 slacapra 1.8 common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+
165 slacapra 1.7 " events per job: "+str(self.job_number_of_events))
166 slacapra 1.8
167 slacapra 1.7 return
168    
169     def writeJobsSpecsToDB(self):
170     """
171     Write firstEvent and maxEvents in the DB for future use
172     """
173    
174     common.jobDB.load()
175     # case one: write first and max events
176     nJobs=self.nJobs()
177    
178     firstEvent=self.first_event
179     # last jobs is different...
180     for job in range(nJobs-1):
181     common.jobDB.setFirstEvent(job, firstEvent)
182     common.jobDB.setMaxEvents(job, self.job_number_of_events)
183     firstEvent=firstEvent+self.job_number_of_events
184    
185     # this is the last job
186     common.jobDB.setFirstEvent(nJobs-1, firstEvent)
187 slacapra 1.8 lastJobsNumberOfEvents= (self.total_number_of_events+self.first_event)-firstEvent
188     common.jobDB.setMaxEvents(nJobs-1, lastJobsNumberOfEvents)
189 slacapra 1.7
190 fanzago 1.12 common.logger.message('Will be created '+str(self.total_njobs-1)+' jobs for '+str(self.job_number_of_events)+' each plus 1 for '+str(lastJobsNumberOfEvents)+' for a total of '+str(self.job_number_of_events*(self.total_njobs-1)+lastJobsNumberOfEvents)+' events')
191 slacapra 1.7
192     # case two (to be implemented) write eventCollections for each jobs
193 nsmirnov 1.2
194 slacapra 1.9 # save the DB
195     common.jobDB.save()
196 nsmirnov 1.2 return
197    
198 nsmirnov 1.1 def nJobs(self):
199     return self.total_njobs
200    
201     def createJobTypeObject(self):
202     file_name = 'cms_'+ string.lower(self.job_type_name)
203     klass_name = string.capitalize(self.job_type_name)
204    
205     try:
206     klass = importName(file_name, klass_name)
207     except KeyError:
208     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
209     raise CrabException(msg)
210     except ImportError, e:
211     msg = 'Cannot create job type '+self.job_type_name
212     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
213     msg += str(e)
214     raise CrabException(msg)
215    
216     self.job_type = klass(self.cfg_params)
217     return
218 nsmirnov 1.5
219     def jobType(self):
220     return self.job_type
221 nsmirnov 1.1
222     def run(self):
223 nsmirnov 1.4 """
224     The main method of the class.
225     """
226 nsmirnov 1.1
227 nsmirnov 1.3 common.logger.debug(5, "Creator::run() called")
228    
229 nsmirnov 1.1 # Instantiate ScriptWriter
230    
231     script_writer = ScriptWriter('crab_template.sh')
232    
233     # Loop over jobs
234    
235     njc = 0
236 nsmirnov 1.4 for nj in range(self.total_njobs):
237 nsmirnov 1.1 if njc == self.ncjobs : break
238 nsmirnov 1.4 st = common.jobDB.status(nj)
239 nsmirnov 1.1 if st != 'X': continue
240    
241 nsmirnov 1.6 common.logger.message("Creating job # "+`(nj+1)`)
242 nsmirnov 1.3
243 nsmirnov 1.1 # Prepare configuration file
244    
245 nsmirnov 1.4 self.job_type.modifySteeringCards(nj)
246 fanzago 1.12 # Create script (sh)
247 fanzago 1.13 #print "nel for prima del modifyTemplateScript, nj vale", nj
248 nsmirnov 1.4 script_writer.modifyTemplateScript(nj)
249     os.chmod(common.job_list[nj].scriptFilename(), 0744)
250 nsmirnov 1.1
251 fanzago 1.12 # Create scheduler scripts (jdl)
252     common.scheduler.createSchScript(nj)
253    
254 nsmirnov 1.4 common.jobDB.setStatus(nj, 'C')
255 slacapra 1.9 # common: write input and output sandbox
256     common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))
257    
258     outputSandbox=self.job_type.outputSandbox(nj)
259     stdout=common.job_list[nj].stdout()
260     stderr=common.job_list[nj].stderr()
261     outputSandbox.append(common.job_list[nj].stdout())
262     # check if out!=err
263     if stdout != stderr:
264     outputSandbox.append(common.job_list[nj].stderr())
265     common.jobDB.setOutputSandbox(nj, outputSandbox)
266    
267 nsmirnov 1.1 njc = njc + 1
268     pass
269    
270     ####
271 fanzago 1.12
272 nsmirnov 1.1 common.jobDB.save()
273    
274     msg = '\nTotal of %d jobs created'%njc
275     if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
276     msg = msg + '.\n'
277     common.logger.message(msg)
278     return