ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.41 by ewv, Tue Jun 29 17:46:42 2010 UTC vs.
Revision 1.56 by spiga, Wed Jul 11 13:59:21 2012 UTC

# Line 195 | Line 195 | class JobSplitter:
195          if (self.selectNumberOfJobs):
196              common.logger.info("May not create the exact number_of_jobs requested.")
197  
198 +        if (self.theNumberOfJobs < 0):
199 +            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")
200 +            
201          # old... to remove Daniele
202          totalNumberOfJobs = 999999999
203  
# Line 279 | Line 282 | class JobSplitter:
282                                  fullString = parString[:-1]
283                                  if self.useParent==1:
284                                      fullParentString = pString[:-1]
285 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
285 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
286                                  else:
287 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
287 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
288                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
289                                  jobDestination.append(blockSites[block])
290                                  msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 308 | Line 311 | class JobSplitter:
311                          fullString = parString[:-1]
312                          if self.useParent==1:
313                              fullParentString = pString[:-1]
314 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
314 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
315                          else:
316 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
316 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
317                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
318                          jobDestination.append(blockSites[block])
319                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 333 | Line 336 | class JobSplitter:
336                          fullString = parString[:-1]
337                          if self.useParent==1:
338                              fullParentString = pString[:-1]
339 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
339 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
340                          else:
341 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
341 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
342                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
343                          jobDestination.append(blockSites[block])
344                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 368 | Line 371 | class JobSplitter:
371  
372         # prepare dict output
373          dictOut = {}
374 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
375 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
374 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
375 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
376          dictOut['args'] = list_of_lists
377          dictOut['jobDestination'] = jobDestination
378          dictOut['njobs']=self.total_number_of_jobs
# Line 428 | Line 431 | class JobSplitter:
431              common.logger.info(msg)
432  
433          if bloskNoSite == allBlock:
434 <            msg += 'Requested jobs cannot be Created! \n'
434 >            msg = 'Requested jobs cannot be Created! \n'
435              if self.cfg_params.has_key('GRID.se_white_list'):
436                  msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
437                  msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
# Line 464 | Line 467 | class JobSplitter:
467              except:
468                  continue
469              wmbsFile = File(f['LogicalFileName'])
470 +            if not  blockSites[block]:
471 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block                
472 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
473 +                common.logger.debug(msg)
474              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
475              wmbsFile['block'] = block
476              runNum = f['RunsList'][0]['RunNumber']
# Line 494 | Line 501 | class JobSplitter:
501                  parString = ''
502                  for file in res['lfns']:
503                      parString += file + ','
504 +                list_of_blocks.append(res['block'])
505                  fullString = parString[:-1]
506 <                list_of_lists.append([fullString,str(-1),str(0)])
506 >                blockString=','.join(list_of_blocks)
507 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
508                  #need to check single file location
509                  jobDestination.append(res['locations'])
501                list_of_blocks.append(res['block'])
510                  count +=1
511          # prepare dict output
512          dictOut = {}
513 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
513 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
514          dictOut['args'] = list_of_lists
515          dictOut['jobDestination'] = jobDestination
516          dictOut['njobs']=count
509
517          self.cacheBlocks(list_of_blocks,jobDestination)
518  
519          return dictOut
# Line 568 | Line 575 | class JobSplitter:
575  
576          managedGenerators =self.args['managedGenerators']
577          generator = self.args['generator']
578 <        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
578 >        firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))
579  
580          self.prepareSplittingNoInput()
581  
# Line 664 | Line 671 | class JobSplitter:
671          so the job will have AT LEAST as many lumis as requested, perhaps
672          more
673          """
674 <
674 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
675          common.logger.debug('Splitting by Lumi')
676          self.checkLumiSettings()
677  
# Line 672 | Line 679 | class JobSplitter:
679          pubdata = self.args['pubdata']
680  
681          lumisPerFile  = pubdata.getLumis()
682 <
682 >        self.parentFiles=pubdata.getParent()
683          # Make the list of WMBS files for job splitter
684          fileList = pubdata.getListFiles()
685 <        thefiles = Fileset(name='FilesToSplit')
685 >        wmFileList = []
686          for jobFile in fileList:
687              block = jobFile['Block']['Name']
688              try:
# Line 683 | Line 690 | class JobSplitter:
690              except:
691                  continue
692              wmbsFile = File(jobFile['LogicalFileName'])
693 +            if not  blockSites[block]:
694 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
695 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
696 +                common.logger.debug(msg)
697 +               # wmbsFile['locations'].add('Nowhere')
698              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
699              wmbsFile['block'] = block
700              for lumi in lumisPerFile[jobFile['LogicalFileName']]:
701                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
702 <            thefiles.addFile(wmbsFile)
702 >            wmFileList.append(wmbsFile)
703 >
704 >        fileSet = set(wmFileList)
705 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
706  
707          # Create the factory and workflow
708          work = Workflow()
# Line 702 | Line 717 | class JobSplitter:
717          lumisCreated = 0
718          list_of_blocks = []
719          if not self.limitJobLumis:
720 <            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
720 >            if self.totalNLumis > 0:
721 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
722 >            else:
723 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
724              common.logger.info('Each job will process about %s lumis.' %
725                                  self.lumisPerJob)
726  
727          for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
728              for job in jobGroup.jobs:
729                  if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
730 <                    common.logger.info('Limit on number of jobs reached.')
730 >                    common.logger.info('Requested number of jobs reached.')
731                      break
732                  if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
733 <                    common.logger.info('Limit on number of lumis reached.')
733 >                    common.logger.info('Requested number of lumis reached.')
734                      break
735                  lumis = []
736                  lfns  = []
737 +                if self.useParent==1:
738 +                 parentlfns  = []
739 +                 pString =""
740 +
741                  locations = []
742                  blocks = []
743                  firstFile = True
# Line 738 | Line 760 | class JobSplitter:
760                                  lumis.append( (theRun, theLumi) )
761                      if doFile:
762                          lfns.append(jobFile['lfn'])
763 +                        if self.useParent==1:
764 +                           parent = self.parentFiles[jobFile['lfn']]
765 +                           for p in parent :
766 +                               pString += p  + ','
767                  fileString = ','.join(lfns)
768                  lumiLister = LumiList(lumis = lumis)
769                  lumiString = lumiLister.getCMSSWString()
770 <                list_of_lists.append([fileString, str(-1), str(0), lumiString])
770 >                blockString=','.join(blocks)
771 >                if self.useParent==1:
772 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
773 >                  pfileString = pString[:-1]
774 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
775 >                else:
776 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
777                  list_of_blocks.append(blocks)
778                  jobDestination.append(locations)
779                  jobCount += 1
# Line 753 | Line 785 | class JobSplitter:
785  
786          # Prepare dict output matching back to non-WMBS job creation
787          dictOut = {}
788 <        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
788 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
789 >        if self.useParent==1:
790 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
791          dictOut['args'] = list_of_lists
792          dictOut['jobDestination'] = jobDestination
793          dictOut['njobs'] = jobCount
760
794          self.cacheBlocks(list_of_blocks,jobDestination)
795  
796          return dictOut

Diff Legend

Removed lines (no marker; shown with the old revision's line number only)
+ Added lines
< Changed lines
> Changed lines