ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.20
Committed: Mon Mar 6 17:09:46 2006 UTC (19 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.19: +10 -7 lines
Log Message:
some changes to support more jobtype monitoring

File Contents

# User Rev Content
from Actor import Actor
from WorkSpace import WorkSpace
from JobList import JobList
from JobDB import JobDB
from ScriptWriter import ScriptWriter
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common

# Standard-library modules are imported after the star imports above (as in
# the original file) so that nothing re-exported by them can rebind these names.
import os
import string
import math
import sys
13 nsmirnov 1.1
class Creator(Actor):
    """
    Actor that creates the jobs of a task.

    The constructor instantiates the job-type object, prepares its steering
    cards and works out how the requested events are split into jobs.
    writeJobsSpecsToDB() stores the resulting event ranges in the job DB and
    run() generates the per-job scripts and sandboxes.
    """

    # Permissions given to every generated job script: rwxr--r-- (octal 744).
    # Built with int() because the octal literal is spelled differently in
    # Python 2 ("0744") and Python 3 ("0o744").
    SCRIPT_MODE = int('744', 8)

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        Build the job type object and compute the job splitting.

        job_type_name -- name of the job type; the module 'cms_<name>' and
                         the class '<Name>' are derived from it
        cfg_params    -- configuration mapping ('SECTION.key' -> value)
        ncjobs        -- number of jobs to create, or 'all'

        Raises CrabException if the job type cannot be created, if the
        splitting parameters are inconsistent, or if the job type stored in
        the share directory differs from the requested one.
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0

        self.createJobTypeObject()
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.defineTotalNumberOfJobs_()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Set number of jobs to be created: never more than the total, and
        # 'all' means the total.  The 'all' test comes first so that the
        # string is never compared with an integer.
        self.ncjobs = ncjobs
        if ncjobs == 'all' or ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs

        self.writeMonitoringCode_()
        self.checkStoredJobTypeName_()

        common.logger.debug(5, "Creator constructor finished")
        return

    def writeMonitoringCode_(self):
        """
        Append the monitoring record of this task to '<shareDir>/.code'.

        Only the ORCA job type writes a record (job type, number of jobs,
        dataset, owner); it also sets self.owner and self.dataset.  For any
        other job type (e.g. FAMOS) the file is opened in append mode and
        closed without writing anything.
        """
        code_file = open(common.work_space.shareDir()+"/.code", "a")
        try:
            if self.job_type.name() == 'ORCA':
                self.owner = self.cfg_params['ORCA.owner']
                self.dataset = self.cfg_params['ORCA.dataset']
                code_file.write('::'+str(self.job_type.name())
                                +'::'+str(self.ncjobs)
                                +'::'+str(self.dataset)
                                +'::'+str(self.owner))
        finally:
            # close the file even when an ORCA key is missing
            code_file.close()
        return

    def checkStoredJobTypeName_(self):
        """
        Check the job type name against the one saved in the share
        directory, or save it if none has been stored yet.

        Raises CrabException when a stored name exists and differs from
        the requested one.
        """
        #TODO: deprecated code, not needed,
        # will be eliminated when WorkSpace.saveConfiguration()
        # will be improved.
        # NOTE(review): unlike the '.code' path, no '/' is inserted here, so
        # this relies on shareDir() ending with a separator -- confirm.
        jt_fname = common.work_space.shareDir() + 'jobtype'

        if not os.path.exists(jt_fname):
            # Save job type name
            jt_file = open(jt_fname, 'w')
            try:
                jt_file.write(self.job_type_name+'\n')
            finally:
                jt_file.close()
            return

        # Read stored job type name; the file is closed before any check so
        # that a mismatch does not leave it open.
        jt_file = open(jt_fname, 'r')
        try:
            jt = jt_file.read()
        finally:
            jt_file.close()

        if self.job_type_name:
            if jt != self.job_type_name+'\n':
                msg = 'Job Type mismatch: requested <' + self.job_type_name
                msg += '>, found <' + jt[:-1] + '>.'
                raise CrabException(msg)
        else:
            self.job_type_name = jt[:-1]
        return

    def defineTotalNumberOfJobs_(self):
        """
        Calculates the total number of jobs to be created.

        Reads USER.first_event, USER.total_number_of_events,
        USER.job_number_of_events and USER.total_number_of_jobs from the
        configuration and sets self.first_event, self.total_number_of_events,
        self.job_number_of_events and self.total_njobs.

        'all' and -1 both mean "everything available".  A value of 0 for
        job_number_of_events or total_number_of_jobs means "not set".  When
        both are set, job_number_of_events wins; when neither is set a single
        job is created.

        Raises CrabException when the requested range does not fit in the
        available events or when a directive has an invalid value.
        """
        try:
            self.first_event = int(self.cfg_params['USER.first_event'])
        except KeyError:
            self.first_event = 0
        common.logger.debug(1,"First event ot be analyzed: "+str(self.first_event))

        # TODO Could we find a better way to get this number?
        maxAvailableEvents = int(self.job_type.maxEvents)
        common.logger.debug(1,"Available events: "+str(maxAvailableEvents))

        # some sanity check
        if self.first_event >= maxAvailableEvents:
            raise CrabException('First event is bigger than maximum number of available events!')

        # the total number of events to be analyzed
        remainingEvents = maxAvailableEvents - self.first_event
        try:
            n = self.cfg_params['USER.total_number_of_events']
        except KeyError:
            common.logger.message("total_number_of_events not defined, set it to maximum available")
            self.total_number_of_events = remainingEvents
        else:
            if n == 'all' or n == '-1':
                self.total_number_of_events = remainingEvents
                common.logger.debug(1,"Analysing all available events "+str(self.total_number_of_events))
            else:
                requested = int(n)
                if requested < 0:
                    # only -1 has a meaning; other negative values used to be
                    # accepted silently and produced a meaningless splitting
                    raise CrabException('total_number_of_events must be "all", -1 or a non-negative number, not '+str(requested))
                if maxAvailableEvents < (requested+self.first_event):
                    raise CrabException('(First event + total events)='+str(requested+self.first_event)+' is bigger than maximum number of available events '+str(maxAvailableEvents)+' !! Use "total_number_of_events=-1" to analyze to whole dataset')
                self.total_number_of_events = requested
        common.logger.message("Total number of events to be analyzed: "+str(self.total_number_of_events))

        # read user directives
        try:
            eventPerJob = self.cfg_params['USER.job_number_of_events']
        except KeyError:
            eventPerJob = 0
        if eventPerJob == 'all':
            eventPerJob = '-1'
        # Converted to a number right away: the value used to stay a string
        # and was compared with 0, which is only defined on Python 2 and let
        # '0' reach a division by zero.
        eventPerJob = int(eventPerJob)
        if eventPerJob < -1:
            raise CrabException('job_number_of_events must be "all", -1 or a positive number, not '+str(eventPerJob))
        wholeSamplePerJob = (eventPerJob == -1)
        eventPerJobDefined = wholeSamplePerJob or eventPerJob > 0

        try:
            jobsPerTask = int(self.cfg_params['USER.total_number_of_jobs'])
        except KeyError:
            jobsPerTask = 0

        # If both the above set, complain and use event per jobs
        if eventPerJobDefined and jobsPerTask > 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are both defined '
            msg += 'Using job_number_of_events.'
            common.logger.message(msg)
            jobsPerTask = 0
        if not eventPerJobDefined and jobsPerTask == 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are not defined '
            msg += 'Creating just one job for all events.'
            common.logger.message(msg)
            jobsPerTask = 1

        if eventPerJobDefined:
            # first case: events per job defined
            tooMany = (eventPerJob > self.total_number_of_events
                       and self.total_number_of_events > 0)
            if wholeSamplePerJob or tooMany:
                common.logger.message("Asking more events than available: set it to maximum available")
                self.job_number_of_events = self.total_number_of_events
                self.total_njobs = 1
            else:
                self.job_number_of_events = eventPerJob
                # ceiling of total/eventPerJob, with explicit floor division
                self.total_njobs = (self.total_number_of_events-1)//self.job_number_of_events + 1
        elif jobsPerTask > 0:
            # second case: jobs per task defined
            common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+" JobPerTask "+str(jobsPerTask))
            # NOTE(review): if jobsPerTask exceeds the number of events this
            # is 0 and the last job receives every event -- confirm intended.
            self.job_number_of_events = self.total_number_of_events//jobsPerTask
            self.total_njobs = jobsPerTask
        else:
            # should not happen... (reached for a negative total_number_of_jobs)
            raise CrabException('Somthing wrong with splitting')

        common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+
                            " events per job: "+str(self.job_number_of_events))
        return

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use

        Every job but the last gets self.job_number_of_events events; the
        last job gets whatever is left of self.total_number_of_events.  The
        job DB is loaded before and saved after the update.
        """
        common.jobDB.load()
        # case one: write first and max events
        nJobs = self.nJobs()

        # With no job at all there is no "last job": skip the writes instead
        # of addressing job index -1.
        if nJobs > 0:
            lastJob = nJobs-1
            firstEvent = self.first_event
            # last jobs is different...
            for job in range(lastJob):
                common.jobDB.setFirstEvent(job, firstEvent)
                common.jobDB.setMaxEvents(job, self.job_number_of_events)
                firstEvent = firstEvent+self.job_number_of_events

            # this is the last job
            common.jobDB.setFirstEvent(lastJob, firstEvent)
            lastJobsNumberOfEvents = (self.total_number_of_events+self.first_event)-firstEvent
            common.jobDB.setMaxEvents(lastJob, lastJobsNumberOfEvents)

            grandTotal = self.job_number_of_events*lastJob+lastJobsNumberOfEvents
            if lastJobsNumberOfEvents != self.job_number_of_events:
                common.logger.message(str(lastJob)+' jobs will be created for '+str(self.job_number_of_events)+' events each plus 1 for '+str(lastJobsNumberOfEvents)+' events for a total of '+str(grandTotal)+' events')
            else:
                common.logger.message(str(nJobs)+' jobs will be created for '+str(self.job_number_of_events)+' events each for a total of '+str(grandTotal)+' events')

        # case two (to be implemented) write eventCollections for each jobs

        # save the DB
        common.jobDB.save()
        return

    def nJobs(self):
        """
        Returns the total number of jobs computed by the splitting.
        """
        return self.total_njobs

    def createJobTypeObject(self):
        """
        Import the job type class and store an instance in self.job_type.

        The class '<Name>' (capitalized job type name) is looked up in the
        module 'cms_<name>' (lower-cased job type name) through importName()
        and instantiated with the configuration parameters.

        Raises CrabException if the module cannot be imported or the class
        is not found in it.
        """
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() is used instead of "except ImportError, e",
            # which is not valid syntax on Python 3.
            err = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(err)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params)
        return

    def jobType(self):
        """
        Returns the job type object built by createJobTypeObject().
        """
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Loops over the jobs of the task and, for each job whose DB status is
        'X', writes its steering cards, its shell script and its scheduler
        script, records its input/output sandboxes and task id in the job DB
        and sets its status to 'C'.  Stops once self.ncjobs jobs have been
        created, then saves the DB and logs how many jobs were created.
        (Presumably 'X' means "not yet created" and 'C' "created" -- the
        status codes are defined outside this file.)
        """
        common.logger.debug(5, "Creator::run() called")

        # Instantiate ScriptWriter
        script_writer = ScriptWriter('crab_template.sh')

        # Loop over jobs
        njc = 0
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.message("Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create script (sh) and make it executable by its owner
            script_writer.modifyTemplateScript(nj)
            os.chmod(common.job_list[nj].scriptFilename(), self.SCRIPT_MODE)

            # Create scheduler scripts (jdl)
            common.scheduler.createSchScript(nj)

            common.jobDB.setStatus(nj, 'C')

            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            stdout = common.job_list[nj].stdout()
            stderr = common.job_list[nj].stderr()
            outputSandbox.append(stdout)
            # stderr is added only when it is a different file from stdout
            if stdout != stderr:
                outputSandbox.append(stderr)
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            njc = njc + 1

        common.jobDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
        return