
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.23 by spiga, Fri Jun 19 09:54:22 2009 UTC vs.
Revision 1.41 by ewv, Tue Jun 29 17:46:42 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24          self.args=args
25 +
26 +        self.lumisPerJob = -1
27 +        self.totalNLumis = 0
28 +        self.theNumberOfJobs = 0
29 +        self.limitNJobs = False
30 +        self.limitTotalLumis = False
31 +        self.limitJobLumis = False
32 +
33          #self.maxEvents
34          # init BlackWhiteListParser
35 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
35 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
37 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38 >
39 >        ## check whether a non-default file to store/read analyzed fileBlocks has been requested
40 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42  
43  
44      def checkUserSettings(self):
# Line 42 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73 +
74 +    def checkLumiSettings(self):
75 +        """
76 +        Check that the user has specified enough information to
77 +        perform job splitting by lumi sections
78 +        """
79 +        settings = 0
80 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
81 +            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
82 +            self.limitJobLumis = True
83 +            settings += 1
84 +
85 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
86 +            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
87 +            self.limitNJobs = True
88 +            settings += 1
89 +
90 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
91 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
92 +            self.limitTotalLumis = (self.totalNLumis != -1)
93 +            settings += 1
94 +
95 +        if settings != 2:
96 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98 +            raise CrabException(msg)
99 +        if self.limitNJobs and self.limitJobLumis:
100 +            self.limitTotalLumis = True
101 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102 +
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified a runselection together with splitting by lumi.')
106 +            common.logger.info('The good lumi list will be the intersection of the runselection and the lumimask or ADS (if any).')
107 +        return
108 +
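
For illustration, a hypothetical set of cfg_params entries that satisfies the two-and-only-two rule enforced above (the keys are the ones checkLumiSettings() reads; the values are made up):

    # Hypothetical configuration: exactly two of the three lumi-splitting
    # keys are set, so checkLumiSettings() accepts it and derives the third.
    cfg_params = {'CMSSW.lumis_per_job':  '50',
                  'CMSSW.number_of_jobs': '10'}
    # limitJobLumis and limitNJobs become True, and totalNLumis is set
    # to 50 * 10 = 500, so limitTotalLumis is enabled implicitly.
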
109 +    def ComputeSubBlockSites( self, blockSites ):
110 +        """Return the subset of blockSites whose storage elements
111 +        pass the SE white list."""
112 +        sub_blockSites = {}
113 +        for k,v in blockSites.iteritems():
114 +            sites=self.blackWhiteListParser.checkWhiteList(v)
115 +            if sites: sub_blockSites[k] = v
116 +        if len(sub_blockSites) < 1:
117 +            msg = 'WARNING: the sites %s are not hosting any part of the data.' % self.seWhiteList
118 +            raise CrabException(msg)
119 +        return sub_blockSites
120  
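
A minimal sketch of what the loop above does, assuming checkWhiteList() returns the subset of the passed storage elements matching GRID.se_white_list (block and SE names are invented):

    # Hypothetical inputs: two blocks and the SEs hosting them.
    blockSites = {'/MinBias/Run2010A/RECO#1111': ['srm.cern.ch', 'se.infn.it'],
                  '/MinBias/Run2010A/RECO#2222': ['cmssrm.fnal.gov']}
    # With se_white_list = srm.cern.ch only the first block survives, so
    # ComputeSubBlockSites() returns
    # {'/MinBias/Run2010A/RECO#1111': ['srm.cern.ch', 'se.infn.it']};
    # if nothing survived, it would raise CrabException instead.
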
121   ########################################################################
122      def jobSplittingByEvent( self ):
# Line 77 | Line 152 | class JobSplitter:
152          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
153          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
154  
155 +        if noBboundary == 1:
156 +            if self.total_number_of_events == -1:
157 +                msg = 'You are selecting no_block_boundary=1, which does not allow total_number_of_events=-1\n'
158 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
159 +                raise CrabException(msg)
160 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
161 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
162 +                msg += "\tPlease set se_white_list to the site's storage element name."
163 +                raise CrabException(msg)
164 +            blockSites = self.ComputeSubBlockSites(blockSites)
165 +
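
For reference, a hypothetical configuration that passes both checks above (a finite event total and one and only one storage element in the white list; the SE name is made up):

    # Hypothetical cfg_params entries compatible with no_block_boundary=1
    cfg_params = {'CMSSW.no_block_boundary':      '1',
                  'CMSSW.total_number_of_events': '100000',
                  'GRID.se_white_list':           'srm.cern.ch'}
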
166          # ---- Handle the possible job splitting configurations ---- #
167          if (self.selectTotalNumberEvents):
168              totalEventsRequested = self.total_number_of_events
# Line 300 | Line 386 | class JobSplitter:
386          allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks = ''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
# Line 310 | Line 397 | class JobSplitter:
397                  if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName, saveFblocks)
403  
404          common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 319 | Line 409 | class JobSplitter:
409                  virgola = ","
410              for block in bloskNoSite:
411                  msg += ' ' + str(block) + virgola
412 <            msg += '\n               Related jobs:\n                 '
412 >            msg += '\n\t\tRelated jobs:\n                 '
413              virgola = ""
414              if len(noSiteBlock) > 1:
415                  virgola = ","
416              for range_jobs in noSiteBlock:
417                  msg += str(range_jobs) + virgola
418 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
418 >            msg += '\n\t\twill not be submitted and this block of data cannot be analyzed!\n'
419              if self.cfg_params.has_key('GRID.se_white_list'):
420 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
421 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 <                msg += 'Please check if the dataset is available at this site!)\n'
420 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
421 >                msg += '\t(Hint: by whitelisting you force the jobs to run only at the listed site(s).\n'
422 >                msg += '\tPlease check that the dataset is available there!)'
423              if self.cfg_params.has_key('GRID.ce_white_list'):
424 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
425 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
426 <                msg += 'Please check if the dataset is available at this site!)\n'
424 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
425 >                msg += '\t(Hint: by whitelisting you force the jobs to run only at the listed site(s).\n'
426 >                msg += '\tPlease check that the dataset is available there!)\n'
427  
428              common.logger.info(msg)
429  
430          if bloskNoSite == allBlock:
431 <            raise CrabException('No jobs created')
431 >            msg += 'Requested jobs cannot be created!\n'
432 >            if self.cfg_params.has_key('GRID.se_white_list'):
433 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
434 >                msg += '\t(Hint: by whitelisting you force the jobs to run only at the listed site(s).\n'
435 >                msg += '\tPlease check that the dataset is available there!)'
436 >            if self.cfg_params.has_key('GRID.ce_white_list'):
437 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
438 >                msg += '\t(Hint: by whitelisting you force the jobs to run only at the listed site(s).\n'
439 >                msg += '\tPlease check that the dataset is available there!)\n'
440 >            raise CrabException(msg)
441  
442          return
443  
# Line 347 | Line 446 | class JobSplitter:
446      def jobSplittingByRun(self):
447          """Split the task into jobs on run boundaries."""
448
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
449  
450          self.checkUserSettings()
451          blockSites = self.args['blockSites']
# Line 393 | Line 484 | class JobSplitter:
484          jobfactory = splitter(subs)
485  
486          #loop over all runs
396        set = Set(runList)
487          list_of_lists = []
488          jobDestination = []
489 +        list_of_blocks = []
490          count = 0
491          for jobGroup in  jobfactory():
492              if count <  self.theNumberOfJobs:
# Line 407 | Line 498 | class JobSplitter:
498                  list_of_lists.append([fullString,str(-1),str(0)])
499                  #need to check single file location
500                  jobDestination.append(res['locations'])
501 +                list_of_blocks.append(res['block'])
502                  count +=1
503 <       # prepare dict output
503 >        # prepare dict output
504          dictOut = {}
505          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
506          dictOut['args'] = list_of_lists
507          dictOut['jobDestination'] = jobDestination
508          dictOut['njobs']=count
509  
510 +        self.cacheBlocks(list_of_blocks,jobDestination)
511 +
512          return dictOut
513  
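
For orientation, the output contract shared by the jobSplitting* methods: 'params' names the per-job argument slots and each entry of 'args' fills them for one job, matching the [fullString, str(-1), str(0)] layout built above. A sketch with invented values:

    # Hypothetical one-job example of the dictionary a splitter returns
    dictOut = {'params':         ['InputFiles', 'MaxEvents', 'SkipEvents'],
               'args':           [['/store/a.root,/store/b.root', '-1', '0']],
               'jobDestination': [['srm.cern.ch']],
               'njobs':          1}
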
514      def getJobInfo( self,jobGroup ):
# Line 428 | Line 522 | class JobSplitter:
522                  for loc in file['locations']:
523                      if tmp_check < 1 :
524                          locations.append(loc)
525 +                        res['block']= file['block']
526                  tmp_check = tmp_check + 1
432                ### the check for the locations should go here
527          res['lfns'] = lfns
528          res['locations'] = locations
529          return res
# Line 474 | Line 568 | class JobSplitter:
568  
569          managedGenerators =self.args['managedGenerators']
570          generator = self.args['generator']
571 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
571 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
572  
573          self.prepareSplittingNoInput()
574  
# Line 493 | Line 587 | class JobSplitter:
587          self.list_of_args = []
588          for i in range(self.total_number_of_jobs):
589              ## Since there is no input, any site is good
590 <            jobDestination.append([""]) #must be empty to write correctly the xml
590 >            jobDestination.append([""]) # must be empty to correctly write the XML
591              args=[]
592 <            if (firstRun):
593 <                ## pythia first run
500 <                args.append(str(firstRun)+str(i))
592 >            if (firstLumi): # Pythia first lumi
593 >                args.append(str(int(firstLumi)+i))
594              if (generator in managedGenerators):
595                 args.append(generator)
596                 if (generator == 'comphep' and i == 0):
# Line 511 | Line 604 | class JobSplitter:
604  
605          dictOut = {}
606          dictOut['params'] = ['MaxEvents']
607 <        if (firstRun):
608 <            dictOut['params'] = ['FirstRun','MaxEvents']
609 <            if ( generator in managedGenerators ) :
610 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
607 >        if (firstLumi):
608 >            dictOut['params'] = ['FirstLumi','MaxEvents']
609 >            if (generator in managedGenerators):
610 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
611          else:
612              if (generator in managedGenerators) :
613                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 565 | Line 658 | class JobSplitter:
658  
659      def jobSplittingByLumi(self):
660          """
661 +        Split the task into jobs by lumi section, paying attention to which
662 +        lumis should be run (according to the analysis dataset).
663 +        This uses WMBS job splitting, which does not split files over jobs,
664 +        so each job will have AT LEAST as many lumis as requested, perhaps
665 +        more.
666          """
667 <        return
667 >
668 >        common.logger.debug('Splitting by Lumi')
669 >        self.checkLumiSettings()
670 >
671 >        blockSites = self.args['blockSites']
672 >        pubdata = self.args['pubdata']
673 >
674 >        lumisPerFile  = pubdata.getLumis()
675 >
676 >        # Make the list of WMBS files for job splitter
677 >        fileList = pubdata.getListFiles()
678 >        thefiles = Fileset(name='FilesToSplit')
679 >        for jobFile in fileList:
680 >            block = jobFile['Block']['Name']
681 >            try:
682 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
683 >            except KeyError:  # block has no known locations; skip it
684 >                continue
685 >            wmbsFile = File(jobFile['LogicalFileName'])
686 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
687 >            wmbsFile['block'] = block
688 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
689 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
690 >            thefiles.addFile(wmbsFile)
691 >
692 >        # Create the factory and workflow
693 >        work = Workflow()
694 >        subs = Subscription(fileset    = thefiles,    workflow = work,
695 >                            split_algo = 'LumiBased', type     = "Processing")
696 >        splitter = SplitterFactory()
697 >        jobFactory = splitter(subs)
698 >
699 >        list_of_lists = []
700 >        jobDestination = []
701 >        jobCount = 0
702 >        lumisCreated = 0
703 >        list_of_blocks = []
704 >        if not self.limitJobLumis:
705 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
706 >            common.logger.info('Each job will process about %s lumis.' %
707 >                                self.lumisPerJob)
708 >
709 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
710 >            for job in jobGroup.jobs:
711 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
712 >                    common.logger.info('Limit on number of jobs reached.')
713 >                    break
714 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
715 >                    common.logger.info('Limit on number of lumis reached.')
716 >                    break
717 >                lumis = []
718 >                lfns  = []
719 >                locations = []
720 >                blocks = []
721 >                firstFile = True
722 >                # Collect information from all the files
723 >                for jobFile in job.getFiles():
724 >                    doFile = False
725 >                    if firstFile:  # Get locations from first file in the job
726 >                        for loc in jobFile['locations']:
727 >                            locations.append(loc)
728 >                        blocks.append(jobFile['block'])
729 >                        firstFile = False
730 >                    # Accumulate Lumis from all files
731 >                    for lumiList in jobFile['runs']:
732 >                        theRun = lumiList.run
733 >                        for theLumi in list(lumiList):
734 >                            if (not self.limitTotalLumis) or \
735 >                               (lumisCreated < self.totalNLumis):
736 >                                doFile = True
737 >                                lumisCreated += 1
738 >                                lumis.append( (theRun, theLumi) )
739 >                    if doFile:
740 >                        lfns.append(jobFile['lfn'])
741 >                fileString = ','.join(lfns)
742 >                lumiLister = LumiList(lumis = lumis)
743 >                lumiString = lumiLister.getCMSSWString()
744 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
745 >                list_of_blocks.append(blocks)
746 >                jobDestination.append(locations)
747 >                jobCount += 1
748 >                common.logger.debug('Job %s will run on %s files and %s lumis '
749 >                    % (jobCount, len(lfns), len(lumis) ))
750 >
751 >        common.logger.info('%s jobs created to run on %s lumis' %
752 >                              (jobCount, lumisCreated))
753 >
754 >        # Prepare dict output matching back to non-WMBS job creation
755 >        dictOut = {}
756 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
757 >        dictOut['args'] = list_of_lists
758 >        dictOut['jobDestination'] = jobDestination
759 >        dictOut['njobs'] = jobCount
760 >
761 >        self.cacheBlocks(list_of_blocks,jobDestination)
762 >
763 >        return dictOut
764 >
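
As a standalone illustration of the LumiList conversion used above: FWCore's LumiList accepts (run, lumi) tuples via its lumis argument, and getCMSSWString() renders them in CMSSW's range notation (the run/lumi numbers and the exact rendering below are only indicative):

    from FWCore.PythonUtilities.LumiList import LumiList

    # Three consecutive lumis in run 138923 plus one lumi in run 139790
    lumis = [(138923, 1), (138923, 2), (138923, 3), (139790, 5)]
    lumiLister = LumiList(lumis=lumis)
    # Consecutive lumis collapse into ranges, giving roughly
    # '138923:1-138923:3,139790:5'
    print lumiLister.getCMSSWString()
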
765 >    def cacheBlocks(self, blocks, destinations):
766 >
767 >        saveFblocks = ''
768 >        for i in range(len(blocks)):
769 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
770 >            if len(sites) != 0:
771 >                for block in blocks[i]:
772 >                    saveFblocks += str(block)+'\n'
773 >        writeTXTfile(self, self.fileBlocks_FileName, saveFblocks)
774 >
775      def Algos(self):
776          """
777          Define key splittingType matrix

Diff Legend

Removed lines (old line number only)
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)