ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.31
Committed: Tue Dec 15 13:19:13 2009 UTC (15 years, 4 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1_pre2, CRAB_2_7_1_pre1
Changes since 1.30: +2 -4 lines
Log Message:
sets is deprecated ...

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings identifying the revision of this file.
__revision__ = "$Id: Splitter.py,v 1.30 2009/12/14 22:33:54 ewv Exp $"
__version__ = "$Revision: 1.30 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16    
class JobSplitter:
    """
    Split a CRAB task into jobs according to the user's configuration.

    Algos() maps each splitting-type name ('EventBased', 'RunBased',
    'LumiBased', 'NoInput', 'ForScript') to the method implementing it.
    Every splitting method returns a dict with the keys:
      'params'         -- names of the per-job arguments, in order
      'args'           -- one list of argument strings per job
      'jobDestination' -- one list of candidate sites per job
      'njobs'          -- number of jobs created
    """

    def __init__( self, cfg_params, args ):
        """
        cfg_params -- dict-like task configuration (keys such as
                      'CMSSW.number_of_jobs' or 'GRID.se_white_list')
        args       -- dict of inputs for the splitting methods; depending on
                      the algorithm it must provide 'blockSites', 'pubdata',
                      'generator' and/or 'managedGenerators'
        """
        self.cfg_params = cfg_params
        self.args = args

        # Lumi-based splitting state, filled in by checkLumiSettings()
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        # Storage-element black/white lists used to screen block locations
        self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
        seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

    def checkUserSettings(self):
        """
        Read the event-based splitting parameters from the configuration.

        Sets eventsPerJob, theNumberOfJobs and total_number_of_events, each
        paired with a flag (selectEventsPerJob, selectNumberOfJobs,
        selectTotalNumberEvents) that is 1 if the user set the value and 0
        otherwise. Unset values default to -1, 0 and 0 respectively.

        Raises CrabException if number_of_jobs is set together with a
        total_number_of_events (other than -1) smaller than it.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## total number of events (-1 is treated as "all events" by the splitters)
        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0

    def checkLumiSettings(self):
        """
        Check to make sure the user has specified enough information to
        perform splitting by Lumis to run the job.

        Exactly two of lumis_per_job, number_of_jobs and
        total_number_of_lumis must be set, otherwise CrabException is
        raised. Sets lumisPerJob / theNumberOfJobs / totalNLumis and the
        matching limitJobLumis / limitNJobs / limitTotalLumis flags. A
        total_number_of_lumis of -1 counts as "set" but imposes no limit.
        """
        settings = 0
        if 'CMSSW.lumis_per_job' in self.cfg_params:
            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
            self.limitJobLumis = True
            settings += 1

        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.limitNJobs = True
            settings += 1

        if 'CMSSW.total_number_of_lumis' in self.cfg_params:
            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
            self.limitTotalLumis = (self.totalNLumis != -1)
            settings += 1

        if settings != 2:
            msg = 'When running on analysis datasets you must specify two and only two of:\n'
            msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
            raise CrabException(msg)
        # lumis_per_job together with number_of_jobs implies a total lumi limit
        if self.limitNJobs and self.limitJobLumis:
            self.limitTotalLumis = True
            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    def ComputeSubBlockSites( self, blockSites ):
        """
        Keep only the blocks for which the white-list check succeeds.

        blockSites -- dict: block name -> list of sites hosting the block
        Returns a new dict containing the blocks whose site list gives a
        non-empty result from checkWhiteList(); each kept block keeps its
        full, unfiltered site list.
        Raises CrabException if no block is kept.
        """
        sub_blockSites = {}
        for block, sites in blockSites.items():
            if self.blackWhiteListParser.checkWhiteList(sites):
                sub_blockSites[block] = sites
        if not sub_blockSites:
            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform job splitting. Jobs run over an integer number of files
        and no more than one block (unless CMSSW.no_block_boundary=1).

        Exactly two of total_number_of_events, events_per_job and
        number_of_jobs must be set (read by checkUserSettings()).
        Reads self.args['blockSites'] (dict: block -> list of host sites)
        and self.args['pubdata'] (files per block, events per block/file,
        parent files and total number of events).

        SETS: self.eventsbyblock, self.eventsbyfile, self.parentFiles,
              self.maxEvents, self.useParent,
              self.total_number_of_jobs and self.ncjobs - Total # of jobs
        RETURNS: dict with 'params', 'args' (one [InputFiles, (ParentFiles,)
              MaxEvents, SkipEvents] list per job), 'jobDestination' (the
              block's sites for each job) and 'njobs'.
        RAISES: CrabException on inconsistent splitting settings.
        """
        jobDestination = []
        self.checkUserSettings()
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

        if noBboundary == 1:
            # Jobs may span blocks in this mode: require an explicit event
            # total and exactly one white-listed storage element.
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if self.selectTotalNumberEvents:
            totalEventsRequested = self.total_number_of_events
        if self.selectEventsPerJob:
            eventsPerJobRequested = self.eventsPerJob
            if self.selectNumberOfJobs:
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if totalEventsRequested == -1:
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif totalEventsRequested > self.maxEvents:
            eventsRemaining = self.maxEvents
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if self.selectTotalNumberEvents and self.selectNumberOfJobs:
            # At least one event per job: a request of 0 events never
            # consumes any input and the file loop below would not end.
            eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

        if self.selectNumberOfJobs:
            common.logger.info("May not create the exact number_of_jobs requested.")

        # Effectively unlimited: the loops end when events or blocks run out
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # block name -> list of (1-based) job numbers created from that block
        jobsOfBlock = {}

        # Comma-terminated input / parent file lists of the job being built
        parString = ""
        pString = ""
        filesEventCount = 0
        # Per-job report for all blocks, logged once at the end
        msg = '\n'

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block in self.eventsbyblock:
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug('Events in Block File '+str(numEventsInBlock))

                files = filesbyblock[block]
                numFilesInBlock = len(files)
                if numFilesInBlock <= 0:
                    continue
                fileCount = 0
                if noBboundary == 0: # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    pString = ""
                    # counter for number of events in files currently worked on
                    filesEventCount = 0
                # flag if next while loop should touch new file
                newFile = 1
                # job event counter
                # NOTE(review): also reset when no_block_boundary=1 and a partly
                # filled job carries over from the previous block -- confirm.
                jobSkipEventCount = 0

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block ---- #
                while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
                    lfn = files[fileCount]
                    if self.useParent == 1:
                        parent = self.parentFiles[lfn]
                        common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
                    if newFile:
                        try:
                            numEventsInFile = self.eventsbyfile[lfn]
                            common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += lfn + ','
                            if self.useParent == 1:
                                for f in parent:
                                    pString += f + ','
                            newFile = 0
                        except KeyError:
                            common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if less events in file remain than eventsPerJobRequested
                    if filesEventCount - jobSkipEventCount < eventsPerJobRequested:
                        if noBboundary == 1: ## DD
                            # keep filling the same job, possibly across blocks.
                            # NOTE(review): a job still short of events when the
                            # files run out is never emitted -- confirm intended.
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if fileCount == numFilesInBlock-1:
                                # end job using last file, use remaining events in block
                                # close job and touch new file
                                fullString = parString[:-1]
                                if self.useParent == 1:
                                    fullParentString = pString[:-1]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                                jobDestination.append(blockSites[block])
                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counter
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else:
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if events in file equal to eventsPerJobRequested
                    elif filesEventCount - jobSkipEventCount == eventsPerJobRequested:
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events in file remain than eventsPerJobRequested
                    else:
                        # close job but don't touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for last file
                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[lfn]
                        pString_tmp = ''
                        if self.useParent == 1:
                            for f in parent:
                                pString_tmp += f + ','
                        pString = pString_tmp
                        parString = lfn + ','

        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        if self.useParent:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
        """
        Log the jobs of each block with the sites allowed to run them, and
        warn about blocks that no allowed site hosts.

        blocks      -- block names, in the order used for job creation
        jobsOfBlock -- dict: block name -> list of job numbers of that block
        blockSites  -- dict: block name -> list of sites hosting the block
        Raises CrabException('No jobs created') when every listed block has
        no allowed site (including the case where no block has jobs).
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteJobRanges = []
        noSiteBlockNumbers = []
        allBlockNumbers = []
        parser = self.blackWhiteListParser

        blockCounter = 0
        for block in blocks:
            if block not in jobsOfBlock:
                continue
            blockCounter += 1
            allBlockNumbers.append(blockCounter)
            sites = parser.checkWhiteList(parser.checkBlackList(blockSites[block], [block]), [block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, spanRanges(jobsOfBlock[block]),
                                                                   ', '.join(SE2CMS(sites)))
            if len(sites) == 0:
                noSiteJobRanges.append(spanRanges(jobsOfBlock[block]))
                noSiteBlockNumbers.append(blockCounter)

        common.logger.info(screenOutput)
        if noSiteBlockNumbers:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            # separators only between items, not after the last one
            msg += ' ' + ', '.join([str(b) for b in noSiteBlockNumbers])
            msg += '\n\t\tRelated jobs:\n '
            msg += ', '.join([str(r) for r in noSiteJobRanges])
            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
            if 'GRID.se_white_list' in self.cfg_params:
                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'
            if 'GRID.ce_white_list' in self.cfg_params:
                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'

            common.logger.info(msg)

        if noSiteBlockNumbers == allBlockNumbers:
            raise CrabException('No jobs created')

        return

    ########################################################################
    def jobSplittingByRun(self):
        """
        Split the input files into jobs with the WMCore 'RunBased' algorithm.

        Only the first run of each file (f['RunsList'][0]) is attached to it.
        Files whose block is missing from self.args['blockSites'] are
        skipped; for the others the block's sites are appended to the file's
        ['Block']['StorageElementList'] (this modifies pubdata's records).
        If number_of_jobs is set, at most that many jobs are created.

        Returns a dict with 'params', 'args' ([InputFiles, '-1', '0'] per
        job), 'jobDestination' and 'njobs'.
        """
        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999

        thefiles = Fileset(name='FilesToSplit')
        for f in pubdata.getListFiles():
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except KeyError:
                # block not available at any allowed site: skip its files
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count >= self.theNumberOfJobs:
                break
            res = self.getJobInfo(jobGroup)
            list_of_lists.append([','.join(res['lfns']), str(-1), str(0)])
            # locations come from the group's first file only (see getJobInfo)
            jobDestination.append(res['locations'])
            count += 1
        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo( self,jobGroup ):
        """
        Collect the input files and candidate locations of a job group.

        Returns a dict with:
          'lfns'      -- the 'lfn' of every file of every job in jobGroup.jobs
          'locations' -- the 'locations' of the first such file only
        """
        lfns = []
        locations = []
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                if not lfns:
                    # Locations are taken from the first file only.
                    # TODO: a proper check of the locations still has to be
                    # added here.
                    locations.extend(jobFile['locations'])
                lfns.append(jobFile['lfn'])
        return {'lfns': lfns, 'locations': locations}

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive total_number_of_jobs (and, where needed, total_number_of_events
        or eventsPerJob) from the splitting values read by
        checkUserSettings(), which must have been called first.

        Raises CrabException if total_number_of_events is negative.
        """
        if self.selectEventsPerJob:
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if self.selectNumberOfJobs:
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if self.selectTotalNumberEvents:
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if self.total_number_of_events < 0:
            msg='Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if self.selectEventsPerJob:
            if self.selectTotalNumberEvents:
                self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
            elif self.selectNumberOfJobs:
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob

        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job, for tasks
        without input data (event generation).

        Per-job arguments: FirstLumi (CMSSW.first_lumi, default 1, plus the
        job index) when first_lumi is non-zero; Generator and FirstEvent
        when self.args['generator'] is in self.args['managedGenerators'];
        and always MaxEvents. Every job gets the destination [""].

        Raises CrabException unless exactly two of total_number_of_events,
        events_per_job and number_of_jobs are set.
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) # must be empty to correctly write the XML
            args = []
            if firstLumi: # Pythia first lumi
                args.append(str(int(firstLumi)+i))
            if generator in managedGenerators:
                args.append(generator)
                if generator == 'comphep' and i == 0:
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output; 'params' must match the argument order above
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        if firstLumi:
            dictOut['params'] = ['FirstLumi','MaxEvents']
            if generator in managedGenerators:
                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
        else:
            if generator in managedGenerators:
                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job.

        Each job gets the destination [""] and, when eventsPerJob is
        non-zero, a single MaxEvents argument.
        Raises CrabException if number_of_jobs is not set.
        """
        self.checkUserSettings()
        if self.selectNumberOfJobs == 0:
            msg = 'must specify number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            args = []
            jobDestination.append([""])
            if self.eventsPerJob != 0:
                args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more.

        Requires exactly two of lumis_per_job, number_of_jobs and
        total_number_of_lumis (see checkLumiSettings()). Files whose block
        is missing from self.args['blockSites'] are skipped.

        Returns a dict whose 'args' hold, per job,
        [comma-separated LFNs, '-1', '0', compressed lumi range string].
        """
        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in pubdata.getListFiles():
            block = jobFile['Block']['Name']
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except KeyError:
                # block not available at any allowed site: skip its files
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            # entries are presumably (run, lumi) pairs -- passed straight to Run()
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0

        if not self.limitJobLumis:
            # number_of_jobs was given instead of lumis_per_job: spread the
            # lumis to process (the requested total when there is one,
            # otherwise the whole dataset) over the requested jobs.
            nLumis = pubdata.getMaxLumis()
            if self.limitTotalLumis:
                nLumis = min(nLumis, self.totalNLumis)
            self.lumisPerJob = nLumis // self.theNumberOfJobs + 1
            common.logger.info('Each job will process about %s lumis.' %
                                self.lumisPerJob)

        limitReached = False
        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
            for job in jobGroup.jobs:
                if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                    common.logger.info('Limit on number of jobs reached.')
                    limitReached = True
                    break
                if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                    common.logger.info('Limit on number of lumis reached.')
                    limitReached = True
                    break
                lumis = []
                lfns = []
                locations = []
                firstFile = True
                # Collect information from all the files
                for jobFile in job.getFiles():
                    if firstFile: # Get locations from first file in the job
                        for loc in jobFile['locations']:
                            locations.append(loc)
                        firstFile = False
                    # Accumulate Lumis from all files
                    for lumiList in jobFile['runs']:
                        theRun = lumiList.run
                        for theLumi in list(lumiList):
                            lumis.append( (theRun, theLumi) )

                    lfns.append(jobFile['lfn'])
                fileString = ','.join(lfns)
                lumiString = compressLumiString(lumis)
                list_of_lists.append([fileString, str(-1), str(0), lumiString])

                jobDestination.append(locations)
                jobCount += 1
                lumisCreated += len(lumis)
                common.logger.debug('Job %s will run on %s files and %s lumis '
                    % (jobCount, len(lfns), len(lumis) ))
            if limitReached:
                # stop here instead of re-reporting the limit for every later group
                break

        common.logger.info('%s jobs created to run on %s lumis' %
                           (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut

    def Algos(self):
        """
        Define key splittingType matrix: map each splitting-type name to
        the bound method that implements it.
        """
        return {
            'EventBased': self.jobSplittingByEvent,
            'RunBased': self.jobSplittingByRun,
            'LumiBased': self.jobSplittingByLumi,
            'NoInput': self.jobSplittingNoInput,
            'ForScript': self.jobSplittingForScript,
        }
738    
739 ewv 1.26
740    
741     def compressLumiString(lumis):
742     """
743     Turn a list of 2-tuples of run/lumi numbers into a list of the format
744     R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
745     """
746    
747     lumis.sort()
748     parts = []
749     startRange = None
750     endRange = None
751    
752     for lumiBlock in lumis:
753     if not startRange: # This is the first one
754     startRange = lumiBlock
755     endRange = lumiBlock
756     elif lumiBlock == endRange: # Same Lumi (different files?)
757     pass
758     elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
759     endRange = lumiBlock
760     else: # This is the start of a new range
761     part = ':'.join(map(str, startRange))
762     if startRange != endRange:
763     part += '-' + ':'.join(map(str, endRange))
764     parts.append(part)
765     startRange = lumiBlock
766     endRange = lumiBlock
767    
768     # Put out what's left
769     if startRange:
770     part = ':'.join(map(str, startRange))
771     if startRange != endRange:
772     part += '-' + ':'.join(map(str, endRange))
773     parts.append(part)
774    
775     output = ','.join(parts)
776     return output
777    
778