
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.22 by ewv, Wed Jun 17 20:58:07 2009 UTC vs.
Revision 1.29 by ewv, Thu Oct 1 22:00:40 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6 + from sets import Set
7   from crab_exceptions import *
8   from crab_util import *
9 +
10 + from WMCore.DataStructs.File import File
11 + from WMCore.DataStructs.Fileset import Fileset
12 + from WMCore.DataStructs.Run import Run
13 + from WMCore.DataStructs.Subscription import Subscription
14 + from WMCore.DataStructs.Workflow import Workflow
15 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 43 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
 68 +        Check that the user has specified enough information to
 69 +        perform job splitting by lumi sections
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
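checkLumiSettings() above enforces a "two and only two" rule on the lumi-splitting parameters. A minimal sketch of a configuration that passes the check, assuming cfg_params behaves like the plain dict below (the key names match the ones read above; the values are invented for illustration):

    # Hypothetical cfg_params fragment -- values are illustrative only.
    cfg_params = {
        'CMSSW.lumis_per_job':  '50',    # sets limitJobLumis
        'CMSSW.number_of_jobs': '100',   # sets limitNJobs
        # 'CMSSW.total_number_of_lumis' left out, so settings == 2 and no
        # CrabException is raised; totalNLumis becomes 50 * 100 = 5000.
    }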
96 +    def ComputeSubBlockSites( self, blockSites ):
 97 +        """Keep only the blocks in blockSites that are hosted at a site
 98 +        passing the SE white list; raise a CrabException if none remain."""
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
 104 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 77 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
 144 +                msg = 'You are selecting no_block_boundary=1 which does not allow setting total_number_of_events=-1\n'
 145 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
 148 +                msg = 'You are selecting no_block_boundary=1 which requires choosing one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 319 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402              if self.cfg_params.has_key('GRID.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406              if self.cfg_params.has_key('GRID.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410  
411              common.logger.info(msg)
412  
# Line 347 | Line 420 | class JobSplitter:
420      def jobSplittingByRun(self):
421          """
422          """
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
423  
424          self.checkUserSettings()
425          blockSites = self.args['blockSites']
# Line 435 | Line 500 | class JobSplitter:
500          return res
501  
502   ########################################################################
503 <    def jobSplittingNoInput(self):
503 >    def prepareSplittingNoInput(self):
504          """
440        Perform job splitting based on number of event per job
505          """
442        common.logger.debug('Splitting per events')
443        self.checkUserSettings()
444        jobDestination=[]
445        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
446            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
447            raise CrabException(msg)
448
449        managedGenerators =self.args['managedGenerators']
450        generator = self.args['generator']
451        firstRun = self.cfg_params.get('CMSSW.first_run',None)
452
506          if (self.selectEventsPerJob):
507              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
508          if (self.selectNumberOfJobs):
# Line 472 | Line 525 | class JobSplitter:
525              self.total_number_of_jobs = self.theNumberOfJobs
526              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
527  
528 +
529 +    def jobSplittingNoInput(self):
530 +        """
 531 +        Perform job splitting based on the number of events per job
532 +        """
533 +        common.logger.debug('Splitting per events')
534 +        self.checkUserSettings()
535 +        jobDestination=[]
536 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
537 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
538 +            raise CrabException(msg)
539 +
540 +        managedGenerators =self.args['managedGenerators']
541 +        generator = self.args['generator']
542 +        firstRun = self.cfg_params.get('CMSSW.first_run', 1)
543 +
544 +        self.prepareSplittingNoInput()
545 +
546          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
547  
548          # is there any remainder?
# Line 487 | Line 558 | class JobSplitter:
558          self.list_of_args = []
559          for i in range(self.total_number_of_jobs):
560              ## Since there is no input, any site is good
561 <            jobDestination.append([""]) #must be empty to write correctly the xml
561 >            jobDestination.append([""]) # must be empty to correctly write the XML
562              args=[]
563 <            if (firstRun):
564 <                ## pythia first run
494 <                args.append(str(firstRun)+str(i))
563 >            if (firstRun): # Pythia first run
564 >                args.append(str(int(firstRun)+i))
565              if (generator in managedGenerators):
566                 args.append(generator)
567                 if (generator == 'comphep' and i == 0):
# Line 531 | Line 601 | class JobSplitter:
601          common.logger.debug('Splitting per job')
602          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
603  
604 <        self.total_number_of_jobs = self.theNumberOfJobs
604 > #        self.total_number_of_jobs = self.theNumberOfJobs
605 >
606 >        self.prepareSplittingNoInput()
607  
608          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
609  
610          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
611  
612          # argument is seed number.$i
613 <        #self.list_of_args = []
613 >        self.list_of_args = []
614          for i in range(self.total_number_of_jobs):
615 +            args=[]
616              jobDestination.append([""])
617 <        #   self.list_of_args.append([str(i)])
617 >            if self.eventsPerJob != 0 :
618 >                args.append(str(self.eventsPerJob))
619 >                self.list_of_args.append(args)
620  
621         # prepare dict output
622          dictOut = {}
623 <        dictOut['args'] = [] # self.list_of_args
623 >        dictOut['params'] = ['MaxEvents']
624 >        dictOut['args'] =  self.list_of_args
625          dictOut['jobDestination'] = jobDestination
626          dictOut['njobs']=self.total_number_of_jobs
627          return dictOut
# Line 553 | Line 629 | class JobSplitter:
629  
630      def jobSplittingByLumi(self):
631          """
632 +        Split task into jobs by Lumi section paying attention to which
633 +        lumis should be run (according to the analysis dataset).
634 +        This uses WMBS job splitting which does not split files over jobs
635 +        so the job will have AT LEAST as many lumis as requested, perhaps
636 +        more
637          """
638 <        return
638 >
639 >        common.logger.debug('Splitting by Lumi')
640 >        self.checkLumiSettings()
641 >
642 >        blockSites = self.args['blockSites']
643 >        pubdata = self.args['pubdata']
644 >
645 >        lumisPerFile  = pubdata.getLumis()
646 >
647 >        # Make the list of WMBS files for job splitter
648 >        fileList = pubdata.getListFiles()
649 >        thefiles = Fileset(name='FilesToSplit')
650 >        for jobFile in fileList:
651 >            block = jobFile['Block']['Name']
652 >            try:
653 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
654 >            except:
655 >                continue
656 >            wmbsFile = File(jobFile['LogicalFileName'])
657 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
658 >            wmbsFile['block'] = block
659 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
660 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
661 >            thefiles.addFile(wmbsFile)
662 >
663 >        # Create the factory and workflow
664 >        work = Workflow()
665 >        subs = Subscription(fileset    = thefiles,    workflow = work,
666 >                            split_algo = 'LumiBased', type     = "Processing")
667 >        splitter = SplitterFactory()
668 >        jobFactory = splitter(subs)
669 >
670 >        list_of_lists = []
671 >        jobDestination = []
672 >        jobCount = 0
673 >        lumisCreated = 0
674 >
675 >        if not self.limitJobLumis:
676 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
677 >            common.logger.info('Each job will process about %s lumis.' %
678 >                                self.lumisPerJob)
679 >
680 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
681 >            for job in jobGroup.jobs:
682 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
683 >                    common.logger.info('Limit on number of jobs reached.')
684 >                    break
685 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
686 >                    common.logger.info('Limit on number of lumis reached.')
687 >                    break
688 >                lumis = []
689 >                lfns  = []
690 >                locations = []
691 >                firstFile = True
692 >                # Collect information from all the files
693 >                for jobFile in job.getFiles():
694 >                    if firstFile:  # Get locations from first file in the job
695 >                        for loc in jobFile['locations']:
696 >                            locations.append(loc)
697 >                        firstFile = False
698 >                    # Accumulate Lumis from all files
699 >                    for lumiList in jobFile['runs']:
700 >                        theRun = lumiList.run
701 >                        for theLumi in list(lumiList):
702 >                            lumis.append( (theRun, theLumi) )
703 >
704 >                    lfns.append(jobFile['lfn'])
705 >                fileString = ','.join(lfns)
706 >                lumiString = compressLumiString(lumis)
707 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
708 >
709 >                jobDestination.append(locations)
710 >                jobCount += 1
711 >                lumisCreated += len(lumis)
712 >                common.logger.debug('Job %s will run on %s files and %s lumis '
713 >                    % (jobCount, len(lfns), len(lumis) ))
714 >
715 >        common.logger.info('%s jobs created to run on %s lumis' %
716 >                              (jobCount, lumisCreated))
717 >
718 >        # Prepare dict output matching back to non-WMBS job creation
719 >        dictOut = {}
720 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
721 >        dictOut['args'] = list_of_lists
722 >        dictOut['jobDestination'] = jobDestination
723 >        dictOut['njobs'] = jobCount
724 >
725 >        return dictOut
726 >
727 >
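To make the dictionary returned by jobSplittingByLumi() concrete: each entry of dictOut['args'] lines up positionally with dictOut['params']. A sketch of what a single job's entry could look like, with invented LFNs and lumi ranges (only the '-1' and '0' values are fixed by the code above):

    # One hypothetical element of dictOut['args'] -- file names and lumi
    # ranges are made up for illustration.
    example_job_args = [
        '/store/data/Run123/fileA.root,/store/data/Run123/fileB.root',  # InputFiles
        '-1',                   # MaxEvents: process all events in the files
        '0',                    # SkipEvents
        '123:1-123:25,123:40',  # Lumis, as produced by compressLumiString()
    ]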
728      def Algos(self):
729          """
730          Define key splittingType matrix
# Line 568 | Line 738 | class JobSplitter:
738                       }
739          return SplitAlogs
740  
741 +
742 +
743 + def compressLumiString(lumis):
744 +    """
 745 +    Turn a list of 2-tuples of run/lumi numbers into a string of the format
 746 +    R1:L1,R2:L2-R3:L3 which is acceptable to the CMSSW LumiBlockRange variable
747 +    """
748 +
749 +    lumis.sort()
750 +    parts = []
751 +    startRange = None
752 +    endRange = None
753 +
754 +    for lumiBlock in lumis:
755 +        if not startRange: # This is the first one
756 +            startRange = lumiBlock
757 +            endRange = lumiBlock
758 +        elif lumiBlock == endRange: # Same Lumi (different files?)
759 +            pass
760 +        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
761 +            endRange = lumiBlock
762 +        else: # This is the start of a new range
763 +            part = ':'.join(map(str, startRange))
764 +            if startRange != endRange:
765 +                part += '-' + ':'.join(map(str, endRange))
766 +            parts.append(part)
767 +            startRange = lumiBlock
768 +            endRange = lumiBlock
769 +
770 +    # Put out what's left
771 +    if startRange:
772 +        part = ':'.join(map(str, startRange))
773 +        if startRange != endRange:
774 +            part += '-' + ':'.join(map(str, endRange))
 775 +        parts.append(part)
776 +
777 +    output = ','.join(parts)
778 +    return output
779 +
780 +
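As a quick illustration of compressLumiString() as defined above (the input pairs are invented; the output follows from the ranging logic):

    # Consecutive lumis in the same run are collapsed into a range.
    lumis = [(1, 1), (1, 2), (1, 3), (2, 5)]
    print compressLumiString(lumis)   # prints '1:1-1:3,2:5'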

Diff Legend

Removed lines
+ Added lines
< Changed lines (revision 1.22)
> Changed lines (revision 1.29)