root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.2
Committed: Wed Feb 4 15:09:03 2009 UTC (16 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0_pre2, CRAB_2_5_0_pre1
Changes since 1.1: +1 -1 lines
Log Message:
removing comment

File Contents

import common
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser

class JobSplitter:
    def __init__( self, cfg_params, args ):
        self.cfg_params = cfg_params
        self.blockSites = args['blockSites']
        self.pubdata = args['pubdata']
        #self.maxEvents
        self.jobDestination = []  # Site destination(s) for each job (list of lists)
        # init BlackWhiteListParser
        seWhiteList = cfg_params.get('EDG.se_white_list',[])
        seBlackList = cfg_params.get('EDG.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)

    def checkUserSettings(self):
        ## Events per job
        if self.cfg_params.has_key('CMSSW.events_per_job'):
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## Number of jobs
        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## Total number of events
        if self.cfg_params.has_key('CMSSW.total_number_of_events'):
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job: total_number_of_events must be at least number_of_jobs.'
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0

    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform job splitting. Jobs run over an integer number of files
        and no more than one block.
        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
                  self.maxEvents, self.filesbyblock
        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
              self.total_number_of_jobs - Total # of jobs
              self.list_of_args - File(s) job will run on (a list of lists)
        """

        self.checkUserSettings()
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        self.filesbyblock = self.pubdata.getFiles()

        self.eventsbyblock = self.pubdata.getEventsPerBlock()
        self.eventsbyfile = self.pubdata.getEventsPerFile()
        self.parentFiles = self.pubdata.getParent()

        ## get max number of events
        self.maxEvents = self.pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If the user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining = self.maxEvents
        # If the user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            common.logger.message("Requested "+str(self.total_number_of_events)+" events, but only "+str(self.maxEvents)+" events are available.")
        # If the user requested fewer events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If the user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at the end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

        if (self.selectNumberOfJobs):
            common.logger.message("May not create the exact number_of_jobs requested.")

        # old... to remove Daniele
        totalNumberOfJobs = 999999999

        blocks = self.blockSites.keys()
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # dictionary tracking which jobs belong to which block
        jobsOfBlock = {}

        parString = ""
        filesEventCount = 0

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events    ---- #
        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock.keys() :
                jobsOfBlock[block] = []

            if self.eventsbyblock.has_key(block) :
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

                files = self.filesbyblock[block]
                numFilesInBlock = len(files)
                if (numFilesInBlock <= 0):
                    continue
                fileCount = 0
                if noBboundary == 0:  # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    # counter for number of events in the files currently worked on
                    filesEventCount = 0
                # flag if the next while loop should touch a new file
                newFile = 1
                # job event counter
                jobSkipEventCount = 0

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block  ---- #
                pString = ''
                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                    file = files[fileCount]
                    if self.useParent==1:
                        parent = self.parentFiles[file]
                        for f in parent :
                            pString += '\\\"' + f + '\\\"\,'
                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
                    if newFile :
                        try:
                            numEventsInFile = self.eventsbyfile[file]
                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += '\\\"' + file + '\\\"\,'
                            newFile = 0
                        except KeyError:
                            common.logger.message("File "+str(file)+" has an unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if fewer events remain in the file(s) than eventsPerJobRequested
                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ):
                        if noBboundary == 1:  ## DD
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if ( fileCount == numFilesInBlock-1 ) :
                                # end the job using the last file: use the remaining events in the block,
                                # close the job and touch a new file
                                fullString = parString[:-2]
                                if self.useParent==1:
                                    fullParentString = pString[:-2]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                                self.jobDestination.append(self.blockSites[block])
                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counters
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else :
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if the events in the file equal eventsPerJobRequested
                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                        # close the job and touch a new file
                        fullString = parString[:-2]
                        if self.useParent==1:
                            fullParentString = pString[:-2]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                        self.jobDestination.append(self.blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counters
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events remain in the file than eventsPerJobRequested
                    else :
                        # close the job but don't touch a new file
                        fullString = parString[:-2]
                        if self.useParent==1:
                            fullParentString = pString[:-2]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                        self.jobDestination.append(self.blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for the last file,
                        # using filesEventCount (covers several files), jobSkipEventCount and eventsPerJobRequested
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[file]
                        if self.useParent==1:
                            for f in parent : pString += '\\\"' + f + '\\\"\,'
                        parString = '\\\"' + file + '\\\"\,'
                    pass # END if
                pass # END while (iterate over files in the block)
            pass # END while (iterate over blocks in the dataset)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
            common.logger.message("Could not run on all requested events because some blocks are not hosted at allowed sites.")
        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip the check on blocks with no sites  DD
        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock)

        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = self.jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    # keep track of blocks with no sites, to print a warning at the end
    def checkBlockNoSite(self,blocks,jobsOfBlock):
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []
        blocksNoSite = []

        blockCounter = 0
        for block in blocks:
            if block in jobsOfBlock.keys() :
                blockCounter += 1
                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)))
                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)) == 0:
                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                    blocksNoSite.append( blockCounter )

        common.logger.message(screenOutput)
        if len(noSiteBlock) > 0 and len(blocksNoSite) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            virgola = ""
            if len(blocksNoSite) > 1:
                virgola = ","
            for block in blocksNoSite:
                msg += ' ' + str(block) + virgola
            msg += '\n Related jobs:\n '
            virgola = ""
            if len(noSiteBlock) > 1:
                virgola = ","
            for range_jobs in noSiteBlock:
                msg += str(range_jobs) + virgola
            msg += '\n will not be submitted and this block of data can not be analyzed!\n'
            if self.cfg_params.has_key('EDG.se_white_list'):
                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
                msg += '(Hint: by whitelisting you force the job to run at these particular site(s).\n'
                msg += 'Please check if the dataset is available at these sites!)\n'
            if self.cfg_params.has_key('EDG.ce_white_list'):
                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
                msg += '(Hint: by whitelisting you force the job to run at these particular site(s).\n'
                msg += 'Please check if the dataset is available at these sites!)\n'

            common.logger.message(msg)

        return

    ########################################################################
    def jobSplittingByRun(self):
        """
        Perform job splitting based on run numbers, using the WMCore
        RunBased splitting algorithm.
        """
        from sets import Set
        from WMCore.JobSplitting.RunBased import RunBased
        from WMCore.DataStructs.Workflow import Workflow
        from WMCore.DataStructs.File import File
        from WMCore.DataStructs.Fileset import Fileset
        from WMCore.DataStructs.Subscription import Subscription
        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
        from WMCore.DataStructs.Run import Run

        self.checkUserSettings()

        if self.selectNumberOfJobs == 0 :
            self.theNumberOfJobs = 9999999
        blocks = {}
        runList = []
        thefiles = Fileset(name='FilesToSplit')
        fileList = self.pubdata.getListFiles()
        for f in fileList:
            # print f
            block = f['Block']['Name']
            # if not blocks.has_key(block):
            #     blocks[block] = reader.listFileBlockLocation(block)
            try:
                f['Block']['StorageElementList'].extend(self.blockSites[block])
            except:
                continue
            wmbsFile = File(f['LogicalFileName'])
            [ wmbsFile['locations'].add(x) for x in self.blockSites[block] ]
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            runList.append(runNum)
            myRun = Run(runNumber=runNum)
            wmbsFile.addRun( myRun )
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        # loop over all runs
        runSet = Set(runList)
        list_of_lists = []
        jobDestination = []

        count = 0
        for i in list(runSet):
            if count < self.theNumberOfJobs:
                res = self.getJobInfo(jobfactory())
                parString = ''
                for file in res['lfns']:
                    parString += '\\\"' + file + '\\\"\,'
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(-1),str(0)])
                # need to check single file location
                jobDestination.append(res['locations'])
                count += 1
        # print jobDestination
        # prepare dict output
        dictOut = {}
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo( self,jobGroup ):
        res = {}
        lfns = []
        locations = []
        tmp_check = 0
        for job in jobGroup.jobs:
            for file in job.getFiles():
                lfns.append(file['lfn'])
                for loc in file['locations']:
                    if tmp_check < 1 :
                        locations.append(loc)
                tmp_check = tmp_check + 1
                ### the check on the locations should go here
        res['lfns'] = lfns
        res['locations'] = locations
        return res

    ########################################################################
    def jobSplittingNoInput(self):
        """
        Perform job splitting based on the number of events per job
        """
        common.logger.debug(5,'Splitting per events')

        if (self.selectEventsPerJob):
            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
        if (self.selectNumberOfJobs):
            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if (self.selectTotalNumberEvents):
            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

        if (self.total_number_of_events < 0):
            msg = 'Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if (self.selectEventsPerJob):
            if (self.selectTotalNumberEvents):
                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
            elif (self.selectNumberOfJobs) :
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif (self.selectNumberOfJobs) :
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

        common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug(5,'Check '+str(check))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.message('Warning: asked for '+str(self.total_number_of_events)+' events, but can only do '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            self.jobDestination.append([""])  # must be empty to write the xml correctly
            args = []
            if (self.firstRun):
                ## pythia first run
                args.append(str(self.firstRun)+str(i))
            if (self.generator in self.managedGenerators):
                if (self.generator == 'comphep' and i == 0):
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            self.list_of_args.append(args)
        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = self.jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def jobSplittingForScript(self):
        """
        Perform job splitting based on the number of jobs
        """
        self.checkUserSettings()
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

        common.logger.debug(5,'Splitting per job')
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.total_number_of_jobs = self.theNumberOfJobs

        common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            self.jobDestination.append([""])
            self.list_of_args.append([str(i)])

        # prepare dict output
        dictOut = {}
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = []
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def jobSplittingByLumi(self):
        """
        Luminosity-based splitting: not implemented yet.
        """
        return

    def Algos(self):
        """
        Map each splitting type key to the corresponding splitting method.
        """
        SplitAlgos = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript
        }
        return SplitAlgos
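
For orientation, a minimal driver sketch follows. It is not part of Splitter.py: it only shows how a caller might select one of the splitting algorithms through Algos(). The configuration key mirrors what checkUserSettings() reads; the empty blockSites/pubdata stubs and the 'ForScript' choice are illustrative assumptions (in a real CRAB run these objects come from the data-discovery step), and a working CRAB environment (common, WMCore) is assumed to be importable.

# Hypothetical usage sketch, not taken from this file.
from Splitter import JobSplitter

cfg_params = {'CMSSW.number_of_jobs': '5'}    # key read by checkUserSettings()
args = {'blockSites': {}, 'pubdata': None}    # stubs; not used by ForScript splitting

splitter = JobSplitter(cfg_params, args)
splitType = 'ForScript'                       # any key returned by Algos()
dictOut = splitter.Algos()[splitType]()       # here: jobSplittingForScript()
print dictOut['njobs'], 'job(s):', dictOut['args']   # e.g. 5 job(s): [['0'], ['1'], ...]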