ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.29 by ewv, Thu Oct 1 22:00:40 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6 < from crab_logger import Logger
6 > from sets import Set
7   from crab_exceptions import *
8   from crab_util import *
9 +
10 + from WMCore.DataStructs.File import File
11 + from WMCore.DataStructs.Fileset import Fileset
12 + from WMCore.DataStructs.Run import Run
13 + from WMCore.DataStructs.Subscription import Subscription
14 + from WMCore.DataStructs.Workflow import Workflow
15 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
33 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 44 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
97 +        """
98 +        """
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
 104 +            msg = 'WARNING: the sites %s are not hosting any part of data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 58 | Line 119 | class JobSplitter:
119                self.list_of_args - File(s) job will run on (a list of lists)
120          """
121  
122 <        jobDestination=[]  
122 >        jobDestination=[]
123          self.checkUserSettings()
124          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
125              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
126              raise CrabException(msg)
127 <
128 <        blockSites = self.args['blockSites']
127 >
128 >        blockSites = self.args['blockSites']
129          pubdata = self.args['pubdata']
130          filesbyblock=pubdata.getFiles()
131  
# Line 78 | Line 139 | class JobSplitter:
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
144 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
 145 +                msg +='\tYou should get the number of events from the DBS web interface and use it for your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
148 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 164 | class JobSplitter:
164          # If user requested more events than are in the dataset
165          elif (totalEventsRequested > self.maxEvents):
166              eventsRemaining = self.maxEvents
167 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
167 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
168          # If user requested less events than are in the dataset
169          else:
170              eventsRemaining = totalEventsRequested
# Line 108 | Line 180 | class JobSplitter:
180              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
181  
182          if (self.selectNumberOfJobs):
183 <            common.logger.message("May not create the exact number_of_jobs requested.")
183 >            common.logger.info("May not create the exact number_of_jobs requested.")
184  
185          # old... to remove Daniele
186          totalNumberOfJobs = 999999999
# Line 125 | Line 197 | class JobSplitter:
197          jobsOfBlock = {}
198  
199          parString = ""
200 +        pString = ""
201          filesEventCount = 0
202 +        msg=''
203  
204          # ---- Iterate over the blocks in the dataset until ---- #
205          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 211 | class JobSplitter:
211  
212              if self.eventsbyblock.has_key(block) :
213                  numEventsInBlock = self.eventsbyblock[block]
214 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
214 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
215  
216                  files = filesbyblock[block]
217                  numFilesInBlock = len(files)
# Line 147 | Line 221 | class JobSplitter:
221                  if noBboundary == 0: # DD
222                      # ---- New block => New job ---- #
223                      parString = ""
224 +                    pString=""
225                      # counter for number of events in files currently worked on
226                      filesEventCount = 0
227                  # flag if next while loop should touch new file
# Line 156 | Line 231 | class JobSplitter:
231  
232                  # ---- Iterate over the files in the block until we've met the requested ---- #
233                  # ---- total # of events or we've gone over all the files in this block  ---- #
234 <                pString=''
234 >                msg='\n'
235                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
236                      file = files[fileCount]
237                      if self.useParent==1:
238                          parent = self.parentFiles[file]
239 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
239 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
240                      if newFile :
241                          try:
242                              numEventsInFile = self.eventsbyfile[file]
243 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
243 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
244                              # increase filesEventCount
245                              filesEventCount += numEventsInFile
246                              # Add file to current job
247 <                            parString += '\\\"' + file + '\\\"\,'
247 >                            parString +=  file + ','
248 >                            if self.useParent==1:
249 >                                for f in parent :
250 >                                    pString += f  + ','
251                              newFile = 0
252                          except KeyError:
253 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
253 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
254  
255                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
256                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 263 | class JobSplitter:
263                              if ( fileCount == numFilesInBlock-1 ) :
264                                  # end job using last file, use remaining events in block
265                                  # close job and touch new file
266 <                                fullString = parString[:-2]
266 >                                fullString = parString[:-1]
267                                  if self.useParent==1:
268 <                                    fullParentString = pString[:-2]
268 >                                    fullParentString = pString[:-1]
269                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
270                                  else:
271                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
272 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
272 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
273                                  jobDestination.append(blockSites[block])
274 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
274 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
275                                  # fill jobs of block dictionary
276                                  jobsOfBlock[block].append(jobCount+1)
277                                  # reset counter
# Line 217 | Line 292 | class JobSplitter:
292                      # if events in file equal to eventsPerJobRequested
293                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
294                          # close job and touch new file
295 <                        fullString = parString[:-2]
295 >                        fullString = parString[:-1]
296                          if self.useParent==1:
297 <                            fullParentString = pString[:-2]
297 >                            fullParentString = pString[:-1]
298                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
299                          else:
300                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
301 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
301 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
302                          jobDestination.append(blockSites[block])
303 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
303 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
304                          jobsOfBlock[block].append(jobCount+1)
305                          # reset counter
306                          jobCount = jobCount + 1
# Line 242 | Line 317 | class JobSplitter:
317                      # if more events in file remain than eventsPerJobRequested
318                      else :
319                          # close job but don't touch new file
320 <                        fullString = parString[:-2]
320 >                        fullString = parString[:-1]
321                          if self.useParent==1:
322 <                            fullParentString = pString[:-2]
322 >                            fullParentString = pString[:-1]
323                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
324                          else:
325                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
326 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
326 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
327                          jobDestination.append(blockSites[block])
328 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
328 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
329                          jobsOfBlock[block].append(jobCount+1)
330                          # increase counter
331                          jobCount = jobCount + 1
# Line 261 | Line 336 | class JobSplitter:
336                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
337                          # remove all but the last file
338                          filesEventCount = self.eventsbyfile[file]
339 +                        pString_tmp=''
340                          if self.useParent==1:
341 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
342 <                        parString = '\\\"' + file + '\\\"\,'
341 >                            for f in parent : pString_tmp +=  f + ','
342 >                        pString =  pString_tmp
343 >                        parString =  file + ','
344                      pass # END if
345                  pass # END while (iterate over files in the block)
346          pass # END while (iterate over blocks in the dataset)
347 +        common.logger.debug(msg)
348          self.ncjobs = self.total_number_of_jobs = jobCount
349          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
350 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 <
350 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 >
353          # skip check on  block with no sites  DD
354          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
355  
356         # prepare dict output
357          dictOut = {}
358 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
359 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
360          dictOut['args'] = list_of_lists
361          dictOut['jobDestination'] = jobDestination
362          dictOut['njobs']=self.total_number_of_jobs
# Line 285 | Line 365 | class JobSplitter:
365  
366          # keep trace of block with no sites to print a warning at the end
367  
368 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
368 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
369          # screen output
370          screenOutput = "List of jobs and available destination sites:\n\n"
371          noSiteBlock = []
372          bloskNoSite = []
373 +        allBlock = []
374  
375          blockCounter = 0
376          for block in blocks:
377              if block in jobsOfBlock.keys() :
378                  blockCounter += 1
379 +                allBlock.append( blockCounter )
380 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
381                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
382 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
383 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
382 >                    ', '.join(SE2CMS(sites)))
383 >                if len(sites) == 0:
384                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
385                      bloskNoSite.append( blockCounter )
386  
387 <        common.logger.message(screenOutput)
387 >        common.logger.info(screenOutput)
388          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
389              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
390              virgola = ""
# Line 309 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
402 <            if self.cfg_params.has_key('EDG.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
406 <            if self.cfg_params.has_key('EDG.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402 >            if self.cfg_params.has_key('GRID.se_white_list'):
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406 >            if self.cfg_params.has_key('GRID.ce_white_list'):
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410 >
411 >            common.logger.info(msg)
412  
413 <            common.logger.message(msg)
413 >        if bloskNoSite == allBlock:
414 >            raise CrabException('No jobs created')
415  
416          return
417  
418  
419   ########################################################################
420 <    def jobSplittingByRun(self):
420 >    def jobSplittingByRun(self):
421          """
422          """
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
423  
424          self.checkUserSettings()
425 <        blockSites = self.args['blockSites']
425 >        blockSites = self.args['blockSites']
426          pubdata = self.args['pubdata']
427  
428          if self.selectNumberOfJobs == 0 :
429              self.theNumberOfJobs = 9999999
430          blocks = {}
431 <        runList = []
431 >        runList = []
432          thefiles = Fileset(name='FilesToSplit')
433          fileList = pubdata.getListFiles()
434          for f in fileList:
357           # print f
435              block = f['Block']['Name']
436 <          #  if not blocks.has_key(block):
360 <          #      blocks[block] = reader.listFileBlockLocation(block)
361 <            try:
436 >            try:
437                  f['Block']['StorageElementList'].extend(blockSites[block])
438              except:
439                  continue
# Line 366 | Line 441 | class JobSplitter:
441              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
442              wmbsFile['block'] = block
443              runNum = f['RunsList'][0]['RunNumber']
444 <            runList.append(runNum)
444 >            runList.append(runNum)
445              myRun = Run(runNumber=runNum)
446              wmbsFile.addRun( myRun )
447              thefiles.addFile(
448                  wmbsFile
449                  )
450 <
450 >
451          work = Workflow()
452          subs = Subscription(
453          fileset = thefiles,
# Line 381 | Line 456 | class JobSplitter:
456          type = "Processing")
457          splitter = SplitterFactory()
458          jobfactory = splitter(subs)
459 <        
460 <        #loop over all runs
459 >
460 >        #loop over all runs
461          set = Set(runList)
462          list_of_lists = []
463          jobDestination = []
389
464          count = 0
465 <        for i in list(set):
465 >        for jobGroup in  jobfactory():
466              if count <  self.theNumberOfJobs:
467 <                res = self.getJobInfo(jobfactory())
468 <                parString = ''
467 >                res = self.getJobInfo(jobGroup)
468 >                parString = ''
469                  for file in res['lfns']:
470 <                    parString += '\\\"' + file + '\\\"\,'
471 <                fullString = parString[:-2]
472 <                list_of_lists.append([fullString,str(-1),str(0)])    
470 >                    parString += file + ','
471 >                fullString = parString[:-1]
472 >                list_of_lists.append([fullString,str(-1),str(0)])
473                  #need to check single file location
474 <                jobDestination.append(res['locations'])  
474 >                jobDestination.append(res['locations'])
475                  count +=1
402        #print jobDestination
476         # prepare dict output
477          dictOut = {}
478 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
479          dictOut['args'] = list_of_lists
480          dictOut['jobDestination'] = jobDestination
481          dictOut['njobs']=count
# Line 410 | Line 484 | class JobSplitter:
484  
485      def getJobInfo( self,jobGroup ):
486          res = {}
487 <        lfns = []        
488 <        locations = []        
487 >        lfns = []
488 >        locations = []
489          tmp_check=0
490          for job in jobGroup.jobs:
491              for file in job.getFiles():
492 <                lfns.append(file['lfn'])
492 >                lfns.append(file['lfn'])
493                  for loc in file['locations']:
494                      if tmp_check < 1 :
495                          locations.append(loc)
496 <                    tmp_check = tmp_check + 1
497 <                ### qui va messo il check per la locations
498 <        res['lfns'] = lfns
499 <        res['locations'] = locations
500 <        return res                
501 <      
496 >                tmp_check = tmp_check + 1
497 >                ### qui va messo il check per la locations
498 >        res['lfns'] = lfns
499 >        res['locations'] = locations
500 >        return res
501 >
502   ########################################################################
503 <    def jobSplittingNoInput(self):
503 >    def prepareSplittingNoInput(self):
504          """
431        Perform job splitting based on number of event per job
505          """
433        common.logger.debug(5,'Splitting per events')
434        self.checkUserSettings()
435        jobDestination=[]
436        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
437            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
438            raise CrabException(msg)
439
440        managedGenerators =self.args['managedGenerators']
441        generator = self.args['generator']
442        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443
506          if (self.selectEventsPerJob):
507 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
507 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
508          if (self.selectNumberOfJobs):
509 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
509 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
510          if (self.selectTotalNumberEvents):
511 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
511 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
512  
513          if (self.total_number_of_events < 0):
514              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 525 | class JobSplitter:
525              self.total_number_of_jobs = self.theNumberOfJobs
526              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
527  
528 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
528 >
529 >    def jobSplittingNoInput(self):
530 >        """
531 >        Perform job splitting based on number of event per job
532 >        """
533 >        common.logger.debug('Splitting per events')
534 >        self.checkUserSettings()
535 >        jobDestination=[]
536 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
537 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
538 >            raise CrabException(msg)
539 >
540 >        managedGenerators =self.args['managedGenerators']
541 >        generator = self.args['generator']
542 >        firstRun = self.cfg_params.get('CMSSW.first_run', 1)
543 >
544 >        self.prepareSplittingNoInput()
545 >
546 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
547  
548          # is there any remainder?
549          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
550  
551 <        common.logger.debug(5,'Check  '+str(check))
551 >        common.logger.debug('Check  '+str(check))
552  
553 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
553 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
554          if check > 0:
555 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
555 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
556  
557          # argument is seed number.$i
558          self.list_of_args = []
559          for i in range(self.total_number_of_jobs):
560              ## Since there is no input, any site is good
561 <            jobDestination.append([""]) #must be empty to write correctly the xml
561 >            jobDestination.append([""]) # must be empty to correctly write the XML
562              args=[]
563 <            if (firstRun):
564 <                ## pythia first run
485 <                args.append(str(firstRun)+str(i))
563 >            if (firstRun): # Pythia first run
564 >                args.append(str(int(firstRun)+i))
565              if (generator in managedGenerators):
566 <                if (generator == 'comphep' and i == 0):
566 >               args.append(generator)
567 >               if (generator == 'comphep' and i == 0):
568                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
569                      args.append('1')
570 <                else:
570 >               else:
571                      args.append(str(i*self.eventsPerJob))
572              args.append(str(self.eventsPerJob))
573              self.list_of_args.append(args)
574         # prepare dict output
575 +
576          dictOut = {}
577 +        dictOut['params'] = ['MaxEvents']
578 +        if (firstRun):
579 +            dictOut['params'] = ['FirstRun','MaxEvents']
580 +            if ( generator in managedGenerators ) :
581 +                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
582 +        else:
583 +            if (generator in managedGenerators) :
584 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
585          dictOut['args'] = self.list_of_args
586          dictOut['jobDestination'] = jobDestination
587          dictOut['njobs']=self.total_number_of_jobs
# Line 509 | Line 598 | class JobSplitter:
598              msg = 'must specify  number_of_jobs.'
599              raise crabexception(msg)
600          jobDestination = []
601 <        common.logger.debug(5,'Splitting per job')
602 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
601 >        common.logger.debug('Splitting per job')
602 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
603 >
604 > #        self.total_number_of_jobs = self.theNumberOfJobs
605  
606 <        self.total_number_of_jobs = self.theNumberOfJobs
606 >        self.prepareSplittingNoInput()
607  
608 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
608 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
609  
610 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
610 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
611  
612          # argument is seed number.$i
613          self.list_of_args = []
614          for i in range(self.total_number_of_jobs):
615 +            args=[]
616              jobDestination.append([""])
617 <            self.list_of_args.append([str(i)])
617 >            if self.eventsPerJob != 0 :
618 >                args.append(str(self.eventsPerJob))
619 >                self.list_of_args.append(args)
620  
621         # prepare dict output
622          dictOut = {}
623 <        dictOut['args'] = self.list_of_args
623 >        dictOut['params'] = ['MaxEvents']
624 >        dictOut['args'] =  self.list_of_args
625          dictOut['jobDestination'] = jobDestination
626          dictOut['njobs']=self.total_number_of_jobs
627          return dictOut
533
628  
629 <    def jobSplittingByLumi(self):
629 >
630 >    def jobSplittingByLumi(self):
631          """
632 +        Split task into jobs by Lumi section paying attention to which
633 +        lumis should be run (according to the analysis dataset).
634 +        This uses WMBS job splitting which does not split files over jobs
635 +        so the job will have AT LEAST as many lumis as requested, perhaps
636 +        more
637          """
638 <        return
638 >
639 >        common.logger.debug('Splitting by Lumi')
640 >        self.checkLumiSettings()
641 >
642 >        blockSites = self.args['blockSites']
643 >        pubdata = self.args['pubdata']
644 >
645 >        lumisPerFile  = pubdata.getLumis()
646 >
647 >        # Make the list of WMBS files for job splitter
648 >        fileList = pubdata.getListFiles()
649 >        thefiles = Fileset(name='FilesToSplit')
650 >        for jobFile in fileList:
651 >            block = jobFile['Block']['Name']
652 >            try:
653 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
654 >            except:
655 >                continue
656 >            wmbsFile = File(jobFile['LogicalFileName'])
657 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
658 >            wmbsFile['block'] = block
659 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
660 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
661 >            thefiles.addFile(wmbsFile)
662 >
663 >        # Create the factory and workflow
664 >        work = Workflow()
665 >        subs = Subscription(fileset    = thefiles,    workflow = work,
666 >                            split_algo = 'LumiBased', type     = "Processing")
667 >        splitter = SplitterFactory()
668 >        jobFactory = splitter(subs)
669 >
670 >        list_of_lists = []
671 >        jobDestination = []
672 >        jobCount = 0
673 >        lumisCreated = 0
674 >
675 >        if not self.limitJobLumis:
676 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
677 >            common.logger.info('Each job will process about %s lumis.' %
678 >                                self.lumisPerJob)
679 >
680 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
681 >            for job in jobGroup.jobs:
682 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
683 >                    common.logger.info('Limit on number of jobs reached.')
684 >                    break
685 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
686 >                    common.logger.info('Limit on number of lumis reached.')
687 >                    break
688 >                lumis = []
689 >                lfns  = []
690 >                locations = []
691 >                firstFile = True
692 >                # Collect information from all the files
693 >                for jobFile in job.getFiles():
694 >                    if firstFile:  # Get locations from first file in the job
695 >                        for loc in jobFile['locations']:
696 >                            locations.append(loc)
697 >                        firstFile = False
698 >                    # Accumulate Lumis from all files
699 >                    for lumiList in jobFile['runs']:
700 >                        theRun = lumiList.run
701 >                        for theLumi in list(lumiList):
702 >                            lumis.append( (theRun, theLumi) )
703 >
704 >                    lfns.append(jobFile['lfn'])
705 >                fileString = ','.join(lfns)
706 >                lumiString = compressLumiString(lumis)
707 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
708 >
709 >                jobDestination.append(locations)
710 >                jobCount += 1
711 >                lumisCreated += len(lumis)
712 >                common.logger.debug('Job %s will run on %s files and %s lumis '
713 >                    % (jobCount, len(lfns), len(lumis) ))
714 >
715 >        common.logger.info('%s jobs created to run on %s lumis' %
716 >                              (jobCount, lumisCreated))
717 >
718 >        # Prepare dict output matching back to non-WMBS job creation
719 >        dictOut = {}
720 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
721 >        dictOut['args'] = list_of_lists
722 >        dictOut['jobDestination'] = jobDestination
723 >        dictOut['njobs'] = jobCount
724 >
725 >        return dictOut
726 >
727 >
728      def Algos(self):
729          """
730          Define key splittingType matrix
731          """
732 <        SplitAlogs = {
733 <                     'EventBased'           : self.jobSplittingByEvent,
732 >        SplitAlogs = {
733 >                     'EventBased'           : self.jobSplittingByEvent,
734                       'RunBased'             : self.jobSplittingByRun,
735 <                     'LumiBased'            : self.jobSplittingByLumi,
736 <                     'NoInput'              : self.jobSplittingNoInput,
735 >                     'LumiBased'            : self.jobSplittingByLumi,
736 >                     'NoInput'              : self.jobSplittingNoInput,
737                       'ForScript'            : self.jobSplittingForScript
738 <                     }  
738 >                     }
739          return SplitAlogs
740  
741 +
742 +
743 + def compressLumiString(lumis):
744 +    """
745 +    Turn a list of 2-tuples of run/lumi numbers into a list of the format
746 +    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable
747 +    """
748 +
749 +    lumis.sort()
750 +    parts = []
751 +    startRange = None
752 +    endRange = None
753 +
754 +    for lumiBlock in lumis:
755 +        if not startRange: # This is the first one
756 +            startRange = lumiBlock
757 +            endRange = lumiBlock
758 +        elif lumiBlock == endRange: # Same Lumi (different files?)
759 +            pass
760 +        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
761 +            endRange = lumiBlock
762 +        else: # This is the start of a new range
763 +            part = ':'.join(map(str, startRange))
764 +            if startRange != endRange:
765 +                part += '-' + ':'.join(map(str, endRange))
766 +            parts.append(part)
767 +            startRange = lumiBlock
768 +            endRange = lumiBlock
769 +
770 +    # Put out what's left
771 +    if startRange:
772 +        part = ':'.join(map(str, startRange))
773 +        if startRange != endRange:
774 +            part += '-' + ':'.join(map(str, endRange))
775 +        parts.append(part)
776 +
777 +    output = ','.join(parts)
778 +    return output
779 +
780 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines