root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.23 by spiga, Fri Jun 19 09:54:22 2009 UTC vs.
Revision 1.40 by spiga, Mon Jun 7 10:38:39 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35 >
36 >        ## check whether a non-default file has been requested to store/read the analyzed fileBlocks
37 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
38 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
39  
40  
41      def checkUserSettings(self):
# Line 42 | Line 66 | class JobSplitter:
66              self.total_number_of_events = 0
67              self.selectTotalNumberEvents = 0
68  
69 +        return
70 +
71 +    def checkLumiSettings(self):
72 +        """
73 +        Check that the user has specified enough information to
74 +        perform the job splitting by lumi sections
75 +        """
76 +        settings = 0
77 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
78 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
79 +            self.limitJobLumis = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
83 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
84 +            self.limitNJobs = True
85 +            settings += 1
86 +
87 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
88 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
89 +            self.limitTotalLumis = (self.totalNLumis != -1)
90 +            settings += 1
91 +
92 +        if settings != 2:
93 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
94 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
95 +            raise CrabException(msg)
96 +        if self.limitNJobs and self.limitJobLumis:
97 +            self.limitTotalLumis = True
98 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
99 +
100 +        # Has the user specified runselection?
101 +        if (self.cfg_params.has_key('CMSSW.runselection')):
102 +            common.logger.info('You have specified runselection and split by lumi.')
103 +            common.logger.info('The good lumi list will be the intersection of runselection and the lumimask or ADS (if any).')
104 +        return
105 +
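
The two-of-three rule enforced by checkLumiSettings can be seen in isolation with a small sketch (not part of the diff; the dictionary below stands in for cfg_params and the values are invented):

    # Exactly two of the three lumi-splitting parameters must be given;
    # the third is either derived (lumis_per_job * number_of_jobs) or unlimited.
    def count_lumi_settings(cfg_params):
        keys = ('CMSSW.lumis_per_job',
                'CMSSW.number_of_jobs',
                'CMSSW.total_number_of_lumis')
        return sum(1 for k in keys if k in cfg_params)

    assert count_lumi_settings({'CMSSW.lumis_per_job': '50',
                                'CMSSW.number_of_jobs': '10'}) == 2   # accepted
    assert count_lumi_settings({'CMSSW.lumis_per_job': '50'}) == 1    # would raise CrabException
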
106 +    def ComputeSubBlockSites( self, blockSites ):
107 +        """
108 +        """
109 +        sub_blockSites = {}
110 +        for k,v in blockSites.iteritems():
111 +            sites=self.blackWhiteListParser.checkWhiteList(v)
112 +            if sites : sub_blockSites[k]=v
113 +        if len(sub_blockSites) < 1:
114 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
115 +            raise CrabException(msg)
116 +        return sub_blockSites
117  
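
The effect of ComputeSubBlockSites can be illustrated with a standalone sketch (block and SE names are invented; the list comprehension stands in for blackWhiteListParser.checkWhiteList):

    # Hypothetical mapping: block name -> storage elements hosting it
    blockSites = {'/PrimDS/Proc-v1/AOD#block1': ['se01.example.org'],
                  '/PrimDS/Proc-v1/AOD#block2': ['se02.example.org']}
    seWhiteList = ['se01.example.org']

    sub_blockSites = {}
    for block, sites in blockSites.items():
        hosted = [se for se in sites if se in seWhiteList]  # stands in for checkWhiteList()
        if hosted:
            sub_blockSites[block] = sites
    # Only block1 survives; an empty result would raise a CrabException, as above.
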
118   ########################################################################
119      def jobSplittingByEvent( self ):
# Line 77 | Line 149 | class JobSplitter:
149          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
150          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
151  
152 +        if noBboundary == 1:
153 +            if self.total_number_of_events == -1:
154 +                msg = 'You are selecting no_block_boundary=1, which does not allow setting total_number_of_events=-1\n'
155 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
156 +                raise CrabException(msg)
157 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
158 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
159 +                msg += "\tPlease set se_white_list with the site's storage element name."
160 +                raise  CrabException(msg)
161 +            blockSites = self.ComputeSubBlockSites(blockSites)
162 +
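
A standalone illustration of the two constraints that no_block_boundary=1 imposes above (the values are hypothetical):

    # no_block_boundary=1 needs an explicit event total and exactly one white-listed SE
    total_number_of_events = 10000                 # -1 would be rejected
    se_white_list = 'se01.example.org'             # a single comma-separated entry
    assert total_number_of_events != -1
    assert len(se_white_list) > 0 and len(se_white_list.split(',')) == 1
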
163          # ---- Handle the possible job splitting configurations ---- #
164          if (self.selectTotalNumberEvents):
165              totalEventsRequested = self.total_number_of_events
# Line 300 | Line 383 | class JobSplitter:
383          allBlock = []
384  
385          blockCounter = 0
386 +        saveFblocks =''
387          for block in blocks:
388              if block in jobsOfBlock.keys() :
389                  blockCounter += 1
# Line 310 | Line 394 | class JobSplitter:
394                  if len(sites) == 0:
395                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
396                      bloskNoSite.append( blockCounter )
397 +                else:
398 +                    saveFblocks += str(block)+'\n'
399 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
400  
401          common.logger.info(screenOutput)
402          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 319 | Line 406 | class JobSplitter:
406                  virgola = ","
407              for block in bloskNoSite:
408                  msg += ' ' + str(block) + virgola
409 <            msg += '\n               Related jobs:\n                 '
409 >            msg += '\n\t\tRelated jobs:\n                 '
410              virgola = ""
411              if len(noSiteBlock) > 1:
412                  virgola = ","
413              for range_jobs in noSiteBlock:
414                  msg += str(range_jobs) + virgola
415 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
415 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
416              if self.cfg_params.has_key('GRID.se_white_list'):
417 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
418 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
419 <                msg += 'Please check if the dataset is available at this site!)\n'
417 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
418 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
419 >                msg += '\tPlease check if the dataset is available at this site!)'
420              if self.cfg_params.has_key('GRID.ce_white_list'):
421 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
422 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
423 <                msg += 'Please check if the dataset is available at this site!)\n'
421 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
422 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
423 >                msg += '\tPlease check if the dataset is available at this site!)\n'
424  
425              common.logger.info(msg)
426  
427          if bloskNoSite == allBlock:
428 <            raise CrabException('No jobs created')
428 >            msg += 'Requested jobs cannot be created!\n'
429 >            if self.cfg_params.has_key('GRID.se_white_list'):
430 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
431 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
432 >                msg += '\tPlease check if the dataset is available at this site!)'
433 >            if self.cfg_params.has_key('GRID.ce_white_list'):
434 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
435 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
436 >                msg += '\tPlease check if the dataset is available at this site!)\n'
437 >            raise CrabException(msg)
438  
439          return
440  
# Line 347 | Line 443 | class JobSplitter:
443      def jobSplittingByRun(self):
444          """
445          """
350        from sets import Set
351        from WMCore.JobSplitting.RunBased import RunBased
352        from WMCore.DataStructs.Workflow import Workflow
353        from WMCore.DataStructs.File import File
354        from WMCore.DataStructs.Fileset import Fileset
355        from WMCore.DataStructs.Subscription import Subscription
356        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357        from WMCore.DataStructs.Run import Run
446  
447          self.checkUserSettings()
448          blockSites = self.args['blockSites']
# Line 393 | Line 481 | class JobSplitter:
481          jobfactory = splitter(subs)
482  
483          #loop over all runs
396        set = Set(runList)
484          list_of_lists = []
485          jobDestination = []
486 +        list_of_blocks = []
487          count = 0
488          for jobGroup in  jobfactory():
489              if count <  self.theNumberOfJobs:
# Line 407 | Line 495 | class JobSplitter:
495                  list_of_lists.append([fullString,str(-1),str(0)])
496                  #need to check single file location
497                  jobDestination.append(res['locations'])
498 +                list_of_blocks.append(res['block'])  
499                  count +=1
500 <       # prepare dict output
500 >        # prepare dict output
501          dictOut = {}
502          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
503          dictOut['args'] = list_of_lists
504          dictOut['jobDestination'] = jobDestination
505          dictOut['njobs']=count
506  
507 +        self.cacheBlocks(list_of_blocks,jobDestination)
508 +
509          return dictOut
510  
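
For reference, a sketch of the dictionary returned by jobSplittingByRun (file names, sites and counts are invented; only the keys and the per-job argument layout follow the code above):

    dictOut = {
        'params': ['InputFiles', 'MaxEvents', 'SkipEvents'],
        'args': [['/store/data/Run1/fileA.root', '-1', '0'],
                 ['/store/data/Run2/fileB.root', '-1', '0']],
        'jobDestination': [['se01.example.org'], ['se02.example.org']],
        'njobs': 2,
    }
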
511      def getJobInfo( self,jobGroup ):
# Line 428 | Line 519 | class JobSplitter:
519                  for loc in file['locations']:
520                      if tmp_check < 1 :
521                          locations.append(loc)
522 +                        res['block']= file['block']
523                  tmp_check = tmp_check + 1
432                ### the check on the locations should go here
524          res['lfns'] = lfns
525          res['locations'] = locations
526          return res
# Line 474 | Line 565 | class JobSplitter:
565  
566          managedGenerators =self.args['managedGenerators']
567          generator = self.args['generator']
568 <        firstRun = self.cfg_params.get('CMSSW.first_run',None)
568 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
569  
570          self.prepareSplittingNoInput()
571  
# Line 493 | Line 584 | class JobSplitter:
584          self.list_of_args = []
585          for i in range(self.total_number_of_jobs):
586              ## Since there is no input, any site is good
587 <            jobDestination.append([""]) #must be empty to write correctly the xml
587 >            jobDestination.append([""]) # must be empty to correctly write the XML
588              args=[]
589 <            if (firstRun):
590 <                ## pythia first run
500 <                args.append(str(firstRun)+str(i))
589 >            if (firstLumi): # Pythia first lumi
590 >                args.append(str(int(firstLumi)+i))
591              if (generator in managedGenerators):
592                 args.append(generator)
593                 if (generator == 'comphep' and i == 0):
# Line 511 | Line 601 | class JobSplitter:
601  
602          dictOut = {}
603          dictOut['params'] = ['MaxEvents']
604 <        if (firstRun):
605 <            dictOut['params'] = ['FirstRun','MaxEvents']
606 <            if ( generator in managedGenerators ) :
607 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
604 >        if (firstLumi):
605 >            dictOut['params'] = ['FirstLumi','MaxEvents']
606 >            if (generator in managedGenerators):
607 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
608          else:
609              if (generator in managedGenerators) :
610                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
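
The FirstLumi numbering introduced above can be sketched on its own (three jobs, first_lumi left at its default of 1; this is an illustration, not the surrounding CRAB code):

    first_lumi = 1          # CMSSW.first_lumi, default 1
    n_jobs = 3
    first_lumi_per_job = [str(int(first_lumi) + i) for i in range(n_jobs)]
    # -> ['1', '2', '3']; each value becomes the FirstLumi argument of one job
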
# Line 565 | Line 655 | class JobSplitter:
655  
656      def jobSplittingByLumi(self):
657          """
658 +        Split task into jobs by Lumi section paying attention to which
659 +        lumis should be run (according to the analysis dataset).
660 +        This uses WMBS job splitting which does not split files over jobs
661 +        so the job will have AT LEAST as many lumis as requested, perhaps
662 +        more
663          """
664 <        return
664 >
665 >        common.logger.debug('Splitting by Lumi')
666 >        self.checkLumiSettings()
667 >
668 >        blockSites = self.args['blockSites']
669 >        pubdata = self.args['pubdata']
670 >
671 >        lumisPerFile  = pubdata.getLumis()
672 >
673 >        # Make the list of WMBS files for job splitter
674 >        fileList = pubdata.getListFiles()
675 >        thefiles = Fileset(name='FilesToSplit')
676 >        for jobFile in fileList:
677 >            block = jobFile['Block']['Name']
678 >            try:
679 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
680 >            except:
681 >                continue
682 >            wmbsFile = File(jobFile['LogicalFileName'])
683 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
684 >            wmbsFile['block'] = block
685 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
686 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
687 >            thefiles.addFile(wmbsFile)
688 >
689 >        # Create the factory and workflow
690 >        work = Workflow()
691 >        subs = Subscription(fileset    = thefiles,    workflow = work,
692 >                            split_algo = 'LumiBased', type     = "Processing")
693 >        splitter = SplitterFactory()
694 >        jobFactory = splitter(subs)
695 >
696 >        list_of_lists = []
697 >        jobDestination = []
698 >        jobCount = 0
699 >        lumisCreated = 0
700 >        list_of_blocks = []
701 >        if not self.limitJobLumis:
702 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
703 >            common.logger.info('Each job will process about %s lumis.' %
704 >                                self.lumisPerJob)
705 >
706 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
707 >            for job in jobGroup.jobs:
708 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
709 >                    common.logger.info('Limit on number of jobs reached.')
710 >                    break
711 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
712 >                    common.logger.info('Limit on number of lumis reached.')
713 >                    break
714 >                lumis = []
715 >                lfns  = []
716 >                locations = []
717 >                blocks = []
718 >                firstFile = True
719 >                # Collect information from all the files
720 >                for jobFile in job.getFiles():
721 >                    doFile = False
722 >                    if firstFile:  # Get locations from first file in the job
723 >                        for loc in jobFile['locations']:
724 >                            locations.append(loc)
725 >                        blocks.append(jobFile['block'])
726 >                        firstFile = False
727 >                    # Accumulate Lumis from all files
728 >                    for lumiList in jobFile['runs']:
729 >                        theRun = lumiList.run
730 >                        for theLumi in list(lumiList):
731 >                            if (not self.limitTotalLumis) or \
732 >                               (lumisCreated < self.totalNLumis):
733 >                                doFile = True
734 >                                lumisCreated += 1
735 >                                lumis.append( (theRun, theLumi) )
736 >                    if doFile:
737 >                        lfns.append(jobFile['lfn'])
738 >                fileString = ','.join(lfns)
739 >                lumiLister = LumiList(lumis = lumis)
740 >                lumiString = lumiLister.getCMSSWString()
741 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
742 >                list_of_blocks.append(blocks)
743 >                jobDestination.append(locations)
744 >                jobCount += 1
745 >                common.logger.debug('Job %s will run on %s files and %s lumis '
746 >                    % (jobCount, len(lfns), len(lumis) ))
747 >
748 >        common.logger.info('%s jobs created to run on %s lumis' %
749 >                              (jobCount, lumisCreated))
750 >
751 >        # Prepare dict output matching back to non-WMBS job creation
752 >        dictOut = {}
753 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
754 >        dictOut['args'] = list_of_lists
755 >        dictOut['jobDestination'] = jobDestination
756 >        dictOut['njobs'] = jobCount
757 >
758 >        self.cacheBlocks(list_of_blocks,jobDestination)
759 >
760 >        return dictOut
761 >
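
For orientation, a hedged sketch of one entry of list_of_lists built above for a two-file job (file names, run and lumi numbers are invented, and the exact formatting produced by LumiList.getCMSSWString() may differ):

    # one entry of list_of_lists: [InputFiles, MaxEvents, SkipEvents, Lumis]
    example_args = ['/store/data/A.root,/store/data/B.root',  # comma-joined LFNs
                    '-1',                                      # MaxEvents
                    '0',                                       # SkipEvents
                    '140160:1-140160:25']                      # assumed run:lumi range string
    # jobDestination holds the locations of the first file in the job,
    # and list_of_blocks the block(s) the job's files came from.
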
762 >    def cacheBlocks(self, blocks,destinations):
763 >
764 >        saveFblocks=''
765 >        for i in range(len(blocks)):
766 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
767 >            if len(sites) != 0:
768 >                for block in blocks[i]:
769 >                    saveFblocks += str(block)+'\n'
770 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
771 >
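
cacheBlocks writes one block name per line (via writeTXTfile) for every job whose destination passes the black/white-list check; a hypothetical rendering of the resulting AnalyzedBlocks.txt in the task share directory (block names invented):

    saveFblocks = ('/PrimDS/Proc-v1/AOD#0a1b2c3d\n'
                   '/PrimDS/Proc-v1/AOD#4e5f6a7b\n')
    with open('AnalyzedBlocks.txt', 'w') as fh:   # default: <share dir>/AnalyzedBlocks.txt
        fh.write(saveFblocks)
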
772      def Algos(self):
773          """
774          Define key splittingType matrix

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines