ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.32
Committed: Thu Jan 14 16:52:15 2010 UTC (15 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3
Branch point for: CRAB_2_7_1_branch, LumiMask
Changes since 1.31: +12 -3 lines
Log Message:
improve error msg

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings identifying the revision of this file.
__revision__ = "$Id: Splitter.py,v 1.31 2009/12/15 13:19:13 spiga Exp $"
__version__ = "$Revision: 1.31 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16    
class JobSplitter:
    """
    Split a CRAB task into jobs.

    The splitting algorithm is selected by name through Algos(). Every
    algorithm returns a dictionary with the keys:
      'params'         - names of the per-job arguments,
      'args'           - one argument list per job,
      'jobDestination' - candidate locations for each job,
      'njobs'          - number of jobs created.
    """

    def __init__( self, cfg_params, args ):
        """
        cfg_params: user configuration, keyed 'SECTION.key' (read with .get,
                    membership tests and item access).
        args:       inputs of the splitting algorithms; depending on the
                    algorithm 'blockSites', 'pubdata', 'managedGenerators'
                    and 'generator' are read from it.
        """
        self.cfg_params = cfg_params
        self.args = args

        # Lumi-based splitting settings, filled in by checkLumiSettings()
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        # init BlackWhiteListParser
        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
        seBlackList = cfg_params.get('GRID.se_black_list',[])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())


    def checkUserSettings(self):
        """
        Read the event-based splitting parameters from the configuration.

        Sets eventsPerJob/selectEventsPerJob, theNumberOfJobs/selectNumberOfJobs
        and total_number_of_events/selectTotalNumberEvents. Each select* flag is
        1 when the user set the parameter and 0 otherwise; unset values default
        to -1, 0 and 0 respectively.

        Raises CrabException when total_number_of_events (other than -1) and
        number_of_jobs are both set and there are fewer events than jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## total number of events (-1 means all the events in the dataset)
        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                    msg = 'Must specify at least one event per job. total_number_of_events >= number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0


    def checkLumiSettings(self):
        """
        Check to make sure the user has specified enough information to
        perform splitting by Lumis to run the job.

        Exactly two of number_of_jobs, lumis_per_job and total_number_of_lumis
        must be set (total_number_of_lumis = -1 means no limit on the total).
        When number_of_jobs and lumis_per_job are given, the total number of
        lumis is derived from them.

        Raises CrabException if not exactly two of the settings are given.
        """
        settings = 0
        if 'CMSSW.lumis_per_job' in self.cfg_params:
            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
            self.limitJobLumis = True
            settings += 1

        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.limitNJobs = True
            settings += 1

        if 'CMSSW.total_number_of_lumis' in self.cfg_params:
            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
            self.limitTotalLumis = (self.totalNLumis != -1)
            settings += 1

        if settings != 2:
            msg = 'When running on analysis datasets you must specify two and only two of:\n'
            msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
            raise CrabException(msg)
        if self.limitNJobs and self.limitJobLumis:
            self.limitTotalLumis = True
            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs


    def ComputeSubBlockSites( self, blockSites ):
        """
        Keep only the blocks hosted by at least one site accepted by the SE
        white list.

        blockSites: block name -> list of hosting sites.
        Returns a new dictionary with the retained blocks and their full
        (unfiltered) site lists. Raises CrabException if no block is retained.
        """
        sub_blockSites = {}
        for block, sites in blockSites.items():
            if self.blackWhiteListParser.checkWhiteList(sites):
                sub_blockSites[block] = sites
        if len(sub_blockSites) < 1:
            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform job splitting by number of events. Jobs run over an integer
        number of files and no more than one block, unless
        CMSSW.no_block_boundary = 1 (which requires exactly one white-listed
        SE and an explicit total_number_of_events).

        Reads self.args['blockSites'] (block name -> list of host sites) and
        self.args['pubdata'] (files per block, events per block/file, parent
        files, maximum number of events). Exactly two of
        total_number_of_events, events_per_job and number_of_jobs must be set.

        SETS: self.eventsbyblock, self.eventsbyfile, self.parentFiles,
              self.maxEvents, self.useParent,
              self.ncjobs = self.total_number_of_jobs - total # of jobs
        Returns the splitting dictionary. 'params' is
        ['InputFiles', 'MaxEvents', 'SkipEvents'], with 'ParentFiles' after
        'InputFiles' when CMSSW.use_parent is set. MaxEvents is -1 for a job
        that takes all remaining events of the last file of a block.

        Raises CrabException on inconsistent settings.
        """
        jobDestination = []
        self.checkUserSettings()
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
        # A non-positive events_per_job never consumes events: the file loop
        # below would keep creating empty jobs forever.
        if self.selectEventsPerJob and self.eventsPerJob < 1:
            msg = 'events_per_job must be a positive number.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

        if noBboundary == 1:
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg += '\tYou should get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            # At least one event per job: with 0 (fewer available events than
            # requested jobs) the file loop below would never consume events.
            eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

        if (self.selectNumberOfJobs):
            common.logger.info("May not create the exact number_of_jobs requested.")

        # Effectively unlimited upper bound on the number of jobs
        # (old... to remove Daniele)
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # block name -> list of (1-based) numbers of the jobs created from it
        jobsOfBlock = {}

        # Comma-terminated input / parent file lists of the job being built
        # and the number of events in those input files.
        parString = ""
        pString = ""
        filesEventCount = 0
        # Events to skip in the first file of the job being built. Like the
        # file lists above it is initialised once, so that with
        # no_block_boundary=1 a partially built job keeps its offset when it
        # continues into the next block.
        jobSkipEventCount = 0
        # Debug report of all created jobs, logged once at the end
        msg = '\n'

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block in self.eventsbyblock:
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug('Events in Block File '+str(numEventsInBlock))

                files = filesbyblock[block]
                numFilesInBlock = len(files)
                if (numFilesInBlock <= 0):
                    continue
                fileCount = 0
                if noBboundary == 0: # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    pString = ""
                    # counter for number of events in files currently worked on
                    filesEventCount = 0
                    # job event counter
                    jobSkipEventCount = 0
                # flag if next while loop should touch new file
                newFile = 1

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block ---- #
                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                    fileName = files[fileCount]
                    if self.useParent == 1:
                        parent = self.parentFiles[fileName]
                        common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
                    if newFile:
                        try:
                            numEventsInFile = self.eventsbyfile[fileName]
                            common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += fileName + ','
                            if self.useParent == 1:
                                for f in parent:
                                    pString += f + ','
                            newFile = 0
                        except KeyError:
                            # NOTE(review): despite the message the file is not
                            # stepped over here; if enough events are already
                            # pending, the "more events remain" branch below
                            # looks this file up again and raises KeyError.
                            common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if less events in file remain than eventsPerJobRequested
                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                        if noBboundary == 1: ## DD
                            # keep accumulating files, possibly across blocks
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if ( fileCount == numFilesInBlock-1 ):
                                # end job using last file, use remaining events in block
                                # close job and touch new file
                                fullString = parString[:-1]
                                if self.useParent == 1:
                                    fullParentString = pString[:-1]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                                jobDestination.append(blockSites[block])
                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counter
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else:
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if events in file equal to eventsPerJobRequested
                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ):
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events in file remain than eventsPerJobRequested
                    else:
                        # close job but don't touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for last file
                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[fileName]
                        pString_tmp = ''
                        if self.useParent == 1:
                            for f in parent:
                                pString_tmp += f + ','
                        pString = pString_tmp
                        parString = fileName + ','
                # END while (iterate over files in the block)
        # END while (iterate over blocks in the dataset)
        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        if self.useParent:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def _whiteListHint(self):
        """
        Return the hint text reminding the user of the SE/CE white lists set in
        the configuration (each warning followed by the whitelisting advice),
        or '' when neither white list is set.
        """
        advice = ('\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                  '\tPlease check if the dataset is available at this site!)\n')
        hint = ''
        for key, label in (('GRID.se_white_list', 'SE'), ('GRID.ce_white_list', 'CE')):
            if key in self.cfg_params:
                hint += '\tWARNING: ' + label + ' White List: ' + self.cfg_params[key] + '\n'
                hint += advice
        return hint

    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
        """
        Log the jobs created per block with their allowed destination sites and
        warn about blocks not hosted at any allowed site.

        blocks:      block names considered during splitting.
        jobsOfBlock: block name -> list of job numbers created from it.
        blockSites:  block name -> list of hosting sites.

        Raises CrabException when none of the considered blocks is hosted at an
        allowed site, since no job could run.
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteJobRanges = []
        noSiteBlockNumbers = []
        allBlockNumbers = []

        blockCounter = 0
        for block in blocks:
            if block in jobsOfBlock:
                blockCounter += 1
                allBlockNumbers.append(blockCounter)
                sites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                    ', '.join(SE2CMS(sites)))
                if len(sites) == 0:
                    noSiteJobRanges.append(spanRanges(jobsOfBlock[block]))
                    noSiteBlockNumbers.append(blockCounter)

        common.logger.info(screenOutput)
        if not noSiteBlockNumbers:
            # every block has an allowed site (or no block was considered)
            return

        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ' ' + ', '.join([str(number) for number in noSiteBlockNumbers])
        msg += '\n\t\tRelated jobs:\n '
        msg += ','.join([str(jobRange) for jobRange in noSiteJobRanges])
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        hint = self._whiteListHint()
        common.logger.info(msg + hint)

        if noSiteBlockNumbers == allBlockNumbers:
            # the white-list hints are appended once, after the error line
            msg += 'Requested jobs cannot be Created! \n'
            raise CrabException(msg + hint)

        return


    ########################################################################
    def jobSplittingByRun(self):
        """
        Split the task so that each job runs over the files of one run, using
        the WMCore 'RunBased' job splitting.

        Only the first run listed for each file is used. Files whose block is
        not in self.args['blockSites'] are ignored. When number_of_jobs is
        set, at most that many jobs are created.

        Returns the splitting dictionary with params
        ['InputFiles', 'MaxEvents', 'SkipEvents'] (MaxEvents -1, SkipEvents 0).
        """
        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for dbsFile in fileList:
            block = dbsFile['Block']['Name']
            try:
                dbsFile['Block']['StorageElementList'].extend(blockSites[block])
            except KeyError:
                # block not among the candidate blocks: skip its files
                continue
            wmbsFile = File(dbsFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            runNum = dbsFile['RunsList'][0]['RunNumber']
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count >= self.theNumberOfJobs:
                break
            res = self.getJobInfo(jobGroup)
            fullString = ','.join(res['lfns'])
            list_of_lists.append([fullString,str(-1),str(0)])
            #need to check single file location
            jobDestination.append(res['locations'])
            count += 1
        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo( self,jobGroup ):
        """
        Collect the input files of a WMCore job group.

        Returns a dictionary with 'lfns' (the LFNs of all files of all jobs in
        the group) and 'locations' (the locations of the first file only).
        """
        res = {}
        lfns = []
        locations = []
        firstFile = True
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                lfns.append(jobFile['lfn'])
                if firstFile:
                    for loc in jobFile['locations']:
                        locations.append(loc)
                    firstFile = False
                # TODO: the check on the locations should go here
        res['lfns'] = lfns
        res['locations'] = locations
        return res

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive total_number_of_jobs (and, when needed, total_number_of_events
        or eventsPerJob) from the settings read by checkUserSettings(), for
        splitting without input data.

        Raises CrabException if total_number_of_events is negative.
        """
        if (self.selectEventsPerJob):
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if (self.selectNumberOfJobs):
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if (self.selectTotalNumberEvents):
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if (self.total_number_of_events < 0):
            msg = 'Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if (self.selectEventsPerJob):
            if (self.selectTotalNumberEvents):
                self.total_number_of_jobs = int(self.total_number_of_events // self.eventsPerJob)
            elif (self.selectNumberOfJobs):
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events // self.total_number_of_jobs)


    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job, for tasks
        without input data.

        Exactly two of total_number_of_events, events_per_job and
        number_of_jobs must be set. Each job's arguments are, in order: the
        FirstLumi (CMSSW.first_lumi + job index) when first_lumi is truthy;
        for generators in self.args['managedGenerators'] the generator name
        and the first event number; and MaxEvents.

        Raises CrabException on inconsistent settings.
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) # must be empty to correctly write the XML
            args = []
            if (firstLumi): # Pythia first lumi
                args.append(str(int(firstLumi)+i))
            if (generator in managedGenerators):
                args.append(generator)
                if (generator == 'comphep' and i == 0):
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        if (firstLumi):
            dictOut['params'] = ['FirstLumi','MaxEvents']
            if (generator in managedGenerators):
                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
        else:
            if (generator in managedGenerators):
                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut


    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job, for tasks running a
        script without input data.

        number_of_jobs must be set. Each job gets MaxEvents (eventsPerJob) as
        its only argument, or no argument when eventsPerJob is 0.

        Raises CrabException if number_of_jobs is not set.
        """
        self.checkUserSettings()
        if (self.selectNumberOfJobs == 0):
            msg = 'must specify number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            args = []
            jobDestination.append([""])
            if self.eventsPerJob != 0:
                args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def _defaultLumisPerJob(self, maxLumis):
        """
        Return the lumis_per_job to use when the user did not set it (i.e.
        number_of_jobs and total_number_of_lumis were given).

        maxLumis: number of lumis available, as returned by pubdata.getMaxLumis().
        With a limit on the total number of lumis, that limit is spread over
        the requested jobs (at least one lumi per job); otherwise all available
        lumis are spread over them.
        """
        if self.limitTotalLumis:
            return max(self.totalNLumis // self.theNumberOfJobs, 1)
        return maxLumis // self.theNumberOfJobs + 1

    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more.

        Returns the splitting dictionary with params
        ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis'].
        Raises CrabException (from checkLumiSettings) on inconsistent settings.
        """
        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        fileList = pubdata.getListFiles()
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in fileList:
            block = jobFile['Block']['Name']
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except KeyError:
                # block not among the candidate blocks: skip its files
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0

        if not self.limitJobLumis:
            self.lumisPerJob = self._defaultLumisPerJob(pubdata.getMaxLumis())
            common.logger.info('Each job will process about %s lumis.' %
                                self.lumisPerJob)

        # Flatten the job groups so that reaching a limit stops the whole
        # loop instead of only the current job group.
        allJobs = (job for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob)
                       for job in jobGroup.jobs)
        for job in allJobs:
            if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                common.logger.info('Limit on number of jobs reached.')
                break
            if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                common.logger.info('Limit on number of lumis reached.')
                break
            lumis = []
            lfns = []
            locations = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                if firstFile:  # Get locations from first file in the job
                    for loc in jobFile['locations']:
                        locations.append(loc)
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in list(lumiList):
                        lumis.append( (theRun, theLumi) )

                lfns.append(jobFile['lfn'])
            fileString = ','.join(lfns)
            lumiString = compressLumiString(lumis)
            list_of_lists.append([fileString, str(-1), str(0), lumiString])

            jobDestination.append(locations)
            jobCount += 1
            lumisCreated += len(lumis)
            common.logger.debug('Job %s will run on %s files and %s lumis '
                % (jobCount, len(lfns), len(lumis) ))

        common.logger.info('%s jobs created to run on %s lumis' %
                           (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut


    def Algos(self):
        """
        Define key splittingType matrix: map each splitting type name to the
        bound method implementing it.
        """
        splitAlgos = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript
            }
        return splitAlgos
747    
748 ewv 1.26
749    
def compressLumiString(lumis):
    """
    Turn a list of (run, lumi) 2-tuples into a string of the format
    R1:L1,R2:L2-R3:L3 which is acceptable to the CMSSW LumiBlockRange variable.

    Consecutive lumis of the same run are merged into one "start-end" range
    and repeated entries (the same lumi seen in several files) are collapsed.
    The input list is not modified. An empty input gives ''.
    """
    def formatRange(start, end):
        # "R:L" for a single lumi, "R:L1-R:L2" for a range
        part = ':'.join(map(str, start))
        if start != end:
            part += '-' + ':'.join(map(str, end))
        return part

    parts = []
    startRange = None
    endRange = None

    # sorted() instead of lumis.sort(): do not reorder the caller's list
    for lumiBlock in sorted(lumis):
        if startRange is None:  # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange:  # Same Lumi (different files?)
            pass
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1:  # This is a continuation
            endRange = lumiBlock
        else:  # This is the start of a new range
            parts.append(formatRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(formatRange(startRange, endRange))

    return ','.join(parts)
786    
787