
Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.31 by spiga, Tue Dec 15 13:19:13 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16  
17   class JobSplitter:
18      def __init__( self, cfg_params,  args ):
19          self.cfg_params = cfg_params
20          self.args=args
21 +
22 +        self.lumisPerJob = -1
23 +        self.totalNLumis = 0
24 +        self.theNumberOfJobs = 0
25 +        self.limitNJobs = False
26 +        self.limitTotalLumis = False
27 +        self.limitJobLumis = False
28 +
29          #self.maxEvents
30          # init BlackWhiteListParser
31 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
32 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
33 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
31 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
32 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
33 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
34  
35  
36      def checkUserSettings(self):
# Line 44 | Line 62 | class JobSplitter:
62              self.selectTotalNumberEvents = 0
63  
64  
65 +    def checkLumiSettings(self):
66 +        """
67 +        Check that the user has specified enough information to
68 +        perform job splitting by lumi sections
69 +        """
70 +        settings = 0
71 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
72 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
73 +            self.limitJobLumis = True
74 +            settings += 1
75 +
76 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
77 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
78 +            self.limitNJobs = True
79 +            settings += 1
80 +
81 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
82 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
83 +            self.limitTotalLumis = (self.totalNLumis != -1)
84 +            settings += 1
85 +
86 +        if settings != 2:
87 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
88 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
89 +            raise CrabException(msg)
90 +        if self.limitNJobs and self.limitJobLumis:
91 +            self.limitTotalLumis = True
92 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
93 +
94 +
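checkLumiSettings() above enforces a two-of-three rule among the lumi-splitting options. A hand-written sketch of configuration dictionaries and how the check treats them (the values are invented for illustration and are not taken from the repository; in CRAB they would normally come from the user's crab.cfg):

    # Hypothetical cfg_params contents (illustration only):
    accepted_a = {'CMSSW.lumis_per_job': '100', 'CMSSW.number_of_jobs': '10'}         # two settings -> accepted
    accepted_b = {'CMSSW.number_of_jobs': '10', 'CMSSW.total_number_of_lumis': '-1'}  # two settings -> accepted; -1 disables the total-lumi limit
    rejected   = {'CMSSW.lumis_per_job': '100'}                                       # one setting  -> CrabException is raised
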
95 +    def ComputeSubBlockSites( self, blockSites ):
96 +        """
97 +        """
98 +        sub_blockSites = {}
99 +        for k,v in blockSites.iteritems():
100 +            sites=self.blackWhiteListParser.checkWhiteList(v)
101 +            if sites : sub_blockSites[k]=v
102 +        if len(sub_blockSites) < 1:
103 +            msg = 'WARNING: the site(s) %s are not hosting any part of the data.'%self.seWhiteList
104 +            raise CrabException(msg)
105 +        return sub_blockSites
106 +
107   ########################################################################
108      def jobSplittingByEvent( self ):
109          """
# Line 58 | Line 118 | class JobSplitter:
118                self.list_of_args - File(s) job will run on (a list of lists)
119          """
120  
121 <        jobDestination=[]  
121 >        jobDestination=[]
122          self.checkUserSettings()
123          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
124              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
125              raise CrabException(msg)
126 <
127 <        blockSites = self.args['blockSites']
126 >
127 >        blockSites = self.args['blockSites']
128          pubdata = self.args['pubdata']
129          filesbyblock=pubdata.getFiles()
130  
# Line 78 | Line 138 | class JobSplitter:
138          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
139          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
140  
141 +        if noBboundary == 1:
142 +            if self.total_number_of_events== -1:
143 +                msg = 'You are selecting no_block_boundary=1, which does not allow total_number_of_events=-1.\n'
144 +                msg +='\tYou should get the number of events from the DBS web interface and use it in your configuration.'
145 +                raise CrabException(msg)
146 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
147 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
148 +                msg += "\tPlease set se_white_list with the site's storage element name."
149 +                raise  CrabException(msg)
150 +            blockSites = self.ComputeSubBlockSites(blockSites)
151 +
152          # ---- Handle the possible job splitting configurations ---- #
153          if (self.selectTotalNumberEvents):
154              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 163 | class JobSplitter:
163          # If user requested more events than are in the dataset
164          elif (totalEventsRequested > self.maxEvents):
165              eventsRemaining = self.maxEvents
166 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
166 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
167          # If user requested less events than are in the dataset
168          else:
169              eventsRemaining = totalEventsRequested
# Line 108 | Line 179 | class JobSplitter:
179              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
180  
181          if (self.selectNumberOfJobs):
182 <            common.logger.message("May not create the exact number_of_jobs requested.")
182 >            common.logger.info("May not create the exact number_of_jobs requested.")
183  
184          # old... to remove Daniele
185          totalNumberOfJobs = 999999999
# Line 125 | Line 196 | class JobSplitter:
196          jobsOfBlock = {}
197  
198          parString = ""
199 +        pString = ""
200          filesEventCount = 0
201 +        msg=''
202  
203          # ---- Iterate over the blocks in the dataset until ---- #
204          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 210 | class JobSplitter:
210  
211              if self.eventsbyblock.has_key(block) :
212                  numEventsInBlock = self.eventsbyblock[block]
213 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
213 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
214  
215                  files = filesbyblock[block]
216                  numFilesInBlock = len(files)
# Line 147 | Line 220 | class JobSplitter:
220                  if noBboundary == 0: # DD
221                      # ---- New block => New job ---- #
222                      parString = ""
223 +                    pString=""
224                      # counter for number of events in files currently worked on
225                      filesEventCount = 0
226                  # flag if next while loop should touch new file
# Line 156 | Line 230 | class JobSplitter:
230  
231                  # ---- Iterate over the files in the block until we've met the requested ---- #
232                  # ---- total # of events or we've gone over all the files in this block  ---- #
233 <                pString=''
233 >                msg='\n'
234                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
235                      file = files[fileCount]
236                      if self.useParent==1:
237                          parent = self.parentFiles[file]
238 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
238 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
239                      if newFile :
240                          try:
241                              numEventsInFile = self.eventsbyfile[file]
242 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
242 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
243                              # increase filesEventCount
244                              filesEventCount += numEventsInFile
245                              # Add file to current job
246 <                            parString += '\\\"' + file + '\\\"\,'
246 >                            parString +=  file + ','
247 >                            if self.useParent==1:
248 >                                for f in parent :
249 >                                    pString += f  + ','
250                              newFile = 0
251                          except KeyError:
252 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
252 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
253  
254                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
255                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 262 | class JobSplitter:
262                              if ( fileCount == numFilesInBlock-1 ) :
263                                  # end job using last file, use remaining events in block
264                                  # close job and touch new file
265 <                                fullString = parString[:-2]
265 >                                fullString = parString[:-1]
266                                  if self.useParent==1:
267 <                                    fullParentString = pString[:-2]
267 >                                    fullParentString = pString[:-1]
268                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
269                                  else:
270                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
271 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
271 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
272                                  jobDestination.append(blockSites[block])
273 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
273 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
274                                  # fill jobs of block dictionary
275                                  jobsOfBlock[block].append(jobCount+1)
276                                  # reset counter
# Line 217 | Line 291 | class JobSplitter:
291                      # if events in file equal to eventsPerJobRequested
292                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
293                          # close job and touch new file
294 <                        fullString = parString[:-2]
294 >                        fullString = parString[:-1]
295                          if self.useParent==1:
296 <                            fullParentString = pString[:-2]
296 >                            fullParentString = pString[:-1]
297                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
298                          else:
299                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
300 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
300 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
301                          jobDestination.append(blockSites[block])
302 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
302 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
303                          jobsOfBlock[block].append(jobCount+1)
304                          # reset counter
305                          jobCount = jobCount + 1
# Line 242 | Line 316 | class JobSplitter:
316                      # if more events in file remain than eventsPerJobRequested
317                      else :
318                          # close job but don't touch new file
319 <                        fullString = parString[:-2]
319 >                        fullString = parString[:-1]
320                          if self.useParent==1:
321 <                            fullParentString = pString[:-2]
321 >                            fullParentString = pString[:-1]
322                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
323                          else:
324                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
325 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
325 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
326                          jobDestination.append(blockSites[block])
327 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
327 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
328                          jobsOfBlock[block].append(jobCount+1)
329                          # increase counter
330                          jobCount = jobCount + 1
# Line 261 | Line 335 | class JobSplitter:
335                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
336                          # remove all but the last file
337                          filesEventCount = self.eventsbyfile[file]
338 +                        pString_tmp=''
339                          if self.useParent==1:
340 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
341 <                        parString = '\\\"' + file + '\\\"\,'
340 >                            for f in parent : pString_tmp +=  f + ','
341 >                        pString =  pString_tmp
342 >                        parString =  file + ','
343                      pass # END if
344                  pass # END while (iterate over files in the block)
345          pass # END while (iterate over blocks in the dataset)
346 +        common.logger.debug(msg)
347          self.ncjobs = self.total_number_of_jobs = jobCount
348          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
349 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
350 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
351 <
349 >            common.logger.info("Could not run on all requested events because some blocks are not hosted at allowed sites.")
350 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
351 >
352          # skip check on  block with no sites  DD
353          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
354  
355         # prepare dict output
356          dictOut = {}
357 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
358 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
359          dictOut['args'] = list_of_lists
360          dictOut['jobDestination'] = jobDestination
361          dictOut['njobs']=self.total_number_of_jobs
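
The dictionary assembled here is positional: the i-th name in 'params' labels the i-th field of every entry in 'args'. A sketch of the shape with invented values (LFNs and site names are placeholders, not from the repository):

    # Hypothetical output of jobSplittingByEvent() without parentage:
    dictOut = {
        'params': ['InputFiles', 'MaxEvents', 'SkipEvents'],
        'args': [['/store/data/A.root,/store/data/B.root', '500', '0'],
                 ['/store/data/B.root', '500', '120']],
        'jobDestination': [['se-a.example.org'], ['se-a.example.org', 'se-b.example.org']],
        'njobs': 2,
    }
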
# Line 285 | Line 364 | class JobSplitter:
364  
365          # keep trace of block with no sites to print a warning at the end
366  
367 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
367 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
368          # screen output
369          screenOutput = "List of jobs and available destination sites:\n\n"
370          noSiteBlock = []
371          bloskNoSite = []
372 +        allBlock = []
373  
374          blockCounter = 0
375          for block in blocks:
376              if block in jobsOfBlock.keys() :
377                  blockCounter += 1
378 +                allBlock.append( blockCounter )
379 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
380                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
381 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
382 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
381 >                    ', '.join(SE2CMS(sites)))
382 >                if len(sites) == 0:
383                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
384                      bloskNoSite.append( blockCounter )
385  
386 <        common.logger.message(screenOutput)
386 >        common.logger.info(screenOutput)
387          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
388              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
389              virgola = ""
# Line 309 | Line 391 | class JobSplitter:
391                  virgola = ","
392              for block in bloskNoSite:
393                  msg += ' ' + str(block) + virgola
394 <            msg += '\n               Related jobs:\n                 '
394 >            msg += '\n\t\tRelated jobs:\n                 '
395              virgola = ""
396              if len(noSiteBlock) > 1:
397                  virgola = ","
398              for range_jobs in noSiteBlock:
399                  msg += str(range_jobs) + virgola
400 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
401 <            if self.cfg_params.has_key('EDG.se_white_list'):
402 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
403 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
404 <                msg += 'Please check if the dataset is available at this site!)\n'
405 <            if self.cfg_params.has_key('EDG.ce_white_list'):
406 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
407 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
408 <                msg += 'Please check if the dataset is available at this site!)\n'
400 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
401 >            if self.cfg_params.has_key('GRID.se_white_list'):
402 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
403 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
404 >                msg += '\tPlease check if the dataset is available at this site!)'
405 >            if self.cfg_params.has_key('GRID.ce_white_list'):
406 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
407 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
408 >                msg += '\tPlease check if the dataset is available at this site!)\n'
409 >
410 >            common.logger.info(msg)
411  
412 <            common.logger.message(msg)
412 >        if bloskNoSite == allBlock:
413 >            raise CrabException('No jobs created')
414  
415          return
416  
417  
418   ########################################################################
419 <    def jobSplittingByRun(self):
419 >    def jobSplittingByRun(self):
420          """
421          """
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
422  
423          self.checkUserSettings()
424 <        blockSites = self.args['blockSites']
424 >        blockSites = self.args['blockSites']
425          pubdata = self.args['pubdata']
426  
427          if self.selectNumberOfJobs == 0 :
428              self.theNumberOfJobs = 9999999
429          blocks = {}
430 <        runList = []
430 >        runList = []
431          thefiles = Fileset(name='FilesToSplit')
432          fileList = pubdata.getListFiles()
433          for f in fileList:
357           # print f
434              block = f['Block']['Name']
435 <          #  if not blocks.has_key(block):
360 <          #      blocks[block] = reader.listFileBlockLocation(block)
361 <            try:
435 >            try:
436                  f['Block']['StorageElementList'].extend(blockSites[block])
437              except:
438                  continue
# Line 366 | Line 440 | class JobSplitter:
440              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
441              wmbsFile['block'] = block
442              runNum = f['RunsList'][0]['RunNumber']
443 <            runList.append(runNum)
443 >            runList.append(runNum)
444              myRun = Run(runNumber=runNum)
445              wmbsFile.addRun( myRun )
446              thefiles.addFile(
447                  wmbsFile
448                  )
449 <
449 >
450          work = Workflow()
451          subs = Subscription(
452          fileset = thefiles,
# Line 381 | Line 455 | class JobSplitter:
455          type = "Processing")
456          splitter = SplitterFactory()
457          jobfactory = splitter(subs)
458 <        
459 <        #loop over all runs
386 <        set = Set(runList)
458 >
459 >        #loop over all runs
460          list_of_lists = []
461          jobDestination = []
389
462          count = 0
463 <        for i in list(set):
463 >        for jobGroup in  jobfactory():
464              if count <  self.theNumberOfJobs:
465 <                res = self.getJobInfo(jobfactory())
466 <                parString = ''
465 >                res = self.getJobInfo(jobGroup)
466 >                parString = ''
467                  for file in res['lfns']:
468 <                    parString += '\\\"' + file + '\\\"\,'
469 <                fullString = parString[:-2]
470 <                list_of_lists.append([fullString,str(-1),str(0)])    
468 >                    parString += file + ','
469 >                fullString = parString[:-1]
470 >                list_of_lists.append([fullString,str(-1),str(0)])
471                  #need to check single file location
472 <                jobDestination.append(res['locations'])  
472 >                jobDestination.append(res['locations'])
473                  count +=1
402        #print jobDestination
474         # prepare dict output
475          dictOut = {}
476 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
477          dictOut['args'] = list_of_lists
478          dictOut['jobDestination'] = jobDestination
479          dictOut['njobs']=count
# Line 410 | Line 482 | class JobSplitter:
482  
483      def getJobInfo( self,jobGroup ):
484          res = {}
485 <        lfns = []        
486 <        locations = []        
485 >        lfns = []
486 >        locations = []
487          tmp_check=0
488          for job in jobGroup.jobs:
489              for file in job.getFiles():
490 <                lfns.append(file['lfn'])
490 >                lfns.append(file['lfn'])
491                  for loc in file['locations']:
492                      if tmp_check < 1 :
493                          locations.append(loc)
494 <                    tmp_check = tmp_check + 1
495 <                ### the check on the locations should go here
496 <        res['lfns'] = lfns
497 <        res['locations'] = locations
498 <        return res                
499 <      
494 >                tmp_check = tmp_check + 1
495 >                ### the check on the locations should go here
496 >        res['lfns'] = lfns
497 >        res['locations'] = locations
498 >        return res
499 >
500   ########################################################################
501 <    def jobSplittingNoInput(self):
501 >    def prepareSplittingNoInput(self):
502          """
431        Perform job splitting based on number of event per job
503          """
433        common.logger.debug(5,'Splitting per events')
434        self.checkUserSettings()
435        jobDestination=[]
436        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
437            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
438            raise CrabException(msg)
439
440        managedGenerators =self.args['managedGenerators']
441        generator = self.args['generator']
442        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443
504          if (self.selectEventsPerJob):
505 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
505 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
506          if (self.selectNumberOfJobs):
507 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
507 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
508          if (self.selectTotalNumberEvents):
509 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
509 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
510  
511          if (self.total_number_of_events < 0):
512              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 523 | class JobSplitter:
523              self.total_number_of_jobs = self.theNumberOfJobs
524              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
525  
526 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
526 >
527 >    def jobSplittingNoInput(self):
528 >        """
529 >        Perform job splitting based on number of event per job
530 >        """
531 >        common.logger.debug('Splitting per events')
532 >        self.checkUserSettings()
533 >        jobDestination=[]
534 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
535 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
536 >            raise CrabException(msg)
537 >
538 >        managedGenerators =self.args['managedGenerators']
539 >        generator = self.args['generator']
540 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
541 >
542 >        self.prepareSplittingNoInput()
543 >
544 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
545  
546          # is there any remainder?
547          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
548  
549 <        common.logger.debug(5,'Check  '+str(check))
549 >        common.logger.debug('Check  '+str(check))
550  
551 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
551 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
552          if check > 0:
553 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
553 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
554  
555          # argument is seed number.$i
556          self.list_of_args = []
557          for i in range(self.total_number_of_jobs):
558              ## Since there is no input, any site is good
559 <            jobDestination.append([""]) #must be empty to write correctly the xml
559 >            jobDestination.append([""]) # must be empty to correctly write the XML
560              args=[]
561 <            if (firstRun):
562 <                ## pythia first run
485 <                args.append(str(firstRun)+str(i))
561 >            if (firstLumi): # Pythia first lumi
562 >                args.append(str(int(firstLumi)+i))
563              if (generator in managedGenerators):
564 <                if (generator == 'comphep' and i == 0):
564 >               args.append(generator)
565 >               if (generator == 'comphep' and i == 0):
566                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
567                      args.append('1')
568 <                else:
568 >               else:
569                      args.append(str(i*self.eventsPerJob))
570              args.append(str(self.eventsPerJob))
571              self.list_of_args.append(args)
572         # prepare dict output
573 +
574          dictOut = {}
575 +        dictOut['params'] = ['MaxEvents']
576 +        if (firstLumi):
577 +            dictOut['params'] = ['FirstLumi','MaxEvents']
578 +            if (generator in managedGenerators):
579 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
580 +        else:
581 +            if (generator in managedGenerators) :
582 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
583          dictOut['args'] = self.list_of_args
584          dictOut['jobDestination'] = jobDestination
585          dictOut['njobs']=self.total_number_of_jobs
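
For the generator case the same positional convention applies; a hand-worked example with invented numbers (firstLumi=1, eventsPerJob=500, generator='comphep', job index i=3):

    # params would be ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents'] for this configuration
    job_args = ['4', 'comphep', '1500', '500']   # FirstLumi = 1+3, FirstEvent = 3*500
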
# Line 509 | Line 596 | class JobSplitter:
596              msg = 'Must specify number_of_jobs.'
597              raise CrabException(msg)
598          jobDestination = []
599 <        common.logger.debug(5,'Splitting per job')
600 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
599 >        common.logger.debug('Splitting per job')
600 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
601  
602 <        self.total_number_of_jobs = self.theNumberOfJobs
602 > #        self.total_number_of_jobs = self.theNumberOfJobs
603  
604 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
604 >        self.prepareSplittingNoInput()
605  
606 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
606 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
607 >
608 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
609  
610          # argument is seed number.$i
611          self.list_of_args = []
612          for i in range(self.total_number_of_jobs):
613 +            args=[]
614              jobDestination.append([""])
615 <            self.list_of_args.append([str(i)])
615 >            if self.eventsPerJob != 0 :
616 >                args.append(str(self.eventsPerJob))
617 >                self.list_of_args.append(args)
618  
619         # prepare dict output
620          dictOut = {}
621 <        dictOut['args'] = self.list_of_args
621 >        dictOut['params'] = ['MaxEvents']
622 >        dictOut['args'] =  self.list_of_args
623          dictOut['jobDestination'] = jobDestination
624          dictOut['njobs']=self.total_number_of_jobs
625          return dictOut
533
626  
627 <    def jobSplittingByLumi(self):
627 >
628 >    def jobSplittingByLumi(self):
629          """
630 +        Split task into jobs by Lumi section paying attention to which
631 +        lumis should be run (according to the analysis dataset).
632 +        This uses WMBS job splitting, which does not split files over jobs,
633 +        so the job will have AT LEAST as many lumis as requested, perhaps
634 +        more.
635          """
636 <        return
636 >
637 >        common.logger.debug('Splitting by Lumi')
638 >        self.checkLumiSettings()
639 >
640 >        blockSites = self.args['blockSites']
641 >        pubdata = self.args['pubdata']
642 >
643 >        lumisPerFile  = pubdata.getLumis()
644 >
645 >        # Make the list of WMBS files for job splitter
646 >        fileList = pubdata.getListFiles()
647 >        thefiles = Fileset(name='FilesToSplit')
648 >        for jobFile in fileList:
649 >            block = jobFile['Block']['Name']
650 >            try:
651 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
652 >            except:
653 >                continue
654 >            wmbsFile = File(jobFile['LogicalFileName'])
655 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
656 >            wmbsFile['block'] = block
657 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
658 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
659 >            thefiles.addFile(wmbsFile)
660 >
661 >        # Create the factory and workflow
662 >        work = Workflow()
663 >        subs = Subscription(fileset    = thefiles,    workflow = work,
664 >                            split_algo = 'LumiBased', type     = "Processing")
665 >        splitter = SplitterFactory()
666 >        jobFactory = splitter(subs)
667 >
668 >        list_of_lists = []
669 >        jobDestination = []
670 >        jobCount = 0
671 >        lumisCreated = 0
672 >
673 >        if not self.limitJobLumis:
674 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
675 >            common.logger.info('Each job will process about %s lumis.' %
676 >                                self.lumisPerJob)
677 >
678 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
679 >            for job in jobGroup.jobs:
680 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
681 >                    common.logger.info('Limit on number of jobs reached.')
682 >                    break
683 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
684 >                    common.logger.info('Limit on number of lumis reached.')
685 >                    break
686 >                lumis = []
687 >                lfns  = []
688 >                locations = []
689 >                firstFile = True
690 >                # Collect information from all the files
691 >                for jobFile in job.getFiles():
692 >                    if firstFile:  # Get locations from first file in the job
693 >                        for loc in jobFile['locations']:
694 >                            locations.append(loc)
695 >                        firstFile = False
696 >                    # Accumulate Lumis from all files
697 >                    for lumiList in jobFile['runs']:
698 >                        theRun = lumiList.run
699 >                        for theLumi in list(lumiList):
700 >                            lumis.append( (theRun, theLumi) )
701 >
702 >                    lfns.append(jobFile['lfn'])
703 >                fileString = ','.join(lfns)
704 >                lumiString = compressLumiString(lumis)
705 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
706 >
707 >                jobDestination.append(locations)
708 >                jobCount += 1
709 >                lumisCreated += len(lumis)
710 >                common.logger.debug('Job %s will run on %s files and %s lumis '
711 >                    % (jobCount, len(lfns), len(lumis) ))
712 >
713 >        common.logger.info('%s jobs created to run on %s lumis' %
714 >                              (jobCount, lumisCreated))
715 >
716 >        # Prepare dict output matching back to non-WMBS job creation
717 >        dictOut = {}
718 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
719 >        dictOut['args'] = list_of_lists
720 >        dictOut['jobDestination'] = jobDestination
721 >        dictOut['njobs'] = jobCount
722 >
723 >        return dictOut
724 >
725 >
726      def Algos(self):
727          """
728          Define key splittingType matrix
729          """
730 <        SplitAlogs = {
731 <                     'EventBased'           : self.jobSplittingByEvent,
730 >        SplitAlogs = {
731 >                     'EventBased'           : self.jobSplittingByEvent,
732                       'RunBased'             : self.jobSplittingByRun,
733 <                     'LumiBased'            : self.jobSplittingByLumi,
734 <                     'NoInput'              : self.jobSplittingNoInput,
733 >                     'LumiBased'            : self.jobSplittingByLumi,
734 >                     'NoInput'              : self.jobSplittingNoInput,
735                       'ForScript'            : self.jobSplittingForScript
736 <                     }  
736 >                     }
737          return SplitAlogs
738  
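The table returned by Algos() is meant to be indexed by the configured splitting type; a hypothetical dispatch (the actual call site lives elsewhere in CRAB and is not shown in this diff):

    splitter = JobSplitter(cfg_params, args)   # cfg_params/args as built by the caller
    algo     = splitter.Algos()['LumiBased']   # pick the algorithm for the chosen splitting type
    dictOut  = algo()                          # dict with 'params', 'args', 'jobDestination', 'njobs'
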
739 +
740 +
741 + def compressLumiString(lumis):
742 +    """
743 +    Turn a list of 2-tuples of run/lumi numbers into a list of the format
744 +    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
745 +    """
746 +
747 +    lumis.sort()
748 +    parts = []
749 +    startRange = None
750 +    endRange = None
751 +
752 +    for lumiBlock in lumis:
753 +        if not startRange: # This is the first one
754 +            startRange = lumiBlock
755 +            endRange = lumiBlock
756 +        elif lumiBlock == endRange: # Same Lumi (different files?)
757 +            pass
758 +        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
759 +            endRange = lumiBlock
760 +        else: # This is the start of a new range
761 +            part = ':'.join(map(str, startRange))
762 +            if startRange != endRange:
763 +                part += '-' + ':'.join(map(str, endRange))
764 +            parts.append(part)
765 +            startRange = lumiBlock
766 +            endRange = lumiBlock
767 +
768 +    # Put out what's left
769 +    if startRange:
770 +        part = ':'.join(map(str, startRange))
771 +        if startRange != endRange:
772 +            part += '-' + ':'.join(map(str, endRange))
773 +        parts.append(part)
774 +
775 +    output = ','.join(parts)
776 +    return output
777 +
778 +
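A hand-checked example of the compression implemented above:

    # (run, lumi) pairs as collected in jobSplittingByLumi()
    lumis = [(138921, 3), (138921, 1), (138921, 2), (138923, 7)]
    compressed = compressLumiString(lumis)
    # compressed == '138921:1-138921:3,138923:7'
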

Diff Legend

(no marker)  Removed lines
+            Added lines
<            Changed lines (old revision, 1.7)
>            Changed lines (new revision, 1.31)