ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.29 by ewv, Thu Oct 1 22:00:40 2009 UTC vs.
Revision 1.55 by belforte, Tue Mar 13 21:49:55 2012 UTC

# Line 3 | Line 3 | __revision__ = "$Id$"
3   __version__ = "$Revision$"
4  
5   import common
6 from sets import Set
6   from crab_exceptions import *
7   from crab_util import *
8  
# Line 14 | Line 13 | from WMCore.DataStructs.Subscription imp
13   from WMCore.DataStructs.Workflow import Workflow
14   from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
# Line 33 | Line 36 | class JobSplitter:
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37          self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38  
39 +        ## check if has been asked for a non default file to store/read analyzed fileBlocks
40 +        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 +        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42 +
43  
44      def checkUserSettings(self):
45          ## Events per job
# Line 62 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73  
74      def checkLumiSettings(self):
75          """
# Line 85 | Line 93 | class JobSplitter:
93              settings += 1
94  
95          if settings != 2:
96 <            msg = 'When running on analysis datasets you must specify two and only two of:\n'
96 >            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97              msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98              raise CrabException(msg)
99          if self.limitNJobs and self.limitJobLumis:
100              self.limitTotalLumis = True
101              self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102  
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108  
109      def ComputeSubBlockSites( self, blockSites ):
110          """
# Line 182 | Line 195 | class JobSplitter:
195          if (self.selectNumberOfJobs):
196              common.logger.info("May not create the exact number_of_jobs requested.")
197  
198 +        if (self.theNumberOfJobs < 0):
199 +            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")
200 +            
201          # old... to remove Daniele
202          totalNumberOfJobs = 999999999
203  
# Line 266 | Line 282 | class JobSplitter:
282                                  fullString = parString[:-1]
283                                  if self.useParent==1:
284                                      fullParentString = pString[:-1]
285 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
285 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
286                                  else:
287 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
287 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
288                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
289                                  jobDestination.append(blockSites[block])
290                                  msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 295 | Line 311 | class JobSplitter:
311                          fullString = parString[:-1]
312                          if self.useParent==1:
313                              fullParentString = pString[:-1]
314 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
314 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
315                          else:
316 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
316 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
317                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
318                          jobDestination.append(blockSites[block])
319                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 320 | Line 336 | class JobSplitter:
336                          fullString = parString[:-1]
337                          if self.useParent==1:
338                              fullParentString = pString[:-1]
339 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
339 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
340                          else:
341 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
341 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
342                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
343                          jobDestination.append(blockSites[block])
344                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 355 | Line 371 | class JobSplitter:
371  
372         # prepare dict output
373          dictOut = {}
374 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
375 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
374 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
375 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
376          dictOut['args'] = list_of_lists
377          dictOut['jobDestination'] = jobDestination
378          dictOut['njobs']=self.total_number_of_jobs
# Line 373 | Line 389 | class JobSplitter:
389          allBlock = []
390  
391          blockCounter = 0
392 +        saveFblocks =''
393          for block in blocks:
394              if block in jobsOfBlock.keys() :
395                  blockCounter += 1
# Line 383 | Line 400 | class JobSplitter:
400                  if len(sites) == 0:
401                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
402                      bloskNoSite.append( blockCounter )
403 +                else:
404 +                    saveFblocks += str(block)+'\n'
405 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
406  
407          common.logger.info(screenOutput)
408          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 411 | Line 431 | class JobSplitter:
431              common.logger.info(msg)
432  
433          if bloskNoSite == allBlock:
434 <            raise CrabException('No jobs created')
434 >            msg = 'Requested jobs cannot be Created! \n'
435 >            if self.cfg_params.has_key('GRID.se_white_list'):
436 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
437 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
438 >                msg += '\tPlease check if the dataset is available at this site!)'
439 >            if self.cfg_params.has_key('GRID.ce_white_list'):
440 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
441 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
442 >                msg += '\tPlease check if the dataset is available at this site!)\n'
443 >            raise CrabException(msg)
444  
445          return
446  
# Line 438 | Line 467 | class JobSplitter:
467              except:
468                  continue
469              wmbsFile = File(f['LogicalFileName'])
470 +            if not  blockSites[block]:
471 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block                
472 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
473 +                common.logger.debug(msg)
474              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
475              wmbsFile['block'] = block
476              runNum = f['RunsList'][0]['RunNumber']
# Line 458 | Line 491 | class JobSplitter:
491          jobfactory = splitter(subs)
492  
493          #loop over all runs
461        set = Set(runList)
494          list_of_lists = []
495          jobDestination = []
496 +        list_of_blocks = []
497          count = 0
498          for jobGroup in  jobfactory():
499              if count <  self.theNumberOfJobs:
# Line 468 | Line 501 | class JobSplitter:
501                  parString = ''
502                  for file in res['lfns']:
503                      parString += file + ','
504 +                list_of_blocks.append(res['block'])
505                  fullString = parString[:-1]
506 <                list_of_lists.append([fullString,str(-1),str(0)])
506 >                blockString=','.join(list_of_blocks)
507 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
508                  #need to check single file location
509                  jobDestination.append(res['locations'])
510                  count +=1
511 <       # prepare dict output
511 >        # prepare dict output
512          dictOut = {}
513 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
513 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
514          dictOut['args'] = list_of_lists
515          dictOut['jobDestination'] = jobDestination
516          dictOut['njobs']=count
517 +        self.cacheBlocks(list_of_blocks,jobDestination)
518  
519          return dictOut
520  
# Line 493 | Line 529 | class JobSplitter:
529                  for loc in file['locations']:
530                      if tmp_check < 1 :
531                          locations.append(loc)
532 +                        res['block']= file['block']
533                  tmp_check = tmp_check + 1
497                ### qui va messo il check per la locations
534          res['lfns'] = lfns
535          res['locations'] = locations
536          return res
# Line 539 | Line 575 | class JobSplitter:
575  
576          managedGenerators =self.args['managedGenerators']
577          generator = self.args['generator']
578 <        firstRun = self.cfg_params.get('CMSSW.first_run', 1)
578 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
579  
580          self.prepareSplittingNoInput()
581  
# Line 560 | Line 596 | class JobSplitter:
596              ## Since there is no input, any site is good
597              jobDestination.append([""]) # must be empty to correctly write the XML
598              args=[]
599 <            if (firstRun): # Pythia first run
600 <                args.append(str(int(firstRun)+i))
599 >            if (firstLumi): # Pythia first lumi
600 >                args.append(str(int(firstLumi)+i))
601              if (generator in managedGenerators):
602                 args.append(generator)
603                 if (generator == 'comphep' and i == 0):
# Line 575 | Line 611 | class JobSplitter:
611  
612          dictOut = {}
613          dictOut['params'] = ['MaxEvents']
614 <        if (firstRun):
615 <            dictOut['params'] = ['FirstRun','MaxEvents']
616 <            if ( generator in managedGenerators ) :
617 <                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
614 >        if (firstLumi):
615 >            dictOut['params'] = ['FirstLumi','MaxEvents']
616 >            if (generator in managedGenerators):
617 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
618          else:
619              if (generator in managedGenerators) :
620                  dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
# Line 635 | Line 671 | class JobSplitter:
671          so the job will have AT LEAST as many lumis as requested, perhaps
672          more
673          """
674 <
674 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
675          common.logger.debug('Splitting by Lumi')
676          self.checkLumiSettings()
677  
# Line 643 | Line 679 | class JobSplitter:
679          pubdata = self.args['pubdata']
680  
681          lumisPerFile  = pubdata.getLumis()
682 <
682 >        self.parentFiles=pubdata.getParent()
683          # Make the list of WMBS files for job splitter
684          fileList = pubdata.getListFiles()
685 <        thefiles = Fileset(name='FilesToSplit')
685 >        wmFileList = []
686          for jobFile in fileList:
687              block = jobFile['Block']['Name']
688              try:
# Line 654 | Line 690 | class JobSplitter:
690              except:
691                  continue
692              wmbsFile = File(jobFile['LogicalFileName'])
693 +            if not  blockSites[block]:
694 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
695 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
696 +                common.logger.debug(msg)
697 +               # wmbsFile['locations'].add('Nowhere')
698              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
699              wmbsFile['block'] = block
700              for lumi in lumisPerFile[jobFile['LogicalFileName']]:
701                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
702 <            thefiles.addFile(wmbsFile)
702 >            wmFileList.append(wmbsFile)
703 >
704 >        fileSet = set(wmFileList)
705 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
706  
707          # Create the factory and workflow
708          work = Workflow()
# Line 671 | Line 715 | class JobSplitter:
715          jobDestination = []
716          jobCount = 0
717          lumisCreated = 0
718 <
718 >        list_of_blocks = []
719          if not self.limitJobLumis:
720 <            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
720 >            if self.totalNLumis > 0:
721 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
722 >            else:
723 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
724              common.logger.info('Each job will process about %s lumis.' %
725                                  self.lumisPerJob)
726  
727 <        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
727 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
728              for job in jobGroup.jobs:
729                  if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
730 <                    common.logger.info('Limit on number of jobs reached.')
730 >                    common.logger.info('Requested number of jobs reached.')
731                      break
732                  if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
733 <                    common.logger.info('Limit on number of lumis reached.')
733 >                    common.logger.info('Requested number of lumis reached.')
734                      break
735                  lumis = []
736                  lfns  = []
737 +                if self.useParent==1:
738 +                 parentlfns  = []
739 +                 pString =""
740 +
741                  locations = []
742 +                blocks = []
743                  firstFile = True
744                  # Collect information from all the files
745                  for jobFile in job.getFiles():
746 +                    doFile = False
747                      if firstFile:  # Get locations from first file in the job
748                          for loc in jobFile['locations']:
749                              locations.append(loc)
750 +                        blocks.append(jobFile['block'])
751                          firstFile = False
752                      # Accumulate Lumis from all files
753                      for lumiList in jobFile['runs']:
754                          theRun = lumiList.run
755                          for theLumi in list(lumiList):
756 <                            lumis.append( (theRun, theLumi) )
757 <
758 <                    lfns.append(jobFile['lfn'])
756 >                            if (not self.limitTotalLumis) or \
757 >                               (lumisCreated < self.totalNLumis):
758 >                                doFile = True
759 >                                lumisCreated += 1
760 >                                lumis.append( (theRun, theLumi) )
761 >                    if doFile:
762 >                        lfns.append(jobFile['lfn'])
763 >                        if self.useParent==1:
764 >                           parent = self.parentFiles[jobFile['lfn']]
765 >                           for p in parent :
766 >                               pString += p  + ','
767                  fileString = ','.join(lfns)
768 <                lumiString = compressLumiString(lumis)
769 <                list_of_lists.append([fileString, str(-1), str(0), lumiString])
770 <
768 >                lumiLister = LumiList(lumis = lumis)
769 >                lumiString = lumiLister.getCMSSWString()
770 >                blockString=','.join(blocks)
771 >                if self.useParent==1:
772 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
773 >                  pfileString = pString[:-1]
774 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
775 >                else:
776 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
777 >                list_of_blocks.append(blocks)
778                  jobDestination.append(locations)
779                  jobCount += 1
711                lumisCreated += len(lumis)
780                  common.logger.debug('Job %s will run on %s files and %s lumis '
781                      % (jobCount, len(lfns), len(lumis) ))
782  
# Line 717 | Line 785 | class JobSplitter:
785  
786          # Prepare dict output matching back to non-WMBS job creation
787          dictOut = {}
788 <        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
788 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
789 >        if self.useParent==1:
790 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
791          dictOut['args'] = list_of_lists
792          dictOut['jobDestination'] = jobDestination
793          dictOut['njobs'] = jobCount
794 +        self.cacheBlocks(list_of_blocks,jobDestination)
795  
796          return dictOut
797  
798 +    def cacheBlocks(self, blocks,destinations):
799 +
800 +        saveFblocks=''
801 +        for i in range(len(blocks)):
802 +            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
803 +            if len(sites) != 0:
804 +                for block in blocks[i]:
805 +                    saveFblocks += str(block)+'\n'
806 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
807  
808      def Algos(self):
809          """
# Line 738 | Line 818 | class JobSplitter:
818                       }
819          return SplitAlogs
820  
741
742
743 def compressLumiString(lumis):
744    """
745    Turn a list of 2-tuples of run/lumi numbers into a list of the format
746    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
747    """
748
749    lumis.sort()
750    parts = []
751    startRange = None
752    endRange = None
753
754    for lumiBlock in lumis:
755        if not startRange: # This is the first one
756            startRange = lumiBlock
757            endRange = lumiBlock
758        elif lumiBlock == endRange: # Same Lumi (different files?)
759            pass
760        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
761            endRange = lumiBlock
762        else: # This is the start of a new range
763            part = ':'.join(map(str, startRange))
764            if startRange != endRange:
765                part += '-' + ':'.join(map(str, endRange))
766            parts.append(part)
767            startRange = lumiBlock
768            endRange = lumiBlock
769
770    # Put out what's left
771    if startRange:
772        part = ':'.join(map(str, startRange))
773        if startRange != endRange:
774            part += '-' + ':'.join(map(str, endRange))
775        parts.append(part)
776
777    output = ','.join(parts)
778    return output
779
780

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines