ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.32.4.1 by spiga, Wed Apr 7 15:56:40 2010 UTC vs.
Revision 1.41 by ewv, Tue Jun 29 17:46:42 2010 UTC

# Line 13 | Line 13 | from WMCore.DataStructs.Subscription imp
13   from WMCore.DataStructs.Workflow import Workflow
14   from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 < from LumiList import LumiList
16 > try: # Can remove when CMSSW 3.7 and earlier are dropped
17 >    from FWCore.PythonUtilities.LumiList import LumiList
18 > except ImportError:
19 >    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
# Line 33 | Line 36 | class JobSplitter:
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37          self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38  
39 +        ## check if has been asked for a non default file to store/read analyzed fileBlocks
40 +        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 +        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42 +
43  
44      def checkUserSettings(self):
45          ## Events per job
# Line 62 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73  
74      def checkLumiSettings(self):
75          """
# Line 92 | Line 100 | class JobSplitter:
100              self.limitTotalLumis = True
101              self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102  
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108  
109      def ComputeSubBlockSites( self, blockSites ):
110          """
# Line 373 | Line 386 | class JobSplitter:
386          allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks =''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
# Line 383 | Line 397 | class JobSplitter:
397                  if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
403  
404          common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 469 | Line 486 | class JobSplitter:
486          #loop over all runs
487          list_of_lists = []
488          jobDestination = []
489 +        list_of_blocks = []
490          count = 0
491          for jobGroup in  jobfactory():
492              if count <  self.theNumberOfJobs:
# Line 480 | Line 498 | class JobSplitter:
498                  list_of_lists.append([fullString,str(-1),str(0)])
499                  #need to check single file location
500                  jobDestination.append(res['locations'])
501 +                list_of_blocks.append(res['block'])
502                  count +=1
503 <       # prepare dict output
503 >        # prepare dict output
504          dictOut = {}
505          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
506          dictOut['args'] = list_of_lists
507          dictOut['jobDestination'] = jobDestination
508          dictOut['njobs']=count
509  
510 +        self.cacheBlocks(list_of_blocks,jobDestination)
511 +
512          return dictOut
513  
514      def getJobInfo( self,jobGroup ):
# Line 501 | Line 522 | class JobSplitter:
522                  for loc in file['locations']:
523                      if tmp_check < 1 :
524                          locations.append(loc)
525 +                        res['block']= file['block']
526                  tmp_check = tmp_check + 1
505                ### qui va messo il check per la locations
527          res['lfns'] = lfns
528          res['locations'] = locations
529          return res
# Line 679 | Line 700 | class JobSplitter:
700          jobDestination = []
701          jobCount = 0
702          lumisCreated = 0
703 <
703 >        list_of_blocks = []
704          if not self.limitJobLumis:
705              self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
706              common.logger.info('Each job will process about %s lumis.' %
707                                  self.lumisPerJob)
708  
709 <        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
709 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
710              for job in jobGroup.jobs:
711                  if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
712                      common.logger.info('Limit on number of jobs reached.')
# Line 696 | Line 717 | class JobSplitter:
717                  lumis = []
718                  lfns  = []
719                  locations = []
720 +                blocks = []
721                  firstFile = True
722                  # Collect information from all the files
723                  for jobFile in job.getFiles():
# Line 703 | Line 725 | class JobSplitter:
725                      if firstFile:  # Get locations from first file in the job
726                          for loc in jobFile['locations']:
727                              locations.append(loc)
728 +                        blocks.append(jobFile['block'])
729                          firstFile = False
730                      # Accumulate Lumis from all files
731                      for lumiList in jobFile['runs']:
732                          theRun = lumiList.run
733                          for theLumi in list(lumiList):
711                            lumisCreated += 1
734                              if (not self.limitTotalLumis) or \
735 <                               (lumisCreated <= self.totalNLumis):
735 >                               (lumisCreated < self.totalNLumis):
736                                  doFile = True
737 +                                lumisCreated += 1
738                                  lumis.append( (theRun, theLumi) )
739                      if doFile:
740                          lfns.append(jobFile['lfn'])
# Line 719 | Line 742 | class JobSplitter:
742                  lumiLister = LumiList(lumis = lumis)
743                  lumiString = lumiLister.getCMSSWString()
744                  list_of_lists.append([fileString, str(-1), str(0), lumiString])
745 <
745 >                list_of_blocks.append(blocks)
746                  jobDestination.append(locations)
747                  jobCount += 1
748                  common.logger.debug('Job %s will run on %s files and %s lumis '
# Line 735 | Line 758 | class JobSplitter:
758          dictOut['jobDestination'] = jobDestination
759          dictOut['njobs'] = jobCount
760  
761 +        self.cacheBlocks(list_of_blocks,jobDestination)
762 +
763          return dictOut
764  
765 +    def cacheBlocks(self, blocks,destinations):
766 +
767 +        saveFblocks=''
768 +        for i in range(len(blocks)):
769 +            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
770 +            if len(sites) != 0:
771 +                for block in blocks[i]:
772 +                    saveFblocks += str(block)+'\n'
773 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
774  
775      def Algos(self):
776          """

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines