ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.4 by spiga, Mon Feb 9 18:20:20 2009 UTC vs.
Revision 1.26 by ewv, Wed Jul 29 21:42:16 2009 UTC

# Line 1 | Line 1
1   import common
2 < from crab_logger import Logger
2 > from sets import Set
3   from crab_exceptions import *
4   from crab_util import *
5 +
6 + from WMCore.DataStructs.File import File
7 + from WMCore.DataStructs.Fileset import Fileset
8 + from WMCore.DataStructs.Run import Run
9 + from WMCore.DataStructs.Subscription import Subscription
10 + from WMCore.DataStructs.Workflow import Workflow
11 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
12   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
13  
14   class JobSplitter:
# Line 10 | Line 17 | class JobSplitter:
17          self.args=args
18          #self.maxEvents
19          # init BlackWhiteListParser
20 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
21 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
22 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
20 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
21 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
22 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
23  
24  
25      def checkUserSettings(self):
# Line 44 | Line 51 | class JobSplitter:
51              self.selectTotalNumberEvents = 0
52  
53  
54 +    def ComputeSubBlockSites( self, blockSites ):
55 +        """
56 +        """
57 +        sub_blockSites = {}
58 +        for k,v in blockSites.iteritems():
59 +            sites=self.blackWhiteListParser.checkWhiteList(v)
60 +            if sites : sub_blockSites[k]=v
61 +        if len(sub_blockSites) < 1:
 62 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
63 +            raise CrabException(msg)
64 +        return sub_blockSites
65 +
66   ########################################################################
67      def jobSplittingByEvent( self ):
68          """
# Line 58 | Line 77 | class JobSplitter:
77                self.list_of_args - File(s) job will run on (a list of lists)
78          """
79  
80 <        jobDestination=[]  
80 >        jobDestination=[]
81          self.checkUserSettings()
82          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
83              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
84              raise CrabException(msg)
85 <
86 <        blockSites = self.args['blockSites']
85 >
86 >        blockSites = self.args['blockSites']
87          pubdata = self.args['pubdata']
88          filesbyblock=pubdata.getFiles()
89  
# Line 78 | Line 97 | class JobSplitter:
97          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
98          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
99  
100 +        if noBboundary == 1:
101 +            if self.total_number_of_events== -1:
102 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
 103 +            msg +='\tYou should get the number of events from the DBS web interface and use it for your configuration.'
