ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.26 by ewv, Wed Jul 29 21:42:16 2009 UTC vs.
Revision 1.35 by ewv, Mon Mar 22 15:29:42 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from sets import Set
6   from crab_exceptions import *
7   from crab_util import *
8  
# Line 10 | Line 13 | from WMCore.DataStructs.Subscription imp
13   from WMCore.DataStructs.Workflow import Workflow
14   from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32          self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
# Line 51 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96      def ComputeSubBlockSites( self, blockSites ):
97          """
98          """
# Line 369 | Line 411 | class JobSplitter:
411              common.logger.info(msg)
412  
413          if bloskNoSite == allBlock:
414 <            raise CrabException('No jobs created')
414 >            msg += 'Requested jobs cannot be Created! \n'
415 >            if self.cfg_params.has_key('GRID.se_white_list'):
416 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
417 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
418 >                msg += '\tPlease check if the dataset is available at this site!)'
419 >            if self.cfg_params.has_key('GRID.ce_white_list'):
420 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)\n'
423 >            raise CrabException(msg)
424  
425          return
426  
# Line 416 | Line 467 | class JobSplitter:
467          jobfactory = splitter(subs)
468  
469          #loop over all runs
419        set = Set(runList)
470          list_of_lists = []
471          jobDestination = []
472          count = 0
# Line 497 | Line 547 | class JobSplitter:
547  
548          managedGenerators =self.args['managedGenerators']
549          generator = self.args['generator']
550 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
550 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
551  
552          self.prepareSplittingNoInput()
553  
# Line 516 | Line 566 | class JobSplitter:
566          self.list_of_args = []
567          for i in range(self.total_number_of_jobs):
568              ## Since there is no input, any site is good
569 <            jobDestination.append([""]) #must be empty to write correctly the xml
569 >            jobDestination.append([""]) # must be empty to correctly write the XML
570              args=[]
571 <            if (firstRun):
572 <                ## pythia first run
523 <                args.append(str(firstRun)+str(i))
571 >            if (firstLumi): # Pythia first lumi
572 >                args.append(str(int(firstLumi)+i))
573              if (generator in managedGenerators):
574                 args.append(generator)
575                 if (generator == 'comphep' and i == 0):
# Line 534 | Line 583 | class JobSplitter:
583  
584          dictOut = {}
585          dictOut['params'] = ['MaxEvents']
586 <        if (firstRun):
587 <            dictOut['params'] = ['FirstRun','MaxEvents']
588 <            if ( generator in managedGenerators ) :
589 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
586 >        if (firstLumi):
587 >            dictOut['params'] = ['FirstLumi','MaxEvents']
588 >            if (generator in managedGenerators):
589 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
590          else:
591              if (generator in managedGenerators) :
592                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 596 | Line 645 | class JobSplitter:
645          """
646  
647          common.logger.debug('Splitting by Lumi')
648 <        self.checkUserSettings() # FIXME need one for lumis
648 >        self.checkLumiSettings()
649  
650          blockSites = self.args['blockSites']
651          pubdata = self.args['pubdata']
652  
604        if self.selectNumberOfJobs == 0 :
605            self.theNumberOfJobs = 9999999
653          lumisPerFile  = pubdata.getLumis()
654  
655          # Make the list of WMBS files for job splitter
# Line 621 | Line 668 | class JobSplitter:
668                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
669              thefiles.addFile(wmbsFile)
670  
671 +        # Create the factory and workflow
672          work = Workflow()
673          subs = Subscription(fileset    = thefiles,    workflow = work,
674                              split_algo = 'LumiBased', type     = "Processing")
# Line 630 | Line 678 | class JobSplitter:
678          list_of_lists = []
679          jobDestination = []
680          jobCount = 0
681 <        self.theNumberOfJobs = 20 #FIXME
682 <        for jobGroup in  jobFactory(lumis_per_job = 50): #FIXME
681 >        lumisCreated = 0
682 >
683 >        if not self.limitJobLumis:
684 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
685 >            common.logger.info('Each job will process about %s lumis.' %
686 >                                self.lumisPerJob)
687 >
688 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
689              for job in jobGroup.jobs:
690 <                if jobCount <  self.theNumberOfJobs:
691 <                    lumis = []
692 <                    lfns  = []
693 <                    locations = []
694 <                    firstFile = True
695 <                    # Collect information from all the files
696 <                    for jobFile in job.getFiles():
697 <                        if firstFile:  # Get locations from first file in the job
698 <                            for loc in jobFile['locations']:
699 <                                locations.append(loc)
700 <                            firstFile = False
701 <                        # Accumulate Lumis from all files
702 <                        for lumiList in jobFile['runs']:
703 <                            theRun = lumiList.run
704 <                            for theLumi in list(lumiList):
690 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
691 >                    common.logger.info('Limit on number of jobs reached.')
692 >                    break
693 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
694 >                    common.logger.info('Limit on number of lumis reached.')
695 >                    break
696 >                lumis = []
697 >                lfns  = []
698 >                locations = []
699 >                firstFile = True
700 >                # Collect information from all the files
701 >                for jobFile in job.getFiles():
702 >                    doFile = False
703 >                    if firstFile:  # Get locations from first file in the job
704 >                        for loc in jobFile['locations']:
705 >                            locations.append(loc)
706 >                        firstFile = False
707 >                    # Accumulate Lumis from all files
708 >                    for lumiList in jobFile['runs']:
709 >                        theRun = lumiList.run
710 >                        for theLumi in list(lumiList):
711 >                            lumisCreated += 1
712 >                            if (not self.limitTotalLumis) or \
713 >                               (lumisCreated <= self.totalNLumis):
714 >                                doFile = True
715                                  lumis.append( (theRun, theLumi) )
716 <
716 >                    if doFile:
717                          lfns.append(jobFile['lfn'])
718 <                    fileString = ','.join(lfns)
719 <                    lumiString = compressLumiString(lumis)
720 <                    common.logger.debug('Job %s will run on %s files and %s lumis '
721 <                        % (jobCount, len(lfns), len(lumis) ))
722 <                    list_of_lists.append([fileString, str(-1), str(0), lumiString])
718 >                fileString = ','.join(lfns)
719 >                lumiLister = LumiList(lumis = lumis)
720 >                lumiString = lumiLister.getCMSSWString()
721 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
722 >
723 >                jobDestination.append(locations)
724 >                jobCount += 1
725 >                common.logger.debug('Job %s will run on %s files and %s lumis '
726 >                    % (jobCount, len(lfns), len(lumis) ))
727 >
728 >        common.logger.info('%s jobs created to run on %s lumis' %
729 >                              (jobCount, lumisCreated))
730  
660                    jobDestination.append(locations)
661                    jobCount += 1
731          # Prepare dict output matching back to non-WMBS job creation
732          dictOut = {}
733          dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
# Line 682 | Line 751 | class JobSplitter:
751                       }
752          return SplitAlogs
753  
685
686
687 def compressLumiString(lumis):
688    """
689    Turn a list of 2-tuples of run/lumi numbers into a list of the format
690    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
691    """
692
693    lumis.sort()
694    parts = []
695    startRange = None
696    endRange = None
697
698    for lumiBlock in lumis:
699        if not startRange: # This is the first one
700            startRange = lumiBlock
701            endRange = lumiBlock
702        elif lumiBlock == endRange: # Same Lumi (different files?)
703            pass
704        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
705            endRange = lumiBlock
706        else: # This is the start of a new range
707            part = ':'.join(map(str, startRange))
708            if startRange != endRange:
709                part += '-' + ':'.join(map(str, endRange))
710            parts.append(part)
711            startRange = lumiBlock
712            endRange = lumiBlock
713
714    # Put out what's left
715    if startRange:
716        part = ':'.join(map(str, startRange))
717        if startRange != endRange:
718            part += '-' + ':'.join(map(str, endRange))
719        parts.append(part)
720
721    output = ','.join(parts)
722    return output
723
724

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines