root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.23 by spiga, Fri Jun 19 09:54:22 2009 UTC vs.
Revision 1.36 by ewv, Fri Apr 9 13:17:52 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
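These WMCore imports are the building blocks of the WMBS-based lumi splitting introduced in this revision. A minimal sketch of the same Fileset -> Subscription -> SplitterFactory pipeline that jobSplittingByLumi builds below (the LFN and the run/lumi numbers are hypothetical; assumes WMCore is on the PYTHONPATH):

    from WMCore.DataStructs.File import File
    from WMCore.DataStructs.Fileset import Fileset
    from WMCore.DataStructs.Run import Run
    from WMCore.DataStructs.Subscription import Subscription
    from WMCore.DataStructs.Workflow import Workflow
    from WMCore.JobSplitting.SplitterFactory import SplitterFactory

    fileset = Fileset(name='FilesToSplit')
    wmbsFile = File('/store/example/file.root')  # hypothetical LFN
    wmbsFile.addRun(Run(1, 1, 2, 3, 4))          # run 1, lumis 1-4
    fileset.addFile(wmbsFile)

    subs = Subscription(fileset=fileset, workflow=Workflow(),
                        split_algo='LumiBased', type='Processing')
    jobFactory = SplitterFactory()(subs)
    for jobGroup in jobFactory(lumis_per_job=2):
        print 'jobs in group:', len(jobGroup.jobs)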
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 43 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
 68 +        Check that the user has specified enough information to
 69 +        perform job splitting by lumi sections
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
 73 +            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
 78 +            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
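The two-of-three rule above can be illustrated with a hypothetical cfg_params content, as checkLumiSettings sees it after crab.cfg is parsed:

    # The user set lumis_per_job and number_of_jobs, so settings == 2
    # and the implied total is derived from their product.
    cfg_params = {'CMSSW.lumis_per_job':  '50',
                  'CMSSW.number_of_jobs': '10'}
    # -> limitJobLumis = limitNJobs = True
    # -> totalNLumis = 50 * 10 = 500, limitTotalLumis = True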
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
 97 +        """Restrict blockSites to the blocks hosted at white-listed SEs"""
 98 +
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
 101 +            sites = self.blackWhiteListParser.checkWhiteList(v)
 102 +            if sites: sub_blockSites[k] = v
103 +        if len(sub_blockSites) < 1:
 104 +            msg = 'WARNING: the sites %s are not hosting any part of the data.' % self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
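A toy illustration of what ComputeSubBlockSites keeps (block names and SE hostnames are hypothetical):

    # With se_white_list = ['se.site-a.ch'], only blocks with at least
    # one white-listed location survive the filter:
    blockSites = {'/Dataset/Foo/RECO#block1': ['se.site-a.ch', 'se.site-b.fr'],
                  '/Dataset/Foo/RECO#block2': ['se.site-b.fr']}
    # ComputeSubBlockSites(blockSites)
    #   -> {'/Dataset/Foo/RECO#block1': ['se.site-a.ch', 'se.site-b.fr']}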
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 77 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
 143 +            if self.total_number_of_events == -1:
 144 +                msg = 'You are selecting no_block_boundary=1, which does not allow total_number_of_events=-1.\n'
 145 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
 146 +                raise CrabException(msg)
 147 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
 148 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
 149 +                msg += "\tPlease set se_white_list with the site's storage element name."
 150 +                raise CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 319 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
 401 >            msg += '\n\t\twill not be submitted and this block of data cannot be analyzed!\n'
402              if self.cfg_params.has_key('GRID.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406              if self.cfg_params.has_key('GRID.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410  
411              common.logger.info(msg)
412  
413          if bloskNoSite == allBlock:
414 <            raise CrabException('No jobs created')
 414 >            msg += 'Requested jobs cannot be created!\n'
415 >            if self.cfg_params.has_key('GRID.se_white_list'):
416 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
417 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
418 >                msg += '\tPlease check if the dataset is available at this site!)'
419 >            if self.cfg_params.has_key('GRID.ce_white_list'):
420 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)\n'
423 >            raise CrabException(msg)
424  
425          return
426  
# Line 347 | Line 429 | class JobSplitter:
429      def jobSplittingByRun(self):
430          """
431          """
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
432  
433          self.checkUserSettings()
434          blockSites = self.args['blockSites']
# Line 393 | Line 467 | class JobSplitter:
467          jobfactory = splitter(subs)
468  
469          #loop over all runs
396        set = Set(runList)
470          list_of_lists = []
471          jobDestination = []
472          count = 0
# Line 474 | Line 547 | class JobSplitter:
547  
548          managedGenerators =self.args['managedGenerators']
549          generator = self.args['generator']
550 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
550 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
551  
552          self.prepareSplittingNoInput()
553  
# Line 493 | Line 566 | class JobSplitter:
566          self.list_of_args = []
567          for i in range(self.total_number_of_jobs):
568              ## Since there is no input, any site is good
569 <            jobDestination.append([""]) #must be empty to write correctly the xml
569 >            jobDestination.append([""]) # must be empty to correctly write the XML
570              args=[]
571 <            if (firstRun):
572 <                ## pythia first run
500 <                args.append(str(firstRun)+str(i))
571 >            if (firstLumi): # Pythia first lumi
572 >                args.append(str(int(firstLumi)+i))
573              if (generator in managedGenerators):
574                 args.append(generator)
575                 if (generator == 'comphep' and i == 0):
# Line 511 | Line 583 | class JobSplitter:
583  
584          dictOut = {}
585          dictOut['params'] = ['MaxEvents']
586 <        if (firstRun):
587 <            dictOut['params'] = ['FirstRun','MaxEvents']
588 <            if ( generator in managedGenerators ) :
589 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
586 >        if (firstLumi):
587 >            dictOut['params'] = ['FirstLumi','MaxEvents']
588 >            if (generator in managedGenerators):
589 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
590          else:
591              if (generator in managedGenerators) :
592                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 565 | Line 637 | class JobSplitter:
637  
638      def jobSplittingByLumi(self):
639          """
 640 +        Split the task into jobs by lumi section, paying attention to
 641 +        which lumis should be run (according to the analysis dataset).
 642 +        This uses WMBS job splitting, which does not split files over
 643 +        jobs, so a job will have AT LEAST as many lumis as requested,
 644 +        perhaps more.
645          """
646 <        return
646 >
647 >        common.logger.debug('Splitting by Lumi')
648 >        self.checkLumiSettings()
649 >
650 >        blockSites = self.args['blockSites']
651 >        pubdata = self.args['pubdata']
652 >
653 >        lumisPerFile  = pubdata.getLumis()
654 >
655 >        # Make the list of WMBS files for job splitter
656 >        fileList = pubdata.getListFiles()
657 >        thefiles = Fileset(name='FilesToSplit')
658 >        for jobFile in fileList:
659 >            block = jobFile['Block']['Name']
660 >            try:
661 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
 662 >            except:  # block has no known locations; skip this file
663 >                continue
664 >            wmbsFile = File(jobFile['LogicalFileName'])
665 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
666 >            wmbsFile['block'] = block
667 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
668 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
669 >            thefiles.addFile(wmbsFile)
670 >
671 >        # Create the factory and workflow
672 >        work = Workflow()
673 >        subs = Subscription(fileset    = thefiles,    workflow = work,
674 >                            split_algo = 'LumiBased', type     = "Processing")
675 >        splitter = SplitterFactory()
676 >        jobFactory = splitter(subs)
677 >
678 >        list_of_lists = []
679 >        jobDestination = []
680 >        jobCount = 0
681 >        lumisCreated = 0
682 >
683 >        if not self.limitJobLumis:
684 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
685 >            common.logger.info('Each job will process about %s lumis.' %
686 >                                self.lumisPerJob)
687 >
 688 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
689 >            for job in jobGroup.jobs:
690 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
691 >                    common.logger.info('Limit on number of jobs reached.')
692 >                    break
693 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
694 >                    common.logger.info('Limit on number of lumis reached.')
695 >                    break
696 >                lumis = []
697 >                lfns  = []
698 >                locations = []
699 >                firstFile = True
700 >                # Collect information from all the files
701 >                for jobFile in job.getFiles():
702 >                    doFile = False
703 >                    if firstFile:  # Get locations from first file in the job
704 >                        for loc in jobFile['locations']:
705 >                            locations.append(loc)
706 >                        firstFile = False
707 >                    # Accumulate Lumis from all files
708 >                    for lumiList in jobFile['runs']:
709 >                        theRun = lumiList.run
710 >                        for theLumi in list(lumiList):
711 >                            if (not self.limitTotalLumis) or \
712 >                               (lumisCreated <= self.totalNLumis):
713 >                                doFile = True
714 >                                lumisCreated += 1
715 >                                lumis.append( (theRun, theLumi) )
716 >                    if doFile:
717 >                        lfns.append(jobFile['lfn'])
718 >                fileString = ','.join(lfns)
719 >                lumiLister = LumiList(lumis = lumis)
720 >                lumiString = lumiLister.getCMSSWString()
721 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
722 >
723 >                jobDestination.append(locations)
724 >                jobCount += 1
725 >                common.logger.debug('Job %s will run on %s files and %s lumis '
726 >                    % (jobCount, len(lfns), len(lumis) ))
727 >
728 >        common.logger.info('%s jobs created to run on %s lumis' %
729 >                              (jobCount, lumisCreated))
730 >
731 >        # Prepare dict output matching back to non-WMBS job creation
732 >        dictOut = {}
733 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
734 >        dictOut['args'] = list_of_lists
735 >        dictOut['jobDestination'] = jobDestination
736 >        dictOut['njobs'] = jobCount
737 >
738 >        return dictOut
739 >
740 >
741      def Algos(self):
742          """
743          Define key splittingType matrix

Diff Legend

Removed lines (old line number only)
+ Added lines
< Changed lines (revision 1.23)
> Changed lines (revision 1.36)