104 +                raise CrabException(msg)
105 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
106 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
107 +                msg += "\tPlease set se_white_list with the site's storage element name."
108 +                raise  CrabException(msg)
109 +            blockSites = self.ComputeSubBlockSites(blockSites)
110 +
111          # ---- Handle the possible job splitting configurations ---- #
112          if (self.selectTotalNumberEvents):
113              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 122 | class JobSplitter:
122          # If user requested more events than are in the dataset
123          elif (totalEventsRequested > self.maxEvents):
124              eventsRemaining = self.maxEvents
125 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
125 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
126          # If user requested less events than are in the dataset
127          else:
128              eventsRemaining = totalEventsRequested
# Line 108 | Line 138 | class JobSplitter:
138              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
139  
140          if (self.selectNumberOfJobs):
141 <            common.logger.message("May not create the exact number_of_jobs requested.")
141 >            common.logger.info("May not create the exact number_of_jobs requested.")
142  
143          # old... to remove Daniele
144          totalNumberOfJobs = 999999999
# Line 125 | Line 155 | class JobSplitter:
155          jobsOfBlock = {}
156  
157          parString = ""
158 +        pString = ""
159          filesEventCount = 0
160 +        msg=''
161  
162          # ---- Iterate over the blocks in the dataset until ---- #
163          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 169 | class JobSplitter:
169  
170              if self.eventsbyblock.has_key(block) :
171                  numEventsInBlock = self.eventsbyblock[block]
172 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
172 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
173  
174                  files = filesbyblock[block]
175                  numFilesInBlock = len(files)
# Line 147 | Line 179 | class JobSplitter:
179                  if noBboundary == 0: # DD
180                      # ---- New block => New job ---- #
181                      parString = ""
182 +                    pString=""
183                      # counter for number of events in files currently worked on
184                      filesEventCount = 0
185                  # flag if next while loop should touch new file
# Line 156 | Line 189 | class JobSplitter:
189  
190                  # ---- Iterate over the files in the block until we've met the requested ---- #
191                  # ---- total # of events or we've gone over all the files in this block  ---- #
192 <                pString=''
192 >                msg='\n'
193                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
194                      file = files[fileCount]
195                      if self.useParent==1:
196                          parent = self.parentFiles[file]
197 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
197 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
198                      if newFile :
199                          try:
200                              numEventsInFile = self.eventsbyfile[file]
201 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
201 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
202                              # increase filesEventCount
203                              filesEventCount += numEventsInFile
204                              # Add file to current job
205 <                            parString += '\\\"' + file + '\\\"\,'
205 >                            parString +=  file + ','
206 >                            if self.useParent==1:
207 >                                for f in parent :
208 >                                    pString += f  + ','
209                              newFile = 0
210                          except KeyError:
211 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
211 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
212  
213                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
214                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 221 | class JobSplitter:
221                              if ( fileCount == numFilesInBlock-1 ) :
222                                  # end job using last file, use remaining events in block
223                                  # close job and touch new file
224 <                                fullString = parString[:-2]
224 >                                fullString = parString[:-1]
225                                  if self.useParent==1:
226 <                                    fullParentString = pString[:-2]
226 >                                    fullParentString = pString[:-1]
227                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
228                                  else:
229                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
230 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
230 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
231                                  jobDestination.append(blockSites[block])
232 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
232 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
233                                  # fill jobs of block dictionary
234                                  jobsOfBlock[block].append(jobCount+1)
235                                  # reset counter
# Line 217 | Line 250 | class JobSplitter:
250                      # if events in file equal to eventsPerJobRequested
251                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
252                          # close job and touch new file
253 <                        fullString = parString[:-2]
253 >                        fullString = parString[:-1]
254                          if self.useParent==1:
255 <                            fullParentString = pString[:-2]
255 >                            fullParentString = pString[:-1]
256                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
257                          else:
258                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
259 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
259 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
260                          jobDestination.append(blockSites[block])
261 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
261 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
262                          jobsOfBlock[block].append(jobCount+1)
263                          # reset counter
264                          jobCount = jobCount + 1
# Line 242 | Line 275 | class JobSplitter:
275                      # if more events in file remain than eventsPerJobRequested
276                      else :
277                          # close job but don't touch new file
278 <                        fullString = parString[:-2]
278 >                        fullString = parString[:-1]
279                          if self.useParent==1:
280 <                            fullParentString = pString[:-2]
280 >                            fullParentString = pString[:-1]
281                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
282                          else:
283                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
284 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
284 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
285                          jobDestination.append(blockSites[block])
286 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
286 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
287                          jobsOfBlock[block].append(jobCount+1)
288                          # increase counter
289                          jobCount = jobCount + 1
# Line 261 | Line 294 | class JobSplitter:
294                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
295                          # remove all but the last file
296                          filesEventCount = self.eventsbyfile[file]
297 +                        pString_tmp=''
298                          if self.useParent==1:
299 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
300 <                        parString = '\\\"' + file + '\\\"\,'
299 >                            for f in parent : pString_tmp +=  f + ','
300 >                        pString =  pString_tmp
301 >                        parString =  file + ','
302                      pass # END if
303                  pass # END while (iterate over files in the block)
304          pass # END while (iterate over blocks in the dataset)
305 +        common.logger.debug(msg)
306          self.ncjobs = self.total_number_of_jobs = jobCount
307          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
308 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
309 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
310 <
308 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
309 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
310 >
311          # skip check on  block with no sites  DD
312 <        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock)
312 >        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
313  
314         # prepare dict output
315          dictOut = {}
316 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
317 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
318          dictOut['args'] = list_of_lists
319          dictOut['jobDestination'] = jobDestination
320          dictOut['njobs']=self.total_number_of_jobs
# Line 285 | Line 323 | class JobSplitter:
323  
324          # keep trace of block with no sites to print a warning at the end
325  
326 <    def checkBlockNoSite(self,blocks,jobsOfBlock):  
326 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
327          # screen output
328          screenOutput = "List of jobs and available destination sites:\n\n"
329          noSiteBlock = []
330          bloskNoSite = []
331 +        allBlock = []
332  
333          blockCounter = 0
334          for block in blocks:
335              if block in jobsOfBlock.keys() :
336                  blockCounter += 1
337 +                allBlock.append( blockCounter )
338 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
339                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
340 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
341 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
340 >                    ', '.join(SE2CMS(sites)))
341 >                if len(sites) == 0:
342                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
343                      bloskNoSite.append( blockCounter )
344  
345 <        common.logger.message(screenOutput)
345 >        common.logger.info(screenOutput)
346          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
347              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
348              virgola = ""
# Line 309 | Line 350 | class JobSplitter:
350                  virgola = ","
351              for block in bloskNoSite:
352                  msg += ' ' + str(block) + virgola
353 <            msg += '\n               Related jobs:\n                 '
353 >            msg += '\n\t\tRelated jobs:\n                 '
354              virgola = ""
355              if len(noSiteBlock) > 1:
356                  virgola = ","
357              for range_jobs in noSiteBlock:
358                  msg += str(range_jobs) + virgola
359 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
360 <            if self.cfg_params.has_key('EDG.se_white_list'):
361 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
362 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
363 <                msg += 'Please check if the dataset is available at this site!)\n'
364 <            if self.cfg_params.has_key('EDG.ce_white_list'):
365 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
366 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
367 <                msg += 'Please check if the dataset is available at this site!)\n'
359 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
360 >            if self.cfg_params.has_key('GRID.se_white_list'):
361 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
362 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
363 >                msg += '\tPlease check if the dataset is available at this site!)'
364 >            if self.cfg_params.has_key('GRID.ce_white_list'):
365 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
366 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
367 >                msg += '\tPlease check if the dataset is available at this site!)\n'
368 >
369 >            common.logger.info(msg)
370  
371 <            common.logger.message(msg)
371 >        if bloskNoSite == allBlock:
372 >            raise CrabException('No jobs created')
373  
374          return
375  
376  
377   ########################################################################
378 <    def jobSplittingByRun(self):
378 >    def jobSplittingByRun(self):
379          """
380          """
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
381  
382          self.checkUserSettings()
383 <        blockSites = self.args['blockSites']
383 >        blockSites = self.args['blockSites']
384          pubdata = self.args['pubdata']
385  
386          if self.selectNumberOfJobs == 0 :
387              self.theNumberOfJobs = 9999999
388          blocks = {}
389 <        runList = []
389 >        runList = []
390          thefiles = Fileset(name='FilesToSplit')
391          fileList = pubdata.getListFiles()
392          for f in fileList:
357           # print f
393              block = f['Block']['Name']
394 <          #  if not blocks.has_key(block):
360 <          #      blocks[block] = reader.listFileBlockLocation(block)
361 <            try:
394 >            try:
395                  f['Block']['StorageElementList'].extend(blockSites[block])
396              except:
397                  continue
# Line 366 | Line 399 | class JobSplitter:
399              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
400              wmbsFile['block'] = block
401              runNum = f['RunsList'][0]['RunNumber']
402 <            runList.append(runNum)
402 >            runList.append(runNum)
403              myRun = Run(runNumber=runNum)
404              wmbsFile.addRun( myRun )
405              thefiles.addFile(
406                  wmbsFile
407                  )
408 <
408 >
409          work = Workflow()
410          subs = Subscription(
411          fileset = thefiles,
# Line 381 | Line 414 | class JobSplitter:
414          type = "Processing")
415          splitter = SplitterFactory()
416          jobfactory = splitter(subs)
417 <        
418 <        #loop over all runs
417 >
418 >        #loop over all runs
419          set = Set(runList)
420          list_of_lists = []
421          jobDestination = []
389
422          count = 0
423 <        for i in list(set):
423 >        for jobGroup in  jobfactory():
424              if count <  self.theNumberOfJobs:
425 <                res = self.getJobInfo(jobfactory())
426 <                parString = ''
425 >                res = self.getJobInfo(jobGroup)
426 >                parString = ''
427                  for file in res['lfns']:
428 <                    parString += '\\\"' + file + '\\\"\,'
429 <                fullString = parString[:-2]
430 <                list_of_lists.append([fullString,str(-1),str(0)])    
428 >                    parString += file + ','
429 >                fullString = parString[:-1]
430 >                list_of_lists.append([fullString,str(-1),str(0)])
431                  #need to check single file location
432 <                jobDestination.append(res['locations'])  
432 >                jobDestination.append(res['locations'])
433                  count +=1
402        #print jobDestination
434         # prepare dict output
435          dictOut = {}
436 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
437          dictOut['args'] = list_of_lists
438          dictOut['jobDestination'] = jobDestination
439          dictOut['njobs']=count
# Line 410 | Line 442 | class JobSplitter:
442  
443      def getJobInfo( self,jobGroup ):
444          res = {}
445 <        lfns = []        
446 <        locations = []        
445 >        lfns = []
446 >        locations = []
447          tmp_check=0
448          for job in jobGroup.jobs:
449              for file in job.getFiles():
450 <                lfns.append(file['lfn'])
450 >                lfns.append(file['lfn'])
451                  for loc in file['locations']:
452                      if tmp_check < 1 :
453                          locations.append(loc)
454 <                    tmp_check = tmp_check + 1
455 <                ### qui va messo il check per la locations
456 <        res['lfns'] = lfns
457 <        res['locations'] = locations
458 <        return res                
459 <      
454 >                tmp_check = tmp_check + 1
455 >                ### qui va messo il check per la locations
456 >        res['lfns'] = lfns
457 >        res['locations'] = locations
458 >        return res
459 >
460   ########################################################################
461 <    def jobSplittingNoInput(self):
461 >    def prepareSplittingNoInput(self):
462          """
431        Perform job splitting based on number of event per job
463          """
433        common.logger.debug(5,'Splitting per events')
434        self.checkUserSettings()
435        jobDestination=[]
436        if (self.selectNumberOfJobs == 0):
437            msg = 'Must specify  number_of_jobs.'
438            raise CrabException(msg)
439
440        managedGenerators =self.args['managedGenerators']
441        generator = self.args['generator']
442        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443
464          if (self.selectEventsPerJob):
465 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
465 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
466          if (self.selectNumberOfJobs):
467 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
467 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
468          if (self.selectTotalNumberEvents):
469 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
469 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
470  
471          if (self.total_number_of_events < 0):
472              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 483 | class JobSplitter:
483              self.total_number_of_jobs = self.theNumberOfJobs
484              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
485  
486 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
486 >
487 >    def jobSplittingNoInput(self):
488 >        """
489 >        Perform job splitting based on number of event per job
490 >        """
491 >        common.logger.debug('Splitting per events')
492 >        self.checkUserSettings()
493 >        jobDestination=[]
494 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
495 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
496 >            raise CrabException(msg)
497 >
498 >        managedGenerators =self.args['managedGenerators']
499 >        generator = self.args['generator']
500 >        firstRun = self.cfg_params.get('CMSSW.first_run',None)
501 >
502 >        self.prepareSplittingNoInput()
503 >
504 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
505  
506          # is there any remainder?
507          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
508  
509 <        common.logger.debug(5,'Check  '+str(check))
509 >        common.logger.debug('Check  '+str(check))
510  
511 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
511 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
512          if check > 0:
513 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
513 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
514  
515          # argument is seed number.$i
516          self.list_of_args = []
# Line 484 | Line 522 | class JobSplitter:
522                  ## pythia first run
523                  args.append(str(firstRun)+str(i))
524              if (generator in managedGenerators):
525 <                if (generator == 'comphep' and i == 0):
525 >               args.append(generator)
526 >               if (generator == 'comphep' and i == 0):
527                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
528                      args.append('1')
529 <                else:
529 >               else:
530                      args.append(str(i*self.eventsPerJob))
531 +            args.append(str(self.eventsPerJob))
532              self.list_of_args.append(args)
533         # prepare dict output
534 +
535          dictOut = {}
536 +        dictOut['params'] = ['MaxEvents']
537 +        if (firstRun):
538 +            dictOut['params'] = ['FirstRun','MaxEvents']
539 +            if ( generator in managedGenerators ) :
540 +                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
541 +        else:
542 +            if (generator in managedGenerators) :
543 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
544          dictOut['args'] = self.list_of_args
545          dictOut['jobDestination'] = jobDestination
546          dictOut['njobs']=self.total_number_of_jobs
# Line 508 | Line 557 | class JobSplitter:
557              msg = 'must specify  number_of_jobs.'
558              raise crabexception(msg)
559          jobDestination = []
560 <        common.logger.debug(5,'Splitting per job')
561 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
560 >        common.logger.debug('Splitting per job')
561 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
562  
563 <        self.total_number_of_jobs = self.theNumberOfJobs
563 > #        self.total_number_of_jobs = self.theNumberOfJobs
564  
565 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
565 >        self.prepareSplittingNoInput()
566  
567 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
567 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
568 >
569 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
570  
571          # argument is seed number.$i
572          self.list_of_args = []
573          for i in range(self.total_number_of_jobs):
574 +            args=[]
575              jobDestination.append([""])
576 <            self.list_of_args.append([str(i)])
576 >            if self.eventsPerJob != 0 :
577 >                args.append(str(self.eventsPerJob))
578 >                self.list_of_args.append(args)
579  
580         # prepare dict output
581          dictOut = {}
582 <        dictOut['args'] = self.list_of_args
582 >        dictOut['params'] = ['MaxEvents']
583 >        dictOut['args'] =  self.list_of_args
584          dictOut['jobDestination'] = jobDestination
585          dictOut['njobs']=self.total_number_of_jobs
586          return dictOut
532
587  
588 <    def jobSplittingByLumi(self):
588 >
589 >    def jobSplittingByLumi(self):
590          """
591 +        Split task into jobs by Lumi section paying attention to which
592 +        lumis should be run (according to the analysis dataset).
593 +        This uses WMBS job splitting which does not split files over jobs
594 +        so the job will have AT LEAST as many lumis as requested, perhaps
595 +        more
596          """
597 <        return
597 >
598 >        common.logger.debug('Splitting by Lumi')
599 >        self.checkUserSettings() # FIXME need one for lumis
600 >
601 >        blockSites = self.args['blockSites']
602 >        pubdata = self.args['pubdata']
603 >
604 >        if self.selectNumberOfJobs == 0 :
605 >            self.theNumberOfJobs = 9999999
606 >        lumisPerFile  = pubdata.getLumis()
607 >
608 >        # Make the list of WMBS files for job splitter
609 >        fileList = pubdata.getListFiles()
610 >        thefiles = Fileset(name='FilesToSplit')
611 >        for jobFile in fileList:
612 >            block = jobFile['Block']['Name']
613 >            try:
614 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
615 >            except:
616 >                continue
617 >            wmbsFile = File(jobFile['LogicalFileName'])
618 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
619 >            wmbsFile['block'] = block
620 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
621 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
622 >            thefiles.addFile(wmbsFile)
623 >
624 >        work = Workflow()
625 >        subs = Subscription(fileset    = thefiles,    workflow = work,
626 >                            split_algo = 'LumiBased', type     = "Processing")
627 >        splitter = SplitterFactory()
628 >        jobFactory = splitter(subs)
629 >
630 >        list_of_lists = []
631 >        jobDestination = []
632 >        jobCount = 0
633 >        self.theNumberOfJobs = 20 #FIXME
634 >        for jobGroup in  jobFactory(lumis_per_job = 50): #FIXME
635 >            for job in jobGroup.jobs:
636 >                if jobCount <  self.theNumberOfJobs:
637 >                    lumis = []
638 >                    lfns  = []
639 >                    locations = []
640 >                    firstFile = True
641 >                    # Collect information from all the files
642 >                    for jobFile in job.getFiles():
643 >                        if firstFile:  # Get locations from first file in the job
644 >                            for loc in jobFile['locations']:
645 >                                locations.append(loc)
646 >                            firstFile = False
647 >                        # Accumulate Lumis from all files
648 >                        for lumiList in jobFile['runs']:
649 >                            theRun = lumiList.run
650 >                            for theLumi in list(lumiList):
651 >                                lumis.append( (theRun, theLumi) )
652 >
653 >                        lfns.append(jobFile['lfn'])
654 >                    fileString = ','.join(lfns)
655 >                    lumiString = compressLumiString(lumis)
656 >                    common.logger.debug('Job %s will run on %s files and %s lumis '
657 >                        % (jobCount, len(lfns), len(lumis) ))
658 >                    list_of_lists.append([fileString, str(-1), str(0), lumiString])
659 >
660 >                    jobDestination.append(locations)
661 >                    jobCount += 1
662 >        # Prepare dict output matching back to non-WMBS job creation
663 >        dictOut = {}
664 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
665 >        dictOut['args'] = list_of_lists
666 >        dictOut['jobDestination'] = jobDestination
667 >        dictOut['njobs'] = jobCount
668 >
669 >        return dictOut
670 >
671 >
672      def Algos(self):
673          """
674          Define key splittingType matrix
675          """
676 <        SplitAlogs = {
677 <                     'EventBased'           : self.jobSplittingByEvent,
676 >        SplitAlogs = {
677 >                     'EventBased'           : self.jobSplittingByEvent,
678                       'RunBased'             : self.jobSplittingByRun,
679 <                     'LumiBased'            : self.jobSplittingByLumi,
680 <                     'NoInput'              : self.jobSplittingNoInput,
679 >                     'LumiBased'            : self.jobSplittingByLumi,
680 >                     'NoInput'              : self.jobSplittingNoInput,
681                       'ForScript'            : self.jobSplittingForScript
682 <                     }  
682 >                     }
683          return SplitAlogs
684  
685 +
686 +
def compressLumiString(lumis):
    """
    Turn a list of 2-tuples of (run, lumi) numbers into a string of the
    format R1:L1,R2:L2-R3:L3 which is acceptable to the CMSSW
    LumiBlockRange variable.

    Duplicate (run, lumi) pairs (the same lumi seen in different files)
    are collapsed, and consecutive lumis within the same run are merged
    into a single Ra:La-Ra:Lb range. Returns '' for an empty list.
    Unlike the previous version, the caller's list is NOT sorted in
    place.
    """

    # Sort a copy so the caller's list is left untouched.
    ordered = sorted(lumis)
    parts = []
    startRange = None
    endRange = None

    for lumiBlock in ordered:
        if startRange is None:  # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange:  # Same lumi (different files?)
            pass
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1:
            # Continuation of the current range within the same run
            endRange = lumiBlock
        else:  # This is the start of a new range
            parts.append(_formatLumiRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(_formatLumiRange(startRange, endRange))

    return ','.join(parts)


def _formatLumiRange(startRange, endRange):
    """Render one run:lumi range, collapsing a single-point range to R:L."""
    part = ':'.join(map(str, startRange))
    if startRange != endRange:
        part += '-' + ':'.join(map(str, endRange))
    return part
723 +
724 +

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)