ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.26 by ewv, Wed Jul 29 21:42:16 2009 UTC vs.
Revision 1.29 by ewv, Thu Oct 1 22:00:40 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from sets import Set
7   from crab_exceptions import *
# Line 15 | Line 19 | class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32          self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
# Line 51 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96      def ComputeSubBlockSites( self, blockSites ):
97          """
98          """
# Line 497 | Line 539 | class JobSplitter:
539  
540          managedGenerators =self.args['managedGenerators']
541          generator = self.args['generator']
542 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
542 >        firstRun = self.cfg_params.get('CMSSW.first_run', 1)
543  
544          self.prepareSplittingNoInput()
545  
# Line 516 | Line 558 | class JobSplitter:
558          self.list_of_args = []
559          for i in range(self.total_number_of_jobs):
560              ## Since there is no input, any site is good
561 <            jobDestination.append([""]) #must be empty to write correctly the xml
561 >            jobDestination.append([""]) # must be empty to correctly write the XML
562              args=[]
563 <            if (firstRun):
564 <                ## pythia first run
523 <                args.append(str(firstRun)+str(i))
563 >            if (firstRun): # Pythia first run
564 >                args.append(str(int(firstRun)+i))
565              if (generator in managedGenerators):
566                 args.append(generator)
567                 if (generator == 'comphep' and i == 0):
# Line 596 | Line 637 | class JobSplitter:
637          """
638  
639          common.logger.debug('Splitting by Lumi')
640 <        self.checkUserSettings() # FIXME need one for lumis
640 >        self.checkLumiSettings()
641  
642          blockSites = self.args['blockSites']
643          pubdata = self.args['pubdata']
644  
604        if self.selectNumberOfJobs == 0 :
605            self.theNumberOfJobs = 9999999
645          lumisPerFile  = pubdata.getLumis()
646  
647          # Make the list of WMBS files for job splitter
# Line 621 | Line 660 | class JobSplitter:
660                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
661              thefiles.addFile(wmbsFile)
662  
663 +        # Create the factory and workflow
664          work = Workflow()
665          subs = Subscription(fileset    = thefiles,    workflow = work,
666                              split_algo = 'LumiBased', type     = "Processing")
# Line 630 | Line 670 | class JobSplitter:
670          list_of_lists = []
671          jobDestination = []
672          jobCount = 0
673 <        self.theNumberOfJobs = 20 #FIXME
674 <        for jobGroup in  jobFactory(lumis_per_job = 50): #FIXME
673 >        lumisCreated = 0
674 >
675 >        if not self.limitJobLumis:
676 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
677 >            common.logger.info('Each job will process about %s lumis.' %
678 >                                self.lumisPerJob)
679 >
680 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
681              for job in jobGroup.jobs:
682 <                if jobCount <  self.theNumberOfJobs:
683 <                    lumis = []
684 <                    lfns  = []
685 <                    locations = []
686 <                    firstFile = True
687 <                    # Collect information from all the files
688 <                    for jobFile in job.getFiles():
689 <                        if firstFile:  # Get locations from first file in the job
690 <                            for loc in jobFile['locations']:
691 <                                locations.append(loc)
692 <                            firstFile = False
693 <                        # Accumulate Lumis from all files
694 <                        for lumiList in jobFile['runs']:
695 <                            theRun = lumiList.run
696 <                            for theLumi in list(lumiList):
697 <                                lumis.append( (theRun, theLumi) )
698 <
699 <                        lfns.append(jobFile['lfn'])
700 <                    fileString = ','.join(lfns)
701 <                    lumiString = compressLumiString(lumis)
702 <                    common.logger.debug('Job %s will run on %s files and %s lumis '
703 <                        % (jobCount, len(lfns), len(lumis) ))
704 <                    list_of_lists.append([fileString, str(-1), str(0), lumiString])
682 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
683 >                    common.logger.info('Limit on number of jobs reached.')
684 >                    break
685 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
686 >                    common.logger.info('Limit on number of lumis reached.')
687 >                    break
688 >                lumis = []
689 >                lfns  = []
690 >                locations = []
691 >                firstFile = True
692 >                # Collect information from all the files
693 >                for jobFile in job.getFiles():
694 >                    if firstFile:  # Get locations from first file in the job
695 >                        for loc in jobFile['locations']:
696 >                            locations.append(loc)
697 >                        firstFile = False
698 >                    # Accumulate Lumis from all files
699 >                    for lumiList in jobFile['runs']:
700 >                        theRun = lumiList.run
701 >                        for theLumi in list(lumiList):
702 >                            lumis.append( (theRun, theLumi) )
703 >
704 >                    lfns.append(jobFile['lfn'])
705 >                fileString = ','.join(lfns)
706 >                lumiString = compressLumiString(lumis)
707 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
708 >
709 >                jobDestination.append(locations)
710 >                jobCount += 1
711 >                lumisCreated += len(lumis)
712 >                common.logger.debug('Job %s will run on %s files and %s lumis '
713 >                    % (jobCount, len(lfns), len(lumis) ))
714 >
715 >        common.logger.info('%s jobs created to run on %s lumis' %
716 >                              (jobCount, lumisCreated))
717  
660                    jobDestination.append(locations)
661                    jobCount += 1
718          # Prepare dict output matching back to non-WMBS job creation
719          dictOut = {}
720          dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines