root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.9 by spiga, Fri Mar 6 17:03:08 2009 UTC vs.
Revision 1.37 by ewv, Wed May 26 19:46:12 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + from LumiList import LumiList
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21          self.args=args
22 +
23 +        self.lumisPerJob = -1
24 +        self.totalNLumis = 0
25 +        self.theNumberOfJobs = 0
26 +        self.limitNJobs = False
27 +        self.limitTotalLumis = False
28 +        self.limitJobLumis = False
29 +
30          #self.maxEvents
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
33 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
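For orientation, a minimal sketch of how the parser built above is used
later in this file (the storage element names are invented; the matching
rules live in WMCore's SEBlackWhiteListParser):

    # keep only the SEs that survive the user's white/black lists
    parser = SEBlackWhiteListParser(['srm.cern.ch'], [], common.logger())
    sites = parser.checkWhiteList(['srm.cern.ch', 'srm.fnal.gov'])
    # an empty result means the data is not reachable from any allowed site
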
37      def checkUserSettings(self):
# Line 43 | Line 62 | class JobSplitter:
62              self.total_number_of_events = 0
63              self.selectTotalNumberEvents = 0
64  
65 +        return
66 +
67 +    def checkLumiSettings(self):
68 +        """
69 +        Check to make sure the user has specified enough information
70 +        to perform job splitting by lumi sections
71 +        """
72 +        settings = 0
73 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
74 +            self.lumisPerJob = int(self.cfg_params['CMSSW.lumis_per_job'])
75 +            self.limitJobLumis = True
76 +            settings += 1
77 +
78 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
79 +            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
80 +            self.limitNJobs = True
81 +            settings += 1
82 +
83 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
84 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
85 +            self.limitTotalLumis = (self.totalNLumis != -1)
86 +            settings += 1
87 +
88 +        if settings != 2:
89 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
90 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
91 +            raise CrabException(msg)
92 +        if self.limitNJobs and self.limitJobLumis:
93 +            self.limitTotalLumis = True
94 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
95 +
96 +        # Has the user specified runselection?
97 +        if (self.cfg_params.has_key('CMSSW.runselection')):
98 +            common.logger.info('You have specified runselection and split by lumi.')
99 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
100 +        return
101 +
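A hedged example of cfg_params entries that satisfy checkLumiSettings()
(the values are illustrative; exactly two of the three keys may be set):

    cfg_params = {'CMSSW.lumis_per_job':         '50',    # limitJobLumis
                  'CMSSW.total_number_of_lumis': '500'}   # limitTotalLumis
    # lumis_per_job plus number_of_jobs would instead imply
    # totalNLumis = lumisPerJob * theNumberOfJobs
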
102 +    def ComputeSubBlockSites( self, blockSites ):
103 +        """Keep only the blocks whose storage elements pass the
104 +        user's SE white list."""
105 +        sub_blockSites = {}
106 +        for k,v in blockSites.iteritems():
107 +            sites=self.blackWhiteListParser.checkWhiteList(v)
108 +            if sites: sub_blockSites[k] = v
109 +        if len(sub_blockSites) < 1:
110 +            msg = 'WARNING: the sites %s are not hosting any part of the data.' % self.seWhiteList
111 +            raise CrabException(msg)
112 +        return sub_blockSites
113  
114   ########################################################################
115      def jobSplittingByEvent( self ):
# Line 58 | Line 125 | class JobSplitter:
125                self.list_of_args - File(s) job will run on (a list of lists)
126          """
127  
128 <        jobDestination=[]  
128 >        jobDestination=[]
129          self.checkUserSettings()
130          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
131              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
132              raise CrabException(msg)
133 <
134 <        blockSites = self.args['blockSites']
133 >
134 >        blockSites = self.args['blockSites']
135          pubdata = self.args['pubdata']
136          filesbyblock=pubdata.getFiles()
137  
# Line 78 | Line 145 | class JobSplitter:
145          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
146          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
147  
148 +        if noBboundary == 1:
149 +            if self.total_number_of_events == -1:
150 +                msg = 'You are selecting no_block_boundary=1, which does not allow total_number_of_events=-1\n'
151 +                msg += '\tYou should get the number of events from the DBS web interface and use it in your configuration.'
152 +                raise CrabException(msg)
153 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
154 +                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
155 +                msg += "\tPlease set se_white_list with the site's storage element name."
156 +                raise CrabException(msg)
157 +            blockSites = self.ComputeSubBlockSites(blockSites)
158 +
159          # ---- Handle the possible job splitting configurations ---- #
160          if (self.selectTotalNumberEvents):
161              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 170 | class JobSplitter:
170          # If user requested more events than are in the dataset
171          elif (totalEventsRequested > self.maxEvents):
172              eventsRemaining = self.maxEvents
173 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
173 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
174          # If user requested less events than are in the dataset
175          else:
176              eventsRemaining = totalEventsRequested
# Line 108 | Line 186 | class JobSplitter:
186              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
187  
188          if (self.selectNumberOfJobs):
189 <            common.logger.message("May not create the exact number_of_jobs requested.")
189 >            common.logger.info("May not create the exact number_of_jobs requested.")
190  
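For concreteness, the integer division above behaves like this (numbers
invented):

    # total_number_of_events=1050, number_of_jobs=4
    eventsPerJobRequested = int(1050/4)   # = 262; 4*262 = 1048 < 1050,
    # which is why the exact number_of_jobs may not be honored
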
191          # old... to be removed (Daniele)
192          totalNumberOfJobs = 999999999
# Line 125 | Line 203 | class JobSplitter:
203          jobsOfBlock = {}
204  
205          parString = ""
206 +        pString = ""
207          filesEventCount = 0
208 +        msg=''
209  
210          # ---- Iterate over the blocks in the dataset until ---- #
211          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 217 | class JobSplitter:
217  
218              if self.eventsbyblock.has_key(block) :
219                  numEventsInBlock = self.eventsbyblock[block]
220 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
220 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
221  
222                  files = filesbyblock[block]
223                  numFilesInBlock = len(files)
# Line 147 | Line 227 | class JobSplitter:
227                  if noBboundary == 0: # DD
228                      # ---- New block => New job ---- #
229                      parString = ""
230 +                    pString=""
231                      # counter for number of events in files currently worked on
232                      filesEventCount = 0
233                  # flag if next while loop should touch new file
# Line 156 | Line 237 | class JobSplitter:
237  
238                  # ---- Iterate over the files in the block until we've met the requested ---- #
239                  # ---- total # of events or we've gone over all the files in this block  ---- #
240 <                pString=''
240 >                msg='\n'
241                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
242                      file = files[fileCount]
243                      if self.useParent==1:
244                          parent = self.parentFiles[file]
245 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
245 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
246                      if newFile :
247                          try:
248                              numEventsInFile = self.eventsbyfile[file]
249 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
249 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
250                              # increase filesEventCount
251                              filesEventCount += numEventsInFile
252                              # Add file to current job
253 <                            parString += '\\\"' + file + '\\\"\,'
253 >                            parString +=  file + ','
254 >                            if self.useParent==1:
255 >                                for f in parent :
256 >                                    pString += f  + ','
257                              newFile = 0
258                          except KeyError:
259 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
259 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
260  
261                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
262                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 269 | class JobSplitter:
269                              if ( fileCount == numFilesInBlock-1 ) :
270                                  # end job using last file, use remaining events in block
271                                  # close job and touch new file
272 <                                fullString = parString[:-2]
272 >                                fullString = parString[:-1]
273                                  if self.useParent==1:
274 <                                    fullParentString = pString[:-2]
274 >                                    fullParentString = pString[:-1]
275                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
276                                  else:
277                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
278 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
278 >                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
279                                  jobDestination.append(blockSites[block])
280 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
280 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
281                                  # fill jobs of block dictionary
282                                  jobsOfBlock[block].append(jobCount+1)
283                                  # reset counter
# Line 217 | Line 298 | class JobSplitter:
298                      # if events in file equal to eventsPerJobRequested
299                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
300                          # close job and touch new file
301 <                        fullString = parString[:-2]
301 >                        fullString = parString[:-1]
302                          if self.useParent==1:
303 <                            fullParentString = pString[:-2]
303 >                            fullParentString = pString[:-1]
304                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
305                          else:
306                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
307 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
307 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
308                          jobDestination.append(blockSites[block])
309 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
309 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
310                          jobsOfBlock[block].append(jobCount+1)
311                          # reset counter
312                          jobCount = jobCount + 1
# Line 242 | Line 323 | class JobSplitter:
323                      # if more events in file remain than eventsPerJobRequested
324                      else :
325                          # close job but don't touch new file
326 <                        fullString = parString[:-2]
326 >                        fullString = parString[:-1]
327                          if self.useParent==1:
328 <                            fullParentString = pString[:-2]
328 >                            fullParentString = pString[:-1]
329                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
330                          else:
331                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
332 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
332 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
333                          jobDestination.append(blockSites[block])
334 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
334 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
335                          jobsOfBlock[block].append(jobCount+1)
336                          # increase counter
337                          jobCount = jobCount + 1
# Line 261 | Line 342 | class JobSplitter:
342                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
343                          # remove all but the last file
344                          filesEventCount = self.eventsbyfile[file]
345 +                        pString_tmp=''
346                          if self.useParent==1:
347 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
348 <                        parString = '\\\"' + file + '\\\"\,'
347 >                            for f in parent : pString_tmp +=  f + ','
348 >                        pString =  pString_tmp
349 >                        parString =  file + ','
350                      pass # END if
351                  pass # END while (iterate over files in the block)
352          pass # END while (iterate over blocks in the dataset)
353 +        common.logger.debug(msg)
354          self.ncjobs = self.total_number_of_jobs = jobCount
355          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
356 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
357 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
358 <
356 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
357 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
358 >
359          # skip check on  block with no sites  DD
360          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
361  
362         # prepare dict output
363          dictOut = {}
364 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
365 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
366          dictOut['args'] = list_of_lists
367          dictOut['jobDestination'] = jobDestination
368          dictOut['njobs']=self.total_number_of_jobs
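
A hedged example of one dictOut['args'] row produced above, aligned with
dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents'] (the LFNs are
invented):

    args_row = ['/store/data/Run2010A/fileA.root,/store/data/Run2010A/fileB.root',
                '500', '0']
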
# Line 285 | Line 371 | class JobSplitter:
371  
372          # keep trace of block with no sites to print a warning at the end
373  
374 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
374 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
375          # screen output
376          screenOutput = "List of jobs and available destination sites:\n\n"
377          noSiteBlock = []
378          bloskNoSite = []
379 +        allBlock = []
380  
381          blockCounter = 0
382          for block in blocks:
383              if block in jobsOfBlock.keys() :
384                  blockCounter += 1
385 +                allBlock.append( blockCounter )
386 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
387                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
388 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
389 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
388 >                    ', '.join(SE2CMS(sites)))
389 >                if len(sites) == 0:
390                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
391                      bloskNoSite.append( blockCounter )
392  
393 <        common.logger.message(screenOutput)
393 >        common.logger.info(screenOutput)
394          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
395              msg = 'WARNING: No sites are hosting any part of the data for block:\n                '
396              virgola = ""
# Line 309 | Line 398 | class JobSplitter:
398                  virgola = ","
399              for block in bloskNoSite:
400                  msg += ' ' + str(block) + virgola
401 <            msg += '\n               Related jobs:\n                 '
401 >            msg += '\n\t\tRelated jobs:\n                 '
402              virgola = ""
403              if len(noSiteBlock) > 1:
404                  virgola = ","
405              for range_jobs in noSiteBlock:
406                  msg += str(range_jobs) + virgola
407 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
408 <            if self.cfg_params.has_key('EDG.se_white_list'):
409 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
410 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
411 <                msg += 'Please check if the dataset is available at this site!)\n'
412 <            if self.cfg_params.has_key('EDG.ce_white_list'):
413 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
414 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
415 <                msg += 'Please check if the dataset is available at this site!)\n'
416 <
417 <            common.logger.message(msg)
407 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
408 >            if self.cfg_params.has_key('GRID.se_white_list'):
409 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
410 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
411 >                msg += '\tPlease check if the dataset is available at this site!)'
412 >            if self.cfg_params.has_key('GRID.ce_white_list'):
413 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
414 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
415 >                msg += '\tPlease check if the dataset is available at this site!)\n'
416 >
417 >            common.logger.info(msg)
418 >
419 >        if bloskNoSite == allBlock:
420 >            msg += 'Requested jobs cannot be Created! \n'
421 >            if self.cfg_params.has_key('GRID.se_white_list'):
422 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
423 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
424 >                msg += '\tPlease check if the dataset is available at this site!)'
425 >            if self.cfg_params.has_key('GRID.ce_white_list'):
426 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
427 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
428 >                msg += '\tPlease check if the dataset is available at this site!)\n'
429 >            raise CrabException(msg)
430  
431          return
432  
433  
434   ########################################################################
435 <    def jobSplittingByRun(self):
435 >    def jobSplittingByRun(self):
436          """
437          """
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
438  
439          self.checkUserSettings()
440 <        blockSites = self.args['blockSites']
440 >        blockSites = self.args['blockSites']
441          pubdata = self.args['pubdata']
442  
443          if self.selectNumberOfJobs == 0 :
444              self.theNumberOfJobs = 9999999
445          blocks = {}
446 <        runList = []
446 >        runList = []
447          thefiles = Fileset(name='FilesToSplit')
448          fileList = pubdata.getListFiles()
449          for f in fileList:
450              block = f['Block']['Name']
451 <            try:
451 >            try:
452                  f['Block']['StorageElementList'].extend(blockSites[block])
453              except:
454                  continue
# Line 363 | Line 456 | class JobSplitter:
456              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
457              wmbsFile['block'] = block
458              runNum = f['RunsList'][0]['RunNumber']
459 <            runList.append(runNum)
459 >            runList.append(runNum)
460              myRun = Run(runNumber=runNum)
461              wmbsFile.addRun( myRun )
462              thefiles.addFile(
463                  wmbsFile
464                  )
465 <
465 >
466          work = Workflow()
467          subs = Subscription(
468          fileset = thefiles,
# Line 378 | Line 471 | class JobSplitter:
471          type = "Processing")
472          splitter = SplitterFactory()
473          jobfactory = splitter(subs)
474 <        
475 <        #loop over all runs
383 <        set = Set(runList)
474 >
475 >        #loop over all runs
476          list_of_lists = []
477          jobDestination = []
386
478          count = 0
479 <        for i in list(set):
479 >        for jobGroup in  jobfactory():
480              if count <  self.theNumberOfJobs:
481 <                res = self.getJobInfo(jobfactory())
482 <                parString = ''
481 >                res = self.getJobInfo(jobGroup)
482 >                parString = ''
483                  for file in res['lfns']:
484 <                    parString += '\\\"' + file + '\\\"\,'
485 <                fullString = parString[:-2]
486 <                list_of_lists.append([fullString,str(-1),str(0)])    
484 >                    parString += file + ','
485 >                fullString = parString[:-1]
486 >                list_of_lists.append([fullString,str(-1),str(0)])
487                  #need to check single file location
488 <                jobDestination.append(res['locations'])  
488 >                jobDestination.append(res['locations'])
489                  count +=1
490 <       # prepare dict output
490 >        # prepare dict output
491          dictOut = {}
492 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
493          dictOut['args'] = list_of_lists
494          dictOut['jobDestination'] = jobDestination
495          dictOut['njobs']=count
# Line 406 | Line 498 | class JobSplitter:
498  
499      def getJobInfo( self,jobGroup ):
500          res = {}
501 <        lfns = []        
502 <        locations = []        
501 >        lfns = []
502 >        locations = []
503          tmp_check=0
504          for job in jobGroup.jobs:
505              for file in job.getFiles():
506 <                lfns.append(file['lfn'])
506 >                lfns.append(file['lfn'])
507                  for loc in file['locations']:
508                      if tmp_check < 1 :
509                          locations.append(loc)
510 <                tmp_check = tmp_check + 1
511 <                ### TODO: the check on the locations should go here
512 <        res['lfns'] = lfns
513 <        res['locations'] = locations
514 <        return res                
515 <      
510 >                tmp_check = tmp_check + 1
511 >                ### TODO: the check on the locations should go here
512 >        res['lfns'] = lfns
513 >        res['locations'] = locations
514 >        return res
515 >
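A hedged illustration of the shape getJobInfo() returns (the LFN and SE
name are invented):

    res = {'lfns': ['/store/data/Run2010A/file1.root'],
           'locations': ['srm.example.edu']}
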
516   ########################################################################
517 <    def jobSplittingNoInput(self):
517 >    def prepareSplittingNoInput(self):
518          """
427        Perform job splitting based on number of event per job
519          """
429        common.logger.debug(5,'Splitting per events')
430        self.checkUserSettings()
431        jobDestination=[]
432        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
433            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
434            raise CrabException(msg)
435
436        managedGenerators =self.args['managedGenerators']
437        generator = self.args['generator']
438        firstRun = self.cfg_params.get('CMSSW.first_run',None)
439
520          if (self.selectEventsPerJob):
521 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
521 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
522          if (self.selectNumberOfJobs):
523 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
523 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
524          if (self.selectTotalNumberEvents):
525 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
525 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
526  
527          if (self.total_number_of_events < 0):
528              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 459 | Line 539 | class JobSplitter:
539              self.total_number_of_jobs = self.theNumberOfJobs
540              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
541  
542 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
542 >
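A quick worked instance of the arithmetic in prepareSplittingNoInput()
(numbers invented):

    # total_number_of_events=10000, number_of_jobs=33
    eventsPerJob = int(10000/33)   # = 303
    check = 10000 - 33*303         # = 1 event short, reported by the
                                   # 'Warning: asked ...' message in
                                   # jobSplittingNoInput()
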
543 >    def jobSplittingNoInput(self):
544 >        """
545 >        Perform job splitting based on the number of events per job
546 >        """
547 >        common.logger.debug('Splitting per events')
548 >        self.checkUserSettings()
549 >        jobDestination=[]
550 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
551 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
552 >            raise CrabException(msg)
553 >
554 >        managedGenerators =self.args['managedGenerators']
555 >        generator = self.args['generator']
556 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
557 >
558 >        self.prepareSplittingNoInput()
559 >
560 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
561  
562          # is there any remainder?
563          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
564  
565 <        common.logger.debug(5,'Check  '+str(check))
565 >        common.logger.debug('Check  '+str(check))
566  
567 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
567 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
568          if check > 0:
569 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
569 >            common.logger.info('Warning: asked for '+str(self.total_number_of_events)+' events, but can only do '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
570  
571          # argument is seed number.$i
572          self.list_of_args = []
573          for i in range(self.total_number_of_jobs):
574              ## Since there is no input, any site is good
575 <            jobDestination.append([""]) #must be empty to write correctly the xml
575 >            jobDestination.append([""]) # must be empty to correctly write the XML
576              args=[]
577 <            if (firstRun):
578 <                ## pythia first run
481 <                args.append(str(firstRun)+str(i))
577 >            if (firstLumi): # Pythia first lumi
578 >                args.append(str(int(firstLumi)+i))
579              if (generator in managedGenerators):
580 <                if (generator == 'comphep' and i == 0):
580 >               args.append(generator)
581 >               if (generator == 'comphep' and i == 0):
582                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
583                      args.append('1')
584 <                else:
584 >               else:
585                      args.append(str(i*self.eventsPerJob))
586              args.append(str(self.eventsPerJob))
587              self.list_of_args.append(args)
588         # prepare dict output
589 +
590          dictOut = {}
591 +        dictOut['params'] = ['MaxEvents']
592 +        if (firstLumi):
593 +            dictOut['params'] = ['FirstLumi','MaxEvents']
594 +            if (generator in managedGenerators):
595 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
596 +        else:
597 +            if (generator in managedGenerators) :
598 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
599          dictOut['args'] = self.list_of_args
600          dictOut['jobDestination'] = jobDestination
601          dictOut['njobs']=self.total_number_of_jobs
# Line 505 | Line 612 | class JobSplitter:
612              msg = 'must specify  number_of_jobs.'
613              raise crabexception(msg)
614          jobDestination = []
615 <        common.logger.debug(5,'Splitting per job')
616 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
615 >        common.logger.debug('Splitting per job')
616 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
617 >
618 > #        self.total_number_of_jobs = self.theNumberOfJobs
619  
620 <        self.total_number_of_jobs = self.theNumberOfJobs
620 >        self.prepareSplittingNoInput()
621  
622 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
622 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
623  
624 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
624 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
625  
626          # argument is seed number.$i
627          self.list_of_args = []
628          for i in range(self.total_number_of_jobs):
629 +            args=[]
630              jobDestination.append([""])
631 <            self.list_of_args.append([str(i)])
631 >            if self.eventsPerJob != 0 :
632 >                args.append(str(self.eventsPerJob))
633 >                self.list_of_args.append(args)
634  
635         # prepare dict output
636          dictOut = {}
637 <        dictOut['args'] = self.list_of_args
637 >        dictOut['params'] = ['MaxEvents']
638 >        dictOut['args'] =  self.list_of_args
639          dictOut['jobDestination'] = jobDestination
640          dictOut['njobs']=self.total_number_of_jobs
641          return dictOut
529
642  
643 <    def jobSplittingByLumi(self):
643 >
644 >    def jobSplittingByLumi(self):
645          """
646 +        Split task into jobs by Lumi section paying attention to which
647 +        lumis should be run (according to the analysis dataset).
648 +        This uses WMBS job splitting which does not split files over jobs
649 +        so the job will have AT LEAST as many lumis as requested, perhaps
650 +        more
651          """
652 <        return
652 >
653 >        common.logger.debug('Splitting by Lumi')
654 >        self.checkLumiSettings()
655 >
656 >        blockSites = self.args['blockSites']
657 >        pubdata = self.args['pubdata']
658 >
659 >        lumisPerFile  = pubdata.getLumis()
660 >
661 >        # Make the list of WMBS files for job splitter
662 >        fileList = pubdata.getListFiles()
663 >        thefiles = Fileset(name='FilesToSplit')
664 >        for jobFile in fileList:
665 >            block = jobFile['Block']['Name']
666 >            try:
667 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
668 >            except:
669 >                continue
670 >            wmbsFile = File(jobFile['LogicalFileName'])
671 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
672 >            wmbsFile['block'] = block
673 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
674 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
675 >            thefiles.addFile(wmbsFile)
676 >
677 >        # Create the factory and workflow
678 >        work = Workflow()
679 >        subs = Subscription(fileset    = thefiles,    workflow = work,
680 >                            split_algo = 'LumiBased', type     = "Processing")
681 >        splitter = SplitterFactory()
682 >        jobFactory = splitter(subs)
683 >
684 >        list_of_lists = []
685 >        jobDestination = []
686 >        jobCount = 0
687 >        lumisCreated = 0
688 >
689 >        if not self.limitJobLumis:
690 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
691 >            common.logger.info('Each job will process about %s lumis.' %
692 >                                self.lumisPerJob)
693 >
694 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
695 >            for job in jobGroup.jobs:
696 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
697 >                    common.logger.info('Limit on number of jobs reached.')
698 >                    break
699 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
700 >                    common.logger.info('Limit on number of lumis reached.')
701 >                    break
702 >                lumis = []
703 >                lfns  = []
704 >                locations = []
705 >                firstFile = True
706 >                # Collect information from all the files
707 >                for jobFile in job.getFiles():
708 >                    doFile = False
709 >                    if firstFile:  # Get locations from first file in the job
710 >                        for loc in jobFile['locations']:
711 >                            locations.append(loc)
712 >                        firstFile = False
713 >                    # Accumulate Lumis from all files
714 >                    for lumiList in jobFile['runs']:
715 >                        theRun = lumiList.run
716 >                        for theLumi in list(lumiList):
717 >                            if (not self.limitTotalLumis) or \
718 >                               (lumisCreated <= self.totalNLumis):
719 >                                doFile = True
720 >                                lumisCreated += 1
721 >                                lumis.append( (theRun, theLumi) )
722 >                    if doFile:
723 >                        lfns.append(jobFile['lfn'])
724 >                fileString = ','.join(lfns)
725 >                lumiLister = LumiList(lumis = lumis)
726 >                lumiString = lumiLister.getCMSSWString()
727 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
728 >
729 >                jobDestination.append(locations)
730 >                jobCount += 1
731 >                common.logger.debug('Job %s will run on %s files and %s lumis '
732 >                    % (jobCount, len(lfns), len(lumis) ))
733 >
734 >        common.logger.info('%s jobs created to run on %s lumis' %
735 >                              (jobCount, lumisCreated))
736 >
737 >        # Prepare dict output matching back to non-WMBS job creation
738 >        dictOut = {}
739 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
740 >        dictOut['args'] = list_of_lists
741 >        dictOut['jobDestination'] = jobDestination
742 >        dictOut['njobs'] = jobCount
743 >
744 >        return dictOut
745 >
746 >
747      def Algos(self):
748          """
749          Define key splittingType matrix
750          """
751 <        SplitAlogs = {
752 <                     'EventBased'           : self.jobSplittingByEvent,
751 >        SplitAlogs = {
752 >                     'EventBased'           : self.jobSplittingByEvent,
753                       'RunBased'             : self.jobSplittingByRun,
754 <                     'LumiBased'            : self.jobSplittingByLumi,
755 <                     'NoInput'              : self.jobSplittingNoInput,
754 >                     'LumiBased'            : self.jobSplittingByLumi,
755 >                     'NoInput'              : self.jobSplittingNoInput,
756                       'ForScript'            : self.jobSplittingForScript
757 <                     }  
757 >                     }
758          return SplitAlogs
759  
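A hedged sketch of how this dispatch table is consumed (the splitting key
normally comes from CRAB's configuration handling elsewhere):

    splitter = JobSplitter(cfg_params, args)
    algo = splitter.Algos()['EventBased']   # pick the splitting type
    dictOut = algo()                        # runs jobSplittingByEvent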

Diff Legend

Removed lines
+ Added lines
< Changed lines (revision 1.9)
> Changed lines (revision 1.37)