ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.37
Committed: Wed May 26 19:46:12 2010 UTC (14 years, 11 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3_beta
Changes since 1.36: +9 -3 lines
Log Message:
Add runselection along with lumi_mask. Needs Splitter, LumiList, and DataDiscovery

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings ($Id$ / $Revision$), expanded by CVS on checkout.
__revision__, __version__ = (
    "$Id: Splitter.py,v 1.36 2010/04/09 13:17:52 ewv Exp $",
    "$Revision: 1.36 $",
)
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 spiga 1.33 from LumiList import LumiList
17 spiga 1.1
class JobSplitter:
    """
    Split a CRAB task into jobs.

    The splitting algorithm is chosen through the dictionary returned by
    Algos(); every algorithm returns a dict with the keys 'params' (names of
    the per-job arguments), 'args' (one argument list per job),
    'jobDestination' (one site list per job) and 'njobs'.
    """

    def __init__(self, cfg_params, args):
        """
        cfg_params - configuration dictionary ('SECTION.key' -> value)
        args       - dict of splitting inputs; depending on the algorithm it
                     is read for 'blockSites', 'pubdata', 'generator' and
                     'managedGenerators'
        """
        self.cfg_params = cfg_params
        self.args = args

        # Lumi-splitting settings, filled by checkLumiSettings()
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        # init BlackWhiteListParser
        self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
        seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())


    def checkUserSettings(self):
        """
        Read events_per_job, number_of_jobs and total_number_of_events from
        the configuration and record which of them were given
        (self.select* flags are 1 when set, 0 otherwise).

        RAISES: CrabException when both total_number_of_events (not -1) and
                number_of_jobs are set and there are fewer events than jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0

        return

    def checkLumiSettings(self):
        """
        Check to make sure the user has specified enough information to
        perform splitting by Lumis to run the job.

        Exactly two of number_of_jobs, lumis_per_job and
        total_number_of_lumis must be given. When number_of_jobs and
        lumis_per_job are given, the total is their product.

        RAISES: CrabException when not exactly two settings are present.
        """
        settings = 0
        if 'CMSSW.lumis_per_job' in self.cfg_params:
            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
            self.limitJobLumis = True
            settings += 1

        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.limitNJobs = True
            settings += 1

        if 'CMSSW.total_number_of_lumis' in self.cfg_params:
            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
            # -1 means "all lumis", i.e. no limit on the total
            self.limitTotalLumis = (self.totalNLumis != -1)
            settings += 1

        if settings != 2:
            msg = 'When splitting by lumi section you must specify two and only two of:\n'
            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
            raise CrabException(msg)
        if self.limitNJobs and self.limitJobLumis:
            self.limitTotalLumis = True
            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

        # Has the user specified runselection?
        if 'CMSSW.runselection' in self.cfg_params:
            common.logger.info('You have specified runselection and split by lumi.')
            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
        return

    def ComputeSubBlockSites(self, blockSites):
        """
        Keep only the blocks whose site list passes the SE white list.

        blockSites - dict block -> list of SEs hosting the block
        RETURNS: dict with the retained blocks (original site lists kept)
        RAISES: CrabException when no block passes the white list
        """
        sub_blockSites = {}
        for block, sites in blockSites.items():
            if self.blackWhiteListParser.checkWhiteList(sites):
                sub_blockSites[block] = sites
        if len(sub_blockSites) < 1:
            msg = 'WARNING: the sites %s is not hosting any part of data.' % self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    ########################################################################
    def _eventJobArgs(self, fileString, parentString, maxEvents, skipEvents):
        """Build one job's argument list for event-based splitting."""
        # Parent files are only passed when use_parent == 1, matching 'params'
        if self.useParent == 1:
            return [fileString, parentString, str(maxEvents), str(skipEvents)]
        return [fileString, str(maxEvents), str(skipEvents)]

    def jobSplittingByEvent(self):
        """
        Perform job splitting by number of events.

        Jobs run over an integer number of files and, unless
        CMSSW.no_block_boundary = 1, no more than one block.

        Reads self.args['blockSites'] (dict: block -> list of host SEs) and
        self.args['pubdata'] (dataset information object).
        REQUIRES: exactly two of total_number_of_events, events_per_job,
                  number_of_jobs (see checkUserSettings)
        SETS: self.eventsbyblock, self.eventsbyfile, self.parentFiles,
              self.maxEvents, self.useParent,
              self.ncjobs / self.total_number_of_jobs - total # of jobs
        RETURNS: dict with 'params', 'args' (file(s), [parent files],
                 MaxEvents, SkipEvents per job), 'jobDestination', 'njobs'
        RAISES: CrabException on inconsistent settings
        """
        jobDestination = []
        self.checkUserSettings()
        if ((self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

        if noBboundary == 1:
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if self.selectTotalNumberEvents:
            totalEventsRequested = self.total_number_of_events
        if self.selectEventsPerJob:
            eventsPerJobRequested = self.eventsPerJob
            # A non-positive quota never consumes events: the loop below
            # would create empty jobs until totalNumberOfJobs is reached.
            if eventsPerJobRequested < 1:
                raise CrabException('events_per_job must be a positive number.')
            if self.selectNumberOfJobs:
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        if totalEventsRequested == -1:
            # User requested all the events in the dataset
            eventsRemaining = self.maxEvents
        elif totalEventsRequested > self.maxEvents:
            # User requested more events than are in the dataset.
            # Report the effective request: total_number_of_events is 0 when
            # the total is derived from number_of_jobs * events_per_job.
            eventsRemaining = self.maxEvents
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if self.selectTotalNumberEvents and self.selectNumberOfJobs:
            # At least one event per job, otherwise the loop never progresses
            # (e.g. total_number_of_events=-1 on a dataset with fewer events
            # than number_of_jobs).
            eventsPerJobRequested = max(1, int(eventsRemaining // self.theNumberOfJobs))

        if self.selectNumberOfJobs:
            common.logger.info("May not create the exact number_of_jobs requested.")

        # Effectively unlimited; kept from an older implementation
        totalNumberOfJobs = 999999999

        # list() so blocks can be indexed (dict views are not indexable on Python 3)
        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # block -> list of (1-based) job numbers created from it
        jobsOfBlock = {}

        parString = ""
        pString = ""
        filesEventCount = 0
        # Accumulated over all blocks (it used to be reset per block, so only
        # the last block was reported)
        msg = '\n'

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events    ---- #
        while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block not in self.eventsbyblock:
                continue
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug('Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if numFilesInBlock <= 0:
                continue
            fileCount = 0
            if noBboundary == 0: # DD
                # ---- New block => New job ---- #
                parString = ""
                pString = ""
                # counter for number of events in files currently worked on
                filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
                fileName = files[fileCount]
                if self.useParent == 1:
                    parent = self.parentFiles[fileName]
                    common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                    except KeyError:
                        common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")
                    else:
                        common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        filesEventCount += numEventsInFile
                        # Add file (and its parents) to the current job
                        parString += fileName + ','
                        if self.useParent == 1:
                            for f in parent:
                                pString += f + ','
                        newFile = 0

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # events available to the current job from the files collected so far
                eventsAvailable = filesEventCount - jobSkipEventCount

                if eventsAvailable < eventsPerJobRequested:
                    # fewer events left than requested
                    if noBboundary == 0 and fileCount == numFilesInBlock - 1:
                        # last file in block: close the job with the events left
                        list_of_lists.append(self._eventJobArgs(parString[:-1], pString[:-1], -1, jobSkipEventCount))
                        msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(eventsAvailable))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        jobCount += 1
                        totalEventCount += eventsAvailable
                        eventsRemaining -= eventsAvailable
                        jobSkipEventCount = 0
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                    # go to next file (same job unless it was just closed)
                    newFile = 1
                    fileCount += 1
                elif eventsAvailable == eventsPerJobRequested:
                    # exactly enough events: close job and touch new file
                    list_of_lists.append(self._eventJobArgs(parString[:-1], pString[:-1], eventsPerJobRequested, jobSkipEventCount))
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    jobSkipEventCount = 0
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # more events than requested: close job but stay on this file
                    list_of_lists.append(self._eventJobArgs(parString[:-1], pString[:-1], eventsPerJobRequested, jobSkipEventCount))
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    # events of the current (last) file already used by the
                    # jobs so far become the skip count of the next job
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    pString_tmp = ''
                    if self.useParent == 1:
                        for f in parent:
                            pString_tmp += f + ','
                    pString = pString_tmp
                    parString = fileName + ','

        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

        # prepare dict output (parent files only when they are in the args)
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        if self.useParent == 1:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def _whiteListHints(self):
        """Return hint text for the SE/CE white lists present in the configuration ('' if none)."""
        hints = ''
        for key, label in (('GRID.se_white_list', 'SE'), ('GRID.ce_white_list', 'CE')):
            if key in self.cfg_params:
                hints += '\tWARNING: %s White List: ' % label + self.cfg_params[key] + '\n'
                hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                hints += '\tPlease check if the dataset is available at this site!)\n'
        return hints

    def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
        """
        Print the jobs/sites table and warn about blocks with no allowed site.

        blocks      - block names, in the order jobs were assigned
        jobsOfBlock - dict block -> list of (1-based) job numbers
        blockSites  - dict block -> list of SEs hosting the block
        RAISES: CrabException when no block that received jobs has an
                allowed site (including when no block received jobs)
        """
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []   # job ranges of the blocks without an allowed site
        blockNoSite = []   # 1-based indices of those blocks
        allBlock = []      # 1-based indices of all blocks that received jobs

        parser = self.blackWhiteListParser
        blockCounter = 0
        for block in blocks:
            if block not in jobsOfBlock:
                continue
            blockCounter += 1
            allBlock.append(blockCounter)
            sites = parser.checkWhiteList(parser.checkBlackList(blockSites[block], [block]), [block])
            jobRange = spanRanges(jobsOfBlock[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, jobRange,
                                                                   ', '.join(SE2CMS(sites)))
            if len(sites) == 0:
                noSiteBlock.append(jobRange)
                blockNoSite.append(blockCounter)

        common.logger.info(screenOutput)

        hints = self._whiteListHints()
        # Initialised here so the final check cannot hit an undefined name
        msg = ''
        if len(noSiteBlock) > 0 and len(blockNoSite) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            msg += ' ' + ', '.join([str(b) for b in blockNoSite])
            msg += '\n\t\tRelated jobs:\n '
            msg += ','.join([str(r) for r in noSiteBlock])
            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
            common.logger.info(msg + hints)

        if blockNoSite == allBlock:
            msg += 'Requested jobs cannot be Created! \n'
            msg += hints
            raise CrabException(msg)

        return


    ########################################################################
    def _makeWMBSFile(self, fileInfo, blockSites):
        """
        Build a WMBS File for one entry of pubdata.getListFiles(), located at
        the sites hosting its block.

        Also extends fileInfo['Block']['StorageElementList'] with those
        sites. Returns None when that update fails (typically the block is
        not in blockSites); such files are skipped by the callers.
        """
        block = fileInfo['Block']['Name']
        try:
            fileInfo['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best-effort skip as before, but no longer traps
            # KeyboardInterrupt/SystemExit like the old bare 'except:'.
            return None
        wmbsFile = File(fileInfo['LogicalFileName'])
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        return wmbsFile

    def jobSplittingByRun(self):
        """
        Split the task into one job per run using the WMBS 'RunBased'
        algorithm. Only the first run listed for each file is used.
        At most number_of_jobs jobs are created when it is set.
        """
        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        thefiles = Fileset(name='FilesToSplit')
        for f in pubdata.getListFiles():
            wmbsFile = self._makeWMBSFile(f, blockSites)
            if wmbsFile is None:
                continue
            runNum = f['RunsList'][0]['RunNumber']
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count >= self.theNumberOfJobs:
                break
            res = self.getJobInfo(jobGroup)
            list_of_lists.append([','.join(res['lfns']), str(-1), str(0)])
            #need to check single file location
            jobDestination.append(res['locations'])
            count += 1
        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo(self, jobGroup):
        """
        Collect the LFNs of all files in jobGroup and the locations of the
        first file only.

        RETURNS: dict {'lfns': [...], 'locations': [...]}
        """
        lfns = []
        locations = []
        firstFile = True
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                lfns.append(jobFile['lfn'])
                if firstFile:
                    for loc in jobFile['locations']:
                        locations.append(loc)
                    firstFile = False
        # TODO: the check on the locations should be done here
        return {'lfns': lfns, 'locations': locations}

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive self.total_number_of_jobs (and, where missing,
        total_number_of_events or eventsPerJob) from the user settings.

        RAISES: CrabException when total_number_of_events is negative.
        """
        if self.selectEventsPerJob:
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if self.selectNumberOfJobs:
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if self.selectTotalNumberEvents:
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if self.total_number_of_events < 0:
            msg = 'Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if self.selectEventsPerJob:
            if self.selectTotalNumberEvents:
                self.total_number_of_jobs = int(self.total_number_of_events // self.eventsPerJob)
            elif self.selectNumberOfJobs:
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events // self.total_number_of_jobs)


    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job
        (for tasks without input data, e.g. event generation).

        Per-job args are [FirstLumi], [Generator, FirstEvent], MaxEvents
        depending on CMSSW.first_lumi and whether the generator is managed.
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if ((self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check  '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) # must be empty to correctly write the XML
            args = []
            if firstLumi: # Pythia first lumi
                args.append(str(int(firstLumi)+i))
            if generator in managedGenerators:
                args.append(generator)
                if generator == 'comphep' and i == 0:
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        if firstLumi:
            dictOut['params'] = ['FirstLumi','MaxEvents']
            if generator in managedGenerators:
                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
        else:
            if generator in managedGenerators:
                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut


    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job.

        RAISES: CrabException when number_of_jobs is not set.
        """
        self.checkUserSettings()
        if self.selectNumberOfJobs == 0:
            msg = 'must specify  number_of_jobs.'
            # was 'crabexception' (undefined name -> NameError)
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            args = []
            jobDestination.append([""])
            if self.eventsPerJob != 0:
                args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut


    def _defaultLumisPerJob(self, maxLumis):
        """
        Lumis per job when lumis_per_job is not set: spread the lumis to be
        processed over number_of_jobs. That is the requested
        total_number_of_lumis when one is set (capped at maxLumis),
        otherwise maxLumis.
        """
        nLumis = maxLumis
        if self.limitTotalLumis:
            nLumis = min(self.totalNLumis, maxLumis)
        return nLumis // self.theNumberOfJobs + 1

    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more.

        RETURNS: dict with 'params' (InputFiles, MaxEvents, SkipEvents,
                 Lumis), 'args', 'jobDestination' and 'njobs'
        """
        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in pubdata.getListFiles():
            wmbsFile = self._makeWMBSFile(jobFile, blockSites)
            if wmbsFile is None:
                continue
            # each entry is indexed as (run, lumi)
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0

        if not self.limitJobLumis:
            self.lumisPerJob = self._defaultLumisPerJob(pubdata.getMaxLumis())
            common.logger.info('Each job will process about %s lumis.' %
                               self.lumisPerJob)

        limitReached = False
        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
            for job in jobGroup.jobs:
                if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                    common.logger.info('Limit on number of jobs reached.')
                    limitReached = True
                    break
                if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                    common.logger.info('Limit on number of lumis reached.')
                    limitReached = True
                    break
                lumis = []
                lfns = []
                locations = []
                # Collect information from all the files
                for fileIndex, jobFile in enumerate(job.getFiles()):
                    if fileIndex == 0: # Get locations from first file in the job
                        for loc in jobFile['locations']:
                            locations.append(loc)
                    # Accumulate Lumis from all files
                    doFile = False
                    for lumiList in jobFile['runs']:
                        theRun = lumiList.run
                        for theLumi in list(lumiList):
                            # strict '<': never create more than totalNLumis
                            if (not self.limitTotalLumis) or \
                               (lumisCreated < self.totalNLumis):
                                doFile = True
                                lumisCreated += 1
                                lumis.append((theRun, theLumi))
                    # only ship files contributing at least one lumi
                    if doFile:
                        lfns.append(jobFile['lfn'])
                fileString = ','.join(lfns)
                lumiLister = LumiList(lumis = lumis)
                lumiString = lumiLister.getCMSSWString()
                list_of_lists.append([fileString, str(-1), str(0), lumiString])

                jobDestination.append(locations)
                jobCount += 1
                common.logger.debug('Job %s will run on %s files and %s lumis '
                                    % (jobCount, len(lfns), len(lumis)))
            # counters only grow, so later job groups cannot add anything
            if limitReached:
                break

        common.logger.info('%s jobs created to run on %s lumis' %
                           (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut


    def Algos(self):
        """
        Define key splittingType matrix: splitting type name -> bound method.
        """
        SplitAlogs = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript
            }
        return SplitAlogs
759