root/cvsroot/COMP/CRAB/python/Splitter.py

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.56 by spiga, Wed Jul 11 13:59:21 2012 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
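
For orientation, a hedged sketch of the LumiList class imported above, as it is
used later in this revision (jobSplittingByLumi builds one from (run, lumi)
pairs and calls getCMSSWString()); the run/lumi values here are invented:

    from FWCore.PythonUtilities.LumiList import LumiList

    # Turn accumulated (run, lumi) pairs into the compact CMSSW string
    # that is handed to each job as its 'Lumis' argument.
    ll = LumiList(lumis=[(1, 1), (1, 2), (1, 3), (2, 10)])
    print(ll.getCMSSWString())   # e.g. '1:1-1:3,2:10'
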
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24          self.args=args
25 +
26 +        self.lumisPerJob = -1
27 +        self.totalNLumis = 0
28 +        self.theNumberOfJobs = 0
29 +        self.limitNJobs = False
30 +        self.limitTotalLumis = False
31 +        self.limitJobLumis = False
32 +
33          #self.maxEvents
34          # init BlackWhiteListParser
35 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
36 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
37 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
35 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
37 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38 >
39 >        ## check whether a non-default file was requested to store/read the analyzed fileBlocks
40 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42  
43  
44      def checkUserSettings(self):
# Line 43 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73 +
74 +    def checkLumiSettings(self):
75 +        """
76 +        Check to make sure the user has specified enough information to
77 +        perform splitting by Lumis to run the job
78 +        """
79 +        settings = 0
80 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
81 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
82 +            self.limitJobLumis = True
83 +            settings += 1
84 +
85 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
86 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
87 +            self.limitNJobs = True
88 +            settings += 1
89 +
90 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
91 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
92 +            self.limitTotalLumis = (self.totalNLumis != -1)
93 +            settings += 1
94 +
95 +        if settings != 2:
96 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98 +            raise CrabException(msg)
99 +        if self.limitNJobs and self.limitJobLumis:
100 +            self.limitTotalLumis = True
101 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102 +
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108 +
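
A minimal illustration of the two-of-three rule enforced above (the cfg values
are invented; the names mirror the attributes set in checkLumiSettings):

    cfg = {'CMSSW.lumis_per_job': '50', 'CMSSW.number_of_jobs': '10'}
    lumisPerJob = int(cfg['CMSSW.lumis_per_job'])        # 50, limitJobLumis
    theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])   # 10, limitNJobs
    # settings == 2, so no exception; the total lumi budget is implied:
    totalNLumis = lumisPerJob * theNumberOfJobs          # 500
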
109 +    def ComputeSubBlockSites( self, blockSites ):
110 +        """Restrict blockSites to the blocks hosted at a white-listed SE."""
111 +
112 +        sub_blockSites = {}
113 +        for k,v in blockSites.iteritems():
114 +            sites=self.blackWhiteListParser.checkWhiteList(v)
115 +            if sites : sub_blockSites[k]=v
116 +        if len(sub_blockSites) < 1:
117 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
118 +            raise CrabException(msg)
119 +        return sub_blockSites
120  
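
A hypothetical run of the filtering above (block names and SEs invented): with
se_white_list covering only 'se1.example.org',

    blockSites     = {'/ds#blk1': ['se1.example.org'],
                      '/ds#blk2': ['se2.example.org']}
    # checkWhiteList() keeps a block only if some replica site survives:
    sub_blockSites = {'/ds#blk1': ['se1.example.org']}
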
121   ########################################################################
122      def jobSplittingByEvent( self ):
# Line 58 | Line 132 | class JobSplitter:
132                self.list_of_args - File(s) job will run on (a list of lists)
133          """
134  
135 <        jobDestination=[]  
135 >        jobDestination=[]
136          self.checkUserSettings()
137          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
138              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
139              raise CrabException(msg)
140 <
141 <        blockSites = self.args['blockSites']
140 >
141 >        blockSites = self.args['blockSites']
142          pubdata = self.args['pubdata']
143          filesbyblock=pubdata.getFiles()
144  
# Line 78 | Line 152 | class JobSplitter:
152          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
153          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
154  
155 +        if noBboundary == 1:
156 +            if self.total_number_of_events== -1:
157 +                msg = 'You are selecting no_block_boundary=1 which does not allow setting total_number_of_events=-1\n'
158 +                msg +='\tYou should get the number of events from the DBS web interface and use it in your configuration.'
159 +                raise CrabException(msg)
160 +            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
161 +                msg = 'You are selecting no_block_boundary=1 which requires choosing one and only one site.\n'
162 +                msg += "\tPlease set se_white_list with the site's storage element name."
163 +                raise CrabException(msg)
164 +            blockSites = self.ComputeSubBlockSites(blockSites)
165 +
166          # ---- Handle the possible job splitting configurations ---- #
167          if (self.selectTotalNumberEvents):
168              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 177 | class JobSplitter:
177          # If user requested more events than are in the dataset
178          elif (totalEventsRequested > self.maxEvents):
179              eventsRemaining = self.maxEvents
180 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
180 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
181          # If user requested less events than are in the dataset
182          else:
183              eventsRemaining = totalEventsRequested
# Line 108 | Line 193 | class JobSplitter:
193              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
194  
195          if (self.selectNumberOfJobs):
196 <            common.logger.message("May not create the exact number_of_jobs requested.")
196 >            common.logger.info("May not create the exact number_of_jobs requested.")
197  
198 +        if (self.theNumberOfJobs < 0):
199 +            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")
200 +            
201          # old... to remove Daniele
202          totalNumberOfJobs = 999999999
203  
# Line 125 | Line 213 | class JobSplitter:
213          jobsOfBlock = {}
214  
215          parString = ""
216 +        pString = ""
217          filesEventCount = 0
218 +        msg=''
219  
220          # ---- Iterate over the blocks in the dataset until ---- #
221          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 227 | class JobSplitter:
227  
228              if self.eventsbyblock.has_key(block) :
229                  numEventsInBlock = self.eventsbyblock[block]
230 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
230 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
231  
232                  files = filesbyblock[block]
233                  numFilesInBlock = len(files)
# Line 147 | Line 237 | class JobSplitter:
237                  if noBboundary == 0: # DD
238                      # ---- New block => New job ---- #
239                      parString = ""
240 +                    pString=""
241                      # counter for number of events in files currently worked on
242                      filesEventCount = 0
243                  # flag if next while loop should touch new file
# Line 156 | Line 247 | class JobSplitter:
247  
248                  # ---- Iterate over the files in the block until we've met the requested ---- #
249                  # ---- total # of events or we've gone over all the files in this block  ---- #
250 <                pString=''
250 >                msg='\n'
251                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
252                      file = files[fileCount]
253                      if self.useParent==1:
254                          parent = self.parentFiles[file]
255 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
255 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
256                      if newFile :
257                          try:
258                              numEventsInFile = self.eventsbyfile[file]
259 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
259 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
260                              # increase filesEventCount
261                              filesEventCount += numEventsInFile
262                              # Add file to current job
263 <                            parString += '\\\"' + file + '\\\"\,'
263 >                            parString +=  file + ','
264 >                            if self.useParent==1:
265 >                                for f in parent :
266 >                                    pString += f  + ','
267                              newFile = 0
268                          except KeyError:
269 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
269 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
270  
271                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
272                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 279 | class JobSplitter:
279                              if ( fileCount == numFilesInBlock-1 ) :
280                                  # end job using last file, use remaining events in block
281                                  # close job and touch new file
282 <                                fullString = parString[:-2]
282 >                                fullString = parString[:-1]
283                                  if self.useParent==1:
284 <                                    fullParentString = pString[:-2]
285 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
284 >                                    fullParentString = pString[:-1]
285 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
286                                  else:
287 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
288 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
287 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
288 >                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
289                                  jobDestination.append(blockSites[block])
290 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
290 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
291                                  # fill jobs of block dictionary
292                                  jobsOfBlock[block].append(jobCount+1)
293                                  # reset counter
# Line 217 | Line 308 | class JobSplitter:
308                      # if events in file equal to eventsPerJobRequested
309                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
310                          # close job and touch new file
311 <                        fullString = parString[:-2]
311 >                        fullString = parString[:-1]
312                          if self.useParent==1:
313 <                            fullParentString = pString[:-2]
314 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
313 >                            fullParentString = pString[:-1]
314 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
315                          else:
316 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
317 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
316 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
317 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
318                          jobDestination.append(blockSites[block])
319 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
319 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
320                          jobsOfBlock[block].append(jobCount+1)
321                          # reset counter
322                          jobCount = jobCount + 1
# Line 242 | Line 333 | class JobSplitter:
333                      # if more events in file remain than eventsPerJobRequested
334                      else :
335                          # close job but don't touch new file
336 <                        fullString = parString[:-2]
336 >                        fullString = parString[:-1]
337                          if self.useParent==1:
338 <                            fullParentString = pString[:-2]
339 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
338 >                            fullParentString = pString[:-1]
339 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
340                          else:
341 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
342 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
341 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
342 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
343                          jobDestination.append(blockSites[block])
344 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
344 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
345                          jobsOfBlock[block].append(jobCount+1)
346                          # increase counter
347                          jobCount = jobCount + 1
# Line 261 | Line 352 | class JobSplitter:
352                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
353                          # remove all but the last file
354                          filesEventCount = self.eventsbyfile[file]
355 +                        pString_tmp=''
356                          if self.useParent==1:
357 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
358 <                        parString = '\\\"' + file + '\\\"\,'
357 >                            for f in parent : pString_tmp +=  f + ','
358 >                        pString =  pString_tmp
359 >                        parString =  file + ','
360                      pass # END if
361                  pass # END while (iterate over files in the block)
362          pass # END while (iterate over blocks in the dataset)
363 +        common.logger.debug(msg)
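
To make the skip-count bookkeeping of the loop above concrete, a worked
example with invented numbers (one 150-event file, events_per_job = 100):

    eventsbyfile = {'f1.root': 150}
    eventsPerJobRequested = 100
    filesEventCount, jobSkipEventCount = 150, 0
    # 150 - 0 > 100: more events remain in the file than requested,
    # so job 1 is closed as ['f1.root', '100', '0'] and the file is kept:
    jobSkipEventCount = eventsPerJobRequested - (
        filesEventCount - jobSkipEventCount - eventsbyfile['f1.root'])  # 100
    filesEventCount = eventsbyfile['f1.root']  # only the current file remains
    # job 2 reuses f1.root with SkipEvents=100 for the trailing 50 events
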
364          self.ncjobs = self.total_number_of_jobs = jobCount
365          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
366 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
367 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
368 <
366 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
367 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
368 >
369          # skip check on  block with no sites  DD
370          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
371  
372         # prepare dict output
373          dictOut = {}
374 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
375 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
376          dictOut['args'] = list_of_lists
377          dictOut['jobDestination'] = jobDestination
378          dictOut['njobs']=self.total_number_of_jobs
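
For reference, the shape of the dictionary returned here (the 'params' list is
taken from the lines above; the values are invented, with use_parent off):

    dictOut = {
        'params': ['InputFiles', 'MaxEvents', 'SkipEvents', 'InputBlocks'],
        'args': [['fileA.root,fileB.root', '100', '0', '/ds#blk1']],
        'jobDestination': [['se1.example.org']],
        'njobs': 1,
    }
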
# Line 285 | Line 381 | class JobSplitter:
381  
382          # keep trace of block with no sites to print a warning at the end
383  
384 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
384 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
385          # screen output
386          screenOutput = "List of jobs and available destination sites:\n\n"
387          noSiteBlock = []
388          bloskNoSite = []
389 +        allBlock = []
390  
391          blockCounter = 0
392 +        saveFblocks =''
393          for block in blocks:
394              if block in jobsOfBlock.keys() :
395                  blockCounter += 1
396 +                allBlock.append( blockCounter )
397 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
398                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
399 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
400 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
399 >                    ', '.join(SE2CMS(sites)))
400 >                if len(sites) == 0:
401                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
402                      bloskNoSite.append( blockCounter )
403 +                else:
404 +                    saveFblocks += str(block)+'\n'
405 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
406  
407 <        common.logger.message(screenOutput)
407 >        common.logger.info(screenOutput)
408          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
409              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
410              virgola = ""
# Line 309 | Line 412 | class JobSplitter:
412                  virgola = ","
413              for block in bloskNoSite:
414                  msg += ' ' + str(block) + virgola
415 <            msg += '\n               Related jobs:\n                 '
415 >            msg += '\n\t\tRelated jobs:\n                 '
416              virgola = ""
417              if len(noSiteBlock) > 1:
418                  virgola = ","
419              for range_jobs in noSiteBlock:
420                  msg += str(range_jobs) + virgola
421 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
422 <            if self.cfg_params.has_key('EDG.se_white_list'):
423 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
424 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
425 <                msg += 'Please check if the dataset is available at this site!)\n'
426 <            if self.cfg_params.has_key('EDG.ce_white_list'):
427 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
428 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
429 <                msg += 'Please check if the dataset is available at this site!)\n'
430 <
431 <            common.logger.message(msg)
421 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
422 >            if self.cfg_params.has_key('GRID.se_white_list'):
423 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
424 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
425 >                msg += '\tPlease check if the dataset is available at this site!)'
426 >            if self.cfg_params.has_key('GRID.ce_white_list'):
427 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
428 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
429 >                msg += '\tPlease check if the dataset is available at this site!)\n'
430 >
431 >            common.logger.info(msg)
432 >
433 >        if bloskNoSite == allBlock:
434 >            msg = 'Requested jobs cannot be created!\n'
435 >            if self.cfg_params.has_key('GRID.se_white_list'):
436 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
437 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
438 >                msg += '\tPlease check if the dataset is available at this site!)'
439 >            if self.cfg_params.has_key('GRID.ce_white_list'):
440 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
441 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
442 >                msg += '\tPlease check if the dataset is available at this site!)\n'
443 >            raise CrabException(msg)
444  
445          return
446  
447  
448   ########################################################################
449 <    def jobSplittingByRun(self):
449 >    def jobSplittingByRun(self):
450          """Split the task into jobs by run via the WMCore job splitter."""
451
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
452  
453          self.checkUserSettings()
454 <        blockSites = self.args['blockSites']
454 >        blockSites = self.args['blockSites']
455          pubdata = self.args['pubdata']
456  
457          if self.selectNumberOfJobs == 0 :
458              self.theNumberOfJobs = 9999999
459          blocks = {}
460 <        runList = []
460 >        runList = []
461          thefiles = Fileset(name='FilesToSplit')
462          fileList = pubdata.getListFiles()
463          for f in fileList:
357           # print f
464              block = f['Block']['Name']
465 <          #  if not blocks.has_key(block):
360 <          #      blocks[block] = reader.listFileBlockLocation(block)
361 <            try:
465 >            try:
466                  f['Block']['StorageElementList'].extend(blockSites[block])
467              except:
468                  continue
469              wmbsFile = File(f['LogicalFileName'])
470 +            if not  blockSites[block]:
471 +                msg = 'WARNING: No sites are hosting any part of the data for block: %s\n' %block
472 +                msg += 'Related jobs will not be submitted and this block of data cannot be analyzed'
473 +                common.logger.debug(msg)
474              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
475              wmbsFile['block'] = block
476              runNum = f['RunsList'][0]['RunNumber']
477 <            runList.append(runNum)
477 >            runList.append(runNum)
478              myRun = Run(runNumber=runNum)
479              wmbsFile.addRun( myRun )
480              thefiles.addFile(
481                  wmbsFile
482                  )
483 <
483 >
484          work = Workflow()
485          subs = Subscription(
486          fileset = thefiles,
# Line 381 | Line 489 | class JobSplitter:
489          type = "Processing")
490          splitter = SplitterFactory()
491          jobfactory = splitter(subs)
492 <        
493 <        #loop over all runs
386 <        set = Set(runList)
492 >
493 >        #loop over all runs
494          list_of_lists = []
495          jobDestination = []
496 <
496 >        list_of_blocks = []
497          count = 0
498 <        for i in list(set):
498 >        for jobGroup in  jobfactory():
499              if count <  self.theNumberOfJobs:
500 <                res = self.getJobInfo(jobfactory())
501 <                parString = ''
500 >                res = self.getJobInfo(jobGroup)
501 >                parString = ''
502                  for file in res['lfns']:
503 <                    parString += '\\\"' + file + '\\\"\,'
504 <                fullString = parString[:-2]
505 <                list_of_lists.append([fullString,str(-1),str(0)])    
503 >                    parString += file + ','
504 >                list_of_blocks.append(res['block'])
505 >                fullString = parString[:-1]
506 >                blockString=','.join(list_of_blocks)
507 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
508                  #need to check single file location
509 <                jobDestination.append(res['locations'])  
509 >                jobDestination.append(res['locations'])
510                  count +=1
511 <        #print jobDestination
403 <       # prepare dict output
511 >        # prepare dict output
512          dictOut = {}
513 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
514          dictOut['args'] = list_of_lists
515          dictOut['jobDestination'] = jobDestination
516          dictOut['njobs']=count
517 +        self.cacheBlocks(list_of_blocks,jobDestination)
518  
519          return dictOut
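
A hedged sketch of the WMCore pipeline this method assembles (it relies on the
WMCore imports at the top of the file; 'RunBased' is assumed as the split
algorithm here, and the LFN, SE name, and run number are invented):

    wmbsFile = File('/store/data/Run123456/file1.root')
    wmbsFile['locations'].add('se1.example.org')
    wmbsFile.addRun(Run(runNumber=123456))
    thefiles = Fileset(name='FilesToSplit')
    thefiles.addFile(wmbsFile)
    subs = Subscription(fileset=thefiles, workflow=Workflow(),
                        split_algo='RunBased', type='Processing')
    jobfactory = SplitterFactory()(subs)
    for jobGroup in jobfactory():            # roughly one group per run
        for job in jobGroup.jobs:
            print([f['lfn'] for f in job.getFiles()])
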
520  
521      def getJobInfo( self,jobGroup ):
522          res = {}
523 <        lfns = []        
524 <        locations = []        
523 >        lfns = []
524 >        locations = []
525          tmp_check=0
526          for job in jobGroup.jobs:
527              for file in job.getFiles():
528 <                lfns.append(file['lfn'])
528 >                lfns.append(file['lfn'])
529                  for loc in file['locations']:
530                      if tmp_check < 1 :
531                          locations.append(loc)
532 <                    tmp_check = tmp_check + 1
533 <                ### the check on the locations should go here
534 <        res['lfns'] = lfns
535 <        res['locations'] = locations
536 <        return res                
537 <      
532 >                        res['block']= file['block']
533 >                tmp_check = tmp_check + 1
534 >        res['lfns'] = lfns
535 >        res['locations'] = locations
536 >        return res
537 >
538   ########################################################################
539 <    def jobSplittingNoInput(self):
539 >    def prepareSplittingNoInput(self):
540          """
431        Perform job splitting based on number of event per job
541          """
433        common.logger.debug(5,'Splitting per events')
434        self.checkUserSettings()
435        jobDestination=[]
436        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
437            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
438            raise CrabException(msg)
439
440        managedGenerators =self.args['managedGenerators']
441        generator = self.args['generator']
442        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443
542          if (self.selectEventsPerJob):
543 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
543 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
544          if (self.selectNumberOfJobs):
545 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
545 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
546          if (self.selectTotalNumberEvents):
547 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
547 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
548  
549          if (self.total_number_of_events < 0):
550              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 561 | class JobSplitter:
561              self.total_number_of_jobs = self.theNumberOfJobs
562              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
563  
564 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
564 >
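
Worked numbers (invented) for the arithmetic prepareSplittingNoInput performs:
with total_number_of_events=1000 and events_per_job=300,

    total_number_of_jobs = int(1000 / 300)         # 3 jobs
    check = 1000 - total_number_of_jobs * 300      # 100 events left over
    # -> '3 jobs can be created, each for 300 events, for a total of 900 events'
    # -> 'Warning: asked 1000 but can do only 900'
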
565 >    def jobSplittingNoInput(self):
566 >        """
567 >        Perform job splitting based on the number of events per job
568 >        """
569 >        common.logger.debug('Splitting per events')
570 >        self.checkUserSettings()
571 >        jobDestination=[]
572 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
573 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
574 >            raise CrabException(msg)
575 >
576 >        managedGenerators =self.args['managedGenerators']
577 >        generator = self.args['generator']
578 >        firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))
579 >
580 >        self.prepareSplittingNoInput()
581 >
582 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
583  
584          # is there any remainder?
585          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
586  
587 <        common.logger.debug(5,'Check  '+str(check))
587 >        common.logger.debug('Check  '+str(check))
588  
589 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
589 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
590          if check > 0:
591 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
591 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
592  
593          # argument is seed number.$i
594          self.list_of_args = []
595          for i in range(self.total_number_of_jobs):
596              ## Since there is no input, any site is good
597 <            jobDestination.append([""]) #must be empty to write correctly the xml
597 >            jobDestination.append([""]) # must be empty to correctly write the XML
598              args=[]
599 <            if (firstRun):
600 <                ## pythia first run
485 <                args.append(str(firstRun)+str(i))
599 >            if (firstLumi): # Pythia first lumi
600 >                args.append(str(int(firstLumi)+i))
601              if (generator in managedGenerators):
602 <                if (generator == 'comphep' and i == 0):
602 >               args.append(generator)
603 >               if (generator == 'comphep' and i == 0):
604                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
605                      args.append('1')
606 <                else:
606 >               else:
607                      args.append(str(i*self.eventsPerJob))
608              args.append(str(self.eventsPerJob))
609              self.list_of_args.append(args)
610         # prepare dict output
611 +
612          dictOut = {}
613 +        dictOut['params'] = ['MaxEvents']
614 +        if (firstLumi):
615 +            dictOut['params'] = ['FirstLumi','MaxEvents']
616 +            if (generator in managedGenerators):
617 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
618 +        else:
619 +            if (generator in managedGenerators) :
620 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
621          dictOut['args'] = self.list_of_args
622          dictOut['jobDestination'] = jobDestination
623          dictOut['njobs']=self.total_number_of_jobs
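
An invented example of how these 'params' labels line up with each per-job
args list built above: with first_lumi=1, generator='comphep' (a managed
generator), and 300 events per job,

    # params: ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
    # job 0:  ['1', 'comphep', '1',   '300']   (COMPHEP wants first event 1)
    # job 1:  ['2', 'comphep', '300', '300']
    # job 2:  ['3', 'comphep', '600', '300']
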
# Line 509 | Line 634 | class JobSplitter:
634              msg = 'Must specify number_of_jobs.'
635              raise CrabException(msg)
636          jobDestination = []
637 <        common.logger.debug(5,'Splitting per job')
638 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
637 >        common.logger.debug('Splitting per job')
638 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
639 >
640 > #        self.total_number_of_jobs = self.theNumberOfJobs
641  
642 <        self.total_number_of_jobs = self.theNumberOfJobs
642 >        self.prepareSplittingNoInput()
643  
644 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
644 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
645  
646 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
646 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
647  
648          # argument is seed number.$i
649          self.list_of_args = []
650          for i in range(self.total_number_of_jobs):
651 +            args=[]
652              jobDestination.append([""])
653 <            self.list_of_args.append([str(i)])
653 >            if self.eventsPerJob != 0 :
654 >                args.append(str(self.eventsPerJob))
655 >                self.list_of_args.append(args)
656  
657         # prepare dict output
658          dictOut = {}
659 <        dictOut['args'] = self.list_of_args
659 >        dictOut['params'] = ['MaxEvents']
660 >        dictOut['args'] =  self.list_of_args
661          dictOut['jobDestination'] = jobDestination
662          dictOut['njobs']=self.total_number_of_jobs
663          return dictOut
533
664  
665 <    def jobSplittingByLumi(self):
665 >
666 >    def jobSplittingByLumi(self):
667          """
668 +        Split task into jobs by Lumi section paying attention to which
669 +        lumis should be run (according to the analysis dataset).
670 +        This uses WMBS job splitting which does not split files over jobs
671 +        so the job will have AT LEAST as many lumis as requested, perhaps
672 +        more
673          """
674 <        return
674 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
675 >        common.logger.debug('Splitting by Lumi')
676 >        self.checkLumiSettings()
677 >
678 >        blockSites = self.args['blockSites']
679 >        pubdata = self.args['pubdata']
680 >
681 >        lumisPerFile  = pubdata.getLumis()
682 >        self.parentFiles=pubdata.getParent()
683 >        # Make the list of WMBS files for job splitter
684 >        fileList = pubdata.getListFiles()
685 >        wmFileList = []
686 >        for jobFile in fileList:
687 >            block = jobFile['Block']['Name']
688 >            try:
689 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
690 >            except:
691 >                continue
692 >            wmbsFile = File(jobFile['LogicalFileName'])
693 >            if not  blockSites[block]:
694 >                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
695 >                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
696 >                common.logger.debug(msg)
697 >               # wmbsFile['locations'].add('Nowhere')
698 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
699 >            wmbsFile['block'] = block
700 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
701 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
702 >            wmFileList.append(wmbsFile)
703 >
704 >        fileSet = set(wmFileList)
705 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
706 >
707 >        # Create the factory and workflow
708 >        work = Workflow()
709 >        subs = Subscription(fileset    = thefiles,    workflow = work,
710 >                            split_algo = 'LumiBased', type     = "Processing")
711 >        splitter = SplitterFactory()
712 >        jobFactory = splitter(subs)
713 >
714 >        list_of_lists = []
715 >        jobDestination = []
716 >        jobCount = 0
717 >        lumisCreated = 0
718 >        list_of_blocks = []
719 >        if not self.limitJobLumis:
720 >            if self.totalNLumis > 0:
721 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
722 >            else:
723 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
724 >            common.logger.info('Each job will process about %s lumis.' %
725 >                                self.lumisPerJob)
726 >
727 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
728 >            for job in jobGroup.jobs:
729 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
730 >                    common.logger.info('Requested number of jobs reached.')
731 >                    break
732 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
733 >                    common.logger.info('Requested number of lumis reached.')
734 >                    break
735 >                lumis = []
736 >                lfns  = []
737 >                if self.useParent==1:
738 >                 parentlfns  = []
739 >                 pString =""
740 >
741 >                locations = []
742 >                blocks = []
743 >                firstFile = True
744 >                # Collect information from all the files
745 >                for jobFile in job.getFiles():
746 >                    doFile = False
747 >                    if firstFile:  # Get locations from first file in the job
748 >                        for loc in jobFile['locations']:
749 >                            locations.append(loc)
750 >                        blocks.append(jobFile['block'])
751 >                        firstFile = False
752 >                    # Accumulate Lumis from all files
753 >                    for lumiList in jobFile['runs']:
754 >                        theRun = lumiList.run
755 >                        for theLumi in list(lumiList):
756 >                            if (not self.limitTotalLumis) or \
757 >                               (lumisCreated < self.totalNLumis):
758 >                                doFile = True
759 >                                lumisCreated += 1
760 >                                lumis.append( (theRun, theLumi) )
761 >                    if doFile:
762 >                        lfns.append(jobFile['lfn'])
763 >                        if self.useParent==1:
764 >                           parent = self.parentFiles[jobFile['lfn']]
765 >                           for p in parent :
766 >                               pString += p  + ','
767 >                fileString = ','.join(lfns)
768 >                lumiLister = LumiList(lumis = lumis)
769 >                lumiString = lumiLister.getCMSSWString()
770 >                blockString=','.join(blocks)
771 >                if self.useParent==1:
772 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
773 >                  pfileString = pString[:-1]
774 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
775 >                else:
776 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
777 >                list_of_blocks.append(blocks)
778 >                jobDestination.append(locations)
779 >                jobCount += 1
780 >                common.logger.debug('Job %s will run on %s files and %s lumis '
781 >                    % (jobCount, len(lfns), len(lumis) ))
782 >
783 >        common.logger.info('%s jobs created to run on %s lumis' %
784 >                              (jobCount, lumisCreated))
785 >
786 >        # Prepare dict output matching back to non-WMBS job creation
787 >        dictOut = {}
788 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
789 >        if self.useParent==1:
790 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
791 >        dictOut['args'] = list_of_lists
792 >        dictOut['jobDestination'] = jobDestination
793 >        dictOut['njobs'] = jobCount
794 >        self.cacheBlocks(list_of_blocks,jobDestination)
795 >
796 >        return dictOut
797 >
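
Worked numbers (invented) for the lumis_per_job fallback used above when only
number_of_jobs is given:

    theNumberOfJobs = 4
    totalNLumis = 1000                                    # lumi budget set
    lumisPerJob = max(totalNLumis // theNumberOfJobs, 1)  # -> 250
    # without a lumi total: pubdata.getMaxLumis() // theNumberOfJobs + 1
    # and each job gets AT LEAST lumisPerJob lumis, since WMBS never
    # splits a file across jobs
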
798 >    def cacheBlocks(self, blocks,destinations):
799 >
800 >        saveFblocks=''
801 >        for i in range(len(blocks)):
802 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
803 >            if len(sites) != 0:
804 >                for block in blocks[i]:
805 >                    saveFblocks += str(block)+'\n'
806 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
807 >
808      def Algos(self):
809          """
810          Define key splittingType matrix
811          """
812 <        SplitAlogs = {
813 <                     'EventBased'           : self.jobSplittingByEvent,
812 >        SplitAlogs = {
813 >                     'EventBased'           : self.jobSplittingByEvent,
814                       'RunBased'             : self.jobSplittingByRun,
815 <                     'LumiBased'            : self.jobSplittingByLumi,
816 <                     'NoInput'              : self.jobSplittingNoInput,
815 >                     'LumiBased'            : self.jobSplittingByLumi,
816 >                     'NoInput'              : self.jobSplittingNoInput,
817                       'ForScript'            : self.jobSplittingForScript
818 <                     }  
818 >                     }
819          return SplitAlogs
820  
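
A hypothetical dispatch through this matrix (cfg_params and args as passed to
the constructor):

    splitter = JobSplitter(cfg_params, args)
    jobSplit = splitter.Algos()['LumiBased']   # -> splitter.jobSplittingByLumi
    dictOut = jobSplit()
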

Diff Legend

  (unmarked, old line number only)  Removed lines
  +                                 Added lines
  <                                 Changed lines (old revision 1.7)
  >                                 Changed lines (new revision 1.56)