root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.19 by slacapra, Wed Jun 10 11:31:33 2009 UTC vs.
Revision 1.36 by ewv, Fri Apr 9 13:17:52 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 43 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check that the user has specified enough information to perform
69 +        splitting by lumi section
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
97 +        """Return the subset of blockSites whose sites pass the SE white list;
98 +        raise a CrabException if no block survives."""
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
104 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
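For reference, checkLumiSettings() above reads only three CMSSW.* keys from cfg_params and insists that exactly two of them are set. A minimal sketch of a passing and a failing configuration, written as plain dictionaries with invented values:

    # Hypothetical cfg_params contents; the keys are the ones checkLumiSettings() looks up.
    ok_cfg  = {'CMSSW.lumis_per_job': '50', 'CMSSW.number_of_jobs': '10'}  # two settings: accepted
    bad_cfg = {'CMSSW.lumis_per_job': '50'}                                # one setting: CrabException
    # With ok_cfg both limitJobLumis and limitNJobs are set, so limitTotalLumis is
    # forced to True and totalNLumis becomes 50 * 10 = 500.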
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 57 | Line 119 | class JobSplitter:
119                self.list_of_args - File(s) job will run on (a list of lists)
120          """
121  
122 <        jobDestination=[]  
122 >        jobDestination=[]
123          self.checkUserSettings()
124          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
125              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
126              raise CrabException(msg)
127 <
128 <        blockSites = self.args['blockSites']
127 >
128 >        blockSites = self.args['blockSites']
129          pubdata = self.args['pubdata']
130          filesbyblock=pubdata.getFiles()
131  
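jobSplittingByEvent() applies the same "exactly two of three" rule via checkUserSettings(); assuming the key names quoted in the error message above live under the CMSSW section, as they do elsewhere in this file, a passing fragment could be sketched as (values invented):

    # Hypothetical cfg_params fragment for event-based splitting.
    cfg = {'CMSSW.total_number_of_events': '100000',
           'CMSSW.events_per_job': '1000'}   # the number of jobs then follows from these two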
# Line 77 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
144 +                msg = 'You are selecting no_block_boundary=1, which does not allow setting total_number_of_events=-1\n'
145 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
148 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 126 | Line 199 | class JobSplitter:
199          parString = ""
200          pString = ""
201          filesEventCount = 0
202 +        msg=''
203  
204          # ---- Iterate over the blocks in the dataset until ---- #
205          # ---- we've met the requested total # of events    ---- #
# Line 197 | Line 271 | class JobSplitter:
271                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
272                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
273                                  jobDestination.append(blockSites[block])
274 <                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
201 <                                from ProdCommon.SiteDB.CmsSiteMapper import CmsSEMap
202 <                                cms_se = CmsSEMap()
203 <                                SEDestination = [cms_se[dest] for dest in jobDestination[jobCount]]
204 <                                msg+="\t  CMSDestination: %s "%(str(SEDestination))
274 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
275                                  # fill jobs of block dictionary
276                                  jobsOfBlock[block].append(jobCount+1)
277                                  # reset counter
# Line 230 | Line 300 | class JobSplitter:
300                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
301                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
302                          jobDestination.append(blockSites[block])
303 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
303 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
304                          jobsOfBlock[block].append(jobCount+1)
305                          # reset counter
306                          jobCount = jobCount + 1
# Line 255 | Line 325 | class JobSplitter:
325                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
326                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
327                          jobDestination.append(blockSites[block])
328 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
328 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
329                          jobsOfBlock[block].append(jobCount+1)
330                          # increase counter
331                          jobCount = jobCount + 1
# Line 269 | Line 339 | class JobSplitter:
339                          pString_tmp=''
340                          if self.useParent==1:
341                              for f in parent : pString_tmp +=  f + ','
342 <                        pString =  pString_tmp
342 >                        pString =  pString_tmp
343                          parString =  file + ','
344                      pass # END if
345                  pass # END while (iterate over files in the block)
# Line 279 | Line 349 | class JobSplitter:
349          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
350              common.logger.info("Could not run on all requested events because some blocks are not hosted at allowed sites.")
351          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 <
352 >
353          # skip check on  block with no sites  DD
354          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
355  
# Line 295 | Line 365 | class JobSplitter:
365  
366          # keep trace of block with no sites to print a warning at the end
367  
368 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
368 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
369          # screen output
370          screenOutput = "List of jobs and available destination sites:\n\n"
371          noSiteBlock = []
# Line 309 | Line 379 | class JobSplitter:
379                  allBlock.append( blockCounter )
380                  sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
381                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
382 <                    ', '.join(destinationCMS(sites)))
382 >                    ', '.join(SE2CMS(sites)))
383                  if len(sites) == 0:
384                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
385                      bloskNoSite.append( blockCounter )
# Line 322 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402              if self.cfg_params.has_key('GRID.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406              if self.cfg_params.has_key('GRID.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410  
411              common.logger.info(msg)
412  
413          if bloskNoSite == allBlock:
414 <            raise CrabException('No jobs created')
414 >            msg += 'Requested jobs cannot be created!\n'
415 >            if self.cfg_params.has_key('GRID.se_white_list'):
416 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
417 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
418 >                msg += '\tPlease check if the dataset is available at this site!)'
419 >            if self.cfg_params.has_key('GRID.ce_white_list'):
420 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)\n'
423 >            raise CrabException(msg)
424  
425          return
426  
427  
428   ########################################################################
429 <    def jobSplittingByRun(self):
429 >    def jobSplittingByRun(self):
430          """
431          """
353        from sets import Set  
354        from WMCore.JobSplitting.RunBased import RunBased
355        from WMCore.DataStructs.Workflow import Workflow
356        from WMCore.DataStructs.File import File
357        from WMCore.DataStructs.Fileset import Fileset
358        from WMCore.DataStructs.Subscription import Subscription
359        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
360        from WMCore.DataStructs.Run import Run
432  
433          self.checkUserSettings()
434 <        blockSites = self.args['blockSites']
434 >        blockSites = self.args['blockSites']
435          pubdata = self.args['pubdata']
436  
437          if self.selectNumberOfJobs == 0 :
438              self.theNumberOfJobs = 9999999
439          blocks = {}
440 <        runList = []
440 >        runList = []
441          thefiles = Fileset(name='FilesToSplit')
442          fileList = pubdata.getListFiles()
443          for f in fileList:
444              block = f['Block']['Name']
445 <            try:
445 >            try:
446                  f['Block']['StorageElementList'].extend(blockSites[block])
447              except:
448                  continue
# Line 379 | Line 450 | class JobSplitter:
450              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
451              wmbsFile['block'] = block
452              runNum = f['RunsList'][0]['RunNumber']
453 <            runList.append(runNum)
453 >            runList.append(runNum)
454              myRun = Run(runNumber=runNum)
455              wmbsFile.addRun( myRun )
456              thefiles.addFile(
457                  wmbsFile
458                  )
459 <
459 >
460          work = Workflow()
461          subs = Subscription(
462          fileset = thefiles,
# Line 394 | Line 465 | class JobSplitter:
465          type = "Processing")
466          splitter = SplitterFactory()
467          jobfactory = splitter(subs)
468 <        
469 <        #loop over all runs
399 <        set = Set(runList)
468 >
469 >        #loop over all runs
470          list_of_lists = []
471          jobDestination = []
472          count = 0
473          for jobGroup in  jobfactory():
474              if count <  self.theNumberOfJobs:
475                  res = self.getJobInfo(jobGroup)
476 <                parString = ''
476 >                parString = ''
477                  for file in res['lfns']:
478                      parString += file + ','
479                  fullString = parString[:-1]
480 <                list_of_lists.append([fullString,str(-1),str(0)])    
480 >                list_of_lists.append([fullString,str(-1),str(0)])
481                  #need to check single file location
482 <                jobDestination.append(res['locations'])  
482 >                jobDestination.append(res['locations'])
483                  count +=1
484         # prepare dict output
485          dictOut = {}
# Line 422 | Line 492 | class JobSplitter:
492  
493      def getJobInfo( self,jobGroup ):
494          res = {}
495 <        lfns = []        
496 <        locations = []        
495 >        lfns = []
496 >        locations = []
497          tmp_check=0
498          for job in jobGroup.jobs:
499              for file in job.getFiles():
500 <                lfns.append(file['lfn'])
500 >                lfns.append(file['lfn'])
501                  for loc in file['locations']:
502                      if tmp_check < 1 :
503                          locations.append(loc)
504 <                tmp_check = tmp_check + 1
505 <                ### the check on the locations should go here
506 <        res['lfns'] = lfns
507 <        res['locations'] = locations
508 <        return res                
509 <      
504 >                tmp_check = tmp_check + 1
505 >                ### the check on the locations should go here
506 >        res['lfns'] = lfns
507 >        res['locations'] = locations
508 >        return res
509 >
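The dictionary returned by getJobInfo() has this shape (paths invented; the locations list is filled only from the first file because of the tmp_check guard):

    res = {'lfns':      ['/store/data/ExampleRun/file1.root',
                         '/store/data/ExampleRun/file2.root'],
           'locations': ['se01.example.site']}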
510   ########################################################################
511 <    def jobSplittingNoInput(self):
511 >    def prepareSplittingNoInput(self):
512          """
443        Perform job splitting based on number of event per job
513          """
445        common.logger.debug('Splitting per events')
446        self.checkUserSettings()
447        jobDestination=[]
448        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
449            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
450            raise CrabException(msg)
451
452        managedGenerators =self.args['managedGenerators']
453        generator = self.args['generator']
454        firstRun = self.cfg_params.get('CMSSW.first_run',None)
455
514          if (self.selectEventsPerJob):
515              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
516          if (self.selectNumberOfJobs):
# Line 475 | Line 533 | class JobSplitter:
533              self.total_number_of_jobs = self.theNumberOfJobs
534              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
535  
536 +
537 +    def jobSplittingNoInput(self):
538 +        """
539 +        Perform job splitting based on number of event per job
540 +        """
541 +        common.logger.debug('Splitting per events')
542 +        self.checkUserSettings()
543 +        jobDestination=[]
544 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
545 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
546 +            raise CrabException(msg)
547 +
548 +        managedGenerators =self.args['managedGenerators']
549 +        generator = self.args['generator']
550 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
551 +
552 +        self.prepareSplittingNoInput()
553 +
554          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
555  
556          # is there any remainder?
# Line 490 | Line 566 | class JobSplitter:
566          self.list_of_args = []
567          for i in range(self.total_number_of_jobs):
568              ## Since there is no input, any site is good
569 <            jobDestination.append([""]) #must be empty to write correctly the xml
569 >            jobDestination.append([""]) # must be empty to correctly write the XML
570              args=[]
571 <            if (firstRun):
572 <                ## pythia first run
497 <                args.append(str(firstRun)+str(i))
571 >            if (firstLumi): # Pythia first lumi
572 >                args.append(str(int(firstLumi)+i))
573              if (generator in managedGenerators):
574 <                if (generator == 'comphep' and i == 0):
574 >               args.append(generator)
575 >               if (generator == 'comphep' and i == 0):
576                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
577                      args.append('1')
578 <                else:
578 >               else:
579                      args.append(str(i*self.eventsPerJob))
580              args.append(str(self.eventsPerJob))
581              self.list_of_args.append(args)
# Line 507 | Line 583 | class JobSplitter:
583  
584          dictOut = {}
585          dictOut['params'] = ['MaxEvents']
586 <        if (firstRun):
587 <            dictOut['params'] = ['FirstRun','MaxEvents']
588 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
589 <        else:  
590 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
586 >        if (firstLumi):
587 >            dictOut['params'] = ['FirstLumi','MaxEvents']
588 >            if (generator in managedGenerators):
589 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
590 >        else:
591 >            if (generator in managedGenerators) :
592 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
593          dictOut['args'] = self.list_of_args
594          dictOut['jobDestination'] = jobDestination
595          dictOut['njobs']=self.total_number_of_jobs
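To make the argument lists above concrete: with first_lumi left at its default of 1, three jobs, 100 events per job, and a generator that is not in managedGenerators, the loop produces

    # params       = ['FirstLumi', 'MaxEvents']
    # list_of_args = [['1', '100'], ['2', '100'], ['3', '100']]
    # i.e. job i starts at lumi 1+i and processes 100 events.

A managed generator would additionally insert the generator name and a FirstEvent value into each entry, matching the extended 'params' lists above.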
# Line 531 | Line 609 | class JobSplitter:
609          common.logger.debug('Splitting per job')
610          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
611  
612 <        self.total_number_of_jobs = self.theNumberOfJobs
612 > #        self.total_number_of_jobs = self.theNumberOfJobs
613 >
614 >        self.prepareSplittingNoInput()
615  
616          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
617  
618          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
619  
620          # argument is seed number.$i
621 <        #self.list_of_args = []
621 >        self.list_of_args = []
622          for i in range(self.total_number_of_jobs):
623 +            args=[]
624              jobDestination.append([""])
625 <        #   self.list_of_args.append([str(i)])
625 >            if self.eventsPerJob != 0 :
626 >                args.append(str(self.eventsPerJob))
627 >                self.list_of_args.append(args)
628  
629         # prepare dict output
630          dictOut = {}
631 <        dictOut['args'] = [] # self.list_of_args
631 >        dictOut['params'] = ['MaxEvents']
632 >        dictOut['args'] =  self.list_of_args
633          dictOut['jobDestination'] = jobDestination
634          dictOut['njobs']=self.total_number_of_jobs
635          return dictOut
552
636  
637 <    def jobSplittingByLumi(self):
637 >
638 >    def jobSplittingByLumi(self):
639          """
640 +        Split task into jobs by Lumi section paying attention to which
641 +        lumis should be run (according to the analysis dataset).
642 +        This uses WMBS job splitting, which does not split files over jobs,
643 +        so each job will have AT LEAST as many lumis as requested, perhaps
644 +        more.
645          """
646 <        return
646 >
647 >        common.logger.debug('Splitting by Lumi')
648 >        self.checkLumiSettings()
649 >
650 >        blockSites = self.args['blockSites']
651 >        pubdata = self.args['pubdata']
652 >
653 >        lumisPerFile  = pubdata.getLumis()
654 >
655 >        # Make the list of WMBS files for job splitter
656 >        fileList = pubdata.getListFiles()
657 >        thefiles = Fileset(name='FilesToSplit')
658 >        for jobFile in fileList:
659 >            block = jobFile['Block']['Name']
660 >            try:
661 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
662 >            except:
663 >                continue
664 >            wmbsFile = File(jobFile['LogicalFileName'])
665 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
666 >            wmbsFile['block'] = block
667 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
668 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
669 >            thefiles.addFile(wmbsFile)
670 >
671 >        # Create the factory and workflow
672 >        work = Workflow()
673 >        subs = Subscription(fileset    = thefiles,    workflow = work,
674 >                            split_algo = 'LumiBased', type     = "Processing")
675 >        splitter = SplitterFactory()
676 >        jobFactory = splitter(subs)
677 >
678 >        list_of_lists = []
679 >        jobDestination = []
680 >        jobCount = 0
681 >        lumisCreated = 0
682 >
683 >        if not self.limitJobLumis:
684 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
685 >            common.logger.info('Each job will process about %s lumis.' %
686 >                                self.lumisPerJob)
687 >
688 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
689 >            for job in jobGroup.jobs:
690 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
691 >                    common.logger.info('Limit on number of jobs reached.')
692 >                    break
693 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
694 >                    common.logger.info('Limit on number of lumis reached.')
695 >                    break
696 >                lumis = []
697 >                lfns  = []
698 >                locations = []
699 >                firstFile = True
700 >                # Collect information from all the files
701 >                for jobFile in job.getFiles():
702 >                    doFile = False
703 >                    if firstFile:  # Get locations from first file in the job
704 >                        for loc in jobFile['locations']:
705 >                            locations.append(loc)
706 >                        firstFile = False
707 >                    # Accumulate Lumis from all files
708 >                    for lumiList in jobFile['runs']:
709 >                        theRun = lumiList.run
710 >                        for theLumi in list(lumiList):
711 >                            if (not self.limitTotalLumis) or \
712 >                               (lumisCreated <= self.totalNLumis):
713 >                                doFile = True
714 >                                lumisCreated += 1
715 >                                lumis.append( (theRun, theLumi) )
716 >                    if doFile:
717 >                        lfns.append(jobFile['lfn'])
718 >                fileString = ','.join(lfns)
719 >                lumiLister = LumiList(lumis = lumis)
720 >                lumiString = lumiLister.getCMSSWString()
721 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
722 >
723 >                jobDestination.append(locations)
724 >                jobCount += 1
725 >                common.logger.debug('Job %s will run on %s files and %s lumis '
726 >                    % (jobCount, len(lfns), len(lumis) ))
727 >
728 >        common.logger.info('%s jobs created to run on %s lumis' %
729 >                              (jobCount, lumisCreated))
730 >
731 >        # Prepare dict output matching back to non-WMBS job creation
732 >        dictOut = {}
733 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
734 >        dictOut['args'] = list_of_lists
735 >        dictOut['jobDestination'] = jobDestination
736 >        dictOut['njobs'] = jobCount
737 >
738 >        return dictOut
739 >
740 >
741      def Algos(self):
742          """
743          Define key splittingType matrix
744          """
745 <        SplitAlogs = {
746 <                     'EventBased'           : self.jobSplittingByEvent,
745 >        SplitAlogs = {
746 >                     'EventBased'           : self.jobSplittingByEvent,
747                       'RunBased'             : self.jobSplittingByRun,
748 <                     'LumiBased'            : self.jobSplittingByLumi,
749 <                     'NoInput'              : self.jobSplittingNoInput,
748 >                     'LumiBased'            : self.jobSplittingByLumi,
749 >                     'NoInput'              : self.jobSplittingNoInput,
750                       'ForScript'            : self.jobSplittingForScript
751 <                     }  
751 >                     }
752          return SplitAlogs
753  
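The map above is how the rest of CRAB dispatches to a splitting method; a hedged sketch of such a call (variable names are assumptions, the real caller is not part of this diff):

    # splitting_type would hold one of the keys above, e.g. 'LumiBased'.
    splitter = JobSplitter(cfg_params, args)
    dictOut  = splitter.Algos()[splitting_type]()   # runs e.g. jobSplittingByLumi()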

Diff Legend

  Removed lines (old line number only, no marker)
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)