ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.9 by spiga, Fri Mar 6 17:03:08 2009 UTC vs.
Revision 1.35 by ewv, Mon Mar 22 15:29:42 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
33 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 44 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
97 +        """
98 +        """
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
104 +            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 58 | Line 119 | class JobSplitter:
119                self.list_of_args - File(s) job will run on (a list of lists)
120          """
121  
122 <        jobDestination=[]  
122 >        jobDestination=[]
123          self.checkUserSettings()
124          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
125              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
126              raise CrabException(msg)
127 <
128 <        blockSites = self.args['blockSites']
127 >
128 >        blockSites = self.args['blockSites']
129          pubdata = self.args['pubdata']
130          filesbyblock=pubdata.getFiles()
131  
# Line 78 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
144 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
145 +                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
148 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 164 | class JobSplitter:
164          # If user requested more events than are in the dataset
165          elif (totalEventsRequested > self.maxEvents):
166              eventsRemaining = self.maxEvents
167 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
167 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
168          # If user requested less events than are in the dataset
169          else:
170              eventsRemaining = totalEventsRequested
# Line 108 | Line 180 | class JobSplitter:
180              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
181  
182          if (self.selectNumberOfJobs):
183 <            common.logger.message("May not create the exact number_of_jobs requested.")
183 >            common.logger.info("May not create the exact number_of_jobs requested.")
184  
185          # old... to remove Daniele
186          totalNumberOfJobs = 999999999
# Line 125 | Line 197 | class JobSplitter:
197          jobsOfBlock = {}
198  
199          parString = ""
200 +        pString = ""
201          filesEventCount = 0
202 +        msg=''
203  
204          # ---- Iterate over the blocks in the dataset until ---- #
205          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 211 | class JobSplitter:
211  
212              if self.eventsbyblock.has_key(block) :
213                  numEventsInBlock = self.eventsbyblock[block]
214 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
214 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
215  
216                  files = filesbyblock[block]
217                  numFilesInBlock = len(files)
# Line 147 | Line 221 | class JobSplitter:
221                  if noBboundary == 0: # DD
222                      # ---- New block => New job ---- #
223                      parString = ""
224 +                    pString=""
225                      # counter for number of events in files currently worked on
226                      filesEventCount = 0
227                  # flag if next while loop should touch new file
# Line 156 | Line 231 | class JobSplitter:
231  
232                  # ---- Iterate over the files in the block until we've met the requested ---- #
233                  # ---- total # of events or we've gone over all the files in this block  ---- #
234 <                pString=''
234 >                msg='\n'
235                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
236                      file = files[fileCount]
237                      if self.useParent==1:
238                          parent = self.parentFiles[file]
239 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
239 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
240                      if newFile :
241                          try:
242                              numEventsInFile = self.eventsbyfile[file]
243 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
243 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
244                              # increase filesEventCount
245                              filesEventCount += numEventsInFile
246                              # Add file to current job
247 <                            parString += '\\\"' + file + '\\\"\,'
247 >                            parString +=  file + ','
248 >                            if self.useParent==1:
249 >                                for f in parent :
250 >                                    pString += f  + ','
251                              newFile = 0
252                          except KeyError:
253 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
253 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
254  
255                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
256                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 263 | class JobSplitter:
263                              if ( fileCount == numFilesInBlock-1 ) :
264                                  # end job using last file, use remaining events in block
265                                  # close job and touch new file
266 <                                fullString = parString[:-2]
266 >                                fullString = parString[:-1]
267                                  if self.useParent==1:
268 <                                    fullParentString = pString[:-2]
268 >                                    fullParentString = pString[:-1]
269                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
270                                  else:
271                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
272 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
272 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
273                                  jobDestination.append(blockSites[block])
274 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
274 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
275                                  # fill jobs of block dictionary
276                                  jobsOfBlock[block].append(jobCount+1)
277                                  # reset counter
# Line 217 | Line 292 | class JobSplitter:
292                      # if events in file equal to eventsPerJobRequested
293                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
294                          # close job and touch new file
295 <                        fullString = parString[:-2]
295 >                        fullString = parString[:-1]
296                          if self.useParent==1:
297 <                            fullParentString = pString[:-2]
297 >                            fullParentString = pString[:-1]
298                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
299                          else:
300                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
301 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
301 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
302                          jobDestination.append(blockSites[block])
303 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
303 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
304                          jobsOfBlock[block].append(jobCount+1)
305                          # reset counter
306                          jobCount = jobCount + 1
# Line 242 | Line 317 | class JobSplitter:
317                      # if more events in file remain than eventsPerJobRequested
318                      else :
319                          # close job but don't touch new file
320 <                        fullString = parString[:-2]
320 >                        fullString = parString[:-1]
321                          if self.useParent==1:
322 <                            fullParentString = pString[:-2]
322 >                            fullParentString = pString[:-1]
323                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
324                          else:
325                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
326 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
326 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
327                          jobDestination.append(blockSites[block])
328 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
328 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
329                          jobsOfBlock[block].append(jobCount+1)
330                          # increase counter
331                          jobCount = jobCount + 1
# Line 261 | Line 336 | class JobSplitter:
336                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
337                          # remove all but the last file
338                          filesEventCount = self.eventsbyfile[file]
339 +                        pString_tmp=''
340                          if self.useParent==1:
341 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
342 <                        parString = '\\\"' + file + '\\\"\,'
341 >                            for f in parent : pString_tmp +=  f + ','
342 >                        pString =  pString_tmp
343 >                        parString =  file + ','
344                      pass # END if
345                  pass # END while (iterate over files in the block)
346          pass # END while (iterate over blocks in the dataset)
347 +        common.logger.debug(msg)
348          self.ncjobs = self.total_number_of_jobs = jobCount
349          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
350 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 <
350 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 >
353          # skip check on  block with no sites  DD
354          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
355  
356         # prepare dict output
357          dictOut = {}
358 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
359 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
360          dictOut['args'] = list_of_lists
361          dictOut['jobDestination'] = jobDestination
362          dictOut['njobs']=self.total_number_of_jobs
# Line 285 | Line 365 | class JobSplitter:
365  
366          # keep trace of block with no sites to print a warning at the end
367  
368 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
368 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
369          # screen output
370          screenOutput = "List of jobs and available destination sites:\n\n"
371          noSiteBlock = []
372          bloskNoSite = []
373 +        allBlock = []
374  
375          blockCounter = 0
376          for block in blocks:
377              if block in jobsOfBlock.keys() :
378                  blockCounter += 1
379 +                allBlock.append( blockCounter )
380 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
381                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
382 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
383 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
382 >                    ', '.join(SE2CMS(sites)))
383 >                if len(sites) == 0:
384                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
385                      bloskNoSite.append( blockCounter )
386  
387 <        common.logger.message(screenOutput)
387 >        common.logger.info(screenOutput)
388          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
389              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
390              virgola = ""
# Line 309 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
402 <            if self.cfg_params.has_key('EDG.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
406 <            if self.cfg_params.has_key('EDG.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
410 <
411 <            common.logger.message(msg)
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402 >            if self.cfg_params.has_key('GRID.se_white_list'):
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406 >            if self.cfg_params.has_key('GRID.ce_white_list'):
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410 >
411 >            common.logger.info(msg)
412 >
413 >        if bloskNoSite == allBlock:
414 >            msg += 'Requested jobs cannot be Created! \n'
415 >            if self.cfg_params.has_key('GRID.se_white_list'):
416 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
417 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
418 >                msg += '\tPlease check if the dataset is available at this site!)'
419 >            if self.cfg_params.has_key('GRID.ce_white_list'):
420 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)\n'
423 >            raise CrabException(msg)
424  
425          return
426  
427  
428   ########################################################################
429 <    def jobSplittingByRun(self):
429 >    def jobSplittingByRun(self):
430          """
431          """
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
432  
433          self.checkUserSettings()
434 <        blockSites = self.args['blockSites']
434 >        blockSites = self.args['blockSites']
435          pubdata = self.args['pubdata']
436  
437          if self.selectNumberOfJobs == 0 :
438              self.theNumberOfJobs = 9999999
439          blocks = {}
440 <        runList = []
440 >        runList = []
441          thefiles = Fileset(name='FilesToSplit')
442          fileList = pubdata.getListFiles()
443          for f in fileList:
444              block = f['Block']['Name']
445 <            try:
445 >            try:
446                  f['Block']['StorageElementList'].extend(blockSites[block])
447              except:
448                  continue
# Line 363 | Line 450 | class JobSplitter:
450              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
451              wmbsFile['block'] = block
452              runNum = f['RunsList'][0]['RunNumber']
453 <            runList.append(runNum)
453 >            runList.append(runNum)
454              myRun = Run(runNumber=runNum)
455              wmbsFile.addRun( myRun )
456              thefiles.addFile(
457                  wmbsFile
458                  )
459 <
459 >
460          work = Workflow()
461          subs = Subscription(
462          fileset = thefiles,
# Line 378 | Line 465 | class JobSplitter:
465          type = "Processing")
466          splitter = SplitterFactory()
467          jobfactory = splitter(subs)
468 <        
469 <        #loop over all runs
383 <        set = Set(runList)
468 >
469 >        #loop over all runs
470          list_of_lists = []
471          jobDestination = []
386
472          count = 0
473 <        for i in list(set):
473 >        for jobGroup in  jobfactory():
474              if count <  self.theNumberOfJobs:
475 <                res = self.getJobInfo(jobfactory())
476 <                parString = ''
475 >                res = self.getJobInfo(jobGroup)
476 >                parString = ''
477                  for file in res['lfns']:
478 <                    parString += '\\\"' + file + '\\\"\,'
479 <                fullString = parString[:-2]
480 <                list_of_lists.append([fullString,str(-1),str(0)])    
478 >                    parString += file + ','
479 >                fullString = parString[:-1]
480 >                list_of_lists.append([fullString,str(-1),str(0)])
481                  #need to check single file location
482 <                jobDestination.append(res['locations'])  
482 >                jobDestination.append(res['locations'])
483                  count +=1
484         # prepare dict output
485          dictOut = {}
486 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
487          dictOut['args'] = list_of_lists
488          dictOut['jobDestination'] = jobDestination
489          dictOut['njobs']=count
# Line 406 | Line 492 | class JobSplitter:
492  
493      def getJobInfo( self,jobGroup ):
494          res = {}
495 <        lfns = []        
496 <        locations = []        
495 >        lfns = []
496 >        locations = []
497          tmp_check=0
498          for job in jobGroup.jobs:
499              for file in job.getFiles():
500 <                lfns.append(file['lfn'])
500 >                lfns.append(file['lfn'])
501                  for loc in file['locations']:
502                      if tmp_check < 1 :
503                          locations.append(loc)
504 <                tmp_check = tmp_check + 1
505 <                ### qui va messo il check per la locations
506 <        res['lfns'] = lfns
507 <        res['locations'] = locations
508 <        return res                
509 <      
504 >                tmp_check = tmp_check + 1
505 >                ### qui va messo il check per la locations
506 >        res['lfns'] = lfns
507 >        res['locations'] = locations
508 >        return res
509 >
510   ########################################################################
511 <    def jobSplittingNoInput(self):
511 >    def prepareSplittingNoInput(self):
512          """
427        Perform job splitting based on number of event per job
513          """
429        common.logger.debug(5,'Splitting per events')
430        self.checkUserSettings()
431        jobDestination=[]
432        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
433            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
434            raise CrabException(msg)
435
436        managedGenerators =self.args['managedGenerators']
437        generator = self.args['generator']
438        firstRun = self.cfg_params.get('CMSSW.first_run',None)
439
514          if (self.selectEventsPerJob):
515 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
515 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
516          if (self.selectNumberOfJobs):
517 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
517 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
518          if (self.selectTotalNumberEvents):
519 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
519 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
520  
521          if (self.total_number_of_events < 0):
522              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 459 | Line 533 | class JobSplitter:
533              self.total_number_of_jobs = self.theNumberOfJobs
534              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
535  
536 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
536 >
537 >    def jobSplittingNoInput(self):
538 >        """
539 >        Perform job splitting based on number of event per job
540 >        """
541 >        common.logger.debug('Splitting per events')
542 >        self.checkUserSettings()
543 >        jobDestination=[]
544 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
545 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
546 >            raise CrabException(msg)
547 >
548 >        managedGenerators =self.args['managedGenerators']
549 >        generator = self.args['generator']
550 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
551 >
552 >        self.prepareSplittingNoInput()
553 >
554 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
555  
556          # is there any remainder?
557          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
558  
559 <        common.logger.debug(5,'Check  '+str(check))
559 >        common.logger.debug('Check  '+str(check))
560  
561 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
561 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
562          if check > 0:
563 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
563 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
564  
565          # argument is seed number.$i
566          self.list_of_args = []
567          for i in range(self.total_number_of_jobs):
568              ## Since there is no input, any site is good
569 <            jobDestination.append([""]) #must be empty to write correctly the xml
569 >            jobDestination.append([""]) # must be empty to correctly write the XML
570              args=[]
571 <            if (firstRun):
572 <                ## pythia first run
481 <                args.append(str(firstRun)+str(i))
571 >            if (firstLumi): # Pythia first lumi
572 >                args.append(str(int(firstLumi)+i))
573              if (generator in managedGenerators):
574 <                if (generator == 'comphep' and i == 0):
574 >               args.append(generator)
575 >               if (generator == 'comphep' and i == 0):
576                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
577                      args.append('1')
578 <                else:
578 >               else:
579                      args.append(str(i*self.eventsPerJob))
580              args.append(str(self.eventsPerJob))
581              self.list_of_args.append(args)
582         # prepare dict output
583 +
584          dictOut = {}
585 +        dictOut['params'] = ['MaxEvents']
586 +        if (firstLumi):
587 +            dictOut['params'] = ['FirstLumi','MaxEvents']
588 +            if (generator in managedGenerators):
589 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
590 +        else:
591 +            if (generator in managedGenerators) :
592 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
593          dictOut['args'] = self.list_of_args
594          dictOut['jobDestination'] = jobDestination
595          dictOut['njobs']=self.total_number_of_jobs
# Line 505 | Line 606 | class JobSplitter:
606              msg = 'must specify  number_of_jobs.'
607              raise crabexception(msg)
608          jobDestination = []
609 <        common.logger.debug(5,'Splitting per job')
610 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
609 >        common.logger.debug('Splitting per job')
610 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
611  
612 <        self.total_number_of_jobs = self.theNumberOfJobs
612 > #        self.total_number_of_jobs = self.theNumberOfJobs
613  
614 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
614 >        self.prepareSplittingNoInput()
615  
616 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
616 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
617 >
618 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
619  
620          # argument is seed number.$i
621          self.list_of_args = []
622          for i in range(self.total_number_of_jobs):
623 +            args=[]
624              jobDestination.append([""])
625 <            self.list_of_args.append([str(i)])
625 >            if self.eventsPerJob != 0 :
626 >                args.append(str(self.eventsPerJob))
627 >                self.list_of_args.append(args)
628  
629         # prepare dict output
630          dictOut = {}
631 <        dictOut['args'] = self.list_of_args
631 >        dictOut['params'] = ['MaxEvents']
632 >        dictOut['args'] =  self.list_of_args
633          dictOut['jobDestination'] = jobDestination
634          dictOut['njobs']=self.total_number_of_jobs
635          return dictOut
529
636  
637 <    def jobSplittingByLumi(self):
637 >
638 >    def jobSplittingByLumi(self):
639          """
640 +        Split task into jobs by Lumi section paying attention to which
641 +        lumis should be run (according to the analysis dataset).
642 +        This uses WMBS job splitting which does not split files over jobs
643 +        so the job will have AT LEAST as many lumis as requested, perhaps
644 +        more
645          """
646 <        return
646 >
647 >        common.logger.debug('Splitting by Lumi')
648 >        self.checkLumiSettings()
649 >
650 >        blockSites = self.args['blockSites']
651 >        pubdata = self.args['pubdata']
652 >
653 >        lumisPerFile  = pubdata.getLumis()
654 >
655 >        # Make the list of WMBS files for job splitter
656 >        fileList = pubdata.getListFiles()
657 >        thefiles = Fileset(name='FilesToSplit')
658 >        for jobFile in fileList:
659 >            block = jobFile['Block']['Name']
660 >            try:
661 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
662 >            except:
663 >                continue
664 >            wmbsFile = File(jobFile['LogicalFileName'])
665 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
666 >            wmbsFile['block'] = block
667 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
668 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
669 >            thefiles.addFile(wmbsFile)
670 >
671 >        # Create the factory and workflow
672 >        work = Workflow()
673 >        subs = Subscription(fileset    = thefiles,    workflow = work,
674 >                            split_algo = 'LumiBased', type     = "Processing")
675 >        splitter = SplitterFactory()
676 >        jobFactory = splitter(subs)
677 >
678 >        list_of_lists = []
679 >        jobDestination = []
680 >        jobCount = 0
681 >        lumisCreated = 0
682 >
683 >        if not self.limitJobLumis:
684 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
685 >            common.logger.info('Each job will process about %s lumis.' %
686 >                                self.lumisPerJob)
687 >
688 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
689 >            for job in jobGroup.jobs:
690 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
691 >                    common.logger.info('Limit on number of jobs reached.')
692 >                    break
693 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
694 >                    common.logger.info('Limit on number of lumis reached.')
695 >                    break
696 >                lumis = []
697 >                lfns  = []
698 >                locations = []
699 >                firstFile = True
700 >                # Collect information from all the files
701 >                for jobFile in job.getFiles():
702 >                    doFile = False
703 >                    if firstFile:  # Get locations from first file in the job
704 >                        for loc in jobFile['locations']:
705 >                            locations.append(loc)
706 >                        firstFile = False
707 >                    # Accumulate Lumis from all files
708 >                    for lumiList in jobFile['runs']:
709 >                        theRun = lumiList.run
710 >                        for theLumi in list(lumiList):
711 >                            lumisCreated += 1
712 >                            if (not self.limitTotalLumis) or \
713 >                               (lumisCreated <= self.totalNLumis):
714 >                                doFile = True
715 >                                lumis.append( (theRun, theLumi) )
716 >                    if doFile:
717 >                        lfns.append(jobFile['lfn'])
718 >                fileString = ','.join(lfns)
719 >                lumiLister = LumiList(lumis = lumis)
720 >                lumiString = lumiLister.getCMSSWString()
721 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
722 >
723 >                jobDestination.append(locations)
724 >                jobCount += 1
725 >                common.logger.debug('Job %s will run on %s files and %s lumis '
726 >                    % (jobCount, len(lfns), len(lumis) ))
727 >
728 >        common.logger.info('%s jobs created to run on %s lumis' %
729 >                              (jobCount, lumisCreated))
730 >
731 >        # Prepare dict output matching back to non-WMBS job creation
732 >        dictOut = {}
733 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
734 >        dictOut['args'] = list_of_lists
735 >        dictOut['jobDestination'] = jobDestination
736 >        dictOut['njobs'] = jobCount
737 >
738 >        return dictOut
739 >
740 >
741      def Algos(self):
742          """
743          Define key splittingType matrix
744          """
745 <        SplitAlogs = {
746 <                     'EventBased'           : self.jobSplittingByEvent,
745 >        SplitAlogs = {
746 >                     'EventBased'           : self.jobSplittingByEvent,
747                       'RunBased'             : self.jobSplittingByRun,
748 <                     'LumiBased'            : self.jobSplittingByLumi,
749 <                     'NoInput'              : self.jobSplittingNoInput,
748 >                     'LumiBased'            : self.jobSplittingByLumi,
749 >                     'NoInput'              : self.jobSplittingNoInput,
750                       'ForScript'            : self.jobSplittingForScript
751 <                     }  
751 >                     }
752          return SplitAlogs
753  


Diff Legend

- Removed lines
+ Added lines
< Changed lines (as in the original revision)
> Changed lines (as in the new revision)