ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.30
Committed: Mon Dec 14 22:33:54 2009 UTC (15 years, 4 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.29: +9 -9 lines
Log Message:
Lumi-related changes for 2.7: first_run is replaced by first_lumi (which defaults to 1), and -report writes lumiSummary.json to the res/ directory.

File Contents

# User Rev Content
1 ewv 1.27
2 ewv 1.30 __revision__ = "$Id: Splitter.py,v 1.29.2.1 2009/11/09 22:23:37 ewv Exp $"
3     __version__ = "$Revision: 1.29.2.1 $"
4 ewv 1.27
5 spiga 1.1 import common
6 ewv 1.26 from sets import Set
7 spiga 1.1 from crab_exceptions import *
8     from crab_util import *
9 ewv 1.26
10     from WMCore.DataStructs.File import File
11     from WMCore.DataStructs.Fileset import Fileset
12     from WMCore.DataStructs.Run import Run
13     from WMCore.DataStructs.Subscription import Subscription
14     from WMCore.DataStructs.Workflow import Workflow
15     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17    
class JobSplitter:
    """
    Divide a CRAB task into jobs.

    Every ``jobSplitting*`` method returns a dictionary with the keys
    'params' (names of the per-job arguments), 'args' (one list of argument
    values per job), 'jobDestination' (one list of sites per job) and
    'njobs' (number of jobs created).  Algos() maps the splitting type names
    to these methods.
    """

    def __init__( self, cfg_params, args ):
        """
        cfg_params: configuration mapping (read with keys such as
                    'CMSSW.events_per_job' or 'GRID.se_white_list')
        args:       mapping with the extra inputs used by the splitting
                    methods ('blockSites', 'pubdata', 'generator',
                    'managedGenerators')
        """
        self.cfg_params = cfg_params
        self.args = args

        # Defaults for the lumi based splitting (see checkLumiSettings)
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        #self.maxEvents
        # init BlackWhiteListParser
        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
        seBlackList = cfg_params.get('GRID.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())


    def checkUserSettings(self):
        """
        Read events_per_job, number_of_jobs and total_number_of_events from
        the configuration.  For each of them a select* flag (1 or 0) records
        whether the user specified it.

        Raises CrabException if fewer events than jobs are requested.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## total number of events (-1 means "all")
        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0


    def checkLumiSettings(self):
        """
        Check to make sure the user has specified enough information to
        perform splitting by Lumis to run the job

        Exactly two of lumis_per_job, number_of_jobs and
        total_number_of_lumis must be given, otherwise CrabException is
        raised.
        """
        settings = 0
        if 'CMSSW.lumis_per_job' in self.cfg_params:
            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
            self.limitJobLumis = True
            settings += 1

        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.limitNJobs = True
            settings += 1

        if 'CMSSW.total_number_of_lumis' in self.cfg_params:
            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
            # -1 means "no limit on the total"
            self.limitTotalLumis = (self.totalNLumis != -1)
            settings += 1

        if settings != 2:
            msg = 'When running on analysis datasets you must specify two and only two of:\n'
            msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
            raise CrabException(msg)
        if self.limitNJobs and self.limitJobLumis:
            # The total follows from the two values the user gave
            self.limitTotalLumis = True
            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs


    def ComputeSubBlockSites( self, blockSites ):
        """
        Return the entries of blockSites (block -> list of sites) for which
        the white list check returns at least one site.

        Raises CrabException if no block is left.
        """
        sub_blockSites = {}
        for k, v in blockSites.items():
            sites = self.blackWhiteListParser.checkWhiteList(v)
            if sites:
                sub_blockSites[k] = v
        if len(sub_blockSites) < 1:
            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform job splitting. Jobs run over an integer number of files
        and no more than one block.
        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
                  (read from self.args['blockSites'])
        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
                  self.maxEvents, self.filesbyblock
        SETS: jobDestination - Site destination(s) for each job (a list of lists)
              self.total_number_of_jobs - Total # of jobs
              self.list_of_args - File(s) job will run on (a list of lists)
        RAISES: CrabException if the user settings are inconsistent
        """

        jobDestination = []
        self.checkUserSettings()
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

        if noBboundary == 1:
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            # Report the request actually made: self.total_number_of_events
            # is 0 when the total is derived from number_of_jobs*events_per_job
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

        if (self.selectNumberOfJobs):
            common.logger.info("May not create the exact number_of_jobs requested.")

        # old... to remove Daniele
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # dictionary tracking which jobs belong to which block
        jobsOfBlock = {}

        parString = ""
        pString = ""
        filesEventCount = 0
        msg = ''

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block in self.eventsbyblock:
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug('Events in Block File '+str(numEventsInBlock))

                files = filesbyblock[block]
                numFilesInBlock = len(files)
                if (numFilesInBlock <= 0):
                    continue
                fileCount = 0
                if noBboundary == 0: # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    pString = ""
                    # counter for number of events in files currently worked on
                    filesEventCount = 0
                # flag if next while loop should touch new file
                newFile = 1
                # job event counter
                jobSkipEventCount = 0

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block ---- #
                # NOTE(review): msg is reset for every block, so the debug
                # output below only contains the last block - confirm intent
                msg = '\n'
                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                    fileName = files[fileCount]
                    if self.useParent == 1:
                        parent = self.parentFiles[fileName]
                        common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
                    if newFile:
                        try:
                            numEventsInFile = self.eventsbyfile[fileName]
                            common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += fileName + ','
                            if self.useParent == 1:
                                for f in parent:
                                    pString += f + ','
                            newFile = 0
                        except KeyError:
                            common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if less events in file remain than eventsPerJobRequested
                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                        if noBboundary == 1: ## DD
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if ( fileCount == numFilesInBlock-1 ):
                                # end job using last file, use remaining events in block
                                # close job and touch new file
                                fullString = parString[:-1]
                                if self.useParent == 1:
                                    fullParentString = pString[:-1]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                                jobDestination.append(blockSites[block])
                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counter
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else:
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if events in file equal to eventsPerJobRequested
                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ):
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events in file remain than eventsPerJobRequested
                    else:
                        # close job but don't touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for last file
                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[fileName]
                        pString_tmp = ''
                        if self.useParent == 1:
                            for f in parent:
                                pString_tmp += f + ','
                        pString = pString_tmp
                        parString = fileName + ','
                    # END if
                # END while (iterate over files in the block)
        # END while (iterate over blocks in the dataset)
        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        if self.useParent:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    # keep trace of block with no sites to print a warning at the end

    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
        """
        Log, for each block that has jobs, the jobs and the sites left after
        the black/white list checks, and warn about blocks with no site.

        Raises CrabException('No jobs created') if every block with jobs is
        left without sites.
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []
        bloskNoSite = []
        allBlock = []

        blockCounter = 0
        for block in blocks:
            if block in jobsOfBlock:
                blockCounter += 1
                allBlock.append( blockCounter )
                sites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                    ', '.join(SE2CMS(sites)))
                if len(sites) == 0:
                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                    bloskNoSite.append( blockCounter )

        common.logger.info(screenOutput)
        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            # "virgola" (comma) separates the entries when there are several
            virgola = ""
            if len(bloskNoSite) > 1:
                virgola = ","
            for block in bloskNoSite:
                msg += ' ' + str(block) + virgola
            msg += '\n\t\tRelated jobs:\n '
            virgola = ""
            if len(noSiteBlock) > 1:
                virgola = ","
            for range_jobs in noSiteBlock:
                msg += str(range_jobs) + virgola
            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
            if 'GRID.se_white_list' in self.cfg_params:
                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)'
            if 'GRID.ce_white_list' in self.cfg_params:
                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'

            common.logger.info(msg)

        if bloskNoSite == allBlock:
            raise CrabException('No jobs created')

        return


    ########################################################################
    def jobSplittingByRun(self):
        """
        Split the task with the WMCore 'RunBased' algorithm; each job group
        produced by the job factory becomes one job.  At most
        self.theNumberOfJobs jobs are created (no practical limit if
        number_of_jobs was not specified).
        """

        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for f in fileList:
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except (KeyError, AttributeError, TypeError):
                # no usable site information for this block: skip the file
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            myRun = Run(runNumber=runNum)
            wmbsFile.addRun( myRun )
            thefiles.addFile(
                wmbsFile
                )

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count < self.theNumberOfJobs:
                res = self.getJobInfo(jobGroup)
                fullString = ','.join(res['lfns'])
                list_of_lists.append([fullString,str(-1),str(0)])
                #need to check single file location
                jobDestination.append(res['locations'])
                count += 1
        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo( self,jobGroup ):
        """
        Collect the LFNs of all the files of all the jobs in jobGroup.

        Returns a dictionary with 'lfns' (all the LFNs, in order) and
        'locations' (the locations of the first file only).
        """
        lfns = []
        locations = []
        firstFile = True
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                lfns.append(jobFile['lfn'])
                if firstFile:
                    locations.extend(jobFile['locations'])
                    firstFile = False
                ### the check on the locations should be put here
        res = {}
        res['lfns'] = lfns
        res['locations'] = locations
        return res

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive the missing quantity among total_number_of_jobs,
        total_number_of_events and eventsPerJob from the two the user gave.
        Expects checkUserSettings() to have been called.

        Raises CrabException if the total number of events is negative.
        """
        if (self.selectEventsPerJob):
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if (self.selectNumberOfJobs):
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if (self.selectTotalNumberEvents):
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if (self.total_number_of_events < 0):
            msg='Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if (self.selectEventsPerJob):
            if (self.selectTotalNumberEvents):
                self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
            elif (self.selectNumberOfJobs):
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs


    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job

        Raises CrabException unless exactly two of total_number_of_events,
        events_per_job and number_of_jobs are specified.
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) # must be empty to correctly write the XML
            args = []
            if (firstLumi): # Pythia first lumi
                args.append(str(int(firstLumi)+i))
            if (generator in managedGenerators):
                args.append(generator)
                if (generator == 'comphep' and i == 0):
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)
        # prepare dict output

        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        if (firstLumi):
            dictOut['params'] = ['FirstLumi','MaxEvents']
            if (generator in managedGenerators):
                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
        else:
            if (generator in managedGenerators):
                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut


    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job

        Raises CrabException if number_of_jobs is not specified.
        """
        self.checkUserSettings()
        if (self.selectNumberOfJobs == 0):
            msg = 'must specify number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        # self.total_number_of_jobs = self.theNumberOfJobs

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            args = []
            jobDestination.append([""])
            if self.eventsPerJob != 0:
                args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut


    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more
        """

        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        fileList = pubdata.getListFiles()
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in fileList:
            block = jobFile['Block']['Name']
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except (KeyError, AttributeError, TypeError):
                # no usable site information for this block: skip the file
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0

        if not self.limitJobLumis:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
            common.logger.info('Each job will process about %s lumis.' %
                               self.lumisPerJob)

        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
            for job in jobGroup.jobs:
                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                    common.logger.info('Limit on number of jobs reached.')
                    break
                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                    common.logger.info('Limit on number of lumis reached.')
                    break
                lumis = []
                lfns = []
                locations = []
                firstFile = True
                # Collect information from all the files
                for jobFile in job.getFiles():
                    if firstFile: # Get locations from first file in the job
                        for loc in jobFile['locations']:
                            locations.append(loc)
                        firstFile = False
                    # Accumulate Lumis from all files
                    for lumiList in jobFile['runs']:
                        theRun = lumiList.run
                        for theLumi in list(lumiList):
                            lumis.append( (theRun, theLumi) )

                    lfns.append(jobFile['lfn'])
                fileString = ','.join(lfns)
                lumiString = compressLumiString(lumis)
                list_of_lists.append([fileString, str(-1), str(0), lumiString])

                jobDestination.append(locations)
                jobCount += 1
                lumisCreated += len(lumis)
                common.logger.debug('Job %s will run on %s files and %s lumis '
                                    % (jobCount, len(lfns), len(lumis) ))

        common.logger.info('%s jobs created to run on %s lumis' %
                           (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut


    def Algos(self):
        """
        Define key splittingType matrix

        Returns a dictionary mapping each splitting type name to the bound
        method that implements it.
        """
        SplitAlogs = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased' : self.jobSplittingByRun,
            'LumiBased' : self.jobSplittingByLumi,
            'NoInput' : self.jobSplittingNoInput,
            'ForScript' : self.jobSplittingForScript
            }
        return SplitAlogs
740    
741 ewv 1.26
742    
def compressLumiString(lumis):
    """
    Turn a list of 2-tuples of run/lumi numbers into a list of the format
    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable

    Consecutive lumis of the same run are merged into one range and repeated
    entries are ignored.  The input is not modified; an empty input gives an
    empty string.
    """

    def formatRange(first, last):
        # "run:lumi" for a single lumi, "run:lumi-run:lumi" for a range
        text = ':'.join(map(str, first))
        if first != last:
            text += '-' + ':'.join(map(str, last))
        return text

    parts = []
    startRange = None
    endRange = None

    # Work on a sorted copy so the caller's list is left untouched
    for lumiBlock in sorted(lumis):
        if startRange is None: # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange: # Same Lumi (different files?)
            pass
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
            endRange = lumiBlock
        else: # This is the start of a new range
            parts.append(formatRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(formatRange(startRange, endRange))

    return ','.join(parts)
779    
780