ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.7
Committed: Wed Feb 11 22:13:03 2009 UTC (16 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0_pre5, CRAB_2_5_0_pre4
Changes since 1.6: +1 -0 lines
Log Message:
set correctly maxEvent for mc prod jobs

File Contents

# User Rev Content
1 spiga 1.1 import common
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
6    
class JobSplitter:
    """
    Split a CRAB task into jobs.

    The splitting algorithm is selected by name through Algos(). Every
    algorithm except the (unimplemented) lumi-based one returns a dict with
    keys 'args' (per-job argument lists), 'jobDestination' (per-job site
    lists) and 'njobs'.
    """

    def __init__( self, cfg_params, args ):
        """
        cfg_params - dict of configuration parameters keyed 'SECTION.key'
                     (e.g. 'CMSSW.events_per_job', 'EDG.se_white_list')
        args       - dict of splitting inputs; which keys are read depends on
                     the algorithm ('blockSites'/'pubdata' for event and run
                     based splitting, 'generator'/'managedGenerators' for
                     splitting with no input)
        """
        self.cfg_params = cfg_params
        self.args = args
        # init BlackWhiteListParser from the SE white/black lists (empty when unset)
        seWhiteList = cfg_params.get('EDG.se_white_list',[])
        seBlackList = cfg_params.get('EDG.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)


    def checkUserSettings(self):
        """
        Read the job-splitting settings from the configuration.

        For each of CMSSW.events_per_job, CMSSW.number_of_jobs and
        CMSSW.total_number_of_events this sets the integer value and a 0/1
        flag recording whether the user supplied it:
            self.eventsPerJob / self.selectEventsPerJob                (default -1)
            self.theNumberOfJobs / self.selectNumberOfJobs             (default 0)
            self.total_number_of_events / self.selectTotalNumberEvents (default 0)

        Raises CrabException when number_of_jobs and a total number of events
        other than -1 are both given and the total is below the number of jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## total number of events (-1 means "all events of the dataset")
        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0


    def _quoted(self, name):
        """
        Quote one file name for a job argument string.

        The result is the name between escaped double quotes followed by an
        escaped comma; the two-character trailing separator of the last entry
        is stripped by the callers with [:-2].
        """
        return '\\"' + name + '\\"\\,'


    def _jobArguments(self, fileString, parentString, nEvents, skipEvents):
        """
        Build the argument list of one event-based job.

        fileString / parentString are concatenations of _quoted() names whose
        trailing separator is stripped here. The parent list is included only
        when self.useParent == 1. nEvents and skipEvents are passed as strings.
        """
        if self.useParent == 1:
            return [fileString[:-2], parentString[:-2], str(nEvents), str(skipEvents)]
        return [fileString[:-2], str(nEvents), str(skipEvents)]


    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform event-based job splitting over the files of a dataset.

        Jobs run over an integer number of files and, unless
        CMSSW.no_block_boundary is 1, no more than one block. A job may start
        part-way into its first file; the number of events to skip in that
        file is the last job argument. Exactly two of total_number_of_events,
        events_per_job and number_of_jobs must be set.

        READS: self.args['blockSites'] - dict: block -> list of host sites
               self.args['pubdata']    - dataset information (files and
                                         events per block/file, parents,
                                         max events)
        SETS:  self.eventsbyblock, self.eventsbyfile, self.parentFiles,
               self.maxEvents, self.useParent,
               self.ncjobs = self.total_number_of_jobs - total # of jobs
        RETURNS: dict with 'args' (per job: [files, (parents,) nEvents, skip]),
                 'jobDestination' (sites of each job) and 'njobs'.
        RAISES: CrabException if the splitting settings are inconsistent.
        """
        jobDestination=[]
        self.checkUserSettings()
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock=pubdata.getFiles()

        self.eventsbyblock=pubdata.getEventsPerBlock()
        self.eventsbyfile=pubdata.getEventsPerFile()
        self.parentFiles=pubdata.getParent()

        ## get max number of events
        self.maxEvents=pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining=self.maxEvents
        # If user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            # report the effective request: with events_per_job and
            # number_of_jobs, self.total_number_of_events is 0 here
            common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            # At least one event per job: with fewer events than jobs the
            # quotient is 0, and zero-event jobs never consume events, so the
            # loop below would only stop at the totalNumberOfJobs cap.
            eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

        if (self.selectNumberOfJobs):
            common.logger.message("May not create the exact number_of_jobs requested.")

        # old... to remove Daniele (upper bound on the number of jobs)
        totalNumberOfJobs = 999999999

        # list() so that blocks can be indexed by position
        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # job numbers (1-based) created from each block
        jobsOfBlock = {}

        # State of the job being assembled
        parString = ""          # quoted input files
        pString = ""            # quoted parent files (used when useParent == 1)
        filesEventCount = 0     # events in the files of parString
        jobSkipEventCount = 0   # events of the first file already given to earlier jobs
        jobBlock = None         # block of the last file added to parString

        def addJob(files, parents, nEvents, skip, fromBlock, eventInfo):
            # Register one job: its arguments, its destination (the sites of
            # fromBlock) and its number in jobsOfBlock.
            jobNumber = len(list_of_lists) + 1
            list_of_lists.append(self._jobArguments(files, parents, nEvents, skip))
            common.logger.debug(3,"Job "+str(jobNumber)+" can run over "+eventInfo+".")
            jobDestination.append(blockSites[fromBlock])
            common.logger.debug(5,"Job "+str(jobNumber)+" Destination: "+str(jobDestination[-1]))
            jobsOfBlock[fromBlock].append(jobNumber)

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block not in self.eventsbyblock:
                continue
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0
            if noBboundary == 0: # DD
                # ---- New block => New job ---- #
                # (with no_block_boundary the pending job, including its
                # parents and skip count, carries over into this block)
                parString = ""
                pString = ""
                filesEventCount = 0
                jobSkipEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                    else:
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        filesEventCount += numEventsInFile
                        # Add file (and, once, its parents) to current job
                        parString += self._quoted(fileName)
                        jobBlock = block
                        newFile = 0
                        if self.useParent==1:
                            parent = self.parentFiles[fileName]
                            for f in parent:
                                pString += self._quoted(f)
                            common.logger.debug(6, "File "+str(fileName)+" has the following parents: "+str(parent))
                            common.logger.write("File "+str(fileName)+" has the following parents: "+str(parent))

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # last file in block (and the job has at least one file):
                    # close the job using all remaining events ("-1")
                    if noBboundary == 0 and fileCount == numFilesInBlock-1 and parString:
                        jobEvents = filesEventCount - jobSkipEventCount
                        addJob(parString, pString, -1, jobSkipEventCount, block,
                               str(jobEvents)+" events (last file in block)")
                        jobCount += 1
                        totalEventCount += jobEvents
                        eventsRemaining -= jobEvents
                        jobSkipEventCount = 0
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                    # go to next file (with no_block_boundary possibly in the next block)
                    newFile = 1
                    fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    addJob(parString, pString, eventsPerJobRequested, jobSkipEventCount, block,
                           str(eventsPerJobRequested)+" events")
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    jobSkipEventCount = 0
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    addJob(parString, pString, eventsPerJobRequested, jobSkipEventCount, block,
                           str(eventsPerJobRequested)+" events")
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    # The next job starts inside the current file: skip the
                    # events of it already assigned to the job just closed
                    # (the first file's skip applies to the earlier files).
                    numEventsInFile = self.eventsbyfile[fileName]
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - numEventsInFile)
                    # remove all but the last file, and keep only its parents
                    filesEventCount = numEventsInFile
                    parString = self._quoted(fileName)
                    pString = ""
                    if self.useParent==1:
                        for f in self.parentFiles[fileName]:
                            pString += self._quoted(f)

        # With no_block_boundary a job is only closed once it has enough
        # events, so files gathered after the last closed job would otherwise
        # be dropped: give them a final job running over what they contain.
        if (noBboundary == 1 and parString and eventsRemaining > 0
                and jobCount < totalNumberOfJobs
                and filesEventCount - jobSkipEventCount > 0):
            jobEvents = filesEventCount - jobSkipEventCount
            addJob(parString, pString, -1, jobSkipEventCount, jobBlock,
                   str(jobEvents)+" events (last file in dataset)")
            jobCount += 1
            totalEventCount += jobEvents
            eventsRemaining -= jobEvents

        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0 :
            self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=self.total_number_of_jobs

        return dictOut

    # keep track of blocks with no sites to print a warning at the end
    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
        """
        Print the job range and usable sites of every block that received
        jobs, then warn about blocks left without any usable site.

        blocks      - block names, in the order used for splitting
        jobsOfBlock - dict: block -> list of job numbers created from it
        blockSites  - dict: block -> list of sites hosting the block

        Usable sites are blockSites[block] filtered by the SE black list and
        then by the SE white list.
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []   # job ranges of the blocks without usable sites
        blockNoSite = []   # 1-based counters of those blocks

        blockCounter = 0
        for block in blocks:
            if block not in jobsOfBlock:
                continue
            blockCounter += 1
            # filter once: the result is both printed and tested for emptiness
            allowedSites = list(self.blackWhiteListParser.checkWhiteList(
                self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block]))
            jobRange = spanRanges(jobsOfBlock[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, jobRange, ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append(jobRange)
                blockNoSite.append(blockCounter)

        common.logger.message(screenOutput)
        if len(noSiteBlock) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            msg += ' ' + ', '.join([str(b) for b in blockNoSite])
            msg += '\n Related jobs:\n '
            msg += ','.join([str(r) for r in noSiteBlock])
            msg += '\n will not be submitted and this block of data can not be analyzed!\n'
            if 'EDG.se_white_list' in self.cfg_params:
                msg += 'WARNING: SE White List: '+str(self.cfg_params['EDG.se_white_list'])+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'
            if 'EDG.ce_white_list' in self.cfg_params:
                msg += 'WARNING: CE White List: '+str(self.cfg_params['EDG.ce_white_list'])+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'

            common.logger.message(msg)

        return


    ########################################################################
    def jobSplittingByRun(self):
        """
        Perform run-based job splitting with the WMCore 'RunBased' splitter.

        Every file whose block is a key of self.args['blockSites'] is
        registered with that block's sites and with the first run listed for
        it. The splitter is then called once per distinct run, up to
        CMSSW.number_of_jobs (no practical limit when unset), and each call
        becomes one job.

        RETURNS: dict with 'args' (per job: [files, '-1', '0']),
                 'jobDestination' (locations as collected by getJobInfo)
                 and 'njobs'.
        """
        from WMCore.JobSplitting.RunBased import RunBased
        from WMCore.DataStructs.Workflow import Workflow
        from WMCore.DataStructs.File import File
        from WMCore.DataStructs.Fileset import Fileset
        from WMCore.DataStructs.Subscription import Subscription
        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
        from WMCore.DataStructs.Run import Run

        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0 :
            self.theNumberOfJobs = 9999999
        runList = []
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for f in fileList:
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except KeyError:
                # block not among the allowed ones: skip this file
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            runList.append(runNum)
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        list_of_lists = []
        jobDestination = []

        # one splitter call per distinct run, up to theNumberOfJobs
        # (built-in set instead of the deprecated sets.Set)
        count = 0
        for run in set(runList):
            if count >= self.theNumberOfJobs:
                break
            res = self.getJobInfo(jobfactory())
            parString = ''
            for lfn in res['lfns']:
                parString += self._quoted(lfn)
            fullString = parString[:-2]
            list_of_lists.append([fullString,str(-1),str(0)])
            #need to check single file location
            jobDestination.append(res['locations'])
            count +=1

        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=count

        return dictOut

    def getJobInfo( self,jobGroup ):
        """
        Summarize a WMCore job group.

        RETURNS: dict with
            'lfns'      - LFNs of all files of all jobs in the group, in order
            'locations' - locations of the first of those files only (empty
                          when the group has no files)
        """
        lfns = []
        locations = None
        for job in jobGroup.jobs:
            for aFile in job.getFiles():
                lfns.append(aFile['lfn'])
                fileLocations = list(aFile['locations'])
                if locations is None:
                    locations = fileLocations
        ### TODO: the location check should go here (only the first file's
        ### locations are used for now)
        res = {}
        res['lfns'] = lfns
        res['locations'] = locations or []
        return res

    ########################################################################
    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job, for jobs that
        read no input data.

        Exactly two of total_number_of_events, events_per_job and
        number_of_jobs must be set; total_number_of_events may not be -1.
        Job i gets, in order:
          - str(CMSSW.first_run) + str(i), when first_run is set;
          - its first event number, when self.args['generator'] is in
            self.args['managedGenerators'] ('1' for the first comphep job);
          - the number of events per job.

        SETS: self.total_number_of_jobs, self.total_number_of_events,
              self.eventsPerJob, self.list_of_args
        RETURNS: dict with 'args', 'jobDestination' (one [""] per job) and
                 'njobs'.
        RAISES: CrabException for inconsistent or non-positive settings.
        """
        common.logger.debug(5,'Splitting per events')
        self.checkUserSettings()
        jobDestination=[]
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators =self.args['managedGenerators']
        generator = self.args['generator']
        firstRun = self.cfg_params.get('CMSSW.first_run',None)

        if (self.selectEventsPerJob):
            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
        if (self.selectNumberOfJobs):
            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if (self.selectTotalNumberEvents):
            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

        if (self.total_number_of_events < 0):
            msg='Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if (self.selectEventsPerJob):
            if (self.selectTotalNumberEvents):
                # guard the division below against a ZeroDivisionError
                if self.eventsPerJob <= 0:
                    raise CrabException('events_per_job must be a positive number.')
                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
            elif(self.selectNumberOfJobs) :
                self.total_number_of_jobs =self.theNumberOfJobs
                self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

        elif (self.selectNumberOfJobs) :
            # guard the division below against a ZeroDivisionError
            if self.theNumberOfJobs <= 0:
                raise CrabException('number_of_jobs must be a positive number.')
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

        common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug(5,'Check '+str(check))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) #must be empty to write correctly the xml
            args=[]
            if (firstRun):
                ## pythia first run
                args.append(str(firstRun)+str(i))
            if (generator in managedGenerators):
                if (generator == 'comphep' and i == 0):
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)
        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=self.total_number_of_jobs

        return dictOut


    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job (CMSSW.number_of_jobs,
        which is mandatory).

        Job i gets str(i) as its only argument and an empty destination [""].

        RETURNS: dict with 'args', 'jobDestination' and 'njobs'.
        RAISES: CrabException when number_of_jobs is not specified.
        """
        self.checkUserSettings()
        if (self.selectNumberOfJobs == 0):
            msg = 'must specify number_of_jobs.'
            # CrabException (the lowercase name was undefined -> NameError)
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug(5,'Splitting per job')
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.total_number_of_jobs = self.theNumberOfJobs

        common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            jobDestination.append([""])
            self.list_of_args.append([str(i)])

        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=self.total_number_of_jobs
        return dictOut


    def jobSplittingByLumi(self):
        """
        Placeholder for lumi-section based splitting: not implemented, returns
        None (unlike the other splitting methods, which return a dict).
        """
        return

    def Algos(self):
        """
        Define key splittingType matrix: map each splitting-type name to the
        bound method implementing it.
        """
        SplitAlgos = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript
            }
        return SplitAlgos
551