ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.26
Committed: Tue Mar 14 16:14:53 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_6, CRAB_1_0_5
Changes since 1.25: +12 -4 lines
Log Message:
Fix Creator parameters

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from JobDB import JobDB
5     from ScriptWriter import ScriptWriter
6     from Scheduler import Scheduler
7     from crab_logger import Logger
8     from crab_exceptions import *
9     from crab_util import *
10     import common
11    
12 slacapra 1.8 import os, string, math
13 corvo 1.26 # marco
class Creator(Actor):
    """
    Create the jobs of a CRAB task.

    The constructor builds the job-type object (see createJobTypeObject),
    prepares its steering cards, computes how the requested events are
    split into jobs, appends a record to the workspace '.code' file and
    checks/stores the job type name in the workspace 'jobtype' file.
    run() then writes the per-job scripts and scheduler scripts and fills
    the job DB.
    """

    def __init__(self, job_type_name, cfg_params, ncjobs, job_type):
        """
        job_type_name -- job type name (e.g. 'ORCA', 'FAMOS'); the job type
                         class <Name> is imported from module cms_<name>.
        cfg_params    -- configuration mapping; keys read here include
                         'ORCA.owner', 'ORCA.dataset', 'USER.first_event',
                         'USER.total_number_of_events',
                         'USER.job_number_of_events',
                         'USER.total_number_of_jobs' and 'taskId'.
        ncjobs        -- number of jobs run() should create: an int or
                         'all'; clamped to the total number of jobs.
        job_type      -- job-type object from the caller.
                         NOTE(review): it is stored, but immediately
                         replaced by the object createJobTypeObject()
                         builds, so it is effectively unused -- confirm
                         whether that is intended.

        Raises CrabException for an unknown job type, an invalid event
        splitting, or a job type that differs from the one stored in the
        workspace.
        """
        self.job_type_name = job_type_name
        self.job_type = job_type
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0

        self.createJobTypeObject()
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.defineTotalNumberOfJobs_()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Number of jobs to create.  The 'all' test comes first so that the
        # string 'all' is never compared with an int.
        self.ncjobs = ncjobs
        if ncjobs == 'all' or ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs

        self.saveCodeRecord_()
        self.checkJobTypeName_()

        common.logger.debug(5, "Creator constructor finished")
        return

    def saveCodeRecord_(self):
        """
        Append the task summary record to the workspace '.code' file.

        For ORCA jobs this sets self.owner and self.dataset from
        cfg_params and appends '::<jobtype>::<ncjobs>::<dataset>::<owner>'
        (no trailing newline).  For other job types (e.g. FAMOS) the file
        is opened in append mode, and therefore created, but nothing is
        written.

        Raises KeyError if an ORCA job lacks 'ORCA.owner' or 'ORCA.dataset'.
        """
        record = None
        if self.job_type.name() == 'ORCA':
            self.owner = self.cfg_params['ORCA.owner']
            self.dataset = self.cfg_params['ORCA.dataset']
            record = ('::' + str(self.job_type.name()) +
                      '::' + str(self.ncjobs) +
                      '::' + str(self.dataset) +
                      '::' + str(self.owner))

        # Parameters are read before opening, and the file is closed in
        # 'finally', so a missing key can no longer leak the handle.
        code_file = open(common.work_space.shareDir() + "/.code", "a")
        try:
            if record is not None:
                code_file.write(record)
        finally:
            code_file.close()

    def checkJobTypeName_(self):
        """
        Check the job type name against the workspace 'jobtype' file.

        If the file exists and a job type name was given, the two must
        match; otherwise CrabException is raised.  If no name was given,
        the stored name is adopted.  If the file does not exist, the
        current name is written to it as '<name>\\n'.

        TODO: deprecated code, not needed; it will be eliminated when
        WorkSpace.saveConfiguration() is improved.
        """
        # NOTE(review): unlike the '/.code' path, no separator is added
        # here; this relies on shareDir() ending with a path separator --
        # confirm.
        jt_fname = common.work_space.shareDir() + 'jobtype'
        if os.path.exists(jt_fname):
            jt_file = open(jt_fname, 'r')
            try:
                jt = jt_file.read()
            finally:
                jt_file.close()
            # The file is written as '<name>\n'; drop the trailing newline.
            stored = jt.rstrip('\n')
            if self.job_type_name:
                if stored != self.job_type_name:
                    msg = 'Job Type mismatch: requested <' + self.job_type_name
                    msg += '>, found <' + stored + '>.'
                    raise CrabException(msg)
            else:
                self.job_type_name = stored
        else:
            jt_file = open(jt_fname, 'w')
            try:
                jt_file.write(self.job_type_name + '\n')
            finally:
                jt_file.close()

    def defineTotalNumberOfJobs_(self):
        """
        Calculate the event splitting.

        Sets self.first_event, self.total_number_of_events,
        self.job_number_of_events and self.total_njobs.  The inputs are
        the USER.* configuration parameters and the number of available
        events reported by the job type (self.job_type.maxEvents).
        'all' or -1 for an event count means "all available events".

        Raises CrabException when the requested range exceeds the
        available events or a splitting parameter is invalid.
        """
        try:
            self.first_event = int(self.cfg_params['USER.first_event'])
        except KeyError:
            self.first_event = 0
        common.logger.debug(1,"First event ot be analyzed: "+str(self.first_event))

        # TODO Could we find a better way to get this number?
        maxAvailableEvents = int(self.job_type.maxEvents)
        common.logger.debug(1,"Available events: "+str(maxAvailableEvents))

        # some sanity check
        if self.first_event>=maxAvailableEvents:
            raise CrabException('First event is bigger than maximum number of available events!')

        # the total number of events to be analyzed
        try:
            n = self.cfg_params['USER.total_number_of_events']
        except KeyError:
            common.logger.message("total_number_of_events not defined, set it to maximum available")
            self.total_number_of_events = (maxAvailableEvents - self.first_event)
        else:
            n = str(n).strip()
            if n in ('all', '-1'):
                self.total_number_of_events = (maxAvailableEvents - self.first_event)
                common.logger.debug(1,"Analysing all available events "+str(self.total_number_of_events))
            else:
                n = int(n)
                if n < 0:
                    raise CrabException('total_number_of_events must be a non-negative number, "all" or -1, got '+str(n))
                if maxAvailableEvents<(n+self.first_event):
                    raise CrabException('(First event + total events)='+str(n+self.first_event)+' is bigger than maximum number of available events '+str(maxAvailableEvents)+' !! Use "total_number_of_events=-1" to analyze to whole dataset')
                self.total_number_of_events = n
        common.logger.message("Total number of events to be analyzed: "+str(self.total_number_of_events))

        # Read user directives.  eventPerJob stays None when
        # job_number_of_events is not given; otherwise it holds the raw
        # string.  The original compared that raw string with 0, which in
        # Python 2 is always "greater" and in Python 3 is a TypeError.
        eventPerJob = None
        try:
            eventPerJob = str(self.cfg_params['USER.job_number_of_events']).strip()
        except KeyError:
            pass

        jobsPerTask=0
        try:
            jobsPerTask = int(self.cfg_params['USER.total_number_of_jobs'])
        except KeyError:
            pass

        # If both the above set, complain and use event per jobs
        if eventPerJob is not None and jobsPerTask>0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are both defined '
            msg += 'Using job_number_of_events.'
            common.logger.message(msg)
            jobsPerTask = 0
        if eventPerJob is None and jobsPerTask==0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are not defined '
            msg += 'Creating just one job for all events.'
            common.logger.message(msg)
            jobsPerTask = 1

        # first case: events per job defined
        if eventPerJob is not None:
            if eventPerJob in ('all', '-1') or (int(eventPerJob)>self.total_number_of_events and self.total_number_of_events>0):
                common.logger.message("Asking more events than available: set it to maximum available")
                self.job_number_of_events = self.total_number_of_events
                self.total_njobs = 1
            else:
                self.job_number_of_events = int(eventPerJob)
                if self.job_number_of_events <= 0:
                    # Previously a ZeroDivisionError (0) or a nonsensical
                    # negative job count (< 0).
                    raise CrabException('job_number_of_events must be a positive number, "all" or -1, got '+str(self.job_number_of_events))
                # ceil(total / per_job) with integer arithmetic; '//' keeps
                # Python 2's flooring '/' semantics under Python 3 as well.
                self.total_njobs = (self.total_number_of_events-1)//self.job_number_of_events + 1
        # second case: jobs per task defined
        elif jobsPerTask>0:
            common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+" JobPerTask "+str(jobsPerTask))
            self.job_number_of_events = self.total_number_of_events // jobsPerTask
            self.total_njobs = jobsPerTask
        # reached e.g. when total_number_of_jobs is negative
        else:
            raise CrabException('Somthing wrong with splitting')

        common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+
                            " events per job: "+str(self.job_number_of_events))
        return

    def writeJobsSpecsToDB(self):
        """
        Write the first event and the number of events of each job to the
        job DB, log a summary of the splitting and save the DB.

        If common.analisys_common_info['events_management'] is 1, jobs get
        consecutive event ranges and the last job gets whatever remains.
        Otherwise every job starts at self.first_event with
        self.job_number_of_events events.  A missing key counts as 0.
        """
        common.jobDB.load()
        nJobs=self.nJobs()

        # Looked up once, before the loop.  It used to be assigned only
        # inside the loop, so a single-job task reached the int() below
        # with the attribute unset.
        try:
            self.events_management = common.analisys_common_info['events_management']
        except KeyError:
            self.events_management = 0
        consecutiveRanges = (int(self.events_management) == 1)

        if nJobs < 1:
            # Possible e.g. with total_number_of_events = 0; previously job
            # index -1 was written.
            common.logger.message('No jobs to be created: nothing written to the job DB')
            return

        # every job but the last
        firstEvent=self.first_event
        for job in range(nJobs-1):
            common.jobDB.setFirstEvent(job, firstEvent)
            common.jobDB.setMaxEvents(job, self.job_number_of_events)
            if consecutiveRanges:
                firstEvent=firstEvent+self.job_number_of_events

        # this is the last job: with consecutive ranges it takes the remainder
        lastJobsNumberOfEvents = self.job_number_of_events
        if consecutiveRanges:
            lastJobsNumberOfEvents= (self.total_number_of_events+self.first_event)-firstEvent
        common.jobDB.setFirstEvent(nJobs-1, firstEvent)
        common.jobDB.setMaxEvents(nJobs-1, lastJobsNumberOfEvents)

        if (lastJobsNumberOfEvents!=self.job_number_of_events):
            common.logger.message(str(self.total_njobs-1)+' jobs will be created for '+str(self.job_number_of_events)+' events each plus 1 for '+str(lastJobsNumberOfEvents)+' events for a total of '+str(self.job_number_of_events*(self.total_njobs-1)+lastJobsNumberOfEvents)+' events')
        else:
            common.logger.message(str(self.total_njobs)+' jobs will be created for '+str(self.job_number_of_events)+' events each for a total of '+str(self.job_number_of_events*(self.total_njobs-1)+lastJobsNumberOfEvents)+' events')

        # case two (to be implemented) write eventCollections for each jobs

        # save the DB
        common.jobDB.save()
        return

    def nJobs(self):
        """Return the total number of jobs of the task."""
        return self.total_njobs

    def createJobTypeObject(self):
        """
        Instantiate the job type named by self.job_type_name.

        Imports class <Name> from module cms_<name> via importName() and
        stores klass(self.cfg_params) in self.job_type.

        Raises CrabException if the module cannot be imported, or if
        importName() raises KeyError (reported as a missing class).
        """
        import sys
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() instead of "except ImportError, e" so the code
            # parses under both old Python 2 and Python 3.
            e = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(e)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params)
        return

    def jobType(self):
        """Return the job-type object used by this Creator."""
        return self.job_type

    def run(self):
        """
        The main method of the class: create up to self.ncjobs jobs.

        Only jobs whose job-DB status is 'X' are created.  For each one:
        the job type modifies its steering cards, the job shell script is
        written from 'crab_template.sh' and made executable (mode 0744),
        the scheduler script is created, the job is marked 'C', and its
        input/output sandboxes and task id are stored in the job DB.  The
        job DB is saved at the end.
        """
        import stat
        common.logger.debug(5, "Creator::run() called")

        script_writer = ScriptWriter('crab_template.sh')
        # 0744 (rwxr--r--), spelled portably for Python 2 and 3.
        script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

        njc = 0
        for nj in range(self.total_njobs):
            if njc == self.ncjobs: break
            if common.jobDB.status(nj) != 'X': continue

            common.logger.message("Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)
            # Create script (sh)
            script_writer.modifyTemplateScript(nj)
            os.chmod(common.job_list[nj].scriptFilename(), script_mode)
            # Create scheduler scripts (jdl)
            common.scheduler.createSchScript(nj)

            common.jobDB.setStatus(nj, 'C')
            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            # Copy before appending.  Assumption: the job type might return
            # a list it keeps internally, and appending would then pile up
            # entries across jobs.
            outputSandbox = list(self.job_type.outputSandbox(nj))
            job = common.job_list[nj]
            stdout = job.stdout()
            stderr = job.stderr()
            outputSandbox.append(stdout)
            # stderr is listed separately only if it is a different file
            if stdout != stderr:
                outputSandbox.append(stderr)
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            njc = njc + 1

        common.jobDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
        return