ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.2 by spiga, Wed Feb 4 15:09:03 2009 UTC vs.
Revision 1.52 by spiga, Mon May 9 11:51:28 2011 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24 <        self.blockSites = args['blockSites']
25 <        self.pubdata = args['pubdata']
24 >        self.args=args
25 >
26 >        self.lumisPerJob = -1
27 >        self.totalNLumis = 0
28 >        self.theNumberOfJobs = 0
29 >        self.limitNJobs = False
30 >        self.limitTotalLumis = False
31 >        self.limitJobLumis = False
32 >
33          #self.maxEvents
13        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
34          # init BlackWhiteListParser
35 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
36 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
37 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
35 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
37 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38 >
39 >        ## check if has been asked for a non default file to store/read analyzed fileBlocks
40 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42  
43  
44      def checkUserSettings(self):
# Line 45 | Line 69 | class JobSplitter:
69              self.total_number_of_events = 0
70              self.selectTotalNumberEvents = 0
71  
72 +        return
73 +
74 +    def checkLumiSettings(self):
75 +        """
76 +        Check to make sure the user has specified enough information to
77 +        perform splitting by Lumis to run the job
78 +        """
79 +        settings = 0
80 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
81 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
82 +            self.limitJobLumis = True
83 +            settings += 1
84 +
85 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
86 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
87 +            self.limitNJobs = True
88 +            settings += 1
89 +
90 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
91 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
92 +            self.limitTotalLumis = (self.totalNLumis != -1)
93 +            settings += 1
94 +
95 +        if settings != 2:
96 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
97 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
98 +            raise CrabException(msg)
99 +        if self.limitNJobs and self.limitJobLumis:
100 +            self.limitTotalLumis = True
101 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
102 +
103 +        # Has the user specified runselection?
104 +        if (self.cfg_params.has_key('CMSSW.runselection')):
105 +            common.logger.info('You have specified runselection and split by lumi.')
106 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
107 +        return
108 +
109 +    def ComputeSubBlockSites( self, blockSites ):
110 +        """
111 +        """
112 +        sub_blockSites = {}
113 +        for k,v in blockSites.iteritems():
114 +            sites=self.blackWhiteListParser.checkWhiteList(v)
115 +            if sites : sub_blockSites[k]=v
116 +        if len(sub_blockSites) < 1:
117 +            msg = 'WARNING: the site(s) %s are not hosting any part of the data.'%self.seWhiteList
118 +            raise CrabException(msg)
119 +        return sub_blockSites
120  
121   ########################################################################
122      def jobSplittingByEvent( self ):
# Line 55 | Line 127 | class JobSplitter:
127          REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
128                    self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
129                    self.maxEvents, self.filesbyblock
130 <        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
130 >        SETS: jobDestination - Site destination(s) for each job (a list of lists)
131                self.total_number_of_jobs - Total # of jobs
132                self.list_of_args - File(s) job will run on (a list of lists)
133          """
134  
135 +        jobDestination=[]
136          self.checkUserSettings()
137          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
138              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
139              raise CrabException(msg)
67
68        self.filesbyblock=self.pubdata.getFiles()
140  
141 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
142 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
143 <        self.parentFiles=self.pubdata.getParent()
141 >        blockSites = self.args['blockSites']
142 >        pubdata = self.args['pubdata']
143 >        filesbyblock=pubdata.getFiles()
144 >
145 >        self.eventsbyblock=pubdata.getEventsPerBlock()
146 >        self.eventsbyfile=pubdata.getEventsPerFile()
147 >        self.parentFiles=pubdata.getParent()
148  
149          ## get max number of events
150 <        self.maxEvents=self.pubdata.getMaxEvents()
150 >        self.maxEvents=pubdata.getMaxEvents()
151  
152          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
153          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
154  
155 +        if noBboundary == 1:
156 +            if self.total_number_of_events== -1:
157 +                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
158 +                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
159 +                raise CrabException(msg)
160 +            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
161 +                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
162 +                msg += "\tPlease set se_white_list with the site's storage element name."
163 +                raise  CrabException(msg)
164 +            blockSites = self.ComputeSubBlockSites(blockSites)
165 +
166          # ---- Handle the possible job splitting configurations ---- #
167          if (self.selectTotalNumberEvents):
168              totalEventsRequested = self.total_number_of_events
# Line 91 | Line 177 | class JobSplitter:
177          # If user requested more events than are in the dataset
178          elif (totalEventsRequested > self.maxEvents):
179              eventsRemaining = self.maxEvents
180 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
180 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
181          # If user requested less events than are in the dataset
182          else:
183              eventsRemaining = totalEventsRequested
# Line 107 | Line 193 | class JobSplitter:
193              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
194  
195          if (self.selectNumberOfJobs):
196 <            common.logger.message("May not create the exact number_of_jobs requested.")
196 >            common.logger.info("May not create the exact number_of_jobs requested.")
197  
198          # old... to remove Daniele
199          totalNumberOfJobs = 999999999
200  
201 <        blocks = self.blockSites.keys()
201 >        blocks = blockSites.keys()
202          blockCount = 0
203          # Backup variable in case self.maxEvents counted events in a non-included block
204          numBlocksInDataset = len(blocks)
# Line 124 | Line 210 | class JobSplitter:
210          jobsOfBlock = {}
211  
212          parString = ""
213 +        pString = ""
214          filesEventCount = 0
215 +        msg=''
216  
217          # ---- Iterate over the blocks in the dataset until ---- #
218          # ---- we've met the requested total # of events    ---- #
# Line 136 | Line 224 | class JobSplitter:
224  
225              if self.eventsbyblock.has_key(block) :
226                  numEventsInBlock = self.eventsbyblock[block]
227 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
227 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
228  
229 <                files = self.filesbyblock[block]
229 >                files = filesbyblock[block]
230                  numFilesInBlock = len(files)
231                  if (numFilesInBlock <= 0):
232                      continue
# Line 146 | Line 234 | class JobSplitter:
234                  if noBboundary == 0: # DD
235                      # ---- New block => New job ---- #
236                      parString = ""
237 +                    pString=""
238                      # counter for number of events in files currently worked on
239                      filesEventCount = 0
240                  # flag if next while loop should touch new file
# Line 155 | Line 244 | class JobSplitter:
244  
245                  # ---- Iterate over the files in the block until we've met the requested ---- #
246                  # ---- total # of events or we've gone over all the files in this block  ---- #
247 <                pString=''
247 >                msg='\n'
248                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
249                      file = files[fileCount]
250                      if self.useParent==1:
251                          parent = self.parentFiles[file]
252 <                        for f in parent :
164 <                            pString += '\\\"' + f + '\\\"\,'
165 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
166 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
252 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
253                      if newFile :
254                          try:
255                              numEventsInFile = self.eventsbyfile[file]
256 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
256 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
257                              # increase filesEventCount
258                              filesEventCount += numEventsInFile
259                              # Add file to current job
260 <                            parString += '\\\"' + file + '\\\"\,'
260 >                            parString +=  file + ','
261 >                            if self.useParent==1:
262 >                                for f in parent :
263 >                                    pString += f  + ','
264                              newFile = 0
265                          except KeyError:
266 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
266 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
267  
268                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
269                      # if less events in file remain than eventsPerJobRequested
# Line 187 | Line 276 | class JobSplitter:
276                              if ( fileCount == numFilesInBlock-1 ) :
277                                  # end job using last file, use remaining events in block
278                                  # close job and touch new file
279 <                                fullString = parString[:-2]
279 >                                fullString = parString[:-1]
280                                  if self.useParent==1:
281 <                                    fullParentString = pString[:-2]
282 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
281 >                                    fullParentString = pString[:-1]
282 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
283                                  else:
284 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
285 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
286 <                                self.jobDestination.append(self.blockSites[block])
287 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
284 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
285 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
286 >                                jobDestination.append(blockSites[block])
287 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
288                                  # fill jobs of block dictionary
289                                  jobsOfBlock[block].append(jobCount+1)
290                                  # reset counter
# Line 216 | Line 305 | class JobSplitter:
305                      # if events in file equal to eventsPerJobRequested
306                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
307                          # close job and touch new file
308 <                        fullString = parString[:-2]
308 >                        fullString = parString[:-1]
309                          if self.useParent==1:
310 <                            fullParentString = pString[:-2]
311 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
310 >                            fullParentString = pString[:-1]
311 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
312                          else:
313 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
314 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
315 <                        self.jobDestination.append(self.blockSites[block])
316 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
313 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
314 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
315 >                        jobDestination.append(blockSites[block])
316 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
317                          jobsOfBlock[block].append(jobCount+1)
318                          # reset counter
319                          jobCount = jobCount + 1
# Line 241 | Line 330 | class JobSplitter:
330                      # if more events in file remain than eventsPerJobRequested
331                      else :
332                          # close job but don't touch new file
333 <                        fullString = parString[:-2]
333 >                        fullString = parString[:-1]
334                          if self.useParent==1:
335 <                            fullParentString = pString[:-2]
336 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
335 >                            fullParentString = pString[:-1]
336 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
337                          else:
338 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
339 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
340 <                        self.jobDestination.append(self.blockSites[block])
341 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
338 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
339 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
340 >                        jobDestination.append(blockSites[block])
341 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
342                          jobsOfBlock[block].append(jobCount+1)
343                          # increase counter
344                          jobCount = jobCount + 1
# Line 260 | Line 349 | class JobSplitter:
349                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
350                          # remove all but the last file
351                          filesEventCount = self.eventsbyfile[file]
352 +                        pString_tmp=''
353                          if self.useParent==1:
354 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
355 <                        parString = '\\\"' + file + '\\\"\,'
354 >                            for f in parent : pString_tmp +=  f + ','
355 >                        pString =  pString_tmp
356 >                        parString =  file + ','
357                      pass # END if
358                  pass # END while (iterate over files in the block)
359          pass # END while (iterate over blocks in the dataset)
360 +        common.logger.debug(msg)
361          self.ncjobs = self.total_number_of_jobs = jobCount
362          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
363 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
364 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
365 <
363 >            common.logger.info("Could not run on all requested events because some blocks are not hosted at allowed sites.")
364 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
365 >
366          # skip check on  block with no sites  DD
367 <        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock)
367 >        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
368  
369         # prepare dict output
370          dictOut = {}
371 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
372 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
373          dictOut['args'] = list_of_lists
374 <        dictOut['jobDestination'] = self.jobDestination
374 >        dictOut['jobDestination'] = jobDestination
375          dictOut['njobs']=self.total_number_of_jobs
376  
377          return dictOut
378  
379          # keep trace of block with no sites to print a warning at the end
380  
381 <    def checkBlockNoSite(self,blocks,jobsOfBlock):  
381 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
382          # screen output
383          screenOutput = "List of jobs and available destination sites:\n\n"
384          noSiteBlock = []
385          bloskNoSite = []
386 +        allBlock = []
387  
388          blockCounter = 0
389 +        saveFblocks =''
390          for block in blocks:
391              if block in jobsOfBlock.keys() :
392                  blockCounter += 1
393 +                allBlock.append( blockCounter )
394 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
395                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
396 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)))
397 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(self.blockSites[block],block),block)) == 0:
396 >                    ', '.join(SE2CMS(sites)))
397 >                if len(sites) == 0:
398                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
399                      bloskNoSite.append( blockCounter )
400 +                else:
401 +                    saveFblocks += str(block)+'\n'
402 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
403  
404 <        common.logger.message(screenOutput)
404 >        common.logger.info(screenOutput)
405          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
406              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
407              virgola = ""
# Line 308 | Line 409 | class JobSplitter:
409                  virgola = ","
410              for block in bloskNoSite:
411                  msg += ' ' + str(block) + virgola
412 <            msg += '\n               Related jobs:\n                 '
412 >            msg += '\n\t\tRelated jobs:\n                 '
413              virgola = ""
414              if len(noSiteBlock) > 1:
415                  virgola = ","
416              for range_jobs in noSiteBlock:
417                  msg += str(range_jobs) + virgola
418 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
419 <            if self.cfg_params.has_key('EDG.se_white_list'):
420 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
421 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 <                msg += 'Please check if the dataset is available at this site!)\n'
423 <            if self.cfg_params.has_key('EDG.ce_white_list'):
424 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
425 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
426 <                msg += 'Please check if the dataset is available at this site!)\n'
427 <
428 <            common.logger.message(msg)
418 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
419 >            if self.cfg_params.has_key('GRID.se_white_list'):
420 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
421 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
422 >                msg += '\tPlease check if the dataset is available at this site!)'
423 >            if self.cfg_params.has_key('GRID.ce_white_list'):
424 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
425 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
426 >                msg += '\tPlease check if the dataset is available at this site!)\n'
427 >
428 >            common.logger.info(msg)
429 >
430 >        if bloskNoSite == allBlock:
431 >            msg += 'Requested jobs cannot be Created! \n'
432 >            if self.cfg_params.has_key('GRID.se_white_list'):
433 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
434 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
435 >                msg += '\tPlease check if the dataset is available at this site!)'
436 >            if self.cfg_params.has_key('GRID.ce_white_list'):
437 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
438 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
439 >                msg += '\tPlease check if the dataset is available at this site!)\n'
440 >            raise CrabException(msg)
441  
442          return
443  
444  
445   ########################################################################
446 <    def jobSplittingByRun(self):
446 >    def jobSplittingByRun(self):
447          """
448          """
336        from sets import Set  
337        from WMCore.JobSplitting.RunBased import RunBased
338        from WMCore.DataStructs.Workflow import Workflow
339        from WMCore.DataStructs.File import File
340        from WMCore.DataStructs.Fileset import Fileset
341        from WMCore.DataStructs.Subscription import Subscription
342        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
343        from WMCore.DataStructs.Run import Run
449  
450          self.checkUserSettings()
451 +        blockSites = self.args['blockSites']
452 +        pubdata = self.args['pubdata']
453  
454          if self.selectNumberOfJobs == 0 :
455              self.theNumberOfJobs = 9999999
456          blocks = {}
457 <        runList = []
457 >        runList = []
458          thefiles = Fileset(name='FilesToSplit')
459 <        fileList = self.pubdata.getListFiles()
459 >        fileList = pubdata.getListFiles()
460          for f in fileList:
354           # print f
461              block = f['Block']['Name']
462 <          #  if not blocks.has_key(block):
463 <          #      blocks[block] = reader.listFileBlockLocation(block)
358 <            try:
359 <                f['Block']['StorageElementList'].extend(self.blockSites[block])
462 >            try:
463 >                f['Block']['StorageElementList'].extend(blockSites[block])
464              except:
465                  continue
466              wmbsFile = File(f['LogicalFileName'])
467 <            [ wmbsFile['locations'].add(x) for x in self.blockSites[block] ]
467 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
468              wmbsFile['block'] = block
469              runNum = f['RunsList'][0]['RunNumber']
470 <            runList.append(runNum)
470 >            runList.append(runNum)
471              myRun = Run(runNumber=runNum)
472              wmbsFile.addRun( myRun )
473              thefiles.addFile(
474                  wmbsFile
475                  )
476 <
476 >
477          work = Workflow()
478          subs = Subscription(
479          fileset = thefiles,
# Line 378 | Line 482 | class JobSplitter:
482          type = "Processing")
483          splitter = SplitterFactory()
484          jobfactory = splitter(subs)
485 <        
486 <        #loop over all runs
383 <        set = Set(runList)
485 >
486 >        #loop over all runs
487          list_of_lists = []
488          jobDestination = []
489 <
489 >        list_of_blocks = []
490          count = 0
491 <        for i in list(set):
491 >        for jobGroup in  jobfactory():
492              if count <  self.theNumberOfJobs:
493 <                res = self.getJobInfo(jobfactory())
494 <                parString = ''
493 >                res = self.getJobInfo(jobGroup)
494 >                parString = ''
495                  for file in res['lfns']:
496 <                    parString += '\\\"' + file + '\\\"\,'
497 <                fullString = parString[:-2]
498 <                list_of_lists.append([fullString,str(-1),str(0)])    
496 >                    parString += file + ','
497 >                list_of_blocks.append(res['block'])
498 >                fullString = parString[:-1]
499 >                blockString=','.join(list_of_blocks)
500 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
501                  #need to check single file location
502 <                jobDestination.append(res['locations'])  
502 >                jobDestination.append(res['locations'])
503                  count +=1
504 <        #print jobDestination
400 <       # prepare dict output
504 >        # prepare dict output
505          dictOut = {}
506 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
507          dictOut['args'] = list_of_lists
508          dictOut['jobDestination'] = jobDestination
509          dictOut['njobs']=count
510 +        self.cacheBlocks(list_of_blocks,jobDestination)
511  
512          return dictOut
513  
514      def getJobInfo( self,jobGroup ):
515          res = {}
516 <        lfns = []        
517 <        locations = []        
516 >        lfns = []
517 >        locations = []
518          tmp_check=0
519          for job in jobGroup.jobs:
520              for file in job.getFiles():
521 <                lfns.append(file['lfn'])
521 >                lfns.append(file['lfn'])
522                  for loc in file['locations']:
523                      if tmp_check < 1 :
524                          locations.append(loc)
525 <                    tmp_check = tmp_check + 1
526 <                ### qui va messo il check per la locations
527 <        res['lfns'] = lfns
528 <        res['locations'] = locations
529 <        return res                
530 <      
525 >                        res['block']= file['block']
526 >                tmp_check = tmp_check + 1
527 >        res['lfns'] = lfns
528 >        res['locations'] = locations
529 >        return res
530 >
531   ########################################################################
532 <    def jobSplittingNoInput(self):
532 >    def prepareSplittingNoInput(self):
533          """
428        Perform job splitting based on number of event per job
534          """
430        common.logger.debug(5,'Splitting per events')
431
535          if (self.selectEventsPerJob):
536 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
536 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
537          if (self.selectNumberOfJobs):
538 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
538 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
539          if (self.selectTotalNumberEvents):
540 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
540 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
541  
542          if (self.total_number_of_events < 0):
543              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 451 | Line 554 | class JobSplitter:
554              self.total_number_of_jobs = self.theNumberOfJobs
555              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
556  
557 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
557 >
558 >    def jobSplittingNoInput(self):
559 >        """
560 >        Perform job splitting based on number of event per job
561 >        """
562 >        common.logger.debug('Splitting per events')
563 >        self.checkUserSettings()
564 >        jobDestination=[]
565 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
566 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
567 >            raise CrabException(msg)
568 >
569 >        managedGenerators =self.args['managedGenerators']
570 >        generator = self.args['generator']
571 >        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
572 >
573 >        self.prepareSplittingNoInput()
574 >
575 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
576  
577          # is there any remainder?
578          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
579  
580 <        common.logger.debug(5,'Check  '+str(check))
580 >        common.logger.debug('Check  '+str(check))
581  
582 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
582 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
583          if check > 0:
584 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
584 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
585  
586          # argument is seed number.$i
587          self.list_of_args = []
588          for i in range(self.total_number_of_jobs):
589              ## Since there is no input, any site is good
590 <            self.jobDestination.append([""]) #must be empty to write correctly the xml
590 >            jobDestination.append([""]) # must be empty to correctly write the XML
591              args=[]
592 <            if (self.firstRun):
593 <                ## pythia first run
594 <                args.append(str(self.firstRun)+str(i))
595 <            if (self.generator in self.managedGenerators):
596 <                if (self.generator == 'comphep' and i == 0):
592 >            if (firstLumi): # Pythia first lumi
593 >                args.append(str(int(firstLumi)+i))
594 >            if (generator in managedGenerators):
595 >               args.append(generator)
596 >               if (generator == 'comphep' and i == 0):
597                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
598                      args.append('1')
599 <                else:
599 >               else:
600                      args.append(str(i*self.eventsPerJob))
601 +            args.append(str(self.eventsPerJob))
602              self.list_of_args.append(args)
603         # prepare dict output
604 +
605          dictOut = {}
606 +        dictOut['params'] = ['MaxEvents']
607 +        if (firstLumi):
608 +            dictOut['params'] = ['FirstLumi','MaxEvents']
609 +            if (generator in managedGenerators):
610 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
611 +        else:
612 +            if (generator in managedGenerators) :
613 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
614          dictOut['args'] = self.list_of_args
615 <        dictOut['jobDestination'] = self.jobDestination
615 >        dictOut['jobDestination'] = jobDestination
616          dictOut['njobs']=self.total_number_of_jobs
617  
618          return dictOut
# Line 492 | Line 623 | class JobSplitter:
623          Perform job splitting based on number of job
624          """
625          self.checkUserSettings()
626 <        if (self.selectnumberofjobs == 0):
626 >        if (self.selectNumberOfJobs == 0):
627              msg = 'must specify  number_of_jobs.'
628              raise crabexception(msg)
629 +        jobDestination = []
630 +        common.logger.debug('Splitting per job')
631 +        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
632  
633 <        common.logger.debug(5,'Splitting per job')
500 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
633 > #        self.total_number_of_jobs = self.theNumberOfJobs
634  
635 <        self.total_number_of_jobs = self.theNumberOfJobs
635 >        self.prepareSplittingNoInput()
636  
637 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
637 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
638  
639 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
639 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
640  
641          # argument is seed number.$i
642          self.list_of_args = []
643          for i in range(self.total_number_of_jobs):
644 <            self.jobDestination.append([""])
645 <            self.list_of_args.append([str(i)])
644 >            args=[]
645 >            jobDestination.append([""])
646 >            if self.eventsPerJob != 0 :
647 >                args.append(str(self.eventsPerJob))
648 >                self.list_of_args.append(args)
649  
650         # prepare dict output
651          dictOut = {}
652 <        dictOut['args'] = self.list_of_args
653 <        dictOut['jobDestination'] = []
652 >        dictOut['params'] = ['MaxEvents']
653 >        dictOut['args'] =  self.list_of_args
654 >        dictOut['jobDestination'] = jobDestination
655          dictOut['njobs']=self.total_number_of_jobs
656          return dictOut
520
657  
658 <    def jobSplittingByLumi(self):
658 >
659 >    def jobSplittingByLumi(self):
660          """
661 +        Split task into jobs by Lumi section paying attention to which
662 +        lumis should be run (according to the analysis dataset).
663 +        This uses WMBS job splitting which does not split files over jobs
664 +        so the job will have AT LEAST as many lumis as requested, perhaps
665 +        more
666          """
667 <        return
667 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
668 >        common.logger.debug('Splitting by Lumi')
669 >        self.checkLumiSettings()
670 >
671 >        blockSites = self.args['blockSites']
672 >        pubdata = self.args['pubdata']
673 >
674 >        lumisPerFile  = pubdata.getLumis()
675 >        self.parentFiles=pubdata.getParent()
676 >        # Make the list of WMBS files for job splitter
677 >        fileList = pubdata.getListFiles()
678 >        wmFileList = []
679 >        for jobFile in fileList:
680 >            block = jobFile['Block']['Name']
681 >            try:
682 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
683 >            except:
684 >                continue
685 >            wmbsFile = File(jobFile['LogicalFileName'])
686 >            if not  blockSites[block]:
687 >                wmbsFile['locations'].add('Nowhere')
688 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
689 >            wmbsFile['block'] = block
690 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
691 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
692 >            wmFileList.append(wmbsFile)
693 >
694 >        fileSet = set(wmFileList)
695 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
696 >
697 >        # Create the factory and workflow
698 >        work = Workflow()
699 >        subs = Subscription(fileset    = thefiles,    workflow = work,
700 >                            split_algo = 'LumiBased', type     = "Processing")
701 >        splitter = SplitterFactory()
702 >        jobFactory = splitter(subs)
703 >
704 >        list_of_lists = []
705 >        jobDestination = []
706 >        jobCount = 0
707 >        lumisCreated = 0
708 >        list_of_blocks = []
709 >        if not self.limitJobLumis:
710 >            if self.totalNLumis > 0:
711 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
712 >            else:
713 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
714 >            common.logger.info('Each job will process about %s lumis.' %
715 >                                self.lumisPerJob)
716 >
717 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
718 >            for job in jobGroup.jobs:
719 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
720 >                    common.logger.info('Requested number of jobs reached.')
721 >                    break
722 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
723 >                    common.logger.info('Requested number of lumis reached.')
724 >                    break
725 >                lumis = []
726 >                lfns  = []
727 >                if self.useParent==1:
728 >                 parentlfns  = []
729 >                 pString =""
730 >
731 >                locations = []
732 >                blocks = []
733 >                firstFile = True
734 >                # Collect information from all the files
735 >                for jobFile in job.getFiles():
736 >                    doFile = False
737 >                    if firstFile:  # Get locations from first file in the job
738 >                        for loc in jobFile['locations']:
739 >                            locations.append(loc)
740 >                        blocks.append(jobFile['block'])
741 >                        firstFile = False
742 >                    # Accumulate Lumis from all files
743 >                    for lumiList in jobFile['runs']:
744 >                        theRun = lumiList.run
745 >                        for theLumi in list(lumiList):
746 >                            if (not self.limitTotalLumis) or \
747 >                               (lumisCreated < self.totalNLumis):
748 >                                doFile = True
749 >                                lumisCreated += 1
750 >                                lumis.append( (theRun, theLumi) )
751 >                    if doFile:
752 >                        lfns.append(jobFile['lfn'])
753 >                        if self.useParent==1:
754 >                           parent = self.parentFiles[jobFile['lfn']]
755 >                           for p in parent :
756 >                               pString += p  + ','
757 >                fileString = ','.join(lfns)
758 >                lumiLister = LumiList(lumis = lumis)
759 >                lumiString = lumiLister.getCMSSWString()
760 >                blockString=','.join(blocks)
761 >                if self.useParent==1:
762 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
763 >                  pfileString = pString[:-1]
764 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
765 >                else:
766 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
767 >                list_of_blocks.append(blocks)
768 >                jobDestination.append(locations)
769 >                jobCount += 1
770 >                common.logger.debug('Job %s will run on %s files and %s lumis '
771 >                    % (jobCount, len(lfns), len(lumis) ))
772 >
773 >        common.logger.info('%s jobs created to run on %s lumis' %
774 >                              (jobCount, lumisCreated))
775 >
776 >        # Prepare dict output matching back to non-WMBS job creation
777 >        dictOut = {}
778 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
779 >        if self.useParent==1:
780 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
781 >        dictOut['args'] = list_of_lists
782 >        dictOut['jobDestination'] = jobDestination
783 >        dictOut['njobs'] = jobCount
784 >        self.cacheBlocks(list_of_blocks,jobDestination)
785 >
786 >        return dictOut
787 >
788 >    def cacheBlocks(self, blocks,destinations):
789 >
790 >        saveFblocks=''
791 >        for i in range(len(blocks)):
792 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
793 >            if len(sites) != 0:
794 >                for block in blocks[i]:
795 >                    saveFblocks += str(block)+'\n'
796 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
797 >
798      def Algos(self):
799          """
800          Define key splittingType matrix
801          """
802 <        SplitAlogs = {
803 <                     'EventBased'           : self.jobSplittingByEvent,
802 >        SplitAlogs = {
803 >                     'EventBased'           : self.jobSplittingByEvent,
804                       'RunBased'             : self.jobSplittingByRun,
805 <                     'LumiBased'            : self.jobSplittingByLumi,
806 <                     'NoInput'              : self.jobSplittingNoInput,
805 >                     'LumiBased'            : self.jobSplittingByLumi,
806 >                     'NoInput'              : self.jobSplittingNoInput,
807                       'ForScript'            : self.jobSplittingForScript
808 <                     }  
808 >                     }
809          return SplitAlogs
810  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines