ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.19 by slacapra, Wed Jun 10 11:31:33 2009 UTC vs.
Revision 1.54 by spiga, Tue Mar 6 19:22:59 2012 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24          self.args=args
25 +
26 +        self.lumisPerJob = -1
27 +        self.totalNLumis = 0
28 +        self.theNumberOfJobs = 0
29 +        self.limitNJobs = False
30 +        self.limitTotalLumis = False
31 +        self.limitJobLumis = False
32 +
33          #self.maxEvents
34          # init BlackWhiteListParser
35 <        seWhiteList = cfg_params.get('GRID.se_white_list',[])
35 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36          seBlackList = cfg_params.get('GRID.se_black_list',[])
37 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
37 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38 >
39 >        ## check if has been asked for a non default file to store/read analyzed fileBlocks
40 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42  
43  
44      def checkUserSettings(self):
# Line 42 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73 +
74 +    def checkLumiSettings(self):
75 +        """
76 +        Check to make sure the user has specified enough information to
77 +        perform splitting by Lumis to run the job
78 +        """
79 +        settings = 0
80 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
81 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
82 +            self.limitJobLumis = True
83 +            settings += 1
84 +
85 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
86 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
87 +            self.limitNJobs = True
88 +            settings += 1
89 +
90 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
91 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
92 +            self.limitTotalLumis = (self.totalNLumis != -1)
93 +            settings += 1
94 +
95 +        if settings != 2:
96 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98 +            raise CrabException(msg)
99 +        if self.limitNJobs and self.limitJobLumis:
100 +            self.limitTotalLumis = True
101 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102 +
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108 +
109 +    def ComputeSubBlockSites( self, blockSites ):
110 +        """
111 +        """
112 +        sub_blockSites = {}
113 +        for k,v in blockSites.iteritems():
114 +            sites=self.blackWhiteListParser.checkWhiteList(v)
115 +            if sites : sub_blockSites[k]=v
116 +        if len(sub_blockSites) < 1:
117 +            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
118 +            raise CrabException(msg)
119 +        return sub_blockSites
120  
121   ########################################################################
122      def jobSplittingByEvent( self ):
# Line 57 | Line 132 | class JobSplitter:
132                self.list_of_args - File(s) job will run on (a list of lists)
133          """
134  
135 <        jobDestination=[]  
135 >        jobDestination=[]
136          self.checkUserSettings()
137          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
138              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
139              raise CrabException(msg)
140 <
141 <        blockSites = self.args['blockSites']
140 >
141 >        blockSites = self.args['blockSites']
142          pubdata = self.args['pubdata']
143          filesbyblock=pubdata.getFiles()
144  
# Line 77 | Line 152 | class JobSplitter:
152          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
153          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
154  
155 +        if noBboundary == 1:
156 +            if self.total_number_of_events== -1:
157 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
158 +                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
159 +                raise CrabException(msg)
160 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
161 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
162 +                msg += "\tPlease set se_white_list with the site's storage element name."
163 +                raise  CrabException(msg)
164 +            blockSites = self.ComputeSubBlockSites(blockSites)
165 +
166          # ---- Handle the possible job splitting configurations ---- #
167          if (self.selectTotalNumberEvents):
168              totalEventsRequested = self.total_number_of_events
# Line 126 | Line 212 | class JobSplitter:
212          parString = ""
213          pString = ""
214          filesEventCount = 0
215 +        msg=''
216  
217          # ---- Iterate over the blocks in the dataset until ---- #
218          # ---- we've met the requested total # of events    ---- #
# Line 192 | Line 279 | class JobSplitter:
279                                  fullString = parString[:-1]
280                                  if self.useParent==1:
281                                      fullParentString = pString[:-1]
282 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
282 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
283                                  else:
284 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
284 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
285                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
286                                  jobDestination.append(blockSites[block])
287 <                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
201 <                                from ProdCommon.SiteDB.CmsSiteMapper import CmsSEMap
202 <                                cms_se = CmsSEMap()
203 <                                SEDestination = [cms_se[dest] for dest in jobDestination[jobCount]]
204 <                                msg+="\t  CMSDestination: %s "%(str(SEDestination))
287 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
288                                  # fill jobs of block dictionary
289                                  jobsOfBlock[block].append(jobCount+1)
290                                  # reset counter
# Line 225 | Line 308 | class JobSplitter:
308                          fullString = parString[:-1]
309                          if self.useParent==1:
310                              fullParentString = pString[:-1]
311 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
311 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
312                          else:
313 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
313 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
314                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
315                          jobDestination.append(blockSites[block])
316 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
316 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
317                          jobsOfBlock[block].append(jobCount+1)
318                          # reset counter
319                          jobCount = jobCount + 1
# Line 250 | Line 333 | class JobSplitter:
333                          fullString = parString[:-1]
334                          if self.useParent==1:
335                              fullParentString = pString[:-1]
336 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
336 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
337                          else:
338 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
338 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
339                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
340                          jobDestination.append(blockSites[block])
341 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(destinationCMS(jobDestination[jobCount])))
341 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
342                          jobsOfBlock[block].append(jobCount+1)
343                          # increase counter
344                          jobCount = jobCount + 1
# Line 269 | Line 352 | class JobSplitter:
352                          pString_tmp=''
353                          if self.useParent==1:
354                              for f in parent : pString_tmp +=  f + ','
355 <                        pString =  pString_tmp
355 >                        pString =  pString_tmp
356                          parString =  file + ','
357                      pass # END if
358                  pass # END while (iterate over files in the block)
# Line 279 | Line 362 | class JobSplitter:
362          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
363              common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
364          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
365 <
365 >
366          # skip check on  block with no sites  DD
367          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
368  
369         # prepare dict output
370          dictOut = {}
371 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
372 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
371 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
372 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
373          dictOut['args'] = list_of_lists
374          dictOut['jobDestination'] = jobDestination
375          dictOut['njobs']=self.total_number_of_jobs
# Line 295 | Line 378 | class JobSplitter:
378  
379          # keep trace of block with no sites to print a warning at the end
380  
381 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
381 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
382          # screen output
383          screenOutput = "List of jobs and available destination sites:\n\n"
384          noSiteBlock = []
# Line 303 | Line 386 | class JobSplitter:
386          allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks =''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
393                  allBlock.append( blockCounter )
394                  sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
395                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
396 <                    ', '.join(destinationCMS(sites)))
396 >                    ', '.join(SE2CMS(sites)))
397                  if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
403  
404          common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
# Line 322 | Line 409 | class JobSplitter:
409                  virgola = ","
410              for block in bloskNoSite:
411                  msg += ' ' + str(block) + virgola
412 <            msg += '\n               Related jobs:\n                 '
412 >            msg += '\n\t\tRelated jobs:\n                 '
413              virgola = ""
414              if len(noSiteBlock) > 1:
415                  virgola = ","
416              for range_jobs in noSiteBlock:
417                  msg += str(range_jobs) + virgola
418 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
418 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
419              if self.cfg_params.has_key('GRID.se_white_list'):
420 <                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
421 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 <                msg += 'Please check if the dataset is available at this site!)\n'
420 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)'
423              if self.cfg_params.has_key('GRID.ce_white_list'):
424 <                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
425 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
426 <                msg += 'Please check if the dataset is available at this site!)\n'
424 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
425 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
426 >                msg += '\tPlease check if the dataset is available at this site!)\n'
427  
428              common.logger.info(msg)
429  
430          if bloskNoSite == allBlock:
431 <            raise CrabException('No jobs created')
431 >            msg = 'Requested jobs cannot be Created! \n'
432 >            if self.cfg_params.has_key('GRID.se_white_list'):
433 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
434 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
435 >                msg += '\tPlease check if the dataset is available at this site!)'
436 >            if self.cfg_params.has_key('GRID.ce_white_list'):
437 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
438 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
439 >                msg += '\tPlease check if the dataset is available at this site!)\n'
440 >            raise CrabException(msg)
441  
442          return
443  
444  
445   ########################################################################
446 <    def jobSplittingByRun(self):
446 >    def jobSplittingByRun(self):
447          """
448          """
353        from sets import Set  
354        from WMCore.JobSplitting.RunBased import RunBased
355        from WMCore.DataStructs.Workflow import Workflow
356        from WMCore.DataStructs.File import File
357        from WMCore.DataStructs.Fileset import Fileset
358        from WMCore.DataStructs.Subscription import Subscription
359        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
360        from WMCore.DataStructs.Run import Run
449  
450          self.checkUserSettings()
451 <        blockSites = self.args['blockSites']
451 >        blockSites = self.args['blockSites']
452          pubdata = self.args['pubdata']
453  
454          if self.selectNumberOfJobs == 0 :
455              self.theNumberOfJobs = 9999999
456          blocks = {}
457 <        runList = []
457 >        runList = []
458          thefiles = Fileset(name='FilesToSplit')
459          fileList = pubdata.getListFiles()
460          for f in fileList:
461              block = f['Block']['Name']
462 <            try:
462 >            try:
463                  f['Block']['StorageElementList'].extend(blockSites[block])
464              except:
465                  continue
466              wmbsFile = File(f['LogicalFileName'])
467 +            if not  blockSites[block]:
468 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block                
469 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
470 +                common.logger.debug(msg)
471              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
472              wmbsFile['block'] = block
473              runNum = f['RunsList'][0]['RunNumber']
474 <            runList.append(runNum)
474 >            runList.append(runNum)
475              myRun = Run(runNumber=runNum)
476              wmbsFile.addRun( myRun )
477              thefiles.addFile(
478                  wmbsFile
479                  )
480 <
480 >
481          work = Workflow()
482          subs = Subscription(
483          fileset = thefiles,
# Line 394 | Line 486 | class JobSplitter:
486          type = "Processing")
487          splitter = SplitterFactory()
488          jobfactory = splitter(subs)
489 <        
490 <        #loop over all runs
399 <        set = Set(runList)
489 >
490 >        #loop over all runs
491          list_of_lists = []
492          jobDestination = []
493 +        list_of_blocks = []
494          count = 0
495          for jobGroup in  jobfactory():
496              if count <  self.theNumberOfJobs:
497                  res = self.getJobInfo(jobGroup)
498 <                parString = ''
498 >                parString = ''
499                  for file in res['lfns']:
500                      parString += file + ','
501 +                list_of_blocks.append(res['block'])
502                  fullString = parString[:-1]
503 <                list_of_lists.append([fullString,str(-1),str(0)])    
503 >                blockString=','.join(list_of_blocks)
504 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
505                  #need to check single file location
506 <                jobDestination.append(res['locations'])  
506 >                jobDestination.append(res['locations'])
507                  count +=1
508 <       # prepare dict output
508 >        # prepare dict output
509          dictOut = {}
510 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
510 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
511          dictOut['args'] = list_of_lists
512          dictOut['jobDestination'] = jobDestination
513          dictOut['njobs']=count
514 +        self.cacheBlocks(list_of_blocks,jobDestination)
515  
516          return dictOut
517  
518      def getJobInfo( self,jobGroup ):
519          res = {}
520 <        lfns = []        
521 <        locations = []        
520 >        lfns = []
521 >        locations = []
522          tmp_check=0
523          for job in jobGroup.jobs:
524              for file in job.getFiles():
525 <                lfns.append(file['lfn'])
525 >                lfns.append(file['lfn'])
526                  for loc in file['locations']:
527                      if tmp_check < 1 :
528                          locations.append(loc)
529 <                tmp_check = tmp_check + 1
530 <                ### qui va messo il check per la locations
531 <        res['lfns'] = lfns
532 <        res['locations'] = locations
533 <        return res                
534 <      
529 >                        res['block']= file['block']
530 >                tmp_check = tmp_check + 1
531 >        res['lfns'] = lfns
532 >        res['locations'] = locations
533 >        return res
534 >
535   ########################################################################
536 <    def jobSplittingNoInput(self):
536 >    def prepareSplittingNoInput(self):
537          """
443        Perform job splitting based on number of event per job
538          """
445        common.logger.debug('Splitting per events')
446        self.checkUserSettings()
447        jobDestination=[]
448        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
449            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
450            raise CrabException(msg)
451
452        managedGenerators =self.args['managedGenerators']
453        generator = self.args['generator']
454        firstRun = self.cfg_params.get('CMSSW.first_run',None)
455
539          if (self.selectEventsPerJob):
540              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
541          if (self.selectNumberOfJobs):
# Line 475 | Line 558 | class JobSplitter:
558              self.total_number_of_jobs = self.theNumberOfJobs
559              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
560  
561 +
562 +    def jobSplittingNoInput(self):
563 +        """
564 +        Perform job splitting based on number of event per job
565 +        """
566 +        common.logger.debug('Splitting per events')
567 +        self.checkUserSettings()
568 +        jobDestination=[]
569 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
570 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
571 +            raise CrabException(msg)
572 +
573 +        managedGenerators =self.args['managedGenerators']
574 +        generator = self.args['generator']
575 +        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
576 +
577 +        self.prepareSplittingNoInput()
578 +
579          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
580  
581          # is there any remainder?
# Line 490 | Line 591 | class JobSplitter:
591          self.list_of_args = []
592          for i in range(self.total_number_of_jobs):
593              ## Since there is no input, any site is good
594 <            jobDestination.append([""]) #must be empty to write correctly the xml
594 >            jobDestination.append([""]) # must be empty to correctly write the XML
595              args=[]
596 <            if (firstRun):
597 <                ## pythia first run
497 <                args.append(str(firstRun)+str(i))
596 >            if (firstLumi): # Pythia first lumi
597 >                args.append(str(int(firstLumi)+i))
598              if (generator in managedGenerators):
599 <                if (generator == 'comphep' and i == 0):
599 >               args.append(generator)
600 >               if (generator == 'comphep' and i == 0):
601                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
602                      args.append('1')
603 <                else:
603 >               else:
604                      args.append(str(i*self.eventsPerJob))
605              args.append(str(self.eventsPerJob))
606              self.list_of_args.append(args)
# Line 507 | Line 608 | class JobSplitter:
608  
609          dictOut = {}
610          dictOut['params'] = ['MaxEvents']
611 <        if (firstRun):
612 <            dictOut['params'] = ['FirstRun','MaxEvents']
613 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
614 <        else:  
615 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
611 >        if (firstLumi):
612 >            dictOut['params'] = ['FirstLumi','MaxEvents']
613 >            if (generator in managedGenerators):
614 >                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
615 >        else:
616 >            if (generator in managedGenerators) :
617 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
618          dictOut['args'] = self.list_of_args
619          dictOut['jobDestination'] = jobDestination
620          dictOut['njobs']=self.total_number_of_jobs
# Line 531 | Line 634 | class JobSplitter:
634          common.logger.debug('Splitting per job')
635          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
636  
637 <        self.total_number_of_jobs = self.theNumberOfJobs
637 > #        self.total_number_of_jobs = self.theNumberOfJobs
638 >
639 >        self.prepareSplittingNoInput()
640  
641          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
642  
643          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
644  
645          # argument is seed number.$i
646 <        #self.list_of_args = []
646 >        self.list_of_args = []
647          for i in range(self.total_number_of_jobs):
648 +            args=[]
649              jobDestination.append([""])
650 <        #   self.list_of_args.append([str(i)])
650 >            if self.eventsPerJob != 0 :
651 >                args.append(str(self.eventsPerJob))
652 >                self.list_of_args.append(args)
653  
654         # prepare dict output
655          dictOut = {}
656 <        dictOut['args'] = [] # self.list_of_args
656 >        dictOut['params'] = ['MaxEvents']
657 >        dictOut['args'] =  self.list_of_args
658          dictOut['jobDestination'] = jobDestination
659          dictOut['njobs']=self.total_number_of_jobs
660          return dictOut
552
661  
662 <    def jobSplittingByLumi(self):
662 >
663 >    def jobSplittingByLumi(self):
664          """
665 +        Split task into jobs by Lumi section paying attention to which
666 +        lumis should be run (according to the analysis dataset).
667 +        This uses WMBS job splitting which does not split files over jobs
668 +        so the job will have AT LEAST as many lumis as requested, perhaps
669 +        more
670          """
671 <        return
671 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
672 >        common.logger.debug('Splitting by Lumi')
673 >        self.checkLumiSettings()
674 >
675 >        blockSites = self.args['blockSites']
676 >        pubdata = self.args['pubdata']
677 >
678 >        lumisPerFile  = pubdata.getLumis()
679 >        self.parentFiles=pubdata.getParent()
680 >        # Make the list of WMBS files for job splitter
681 >        fileList = pubdata.getListFiles()
682 >        wmFileList = []
683 >        for jobFile in fileList:
684 >            block = jobFile['Block']['Name']
685 >            try:
686 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
687 >            except:
688 >                continue
689 >            wmbsFile = File(jobFile['LogicalFileName'])
690 >            if not  blockSites[block]:
691 >                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
692 >                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
693 >                common.logger.debug(msg)
694 >               # wmbsFile['locations'].add('Nowhere')
695 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
696 >            wmbsFile['block'] = block
697 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
698 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
699 >            wmFileList.append(wmbsFile)
700 >
701 >        fileSet = set(wmFileList)
702 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
703 >
704 >        # Create the factory and workflow
705 >        work = Workflow()
706 >        subs = Subscription(fileset    = thefiles,    workflow = work,
707 >                            split_algo = 'LumiBased', type     = "Processing")
708 >        splitter = SplitterFactory()
709 >        jobFactory = splitter(subs)
710 >
711 >        list_of_lists = []
712 >        jobDestination = []
713 >        jobCount = 0
714 >        lumisCreated = 0
715 >        list_of_blocks = []
716 >        if not self.limitJobLumis:
717 >            if self.totalNLumis > 0:
718 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
719 >            else:
720 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
721 >            common.logger.info('Each job will process about %s lumis.' %
722 >                                self.lumisPerJob)
723 >
724 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
725 >            for job in jobGroup.jobs:
726 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
727 >                    common.logger.info('Requested number of jobs reached.')
728 >                    break
729 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
730 >                    common.logger.info('Requested number of lumis reached.')
731 >                    break
732 >                lumis = []
733 >                lfns  = []
734 >                if self.useParent==1:
735 >                 parentlfns  = []
736 >                 pString =""
737 >
738 >                locations = []
739 >                blocks = []
740 >                firstFile = True
741 >                # Collect information from all the files
742 >                for jobFile in job.getFiles():
743 >                    doFile = False
744 >                    if firstFile:  # Get locations from first file in the job
745 >                        for loc in jobFile['locations']:
746 >                            locations.append(loc)
747 >                        blocks.append(jobFile['block'])
748 >                        firstFile = False
749 >                    # Accumulate Lumis from all files
750 >                    for lumiList in jobFile['runs']:
751 >                        theRun = lumiList.run
752 >                        for theLumi in list(lumiList):
753 >                            if (not self.limitTotalLumis) or \
754 >                               (lumisCreated < self.totalNLumis):
755 >                                doFile = True
756 >                                lumisCreated += 1
757 >                                lumis.append( (theRun, theLumi) )
758 >                    if doFile:
759 >                        lfns.append(jobFile['lfn'])
760 >                        if self.useParent==1:
761 >                           parent = self.parentFiles[jobFile['lfn']]
762 >                           for p in parent :
763 >                               pString += p  + ','
764 >                fileString = ','.join(lfns)
765 >                lumiLister = LumiList(lumis = lumis)
766 >                lumiString = lumiLister.getCMSSWString()
767 >                blockString=','.join(blocks)
768 >                if self.useParent==1:
769 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
770 >                  pfileString = pString[:-1]
771 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
772 >                else:
773 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
774 >                list_of_blocks.append(blocks)
775 >                jobDestination.append(locations)
776 >                jobCount += 1
777 >                common.logger.debug('Job %s will run on %s files and %s lumis '
778 >                    % (jobCount, len(lfns), len(lumis) ))
779 >
780 >        common.logger.info('%s jobs created to run on %s lumis' %
781 >                              (jobCount, lumisCreated))
782 >
783 >        # Prepare dict output matching back to non-WMBS job creation
784 >        dictOut = {}
785 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
786 >        if self.useParent==1:
787 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
788 >        dictOut['args'] = list_of_lists
789 >        dictOut['jobDestination'] = jobDestination
790 >        dictOut['njobs'] = jobCount
791 >        self.cacheBlocks(list_of_blocks,jobDestination)
792 >
793 >        return dictOut
794 >
795 >    def cacheBlocks(self, blocks,destinations):
796 >
797 >        saveFblocks=''
798 >        for i in range(len(blocks)):
799 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
800 >            if len(sites) != 0:
801 >                for block in blocks[i]:
802 >                    saveFblocks += str(block)+'\n'
803 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
804 >
805      def Algos(self):
806          """
807          Define key splittingType matrix
808          """
809 <        SplitAlogs = {
810 <                     'EventBased'           : self.jobSplittingByEvent,
809 >        SplitAlogs = {
810 >                     'EventBased'           : self.jobSplittingByEvent,
811                       'RunBased'             : self.jobSplittingByRun,
812 <                     'LumiBased'            : self.jobSplittingByLumi,
813 <                     'NoInput'              : self.jobSplittingNoInput,
812 >                     'LumiBased'            : self.jobSplittingByLumi,
813 >                     'NoInput'              : self.jobSplittingNoInput,
814                       'ForScript'            : self.jobSplittingForScript
815 <                     }  
815 >                     }
816          return SplitAlogs
817  

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines