ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.37
Committed: Sun May 7 11:09:39 2006 UTC (18 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: pre_cmssw_integration_20060527
Changes since 1.36: +7 -1 lines
Log Message:
bug fix for crabmon

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from JobDB import JobDB
5     from ScriptWriter import ScriptWriter
6     from Scheduler import Scheduler
7     from crab_logger import Logger
8     from crab_exceptions import *
9     from crab_util import *
10     import common
11    
12 slacapra 1.8 import os, string, math
13 corvo 1.26 # marco
class Creator(Actor):
    """
    Actor that creates the jobs of a CRAB task.

    The constructor instantiates the job-type plugin (module cms_<name>),
    prepares its steering cards, computes how the requested events are
    split into jobs, and writes two bookkeeping files into the workspace
    share directory: a proto-monitoring record ('.code') and a
    'key:value' parameter file for the dashboard. run() then creates the
    job scripts and the scheduler scripts for each job still in status 'X'.
    """

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        job_type_name -- job type name (e.g. 'ORCA', 'FAMOS', 'CMSSW');
                         used to locate module cms_<lowercase name>.
        cfg_params    -- configuration parameters, looked up by
                         'SECTION.key' strings.
        ncjobs        -- number of jobs to create, or 'all'.
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0

        self.createJobTypeObject()
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.defineTotalNumberOfJobs_()
        common.logger.debug(5, __name__+": total # of jobs = "+str(self.total_njobs))

        # Set number of jobs to be created
        self.ncjobs = self.numberOfJobsToCreate_(ncjobs)

        # This is code for proto-monitoring
        self.writeMonitoringCode_()

        # This is code for dashboard
        self.writeDashboardParams_()

        # Deprecated: stores / cross-checks the job type name on disk
        self.checkJobTypeName_()

        common.logger.debug(5, "Creator constructor finished")
        return

    def numberOfJobsToCreate_(self, ncjobs):
        """
        Return how many jobs run() should create.

        'all', or any number above the total number of jobs, is capped to
        self.total_njobs. Other values are converted with int(). The old
        code relied on Python 2 str/int ordering, which turned numeric
        strings into "all".
        """
        if ncjobs == 'all':
            return self.total_njobs
        ncjobs = int(ncjobs)
        if ncjobs > self.total_njobs:
            return self.total_njobs
        return ncjobs

    def writeMonitoringCode_(self):
        """
        Append a '::'-separated task record to <shareDir>/.code
        (proto-monitoring).

        The record is ::<jobtype>::<ncjobs>::<a>::<b>, where a/b are:
        ORCA, ORCA_DBSDLS: ORCA.dataset, ORCA.owner
        FAMOS:             FAMOS.input_lfn, FAMOS.executable
        CMSSW:             CMSSW.datasetpath, CMSSW.pset
        Other job types write nothing, but the file is still created.
        A missing configuration key raises KeyError. The file is closed
        in every case.
        """
        jt_name = self.job_type.name()
        fileCODE1 = open(common.work_space.shareDir()+"/.code", "a")
        try:
            record = None
            if jt_name == 'ORCA' or jt_name == 'ORCA_DBSDLS':
                self.owner = self.cfg_params['ORCA.owner']
                self.dataset = self.cfg_params['ORCA.dataset']
                record = (self.dataset, self.owner)
            elif jt_name == 'FAMOS':
                self.inputFile = self.cfg_params['FAMOS.input_lfn']
                self.executable = self.cfg_params['FAMOS.executable']
                record = (self.inputFile, self.executable)
            elif jt_name == 'CMSSW':
                self.inputFile = self.cfg_params['CMSSW.datasetpath']
                self.executable = self.cfg_params['CMSSW.pset']
                record = (self.inputFile, self.executable)

            if record is not None:
                fileCODE1.write('::'+str(jt_name)+'::'+str(self.ncjobs)+'::'+str(record[0])+'::'+str(record[1]))
        finally:
            fileCODE1.close()

    def writeDashboardParams_(self):
        """
        Write the task parameters for the dashboard, one 'key:value' per
        line, into <shareDir>/<cfg_params['apmon'].fName>.

        Also stores the voms-proxy-info identity in cfg_params['GridName'].
        Collecting the parameters is best effort: any Exception is logged
        and ignored. KeyboardInterrupt and SystemExit are no longer
        swallowed.
        """
        import sys  # explicit: do not rely on star imports for sys

        # First checkProxy (outside the best-effort block, as before)
        common.scheduler.checkProxy()
        try:
            fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'w')
            try:
                self.cfg_params['GridName'] = runCommand("voms-proxy-info -identity")
                common.logger.debug(5, "GRIDNAME: "+self.cfg_params['GridName'])
                taskType = 'analysis'

                params = {'tool': common.prog_name,
                          'tool_ui': os.environ['HOSTNAME'],
                          'scheduler': self.cfg_params['CRAB.scheduler'],
                          'GridName': self.cfg_params['GridName'].strip(),
                          'taskType': taskType,
                          'vo': self.cfg_params['EDG.virtual_organization'],
                          'user': self.cfg_params['user']}
                # Job-type specific parameters override/extend the common ones.
                jtParam = self.job_type.getParams()
                for key in jtParam.keys():
                    params[key] = jtParam[key].strip()
                for key, value in params.items():
                    fl.write(key + ':' + value + '\n')
            finally:
                fl.close()
        except Exception:
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))

    def checkJobTypeName_(self):
        """
        Store the job type name in <shareDir>jobtype, or check it against
        the stored one.

        TODO: deprecated code, not needed; it will be eliminated when
        WorkSpace.saveConfiguration() is improved.

        If the file exists and a job type name was requested, a mismatch
        raises CrabException. If no name was requested, the stored one is
        adopted. If the file does not exist, the current name is written
        to it. File handles are closed on every path.
        """
        # NOTE(review): no '/' is inserted here, unlike the other
        # shareDir() paths in this class; presumably shareDir() ends with
        # '/'. Kept unchanged to stay compatible with existing workspaces.
        jt_fname = common.work_space.shareDir() + 'jobtype'
        if os.path.exists(jt_fname):
            # Read stored job type name
            jt_file = open(jt_fname, 'r')
            try:
                jt = jt_file.read()
            finally:
                jt_file.close()
            if self.job_type_name:
                if jt != self.job_type_name+'\n':
                    msg = 'Job Type mismatch: requested <' + self.job_type_name
                    msg += '>, found <' + jt[:-1] + '>.'
                    raise CrabException(msg)
            else:
                self.job_type_name = jt[:-1]
        else:
            # Save job type name
            jt_file = open(jt_fname, 'w')
            try:
                jt_file.write(self.job_type_name+'\n')
            finally:
                jt_file.close()

    def defineTotalNumberOfJobs_(self):
        """
        Calculates the total number of jobs to be created.

        Reads USER.first_event, USER.total_number_of_events,
        USER.job_number_of_events and USER.total_number_of_jobs from
        cfg_params. Sets self.first_event, self.total_number_of_events,
        self.job_number_of_events and self.total_njobs.

        job_number_of_events: 'all' or a negative value means all events
        in one job. 0 (or absent) means "not defined". If both
        job_number_of_events and total_number_of_jobs are defined,
        job_number_of_events wins. If neither is defined, one job is
        created.

        Raises CrabException when the requested range exceeds the
        available events.
        """
        try:
            self.first_event = int(self.cfg_params['USER.first_event'])
        except KeyError:
            self.first_event = 0
        common.logger.debug(1,"First event ot be analyzed: "+str(self.first_event))

        # TODO Could we find a better way to get this number?
        maxAvailableEvents = int(self.job_type.maxEvents)
        common.logger.debug(1,"Available events: "+str(maxAvailableEvents))

        # some sanity check
        if self.first_event>=maxAvailableEvents:
            raise CrabException('First event is bigger than maximum number of available events!')

        # the total number of events to be analyzed
        try:
            n = self.cfg_params['USER.total_number_of_events']
        except KeyError:
            common.logger.message("total_number_of_events not defined, set it to maximum available")
            self.total_number_of_events = (maxAvailableEvents - self.first_event)
        else:
            n = str(n).strip()
            if n == 'all': n = '-1'
            if n == '-1':
                self.total_number_of_events = (maxAvailableEvents - self.first_event)
                common.logger.debug(1,"Analysing all available events "+str(self.total_number_of_events))
            else:
                if maxAvailableEvents<(int(n)+self.first_event):
                    raise CrabException('(First event + total events)='+str(int(n)+self.first_event)+' is bigger than maximum number of available events '+str(maxAvailableEvents)+' !! Use "total_number_of_events=-1" to analyze to whole dataset')
                self.total_number_of_events = int(n)
        common.logger.message("Total number of events to be analyzed: "+str(self.total_number_of_events))

        # read user directives; 0 means "not defined", <0 means "all events"
        eventPerJob = 0
        try:
            raw = self.cfg_params['USER.job_number_of_events']
        except KeyError:
            pass
        else:
            raw = str(raw).strip()
            if raw == 'all':
                eventPerJob = -1
            else:
                # Convert here: the old code compared the raw string with 0,
                # so '0' was treated as defined and caused a division by zero.
                eventPerJob = int(raw)

        jobsPerTask = 0
        try:
            jobsPerTask = int(self.cfg_params['USER.total_number_of_jobs'])
        except KeyError:
            pass

        # If both the above set, complain and use event per jobs
        if eventPerJob != 0 and jobsPerTask > 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are both defined '
            msg += 'Using job_number_of_events.'
            common.logger.message(msg)
            jobsPerTask = 0
        if eventPerJob == 0 and jobsPerTask == 0:
            msg = 'Warning. '
            msg += 'job_number_of_events and total_number_of_jobs are not defined '
            msg += 'Creating just one job for all events.'
            common.logger.message(msg)
            jobsPerTask = 1

        # first case: events per job defined
        if eventPerJob != 0:
            asksForAll = eventPerJob < 0
            asksForTooMany = (eventPerJob > self.total_number_of_events
                              and self.total_number_of_events > 0)
            if asksForAll or asksForTooMany:
                common.logger.message("Asking more events than available: set it to maximum available")
                self.job_number_of_events = self.total_number_of_events
                self.total_njobs = 1
            else:
                self.job_number_of_events = eventPerJob
                # ceil(total / per_job); '//' keeps integer semantics
                # independently of the division mode in effect
                self.total_njobs = (self.total_number_of_events-1)//self.job_number_of_events + 1
        # second case: jobs per task defined
        elif jobsPerTask > 0:
            common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+" JobPerTask "+str(jobsPerTask))
            self.job_number_of_events = self.total_number_of_events // jobsPerTask
            self.total_njobs = jobsPerTask
        # should not happen...
        else:
            raise CrabException('Somthing wrong with splitting')

        common.logger.debug(2,"total number of events: "+str(self.total_number_of_events)+
                            " events per job: "+str(self.job_number_of_events))
        return

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use.

        Every job except the last gets job_number_of_events events,
        starting where the previous job ended. If
        common.analisys_common_info['events_management'] == 1, the last job
        takes the remainder up to first_event + total_number_of_events.
        Otherwise it also gets job_number_of_events. Sets
        self.events_management and saves the job DB.
        """
        common.jobDB.load()
        # case one: write first and max events
        nJobs = self.nJobs()

        try:
            self.events_management = common.analisys_common_info['events_management']
        except KeyError:
            self.events_management = 0

        if nJobs < 1:
            # Nothing to split; the old code would have written job index -1.
            common.jobDB.save()
            return

        firstEvent = self.first_event
        lastJobsNumberOfEvents = self.job_number_of_events

        # last jobs is different...
        for job in range(nJobs-1):
            common.jobDB.setFirstEvent(job, firstEvent)
            common.jobDB.setMaxEvents(job, self.job_number_of_events)
            firstEvent = firstEvent + self.job_number_of_events

        # this is the last job
        common.jobDB.setFirstEvent(nJobs-1, firstEvent)
        if int(self.events_management) == 1:
            lastJobsNumberOfEvents = (self.total_number_of_events+self.first_event)-firstEvent
        common.jobDB.setMaxEvents(nJobs-1, lastJobsNumberOfEvents)

        totalEvents = self.job_number_of_events*(self.total_njobs-1)+lastJobsNumberOfEvents
        if (lastJobsNumberOfEvents!=self.job_number_of_events):
            common.logger.message(str(self.total_njobs-1)+' jobs will be created for '+str(self.job_number_of_events)+' events each plus 1 for '+str(lastJobsNumberOfEvents)+' events for a total of '+str(totalEvents)+' events')
        else:
            common.logger.message(str(self.total_njobs)+' jobs will be created for '+str(self.job_number_of_events)+' events each for a total of '+str(totalEvents)+' events')

        # case two (to be implemented) write eventCollections for each jobs

        # save the DB
        common.jobDB.save()
        return

    def nJobs(self):
        """Return the total number of jobs of the task."""
        return self.total_njobs

    def createJobTypeObject(self):
        """
        Instantiate the job-type plugin.

        Imports class <Capitalized name> from module cms_<lowercase name>
        via importName() and stores klass(cfg_params) in self.job_type.
        Raises CrabException if the module or the class cannot be found.
        """
        import sys  # explicit: used to fetch the active exception portably

        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            e = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(e)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params)
        return

    def jobType(self):
        """Return the job-type plugin instance."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        For each job in status 'X', up to self.ncjobs of them, it:
        - modifies the steering cards;
        - writes the job shell script (mode 0744);
        - writes the scheduler script;
        - marks the job 'C';
        - records the input/output sandboxes and the task id in the job DB.
        The job DB is saved at the end.
        """
        import stat  # portable spelling of the old 0744 octal literal

        common.logger.debug(5, "Creator::run() called")

        # Instantiate ScriptWriter
        script_writer = ScriptWriter('crab_template.sh')
        # rwxr--r-- (0744)
        script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

        # Loop over jobs
        njc = 0
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.message("Creating job # "+str(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create script (sh)
            script_writer.modifyTemplateScript(nj)
            job = common.job_list[nj]
            os.chmod(job.scriptFilename(), script_mode)

            # Create scheduler scripts (jdl)
            common.scheduler.createSchScript(nj)

            common.jobDB.setStatus(nj, 'C')
            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            stdout = job.stdout()
            stderr = job.stderr()
            outputSandbox.append(stdout)
            # check if out!=err
            if stdout != stderr:
                outputSandbox.append(stderr)
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            njc = njc + 1

        common.jobDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
        return