root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.13 by spiga, Tue May 26 10:23:01 2009 UTC vs.
Revision 1.35 by ewv, Mon Mar 22 15:29:42 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
33 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 43 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check that the user has specified enough information to
69 +        perform job splitting by lumi sections
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
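A minimal standalone sketch of the two-of-three rule enforced above; the helper name and the literal parameter values are invented for illustration (the real method also records which limits apply):

    def check_lumi_settings(cfg_params):
        # Count how many of the three lumi-splitting parameters are set
        settings = 0
        for key in ('CMSSW.lumis_per_job',
                    'CMSSW.number_of_jobs',
                    'CMSSW.total_number_of_lumis'):
            if key in cfg_params:
                settings += 1
        if settings != 2:
            raise ValueError('specify two and only two of: '
                             'number_of_jobs, lumis_per_job, '
                             'total_number_of_lumis')

    # OK: 100 jobs x 50 lumis each => an implied total of 5000 lumis
    check_lumi_settings({'CMSSW.lumis_per_job': '50',
                         'CMSSW.number_of_jobs': '100'})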
96 +    def ComputeSubBlockSites( self, blockSites ):
97 +        """Return the subset of blockSites whose storage elements
98 +        pass the SE white list (used when no_block_boundary=1)."""
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
104 +            msg = 'WARNING: the site(s) %s are not hosting any part of the data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
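In isolation, the filtering above keeps only the dictionary entries whose site list survives the white list. A self-contained sketch with invented block and SE names; check_white_list stands in for blackWhiteListParser.checkWhiteList:

    block_sites = {'/ds#blockA': ['se1.example.org', 'se2.example.org'],
                   '/ds#blockB': ['se3.example.org']}
    white_list = ['se1.example.org']

    def check_white_list(sites):
        # Stand-in for SEBlackWhiteListParser.checkWhiteList()
        return [s for s in sites if s in white_list]

    sub_block_sites = {}
    for block, sites in block_sites.items():
        if check_white_list(sites):
            sub_block_sites[block] = sites

    print sub_block_sites     # only blockA survives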
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 57 | Line 119 | class JobSplitter:
119                self.list_of_args - File(s) job will run on (a list of lists)
120          """
121  
122 <        jobDestination=[]  
122 >        jobDestination=[]
123          self.checkUserSettings()
124          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
125              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
126              raise CrabException(msg)
127 <
128 <        blockSites = self.args['blockSites']
127 >
128 >        blockSites = self.args['blockSites']
129          pubdata = self.args['pubdata']
130          filesbyblock=pubdata.getFiles()
131  
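For instance, under the "exactly two" check above, supplying total_number_of_events and events_per_job fixes the number of jobs (invented numbers):

    total_number_of_events = 100000
    events_per_job = 5000
    number_of_jobs = total_number_of_events // events_per_job
    print number_of_jobs      # -> 20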
# Line 77 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
144 +                msg = 'You are selecting no_block_boundary=1, which is not compatible with total_number_of_events=-1\n'
145 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
148 +            msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 124 | Line 197 | class JobSplitter:
197          jobsOfBlock = {}
198  
199          parString = ""
200 +        pString = ""
201          filesEventCount = 0
202 +        msg=''
203  
204          # ---- Iterate over the blocks in the dataset until ---- #
205          # ---- we've met the requested total # of events    ---- #
# Line 146 | Line 221 | class JobSplitter:
221                  if noBboundary == 0: # DD
222                      # ---- New block => New job ---- #
223                      parString = ""
224 +                    pString=""
225                      # counter for number of events in files currently worked on
226                      filesEventCount = 0
227                  # flag if next while loop should touch new file
# Line 155 | Line 231 | class JobSplitter:
231  
232                  # ---- Iterate over the files in the block until we've met the requested ---- #
233                  # ---- total # of events or we've gone over all the files in this block  ---- #
234 <                pString=''
234 >                msg='\n'
235                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
236                      file = files[fileCount]
237                      if self.useParent==1:
238                          parent = self.parentFiles[file]
163                        for f in parent :
164                            pString +=  f + ','
239                          common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
240                      if newFile :
241                          try:
# Line 171 | Line 245 | class JobSplitter:
245                              filesEventCount += numEventsInFile
246                              # Add file to current job
247                              parString +=  file + ','
248 +                            if self.useParent==1:
249 +                                for f in parent :
250 +                                    pString += f  + ','
251                              newFile = 0
252                          except KeyError:
253                              common.logger.info("File "+str(file)+" has an unknown number of events: skipping")
# Line 192 | Line 269 | class JobSplitter:
269                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
270                                  else:
271                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
272 <                                common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
272 >                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
273                                  jobDestination.append(blockSites[block])
274 <                                common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
274 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
275                                  # fill jobs of block dictionary
276                                  jobsOfBlock[block].append(jobCount+1)
277                                  # reset counter
# Line 221 | Line 298 | class JobSplitter:
298                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
299                          else:
300                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
301 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
301 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
302                          jobDestination.append(blockSites[block])
303 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
303 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
304                          jobsOfBlock[block].append(jobCount+1)
305                          # reset counter
306                          jobCount = jobCount + 1
# Line 246 | Line 323 | class JobSplitter:
323                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
324                          else:
325                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
326 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
326 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
327                          jobDestination.append(blockSites[block])
328 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
328 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
329                          jobsOfBlock[block].append(jobCount+1)
330                          # increase counter
331                          jobCount = jobCount + 1
# Line 259 | Line 336 | class JobSplitter:
336                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
337                          # remove all but the last file
338                          filesEventCount = self.eventsbyfile[file]
339 +                        pString_tmp=''
340                          if self.useParent==1:
341 <                            for f in parent : pString +=  f + ','
341 >                            for f in parent : pString_tmp +=  f + ','
342 >                        pString =  pString_tmp
343                          parString =  file + ','
344                      pass # END if
345                  pass # END while (iterate over files in the block)
346          pass # END while (iterate over blocks in the dataset)
347 +        common.logger.debug(msg)
348          self.ncjobs = self.total_number_of_jobs = jobCount
349          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
350              common.logger.info("Could not run on all requested events because some blocks are not hosted at any allowed site.")
351          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 <
352 >
353          # skip check on  block with no sites  DD
354          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
355  
# Line 285 | Line 365 | class JobSplitter:
365  
366          # keep trace of block with no sites to print a warning at the end
367  
368 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
368 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
369          # screen output
370          screenOutput = "List of jobs and available destination sites:\n\n"
371          noSiteBlock = []
# Line 297 | Line 377 | class JobSplitter:
377              if block in jobsOfBlock.keys() :
378                  blockCounter += 1
379                  allBlock.append( blockCounter )
380 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
381                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
382 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
383 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
382 >                    ', '.join(SE2CMS(sites)))
383 >                if len(sites) == 0:
384                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
385                      bloskNoSite.append( blockCounter )
386  
# Line 311 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
402 <            if self.cfg_params.has_key('EDG.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
406 <            if self.cfg_params.has_key('EDG.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
401 >            msg += '\n\t\twill not be submitted and this block of data cannot be analyzed!\n'
402 >            if self.cfg_params.has_key('GRID.se_white_list'):
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406 >            if self.cfg_params.has_key('GRID.ce_white_list'):
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410  
411              common.logger.info(msg)
412  
413          if bloskNoSite == allBlock:
414 <            raise CrabException('No jobs created')
414 >            msg += 'Requested jobs cannot be created!\n'
415 >            if self.cfg_params.has_key('GRID.se_white_list'):
416 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
417 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
418 >                msg += '\tPlease check if the dataset is available at this site!)'
419 >            if self.cfg_params.has_key('GRID.ce_white_list'):
420 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)\n'
423 >            raise CrabException(msg)
424  
425          return
426  
427  
428   ########################################################################
429 <    def jobSplittingByRun(self):
429 >    def jobSplittingByRun(self):
430          """Perform job splitting based on run number, using the
431          WMCore job splitting machinery."""
342        from sets import Set  
343        from WMCore.JobSplitting.RunBased import RunBased
344        from WMCore.DataStructs.Workflow import Workflow
345        from WMCore.DataStructs.File import File
346        from WMCore.DataStructs.Fileset import Fileset
347        from WMCore.DataStructs.Subscription import Subscription
348        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
349        from WMCore.DataStructs.Run import Run
432  
433          self.checkUserSettings()
434 <        blockSites = self.args['blockSites']
434 >        blockSites = self.args['blockSites']
435          pubdata = self.args['pubdata']
436  
437          if self.selectNumberOfJobs == 0 :
438              self.theNumberOfJobs = 9999999
439          blocks = {}
440 <        runList = []
440 >        runList = []
441          thefiles = Fileset(name='FilesToSplit')
442          fileList = pubdata.getListFiles()
443          for f in fileList:
444              block = f['Block']['Name']
445 <            try:
445 >            try:
446                  f['Block']['StorageElementList'].extend(blockSites[block])
447              except:
448                  continue
# Line 368 | Line 450 | class JobSplitter:
450              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
451              wmbsFile['block'] = block
452              runNum = f['RunsList'][0]['RunNumber']
453 <            runList.append(runNum)
453 >            runList.append(runNum)
454              myRun = Run(runNumber=runNum)
455              wmbsFile.addRun( myRun )
456              thefiles.addFile(
457                  wmbsFile
458                  )
459 <
459 >
460          work = Workflow()
461          subs = Subscription(
462          fileset = thefiles,
# Line 383 | Line 465 | class JobSplitter:
465          type = "Processing")
466          splitter = SplitterFactory()
467          jobfactory = splitter(subs)
468 <        
469 <        #loop over all runs
388 <        set = Set(runList)
468 >
469 >        #loop over all runs
470          list_of_lists = []
471          jobDestination = []
391
472          count = 0
473 <        for i in list(set):
473 >        for jobGroup in  jobfactory():
474              if count <  self.theNumberOfJobs:
475 <                res = self.getJobInfo(jobfactory())
476 <                parString = ''
475 >                res = self.getJobInfo(jobGroup)
476 >                parString = ''
477                  for file in res['lfns']:
478                      parString += file + ','
479                  fullString = parString[:-1]
480 <                list_of_lists.append([fullString,str(-1),str(0)])    
480 >                list_of_lists.append([fullString,str(-1),str(0)])
481                  #need to check single file location
482 <                jobDestination.append(res['locations'])  
482 >                jobDestination.append(res['locations'])
483                  count +=1
484         # prepare dict output
485          dictOut = {}
# Line 412 | Line 492 | class JobSplitter:
492  
493      def getJobInfo( self,jobGroup ):
494          res = {}
495 <        lfns = []        
496 <        locations = []        
495 >        lfns = []
496 >        locations = []
497          tmp_check=0
498          for job in jobGroup.jobs:
499              for file in job.getFiles():
500 <                lfns.append(file['lfn'])
500 >                lfns.append(file['lfn'])
501                  for loc in file['locations']:
502                      if tmp_check < 1 :
503                          locations.append(loc)
504 <                tmp_check = tmp_check + 1
505 <                ### TODO: the check on the locations should go here
506 <        res['lfns'] = lfns
507 <        res['locations'] = locations
508 <        return res                
509 <      
504 >                tmp_check = tmp_check + 1
505 >                ### TODO: the check on the locations should go here
506 >        res['lfns'] = lfns
507 >        res['locations'] = locations
508 >        return res
509 >
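Both jobSplittingByRun and getJobInfo lean on the WMCore DataStructs pattern: wrap each input file in a WMBS File, gather the files into a Fileset, attach a Subscription with a split algorithm, and iterate over the job groups the factory produces. A condensed sketch using only calls visible in this revision; the LFN, SE name, run number, and the 'RunBased' algorithm string (suggested by the old import) are assumptions:

    from WMCore.DataStructs.File import File
    from WMCore.DataStructs.Fileset import Fileset
    from WMCore.DataStructs.Run import Run
    from WMCore.DataStructs.Subscription import Subscription
    from WMCore.DataStructs.Workflow import Workflow
    from WMCore.JobSplitting.SplitterFactory import SplitterFactory

    thefiles = Fileset(name='FilesToSplit')
    wmbsFile = File('/store/data/Run123456/example.root')  # invented LFN
    wmbsFile['locations'].add('se1.example.org')           # invented SE
    wmbsFile.addRun(Run(runNumber=123456))                 # invented run
    thefiles.addFile(wmbsFile)

    subs = Subscription(fileset=thefiles, workflow=Workflow(),
                        split_algo='RunBased', type='Processing')
    jobfactory = SplitterFactory()(subs)

    for jobGroup in jobfactory():
        for job in jobGroup.jobs:
            print [f['lfn'] for f in job.getFiles()]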
510   ########################################################################
511 <    def jobSplittingNoInput(self):
511 >    def prepareSplittingNoInput(self):
512          """
433        Perform job splitting based on number of event per job
513          """
435        common.logger.debug('Splitting per events')
436        self.checkUserSettings()
437        jobDestination=[]
438        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
439            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
440            raise CrabException(msg)
441
442        managedGenerators =self.args['managedGenerators']
443        generator = self.args['generator']
444        firstRun = self.cfg_params.get('CMSSW.first_run',None)
445
514          if (self.selectEventsPerJob):
515              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
516          if (self.selectNumberOfJobs):
# Line 465 | Line 533 | class JobSplitter:
533              self.total_number_of_jobs = self.theNumberOfJobs
534              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
535  
536 +
537 +    def jobSplittingNoInput(self):
538 +        """
539 +        Perform job splitting based on the number of events per job
540 +        """
541 +        common.logger.debug('Splitting per events')
542 +        self.checkUserSettings()
543 +        jobDestination=[]
544 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
545 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
546 +            raise CrabException(msg)
547 +
548 +        managedGenerators =self.args['managedGenerators']
549 +        generator = self.args['generator']
550 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
551 +
552 +        self.prepareSplittingNoInput()
553 +
554          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
555  
556          # is there any remainder?
# Line 480 | Line 566 | class JobSplitter:
566          self.list_of_args = []
567          for i in range(self.total_number_of_jobs):
568              ## Since there is no input, any site is good
569 <            jobDestination.append([""]) #must be empty to write correctly the xml
569 >            jobDestination.append([""]) # must be empty to correctly write the XML
570              args=[]
571 <            if (firstRun):
572 <                ## pythia first run
487 <                args.append(str(firstRun)+str(i))
571 >            if (firstLumi): # Pythia first lumi
572 >                args.append(str(int(firstLumi)+i))
573              if (generator in managedGenerators):
574 <                if (generator == 'comphep' and i == 0):
574 >               args.append(generator)
575 >               if (generator == 'comphep' and i == 0):
576                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
577                      args.append('1')
578 <                else:
578 >               else:
579                      args.append(str(i*self.eventsPerJob))
580              args.append(str(self.eventsPerJob))
581              self.list_of_args.append(args)
# Line 497 | Line 583 | class JobSplitter:
583  
584          dictOut = {}
585          dictOut['params'] = ['MaxEvents']
586 <        if (firstRun):
587 <            dictOut['params'] = ['FirstRun','MaxEvents']
588 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
589 <        else:  
590 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
586 >        if (firstLumi):
587 >            dictOut['params'] = ['FirstLumi','MaxEvents']
588 >            if (generator in managedGenerators):
589 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
590 >        else:
591 >            if (generator in managedGenerators) :
592 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
593          dictOut['args'] = self.list_of_args
594          dictOut['jobDestination'] = jobDestination
595          dictOut['njobs']=self.total_number_of_jobs
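To make the positional convention above concrete: each per-job args list lines up with dictOut['params']. A small invented example for a managed generator ('comphep') with first_lumi=1 and 1000 events per job:

    firstLumi, eventsPerJob, generator = 1, 1000, 'comphep'
    list_of_args = []
    for i in range(3):
        args = [str(firstLumi + i), generator]        # FirstLumi, Generator
        if generator == 'comphep' and i == 0:
            args.append('1')                          # FirstEvent quirk
        else:
            args.append(str(i * eventsPerJob))        # FirstEvent
        args.append(str(eventsPerJob))                # MaxEvents
        list_of_args.append(args)

    print list_of_args
    # [['1', 'comphep', '1', '1000'],
    #  ['2', 'comphep', '1000', '1000'],
    #  ['3', 'comphep', '2000', '1000']]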
# Line 521 | Line 609 | class JobSplitter:
609          common.logger.debug('Splitting per job')
610          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
611  
612 <        self.total_number_of_jobs = self.theNumberOfJobs
612 > #        self.total_number_of_jobs = self.theNumberOfJobs
613 >
614 >        self.prepareSplittingNoInput()
615  
616          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
617  
618          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
619  
620          # argument is seed number.$i
621 <        #self.list_of_args = []
621 >        self.list_of_args = []
622          for i in range(self.total_number_of_jobs):
623 +            args=[]
624              jobDestination.append([""])
625 <        #   self.list_of_args.append([str(i)])
625 >            if self.eventsPerJob != 0 :
626 >                args.append(str(self.eventsPerJob))
627 >                self.list_of_args.append(args)
628  
629         # prepare dict output
630          dictOut = {}
631 <        dictOut['args'] = [] # self.list_of_args
631 >        dictOut['params'] = ['MaxEvents']
632 >        dictOut['args'] =  self.list_of_args
633          dictOut['jobDestination'] = jobDestination
634          dictOut['njobs']=self.total_number_of_jobs
635          return dictOut
542
636  
637 <    def jobSplittingByLumi(self):
637 >
638 >    def jobSplittingByLumi(self):
639          """
640 +        Split task into jobs by Lumi section paying attention to which
641 +        lumis should be run (according to the analysis dataset).
642 +        This uses WMBS job splitting which does not split files over jobs
643 +        so the job will have AT LEAST as many lumis as requested, perhaps
644 +        more.
645          """
646 <        return
646 >
647 >        common.logger.debug('Splitting by Lumi')
648 >        self.checkLumiSettings()
649 >
650 >        blockSites = self.args['blockSites']
651 >        pubdata = self.args['pubdata']
652 >
653 >        lumisPerFile  = pubdata.getLumis()
654 >
655 >        # Make the list of WMBS files for job splitter
656 >        fileList = pubdata.getListFiles()
657 >        thefiles = Fileset(name='FilesToSplit')
658 >        for jobFile in fileList:
659 >            block = jobFile['Block']['Name']
660 >            try:
661 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
662 >            except:
663 >                continue
664 >            wmbsFile = File(jobFile['LogicalFileName'])
665 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
666 >            wmbsFile['block'] = block
667 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
668 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
669 >            thefiles.addFile(wmbsFile)
670 >
671 >        # Create the factory and workflow
672 >        work = Workflow()
673 >        subs = Subscription(fileset    = thefiles,    workflow = work,
674 >                            split_algo = 'LumiBased', type     = "Processing")
675 >        splitter = SplitterFactory()
676 >        jobFactory = splitter(subs)
677 >
678 >        list_of_lists = []
679 >        jobDestination = []
680 >        jobCount = 0
681 >        lumisCreated = 0
682 >
683 >        if not self.limitJobLumis:
684 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
685 >            common.logger.info('Each job will process about %s lumis.' %
686 >                                self.lumisPerJob)
687 >
688 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
689 >            for job in jobGroup.jobs:
690 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
691 >                    common.logger.info('Limit on number of jobs reached.')
692 >                    break
693 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
694 >                    common.logger.info('Limit on number of lumis reached.')
695 >                    break
696 >                lumis = []
697 >                lfns  = []
698 >                locations = []
699 >                firstFile = True
700 >                # Collect information from all the files
701 >                for jobFile in job.getFiles():
702 >                    doFile = False
703 >                    if firstFile:  # Get locations from first file in the job
704 >                        for loc in jobFile['locations']:
705 >                            locations.append(loc)
706 >                        firstFile = False
707 >                    # Accumulate Lumis from all files
708 >                    for lumiList in jobFile['runs']:
709 >                        theRun = lumiList.run
710 >                        for theLumi in list(lumiList):
711 >                            lumisCreated += 1
712 >                            if (not self.limitTotalLumis) or \
713 >                               (lumisCreated <= self.totalNLumis):
714 >                                doFile = True
715 >                                lumis.append( (theRun, theLumi) )
716 >                    if doFile:
717 >                        lfns.append(jobFile['lfn'])
718 >                fileString = ','.join(lfns)
719 >                lumiLister = LumiList(lumis = lumis)
720 >                lumiString = lumiLister.getCMSSWString()
721 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
722 >
723 >                jobDestination.append(locations)
724 >                jobCount += 1
725 >                common.logger.debug('Job %s will run on %s files and %s lumis '
726 >                    % (jobCount, len(lfns), len(lumis) ))
727 >
728 >        common.logger.info('%s jobs created to run on %s lumis' %
729 >                              (jobCount, lumisCreated))
730 >
731 >        # Prepare dict output matching back to non-WMBS job creation
732 >        dictOut = {}
733 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
734 >        dictOut['args'] = list_of_lists
735 >        dictOut['jobDestination'] = jobDestination
736 >        dictOut['njobs'] = jobCount
737 >
738 >        return dictOut
739 >
740 >
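The Lumis argument attached to each job above is the compact CMSSW-style run/lumi string built by LumiList. A minimal sketch of that conversion, using the same two calls as the method above; the run and lumi numbers are invented:

    from LumiList import LumiList

    lumis = [(180250, 12), (180250, 13), (180250, 14), (180252, 8)]
    lumiString = LumiList(lumis=lumis).getCMSSWString()
    print lumiString
    # expected form: '180250:12-180250:14,180252:8' -- consecutive lumis
    # within a run are collapsed into ranges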
741      def Algos(self):
742          """
743          Map each splittingType key to its job splitting method
744          """
745 <        SplitAlogs = {
746 <                     'EventBased'           : self.jobSplittingByEvent,
745 >        SplitAlogs = {
746 >                     'EventBased'           : self.jobSplittingByEvent,
747                       'RunBased'             : self.jobSplittingByRun,
748 <                     'LumiBased'            : self.jobSplittingByLumi,
749 <                     'NoInput'              : self.jobSplittingNoInput,
748 >                     'LumiBased'            : self.jobSplittingByLumi,
749 >                     'NoInput'              : self.jobSplittingNoInput,
750                       'ForScript'            : self.jobSplittingForScript
751 <                     }  
751 >                     }
752          return SplitAlogs
753  
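For reference, Algos() is consumed as a dispatch table: the caller looks up the configured splitting mode and invokes the matching method. A hedged sketch; the splittingType value and its origin are assumptions, not shown in this diff:

    splitter = JobSplitter(cfg_params, args)      # as constructed by CRAB
    splittingType = 'EventBased'                  # assumed task setting
    dictOut = splitter.Algos()[splittingType]()
    print dictOut['njobs'], 'job(s) prepared'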

Diff Legend

  Removed lines (old revision only; shown with the old line number)
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)