
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.13 by spiga, Tue May 26 10:23:01 2009 UTC vs.
Revision 1.32 by spiga, Thu Jan 14 16:52:15 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16  
17   class JobSplitter:
18      def __init__( self, cfg_params,  args ):
19          self.cfg_params = cfg_params
20          self.args=args
21 +
22 +        self.lumisPerJob = -1
23 +        self.totalNLumis = 0
24 +        self.theNumberOfJobs = 0
25 +        self.limitNJobs = False
26 +        self.limitTotalLumis = False
27 +        self.limitJobLumis = False
28 +
29          #self.maxEvents
30          # init BlackWhiteListParser
31 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
32 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
33 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
31 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
33 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
34  
35  
36      def checkUserSettings(self):
# Line 43 | Line 62 | class JobSplitter:
62              self.selectTotalNumberEvents = 0
63  
64  
65 +    def checkLumiSettings(self):
66 +        """
67 +        Check that the user has specified enough information to
68 +        perform job splitting by lumi sections
69 +        """
70 +        settings = 0
71 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
72 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
73 +            self.limitJobLumis = True
74 +            settings += 1
75 +
76 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
77 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
78 +            self.limitNJobs = True
79 +            settings += 1
80 +
81 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
82 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
83 +            self.limitTotalLumis = (self.totalNLumis != -1)
84 +            settings += 1
85 +
86 +        if settings != 2:
87 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
88 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
89 +            raise CrabException(msg)
90 +        if self.limitNJobs and self.limitJobLumis:
91 +            self.limitTotalLumis = True
92 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
93 +
94 +
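The check above enforces a two-of-three rule for lumi-based splitting: exactly two of lumis_per_job, number_of_jobs and total_number_of_lumis must be set, and when the first two are given the total is derived from their product. A minimal standalone sketch of the same rule, using a plain dict in place of the CRAB cfg_params object (names and values are illustrative, not the CRAB implementation):

    # Standalone sketch of the two-of-three rule, assuming a plain dict
    # in place of cfg_params (illustrative only).
    def check_lumi_settings(cfg):
        keys = ['CMSSW.lumis_per_job', 'CMSSW.number_of_jobs',
                'CMSSW.total_number_of_lumis']
        given = [k for k in keys if k in cfg]
        if len(given) != 2:
            raise ValueError('specify two and only two of: ' + ', '.join(keys))
        if 'CMSSW.lumis_per_job' in cfg and 'CMSSW.number_of_jobs' in cfg:
            # Both per-job and job-count limits given: the total is implied.
            return int(cfg['CMSSW.lumis_per_job']) * int(cfg['CMSSW.number_of_jobs'])
        return int(cfg['CMSSW.total_number_of_lumis'])

    # check_lumi_settings({'CMSSW.lumis_per_job': '10',
    #                      'CMSSW.number_of_jobs': '5'})  # -> 50 lumis in total
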
95 +    def ComputeSubBlockSites( self, blockSites ):
96 +        """ Keep only the blocks that have at least one SE in the white list.
97 +        """
98 +        sub_blockSites = {}
99 +        for k,v in blockSites.iteritems():
100 +            sites=self.blackWhiteListParser.checkWhiteList(v)
101 +            if sites : sub_blockSites[k]=v
102 +        if len(sub_blockSites) < 1:
103 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
104 +            raise CrabException(msg)
105 +        return sub_blockSites
106 +
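ComputeSubBlockSites keeps only the blocks that survive the SE white-list check. Reduced to a set intersection, the filtering looks roughly like this (a sketch, not the SEBlackWhiteListParser logic; block names and SEs are illustrative):

    # Sketch of the block filtering, with the white-list check reduced to a
    # set intersection (illustrative only).
    def sub_block_sites(blockSites, seWhiteList):
        allowed = set(seWhiteList)
        sub = {}
        for block, sites in blockSites.items():
            if allowed.intersection(sites):
                sub[block] = sites
        return sub

    # sub_block_sites({'/ds#1': ['se1.site.a', 'se2.site.b'],
    #                  '/ds#2': ['se3.site.c']}, ['se1.site.a'])
    # -> {'/ds#1': ['se1.site.a', 'se2.site.b']}
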
107   ########################################################################
108      def jobSplittingByEvent( self ):
109          """
# Line 57 | Line 118 | class JobSplitter:
118                self.list_of_args - File(s) job will run on (a list of lists)
119          """
120  
121 <        jobDestination=[]  
121 >        jobDestination=[]
122          self.checkUserSettings()
123          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
124              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
125              raise CrabException(msg)
126 <
127 <        blockSites = self.args['blockSites']
126 >
127 >        blockSites = self.args['blockSites']
128          pubdata = self.args['pubdata']
129          filesbyblock=pubdata.getFiles()
130  
# Line 77 | Line 138 | class JobSplitter:
138          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
139          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
140  
141 +        if noBboundary == 1:
142 +            if self.total_number_of_events== -1:
143 +                msg = 'You are selecting no_block_boundary=1 which does not allow setting total_number_of_events=-1\n'
144 +                msg +='\tYou should get the number of events from the DBS web interface and use it in your configuration.'
145 +                raise CrabException(msg)
146 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
147 +                msg = 'You are selecting no_block_boundary=1 which requires choosing one and only one site.\n'
148 +                msg += "\tPlease set se_white_list with the site's storage element name."
149 +                raise CrabException(msg)
150 +            blockSites = self.ComputeSubBlockSites(blockSites)
151 +
152          # ---- Handle the possible job splitting configurations ---- #
153          if (self.selectTotalNumberEvents):
154              totalEventsRequested = self.total_number_of_events
# Line 124 | Line 196 | class JobSplitter:
196          jobsOfBlock = {}
197  
198          parString = ""
199 +        pString = ""
200          filesEventCount = 0
201 +        msg=''
202  
203          # ---- Iterate over the blocks in the dataset until ---- #
204          # ---- we've met the requested total # of events    ---- #
# Line 146 | Line 220 | class JobSplitter:
220                  if noBboundary == 0: # DD
221                      # ---- New block => New job ---- #
222                      parString = ""
223 +                    pString=""
224                      # counter for number of events in files currently worked on
225                      filesEventCount = 0
226                  # flag if next while loop should touch new file
# Line 155 | Line 230 | class JobSplitter:
230  
231                  # ---- Iterate over the files in the block until we've met the requested ---- #
232                  # ---- total # of events or we've gone over all the files in this block  ---- #
233 <                pString=''
233 >                msg='\n'
234                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
235                      file = files[fileCount]
236                      if self.useParent==1:
237                          parent = self.parentFiles[file]
163                        for f in parent :
164                            pString +=  f + ','
238                          common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
239                      if newFile :
240                          try:
# Line 171 | Line 244 | class JobSplitter:
244                              filesEventCount += numEventsInFile
245                              # Add file to current job
246                              parString +=  file + ','
247 +                            if self.useParent==1:
248 +                                for f in parent :
249 +                                    pString += f  + ','
250                              newFile = 0
251                          except KeyError:
252                              common.logger.info("File "+str(file)+" has an unknown number of events: skipping")
# Line 192 | Line 268 | class JobSplitter:
268                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
269                                  else:
270                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
271 <                                common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
271 >                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
272                                  jobDestination.append(blockSites[block])
273 <                                common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
273 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
274                                  # fill jobs of block dictionary
275                                  jobsOfBlock[block].append(jobCount+1)
276                                  # reset counter
# Line 221 | Line 297 | class JobSplitter:
297                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
298                          else:
299                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
300 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
300 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
301                          jobDestination.append(blockSites[block])
302 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
302 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
303                          jobsOfBlock[block].append(jobCount+1)
304                          # reset counter
305                          jobCount = jobCount + 1
# Line 246 | Line 322 | class JobSplitter:
322                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
323                          else:
324                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
325 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
325 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
326                          jobDestination.append(blockSites[block])
327 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
327 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
328                          jobsOfBlock[block].append(jobCount+1)
329                          # increase counter
330                          jobCount = jobCount + 1
# Line 259 | Line 335 | class JobSplitter:
335                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
336                          # remove all but the last file
337                          filesEventCount = self.eventsbyfile[file]
338 +                        pString_tmp=''
339                          if self.useParent==1:
340 <                            for f in parent : pString +=  f + ','
340 >                            for f in parent : pString_tmp +=  f + ','
341 >                        pString =  pString_tmp
342                          parString =  file + ','
343                      pass # END if
344                  pass # END while (iterate over files in the block)
345          pass # END while (iterate over blocks in the dataset)
346 +        common.logger.debug(msg)
347          self.ncjobs = self.total_number_of_jobs = jobCount
348          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
349              common.logger.info("Could not run on all requested events because some blocks are not hosted at allowed sites.")
350          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
351 <
351 >
352          # skip check on  block with no sites  DD
353          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
354  
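The nested loops above pack whole files into jobs greedily: files are appended until the job holds eventsPerJobRequested events, and a file that straddles two jobs is carried into the next job with a skip-event offset. A condensed sketch of that accounting, ignoring block boundaries and parent files (a hypothetical helper, not the CRAB code):

    # Condensed sketch of the greedy packing above: fill each job with whole
    # files until events_per_job is reached, recording how many events of
    # the job's first file to skip (illustrative only).
    def split_by_event(events_by_file, events_per_job):
        jobs, current, count, skip = [], [], 0, 0
        for lfn, n_events in events_by_file:
            current.append(lfn)
            count += n_events
            while count - skip >= events_per_job:
                jobs.append((list(current), events_per_job, skip))
                skip += events_per_job
                if count == skip:   # job ended exactly on a file boundary
                    current, count, skip = [], 0, 0
                else:               # keep only the last, partially used file
                    current, count, skip = [lfn], n_events, n_events - (count - skip)
        if count > skip:            # leftover events form a final, shorter job
            jobs.append((list(current), count - skip, skip))
        return jobs

    # split_by_event([('f1', 100), ('f2', 100)], 150)
    # -> [(['f1', 'f2'], 150, 0), (['f2'], 50, 50)]
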
# Line 285 | Line 364 | class JobSplitter:
364  
365          # keep trace of block with no sites to print a warning at the end
366  
367 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
367 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
368          # screen output
369          screenOutput = "List of jobs and available destination sites:\n\n"
370          noSiteBlock = []
# Line 297 | Line 376 | class JobSplitter:
376              if block in jobsOfBlock.keys() :
377                  blockCounter += 1
378                  allBlock.append( blockCounter )
379 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
380                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
381 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
382 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
381 >                    ', '.join(SE2CMS(sites)))
382 >                if len(sites) == 0:
383                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
384                      bloskNoSite.append( blockCounter )
385  
# Line 311 | Line 391 | class JobSplitter:
391                  virgola = ","
392              for block in bloskNoSite:
393                  msg += ' ' + str(block) + virgola
394 <            msg += '\n               Related jobs:\n                 '
394 >            msg += '\n\t\tRelated jobs:\n                 '
395              virgola = ""
396              if len(noSiteBlock) > 1:
397                  virgola = ","
398              for range_jobs in noSiteBlock:
399                  msg += str(range_jobs) + virgola
400 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
401 <            if self.cfg_params.has_key('EDG.se_white_list'):
402 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
403 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
404 <                msg += 'Please check if the dataset is available at this site!)\n'
405 <            if self.cfg_params.has_key('EDG.ce_white_list'):
406 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
407 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
408 <                msg += 'Please check if the dataset is available at this site!)\n'
400 >            msg += '\n\t\twill not be submitted and this block of data cannot be analyzed!\n'
401 >            if self.cfg_params.has_key('GRID.se_white_list'):
402 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
403 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
404 >                msg += '\tPlease check if the dataset is available at this site!)'
405 >            if self.cfg_params.has_key('GRID.ce_white_list'):
406 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
407 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
408 >                msg += '\tPlease check if the dataset is available at this site!)\n'
409  
410              common.logger.info(msg)
411  
412          if bloskNoSite == allBlock:
413 <            raise CrabException('No jobs created')
413 >            msg += 'Requested jobs cannot be created!\n'
414 >            if self.cfg_params.has_key('GRID.se_white_list'):
415 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
416 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
417 >                msg += '\tPlease check if the dataset is available at this site!)'
418 >            if self.cfg_params.has_key('GRID.ce_white_list'):
419 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
420 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
421 >                msg += '\tPlease check if the dataset is available at this site!)\n'
422 >            raise CrabException(msg)
423  
424          return
425  
426  
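For reference, the screen output assembled in checkBlockNoSite comes out roughly as below; site names are illustrative, and the job ranges assume spanRanges (from crab_util, not shown in this diff) compresses consecutive job numbers:

    List of jobs and available destination sites:

    Block     1: jobs                  1-3: sites: T2_IT_Bari, T2_IT_Pisa
    Block     2: jobs                  4-5: sites:

A block whose site list comes back empty after black/white-list filtering (Block 2 here) ends up in noSiteBlock, and its jobs are reported as not submittable.
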
427   ########################################################################
428 <    def jobSplittingByRun(self):
428 >    def jobSplittingByRun(self):
429          """ Perform job splitting based on run number.
430          """
342        from sets import Set  
343        from WMCore.JobSplitting.RunBased import RunBased
344        from WMCore.DataStructs.Workflow import Workflow
345        from WMCore.DataStructs.File import File
346        from WMCore.DataStructs.Fileset import Fileset
347        from WMCore.DataStructs.Subscription import Subscription
348        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
349        from WMCore.DataStructs.Run import Run
431  
432          self.checkUserSettings()
433 <        blockSites = self.args['blockSites']
433 >        blockSites = self.args['blockSites']
434          pubdata = self.args['pubdata']
435  
436          if self.selectNumberOfJobs == 0 :
437              self.theNumberOfJobs = 9999999
438          blocks = {}
439 <        runList = []
439 >        runList = []
440          thefiles = Fileset(name='FilesToSplit')
441          fileList = pubdata.getListFiles()
442          for f in fileList:
443              block = f['Block']['Name']
444 <            try:
444 >            try:
445                  f['Block']['StorageElementList'].extend(blockSites[block])
446              except:
447                  continue
# Line 368 | Line 449 | class JobSplitter:
449              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
450              wmbsFile['block'] = block
451              runNum = f['RunsList'][0]['RunNumber']
452 <            runList.append(runNum)
452 >            runList.append(runNum)
453              myRun = Run(runNumber=runNum)
454              wmbsFile.addRun( myRun )
455              thefiles.addFile(
456                  wmbsFile
457                  )
458 <
458 >
459          work = Workflow()
460          subs = Subscription(
461          fileset = thefiles,
# Line 383 | Line 464 | class JobSplitter:
464          type = "Processing")
465          splitter = SplitterFactory()
466          jobfactory = splitter(subs)
467 <        
468 <        #loop over all runs
388 <        set = Set(runList)
467 >
468 >        #loop over all runs
469          list_of_lists = []
470          jobDestination = []
391
471          count = 0
472 <        for i in list(set):
472 >        for jobGroup in  jobfactory():
473              if count <  self.theNumberOfJobs:
474 <                res = self.getJobInfo(jobfactory())
475 <                parString = ''
474 >                res = self.getJobInfo(jobGroup)
475 >                parString = ''
476                  for file in res['lfns']:
477                      parString += file + ','
478                  fullString = parString[:-1]
479 <                list_of_lists.append([fullString,str(-1),str(0)])    
479 >                list_of_lists.append([fullString,str(-1),str(0)])
480                  #need to check single file location
481 <                jobDestination.append(res['locations'])  
481 >                jobDestination.append(res['locations'])
482                  count +=1
483         # prepare dict output
484          dictOut = {}
# Line 412 | Line 491 | class JobSplitter:
491  
492      def getJobInfo( self,jobGroup ):
493          res = {}
494 <        lfns = []        
495 <        locations = []        
494 >        lfns = []
495 >        locations = []
496          tmp_check=0
497          for job in jobGroup.jobs:
498              for file in job.getFiles():
499 <                lfns.append(file['lfn'])
499 >                lfns.append(file['lfn'])
500                  for loc in file['locations']:
501                      if tmp_check < 1 :
502                          locations.append(loc)
503 <                tmp_check = tmp_check + 1
504 <                ### the check on the locations should go here
505 <        res['lfns'] = lfns
506 <        res['locations'] = locations
507 <        return res                
508 <      
503 >                tmp_check = tmp_check + 1
504 >                ### the check on the locations should go here
505 >        res['lfns'] = lfns
506 >        res['locations'] = locations
507 >        return res
508 >
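getJobInfo flattens a WMBS job group back into plain LFN and location lists. The WMBS plumbing that jobSplittingByRun sets up around it can be sketched on its own as below; the split_algo='RunBased' argument is an assumption, since the Subscription arguments are elided in this hunk, but the revision 1.13 import of WMCore.JobSplitting.RunBased points that way:

    # Sketch of the WMBS run-based splitting used above (split_algo is an
    # assumption; the other calls appear in the code in this diff).
    from WMCore.DataStructs.File import File
    from WMCore.DataStructs.Fileset import Fileset
    from WMCore.DataStructs.Run import Run
    from WMCore.DataStructs.Subscription import Subscription
    from WMCore.DataStructs.Workflow import Workflow
    from WMCore.JobSplitting.SplitterFactory import SplitterFactory

    def group_files_by_run(lfns_with_runs):
        # lfns_with_runs: list of (lfn, run_number) pairs
        thefiles = Fileset(name='FilesToSplit')
        for lfn, run in lfns_with_runs:
            wmbsFile = File(lfn)
            wmbsFile.addRun(Run(runNumber=run))
            thefiles.addFile(wmbsFile)
        subs = Subscription(fileset=thefiles, workflow=Workflow(),
                            split_algo='RunBased', type='Processing')
        jobfactory = SplitterFactory()(subs)
        return [jobGroup for jobGroup in jobfactory()]  # consumed as in the loop above
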
509   ########################################################################
510 <    def jobSplittingNoInput(self):
510 >    def prepareSplittingNoInput(self):
511          """ Compute the number of jobs and events per job for splitting without input files.
433        Perform job splitting based on number of event per job
512          """
435        common.logger.debug('Splitting per events')
436        self.checkUserSettings()
437        jobDestination=[]
438        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
439            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
440            raise CrabException(msg)
441
442        managedGenerators =self.args['managedGenerators']
443        generator = self.args['generator']
444        firstRun = self.cfg_params.get('CMSSW.first_run',None)
445
513          if (self.selectEventsPerJob):
514              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
515          if (self.selectNumberOfJobs):
# Line 465 | Line 532 | class JobSplitter:
532              self.total_number_of_jobs = self.theNumberOfJobs
533              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
534  
535 +
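prepareSplittingNoInput resolves the same two-of-three rule as the event splitter; with the visible branch above, the arithmetic is simply (values illustrative):

    # Worked example of the visible branch of prepareSplittingNoInput:
    total_number_of_events = 1000   # illustrative values
    theNumberOfJobs = 4
    eventsPerJob = int(total_number_of_events / theNumberOfJobs)  # -> 250
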
536 +    def jobSplittingNoInput(self):
537 +        """
538 +        Perform job splitting based on number of event per job
539 +        """
540 +        common.logger.debug('Splitting per events')
541 +        self.checkUserSettings()
542 +        jobDestination=[]
543 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
544 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
545 +            raise CrabException(msg)
546 +
547 +        managedGenerators =self.args['managedGenerators']
548 +        generator = self.args['generator']
549 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
550 +
551 +        self.prepareSplittingNoInput()
552 +
553          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
554  
555          # is there any remainder?
# Line 480 | Line 565 | class JobSplitter:
565          self.list_of_args = []
566          for i in range(self.total_number_of_jobs):
567              ## Since there is no input, any site is good
568 <            jobDestination.append([""]) #must be empty to write correctly the xml
568 >            jobDestination.append([""]) # must be empty to correctly write the XML
569              args=[]
570 <            if (firstRun):
571 <                ## pythia first run
487 <                args.append(str(firstRun)+str(i))
570 >            if (firstLumi): # Pythia first lumi
571 >                args.append(str(int(firstLumi)+i))
572              if (generator in managedGenerators):
573 <                if (generator == 'comphep' and i == 0):
573 >                args.append(generator)
574 >                if (generator == 'comphep' and i == 0):
575                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
576                      args.append('1')
577 <                else:
577 >                else:
578                      args.append(str(i*self.eventsPerJob))
579              args.append(str(self.eventsPerJob))
580              self.list_of_args.append(args)
# Line 497 | Line 582 | class JobSplitter:
582  
583          dictOut = {}
584          dictOut['params'] = ['MaxEvents']
585 <        if (firstRun):
586 <            dictOut['params'] = ['FirstRun','MaxEvents']
587 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
588 <        else:  
589 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
585 >        if (firstLumi):
586 >            dictOut['params'] = ['FirstLumi','MaxEvents']
587 >            if (generator in managedGenerators):
588 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
589 >        else:
590 >            if (generator in managedGenerators) :
591 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
592          dictOut['args'] = self.list_of_args
593          dictOut['jobDestination'] = jobDestination
594          dictOut['njobs']=self.total_number_of_jobs
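Putting the loop and the params selection together: a generator-less task with first_lumi=1, three jobs and 500 events per job would produce argument lists shaped like this (values illustrative), matching dictOut['params'] = ['FirstLumi', 'MaxEvents']:

    # Reconstructing list_of_args for 3 jobs, first_lumi=1, eventsPerJob=500:
    first_lumi, events_per_job, n_jobs = 1, 500, 3
    args = [[str(first_lumi + i), str(events_per_job)] for i in range(n_jobs)]
    # args == [['1', '500'], ['2', '500'], ['3', '500']]
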
# Line 521 | Line 608 | class JobSplitter:
608          common.logger.debug('Splitting per job')
609          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
610  
611 <        self.total_number_of_jobs = self.theNumberOfJobs
611 > #        self.total_number_of_jobs = self.theNumberOfJobs
612 >
613 >        self.prepareSplittingNoInput()
614  
615          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
616  
617          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
618  
619          # argument is seed number.$i
620 <        #self.list_of_args = []
620 >        self.list_of_args = []
621          for i in range(self.total_number_of_jobs):
622 +            args=[]
623              jobDestination.append([""])
624 <        #   self.list_of_args.append([str(i)])
624 >            if self.eventsPerJob != 0 :
625 >                args.append(str(self.eventsPerJob))
626 >                self.list_of_args.append(args)
627  
628         # prepare dict output
629          dictOut = {}
630 <        dictOut['args'] = [] # self.list_of_args
630 >        dictOut['params'] = ['MaxEvents']
631 >        dictOut['args'] =  self.list_of_args
632          dictOut['jobDestination'] = jobDestination
633          dictOut['njobs']=self.total_number_of_jobs
634          return dictOut
542
635  
636 <    def jobSplittingByLumi(self):
636 >
637 >    def jobSplittingByLumi(self):
638          """
639 +        Split task into jobs by Lumi section paying attention to which
640 +        lumis should be run (according to the analysis dataset).
641 +        This uses WMBS job splitting which does not split files over jobs
642 +        so the job will have AT LEAST as many lumis as requested, perhaps
643 +        more.
644          """
645 <        return
645 >
646 >        common.logger.debug('Splitting by Lumi')
647 >        self.checkLumiSettings()
648 >
649 >        blockSites = self.args['blockSites']
650 >        pubdata = self.args['pubdata']
651 >
652 >        lumisPerFile  = pubdata.getLumis()
653 >
654 >        # Make the list of WMBS files for job splitter
655 >        fileList = pubdata.getListFiles()
656 >        thefiles = Fileset(name='FilesToSplit')
657 >        for jobFile in fileList:
658 >            block = jobFile['Block']['Name']
659 >            try:
660 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
661 >            except:
662 >                continue
663 >            wmbsFile = File(jobFile['LogicalFileName'])
664 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
665 >            wmbsFile['block'] = block
666 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
667 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
668 >            thefiles.addFile(wmbsFile)
669 >
670 >        # Create the factory and workflow
671 >        work = Workflow()
672 >        subs = Subscription(fileset    = thefiles,    workflow = work,
673 >                            split_algo = 'LumiBased', type     = "Processing")
674 >        splitter = SplitterFactory()
675 >        jobFactory = splitter(subs)
676 >
677 >        list_of_lists = []
678 >        jobDestination = []
679 >        jobCount = 0
680 >        lumisCreated = 0
681 >
682 >        if not self.limitJobLumis:
683 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
684 >            common.logger.info('Each job will process about %s lumis.' %
685 >                                self.lumisPerJob)
686 >
687 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
688 >            for job in jobGroup.jobs:
689 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
690 >                    common.logger.info('Limit on number of jobs reached.')
691 >                    break
692 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
693 >                    common.logger.info('Limit on number of lumis reached.')
694 >                    break
695 >                lumis = []
696 >                lfns  = []
697 >                locations = []
698 >                firstFile = True
699 >                # Collect information from all the files
700 >                for jobFile in job.getFiles():
701 >                    if firstFile:  # Get locations from first file in the job
702 >                        for loc in jobFile['locations']:
703 >                            locations.append(loc)
704 >                        firstFile = False
705 >                    # Accumulate Lumis from all files
706 >                    for lumiList in jobFile['runs']:
707 >                        theRun = lumiList.run
708 >                        for theLumi in list(lumiList):
709 >                            lumis.append( (theRun, theLumi) )
710 >
711 >                    lfns.append(jobFile['lfn'])
712 >                fileString = ','.join(lfns)
713 >                lumiString = compressLumiString(lumis)
714 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
715 >
716 >                jobDestination.append(locations)
717 >                jobCount += 1
718 >                lumisCreated += len(lumis)
719 >                common.logger.debug('Job %s will run on %s files and %s lumis '
720 >                    % (jobCount, len(lfns), len(lumis) ))
721 >
722 >        common.logger.info('%s jobs created to run on %s lumis' %
723 >                              (jobCount, lumisCreated))
724 >
725 >        # Prepare dict output matching back to non-WMBS job creation
726 >        dictOut = {}
727 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
728 >        dictOut['args'] = list_of_lists
729 >        dictOut['jobDestination'] = jobDestination
730 >        dictOut['njobs'] = jobCount
731 >
732 >        return dictOut
733 >
734 >
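Each entry of list_of_lists built above pairs the job's input files with a compressed lumi mask; MaxEvents is -1 (run every event in the selected lumis) and SkipEvents is 0. One entry would look like this (LFNs illustrative):

    # One list_of_lists entry for a job over two files covering
    # lumis 1-25 and 30 of run 1 (LFNs illustrative):
    entry = ['/store/data/a.root,/store/data/b.root',  # InputFiles, comma-joined
             '-1',                                      # MaxEvents: all events
             '0',                                       # SkipEvents
             '1:1-1:25,1:30']                           # Lumis (compressLumiString)
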
735      def Algos(self):
736          """
737          Map each splittingType key to its splitting method
738          """
739 <        SplitAlogs = {
740 <                     'EventBased'           : self.jobSplittingByEvent,
739 >        SplitAlogs = {
740 >                     'EventBased'           : self.jobSplittingByEvent,
741                       'RunBased'             : self.jobSplittingByRun,
742 <                     'LumiBased'            : self.jobSplittingByLumi,
743 <                     'NoInput'              : self.jobSplittingNoInput,
742 >                     'LumiBased'            : self.jobSplittingByLumi,
743 >                     'NoInput'              : self.jobSplittingNoInput,
744                       'ForScript'            : self.jobSplittingForScript
745 <                     }  
745 >                     }
746          return SplitAlogs
747  
748 +
749 +
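Algos returns a dispatch table of bound methods, so a caller can select the splitting algorithm by its configuration key. Hypothetical driver code (not part of this file) would use it like this:

    # Hypothetical use of the dispatch table (the driver is not shown here):
    splitter = JobSplitter(cfg_params, args)
    dictOut = splitter.Algos()['EventBased']()  # same as splitter.jobSplittingByEvent()
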
750 + def compressLumiString(lumis):
751 +    """
752 +    Turn a list of 2-tuples of run/lumi numbers into a string of the format
753 +    R1:L1,R2:L2-R3:L3 which is acceptable to the CMSSW LumiBlockRange variable
754 +    """
755 +
756 +    lumis.sort()
757 +    parts = []
758 +    startRange = None
759 +    endRange = None
760 +
761 +    for lumiBlock in lumis:
762 +        if not startRange: # This is the first one
763 +            startRange = lumiBlock
764 +            endRange = lumiBlock
765 +        elif lumiBlock == endRange: # Same Lumi (different files?)
766 +            pass
767 +        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
768 +            endRange = lumiBlock
769 +        else: # This is the start of a new range
770 +            part = ':'.join(map(str, startRange))
771 +            if startRange != endRange:
772 +                part += '-' + ':'.join(map(str, endRange))
773 +            parts.append(part)
774 +            startRange = lumiBlock
775 +            endRange = lumiBlock
776 +
777 +    # Put out what's left
778 +    if startRange:
779 +        part = ':'.join(map(str, startRange))
780 +        if startRange != endRange:
781 +            part += '-' + ':'.join(map(str, endRange))
782 +        parts.append(part)
783 +
784 +    output = ','.join(parts)
785 +    return output
786 +
787 +
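A worked example of the compression (run/lumi values illustrative): after sorting, the consecutive lumis (1,1),(1,2),(1,3) merge into the range 1:1-1:3, while the non-consecutive lumis of run 2 stay separate:

    lumis = [(1, 3), (1, 1), (1, 2), (2, 10), (2, 12)]
    result = compressLumiString(lumis)
    # result == '1:1-1:3,2:10,2:12'
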

Diff Legend

  (old line number only) Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)