
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.23 by spiga, Fri Jun 19 09:54:22 2009 UTC vs.
Revision 1.29.2.1 by ewv, Mon Nov 9 22:23:37 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6 + from sets import Set
7   from crab_exceptions import *
8   from crab_util import *
9 +
10 + from WMCore.DataStructs.File import File
11 + from WMCore.DataStructs.Fileset import Fileset
12 + from WMCore.DataStructs.Run import Run
13 + from WMCore.DataStructs.Subscription import Subscription
14 + from WMCore.DataStructs.Workflow import Workflow
15 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 43 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
 73 +            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
 78 +            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
 97 +        """Keep only the blocks that have at least one site passing the SE white list."""
 98 +
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
 101 +            sites = self.blackWhiteListParser.checkWhiteList(v)
 102 +            if sites: sub_blockSites[k] = v
 103 +        if len(sub_blockSites) < 1:
 104 +            msg = 'WARNING: the sites %s are not hosting any part of the data.' % self.seWhiteList
 105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
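The two-of-three rule enforced by checkLumiSettings above can be exercised in
isolation. A minimal sketch, assuming a plain dict stands in for cfg_params
(the key names are the ones the method reads; the values are hypothetical):

    cfg = {'CMSSW.lumis_per_job': '50', 'CMSSW.number_of_jobs': '10'}
    keys = ['CMSSW.lumis_per_job', 'CMSSW.number_of_jobs',
            'CMSSW.total_number_of_lumis']
    settings = len([k for k in keys if k in cfg])
    assert settings == 2  # any other count raises CrabException
    # With lumis_per_job and number_of_jobs given, the lumi total is derived:
    print int(cfg['CMSSW.lumis_per_job']) * int(cfg['CMSSW.number_of_jobs'])  # 500
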
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 77 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
 143 +            if self.total_number_of_events == -1:
 144 +                msg = 'You are selecting no_block_boundary=1 which does not allow setting total_number_of_events=-1\n'
 145 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
146 +                raise CrabException(msg)
 147 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
 148 +                msg = 'You are selecting no_block_boundary=1 which requires choosing one and only one site.\n'
 149 +                msg += "\tPlease set se_white_list to the site's storage element name."
 150 +                raise CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
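For reference, a crab.cfg fragment satisfying the no_block_boundary checks
added above might look like the following; the section and key names mirror
the CMSSW.* and GRID.* parameters read here, and the site name is hypothetical:

    [CMSSW]
    no_block_boundary = 1
    # -1 is rejected when no_block_boundary=1
    total_number_of_events = 10000
    [GRID]
    # exactly one storage element is required
    se_white_list = srm.example.org
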
# Line 319 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402              if self.cfg_params.has_key('GRID.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406              if self.cfg_params.has_key('GRID.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410  
411              common.logger.info(msg)
412  
# Line 347 | Line 420 | class JobSplitter:
420      def jobSplittingByRun(self):
421          """
422          """
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
423  
424          self.checkUserSettings()
425          blockSites = self.args['blockSites']
# Line 474 | Line 539 | class JobSplitter:
539  
540          managedGenerators =self.args['managedGenerators']
541          generator = self.args['generator']
542 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
542 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
543  
544          self.prepareSplittingNoInput()
545  
# Line 493 | Line 558 | class JobSplitter:
558          self.list_of_args = []
559          for i in range(self.total_number_of_jobs):
560              ## Since there is no input, any site is good
561 <            jobDestination.append([""]) #must be empty to write correctly the xml
561 >            jobDestination.append([""]) # must be empty to correctly write the XML
562              args=[]
563 <            if (firstRun):
564 <                ## pythia first run
500 <                args.append(str(firstRun)+str(i))
563 >            if (firstLumi): # Pythia first lumi
564 >                args.append(str(int(firstLumi)+i))
565              if (generator in managedGenerators):
566                 args.append(generator)
567                 if (generator == 'comphep' and i == 0):
# Line 511 | Line 575 | class JobSplitter:
575  
576          dictOut = {}
577          dictOut['params'] = ['MaxEvents']
578 <        if (firstRun):
579 <            dictOut['params'] = ['FirstRun','MaxEvents']
580 <            if ( generator in managedGenerators ) :
581 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
578 >        if (firstLumi):
579 >            dictOut['params'] = ['FirstLumi','MaxEvents']
580 >            if (generator in managedGenerators):
581 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
582          else:
583              if (generator in managedGenerators) :
584                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 565 | Line 629 | class JobSplitter:
629  
630      def jobSplittingByLumi(self):
631          """
 632 +        Split the task into jobs by lumi section, paying attention to which
 633 +        lumis should be run (according to the analysis dataset).
 634 +        This uses WMBS job splitting, which does not split files over jobs,
 635 +        so each job will have AT LEAST as many lumis as requested, perhaps
 636 +        more.
637          """
638 <        return
638 >
639 >        common.logger.debug('Splitting by Lumi')
640 >        self.checkLumiSettings()
641 >
642 >        blockSites = self.args['blockSites']
643 >        pubdata = self.args['pubdata']
644 >
645 >        lumisPerFile  = pubdata.getLumis()
646 >
647 >        # Make the list of WMBS files for job splitter
648 >        fileList = pubdata.getListFiles()
649 >        thefiles = Fileset(name='FilesToSplit')
650 >        for jobFile in fileList:
651 >            block = jobFile['Block']['Name']
652 >            try:
653 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
 654 >            except: # block has no listed sites; skip its files
 655 >                continue
656 >            wmbsFile = File(jobFile['LogicalFileName'])
657 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
658 >            wmbsFile['block'] = block
659 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
660 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
661 >            thefiles.addFile(wmbsFile)
662 >
663 >        # Create the factory and workflow
664 >        work = Workflow()
665 >        subs = Subscription(fileset    = thefiles,    workflow = work,
666 >                            split_algo = 'LumiBased', type     = "Processing")
667 >        splitter = SplitterFactory()
668 >        jobFactory = splitter(subs)
669 >
670 >        list_of_lists = []
671 >        jobDestination = []
672 >        jobCount = 0
673 >        lumisCreated = 0
674 >
675 >        if not self.limitJobLumis:
676 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
677 >            common.logger.info('Each job will process about %s lumis.' %
678 >                                self.lumisPerJob)
679 >
680 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
681 >            for job in jobGroup.jobs:
682 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
683 >                    common.logger.info('Limit on number of jobs reached.')
684 >                    break
685 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
686 >                    common.logger.info('Limit on number of lumis reached.')
687 >                    break
688 >                lumis = []
689 >                lfns  = []
690 >                locations = []
691 >                firstFile = True
692 >                # Collect information from all the files
693 >                for jobFile in job.getFiles():
694 >                    if firstFile:  # Get locations from first file in the job
695 >                        for loc in jobFile['locations']:
696 >                            locations.append(loc)
697 >                        firstFile = False
698 >                    # Accumulate Lumis from all files
699 >                    for lumiList in jobFile['runs']:
700 >                        theRun = lumiList.run
701 >                        for theLumi in list(lumiList):
702 >                            lumis.append( (theRun, theLumi) )
703 >
704 >                    lfns.append(jobFile['lfn'])
705 >                fileString = ','.join(lfns)
706 >                lumiString = compressLumiString(lumis)
707 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
708 >
709 >                jobDestination.append(locations)
710 >                jobCount += 1
711 >                lumisCreated += len(lumis)
712 >                common.logger.debug('Job %s will run on %s files and %s lumis '
713 >                    % (jobCount, len(lfns), len(lumis) ))
714 >
715 >        common.logger.info('%s jobs created to run on %s lumis' %
716 >                              (jobCount, lumisCreated))
717 >
718 >        # Prepare dict output matching back to non-WMBS job creation
719 >        dictOut = {}
720 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
721 >        dictOut['args'] = list_of_lists
722 >        dictOut['jobDestination'] = jobDestination
723 >        dictOut['njobs'] = jobCount
724 >
725 >        return dictOut
726 >
727 >
728      def Algos(self):
729          """
730          Define key splittingType matrix
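Condensed, the WMBS flow added in jobSplittingByLumi is: wrap each input file
in a WMCore File carrying its lumis as Run objects, collect the files in a
Fileset, subscribe that set with the 'LumiBased' split algorithm, and iterate
over the job groups the factory yields. A minimal sketch using only the calls
that appear above (the LFN, storage element, and lumi numbers are hypothetical):

    from WMCore.DataStructs.File import File
    from WMCore.DataStructs.Fileset import Fileset
    from WMCore.DataStructs.Run import Run
    from WMCore.DataStructs.Subscription import Subscription
    from WMCore.DataStructs.Workflow import Workflow
    from WMCore.JobSplitting.SplitterFactory import SplitterFactory

    thefiles = Fileset(name='FilesToSplit')
    wmbsFile = File('/store/data/hypothetical.root')  # hypothetical LFN
    wmbsFile['locations'].add('se.example.org')       # hypothetical SE
    for lumi in [(1, 1), (1, 2), (1, 3)]:             # (run, lumi) pairs
        wmbsFile.addRun(Run(lumi[0], lumi[1]))
    thefiles.addFile(wmbsFile)

    subs = Subscription(fileset=thefiles, workflow=Workflow(),
                        split_algo='LumiBased', type='Processing')
    jobFactory = SplitterFactory()(subs)
    for jobGroup in jobFactory(lumis_per_job=2):
        for job in jobGroup.jobs:
            print [f['lfn'] for f in job.getFiles()]
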
# Line 580 | Line 738 | class JobSplitter:
738                       }
739          return SplitAlogs
740  
741 +
742 +
743 + def compressLumiString(lumis):
744 +    """
 745 +    Turn a list of 2-tuples of run/lumi numbers into a string of the format
 746 +    R1:L1,R2:L2-R3:L3, which is acceptable to the CMSSW LumiBlockRange variable
747 +    """
748 +
749 +    lumis.sort()
750 +    parts = []
751 +    startRange = None
752 +    endRange = None
753 +
754 +    for lumiBlock in lumis:
755 +        if not startRange: # This is the first one
756 +            startRange = lumiBlock
757 +            endRange = lumiBlock
758 +        elif lumiBlock == endRange: # Same Lumi (different files?)
759 +            pass
760 +        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
761 +            endRange = lumiBlock
762 +        else: # This is the start of a new range
763 +            part = ':'.join(map(str, startRange))
764 +            if startRange != endRange:
765 +                part += '-' + ':'.join(map(str, endRange))
766 +            parts.append(part)
767 +            startRange = lumiBlock
768 +            endRange = lumiBlock
769 +
770 +    # Put out what's left
771 +    if startRange:
772 +        part = ':'.join(map(str, startRange))
773 +        if startRange != endRange:
774 +            part += '-' + ':'.join(map(str, endRange))
775 +        parts.append(part)
776 +
777 +    output = ','.join(parts)
778 +    return output
779 +
780 +
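A quick check of compressLumiString on hypothetical input shows consecutive
lumis collapsing into a range while isolated ones stand alone:

    lumis = [(1, 1), (1, 2), (1, 3), (1, 5), (2, 9)]
    print compressLumiString(lumis)  # -> '1:1-1:3,1:5,2:9'
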

Diff Legend

  Removed lines (shown with the old line number only)
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)