ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.27
Committed: Thu Jul 30 18:45:44 2009 UTC (15 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.26: +87 -30 lines
Log Message:
Add config parameters for lumi-based splitting

File Contents

# User Rev Content
1 ewv 1.27
2     __revision__ = "$Id: writeCfg.py,v 1.22 2009/07/29 21:20:03 ewv Exp $"
3     __version__ = "$Revision: 1.22 $"
4    
5 spiga 1.1 import common
6 ewv 1.26 from sets import Set
7 spiga 1.1 from crab_exceptions import *
8     from crab_util import *
9 ewv 1.26
10     from WMCore.DataStructs.File import File
11     from WMCore.DataStructs.Fileset import Fileset
12     from WMCore.DataStructs.Run import Run
13     from WMCore.DataStructs.Subscription import Subscription
14     from WMCore.DataStructs.Workflow import Workflow
15     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17    
18     class JobSplitter:
def __init__(self, cfg_params, args):
    """
    Keep the configuration and the splitter arguments, reset the state used
    by the lumi-based splitting and build the SE black/white list parser.

    cfg_params -- configuration mapping (only read through .get() here)
    args       -- arguments handed to the splitter, stored as self.args
    """
    self.cfg_params = cfg_params
    self.args = args

    # Defaults for lumi-based splitting; checkLumiSettings() overrides them
    # from the user configuration.
    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = False
    self.limitTotalLumis = False
    self.limitJobLumis = False

    # Storage element white list is kept on the instance (other methods
    # read it); the black list is only needed to build the parser.
    self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
    blackListedSEs = cfg_params.get('GRID.se_black_list', [])
    self.blackWhiteListParser = SEBlackWhiteListParser(
        self.seWhiteList, blackListedSEs, common.logger())
35 spiga 1.1
36    
def checkUserSettings(self):
    """
    Read the event-based splitting parameters from self.cfg_params.

    For each of CMSSW.events_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_events this stores the integer value and a 0/1
    "select" flag telling whether the user supplied it:

      self.eventsPerJob / self.selectEventsPerJob             (default -1 / 0)
      self.theNumberOfJobs / self.selectNumberOfJobs          (default  0 / 0)
      self.total_number_of_events / self.selectTotalNumberEvents (default 0 / 0)

    Raises CrabException when both a total number of events (other than -1)
    and a number of jobs are given and there are fewer events than jobs.
    """
    cfg = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in cfg:
        self.eventsPerJob = int(cfg['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    if 'CMSSW.total_number_of_events' in cfg:
        self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            # -1 means "all events", so it is exempt from the sanity check
            if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0
64    
65    
def checkLumiSettings(self):
    """
    Check to make sure the user has specified enough information to
    perform splitting by Lumis to run the job.

    Reads CMSSW.lumis_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_lumis from self.cfg_params and sets
    self.lumisPerJob / self.theNumberOfJobs / self.totalNLumis together
    with the limitJobLumis / limitNJobs / limitTotalLumis flags.

    Raises CrabException unless exactly two of the three parameters are
    given.
    """
    cfg = self.cfg_params
    settings = 0

    if 'CMSSW.lumis_per_job' in cfg:
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        settings += 1

    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        settings += 1

    if 'CMSSW.total_number_of_lumis' in cfg:
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 means "no limit on the total"
        self.limitTotalLumis = (self.totalNLumis != -1)
        settings += 1

    if settings != 2:
        msg = 'When running on analysis datasets you must specify two and only two of:\n'
        # Append (the original re-assigned msg here and lost the first line)
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)

    if self.limitNJobs and self.limitJobLumis:
        # The total is implied by the two values the user supplied
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94    
95    
def ComputeSubBlockSites(self, blockSites):
    """
    Return the subset of blockSites whose site list passes the white list.

    blockSites -- dictionary with blocks as keys and lists of sites as values

    A block is kept when blackWhiteListParser.checkWhiteList() returns a
    non-empty result for its sites; the value stored for a kept block is
    the original, unfiltered site list.

    Raises CrabException when no block is left.
    """
    sub_blockSites = {}
    for block, sites in blockSites.items():
        if self.blackWhiteListParser.checkWhiteList(sites):
            sub_blockSites[block] = sites
    if not sub_blockSites:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return sub_blockSites
107    
108 spiga 1.1 ########################################################################
def jobSplittingByEvent(self):
    """
    Perform job splitting by number of events. Jobs run over an integer
    number of files and, unless CMSSW.no_block_boundary=1, over no more
    than one block.

    READS:  self.args['blockSites'] - dictionary with blocks as keys and
                                      list of host sites as values
            self.args['pubdata']    - source of the file/event bookkeeping
            the settings parsed by checkUserSettings()
    SETS:   self.eventsbyblock, self.eventsbyfile, self.parentFiles,
            self.maxEvents, self.useParent,
            self.ncjobs, self.total_number_of_jobs - total # of jobs
    RETURNS a dictionary with
            'params'         - names of the per-job arguments
            'args'           - per-job argument lists (a list of lists)
            'jobDestination' - site destination(s) for each job
            'njobs'          - number of jobs created
    RAISES  CrabException when the user settings are inconsistent.
    """

    jobDestination = []
    self.checkUserSettings()
    if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock = pubdata.getFiles()

    self.eventsbyblock = pubdata.getEventsPerBlock()
    self.eventsbyfile = pubdata.getEventsPerFile()
    self.parentFiles = pubdata.getParent()

    ## get max number of events
    self.maxEvents = pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

    if noBboundary == 1:
        if self.total_number_of_events == -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # Report the effective request: self.total_number_of_events is 0
        # when the total was derived from number_of_jobs * events_per_job.
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if self.selectNumberOfJobs:
        common.logger.info("May not create the exact number_of_jobs requested.")

    # old... to remove Daniele
    totalNumberOfJobs = 999999999

    # list() so the blocks can be indexed whatever .keys() returns
    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    parString = ""
    pString = ""
    filesEventCount = 0
    # Per-job summary for the debug log; accumulated over ALL blocks
    # (resetting it per block would keep only the last block's jobs).
    msg = '\n'

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block in self.eventsbyblock:
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug('Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if numFilesInBlock <= 0:
                continue
            fileCount = 0
            if noBboundary == 0: # DD
                # ---- New block => New job ---- #
                parString = ""
                pString = ""
                # counter for number of events in files currently worked on
                filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
                lfn = files[fileCount]
                if self.useParent == 1:
                    parent = self.parentFiles[lfn]
                    common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[lfn]
                        common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += lfn + ','
                        if self.useParent == 1:
                            for f in parent:
                                pString += f + ','
                        newFile = 0
                    except KeyError:
                        common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if filesEventCount - jobSkipEventCount < eventsPerJobRequested:
                    if noBboundary == 1: ## DD
                        newFile = 1
                        fileCount += 1
                    else:
                        # if last file in block
                        if fileCount == numFilesInBlock-1:
                            # end job using last file, use remaining events in block
                            # close job and touch new file
                            fullString = parString[:-1]
                            if self.useParent == 1:
                                fullParentString = pString[:-1]
                                list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                            else:
                                list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                            msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                            jobDestination.append(blockSites[block])
                            msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                            # fill jobs of block dictionary
                            jobsOfBlock[block].append(jobCount+1)
                            # reset counter
                            jobCount = jobCount + 1
                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                            jobSkipEventCount = 0
                            # reset file
                            pString = ""
                            parString = ""
                            filesEventCount = 0
                            newFile = 1
                            fileCount += 1
                        else:
                            # go to next file
                            newFile = 1
                            fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif filesEventCount - jobSkipEventCount == eventsPerJobRequested:
                    # close job and touch new file
                    fullString = parString[:-1]
                    if self.useParent == 1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else:
                    # close job but don't touch new file
                    fullString = parString[:-1]
                    if self.useParent == 1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[lfn]
                    pString = ''
                    if self.useParent == 1:
                        for f in parent:
                            pString += f + ','
                    parString = lfn + ','

    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites  DD
    if noBboundary == 0:
        self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
    if self.useParent:
        dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs

    return dictOut
365    
366     # keep trace of block with no sites to print a warning at the end
367    
def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
    """
    Log, for every block that got jobs, the job ranges and the sites left
    after black/white list filtering, and warn about blocks with no site.

    blocks      -- blocks in the order they were processed
    jobsOfBlock -- dictionary block -> list of job numbers
    blockSites  -- dictionary block -> list of host sites

    Raises CrabException('No jobs created') when the list of blocks
    without a site equals the list of all blocks that got jobs.
    """
    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteBlock = []
    bloskNoSite = []
    allBlock = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            allBlock.append( blockCounter )
            sites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ', '.join(SE2CMS(sites)))
            if len(sites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.info(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n\t\tRelated jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        if 'GRID.se_white_list' in self.cfg_params:
            msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
            msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            # Trailing newline added so a following CE warning starts on its own line
            msg += '\tPlease check if the dataset is available at this site!)\n'
        if 'GRID.ce_white_list' in self.cfg_params:
            msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
            msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += '\tPlease check if the dataset is available at this site!)\n'

        common.logger.info(msg)

    if bloskNoSite == allBlock:
        raise CrabException('No jobs created')

    return
417    
418    
419     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task into jobs with the WMCore 'RunBased' splitting algorithm.

    Reads self.args['blockSites'] and self.args['pubdata']; at most
    self.theNumberOfJobs jobs are created (9999999 when the user gave no
    number_of_jobs).

    Returns a dictionary with 'params', 'args' (one
    [files, MaxEvents=-1, SkipEvents=0] list per job), 'jobDestination'
    and 'njobs'.
    """

    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0:
        self.theNumberOfJobs = 9999999

    # Build the WMBS fileset handed to the splitter
    thefiles = Fileset(name='FilesToSplit')
    fileList = pubdata.getListFiles()
    for f in fileList:
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: files whose block cannot be matched to a site
            # list are left out (no longer hides KeyboardInterrupt/SystemExit)
            continue
        wmbsFile = File(f['LogicalFileName'])
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        # Only the first run listed for the file is used
        runNum = f['RunsList'][0]['RunNumber']
        wmbsFile.addRun(Run(runNumber=runNum))
        thefiles.addFile(wmbsFile)

    work = Workflow()
    subs = Subscription(
        fileset = thefiles,
        workflow = work,
        split_algo = 'RunBased',
        type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    # loop over all the job groups produced by the splitter
    list_of_lists = []
    jobDestination = []
    count = 0
    for jobGroup in jobfactory():
        if count < self.theNumberOfJobs:
            res = self.getJobInfo(jobGroup)
            fullString = ','.join(res['lfns'])
            list_of_lists.append([fullString,str(-1),str(0)])
            #need to check single file location
            jobDestination.append(res['locations'])
            count += 1

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = count

    return dictOut
484    
def getJobInfo(self, jobGroup):
    """
    Collect the file names and the destination of a job group.

    Returns a dictionary with
      'lfns'      - the 'lfn' of every file of every job in jobGroup.jobs
      'locations' - the 'locations' of the first file encountered only
    """
    allLfns = []
    firstFileLocations = []
    filesSeen = 0
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            allLfns.append(jobFile['lfn'])
            # Only the very first file contributes its locations
            if filesSeen < 1:
                for loc in jobFile['locations']:
                    firstFileLocations.append(loc)
            filesSeen += 1
            ### the check on the locations should be added here
    return {'lfns': allLfns, 'locations': firstFileLocations}
501    
502 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the missing splitting quantity for jobs without input data.

    Expects the values and select flags set by checkUserSettings(); logs
    what the user required and then sets self.total_number_of_jobs and,
    depending on which parameters were given, self.total_number_of_events
    or self.eventsPerJob.

    Raises CrabException when the total number of events is negative.
    """
    requested = (
        (self.selectEventsPerJob, self.eventsPerJob, ' events per job '),
        (self.selectNumberOfJobs, self.theNumberOfJobs, ' jobs in total '),
        (self.selectTotalNumberEvents, self.total_number_of_events, ' events in total '),
    )
    for selected, value, what in requested:
        if selected:
            common.logger.info('Required '+str(value)+what)

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            # total and events/job given -> derive the number of jobs
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif self.selectNumberOfJobs:
            # jobs and events/job given -> derive the total
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
        return

    if self.selectNumberOfJobs:
        # jobs (and total) given -> derive the events per job
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
527    
528 spiga 1.23
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for tasks
    that have no input dataset.

    Reads self.args['managedGenerators'], self.args['generator'] and the
    optional CMSSW.first_run parameter. Returns a dictionary with
    'params', 'args', 'jobDestination' and 'njobs'; the per-job argument
    lists are also kept in self.list_of_args.

    Raises CrabException unless exactly two of total_number_of_events,
    events_per_job and number_of_jobs are defined.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    jobDestination = []
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected != 2:
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstRun = self.cfg_params.get('CMSSW.first_run',None)

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug('Check  '+str(check))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    isManaged = generator in managedGenerators

    # argument is seed number.$i
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        jobDestination.append([""]) #must be empty to write correctly the xml
        jobArgs = []
        if firstRun:
            ## pythia first run
            jobArgs.append(str(firstRun)+str(jobIndex))
        if isManaged:
            jobArgs.append(generator)
            if generator == 'comphep' and jobIndex == 0:
                # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                jobArgs.append('1')
            else:
                jobArgs.append(str(jobIndex*self.eventsPerJob))
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)

    # prepare dict output: the parameter names mirror the arguments above
    if firstRun and isManaged:
        params = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
    elif firstRun:
        params = ['FirstRun','MaxEvents']
    elif isManaged:
        params = ['Generator', 'FirstEvent', 'MaxEvents']
    else:
        params = ['MaxEvents']

    return {
        'params': params,
        'args': self.list_of_args,
        'jobDestination': jobDestination,
        'njobs': self.total_number_of_jobs,
    }
591    
592    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job.

    Requires CMSSW.number_of_jobs; each job gets the events per job as
    its only argument, or no argument at all when that value is 0.
    Returns a dictionary with 'params', 'args', 'jobDestination' and
    'njobs'; the per-job argument lists are also kept in
    self.list_of_args.

    Raises CrabException when number_of_jobs is not specified.
    """
    self.checkUserSettings()
    if self.selectNumberOfJobs == 0:
        msg = 'must specify  number_of_jobs.'
        # Was "crabexception", an undefined name that raised NameError
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args = []
        jobDestination.append([""])
        if self.eventsPerJob != 0:
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs
    return dictOut
629    
630 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Reads self.args['blockSites'] and self.args['pubdata'] and the limits
    set by checkLumiSettings(). Returns a dictionary with 'params',
    'args' (one [files, MaxEvents=-1, SkipEvents=0, lumis] list per job),
    'jobDestination' and 'njobs'.
    """

    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()

    # Make the list of WMBS files for job splitter
    fileList = pubdata.getListFiles()
    thefiles = Fileset(name='FilesToSplit')
    for jobFile in fileList:
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: files whose block cannot be matched to a site
            # list are left out (no longer hides KeyboardInterrupt/SystemExit)
            continue
        wmbsFile = File(jobFile['LogicalFileName'])
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        thefiles.addFile(wmbsFile)

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset = thefiles, workflow = work,
                        split_algo = 'LumiBased', type = "Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    jobCount = 0
    lumisCreated = 0

    if not self.limitJobLumis:
        self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                           self.lumisPerJob)

    # Once a limit is hit no further job can be accepted, so stop
    # iterating the job groups too (a plain break would only leave the
    # inner loop and repeat the message for every remaining group).
    limitReached = False
    for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
        for job in jobGroup.jobs:
            if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                common.logger.info('Limit on number of jobs reached.')
                limitReached = True
                break
            if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                common.logger.info('Limit on number of lumis reached.')
                limitReached = True
                break
            lumis = []
            lfns = []
            locations = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                if firstFile:  # Get locations from first file in the job
                    for loc in jobFile['locations']:
                        locations.append(loc)
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in list(lumiList):
                        lumis.append( (theRun, theLumi) )

                lfns.append(jobFile['lfn'])
            fileString = ','.join(lfns)
            lumiString = compressLumiString(lumis)
            list_of_lists.append([fileString, str(-1), str(0), lumiString])

            jobDestination.append(locations)
            jobCount += 1
            lumisCreated += len(lumis)
            common.logger.debug('Job %s will run on %s files and %s lumis '
                % (jobCount, len(lfns), len(lumis) ))
        if limitReached:
            break

    common.logger.info('%s jobs created to run on %s lumis' %
                          (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    dictOut = {}
    dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount

    return dictOut
727    
728    
def Algos(self):
    """
    Return the dictionary mapping each splitting type name to the bound
    method that implements it.
    """
    return dict([
        ('EventBased', self.jobSplittingByEvent),
        ('RunBased', self.jobSplittingByRun),
        ('LumiBased', self.jobSplittingByLumi),
        ('NoInput', self.jobSplittingNoInput),
        ('ForScript', self.jobSplittingForScript),
    ])
741    
742 ewv 1.26
743    
def compressLumiString(lumis):
    """
    Turn a list of 2-tuples of run/lumi numbers into a list of the format
    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable.

    lumis -- any iterable of (run, lumi) pairs; it is NOT modified
             (a sorted copy is used), so sets and generators work too.

    Consecutive lumis of the same run are merged into one range and
    duplicated pairs are counted once. Returns '' for empty input.
    """

    def formatRange(first, last):
        # "R:L" for a single lumi, "R:L-R:L" for a real range
        part = ':'.join(map(str, first))
        if first != last:
            part += '-' + ':'.join(map(str, last))
        return part

    parts = []
    startRange = None
    endRange = None

    for lumiBlock in sorted(lumis):
        if startRange is None:  # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange:  # Same Lumi (different files?)
            continue
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1:
            # This is a continuation
            endRange = lumiBlock
        else:  # This is the start of a new range
            parts.append(formatRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(formatRange(startRange, endRange))

    return ','.join(parts)
780    
781