ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.24
Committed: Fri Mar 10 13:59:03 2006 UTC (19 years, 1 month ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.23: +3 -11 lines
Log Message:
fixed params number

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from JobDB import JobDB
5     from ScriptWriter import ScriptWriter
6     from Scheduler import Scheduler
7     from crab_logger import Logger
8     from crab_exceptions import *
9     from crab_util import *
10     import common
11    
12 slacapra 1.8 import os, string, math
13 fanzago 1.24
class Creator(Actor):
    """
    Actor that creates the jobs of a task.

    The constructor instantiates the job type object, asks it to prepare
    its steering cards, computes the job splitting (number of jobs and
    events per job) from the user configuration and records the job type
    in the share directory of the work space.  run() then produces, for
    every job still in status 'X', the steering cards, the wrapper script
    and the scheduler script, and registers sandboxes in the job DB.
    """

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        Build the job type object and compute the job splitting.

        job_type_name -- name of the job type; the class
                         <Job_type_name> is loaded from the module
                         cms_<job_type_name>.
        cfg_params    -- mapping of configuration parameters; missing
                         keys are expected to raise KeyError.
        ncjobs        -- number of jobs to create, or the string 'all'.
                         NOTE(review): presumably an integer whenever it
                         is not 'all' -- confirm against the caller.

        Raises CrabException if the job type cannot be loaded, if the
        splitting parameters are inconsistent, or if the job type stored
        in the work space differs from the requested one.
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0

        self.createJobTypeObject()
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.defineTotalNumberOfJobs_()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Set number of jobs to be created.  The 'all' test comes first
        # and short-circuits, so the string is never ordered against an
        # integer (that only "worked" through Python 2's arbitrary
        # mixed-type ordering).
        if ncjobs == 'all' or ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs
        else:
            self.ncjobs = ncjobs

        self._recordTaskCode()

        #TODO: deprecated code, not needed,
        # will be eliminated when WorkSpace.saveConfiguration()
        # will be improved.
        self._syncJobTypeFile()

        common.logger.debug(5, "Creator constructor finished")

    def _recordTaskCode(self):
        """
        Append the task description to the '.code' file of the share dir.

        The file is always opened in append mode (hence created if it is
        missing); something is written only for the ORCA job type.
        """
        fileCODE1 = open(common.work_space.shareDir()+"/.code","a")
        try:
            # Nothing is recorded for FAMOS (nor for any other job type).
            if self.job_type.name() == 'ORCA':
                self.owner = self.cfg_params['ORCA.owner']
                self.dataset = self.cfg_params['ORCA.dataset']
                fileCODE1.write('::'+str(self.job_type.name())
                                +'::'+str(self.ncjobs)
                                +'::'+str(self.dataset)
                                +'::'+str(self.owner))
        finally:
            # Close the handle even if a configuration key is missing.
            fileCODE1.close()

    def _syncJobTypeFile(self):
        """
        Set/Save the job type name in the 'jobtype' file of the share dir.

        If the file exists, its content must match the requested job type
        (CrabException otherwise); when no job type was requested the
        stored one is adopted.  If the file does not exist it is created.
        """
        # NOTE(review): no separator is added here, while the '.code' path
        # is built with "/" -- this relies on the form of shareDir()'s
        # return value; left unchanged, confirm against WorkSpace.
        jt_fname = common.work_space.shareDir() + 'jobtype'

        if not os.path.exists(jt_fname):
            # Save job type name
            jt_file = open(jt_fname, 'w')
            try:
                jt_file.write(self.job_type_name+'\n')
            finally:
                jt_file.close()
            return

        # Read stored job type name; the file is closed before any check
        # so that a mismatch does not leak the handle.
        jt_file = open(jt_fname, 'r')
        try:
            jt = jt_file.read()
        finally:
            jt_file.close()

        if self.job_type_name:
            if jt != self.job_type_name+'\n':
                msg = 'Job Type mismatch: requested <' + self.job_type_name
                msg += '>, found <' + jt[:-1] + '>.'
                raise CrabException(msg)
        else:
            self.job_type_name = jt[:-1]

    def defineTotalNumberOfJobs_(self):
        """
        Calculates the total number of jobs to be created.

        Reads USER.first_event, USER.total_number_of_events,
        USER.job_number_of_events and USER.total_number_of_jobs from the
        configuration and sets self.first_event,
        self.total_number_of_events, self.job_number_of_events and
        self.total_njobs.

        Raises CrabException when the requested event range exceeds the
        events made available by the job type, or when
        job_number_of_events is not a positive number, 'all' or '-1'.
        """
        try:
            self.first_event = int(self.cfg_params['USER.first_event'])
        except KeyError:
            self.first_event = 0
        common.logger.debug(1,"First event ot be analyzed: "+str(self.first_event))

        # TODO Could we find a better way to get this number?
        maxAvailableEvents = int(self.job_type.maxEvents)
        common.logger.debug(1,"Available events: "+str(maxAvailableEvents))

        # some sanity check
        if self.first_event >= maxAvailableEvents:
            raise CrabException('First event is bigger than maximum number of available events!')

        # the total number of events to be analyzed
        remainingEvents = maxAvailableEvents - self.first_event
        try:
            n = self.cfg_params['USER.total_number_of_events']
        except KeyError:
            common.logger.message("total_number_of_events not defined, set it to maximum available")
            self.total_number_of_events = remainingEvents
        else:
            if n == 'all' or n == '-1':
                self.total_number_of_events = remainingEvents
                common.logger.debug(1,"Analysing all available events "+str(self.total_number_of_events))
            else:
                lastRequested = int(n)+self.first_event
                if maxAvailableEvents < lastRequested:
                    raise CrabException('(First event + total events)='+str(lastRequested)
                                        +' is bigger than maximum number of available events '
                                        +str(maxAvailableEvents)
                                        +' !! Use "total_number_of_events=-1" to analyze to whole dataset')
                self.total_number_of_events = int(n)
        common.logger.message("Total number of events to be analyzed: "+str(self.total_number_of_events))

        # read user directives
        # job_number_of_events may legitimately be the string 'all', so it
        # is kept as read and only its presence is recorded; it must not
        # be ordered against an integer.
        # NOTE(review): assumes configuration values are strings, as the
        # comparisons with 'all' and '-1' suggest -- confirm.
        eventPerJob = None
        try:
            eventPerJob = self.cfg_params['USER.job_number_of_events']
        except KeyError:
            pass
        eventPerJobDefined = eventPerJob is not None

        jobsPerTask = 0
        try:
            jobsPerTask = int(self.cfg_params['USER.total_number_of_jobs'])
        except KeyError:
            pass

        # If both the above set, complain and use event per jobs
        if eventPerJobDefined and jobsPerTask > 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are both defined '
            msg += 'Using job_number_of_events.'
            common.logger.message(msg)
            jobsPerTask = 0
        if not eventPerJobDefined and jobsPerTask == 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are not defined '
            msg += 'Creating just one job for all events.'
            common.logger.message(msg)
            jobsPerTask = 1

        if eventPerJobDefined:
            # first case: events per job defined
            takeEverything = (eventPerJob == 'all' or eventPerJob == '-1')
            if not takeEverything:
                requested = int(eventPerJob)
                if requested <= 0:
                    # Used to end in a ZeroDivisionError (0) or in a
                    # meaningless job count (negative values).
                    raise CrabException('job_number_of_events must be a positive number, '
                                        '"all" or "-1" (found <'+str(eventPerJob)+'>)')
                takeEverything = (requested > self.total_number_of_events
                                  and self.total_number_of_events > 0)
            if takeEverything:
                common.logger.message("Asking more events than available: set it to maximum available")
                self.job_number_of_events = self.total_number_of_events
                self.total_njobs = 1
            else:
                self.job_number_of_events = requested
                # ceil(total/per_job) with integer arithmetic
                self.total_njobs = (self.total_number_of_events-1)//self.job_number_of_events + 1
        elif jobsPerTask > 0:
            # second case: jobs per task defined
            common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+" JobPerTask "+str(jobsPerTask))
            self.job_number_of_events = self.total_number_of_events // jobsPerTask
            self.total_njobs = jobsPerTask
        else:
            # should not happen... (reached e.g. with a negative
            # total_number_of_jobs and no job_number_of_events)
            raise CrabException('Somthing wrong with splitting')

        common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+
                            " events per job: "+str(self.job_number_of_events))

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use

        When common.analisys_common_info['events_management'] equals 1 the
        first event advances from job to job and the last job receives
        whatever is left; otherwise every job gets the same first event
        and the same number of events.
        """
        common.jobDB.load()

        # case one: write first and max events
        nJobs = self.nJobs()
        firstEvent = self.first_event
        lastJobsNumberOfEvents = self.job_number_of_events

        # Checked once, before the loop: it used to be read inside the
        # loop over the first nJobs-1 jobs, so with a single job the
        # attribute was never set and the test further down failed with
        # AttributeError.
        try:
            self.events_management = common.analisys_common_info['events_management']
        except KeyError:
            self.events_management = 0
        advanceFirstEvent = (int(self.events_management) == 1)

        # NOTE(review): with nJobs == 0 the calls below address job -1;
        # behaviour left as it was -- confirm how jobDB handles it.

        # last jobs is different...
        for job in range(nJobs-1):
            common.jobDB.setFirstEvent(job, firstEvent)
            common.jobDB.setMaxEvents(job, self.job_number_of_events)
            if advanceFirstEvent:
                firstEvent = firstEvent+self.job_number_of_events

        # this is the last job
        common.jobDB.setFirstEvent(nJobs-1, firstEvent)
        if advanceFirstEvent:
            lastJobsNumberOfEvents = (self.total_number_of_events+self.first_event)-firstEvent
        common.jobDB.setMaxEvents(nJobs-1, lastJobsNumberOfEvents)

        totalEvents = self.job_number_of_events*(self.total_njobs-1)+lastJobsNumberOfEvents
        if lastJobsNumberOfEvents != self.job_number_of_events:
            common.logger.message(str(self.total_njobs-1)+' jobs will be created for '
                                  +str(self.job_number_of_events)+' events each plus 1 for '
                                  +str(lastJobsNumberOfEvents)+' events for a total of '
                                  +str(totalEvents)+' events')
        else:
            common.logger.message(str(self.total_njobs)+' jobs will be created for '
                                  +str(self.job_number_of_events)+' events each for a total of '
                                  +str(totalEvents)+' events')

        # case two (to be implemented) write eventCollections for each jobs

        # save the DB
        common.jobDB.save()

    def nJobs(self):
        """Return the total number of jobs computed by the splitting."""
        return self.total_njobs

    def createJobTypeObject(self):
        """
        Instantiate the job type object and store it in self.job_type.

        The class <Job_type_name> is looked up in the module
        cms_<job_type_name> through importName() and constructed with the
        configuration parameters.  Raises CrabException if the module
        cannot be imported or the lookup raises KeyError.
        """
        # Function-scope import: the module header does not import sys.
        import sys

        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() instead of "except ImportError, e": that
            # spelling is accepted by every Python version.
            e = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(e)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params)

    def jobType(self):
        """Return the job type object built by createJobTypeObject()."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Creates at most self.ncjobs jobs among those whose status in the
        job DB is 'X': steering cards, wrapper script, scheduler script,
        input/output sandboxes and task id; each created job is moved to
        status 'C'.  The job DB is saved at the end.
        """
        # Function-scope import: the module header does not import stat.
        import stat

        common.logger.debug(5, "Creator::run() called")

        # rwxr--r-- (octal 744) for the generated job scripts
        script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

        # Instantiate ScriptWriter
        script_writer = ScriptWriter('crab_template.sh')

        # Loop over jobs
        njc = 0
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.message("Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create script (sh)
            script_writer.modifyTemplateScript(nj)
            os.chmod(common.job_list[nj].scriptFilename(), script_mode)

            # Create scheduler scripts (jdl)
            common.scheduler.createSchScript(nj)

            common.jobDB.setStatus(nj, 'C')

            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            stdout = common.job_list[nj].stdout()
            stderr = common.job_list[nj].stderr()
            outputSandbox.append(stdout)
            # stderr is added only when it is a different file
            if stdout != stderr:
                outputSandbox.append(stderr)
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            njc = njc + 1

        common.jobDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
292 corvo 1.18 common.jobDB.setTaskId(nj, self.cfg_params['taskId'])
293 slacapra 1.9
294 nsmirnov 1.1 njc = njc + 1
295     pass
296    
297     ####
298 fanzago 1.12
299 nsmirnov 1.1 common.jobDB.save()
300    
301     msg = '\nTotal of %d jobs created'%njc
302     if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
303     msg = msg + '.\n'
304     common.logger.message(msg)
305     return