ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.43 by ewv, Fri Jul 16 14:23:24 2010 UTC vs.
Revision 1.49 by spiga, Wed Mar 2 10:48:46 2011 UTC

# Line 279 | Line 279 | class JobSplitter:
279                                  fullString = parString[:-1]
280                                  if self.useParent==1:
281                                      fullParentString = pString[:-1]
282 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
282 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)],block)
283                                  else:
284 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
284 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)],block)
285                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
286                                  jobDestination.append(blockSites[block])
287                                  msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 308 | Line 308 | class JobSplitter:
308                          fullString = parString[:-1]
309                          if self.useParent==1:
310                              fullParentString = pString[:-1]
311 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
311 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
312                          else:
313 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
313 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
314                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
315                          jobDestination.append(blockSites[block])
316                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 333 | Line 333 | class JobSplitter:
333                          fullString = parString[:-1]
334                          if self.useParent==1:
335                              fullParentString = pString[:-1]
336 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
336 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
337                          else:
338 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
338 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)],block)
339                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
340                          jobDestination.append(blockSites[block])
341                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 368 | Line 368 | class JobSplitter:
368  
369         # prepare dict output
370          dictOut = {}
371 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
372 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
371 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
372 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
373          dictOut['args'] = list_of_lists
374          dictOut['jobDestination'] = jobDestination
375          dictOut['njobs']=self.total_number_of_jobs
# Line 494 | Line 494 | class JobSplitter:
494                  parString = ''
495                  for file in res['lfns']:
496                      parString += file + ','
497 +                list_of_blocks.append(res['block'])
498                  fullString = parString[:-1]
499 <                list_of_lists.append([fullString,str(-1),str(0)])
499 >                blockString=','.join(list_of_blocks)
500 >                list_of_lists.append([fullString,str(-1),str(0)],blockString)
501                  #need to check single file location
502                  jobDestination.append(res['locations'])
501                list_of_blocks.append(res['block'])
503                  count +=1
504          # prepare dict output
505          dictOut = {}
506 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
506 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
507          dictOut['args'] = list_of_lists
508          dictOut['jobDestination'] = jobDestination
509          dictOut['njobs']=count
509
510          self.cacheBlocks(list_of_blocks,jobDestination)
511  
512          return dictOut
# Line 664 | Line 664 | class JobSplitter:
664          so the job will have AT LEAST as many lumis as requested, perhaps
665          more
666          """
667 <
667 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
668          common.logger.debug('Splitting by Lumi')
669          self.checkLumiSettings()
670  
# Line 672 | Line 672 | class JobSplitter:
672          pubdata = self.args['pubdata']
673  
674          lumisPerFile  = pubdata.getLumis()
675 <
675 >        self.parentFiles=pubdata.getParent()
676          # Make the list of WMBS files for job splitter
677          fileList = pubdata.getListFiles()
678 <        thefiles = Fileset(name='FilesToSplit')
678 >        wmFileList = []
679          for jobFile in fileList:
680              block = jobFile['Block']['Name']
681              try:
# Line 689 | Line 689 | class JobSplitter:
689              wmbsFile['block'] = block
690              for lumi in lumisPerFile[jobFile['LogicalFileName']]:
691                  wmbsFile.addRun(Run(lumi[0], lumi[1]))
692 <            thefiles.addFile(wmbsFile)
692 >            wmFileList.append(wmbsFile)
693 >
694 >        fileSet = set(wmFileList)
695 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
696  
697          # Create the factory and workflow
698          work = Workflow()
# Line 704 | Line 707 | class JobSplitter:
707          lumisCreated = 0
708          list_of_blocks = []
709          if not self.limitJobLumis:
710 <            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
710 >            if self.totalNLumis > 0:
711 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
712 >            else:
713 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
714              common.logger.info('Each job will process about %s lumis.' %
715                                  self.lumisPerJob)
716  
717          for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
718              for job in jobGroup.jobs:
719                  if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
720 <                    common.logger.info('Limit on number of jobs reached.')
720 >                    common.logger.info('Requested number of jobs reached.')
721                      break
722                  if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
723 <                    common.logger.info('Limit on number of lumis reached.')
723 >                    common.logger.info('Requested number of lumis reached.')
724                      break
725                  lumis = []
726                  lfns  = []
727 +                if self.useParent==1:
728 +                 parentlfns  = []
729 +                 pString =""
730 +
731                  locations = []
732                  blocks = []
733                  firstFile = True
# Line 740 | Line 750 | class JobSplitter:
750                                  lumis.append( (theRun, theLumi) )
751                      if doFile:
752                          lfns.append(jobFile['lfn'])
753 +                        if self.useParent==1:
754 +                           parent = self.parentFiles[jobFile['lfn']]
755 +                           for p in parent :
756 +                               pString += p  + ','
757                  fileString = ','.join(lfns)
758                  lumiLister = LumiList(lumis = lumis)
759                  lumiString = lumiLister.getCMSSWString()
760 <                list_of_lists.append([fileString, str(-1), str(0), lumiString])
760 >                blockString=','.join(blocks)
761 >                if self.useParent==1:
762 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
763 >                  pfileString = pString[:-1]
764 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
765 >                else:
766 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
767                  list_of_blocks.append(blocks)
768                  jobDestination.append(locations)
769                  jobCount += 1
# Line 755 | Line 775 | class JobSplitter:
775  
776          # Prepare dict output matching back to non-WMBS job creation
777          dictOut = {}
778 <        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
778 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
779 >        if self.useParent==1:
780 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
781          dictOut['args'] = list_of_lists
782          dictOut['jobDestination'] = jobDestination
783          dictOut['njobs'] = jobCount
762
784          self.cacheBlocks(list_of_blocks,jobDestination)
785  
786          return dictOut

Diff Legend

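(no marker) Removed lines — shown with their old line number only
+ Added lines
< Changed lines — old version (revision 1.43)
> Changed lines — new version (revision 1.49)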