
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.19 by slacapra, Wed Jun 10 11:31:33 2009 UTC vs.
Revision 1.42 by ewv, Tue Jul 6 14:44:27 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33          seBlackList = cfg_params.get('GRID.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35 >
36 >        ## check whether a non-default file to store/read analyzed fileBlocks has been requested
37 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
38 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
39  
40  
41      def checkUserSettings(self):
# Line 42 | Line 66 | class JobSplitter:
66              self.total_number_of_events = 0
67              self.selectTotalNumberEvents = 0
68  
69 +        return
70 +
71 +    def checkLumiSettings(self):
72 +        """
73 +        Check to make sure the user has specified enough information to
74 +        perform splitting by Lumis to run the job
75 +        """
76 +        settings = 0
77 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
78 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
79 +            self.limitJobLumis = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
83 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
84 +            self.limitNJobs = True
85 +            settings += 1
86 +
87 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
88 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
89 +            self.limitTotalLumis = (self.totalNLumis != -1)
90 +            settings += 1
91 +
92 +        if settings != 2:
93 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
94 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
95 +            raise CrabException(msg)
96 +        if self.limitNJobs and self.limitJobLumis:
97 +            self.limitTotalLumis = True
98 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
99 +
100 +        # Has the user specified runselection?
101 +        if (self.cfg_params.has_key('CMSSW.runselection')):
102 +            common.logger.info('You have specified runselection and split by lumi.')
103 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
104 +        return
105 +
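
A minimal sketch of the two-and-only-two rule that checkLumiSettings enforces above (the helper name and the example values are illustrative, not part of the revision; CrabException is the exception class the module already imports):

    def checkTwoOfThree(cfg_params):
        # Exactly two of the three lumi-splitting parameters must be given.
        keys = ('CMSSW.lumis_per_job', 'CMSSW.number_of_jobs',
                'CMSSW.total_number_of_lumis')
        given = [k for k in keys if cfg_params.has_key(k)]
        if len(given) != 2:
            raise CrabException('When splitting by lumi section you must specify '
                                'two and only two of:\n  ' + ', '.join(keys))
        return given

    # e.g. checkTwoOfThree({'CMSSW.lumis_per_job': '50', 'CMSSW.number_of_jobs': '10'})
    # passes, while giving all three (or only one) raises CrabException.
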
106 +    def ComputeSubBlockSites( self, blockSites ):
107 +        """
108 +        """
109 +        sub_blockSites = {}
110 +        for k,v in blockSites.iteritems():
111 +            sites=self.blackWhiteListParser.checkWhiteList(v)
112 +            if sites : sub_blockSites[k]=v
113 +        if len(sub_blockSites) < 1:
114 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
115 +            raise CrabException(msg)
116 +        return sub_blockSites
117  
118   ########################################################################
119      def jobSplittingByEvent( self ):
# Line 57 | Line 129 | class JobSplitter:
129                self.list_of_args - File(s) job will run on (a list of lists)
130          """
131  
132 <        jobDestination=[]  
132 >        jobDestination=[]
133          self.checkUserSettings()
134          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
135              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
136              raise CrabException(msg)
137 <
138 <        blockSites = self.args['blockSites']
137 >
138 >        blockSites = self.args['blockSites']
139          pubdata = self.args['pubdata']
140          filesbyblock=pubdata.getFiles()
141  
# Line 77 | Line 149 | class JobSplitter:
149          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
150          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
151  
152 +        if noBboundary == 1:
153 +            if self.total_number_of_events== -1:
154 +                msg = 'You are selecting no_block_boundary=1, which does not allow setting total_number_of_events=-1\n'
155 +                msg +='\tYou should get the number of events from the DBS web interface and use it in your configuration.'
156 +                raise CrabException(msg)
157 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
158 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
159 +                msg += "\tPlease set se_white_list with the site's storage element name."
160 +                raise  CrabException(msg)
161 +            blockSites = self.ComputeSubBlockSites(blockSites)
162 +
163          # ---- Handle the possible job splitting configurations ---- #
164          if (self.selectTotalNumberEvents):
165              totalEventsRequested = self.total_number_of_events
# Line 126 | Line 209 | class JobSplitter:
209          parString = ""
210          pString = ""
211          filesEventCount = 0
212 +        msg=''
213  
214          # ---- Iterate over the blocks in the dataset until ---- #
215          # ---- we've met the requested total # of events    ---- #
# Line 197 | Line 281 | class JobSplitter:
281                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
282                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
283                                  jobDestination.append(blockSites[block])
284 <                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
201 <                                from ProdCommon.SiteDB.CmsSiteMapper import CmsSEMap
202 <                                cms_se = CmsSEMap()
203 <                                SEDestination = [cms_se[dest] for dest in jobDestination[jobCount]]
204 <                                msg+="\t  CMSDestination: %s "%(str(SEDestination))
284 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
285                                  # fill jobs of block dictionary
286                                  jobsOfBlock[block].append(jobCount+1)
287                                  # reset counter
# Line 230 | Line 310 | class JobSplitter:
310                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
311                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
312                          jobDestination.append(blockSites[block])
313 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
313 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
314                          jobsOfBlock[block].append(jobCount+1)
315                          # reset counter
316                          jobCount = jobCount + 1
# Line 255 | Line 335 | class JobSplitter:
335                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
336                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
337                          jobDestination.append(blockSites[block])
338 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
338 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
339                          jobsOfBlock[block].append(jobCount+1)
340                          # increase counter
341                          jobCount = jobCount + 1
# Line 269 | Line 349 | class JobSplitter:
349                          pString_tmp=''
350                          if self.useParent==1:
351                              for f in parent : pString_tmp +=  f + ','
352 <                        pString =  pString_tmp
352 >                        pString =  pString_tmp
353                          parString =  file + ','
354                      pass # END if
355                  pass # END while (iterate over files in the block)
# Line 279 | Line 359 | class JobSplitter:
359          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
360              common.logger.info("Could not run on all requested events because some blocks are not hosted at allowed sites.")
361          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
362 <
362 >
363          # skip check on  block with no sites  DD
364          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
365  
# Line 295 | Line 375 | class JobSplitter:
375  
376          # keep trace of block with no sites to print a warning at the end
377  
378 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
378 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
379          # screen output
380          screenOutput = "List of jobs and available destination sites:\n\n"
381          noSiteBlock = []
# Line 303 | Line 383 | class JobSplitter:
383          allBlock = []
384  
385          blockCounter = 0
386 +        saveFblocks =''
387          for block in blocks:
388              if block in jobsOfBlock.keys() :
389                  blockCounter += 1
390                  allBlock.append( blockCounter )
391                  sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
392                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
393 <                    ', '.join(destinationCMS(sites)))
393 >                    ', '.join(SE2CMS(sites)))
394                  if len(sites) == 0:
395                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
396                      bloskNoSite.append( blockCounter )
397 +                else:
398 +                    saveFblocks += str(block)+'\n'
399 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
400  
401          common.logger.info(screenOutput)
402          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 322 | Line 406 | class JobSplitter:
406                  virgola = ","
407              for block in bloskNoSite:
408                  msg += ' ' + str(block) + virgola
409 <            msg += '\n               Related jobs:\n                 '
409 >            msg += '\n\t\tRelated jobs:\n                 '
410              virgola = ""
411              if len(noSiteBlock) > 1:
412                  virgola = ","
413              for range_jobs in noSiteBlock:
414                  msg += str(range_jobs) + virgola
415 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
415 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
416              if self.cfg_params.has_key('GRID.se_white_list'):
417 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
418 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
419 <                msg += 'Please check if the dataset is available at this site!)\n'
417 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
418 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
419 >                msg += '\tPlease check if the dataset is available at this site!)'
420              if self.cfg_params.has_key('GRID.ce_white_list'):
421 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
422 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
423 <                msg += 'Please check if the dataset is available at this site!)\n'
421 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
422 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
423 >                msg += '\tPlease check if the dataset is available at this site!)\n'
424  
425              common.logger.info(msg)
426  
427          if bloskNoSite == allBlock:
428 <            raise CrabException('No jobs created')
428 >            msg += 'Requested jobs cannot be created!\n'
429 >            if self.cfg_params.has_key('GRID.se_white_list'):
430 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
431 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
432 >                msg += '\tPlease check if the dataset is available at this site!)'
433 >            if self.cfg_params.has_key('GRID.ce_white_list'):
434 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
435 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
436 >                msg += '\tPlease check if the dataset is available at this site!)\n'
437 >            raise CrabException(msg)
438  
439          return
440  
441  
442   ########################################################################
443 <    def jobSplittingByRun(self):
443 >    def jobSplittingByRun(self):
444          """
445          """
353        from sets import Set  
354        from WMCore.JobSplitting.RunBased import RunBased
355        from WMCore.DataStructs.Workflow import Workflow
356        from WMCore.DataStructs.File import File
357        from WMCore.DataStructs.Fileset import Fileset
358        from WMCore.DataStructs.Subscription import Subscription
359        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
360        from WMCore.DataStructs.Run import Run
446  
447          self.checkUserSettings()
448 <        blockSites = self.args['blockSites']
448 >        blockSites = self.args['blockSites']
449          pubdata = self.args['pubdata']
450  
451          if self.selectNumberOfJobs == 0 :
452              self.theNumberOfJobs = 9999999
453          blocks = {}
454 <        runList = []
454 >        runList = []
455          thefiles = Fileset(name='FilesToSplit')
456          fileList = pubdata.getListFiles()
457          for f in fileList:
458              block = f['Block']['Name']
459 <            try:
459 >            try:
460                  f['Block']['StorageElementList'].extend(blockSites[block])
461              except:
462                  continue
# Line 379 | Line 464 | class JobSplitter:
464              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
465              wmbsFile['block'] = block
466              runNum = f['RunsList'][0]['RunNumber']
467 <            runList.append(runNum)
467 >            runList.append(runNum)
468              myRun = Run(runNumber=runNum)
469              wmbsFile.addRun( myRun )
470              thefiles.addFile(
471                  wmbsFile
472                  )
473 <
473 >
474          work = Workflow()
475          subs = Subscription(
476          fileset = thefiles,
# Line 394 | Line 479 | class JobSplitter:
479          type = "Processing")
480          splitter = SplitterFactory()
481          jobfactory = splitter(subs)
482 <        
483 <        #loop over all runs
399 <        set = Set(runList)
482 >
483 >        #loop over all runs
484          list_of_lists = []
485          jobDestination = []
486 +        list_of_blocks = []
487          count = 0
488          for jobGroup in  jobfactory():
489              if count <  self.theNumberOfJobs:
490                  res = self.getJobInfo(jobGroup)
491 <                parString = ''
491 >                parString = ''
492                  for file in res['lfns']:
493                      parString += file + ','
494                  fullString = parString[:-1]
495 <                list_of_lists.append([fullString,str(-1),str(0)])    
495 >                list_of_lists.append([fullString,str(-1),str(0)])
496                  #need to check single file location
497 <                jobDestination.append(res['locations'])  
497 >                jobDestination.append(res['locations'])
498 >                list_of_blocks.append(res['block'])
499                  count +=1
500 <       # prepare dict output
500 >        # prepare dict output
501          dictOut = {}
502          dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
503          dictOut['args'] = list_of_lists
504          dictOut['jobDestination'] = jobDestination
505          dictOut['njobs']=count
506  
507 +        self.cacheBlocks(list_of_blocks,jobDestination)
508 +
509          return dictOut
510  
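
For reference, the WMBS splitting machinery used by jobSplittingByRun above follows this general pattern (a condensed sketch: the LFN, storage element, and run number are invented, and the split_algo string is assumed from the RunBased import dropped in this revision, since the revision's own Subscription arguments sit in an elided hunk):

    from WMCore.DataStructs.File import File
    from WMCore.DataStructs.Fileset import Fileset
    from WMCore.DataStructs.Run import Run
    from WMCore.DataStructs.Subscription import Subscription
    from WMCore.DataStructs.Workflow import Workflow
    from WMCore.JobSplitting.SplitterFactory import SplitterFactory

    # Build a fileset of WMBS File objects carrying location and run information
    thefiles = Fileset(name='FilesToSplit')
    wmbsFile = File('/store/data/Run123456/example.root')    # invented LFN
    wmbsFile['locations'].add('se.example.org')               # invented SE
    wmbsFile.addRun(Run(runNumber=123456))
    thefiles.addFile(wmbsFile)

    # Subscribe the fileset to a workflow with the chosen splitting algorithm,
    # then let the factory produce job groups
    subs = Subscription(fileset=thefiles, workflow=Workflow(),
                        split_algo='RunBased', type='Processing')
    jobfactory = SplitterFactory()(subs)
    for jobGroup in jobfactory():
        for job in jobGroup.jobs:
            lfns = [f['lfn'] for f in job.getFiles()]
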
511      def getJobInfo( self,jobGroup ):
512          res = {}
513 <        lfns = []        
514 <        locations = []        
513 >        lfns = []
514 >        locations = []
515          tmp_check=0
516          for job in jobGroup.jobs:
517              for file in job.getFiles():
518 <                lfns.append(file['lfn'])
518 >                lfns.append(file['lfn'])
519                  for loc in file['locations']:
520                      if tmp_check < 1 :
521                          locations.append(loc)
522 <                tmp_check = tmp_check + 1
523 <                ### the check on the locations should go here
524 <        res['lfns'] = lfns
525 <        res['locations'] = locations
526 <        return res                
527 <      
522 >                        res['block']= file['block']
523 >                tmp_check = tmp_check + 1
524 >        res['lfns'] = lfns
525 >        res['locations'] = locations
526 >        return res
527 >
528   ########################################################################
529 <    def jobSplittingNoInput(self):
529 >    def prepareSplittingNoInput(self):
530          """
443        Perform job splitting based on the number of events per job
531          """
445        common.logger.debug('Splitting per events')
446        self.checkUserSettings()
447        jobDestination=[]
448        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
449            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
450            raise CrabException(msg)
451
452        managedGenerators =self.args['managedGenerators']
453        generator = self.args['generator']
454        firstRun = self.cfg_params.get('CMSSW.first_run',None)
455
532          if (self.selectEventsPerJob):
533              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
534          if (self.selectNumberOfJobs):
# Line 475 | Line 551 | class JobSplitter:
551              self.total_number_of_jobs = self.theNumberOfJobs
552              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
553  
554 +
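
A worked example of the arithmetic just above (numbers are illustrative only):

    # Requesting total_number_of_events = 1000 with number_of_jobs = 8 gives
    eventsPerJob = int(1000 / 8)    # -> 125 events per job, 8 x 125 = 1000 events
    # when the division is not exact, the leftover events are covered by the
    # "is there any remainder?" check further down in jobSplittingNoInput
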
555 +    def jobSplittingNoInput(self):
556 +        """
557 +        Perform job splitting based on the number of events per job
558 +        """
559 +        common.logger.debug('Splitting per events')
560 +        self.checkUserSettings()
561 +        jobDestination=[]
562 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
563 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
564 +            raise CrabException(msg)
565 +
566 +        managedGenerators =self.args['managedGenerators']
567 +        generator = self.args['generator']
568 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
569 +
570 +        self.prepareSplittingNoInput()
571 +
572          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
573  
574          # is there any remainder?
# Line 490 | Line 584 | class JobSplitter:
584          self.list_of_args = []
585          for i in range(self.total_number_of_jobs):
586              ## Since there is no input, any site is good
587 <            jobDestination.append([""]) #must be empty to write correctly the xml
587 >            jobDestination.append([""]) # must be empty to correctly write the XML
588              args=[]
589 <            if (firstRun):
590 <                ## pythia first run
497 <                args.append(str(firstRun)+str(i))
589 >            if (firstLumi): # Pythia first lumi
590 >                args.append(str(int(firstLumi)+i))
591              if (generator in managedGenerators):
592 <                if (generator == 'comphep' and i == 0):
592 >               args.append(generator)
593 >               if (generator == 'comphep' and i == 0):
594                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
595                      args.append('1')
596 <                else:
596 >               else:
597                      args.append(str(i*self.eventsPerJob))
598              args.append(str(self.eventsPerJob))
599              self.list_of_args.append(args)
# Line 507 | Line 601 | class JobSplitter:
601  
602          dictOut = {}
603          dictOut['params'] = ['MaxEvents']
604 <        if (firstRun):
605 <            dictOut['params'] = ['FirstRun','MaxEvents']
606 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
607 <        else:  
608 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
604 >        if (firstLumi):
605 >            dictOut['params'] = ['FirstLumi','MaxEvents']
606 >            if (generator in managedGenerators):
607 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
608 >        else:
609 >            if (generator in managedGenerators) :
610 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
611          dictOut['args'] = self.list_of_args
612          dictOut['jobDestination'] = jobDestination
613          dictOut['njobs']=self.total_number_of_jobs
# Line 531 | Line 627 | class JobSplitter:
627          common.logger.debug('Splitting per job')
628          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
629  
630 <        self.total_number_of_jobs = self.theNumberOfJobs
630 > #        self.total_number_of_jobs = self.theNumberOfJobs
631 >
632 >        self.prepareSplittingNoInput()
633  
634          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
635  
636          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
637  
638          # argument is seed number.$i
639 <        #self.list_of_args = []
639 >        self.list_of_args = []
640          for i in range(self.total_number_of_jobs):
641 +            args=[]
642              jobDestination.append([""])
643 <        #   self.list_of_args.append([str(i)])
643 >            if self.eventsPerJob != 0 :
644 >                args.append(str(self.eventsPerJob))
645 >                self.list_of_args.append(args)
646  
647         # prepare dict output
648          dictOut = {}
649 <        dictOut['args'] = [] # self.list_of_args
649 >        dictOut['params'] = ['MaxEvents']
650 >        dictOut['args'] =  self.list_of_args
651          dictOut['jobDestination'] = jobDestination
652          dictOut['njobs']=self.total_number_of_jobs
653          return dictOut
552
654  
655 <    def jobSplittingByLumi(self):
655 >
656 >    def jobSplittingByLumi(self):
657          """
658 +        Split the task into jobs by lumi section, paying attention to which
659 +        lumis should be run (according to the analysis dataset).
660 +        This uses WMBS job splitting, which does not split files across jobs,
661 +        so each job will have AT LEAST as many lumis as requested, perhaps
662 +        more.
663          """
664 <        return
664 >
665 >        common.logger.debug('Splitting by Lumi')
666 >        self.checkLumiSettings()
667 >
668 >        blockSites = self.args['blockSites']
669 >        pubdata = self.args['pubdata']
670 >
671 >        lumisPerFile  = pubdata.getLumis()
672 >
673 >        # Make the list of WMBS files for job splitter
674 >        fileList = pubdata.getListFiles()
675 >        thefiles = Fileset(name='FilesToSplit')
676 >        for jobFile in fileList:
677 >            block = jobFile['Block']['Name']
678 >            try:
679 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
680 >            except:
681 >                continue
682 >            wmbsFile = File(jobFile['LogicalFileName'])
683 >            if not  blockSites[block]:
684 >                wmbsFile['locations'].add('Nowhere')
685 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
686 >            wmbsFile['block'] = block
687 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
688 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
689 >            thefiles.addFile(wmbsFile)
690 >
691 >        # Create the factory and workflow
692 >        work = Workflow()
693 >        subs = Subscription(fileset    = thefiles,    workflow = work,
694 >                            split_algo = 'LumiBased', type     = "Processing")
695 >        splitter = SplitterFactory()
696 >        jobFactory = splitter(subs)
697 >
698 >        list_of_lists = []
699 >        jobDestination = []
700 >        jobCount = 0
701 >        lumisCreated = 0
702 >        list_of_blocks = []
703 >        if not self.limitJobLumis:
704 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
705 >            common.logger.info('Each job will process about %s lumis.' %
706 >                                self.lumisPerJob)
707 >
708 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
709 >            for job in jobGroup.jobs:
710 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
711 >                    common.logger.info('Limit on number of jobs reached.')
712 >                    break
713 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
714 >                    common.logger.info('Limit on number of lumis reached.')
715 >                    break
716 >                lumis = []
717 >                lfns  = []
718 >                locations = []
719 >                blocks = []
720 >                firstFile = True
721 >                # Collect information from all the files
722 >                for jobFile in job.getFiles():
723 >                    doFile = False
724 >                    if firstFile:  # Get locations from first file in the job
725 >                        for loc in jobFile['locations']:
726 >                            locations.append(loc)
727 >                        blocks.append(jobFile['block'])
728 >                        firstFile = False
729 >                    # Accumulate Lumis from all files
730 >                    for lumiList in jobFile['runs']:
731 >                        theRun = lumiList.run
732 >                        for theLumi in list(lumiList):
733 >                            if (not self.limitTotalLumis) or \
734 >                               (lumisCreated < self.totalNLumis):
735 >                                doFile = True
736 >                                lumisCreated += 1
737 >                                lumis.append( (theRun, theLumi) )
738 >                    if doFile:
739 >                        lfns.append(jobFile['lfn'])
740 >                fileString = ','.join(lfns)
741 >                lumiLister = LumiList(lumis = lumis)
742 >                lumiString = lumiLister.getCMSSWString()
743 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
744 >                list_of_blocks.append(blocks)
745 >                jobDestination.append(locations)
746 >                jobCount += 1
747 >                common.logger.debug('Job %s will run on %s files and %s lumis '
748 >                    % (jobCount, len(lfns), len(lumis) ))
749 >
750 >        common.logger.info('%s jobs created to run on %s lumis' %
751 >                              (jobCount, lumisCreated))
752 >
753 >        # Prepare dict output matching back to non-WMBS job creation
754 >        dictOut = {}
755 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
756 >        dictOut['args'] = list_of_lists
757 >        dictOut['jobDestination'] = jobDestination
758 >        dictOut['njobs'] = jobCount
759 >
760 >        self.cacheBlocks(list_of_blocks,jobDestination)
761 >
762 >        return dictOut
763 >
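
The (run, lumi) pairs accumulated per job in jobSplittingByLumi above are serialized with LumiList; a small illustration (run and lumi numbers invented, output format approximate):

    from LumiList import LumiList

    lumis = [(146644, 1), (146644, 2), (146644, 3), (146645, 10)]   # invented
    lumiString = LumiList(lumis=lumis).getCMSSWString()
    # lumiString is a compact range string, roughly '146644:1-146644:3,146645:10',
    # and is handed to each job as the 'Lumis' argument listed in dictOut['params']
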
764 >    def cacheBlocks(self, blocks,destinations):
765 >
766 >        saveFblocks=''
767 >        for i in range(len(blocks)):
768 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
769 >            if len(sites) != 0:
770 >                for block in blocks[i]:
771 >                    saveFblocks += str(block)+'\n'
772 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
773 >
774      def Algos(self):
775          """
776          Define key splittingType matrix
777          """
778 <        SplitAlogs = {
779 <                     'EventBased'           : self.jobSplittingByEvent,
778 >        SplitAlogs = {
779 >                     'EventBased'           : self.jobSplittingByEvent,
780                       'RunBased'             : self.jobSplittingByRun,
781 <                     'LumiBased'            : self.jobSplittingByLumi,
782 <                     'NoInput'              : self.jobSplittingNoInput,
781 >                     'LumiBased'            : self.jobSplittingByLumi,
782 >                     'NoInput'              : self.jobSplittingNoInput,
783                       'ForScript'            : self.jobSplittingForScript
784 <                     }  
784 >                     }
785          return SplitAlogs
786  
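
The Algos() table maps a splitting-type name to the bound method that implements it; a usage sketch follows (the call site shown here is hypothetical, not the actual CRAB caller):

    # Hypothetical caller: look the splitting function up by name and run it
    splitter = JobSplitter(cfg_params, args)
    doSplitting = splitter.Algos()['LumiBased']     # or 'EventBased', 'NoInput', ...
    dictOut = doSplitting()
    # dictOut carries 'params', 'args', 'jobDestination', and 'njobs' as built above
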

Diff Legend

  Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)