ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.9
Committed: Fri Mar 6 17:03:08 2009 UTC (16 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0, CRAB_2_5_0_pre7, CRAB_2_5_0_pre6
Changes since 1.8: +0 -4 lines
Log Message:
removed print

File Contents

# User Rev Content
1 spiga 1.1 import common
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
6    
class JobSplitter:
    """
    Split a CRAB task into jobs.

    The available splitting entry points are returned by Algos(), keyed by
    splitting type. The implemented entry points return a dict with the keys
    'args' (one argument list per job), 'jobDestination' (one site list per
    job) and 'njobs'.
    """

    def __init__(self, cfg_params, args):
        """
        cfg_params: configuration mapping of 'SECTION.key' names to values.
        args: dict of splitting inputs. The splitting methods read the keys
              'blockSites' and 'pubdata' (event/run splitting) and
              'managedGenerators' and 'generator' (no-input splitting).
        """
        self.cfg_params = cfg_params
        self.args = args
        # init BlackWhiteListParser from the SE white/black lists in the config
        seWhiteList = cfg_params.get('EDG.se_white_list', [])
        seBlackList = cfg_params.get('EDG.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

    def checkUserSettings(self):
        """
        Read the job-splitting parameters from the configuration.

        Sets eventsPerJob/selectEventsPerJob, theNumberOfJobs/selectNumberOfJobs
        and total_number_of_events/selectTotalNumberEvents. Each select* flag is
        1 when the corresponding CMSSW.* parameter is present, 0 otherwise.

        Raises CrabException when both total_number_of_events (other than -1)
        and number_of_jobs are given and there are fewer events than jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0

    def _checkPositiveSettings(self):
        """
        Raise CrabException if a selected events_per_job or number_of_jobs is not
        positive. Such values make the event-based splitting divide by zero or
        create jobs that consume no events, so the splitting never terminates.
        Must be called after checkUserSettings().
        """
        if self.selectEventsPerJob and self.eventsPerJob <= 0:
            raise CrabException('events_per_job must be a positive number.')
        if self.selectNumberOfJobs and self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')

    @staticmethod
    def _quoteFile(lfn):
        """
        Return lfn wrapped in backslash-escaped double quotes and followed by a
        backslash-comma separator. Strings built from these tokens have their
        final separator (2 characters) stripped with [:-2] before use.
        """
        return '\\"' + lfn + '\\"\\,'

    def _parentString(self, lfn):
        """Return the quoted parent files of lfn (from self.parentFiles) as one string."""
        parents = self.parentFiles[lfn]
        common.logger.debug(6, "File "+str(lfn)+" has the following parents: "+str(parents))
        common.logger.write("File "+str(lfn)+" has the following parents: "+str(parents))
        return ''.join([self._quoteFile(p) for p in parents])

    def _jobArgs(self, parString, pString, nEvents, skipEvents):
        """
        Build the argument list of one event-based job:
        [input files, (parent files if self.useParent == 1), nEvents, skipEvents].
        parString/pString are strings of _quoteFile tokens; their trailing
        separator is removed here.
        """
        jobArgs = [parString[:-2]]
        if self.useParent == 1:
            jobArgs.append(pString[:-2])
        jobArgs.append(str(nEvents))
        jobArgs.append(str(skipEvents))
        return jobArgs

    ########################################################################
    def jobSplittingByEvent(self):
        """
        Perform job splitting by events. Jobs run over an integer number of files
        and, unless CMSSW.no_block_boundary is set, no more than one block.

        READS: self.args['blockSites'] (dict: block -> list of host sites) and
               self.args['pubdata'] (files per block, events per block/file,
               parent files and the maximum number of events).
        REQUIRES: exactly two of total_number_of_events, events_per_job and
                  number_of_jobs in the configuration (see checkUserSettings).
        SETS: self.total_number_of_jobs and self.ncjobs - total # of jobs
        RETURNS: dict with 'args' (per job: quoted input files, [quoted parent
                 files,] number of events (-1 = rest of the block), events to
                 skip), 'jobDestination' (per-job site list) and 'njobs'.
        RAISES: CrabException on an inconsistent splitting configuration.
        """
        jobDestination = []
        self.checkUserSettings()
        if ((self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
        self._checkPositiveSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            common.logger.message("Requested "+str(totalEventsRequested)+" events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            # At least one event per job: with 0 the loop below would keep
            # creating empty jobs without consuming any events.
            eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

        if (self.selectNumberOfJobs):
            common.logger.message("May not create the exact number_of_jobs requested.")

        # old... to remove Daniele
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # job numbers (1-based) created from each block
        jobsOfBlock = {}

        # State of the job being built. It is reset at every new block unless
        # CMSSW.no_block_boundary is set, in which case a job may span blocks.
        parString = ""          # quoted input files of the current job
        pString = ""            # quoted parent files of the current job
        filesEventCount = 0     # events in the input files of the current job
        jobSkipEventCount = 0   # events to skip in the first file of the current job
        newFile = 1             # 1 when the next iteration must add a new file

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while ((eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            jobsOfBlock.setdefault(block, [])

            if block in self.eventsbyblock:
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug(5, 'Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0
            if noBboundary == 0:  # DD
                # ---- New block => New job ---- #
                parString = ""
                pString = ""
                filesEventCount = 0
                jobSkipEventCount = 0
                newFile = 1

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block ---- #
            while ((eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs)):
                file = files[fileCount]
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[file]
                    except KeyError:
                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")
                    else:
                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                        # Add file (and, if requested, its parents) to the current job
                        filesEventCount += numEventsInFile
                        parString += self._quoteFile(file)
                        if self.useParent == 1:
                            pString += self._parentString(file)
                        newFile = 0

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if (filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    if noBboundary == 1:  ## DD
                        newFile = 1
                        fileCount += 1
                    elif (fileCount == numFilesInBlock - 1):
                        # last file in block: end the job using the remaining
                        # events of the block, then touch a new file
                        list_of_lists.append(self._jobArgs(parString, pString, -1, jobSkipEventCount))
                        common.logger.debug(3, "Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        jobDestination.append(blockSites[block])
                        common.logger.debug(5, "Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        jobCount += 1
                        totalEventCount += filesEventCount - jobSkipEventCount
                        eventsRemaining -= filesEventCount - jobSkipEventCount
                        # reset job state
                        jobSkipEventCount = 0
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else:
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif (filesEventCount - jobSkipEventCount == eventsPerJobRequested):
                    # close job and touch new file
                    list_of_lists.append(self._jobArgs(parString, pString, eventsPerJobRequested, jobSkipEventCount))
                    common.logger.debug(3, "Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    jobDestination.append(blockSites[block])
                    common.logger.debug(5, "Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    # reset job state
                    jobSkipEventCount = 0
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                # if more events in file remain than eventsPerJobRequested
                else:
                    # close job but don't touch new file
                    list_of_lists.append(self._jobArgs(parString, pString, eventsPerJobRequested, jobSkipEventCount))
                    common.logger.debug(3, "Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    jobDestination.append(blockSites[block])
                    common.logger.debug(5, "Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    # Events of the last file already consumed: the job used
                    # (filesEventCount - jobSkipEventCount - eventsInLastFile) events
                    # from the earlier files, the rest came from the last file.
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                    # The next job starts with the last file only (and its parents)
                    filesEventCount = self.eventsbyfile[file]
                    parString = self._quoteFile(file)
                    pString = ""
                    if self.useParent == 1:
                        pString = self._parentString(file)
            # END while (iterate over files in the block)
        # END while (iterate over blocks in the dataset)

        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs):
            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
        """
        Print, for each block that got an entry in jobsOfBlock, its job ranges and
        the sites left after SE black/white list filtering. Then print a warning
        listing the blocks (and their jobs) that have no allowed site.

        blocks: block names in splitting order.
        jobsOfBlock: dict block -> list of job numbers.
        blockSites: dict block -> list of host sites.
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteJobRanges = []
        noSiteBlocks = []

        blockCounter = 0
        for block in blocks:
            if block not in jobsOfBlock:
                continue
            blockCounter += 1
            jobRange = spanRanges(jobsOfBlock[block])
            allowedSites = self.blackWhiteListParser.checkWhiteList(
                self.blackWhiteListParser.checkBlackList(blockSites[block], [block]), [block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, jobRange, ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteJobRanges.append(jobRange)
                noSiteBlocks.append(blockCounter)

        common.logger.message(screenOutput)
        if noSiteBlocks:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            msg += ','.join([' ' + str(blockNumber) for blockNumber in noSiteBlocks])
            msg += '\n Related jobs:\n '
            msg += ','.join([str(rangeJobs) for rangeJobs in noSiteJobRanges])
            msg += '\n will not be submitted and this block of data can not be analyzed!\n'
            if 'EDG.se_white_list' in self.cfg_params:
                msg += 'WARNING: SE White List: '+str(self.cfg_params['EDG.se_white_list'])+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'
            if 'EDG.ce_white_list' in self.cfg_params:
                msg += 'WARNING: CE White List: '+str(self.cfg_params['EDG.ce_white_list'])+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'

            common.logger.message(msg)

        return

    ########################################################################
    def jobSplittingByRun(self):
        """
        Perform job splitting by run using the WMCore 'RunBased' splitter.

        Files of blocks present in self.args['blockSites'] are registered with
        their block sites as locations and the first run of each file. One job
        group is requested from the WMCore job factory per distinct run, capped at
        number_of_jobs when that is set.

        RETURNS: dict with 'args' (per job: quoted input files, '-1', '0'),
                 'jobDestination' (per-job locations) and 'njobs'.
        """
        from WMCore.JobSplitting.RunBased import RunBased
        from WMCore.DataStructs.Workflow import Workflow
        from WMCore.DataStructs.File import File
        from WMCore.DataStructs.Fileset import Fileset
        from WMCore.DataStructs.Subscription import Subscription
        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
        from WMCore.DataStructs.Run import Run

        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        runList = []
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for f in fileList:
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except Exception:
                # Skip the file, e.g. when its block is not in blockSites
                # (presumably not hosted at an allowed site).
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            runList.append(runNum)
            myRun = Run(runNumber=runNum)
            wmbsFile.addRun(myRun)
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset=thefiles,
            workflow=work,
            split_algo='RunBased',
            type="Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        list_of_lists = []
        jobDestination = []

        # one job group per distinct run, capped at the requested number of jobs
        numJobs = min(len(set(runList)), self.theNumberOfJobs)
        for jobIndex in range(numJobs):
            res = self.getJobInfo(jobfactory())
            fullString = ''.join([self._quoteFile(lfn) for lfn in res['lfns']])[:-2]
            list_of_lists.append([fullString, str(-1), str(0)])
            # need to check single file location
            jobDestination.append(res['locations'])

        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = len(list_of_lists)

        return dictOut

    def getJobInfo(self, jobGroup):
        """
        Collect the LFNs of all files in the jobs of jobGroup and the locations
        of the first of those files.

        RETURNS: dict with 'lfns' (list of LFNs) and 'locations' (list of sites).
        """
        lfns = []
        locations = []
        firstFile = True
        for job in jobGroup.jobs:
            for file in job.getFiles():
                lfns.append(file['lfn'])
                if firstFile:
                    # NOTE(review): only the first file's locations are used; a
                    # check of the other files' locations still has to be added.
                    locations.extend(file['locations'])
                    firstFile = False
        return {'lfns': lfns, 'locations': locations}

    ########################################################################
    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of events per job, for tasks
        without an input dataset.

        REQUIRES: exactly two of total_number_of_events, events_per_job and
                  number_of_jobs in the configuration.
        Each job's args are: [str(first_run)+str(job index) if CMSSW.first_run is
        set,] [first event if args['generator'] is in args['managedGenerators'],]
        events per job.
        RETURNS: dict with 'args', 'jobDestination' (an empty site per job) and
                 'njobs'.
        RAISES: CrabException on an inconsistent splitting configuration.
        """
        common.logger.debug(5, 'Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if ((self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
        # the divisions below need positive events_per_job / number_of_jobs
        self._checkPositiveSettings()

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstRun = self.cfg_params.get('CMSSW.first_run', None)

        if (self.selectEventsPerJob):
            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
        if (self.selectNumberOfJobs):
            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if (self.selectTotalNumberEvents):
            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

        if (self.total_number_of_events < 0):
            msg = 'Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if (self.selectEventsPerJob):
            if (self.selectTotalNumberEvents):
                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
            elif (self.selectNumberOfJobs):
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

        common.logger.debug(5, 'N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug(5, 'Check '+str(check))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""])  # must be empty to write correctly the xml
            args = []
            if (firstRun):
                ## pythia first run
                args.append(str(firstRun)+str(i))
            if (generator in managedGenerators):
                if (generator == 'comphep' and i == 0):
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of jobs only.

        REQUIRES: number_of_jobs in the configuration.
        Each job's only argument is its index; every destination is empty.
        RETURNS: dict with 'args', 'jobDestination' and 'njobs'.
        RAISES: CrabException when number_of_jobs is not set.
        """
        self.checkUserSettings()
        if (self.selectNumberOfJobs == 0):
            msg = 'must specify number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug(5, 'Splitting per job')
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.total_number_of_jobs = self.theNumberOfJobs

        common.logger.debug(5, 'N jobs '+str(self.total_number_of_jobs))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            jobDestination.append([""])
            self.list_of_args.append([str(i)])

        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def jobSplittingByLumi(self):
        """
        Lumi-based splitting: not implemented, returns None.
        """
        return

    def Algos(self):
        """
        Return the dict mapping each splittingType name to its splitting method.
        """
        splitAlgos = {
            'EventBased': self.jobSplittingByEvent,
            'RunBased': self.jobSplittingByRun,
            'LumiBased': self.jobSplittingByLumi,
            'NoInput': self.jobSplittingNoInput,
            'ForScript': self.jobSplittingForScript,
        }
        return splitAlgos
547