ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.2 by spiga, Wed Feb 4 15:09:03 2009 UTC vs.
Revision 1.4 by spiga, Mon Feb 9 18:20:20 2009 UTC

# Line 7 | Line 7 | from WMCore.SiteScreening.BlackWhiteList
7   class JobSplitter:
8      def __init__( self, cfg_params,  args ):
9          self.cfg_params = cfg_params
10 <        self.blockSites = args['blockSites']
11 <        self.pubdata = args['pubdata']
10 >        self.args=args
11          #self.maxEvents
13        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
12          # init BlackWhiteListParser
13          seWhiteList = cfg_params.get('EDG.se_white_list',[])
14          seBlackList = cfg_params.get('EDG.se_black_list',[])
# Line 55 | Line 53 | class JobSplitter:
53          REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
54                    self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
55                    self.maxEvents, self.filesbyblock
56 <        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
56 >        SETS: jobDestination - Site destination(s) for each job (a list of lists)
57                self.total_number_of_jobs - Total # of jobs
58                self.list_of_args - File(s) job will run on (a list of lists)
59          """
60  
61 +        jobDestination=[]  
62          self.checkUserSettings()
63          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
64              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
65              raise CrabException(msg)
66  
67 <        self.filesbyblock=self.pubdata.getFiles()
68 <
69 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
70 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
71 <        self.parentFiles=self.pubdata.getParent()
67 >        blockSites = self.args['blockSites']
68 >        pubdata = self.args['pubdata']
69 >        filesbyblock=pubdata.getFiles()
70 >
71 >        self.eventsbyblock=pubdata.getEventsPerBlock()
72 >        self.eventsbyfile=pubdata.getEventsPerFile()
73 >        self.parentFiles=pubdata.getParent()
74  
75          ## get max number of events
76 <        self.maxEvents=self.pubdata.getMaxEvents()
76 >        self.maxEvents=pubdata.getMaxEvents()
77  
78          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
79          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
# Line 112 | Line 113 | class JobSplitter:
113          # old... to remove Daniele
114          totalNumberOfJobs = 999999999
115  
116 <        blocks = self.blockSites.keys()
116 >        blocks = blockSites.keys()
117          blockCount = 0
118          # Backup variable in case self.maxEvents counted events in a non-included block
119          numBlocksInDataset = len(blocks)
# Line 138 | Line 139 | class JobSplitter:
139                  numEventsInBlock = self.eventsbyblock[block]
140                  common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
141  
142 <                files = self.filesbyblock[block]
142 >                files = filesbyblock[block]
143                  numFilesInBlock = len(files)
144                  if (numFilesInBlock <= 0):
145                      continue
# Line 194 | Line 195 | class JobSplitter:
195                                  else:
196                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
197                                  common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
198 <                                self.jobDestination.append(self.blockSites[block])
199 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
198 >                                jobDestination.append(blockSites[block])
199 >                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
200                                  # fill jobs of block dictionary
201                                  jobsOfBlock[block].append(jobCount+1)
202                                  # reset counter
# Line 223 | Line 224 | class JobSplitter:
224                          else:
225                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
226                          common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
227 <                        self.jobDestination.append(self.blockSites[block])
228 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
227 >                        jobDestination.append(blockSites[block])
228 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
229                          jobsOfBlock[block].append(jobCount+1)
230                          # reset counter
231                          jobCount = jobCount + 1
# Line 248 | Line 249 | class JobSplitter:
249                          else:
250                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
251                          common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
252 <                        self.jobDestination.append(self.blockSites[block])
253 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
252 >                        jobDestination.append(blockSites[block])
253 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
254                          jobsOfBlock[block].append(jobCount+1)
255                          # increase counter
256                          jobCount = jobCount + 1
# Line 277 | Line 278 | class JobSplitter:
278         # prepare dict output
279          dictOut = {}
280          dictOut['args'] = list_of_lists
281 <        dictOut['jobDestination'] = self.jobDestination
281 >        dictOut['jobDestination'] = jobDestination
282          dictOut['njobs']=self.total_number_of_jobs
283  
284          return dictOut
# Line 295 | Line 296 | class JobSplitter:
296              if block in jobsOfBlock.keys() :
297                  blockCounter += 1
298                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
299 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)))
300 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)) == 0:
299 >                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
300 >                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
301                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
302                      bloskNoSite.append( blockCounter )
303  
# Line 343 | Line 344 | class JobSplitter:
344          from WMCore.DataStructs.Run import Run
345  
346          self.checkUserSettings()
347 +        blockSites = self.args['blockSites']
348 +        pubdata = self.args['pubdata']
349  
350          if self.selectNumberOfJobs == 0 :
351              self.theNumberOfJobs = 9999999
352          blocks = {}
353          runList = []
354          thefiles = Fileset(name='FilesToSplit')
355 <        fileList = self.pubdata.getListFiles()
355 >        fileList = pubdata.getListFiles()
356          for f in fileList:
357             # print f
358              block = f['Block']['Name']
359            #  if not blocks.has_key(block):
360            #      blocks[block] = reader.listFileBlockLocation(block)
361              try:
362 <                f['Block']['StorageElementList'].extend(self.blockSites[block])
362 >                f['Block']['StorageElementList'].extend(blockSites[block])
363              except:
364                  continue
365              wmbsFile = File(f['LogicalFileName'])
366 <            [ wmbsFile['locations'].add(x) for x in self.blockSites[block] ]
366 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
367              wmbsFile['block'] = block
368              runNum = f['RunsList'][0]['RunNumber']
369              runList.append(runNum)
# Line 428 | Line 431 | class JobSplitter:
431          Perform job splitting based on number of event per job
432          """
433          common.logger.debug(5,'Splitting per events')
434 +        self.checkUserSettings()
435 +        jobDestination=[]
436 +        if (self.selectNumberOfJobs == 0):
437 +            msg = 'Must specify  number_of_jobs.'
438 +            raise CrabException(msg)
439 +
440 +        managedGenerators =self.args['managedGenerators']
441 +        generator = self.args['generator']
442 +        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443  
444          if (self.selectEventsPerJob):
445              common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
# Line 466 | Line 478 | class JobSplitter:
478          self.list_of_args = []
479          for i in range(self.total_number_of_jobs):
480              ## Since there is no input, any site is good
481 <            self.jobDestination.append([""]) #must be empty to write correctly the xml
481 >            jobDestination.append([""]) #must be empty to write correctly the xml
482              args=[]
483 <            if (self.firstRun):
483 >            if (firstRun):
484                  ## pythia first run
485 <                args.append(str(self.firstRun)+str(i))
486 <            if (self.generator in self.managedGenerators):
487 <                if (self.generator == 'comphep' and i == 0):
485 >                args.append(str(firstRun)+str(i))
486 >            if (generator in managedGenerators):
487 >                if (generator == 'comphep' and i == 0):
488                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
489                      args.append('1')
490                  else:
# Line 481 | Line 493 | class JobSplitter:
493         # prepare dict output
494          dictOut = {}
495          dictOut['args'] = self.list_of_args
496 <        dictOut['jobDestination'] = self.jobDestination
496 >        dictOut['jobDestination'] = jobDestination
497          dictOut['njobs']=self.total_number_of_jobs
498  
499          return dictOut
# Line 492 | Line 504 | class JobSplitter:
504          Perform job splitting based on number of job
505          """
506          self.checkUserSettings()
507 <        if (self.selectnumberofjobs == 0):
507 >        if (self.selectNumberOfJobs == 0):
508              msg = 'must specify  number_of_jobs.'
509              raise crabexception(msg)
510 <
510 >        jobDestination = []
511          common.logger.debug(5,'Splitting per job')
512          common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
513  
# Line 508 | Line 520 | class JobSplitter:
520          # argument is seed number.$i
521          self.list_of_args = []
522          for i in range(self.total_number_of_jobs):
523 <            self.jobDestination.append([""])
523 >            jobDestination.append([""])
524              self.list_of_args.append([str(i)])
525  
526         # prepare dict output
527          dictOut = {}
528          dictOut['args'] = self.list_of_args
529 <        dictOut['jobDestination'] = []
529 >        dictOut['jobDestination'] = jobDestination
530          dictOut['njobs']=self.total_number_of_jobs
531          return dictOut
532  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines