ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.26 by ewv, Wed Jul 29 21:42:16 2009 UTC vs.
Revision 1.41 by ewv, Tue Jun 29 17:46:42 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from sets import Set
6   from crab_exceptions import *
7   from crab_util import *
8  
# Line 10 | Line 13 | from WMCore.DataStructs.Subscription imp
13   from WMCore.DataStructs.Workflow import Workflow
14   from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24          self.args=args
25 +
26 +        self.lumisPerJob = -1
27 +        self.totalNLumis = 0
28 +        self.theNumberOfJobs = 0
29 +        self.limitNJobs = False
30 +        self.limitTotalLumis = False
31 +        self.limitJobLumis = False
32 +
33          #self.maxEvents
34          # init BlackWhiteListParser
35          self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37          self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38  
39 +        ## check if has been asked for a non default file to store/read analyzed fileBlocks
40 +        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 +        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42 +
43  
44      def checkUserSettings(self):
45          ## Events per job
# Line 50 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73 +
74 +    def checkLumiSettings(self):
75 +        """
76 +        Check to make sure the user has specified enough information to
77 +        perform splitting by Lumis to run the job
78 +        """
79 +        settings = 0
80 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
81 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
82 +            self.limitJobLumis = True
83 +            settings += 1
84 +
85 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
86 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
87 +            self.limitNJobs = True
88 +            settings += 1
89 +
90 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
91 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
92 +            self.limitTotalLumis = (self.totalNLumis != -1)
93 +            settings += 1
94 +
95 +        if settings != 2:
96 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98 +            raise CrabException(msg)
99 +        if self.limitNJobs and self.limitJobLumis:
100 +            self.limitTotalLumis = True
101 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102 +
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108  
109      def ComputeSubBlockSites( self, blockSites ):
110          """
# Line 331 | Line 386 | class JobSplitter:
386          allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks =''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
# Line 341 | Line 397 | class JobSplitter:
397                  if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
403  
404          common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 369 | Line 428 | class JobSplitter:
428              common.logger.info(msg)
429  
430          if bloskNoSite == allBlock:
431 <            raise CrabException('No jobs created')
431 >            msg += 'Requested jobs cannot be Created! \n'
432 >            if self.cfg_params.has_key('GRID.se_white_list'):
433 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
434 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
435 >                msg += '\tPlease check if the dataset is available at this site!)'
436 >            if self.cfg_params.has_key('GRID.ce_white_list'):
437 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
438 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
439 >                msg += '\tPlease check if the dataset is available at this site!)\n'
440 >            raise CrabException(msg)
441  
442          return
443  
# Line 416 | Line 484 | class JobSplitter:
484          jobfactory = splitter(subs)
485  
486          #loop over all runs
419        set = Set(runList)
487          list_of_lists = []
488          jobDestination = []
489 +        list_of_blocks = []
490          count = 0
491          for jobGroup in  jobfactory():
492              if count <  self.theNumberOfJobs:
# Line 430 | Line 498 | class JobSplitter:
498                  list_of_lists.append([fullString,str(-1),str(0)])
499                  #need to check single file location
500                  jobDestination.append(res['locations'])
501 +                list_of_blocks.append(res['block'])
502                  count +=1
503 <       # prepare dict output
503 >        # prepare dict output
504          dictOut = {}
505          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
506          dictOut['args'] = list_of_lists
507          dictOut['jobDestination'] = jobDestination
508          dictOut['njobs']=count
509  
510 +        self.cacheBlocks(list_of_blocks,jobDestination)
511 +
512          return dictOut
513  
514      def getJobInfo( self,jobGroup ):
# Line 451 | Line 522 | class JobSplitter:
522                  for loc in file['locations']:
523                      if tmp_check < 1 :
524                          locations.append(loc)
525 +                        res['block']= file['block']
526                  tmp_check = tmp_check + 1
455                ### qui va messo il check per la locations
527          res['lfns'] = lfns
528          res['locations'] = locations
529          return res
# Line 497 | Line 568 | class JobSplitter:
568  
569          managedGenerators =self.args['managedGenerators']
570          generator = self.args['generator']
571 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
571 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
572  
573          self.prepareSplittingNoInput()
574  
# Line 516 | Line 587 | class JobSplitter:
587          self.list_of_args = []
588          for i in range(self.total_number_of_jobs):
589              ## Since there is no input, any site is good
590 <            jobDestination.append([""]) #must be empty to write correctly the xml
590 >            jobDestination.append([""]) # must be empty to correctly write the XML
591              args=[]
592 <            if (firstRun):
593 <                ## pythia first run
523 <                args.append(str(firstRun)+str(i))
592 >            if (firstLumi): # Pythia first lumi
593 >                args.append(str(int(firstLumi)+i))
594              if (generator in managedGenerators):
595                 args.append(generator)
596                 if (generator == 'comphep' and i == 0):
# Line 534 | Line 604 | class JobSplitter:
604  
605          dictOut = {}
606          dictOut['params'] = ['MaxEvents']
607 <        if (firstRun):
608 <            dictOut['params'] = ['FirstRun','MaxEvents']
609 <            if ( generator in managedGenerators ) :
610 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
607 >        if (firstLumi):
608 >            dictOut['params'] = ['FirstLumi','MaxEvents']
609 >            if (generator in managedGenerators):
610 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
611          else:
612              if (generator in managedGenerators) :
613                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 596 | Line 666 | class JobSplitter:
666          """
667  
668          common.logger.debug('Splitting by Lumi')
669 <        self.checkUserSettings() # FIXME need one for lumis
669 >        self.checkLumiSettings()
670  
671          blockSites = self.args['blockSites']
672          pubdata = self.args['pubdata']
673  
604        if self.selectNumberOfJobs == 0 :
605            self.theNumberOfJobs = 9999999
674          lumisPerFile  = pubdata.getLumis()
675  
676          # Make the list of WMBS files for job splitter
# Line 621 | Line 689 | class JobSplitter:
689                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
690              thefiles.addFile(wmbsFile)
691  
692 +        # Create the factory and workflow
693          work = Workflow()
694          subs = Subscription(fileset    = thefiles,    workflow = work,
695                              split_algo = 'LumiBased', type     = "Processing")
# Line 630 | Line 699 | class JobSplitter:
699          list_of_lists = []
700          jobDestination = []
701          jobCount = 0
702 <        self.theNumberOfJobs = 20 #FIXME
703 <        for jobGroup in  jobFactory(lumis_per_job = 50): #FIXME
702 >        lumisCreated = 0
703 >        list_of_blocks = []
704 >        if not self.limitJobLumis:
705 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
706 >            common.logger.info('Each job will process about %s lumis.' %
707 >                                self.lumisPerJob)
708 >
709 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
710              for job in jobGroup.jobs:
711 <                if jobCount <  self.theNumberOfJobs:
712 <                    lumis = []
713 <                    lfns  = []
714 <                    locations = []
715 <                    firstFile = True
716 <                    # Collect information from all the files
717 <                    for jobFile in job.getFiles():
718 <                        if firstFile:  # Get locations from first file in the job
719 <                            for loc in jobFile['locations']:
720 <                                locations.append(loc)
721 <                            firstFile = False
722 <                        # Accumulate Lumis from all files
723 <                        for lumiList in jobFile['runs']:
724 <                            theRun = lumiList.run
725 <                            for theLumi in list(lumiList):
711 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
712 >                    common.logger.info('Limit on number of jobs reached.')
713 >                    break
714 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
715 >                    common.logger.info('Limit on number of lumis reached.')
716 >                    break
717 >                lumis = []
718 >                lfns  = []
719 >                locations = []
720 >                blocks = []
721 >                firstFile = True
722 >                # Collect information from all the files
723 >                for jobFile in job.getFiles():
724 >                    doFile = False
725 >                    if firstFile:  # Get locations from first file in the job
726 >                        for loc in jobFile['locations']:
727 >                            locations.append(loc)
728 >                        blocks.append(jobFile['block'])
729 >                        firstFile = False
730 >                    # Accumulate Lumis from all files
731 >                    for lumiList in jobFile['runs']:
732 >                        theRun = lumiList.run
733 >                        for theLumi in list(lumiList):
734 >                            if (not self.limitTotalLumis) or \
735 >                               (lumisCreated < self.totalNLumis):
736 >                                doFile = True
737 >                                lumisCreated += 1
738                                  lumis.append( (theRun, theLumi) )
739 <
739 >                    if doFile:
740                          lfns.append(jobFile['lfn'])
741 <                    fileString = ','.join(lfns)
742 <                    lumiString = compressLumiString(lumis)
743 <                    common.logger.debug('Job %s will run on %s files and %s lumis '
744 <                        % (jobCount, len(lfns), len(lumis) ))
745 <                    list_of_lists.append([fileString, str(-1), str(0), lumiString])
741 >                fileString = ','.join(lfns)
742 >                lumiLister = LumiList(lumis = lumis)
743 >                lumiString = lumiLister.getCMSSWString()
744 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
745 >                list_of_blocks.append(blocks)
746 >                jobDestination.append(locations)
747 >                jobCount += 1
748 >                common.logger.debug('Job %s will run on %s files and %s lumis '
749 >                    % (jobCount, len(lfns), len(lumis) ))
750 >
751 >        common.logger.info('%s jobs created to run on %s lumis' %
752 >                              (jobCount, lumisCreated))
753  
660                    jobDestination.append(locations)
661                    jobCount += 1
754          # Prepare dict output matching back to non-WMBS job creation
755          dictOut = {}
756          dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
# Line 666 | Line 758 | class JobSplitter:
758          dictOut['jobDestination'] = jobDestination
759          dictOut['njobs'] = jobCount
760  
761 +        self.cacheBlocks(list_of_blocks,jobDestination)
762 +
763          return dictOut
764  
765 +    def cacheBlocks(self, blocks,destinations):
766 +
767 +        saveFblocks=''
768 +        for i in range(len(blocks)):
769 +            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
770 +            if len(sites) != 0:
771 +                for block in blocks[i]:
772 +                    saveFblocks += str(block)+'\n'
773 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
774  
775      def Algos(self):
776          """
# Line 682 | Line 785 | class JobSplitter:
785                       }
786          return SplitAlogs
787  
685
686
687 def compressLumiString(lumis):
688    """
689    Turn a list of 2-tuples of run/lumi numbers into a list of the format
690    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
691    """
692
693    lumis.sort()
694    parts = []
695    startRange = None
696    endRange = None
697
698    for lumiBlock in lumis:
699        if not startRange: # This is the first one
700            startRange = lumiBlock
701            endRange = lumiBlock
702        elif lumiBlock == endRange: # Same Lumi (different files?)
703            pass
704        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
705            endRange = lumiBlock
706        else: # This is the start of a new range
707            part = ':'.join(map(str, startRange))
708            if startRange != endRange:
709                part += '-' + ':'.join(map(str, endRange))
710            parts.append(part)
711            startRange = lumiBlock
712            endRange = lumiBlock
713
714    # Put out what's left
715    if startRange:
716        part = ':'.join(map(str, startRange))
717        if startRange != endRange:
718            part += '-' + ':'.join(map(str, endRange))
719        parts.append(part)
720
721    output = ','.join(parts)
722    return output
723
724

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines