ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.33
Committed: Sun Feb 21 12:47:19 2010 UTC (15 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask
Changes since 1.32: +5 -43 lines
Log Message:
merging LumiMask..

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings recording the revision this file was checked out at.
__revision__, __version__ = (
    "$Id: Splitter.py,v 1.32.2.1 2010/02/05 16:05:29 ewv Exp $",
    "$Revision: 1.32.2.1 $",
)
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 spiga 1.33 from LumiList import LumiList
17 spiga 1.1
class JobSplitter:
    """
    Split a CRAB task into jobs according to the splitting type selected in
    the configuration (see Algos() for the name -> method mapping).

    cfg_params: configuration mapping with 'SECTION.key' keys, e.g.
                'CMSSW.events_per_job' or 'GRID.se_white_list'.
    args:       mapping of splitting inputs; the methods below read
                'blockSites', 'pubdata', 'managedGenerators' and 'generator'.
    """
    def __init__( self, cfg_params, args ):
        self.cfg_params = cfg_params
        self.args = args

        # Lumi-based splitting state, filled in by checkLumiSettings()
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        # init BlackWhiteListParser
        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
        seBlackList = cfg_params.get('GRID.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())


    def checkUserSettings(self):
        """
        Read events_per_job, number_of_jobs and total_number_of_events from
        the configuration.

        SETS: self.eventsPerJob / self.selectEventsPerJob,
              self.theNumberOfJobs / self.selectNumberOfJobs,
              self.total_number_of_events / self.selectTotalNumberEvents
              (each select* flag is 1 when the option is configured, else 0).
        Raises CrabException if both total_number_of_events (other than -1)
        and number_of_jobs are given and there are fewer events than jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0


    def checkLumiSettings(self):
        """
        Check to make sure the user has specified enough information to
        perform splitting by Lumis to run the job.

        Exactly two of lumis_per_job, number_of_jobs and
        total_number_of_lumis must be set (otherwise CrabException).
        SETS: self.lumisPerJob / self.limitJobLumis,
              self.theNumberOfJobs / self.limitNJobs,
              self.totalNLumis / self.limitTotalLumis
              (total_number_of_lumis = -1 means "no limit").
        When lumis_per_job and number_of_jobs are both given, the total is
        their product.
        """
        settings = 0
        if 'CMSSW.lumis_per_job' in self.cfg_params:
            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
            self.limitJobLumis = True
            settings += 1

        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.limitNJobs = True
            settings += 1

        if 'CMSSW.total_number_of_lumis' in self.cfg_params:
            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
            self.limitTotalLumis = (self.totalNLumis != -1)
            settings += 1

        if settings != 2:
            msg = 'When running on analysis datasets you must specify two and only two of:\n'
            msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
            raise CrabException(msg)
        if self.limitNJobs and self.limitJobLumis:
            self.limitTotalLumis = True
            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs


    def ComputeSubBlockSites( self, blockSites ):
        """
        Keep only the blocks for which the SE white-list parser accepts at
        least one hosting site (the block's full site list is kept).

        blockSites: dict block -> list of hosting sites.
        Returns the filtered dict; raises CrabException if it is empty.
        """
        sub_blockSites = {}
        for block, sites in blockSites.items():
            if self.blackWhiteListParser.checkWhiteList(sites):
                sub_blockSites[block] = sites
        if len(sub_blockSites) < 1:
            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    def _recordEventJob(self, list_of_lists, jobDestination, jobsOfBlock, block,
                        blockSites, parString, pString, maxEvents, skipEvents):
        """
        Append one EventBased job to the splitting output and return its
        1-based job number.

        parString / pString are the comma-terminated input / parent file
        lists accumulated for the job; the parent list is only emitted
        when self.useParent == 1.
        """
        fullString = parString[:-1]
        if self.useParent == 1:
            list_of_lists.append([fullString, pString[:-1], str(maxEvents), str(skipEvents)])
        else:
            list_of_lists.append([fullString, str(maxEvents), str(skipEvents)])
        jobDestination.append(blockSites[block])
        jobNumber = len(list_of_lists)
        jobsOfBlock[block].append(jobNumber)
        return jobNumber

    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform EventBased job splitting. Jobs run over an integer number of
        files and, unless CMSSW.no_block_boundary=1, no more than one block.

        Reads self.args['blockSites'] (dict: block -> list of host sites) and
        self.args['pubdata'] (dataset information provider).
        Exactly two of total_number_of_events, events_per_job and
        number_of_jobs must be configured.

        SETS: self.total_number_of_jobs (and self.ncjobs) - total # of jobs,
              self.maxEvents, self.eventsbyblock, self.eventsbyfile,
              self.parentFiles, self.useParent
        RETURNS: dict with 'params' (argument names), 'args' (one argument
                 list per job), 'jobDestination' (site list per job) and
                 'njobs'.
        Raises CrabException on inconsistent settings.
        """
        jobDestination = []
        self.checkUserSettings()
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
        # Non-positive values would divide by zero or never consume events.
        if self.selectNumberOfJobs and self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        if self.selectEventsPerJob and self.eventsPerJob <= 0:
            raise CrabException('events_per_job must be a positive number.')

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
        # Single interpretation of the option used everywhere below.
        crossBlocks = (noBboundary == 1)

        if crossBlocks:
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if self.selectTotalNumberEvents:
            totalEventsRequested = self.total_number_of_events
        if self.selectEventsPerJob:
            eventsPerJobRequested = self.eventsPerJob
            if self.selectNumberOfJobs:
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if totalEventsRequested == -1:
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif totalEventsRequested > self.maxEvents:
            eventsRemaining = self.maxEvents
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if self.selectTotalNumberEvents and self.selectNumberOfJobs:
            # At least one event per job: a zero request would never consume
            # eventsRemaining and the loops below would not terminate.
            eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

        if self.selectNumberOfJobs:
            common.logger.info("May not create the exact number_of_jobs requested.")

        # Legacy safety cap on the number of jobs (effectively unlimited).
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # job numbers (1-based) created for each visited block
        jobsOfBlock = {}

        parString = ""          # comma-terminated input files of the job being built
        pString = ""            # comma-terminated parent files of the job being built
        filesEventCount = 0     # events in the files accumulated for the current job
        newFile = 1             # 1 when the next iteration must read a new file
        jobSkipEventCount = 0   # events to skip in the first file of the current job
        msgLines = []           # per-job debug report, accumulated over ALL blocks

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while eventsRemaining > 0 and blockCount < numBlocksInDataset and jobCount < totalNumberOfJobs:
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block not in self.eventsbyblock:
                continue
            common.logger.debug('Events in Block File '+str(self.eventsbyblock[block]))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if numFilesInBlock <= 0:
                continue
            fileCount = 0
            if not crossBlocks:
                # ---- New block => New job ---- #
                parString = ""
                pString = ""
                filesEventCount = 0
                jobSkipEventCount = 0
            # Start with the first file of this block. When crossing block
            # boundaries, the accumulated files and skip offset carry over.
            newFile = 1

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block ---- #
            while eventsRemaining > 0 and fileCount < numFilesInBlock and jobCount < totalNumberOfJobs:
                fileName = files[fileCount]
                if self.useParent == 1:
                    parent = self.parentFiles[fileName]
                    # log level 10-1, presumably just below DEBUG - TODO confirm
                    common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                    except KeyError:
                        common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")
                    else:
                        common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # Add file to current job
                        filesEventCount += numEventsInFile
                        parString += fileName + ','
                        if self.useParent == 1:
                            for f in parent:
                                pString += f + ','
                        newFile = 0

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                eventsAvailable = filesEventCount - jobSkipEventCount
                if eventsAvailable < eventsPerJobRequested:
                    # Not enough events yet. At the last file of a block (when
                    # blocks are not crossed) close the job with what we have.
                    if not crossBlocks and fileCount == numFilesInBlock - 1:
                        jobNumber = self._recordEventJob(list_of_lists, jobDestination, jobsOfBlock, block,
                                                         blockSites, parString, pString, -1, jobSkipEventCount)
                        msgLines.append("Job %s can run over %s events (last file in block).\n" % (jobNumber, eventsAvailable))
                        msgLines.append("Job %s Destination: %s\n" % (jobNumber, SE2CMS(blockSites[block])))
                        jobCount += 1
                        totalEventCount += eventsAvailable
                        eventsRemaining -= eventsAvailable
                        jobSkipEventCount = 0
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                    # go to next file
                    newFile = 1
                    fileCount += 1
                elif eventsAvailable == eventsPerJobRequested:
                    # Exactly enough events: close job and touch new file
                    jobNumber = self._recordEventJob(list_of_lists, jobDestination, jobsOfBlock, block,
                                                     blockSites, parString, pString,
                                                     eventsPerJobRequested, jobSkipEventCount)
                    msgLines.append("Job %s can run over %s events.\n" % (jobNumber, eventsPerJobRequested))
                    msgLines.append("Job %s Destination: %s\n" % (jobNumber, SE2CMS(blockSites[block])))
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    jobSkipEventCount = 0
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # More events than needed: close job but keep working on
                    # the current (last) file for the next job.
                    jobNumber = self._recordEventJob(list_of_lists, jobDestination, jobsOfBlock, block,
                                                     blockSites, parString, pString,
                                                     eventsPerJobRequested, jobSkipEventCount)
                    msgLines.append("Job %s can run over %s events.\n" % (jobNumber, eventsPerJobRequested))
                    msgLines.append("Job %s Destination: %s\n" % (jobNumber, SE2CMS(blockSites[block])))
                    jobCount += 1
                    totalEventCount += eventsPerJobRequested
                    eventsRemaining -= eventsPerJobRequested
                    # Events of the earlier files consumed by this job are
                    # (eventsAvailable - eventsInLastFile); the rest of the
                    # request came from the last file, so the next job skips them.
                    eventsInLastFile = self.eventsbyfile[fileName]
                    jobSkipEventCount = eventsPerJobRequested - (eventsAvailable - eventsInLastFile)
                    # remove all but the last file
                    filesEventCount = eventsInLastFile
                    pString = ''
                    if self.useParent == 1:
                        pString = ''.join([f + ',' for f in parent])
                    parString = fileName + ','

        common.logger.debug('\n' + ''.join(msgLines))
        self.ncjobs = self.total_number_of_jobs = jobCount
        if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if not crossBlocks:
            self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

        # prepare dict output
        dictOut = {}
        if self.useParent == 1:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        else:
            dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def _whiteListHints(self):
        """
        Return the SE/CE white-list warning text for whichever of
        GRID.se_white_list / GRID.ce_white_list is configured (may be '').
        Every hint ends with a newline so consecutive hints stay separate.
        """
        hints = ''
        for key, label in (('GRID.se_white_list', 'SE'), ('GRID.ce_white_list', 'CE')):
            if key in self.cfg_params:
                hints += '\tWARNING: %s White List: ' % label + self.cfg_params[key] + '\n'
                hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                hints += '\tPlease check if the dataset is available at this site!)\n'
        return hints

    # keep trace of block with no sites to print a warning at the end

    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
        """
        Log the job/site table for every block that was assigned jobs and
        warn about blocks whose sites are all rejected by the black/white
        lists.

        blocks:      block names in splitting order
        jobsOfBlock: dict block -> list of 1-based job numbers
        blockSites:  dict block -> list of hosting sites
        Raises CrabException when no block with jobs has an allowed site.
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []   # job ranges of the blocks with no allowed site
        bloskNoSite = []   # ordinal numbers of the blocks with no allowed site
        allBlock = []      # ordinal numbers of all blocks that were assigned jobs

        parser = self.blackWhiteListParser
        blockCounter = 0
        for block in blocks:
            if block not in jobsOfBlock:
                continue
            blockCounter += 1
            allBlock.append(blockCounter)
            sites = parser.checkWhiteList(parser.checkBlackList(blockSites[block], [block]), [block])
            jobRange = spanRanges(jobsOfBlock[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, jobRange,
                                                                  ', '.join(SE2CMS(sites)))
            if len(sites) == 0:
                noSiteBlock.append(jobRange)
                bloskNoSite.append(blockCounter)

        common.logger.info(screenOutput)
        if not bloskNoSite:
            return

        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n\t\tRelated jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg += self._whiteListHints()

        common.logger.info(msg)

        # Every block with jobs lacks an allowed site: nothing can run. The
        # hints are already part of msg, so they are not repeated.
        if bloskNoSite == allBlock:
            msg += 'Requested jobs cannot be Created! \n'
            raise CrabException(msg)

        return


    ########################################################################
    def jobSplittingByRun(self):
        """
        Perform RunBased job splitting with the WMCore 'RunBased' splitter.

        Each file is tagged with the run of the first entry of its
        RunsList. Files whose block has no entry in self.args['blockSites']
        are skipped. At most number_of_jobs jobs are created (9999999 when
        number_of_jobs is not configured). Each job's destination is the
        location list of its first file (see getJobInfo).
        """
        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        thefiles = Fileset(name='FilesToSplit')
        for f in pubdata.getListFiles():
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except Exception:
                # best effort: skip files whose block location is unknown
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count >= self.theNumberOfJobs:
                break
            res = self.getJobInfo(jobGroup)
            list_of_lists.append([','.join(res['lfns']), str(-1), str(0)])
            #need to check single file location
            jobDestination.append(res['locations'])
            count += 1

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo( self,jobGroup ):
        """
        Collect the LFNs of all files of all jobs in jobGroup, and the
        locations of the first file only.

        Returns {'lfns': [...], 'locations': [...]}.
        """
        lfns = []
        locations = []
        firstFile = True
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                lfns.append(jobFile['lfn'])
                if firstFile:
                    locations.extend(jobFile['locations'])
                    firstFile = False
        # TODO: check that all files share the locations of the first one
        return {'lfns': lfns, 'locations': locations}

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive the missing quantity among total_number_of_events,
        events_per_job and number_of_jobs for splitting without input data.

        SETS: self.total_number_of_jobs, and self.total_number_of_events or
              self.eventsPerJob when they were not configured.
        Raises CrabException for a negative total number of events or for
        values that would require dividing by a non-positive number.
        """
        if self.selectEventsPerJob:
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if self.selectNumberOfJobs:
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if self.selectTotalNumberEvents:
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if self.total_number_of_events < 0:
            msg='Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if self.selectEventsPerJob:
            if self.selectTotalNumberEvents:
                if self.eventsPerJob <= 0:
                    raise CrabException('events_per_job must be a positive number.')
                self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
            elif self.selectNumberOfJobs:
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif self.selectNumberOfJobs:
            if self.theNumberOfJobs <= 0:
                raise CrabException('number_of_jobs must be a positive number.')
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs


    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job, for tasks
        without an input dataset.

        Per-job arguments: FirstLumi (when CMSSW.first_lumi is non-empty;
        defaults to 1), then for generators listed in
        self.args['managedGenerators'] Generator and FirstEvent, then
        MaxEvents. Every job destination is [""] (no input => any site).
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        isManaged = generator in managedGenerators
        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) # must be empty to correctly write the XML
            args = []
            if firstLumi: # Pythia first lumi
                args.append(str(int(firstLumi)+i))
            if isManaged:
                args.append(generator)
                if generator == 'comphep' and i == 0:
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output (params must match the args built above)
        dictOut = {}
        if firstLumi:
            if isManaged:
                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
            else:
                dictOut['params'] = ['FirstLumi','MaxEvents']
        elif isManaged:
            dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        else:
            dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut


    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job (script tasks without
        input). number_of_jobs is mandatory (CrabException otherwise).
        Each job gets MaxEvents only when the derived events per job is
        non-zero; every destination is [""].
        """
        self.checkUserSettings()
        if self.selectNumberOfJobs == 0:
            msg = 'must specify number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            jobDestination.append([""])
            if self.eventsPerJob != 0:
                self.list_of_args.append([str(self.eventsPerJob)])
            else:
                self.list_of_args.append([])

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def _computeLumisPerJob(self, maxLumis):
        """
        Return the lumis_per_job to request when the user configured
        number_of_jobs but not lumis_per_job.

        maxLumis: lumis available according to pubdata.getMaxLumis().
        When a positive total_number_of_lumis is set, that total (capped at
        maxLumis) is spread evenly over the jobs, rounding up and with a
        minimum of one lumi per job. Otherwise all available lumis are spread.
        """
        nJobs = self.theNumberOfJobs
        if self.totalNLumis > 0:
            wanted = min(self.totalNLumis, maxLumis)
            return max(1, (wanted + nJobs - 1) // nJobs)
        return maxLumis // nJobs + 1

    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more.

        Stops at number_of_jobs jobs and/or total_number_of_lumis lumis
        when those limits are active (see checkLumiSettings).
        """
        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in pubdata.getListFiles():
            block = jobFile['Block']['Name']
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except Exception:
                # best effort: skip files whose block location is unknown
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0

        if not self.limitJobLumis:
            self.lumisPerJob = self._computeLumisPerJob(pubdata.getMaxLumis())
            common.logger.info('Each job will process about %s lumis.' %
                               self.lumisPerJob)

        limitReached = False
        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
            for job in jobGroup.jobs:
                if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                    common.logger.info('Limit on number of jobs reached.')
                    limitReached = True
                    break
                if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                    common.logger.info('Limit on number of lumis reached.')
                    limitReached = True
                    break
                lumis = []
                lfns = []
                locations = []
                firstFile = True
                # Collect information from all the files
                for jobFile in job.getFiles():
                    if firstFile: # Get locations from first file in the job
                        locations.extend(jobFile['locations'])
                        firstFile = False
                    # Accumulate Lumis from all files
                    for lumiList in jobFile['runs']:
                        theRun = lumiList.run
                        for theLumi in lumiList:
                            lumis.append( (theRun, theLumi) )

                    lfns.append(jobFile['lfn'])
                fileString = ','.join(lfns)
                lumiString = LumiList(lumis = lumis).getCMSSWString()
                list_of_lists.append([fileString, str(-1), str(0), lumiString])

                jobDestination.append(locations)
                jobCount += 1
                lumisCreated += len(lumis)
                common.logger.debug('Job %s will run on %s files and %s lumis '
                    % (jobCount, len(lfns), len(lumis) ))
            # stop iterating job groups too, instead of re-logging the limit
            if limitReached:
                break

        common.logger.info('%s jobs created to run on %s lumis' %
                           (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut


    def Algos(self):
        """
        Define key splittingType matrix: map each splitting type name to the
        bound method implementing it.
        """
        return {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript,
        }
749