ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.22 by ewv, Wed Jun 17 20:58:07 2009 UTC vs.
Revision 1.37 by ewv, Wed May 26 19:46:12 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 42 | Line 62 | class JobSplitter:
62              self.total_number_of_events = 0
63              self.selectTotalNumberEvents = 0
64  
65 +        return
66 +
67 +    def checkLumiSettings(self):
68 +        """
69 +        Check to make sure the user has specified enough information to
70 +        perform splitting by Lumis to run the job
71 +        """
72 +        settings = 0
73 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
74 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
75 +            self.limitJobLumis = True
76 +            settings += 1
77 +
78 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
79 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
80 +            self.limitNJobs = True
81 +            settings += 1
82 +
83 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
84 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
85 +            self.limitTotalLumis = (self.totalNLumis != -1)
86 +            settings += 1
87 +
88 +        if settings != 2:
89 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
90 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
91 +            raise CrabException(msg)
92 +        if self.limitNJobs and self.limitJobLumis:
93 +            self.limitTotalLumis = True
94 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
95 +
96 +        # Has the user specified runselection?
97 +        if (self.cfg_params.has_key('CMSSW.runselection')):
98 +            common.logger.info('You have specified runselection and split by lumi.')
99 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
100 +        return
101 +
102 +    def ComputeSubBlockSites( self, blockSites ):
103 +        """
104 +        """
105 +        sub_blockSites = {}
106 +        for k,v in blockSites.iteritems():
107 +            sites=self.blackWhiteListParser.checkWhiteList(v)
108 +            if sites : sub_blockSites[k]=v
109 +        if len(sub_blockSites) < 1:
110 +            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
111 +            raise CrabException(msg)
112 +        return sub_blockSites
113  
114   ########################################################################
115      def jobSplittingByEvent( self ):
# Line 77 | Line 145 | class JobSplitter:
145          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
146          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
147  
148 +        if noBboundary == 1:
149 +            if self.total_number_of_events== -1:
150 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
151 +                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
152 +                raise CrabException(msg)
153 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
154 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
155 +                msg += "\tPlease set se_white_list with the site's storage element name."
156 +                raise  CrabException(msg)
157 +            blockSites = self.ComputeSubBlockSites(blockSites)
158 +
159          # ---- Handle the possible job splitting configurations ---- #
160          if (self.selectTotalNumberEvents):
161              totalEventsRequested = self.total_number_of_events
# Line 319 | Line 398 | class JobSplitter:
398                  virgola = ","
399              for block in bloskNoSite:
400                  msg += ' ' + str(block) + virgola
401 <            msg += '\n               Related jobs:\n                 '
401 >            msg += '\n\t\tRelated jobs:\n                 '
402              virgola = ""
403              if len(noSiteBlock) > 1:
404                  virgola = ","
405              for range_jobs in noSiteBlock:
406                  msg += str(range_jobs) + virgola
407 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
407 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
408              if self.cfg_params.has_key('GRID.se_white_list'):
409 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
410 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
411 <                msg += 'Please check if the dataset is available at this site!)\n'
409 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
410 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
411 >                msg += '\tPlease check if the dataset is available at this site!)'
412              if self.cfg_params.has_key('GRID.ce_white_list'):
413 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
414 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
415 <                msg += 'Please check if the dataset is available at this site!)\n'
413 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
414 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
415 >                msg += '\tPlease check if the dataset is available at this site!)\n'
416  
417              common.logger.info(msg)
418  
419          if bloskNoSite == allBlock:
420 <            raise CrabException('No jobs created')
420 >            msg += 'Requested jobs cannot be Created! \n'
421 >            if self.cfg_params.has_key('GRID.se_white_list'):
422 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
423 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
424 >                msg += '\tPlease check if the dataset is available at this site!)'
425 >            if self.cfg_params.has_key('GRID.ce_white_list'):
426 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
427 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
428 >                msg += '\tPlease check if the dataset is available at this site!)\n'
429 >            raise CrabException(msg)
430  
431          return
432  
# Line 347 | Line 435 | class JobSplitter:
435      def jobSplittingByRun(self):
436          """
437          """
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
438  
439          self.checkUserSettings()
440          blockSites = self.args['blockSites']
# Line 393 | Line 473 | class JobSplitter:
473          jobfactory = splitter(subs)
474  
475          #loop over all runs
396        set = Set(runList)
476          list_of_lists = []
477          jobDestination = []
478          count = 0
# Line 408 | Line 487 | class JobSplitter:
487                  #need to check single file location
488                  jobDestination.append(res['locations'])
489                  count +=1
490 <       # prepare dict output
490 >        # prepare dict output
491          dictOut = {}
492          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
493          dictOut['args'] = list_of_lists
# Line 435 | Line 514 | class JobSplitter:
514          return res
515  
516   ########################################################################
517 <    def jobSplittingNoInput(self):
517 >    def prepareSplittingNoInput(self):
518          """
440        Perform job splitting based on number of event per job
519          """
442        common.logger.debug('Splitting per events')
443        self.checkUserSettings()
444        jobDestination=[]
445        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
446            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
447            raise CrabException(msg)
448
449        managedGenerators =self.args['managedGenerators']
450        generator = self.args['generator']
451        firstRun = self.cfg_params.get('CMSSW.first_run',None)
452
520          if (self.selectEventsPerJob):
521              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
522          if (self.selectNumberOfJobs):
# Line 472 | Line 539 | class JobSplitter:
539              self.total_number_of_jobs = self.theNumberOfJobs
540              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
541  
542 +
543 +    def jobSplittingNoInput(self):
544 +        """
545 +        Perform job splitting based on number of event per job
546 +        """
547 +        common.logger.debug('Splitting per events')
548 +        self.checkUserSettings()
549 +        jobDestination=[]
550 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
551 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
552 +            raise CrabException(msg)
553 +
554 +        managedGenerators =self.args['managedGenerators']
555 +        generator = self.args['generator']
556 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
557 +
558 +        self.prepareSplittingNoInput()
559 +
560          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
561  
562          # is there any remainder?
# Line 487 | Line 572 | class JobSplitter:
572          self.list_of_args = []
573          for i in range(self.total_number_of_jobs):
574              ## Since there is no input, any site is good
575 <            jobDestination.append([""]) #must be empty to write correctly the xml
575 >            jobDestination.append([""]) # must be empty to correctly write the XML
576              args=[]
577 <            if (firstRun):
578 <                ## pythia first run
494 <                args.append(str(firstRun)+str(i))
577 >            if (firstLumi): # Pythia first lumi
578 >                args.append(str(int(firstLumi)+i))
579              if (generator in managedGenerators):
580                 args.append(generator)
581                 if (generator == 'comphep' and i == 0):
# Line 505 | Line 589 | class JobSplitter:
589  
590          dictOut = {}
591          dictOut['params'] = ['MaxEvents']
592 <        if (firstRun):
593 <            dictOut['params'] = ['FirstRun','MaxEvents']
594 <            if ( generator in managedGenerators ) :
595 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
592 >        if (firstLumi):
593 >            dictOut['params'] = ['FirstLumi','MaxEvents']
594 >            if (generator in managedGenerators):
595 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
596          else:
597              if (generator in managedGenerators) :
598                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 531 | Line 615 | class JobSplitter:
615          common.logger.debug('Splitting per job')
616          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
617  
618 <        self.total_number_of_jobs = self.theNumberOfJobs
618 > #        self.total_number_of_jobs = self.theNumberOfJobs
619 >
620 >        self.prepareSplittingNoInput()
621  
622          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
623  
624          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
625  
626          # argument is seed number.$i
627 <        #self.list_of_args = []
627 >        self.list_of_args = []
628          for i in range(self.total_number_of_jobs):
629 +            args=[]
630              jobDestination.append([""])
631 <        #   self.list_of_args.append([str(i)])
631 >            if self.eventsPerJob != 0 :
632 >                args.append(str(self.eventsPerJob))
633 >                self.list_of_args.append(args)
634  
635         # prepare dict output
636          dictOut = {}
637 <        dictOut['args'] = [] # self.list_of_args
637 >        dictOut['params'] = ['MaxEvents']
638 >        dictOut['args'] =  self.list_of_args
639          dictOut['jobDestination'] = jobDestination
640          dictOut['njobs']=self.total_number_of_jobs
641          return dictOut
# Line 553 | Line 643 | class JobSplitter:
643  
644      def jobSplittingByLumi(self):
645          """
646 +        Split task into jobs by Lumi section paying attention to which
647 +        lumis should be run (according to the analysis dataset).
648 +        This uses WMBS job splitting which does not split files over jobs
649 +        so the job will have AT LEAST as many lumis as requested, perhaps
650 +        more
651          """
652 <        return
652 >
653 >        common.logger.debug('Splitting by Lumi')
654 >        self.checkLumiSettings()
655 >
656 >        blockSites = self.args['blockSites']
657 >        pubdata = self.args['pubdata']
658 >
659 >        lumisPerFile  = pubdata.getLumis()
660 >
661 >        # Make the list of WMBS files for job splitter
662 >        fileList = pubdata.getListFiles()
663 >        thefiles = Fileset(name='FilesToSplit')
664 >        for jobFile in fileList:
665 >            block = jobFile['Block']['Name']
666 >            try:
667 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
668 >            except:
669 >                continue
670 >            wmbsFile = File(jobFile['LogicalFileName'])
671 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
672 >            wmbsFile['block'] = block
673 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
674 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
675 >            thefiles.addFile(wmbsFile)
676 >
677 >        # Create the factory and workflow
678 >        work = Workflow()
679 >        subs = Subscription(fileset    = thefiles,    workflow = work,
680 >                            split_algo = 'LumiBased', type     = "Processing")
681 >        splitter = SplitterFactory()
682 >        jobFactory = splitter(subs)
683 >
684 >        list_of_lists = []
685 >        jobDestination = []
686 >        jobCount = 0
687 >        lumisCreated = 0
688 >
689 >        if not self.limitJobLumis:
690 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
691 >            common.logger.info('Each job will process about %s lumis.' %
692 >                                self.lumisPerJob)
693 >
694 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
695 >            for job in jobGroup.jobs:
696 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
697 >                    common.logger.info('Limit on number of jobs reached.')
698 >                    break
699 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
700 >                    common.logger.info('Limit on number of lumis reached.')
701 >                    break
702 >                lumis = []
703 >                lfns  = []
704 >                locations = []
705 >                firstFile = True
706 >                # Collect information from all the files
707 >                for jobFile in job.getFiles():
708 >                    doFile = False
709 >                    if firstFile:  # Get locations from first file in the job
710 >                        for loc in jobFile['locations']:
711 >                            locations.append(loc)
712 >                        firstFile = False
713 >                    # Accumulate Lumis from all files
714 >                    for lumiList in jobFile['runs']:
715 >                        theRun = lumiList.run
716 >                        for theLumi in list(lumiList):
717 >                            if (not self.limitTotalLumis) or \
718 >                               (lumisCreated <= self.totalNLumis):
719 >                                doFile = True
720 >                                lumisCreated += 1
721 >                                lumis.append( (theRun, theLumi) )
722 >                    if doFile:
723 >                        lfns.append(jobFile['lfn'])
724 >                fileString = ','.join(lfns)
725 >                lumiLister = LumiList(lumis = lumis)
726 >                lumiString = lumiLister.getCMSSWString()
727 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
728 >
729 >                jobDestination.append(locations)
730 >                jobCount += 1
731 >                common.logger.debug('Job %s will run on %s files and %s lumis '
732 >                    % (jobCount, len(lfns), len(lumis) ))
733 >
734 >        common.logger.info('%s jobs created to run on %s lumis' %
735 >                              (jobCount, lumisCreated))
736 >
737 >        # Prepare dict output matching back to non-WMBS job creation
738 >        dictOut = {}
739 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
740 >        dictOut['args'] = list_of_lists
741 >        dictOut['jobDestination'] = jobDestination
742 >        dictOut['njobs'] = jobCount
743 >
744 >        return dictOut
745 >
746 >
747      def Algos(self):
748          """
749          Define key splittingType matrix

Diff Legend

Removed lines (present only in revision 1.22)
+ Added lines (present only in revision 1.37)
< Changed lines, old version (revision 1.22)
> Changed lines, new version (revision 1.37)