ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.2 by spiga, Wed Feb 4 15:09:03 2009 UTC vs.
Revision 1.29 by ewv, Thu Oct 1 22:00:40 2009 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
6 < from crab_logger import Logger
6 > from sets import Set
7   from crab_exceptions import *
8   from crab_util import *
9 +
10 + from WMCore.DataStructs.File import File
11 + from WMCore.DataStructs.Fileset import Fileset
12 + from WMCore.DataStructs.Run import Run
13 + from WMCore.DataStructs.Subscription import Subscription
14 + from WMCore.DataStructs.Workflow import Workflow
15 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
16   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
17  
18   class JobSplitter:
19      def __init__( self, cfg_params,  args ):
20          self.cfg_params = cfg_params
21 <        self.blockSites = args['blockSites']
22 <        self.pubdata = args['pubdata']
21 >        self.args=args
22 >
23 >        self.lumisPerJob = -1
24 >        self.totalNLumis = 0
25 >        self.theNumberOfJobs = 0
26 >        self.limitNJobs = False
27 >        self.limitTotalLumis = False
28 >        self.limitJobLumis = False
29 >
30          #self.maxEvents
13        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
31          # init BlackWhiteListParser
32 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
33 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
34 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
32 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
33 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
34 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
35  
36  
37      def checkUserSettings(self):
# Line 46 | Line 63 | class JobSplitter:
63              self.selectTotalNumberEvents = 0
64  
65  
66 +    def checkLumiSettings(self):
67 +        """
68 +        Check to make sure the user has specified enough information to
69 +        perform splitting by Lumis to run the job
70 +        """
71 +        settings = 0
72 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
73 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
74 +            self.limitJobLumis = True
75 +            settings += 1
76 +
77 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
78 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
79 +            self.limitNJobs = True
80 +            settings += 1
81 +
82 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
83 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
84 +            self.limitTotalLumis = (self.totalNLumis != -1)
85 +            settings += 1
86 +
87 +        if settings != 2:
88 +            msg = 'When running on analysis datasets you must specify two and only two of:\n'
89 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
90 +            raise CrabException(msg)
91 +        if self.limitNJobs and self.limitJobLumis:
92 +            self.limitTotalLumis = True
93 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
94 +
95 +
96 +    def ComputeSubBlockSites( self, blockSites ):
97 +        """
98 +        """
99 +        sub_blockSites = {}
100 +        for k,v in blockSites.iteritems():
101 +            sites=self.blackWhiteListParser.checkWhiteList(v)
102 +            if sites : sub_blockSites[k]=v
103 +        if len(sub_blockSites) < 1:
104 +            msg = 'WARNING: the sites %s are not hosting any part of data.'%self.seWhiteList
105 +            raise CrabException(msg)
106 +        return sub_blockSites
107 +
108   ########################################################################
109      def jobSplittingByEvent( self ):
110          """
# Line 55 | Line 114 | class JobSplitter:
114          REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
115                    self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
116                    self.maxEvents, self.filesbyblock
117 <        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
117 >        SETS: jobDestination - Site destination(s) for each job (a list of lists)
118                self.total_number_of_jobs - Total # of jobs
119                self.list_of_args - File(s) job will run on (a list of lists)
120          """
121  
122 +        jobDestination=[]
123          self.checkUserSettings()
124          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
125              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
126              raise CrabException(msg)
67
68        self.filesbyblock=self.pubdata.getFiles()
127  
128 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
129 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
130 <        self.parentFiles=self.pubdata.getParent()
128 >        blockSites = self.args['blockSites']
129 >        pubdata = self.args['pubdata']
130 >        filesbyblock=pubdata.getFiles()
131 >
132 >        self.eventsbyblock=pubdata.getEventsPerBlock()
133 >        self.eventsbyfile=pubdata.getEventsPerFile()
134 >        self.parentFiles=pubdata.getParent()
135  
136          ## get max number of events
137 <        self.maxEvents=self.pubdata.getMaxEvents()
137 >        self.maxEvents=pubdata.getMaxEvents()
138  
139          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
140          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
141  
142 +        if noBboundary == 1:
143 +            if self.total_number_of_events== -1:
144 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
145 +            msg +='\tYou should get the number of events from DBS web interface and use it for your configuration.'
146 +                raise CrabException(msg)
147 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
148 +                msg = 'You are selecting no_block_boundary=1 which requires choosing one and only one site.\n'
149 +                msg += "\tPlease set se_white_list with the site's storage element name."
150 +                raise  CrabException(msg)
151 +            blockSites = self.ComputeSubBlockSites(blockSites)
152 +
153          # ---- Handle the possible job splitting configurations ---- #
154          if (self.selectTotalNumberEvents):
155              totalEventsRequested = self.total_number_of_events
# Line 91 | Line 164 | class JobSplitter:
164          # If user requested more events than are in the dataset
165          elif (totalEventsRequested > self.maxEvents):
166              eventsRemaining = self.maxEvents
167 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
167 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
168          # If user requested less events than are in the dataset
169          else:
170              eventsRemaining = totalEventsRequested
# Line 107 | Line 180 | class JobSplitter:
180              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
181  
182          if (self.selectNumberOfJobs):
183 <            common.logger.message("May not create the exact number_of_jobs requested.")
183 >            common.logger.info("May not create the exact number_of_jobs requested.")
184  
185          # old... to remove Daniele
186          totalNumberOfJobs = 999999999
187  
188 <        blocks = self.blockSites.keys()
188 >        blocks = blockSites.keys()
189          blockCount = 0
190          # Backup variable in case self.maxEvents counted events in a non-included block
191          numBlocksInDataset = len(blocks)
# Line 124 | Line 197 | class JobSplitter:
197          jobsOfBlock = {}
198  
199          parString = ""
200 +        pString = ""
201          filesEventCount = 0
202 +        msg=''
203  
204          # ---- Iterate over the blocks in the dataset until ---- #
205          # ---- we've met the requested total # of events    ---- #
# Line 136 | Line 211 | class JobSplitter:
211  
212              if self.eventsbyblock.has_key(block) :
213                  numEventsInBlock = self.eventsbyblock[block]
214 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
214 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
215  
216 <                files = self.filesbyblock[block]
216 >                files = filesbyblock[block]
217                  numFilesInBlock = len(files)
218                  if (numFilesInBlock <= 0):
219                      continue
# Line 146 | Line 221 | class JobSplitter:
221                  if noBboundary == 0: # DD
222                      # ---- New block => New job ---- #
223                      parString = ""
224 +                    pString=""
225                      # counter for number of events in files currently worked on
226                      filesEventCount = 0
227                  # flag if next while loop should touch new file
# Line 155 | Line 231 | class JobSplitter:
231  
232                  # ---- Iterate over the files in the block until we've met the requested ---- #
233                  # ---- total # of events or we've gone over all the files in this block  ---- #
234 <                pString=''
234 >                msg='\n'
235                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
236                      file = files[fileCount]
237                      if self.useParent==1:
238                          parent = self.parentFiles[file]
239 <                        for f in parent :
164 <                            pString += '\\\"' + f + '\\\"\,'
165 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
166 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
239 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
240                      if newFile :
241                          try:
242                              numEventsInFile = self.eventsbyfile[file]
243 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
243 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
244                              # increase filesEventCount
245                              filesEventCount += numEventsInFile
246                              # Add file to current job
247 <                            parString += '\\\"' + file + '\\\"\,'
247 >                            parString +=  file + ','
248 >                            if self.useParent==1:
249 >                                for f in parent :
250 >                                    pString += f  + ','
251                              newFile = 0
252                          except KeyError:
253 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
253 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
254  
255                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
256                      # if less events in file remain than eventsPerJobRequested
# Line 187 | Line 263 | class JobSplitter:
263                              if ( fileCount == numFilesInBlock-1 ) :
264                                  # end job using last file, use remaining events in block
265                                  # close job and touch new file
266 <                                fullString = parString[:-2]
266 >                                fullString = parString[:-1]
267                                  if self.useParent==1:
268 <                                    fullParentString = pString[:-2]
268 >                                    fullParentString = pString[:-1]
269                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
270                                  else:
271                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
272 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
273 <                                self.jobDestination.append(self.blockSites[block])
274 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
272 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
273 >                                jobDestination.append(blockSites[block])
274 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
275                                  # fill jobs of block dictionary
276                                  jobsOfBlock[block].append(jobCount+1)
277                                  # reset counter
# Line 216 | Line 292 | class JobSplitter:
292                      # if events in file equal to eventsPerJobRequested
293                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
294                          # close job and touch new file
295 <                        fullString = parString[:-2]
295 >                        fullString = parString[:-1]
296                          if self.useParent==1:
297 <                            fullParentString = pString[:-2]
297 >                            fullParentString = pString[:-1]
298                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
299                          else:
300                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
301 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
302 <                        self.jobDestination.append(self.blockSites[block])
303 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
301 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
302 >                        jobDestination.append(blockSites[block])
303 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
304                          jobsOfBlock[block].append(jobCount+1)
305                          # reset counter
306                          jobCount = jobCount + 1
# Line 241 | Line 317 | class JobSplitter:
317                      # if more events in file remain than eventsPerJobRequested
318                      else :
319                          # close job but don't touch new file
320 <                        fullString = parString[:-2]
320 >                        fullString = parString[:-1]
321                          if self.useParent==1:
322 <                            fullParentString = pString[:-2]
322 >                            fullParentString = pString[:-1]
323                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
324                          else:
325                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
326 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
327 <                        self.jobDestination.append(self.blockSites[block])
328 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
326 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
327 >                        jobDestination.append(blockSites[block])
328 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
329                          jobsOfBlock[block].append(jobCount+1)
330                          # increase counter
331                          jobCount = jobCount + 1
# Line 260 | Line 336 | class JobSplitter:
336                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
337                          # remove all but the last file
338                          filesEventCount = self.eventsbyfile[file]
339 +                        pString_tmp=''
340                          if self.useParent==1:
341 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
342 <                        parString = '\\\"' + file + '\\\"\,'
341 >                            for f in parent : pString_tmp +=  f + ','
342 >                        pString =  pString_tmp
343 >                        parString =  file + ','
344                      pass # END if
345                  pass # END while (iterate over files in the block)
346          pass # END while (iterate over blocks in the dataset)
347 +        common.logger.debug(msg)
348          self.ncjobs = self.total_number_of_jobs = jobCount
349          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
350 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 <
350 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
351 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
352 >
353          # skip check on  block with no sites  DD
354 <        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock)
354 >        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
355  
356         # prepare dict output
357          dictOut = {}
358 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
359 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
360          dictOut['args'] = list_of_lists
361 <        dictOut['jobDestination'] = self.jobDestination
361 >        dictOut['jobDestination'] = jobDestination
362          dictOut['njobs']=self.total_number_of_jobs
363  
364          return dictOut
365  
366          # keep trace of block with no sites to print a warning at the end
367  
368 <    def checkBlockNoSite(self,blocks,jobsOfBlock):  
368 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
369          # screen output
370          screenOutput = "List of jobs and available destination sites:\n\n"
371          noSiteBlock = []
372          bloskNoSite = []
373 +        allBlock = []
374  
375          blockCounter = 0
376          for block in blocks:
377              if block in jobsOfBlock.keys() :
378                  blockCounter += 1
379 +                allBlock.append( blockCounter )
380 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
381                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
382 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)))
383 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)) == 0:
382 >                    ', '.join(SE2CMS(sites)))
383 >                if len(sites) == 0:
384                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
385                      bloskNoSite.append( blockCounter )
386  
387 <        common.logger.message(screenOutput)
387 >        common.logger.info(screenOutput)
388          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
389              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
390              virgola = ""
# Line 308 | Line 392 | class JobSplitter:
392                  virgola = ","
393              for block in bloskNoSite:
394                  msg += ' ' + str(block) + virgola
395 <            msg += '\n               Related jobs:\n                 '
395 >            msg += '\n\t\tRelated jobs:\n                 '
396              virgola = ""
397              if len(noSiteBlock) > 1:
398                  virgola = ","
399              for range_jobs in noSiteBlock:
400                  msg += str(range_jobs) + virgola
401 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
402 <            if self.cfg_params.has_key('EDG.se_white_list'):
403 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
404 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 <                msg += 'Please check if the dataset is available at this site!)\n'
406 <            if self.cfg_params.has_key('EDG.ce_white_list'):
407 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
408 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 <                msg += 'Please check if the dataset is available at this site!)\n'
401 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
402 >            if self.cfg_params.has_key('GRID.se_white_list'):
403 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
404 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
405 >                msg += '\tPlease check if the dataset is available at this site!)'
406 >            if self.cfg_params.has_key('GRID.ce_white_list'):
407 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
408 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
409 >                msg += '\tPlease check if the dataset is available at this site!)\n'
410 >
411 >            common.logger.info(msg)
412  
413 <            common.logger.message(msg)
413 >        if bloskNoSite == allBlock:
414 >            raise CrabException('No jobs created')
415  
416          return
417  
418  
419   ########################################################################
420 <    def jobSplittingByRun(self):
420 >    def jobSplittingByRun(self):
421          """
422          """
336        from sets import Set  
337        from WMCore.JobSplitting.RunBased import RunBased
338        from WMCore.DataStructs.Workflow import Workflow
339        from WMCore.DataStructs.File import File
340        from WMCore.DataStructs.Fileset import Fileset
341        from WMCore.DataStructs.Subscription import Subscription
342        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
343        from WMCore.DataStructs.Run import Run
423  
424          self.checkUserSettings()
425 +        blockSites = self.args['blockSites']
426 +        pubdata = self.args['pubdata']
427  
428          if self.selectNumberOfJobs == 0 :
429              self.theNumberOfJobs = 9999999
430          blocks = {}
431 <        runList = []
431 >        runList = []
432          thefiles = Fileset(name='FilesToSplit')
433 <        fileList = self.pubdata.getListFiles()
433 >        fileList = pubdata.getListFiles()
434          for f in fileList:
354           # print f
435              block = f['Block']['Name']
436 <          #  if not blocks.has_key(block):
437 <          #      blocks[block] = reader.listFileBlockLocation(block)
358 <            try:
359 <                f['Block']['StorageElementList'].extend(self.blockSites[block])
436 >            try:
437 >                f['Block']['StorageElementList'].extend(blockSites[block])
438              except:
439                  continue
440              wmbsFile = File(f['LogicalFileName'])
441 <            [ wmbsFile['locations'].add(x) for x in self.blockSites[block] ]
441 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
442              wmbsFile['block'] = block
443              runNum = f['RunsList'][0]['RunNumber']
444 <            runList.append(runNum)
444 >            runList.append(runNum)
445              myRun = Run(runNumber=runNum)
446              wmbsFile.addRun( myRun )
447              thefiles.addFile(
448                  wmbsFile
449                  )
450 <
450 >
451          work = Workflow()
452          subs = Subscription(
453          fileset = thefiles,
# Line 378 | Line 456 | class JobSplitter:
456          type = "Processing")
457          splitter = SplitterFactory()
458          jobfactory = splitter(subs)
459 <        
460 <        #loop over all runs
459 >
460 >        #loop over all runs
461          set = Set(runList)
462          list_of_lists = []
463          jobDestination = []
386
464          count = 0
465 <        for i in list(set):
465 >        for jobGroup in  jobfactory():
466              if count <  self.theNumberOfJobs:
467 <                res = self.getJobInfo(jobfactory())
468 <                parString = ''
467 >                res = self.getJobInfo(jobGroup)
468 >                parString = ''
469                  for file in res['lfns']:
470 <                    parString += '\\\"' + file + '\\\"\,'
471 <                fullString = parString[:-2]
472 <                list_of_lists.append([fullString,str(-1),str(0)])    
470 >                    parString += file + ','
471 >                fullString = parString[:-1]
472 >                list_of_lists.append([fullString,str(-1),str(0)])
473                  #need to check single file location
474 <                jobDestination.append(res['locations'])  
474 >                jobDestination.append(res['locations'])
475                  count +=1
399        #print jobDestination
476         # prepare dict output
477          dictOut = {}
478 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
479          dictOut['args'] = list_of_lists
480          dictOut['jobDestination'] = jobDestination
481          dictOut['njobs']=count
# Line 407 | Line 484 | class JobSplitter:
484  
485      def getJobInfo( self,jobGroup ):
486          res = {}
487 <        lfns = []        
488 <        locations = []        
487 >        lfns = []
488 >        locations = []
489          tmp_check=0
490          for job in jobGroup.jobs:
491              for file in job.getFiles():
492 <                lfns.append(file['lfn'])
492 >                lfns.append(file['lfn'])
493                  for loc in file['locations']:
494                      if tmp_check < 1 :
495                          locations.append(loc)
496 <                    tmp_check = tmp_check + 1
497 <                ### qui va messo il check per la locations
498 <        res['lfns'] = lfns
499 <        res['locations'] = locations
500 <        return res                
501 <      
496 >                tmp_check = tmp_check + 1
497 >                ### qui va messo il check per la locations
498 >        res['lfns'] = lfns
499 >        res['locations'] = locations
500 >        return res
501 >
502   ########################################################################
503 <    def jobSplittingNoInput(self):
503 >    def prepareSplittingNoInput(self):
504          """
428        Perform job splitting based on number of event per job
505          """
430        common.logger.debug(5,'Splitting per events')
431
506          if (self.selectEventsPerJob):
507 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
507 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
508          if (self.selectNumberOfJobs):
509 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
509 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
510          if (self.selectTotalNumberEvents):
511 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
511 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
512  
513          if (self.total_number_of_events < 0):
514              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 451 | Line 525 | class JobSplitter:
525              self.total_number_of_jobs = self.theNumberOfJobs
526              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
527  
528 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
528 >
529 >    def jobSplittingNoInput(self):
530 >        """
531 >        Perform job splitting based on number of event per job
532 >        """
533 >        common.logger.debug('Splitting per events')
534 >        self.checkUserSettings()
535 >        jobDestination=[]
536 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
537 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
538 >            raise CrabException(msg)
539 >
540 >        managedGenerators =self.args['managedGenerators']
541 >        generator = self.args['generator']
542 >        firstRun = self.cfg_params.get('CMSSW.first_run', 1)
543 >
544 >        self.prepareSplittingNoInput()
545 >
546 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
547  
548          # is there any remainder?
549          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
550  
551 <        common.logger.debug(5,'Check  '+str(check))
551 >        common.logger.debug('Check  '+str(check))
552  
553 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
553 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
554          if check > 0:
555 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
555 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
556  
557          # argument is seed number.$i
558          self.list_of_args = []
559          for i in range(self.total_number_of_jobs):
560              ## Since there is no input, any site is good
561 <            self.jobDestination.append([""]) #must be empty to write correctly the xml
561 >            jobDestination.append([""]) # must be empty to correctly write the XML
562              args=[]
563 <            if (self.firstRun):
564 <                ## pythia first run
565 <                args.append(str(self.firstRun)+str(i))
566 <            if (self.generator in self.managedGenerators):
567 <                if (self.generator == 'comphep' and i == 0):
563 >            if (firstRun): # Pythia first run
564 >                args.append(str(int(firstRun)+i))
565 >            if (generator in managedGenerators):
566 >               args.append(generator)
567 >               if (generator == 'comphep' and i == 0):
568                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
569                      args.append('1')
570 <                else:
570 >               else:
571                      args.append(str(i*self.eventsPerJob))
572 +            args.append(str(self.eventsPerJob))
573              self.list_of_args.append(args)
574         # prepare dict output
575 +
576          dictOut = {}
577 +        dictOut['params'] = ['MaxEvents']
578 +        if (firstRun):
579 +            dictOut['params'] = ['FirstRun','MaxEvents']
580 +            if ( generator in managedGenerators ) :
581 +                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
582 +        else:
583 +            if (generator in managedGenerators) :
584 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
585          dictOut['args'] = self.list_of_args
586 <        dictOut['jobDestination'] = self.jobDestination
586 >        dictOut['jobDestination'] = jobDestination
587          dictOut['njobs']=self.total_number_of_jobs
588  
589          return dictOut
# Line 492 | Line 594 | class JobSplitter:
594          Perform job splitting based on number of job
595          """
596          self.checkUserSettings()
597 <        if (self.selectnumberofjobs == 0):
597 >        if (self.selectNumberOfJobs == 0):
598              msg = 'must specify  number_of_jobs.'
599              raise crabexception(msg)
600 +        jobDestination = []
601 +        common.logger.debug('Splitting per job')
602 +        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
603  
604 <        common.logger.debug(5,'Splitting per job')
500 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
604 > #        self.total_number_of_jobs = self.theNumberOfJobs
605  
606 <        self.total_number_of_jobs = self.theNumberOfJobs
606 >        self.prepareSplittingNoInput()
607  
608 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
608 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
609  
610 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
610 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
611  
612          # argument is seed number.$i
613          self.list_of_args = []
614          for i in range(self.total_number_of_jobs):
615 <            self.jobDestination.append([""])
616 <            self.list_of_args.append([str(i)])
615 >            args=[]
616 >            jobDestination.append([""])
617 >            if self.eventsPerJob != 0 :
618 >                args.append(str(self.eventsPerJob))
619 >                self.list_of_args.append(args)
620  
621         # prepare dict output
622          dictOut = {}
623 <        dictOut['args'] = self.list_of_args
624 <        dictOut['jobDestination'] = []
623 >        dictOut['params'] = ['MaxEvents']
624 >        dictOut['args'] =  self.list_of_args
625 >        dictOut['jobDestination'] = jobDestination
626          dictOut['njobs']=self.total_number_of_jobs
627          return dictOut
520
628  
629 <    def jobSplittingByLumi(self):
629 >
630 >    def jobSplittingByLumi(self):
631          """
632 +        Split task into jobs by Lumi section paying attention to which
633 +        lumis should be run (according to the analysis dataset).
634 +        This uses WMBS job splitting which does not split files over jobs
635 +        so the job will have AT LEAST as many lumis as requested, perhaps
636 +        more
637          """
638 <        return
638 >
639 >        common.logger.debug('Splitting by Lumi')
640 >        self.checkLumiSettings()
641 >
642 >        blockSites = self.args['blockSites']
643 >        pubdata = self.args['pubdata']
644 >
645 >        lumisPerFile  = pubdata.getLumis()
646 >
647 >        # Make the list of WMBS files for job splitter
648 >        fileList = pubdata.getListFiles()
649 >        thefiles = Fileset(name='FilesToSplit')
650 >        for jobFile in fileList:
651 >            block = jobFile['Block']['Name']
652 >            try:
653 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
654 >            except:
655 >                continue
656 >            wmbsFile = File(jobFile['LogicalFileName'])
657 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
658 >            wmbsFile['block'] = block
659 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
660 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
661 >            thefiles.addFile(wmbsFile)
662 >
663 >        # Create the factory and workflow
664 >        work = Workflow()
665 >        subs = Subscription(fileset    = thefiles,    workflow = work,
666 >                            split_algo = 'LumiBased', type     = "Processing")
667 >        splitter = SplitterFactory()
668 >        jobFactory = splitter(subs)
669 >
670 >        list_of_lists = []
671 >        jobDestination = []
672 >        jobCount = 0
673 >        lumisCreated = 0
674 >
675 >        if not self.limitJobLumis:
676 >            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
677 >            common.logger.info('Each job will process about %s lumis.' %
678 >                                self.lumisPerJob)
679 >
680 >        for jobGroup in  jobFactory(lumis_per_job = self.lumisPerJob):
681 >            for job in jobGroup.jobs:
682 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
683 >                    common.logger.info('Limit on number of jobs reached.')
684 >                    break
685 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
686 >                    common.logger.info('Limit on number of lumis reached.')
687 >                    break
688 >                lumis = []
689 >                lfns  = []
690 >                locations = []
691 >                firstFile = True
692 >                # Collect information from all the files
693 >                for jobFile in job.getFiles():
694 >                    if firstFile:  # Get locations from first file in the job
695 >                        for loc in jobFile['locations']:
696 >                            locations.append(loc)
697 >                        firstFile = False
698 >                    # Accumulate Lumis from all files
699 >                    for lumiList in jobFile['runs']:
700 >                        theRun = lumiList.run
701 >                        for theLumi in list(lumiList):
702 >                            lumis.append( (theRun, theLumi) )
703 >
704 >                    lfns.append(jobFile['lfn'])
705 >                fileString = ','.join(lfns)
706 >                lumiString = compressLumiString(lumis)
707 >                list_of_lists.append([fileString, str(-1), str(0), lumiString])
708 >
709 >                jobDestination.append(locations)
710 >                jobCount += 1
711 >                lumisCreated += len(lumis)
712 >                common.logger.debug('Job %s will run on %s files and %s lumis '
713 >                    % (jobCount, len(lfns), len(lumis) ))
714 >
715 >        common.logger.info('%s jobs created to run on %s lumis' %
716 >                              (jobCount, lumisCreated))
717 >
718 >        # Prepare dict output matching back to non-WMBS job creation
719 >        dictOut = {}
720 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
721 >        dictOut['args'] = list_of_lists
722 >        dictOut['jobDestination'] = jobDestination
723 >        dictOut['njobs'] = jobCount
724 >
725 >        return dictOut
726 >
727 >
728      def Algos(self):
729          """
730          Define key splittingType matrix
731          """
732 <        SplitAlogs = {
733 <                     'EventBased'           : self.jobSplittingByEvent,
732 >        SplitAlogs = {
733 >                     'EventBased'           : self.jobSplittingByEvent,
734                       'RunBased'             : self.jobSplittingByRun,
735 <                     'LumiBased'            : self.jobSplittingByLumi,
736 <                     'NoInput'              : self.jobSplittingNoInput,
735 >                     'LumiBased'            : self.jobSplittingByLumi,
736 >                     'NoInput'              : self.jobSplittingNoInput,
737                       'ForScript'            : self.jobSplittingForScript
738 <                     }  
738 >                     }
739          return SplitAlogs
740  
741 +
742 +
def compressLumiString(lumis):
    """
    Turn a list of 2-tuples of run/lumi numbers into a string of the format
    R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange variable.

    Consecutive lumis within the same run are collapsed into one range;
    duplicate (run, lumi) pairs are emitted once. The input list is no
    longer sorted in place (the original mutated the caller's list).
    """

    def formatRange(startRange, endRange):
        # Render one contiguous stretch as "R:L" or "R:L-R:L".
        part = ':'.join(map(str, startRange))
        if startRange != endRange:
            part += '-' + ':'.join(map(str, endRange))
        return part

    parts = []
    startRange = None
    endRange = None

    # Iterate a sorted copy so the caller's list is left untouched.
    for lumiBlock in sorted(lumis):
        if startRange is None:  # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange:  # Same lumi (different files?)
            pass
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1:
            # This is a continuation of the current range
            endRange = lumiBlock
        else:  # This is the start of a new range
            parts.append(formatRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(formatRange(startRange, endRange))

    return ','.join(parts)
779 +
780 +

Diff Legend

Removed lines
+ Added lines
< Changed lines (original revision)
> Changed lines (new revision)