ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.26 by ewv, Wed Jul 29 21:42:16 2009 UTC vs.
Revision 1.27 by ewv, Thu Jul 30 18:45:44 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from sets import Set
7   from crab_exceptions import *
# Line 15 | Line 19 | class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32          self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
# Line 51 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg = '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96      def ComputeSubBlockSites( self, blockSites ):
97          """
98          """
# Line 596 | Line 638 | class JobSplitter:
638          """
639  
640          common.logger.debug('Splitting by Lumi')
641 <        self.checkUserSettings() # FIXME need one for lumis
641 >        self.checkLumiSettings()
642  
643          blockSites = self.args['blockSites']
644          pubdata = self.args['pubdata']
645  
604        if self.selectNumberOfJobs == 0 :
605            self.theNumberOfJobs = 9999999
646          lumisPerFile  = pubdata.getLumis()
647  
648          # Make the list of WMBS files for job splitter
# Line 621 | Line 661 | class JobSplitter:
661                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
662              thefiles.addFile(wmbsFile)
663  
664 +        # Create the factory and workflow
665          work = Workflow()
666          subs = Subscription(fileset    = thefiles,    workflow = work,
667                              split_algo = 'LumiBased', type     = "Processing")
# Line 630 | Line 671 | class JobSplitter:
671          list_of_lists = []
672          jobDestination = []
673          jobCount = 0
674 <        self.theNumberOfJobs = 20 #FIXME
675 <        for jobGroup in  jobFactory(lumis_per_job = 50): #FIXME
674 >        lumisCreated = 0
675 >
676 >        if not self.limitJobLumis:
677 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
678 >            common.logger.info('Each job will process about %s lumis.' %
679 >                                self.lumisPerJob)
680 >
681 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
682              for job in jobGroup.jobs:
683 <                if jobCount <  self.theNumberOfJobs:
684 <                    lumis = []
685 <                    lfns  = []
686 <                    locations = []
687 <                    firstFile = True
688 <                    # Collect information from all the files
689 <                    for jobFile in job.getFiles():
690 <                        if firstFile:  # Get locations from first file in the job
691 <                            for loc in jobFile['locations']:
692 <                                locations.append(loc)
693 <                            firstFile = False
694 <                        # Accumulate Lumis from all files
695 <                        for lumiList in jobFile['runs']:
696 <                            theRun = lumiList.run
697 <                            for theLumi in list(lumiList):
698 <                                lumis.append( (theRun, theLumi) )
699 <
700 <                        lfns.append(jobFile['lfn'])
701 <                    fileString = ','.join(lfns)
702 <                    lumiString = compressLumiString(lumis)
703 <                    common.logger.debug('Job %s will run on %s files and %s lumis '
704 <                        % (jobCount, len(lfns), len(lumis) ))
705 <                    list_of_lists.append([fileString, str(-1), str(0), lumiString])
683 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
684 >                    common.logger.info('Limit on number of jobs reached.')
685 >                    break
686 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
687 >                    common.logger.info('Limit on number of lumis reached.')
688 >                    break
689 >                lumis = []
690 >                lfns  = []
691 >                locations = []
692 >                firstFile = True
693 >                # Collect information from all the files
694 >                for jobFile in job.getFiles():
695 >                    if firstFile:  # Get locations from first file in the job
696 >                        for loc in jobFile['locations']:
697 >                            locations.append(loc)
698 >                        firstFile = False
699 >                    # Accumulate Lumis from all files
700 >                    for lumiList in jobFile['runs']:
701 >                        theRun = lumiList.run
702 >                        for theLumi in list(lumiList):
703 >                            lumis.append( (theRun, theLumi) )
704 >
705 >                    lfns.append(jobFile['lfn'])
706 >                fileString = ','.join(lfns)
707 >                lumiString = compressLumiString(lumis)
708 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
709 >
710 >                jobDestination.append(locations)
711 >                jobCount += 1
712 >                lumisCreated += len(lumis)
713 >                common.logger.debug('Job %s will run on %s files and %s lumis '
714 >                    % (jobCount, len(lfns), len(lumis) ))
715 >
716 >        common.logger.info('%s jobs created to run on %s lumis' %
717 >                              (jobCount, lumisCreated))
718  
660                    jobDestination.append(locations)
661                    jobCount += 1
719          # Prepare dict output matching back to non-WMBS job creation
720          dictOut = {}
721          dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines