ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.35 by ewv, Mon Mar 22 15:29:42 2010 UTC vs.
Revision 1.49 by spiga, Wed Mar 2 10:48:46 2011 UTC

# Line 13 | Line 13 | from WMCore.DataStructs.Subscription imp
13   from WMCore.DataStructs.Workflow import Workflow
14   from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 < from LumiList import LumiList
16 > try: # Can remove when CMSSW 3.7 and earlier are dropped
17 >    from FWCore.PythonUtilities.LumiList import LumiList
18 > except ImportError:
19 >    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
# Line 33 | Line 36 | class JobSplitter:
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37          self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38  
39 +        ## check whether a non-default file has been requested to store/read the analyzed fileBlocks
40 +        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 +        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42 +
43  
44      def checkUserSettings(self):
45          ## Events per job
# Line 62 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73  
74      def checkLumiSettings(self):
75          """
# Line 92 | Line 100 | class JobSplitter:
100              self.limitTotalLumis = True
101              self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102  
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108  
109      def ComputeSubBlockSites( self, blockSites ):
110          """
# Line 266 | Line 279 | class JobSplitter:
279                                  fullString = parString[:-1]
280                                  if self.useParent==1:
281                                      fullParentString = pString[:-1]
282 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
282 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)],block)
283                                  else:
284 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
284 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)],block)
285                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
286                                  jobDestination.append(blockSites[block])
287                                  msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 295 | Line 308 | class JobSplitter:
308                          fullString = parString[:-1]
309                          if self.useParent==1:
310                              fullParentString = pString[:-1]
311 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
311 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
312                          else:
313 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
313 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
314                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
315                          jobDestination.append(blockSites[block])
316                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 320 | Line 333 | class JobSplitter:
333                          fullString = parString[:-1]
334                          if self.useParent==1:
335                              fullParentString = pString[:-1]
336 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
336 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
337                          else:
338 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
338 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
339                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
340                          jobDestination.append(blockSites[block])
341                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 355 | Line 368 | class JobSplitter:
368  
369         # prepare dict output
370          dictOut = {}
371 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
372 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
371 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
372 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
373          dictOut['args'] = list_of_lists
374          dictOut['jobDestination'] = jobDestination
375          dictOut['njobs']=self.total_number_of_jobs
# Line 373 | Line 386 | class JobSplitter:
386          allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks =''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
# Line 383 | Line 397 | class JobSplitter:
397                  if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
403  
404          common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 469 | Line 486 | class JobSplitter:
486          #loop over all runs
487          list_of_lists = []
488          jobDestination = []
489 +        list_of_blocks = []
490          count = 0
491          for jobGroup in  jobfactory():
492              if count <  self.theNumberOfJobs:
# Line 476 | Line 494 | class JobSplitter:
494                  parString = ''
495                  for file in res['lfns']:
496                      parString += file + ','
497 +                list_of_blocks.append(res['block'])
498                  fullString = parString[:-1]
499 <                list_of_lists.append([fullString,str(-1),str(0)])
499 >                blockString=','.join(list_of_blocks)
500 >                list_of_lists.append([fullString,str(-1),str(0)],blockString)
501                  #need to check single file location
502                  jobDestination.append(res['locations'])
503                  count +=1
504 <       # prepare dict output
504 >        # prepare dict output
505          dictOut = {}
506 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
506 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
507          dictOut['args'] = list_of_lists
508          dictOut['jobDestination'] = jobDestination
509          dictOut['njobs']=count
510 +        self.cacheBlocks(list_of_blocks,jobDestination)
511  
512          return dictOut
513  
# Line 501 | Line 522 | class JobSplitter:
522                  for loc in file['locations']:
523                      if tmp_check < 1 :
524                          locations.append(loc)
525 +                        res['block']= file['block']
526                  tmp_check = tmp_check + 1
505                ### the check on the locations should go here
527          res['lfns'] = lfns
528          res['locations'] = locations
529          return res
# Line 643 | Line 664 | class JobSplitter:
664          so the job will have AT LEAST as many lumis as requested, perhaps
665          more
666          """
667 <
667 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
668          common.logger.debug('Splitting by Lumi')
669          self.checkLumiSettings()
670  
# Line 651 | Line 672 | class JobSplitter:
672          pubdata = self.args['pubdata']
673  
674          lumisPerFile  = pubdata.getLumis()
675 <
675 >        self.parentFiles=pubdata.getParent()
676          # Make the list of WMBS files for job splitter
677          fileList = pubdata.getListFiles()
678 <        thefiles = Fileset(name='FilesToSplit')
678 >        wmFileList = []
679          for jobFile in fileList:
680              block = jobFile['Block']['Name']
681              try:
# Line 662 | Line 683 | class JobSplitter:
683              except:
684                  continue
685              wmbsFile = File(jobFile['LogicalFileName'])
686 +            if not  blockSites[block]:
687 +                wmbsFile['locations'].add('Nowhere')
688              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
689              wmbsFile['block'] = block
690              for lumi in lumisPerFile[jobFile['LogicalFileName']]:
691                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
692 <            thefiles.addFile(wmbsFile)
692 >            wmFileList.append(wmbsFile)
693 >
694 >        fileSet = set(wmFileList)
695 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
696  
697          # Create the factory and workflow
698          work = Workflow()
# Line 679 | Line 705 | class JobSplitter:
705          jobDestination = []
706          jobCount = 0
707          lumisCreated = 0
708 <
708 >        list_of_blocks = []
709          if not self.limitJobLumis:
710 <            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
710 >            if self.totalNLumis > 0:
711 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
712 >            else:
713 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
714              common.logger.info('Each job will process about %s lumis.' %
715                                  self.lumisPerJob)
716  
717 <        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
717 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
718              for job in jobGroup.jobs:
719                  if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
720 <                    common.logger.info('Limit on number of jobs reached.')
720 >                    common.logger.info('Requested number of jobs reached.')
721                      break
722                  if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
723 <                    common.logger.info('Limit on number of lumis reached.')
723 >                    common.logger.info('Requested number of lumis reached.')
724                      break
725                  lumis = []
726                  lfns  = []
727 +                if self.useParent==1:
728 +                 parentlfns  = []
729 +                 pString =""
730 +
731                  locations = []
732 +                blocks = []
733                  firstFile = True
734                  # Collect information from all the files
735                  for jobFile in job.getFiles():
# Line 703 | Line 737 | class JobSplitter:
737                      if firstFile:  # Get locations from first file in the job
738                          for loc in jobFile['locations']:
739                              locations.append(loc)
740 +                        blocks.append(jobFile['block'])
741                          firstFile = False
742                      # Accumulate Lumis from all files
743                      for lumiList in jobFile['runs']:
744                          theRun = lumiList.run
745                          for theLumi in list(lumiList):
711                            lumisCreated += 1
746                              if (not self.limitTotalLumis) or \
747 <                               (lumisCreated <= self.totalNLumis):
747 >                               (lumisCreated < self.totalNLumis):
748                                  doFile = True
749 +                                lumisCreated += 1
750                                  lumis.append( (theRun, theLumi) )
751                      if doFile:
752                          lfns.append(jobFile['lfn'])
753 +                        if self.useParent==1:
754 +                           parent = self.parentFiles[jobFile['lfn']]
755 +                           for p in parent :
756 +                               pString += p  + ','
757                  fileString = ','.join(lfns)
758                  lumiLister = LumiList(lumis = lumis)
759                  lumiString = lumiLister.getCMSSWString()
760 <                list_of_lists.append([fileString, str(-1), str(0), lumiString])
761 <
760 >                blockString=','.join(blocks)
761 >                if self.useParent==1:
762 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
763 >                  pfileString = pString[:-1]
764 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
765 >                else:
766 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
767 >                list_of_blocks.append(blocks)
768                  jobDestination.append(locations)
769                  jobCount += 1
770                  common.logger.debug('Job %s will run on %s files and %s lumis '
# Line 730 | Line 775 | class JobSplitter:
775  
776          # Prepare dict output matching back to non-WMBS job creation
777          dictOut = {}
778 <        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
778 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
779 >        if self.useParent==1:
780 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
781          dictOut['args'] = list_of_lists
782          dictOut['jobDestination'] = jobDestination
783          dictOut['njobs'] = jobCount
784 +        self.cacheBlocks(list_of_blocks,jobDestination)
785  
786          return dictOut
787  
788 +    def cacheBlocks(self, blocks,destinations):
789 +
790 +        saveFblocks=''
791 +        for i in range(len(blocks)):
792 +            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
793 +            if len(sites) != 0:
794 +                for block in blocks[i]:
795 +                    saveFblocks += str(block)+'\n'
796 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
797  
798      def Algos(self):
799          """

Diff Legend

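(no marker) Removed lines (shown with the old revision's line number only)
+ Added lines
< Changed lines (text as in the old revision, 1.35)
> Changed lines (text as in the new revision, 1.49)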