Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.60 by belforte, Wed May 22 15:57:41 2013 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   import common
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
8 +
9 + from WMCore.DataStructs.File import File
10 + from WMCore.DataStructs.Fileset import Fileset
11 + from WMCore.DataStructs.Run import Run
12 + from WMCore.DataStructs.Subscription import Subscription
13 + from WMCore.DataStructs.Workflow import Workflow
14 + from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 + try: # Can remove when CMSSW 3.7 and earlier are dropped
17 +    from FWCore.PythonUtilities.LumiList import LumiList
18 + except ImportError:
19 +    from LumiList import LumiList
20  
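For reference, a minimal standalone sketch of the LumiList behaviour the new import provides, assuming a CMSSW-style environment (the run/lumi numbers are made up). Pairs of (run, lumi) go in and a CMSSW-format range string comes out, which is what jobSplittingByLumi below hands to each job:

    from FWCore.PythonUtilities.LumiList import LumiList

    lumis = [(1, 1), (1, 2), (1, 3), (2, 10)]
    lumiString = LumiList(lumis=lumis).getCMSSWString()
    print lumiString   # e.g. '1:1-1:3,2:10'
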
21   class JobSplitter:
22      def __init__( self, cfg_params,  args ):
23          self.cfg_params = cfg_params
24          self.args=args
25 +
26 +        self.lumisPerJob = -1
27 +        self.totalNLumis = 0
28 +        self.theNumberOfJobs = 0
29 +        self.limitNJobs = False
30 +        self.limitTotalLumis = False
31 +        self.limitJobLumis = False
32 +
33          #self.maxEvents
34          # init BlackWhiteListParser
35 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
36 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
37 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
35 >        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36 >        if type(self.seWhiteList) == type("string"):
37 >            self.seWhiteList = self.seWhiteList.split(',')
38 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
39 >        if type(seBlackList) == type("string"):
40 >            seBlackList = seBlackList.split(',')
41 >        if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
42 >            # use central black list
43 >            removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
44 >            blackAnaOps = None
45 >            if int(removeBList) == 0:
46 >                blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
47 >                result = blacklist.config("site_black_list.conf").strip().split(',')
48 >                if result != None:
49 >                    blackAnaOps = result
50 >                    common.logger.debug("Enforced black list: %s "%blackAnaOps)
51 >                else:
52 >                    common.logger.info("WARNING: Skipping default black list!")
53 >                if int(removeBList) == 0 and blackAnaOps:
54 >                    seBlackList += blackAnaOps
55 >
56 >        if seBlackList != []:
57 >            common.logger.info("SE black list applied to data location: %s" %\
58 >                           seBlackList)
59 >        if self.seWhiteList != []:
60 >            common.logger.info("SE white list applied to data location: %s" %\
61 >                           self.seWhiteList)
62 >        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
63 >
64 >        ## check whether the user asked for a non-default file to store/read the analyzed fileBlocks
65 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
66 >        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
67  
68  
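A minimal sketch of the GRID.se_white_list normalisation done in the constructor above, with made-up SE names: a comma-separated string becomes a list, while a list passes through unchanged.

    seWhiteList = 'srm.site-a.example,srm.site-b.example'   # as read from cfg_params
    if type(seWhiteList) == type("string"):
        seWhiteList = seWhiteList.split(',')
    print seWhiteList   # ['srm.site-a.example', 'srm.site-b.example']
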
69      def checkUserSettings(self):
# Line 43 | Line 94 | class JobSplitter:
94              self.total_number_of_events = 0
95              self.selectTotalNumberEvents = 0
96  
97 +        return
98 +
99 +    def checkLumiSettings(self):
100 +        """
101 +        Check that the user has specified enough information to
102 +        perform the splitting of jobs by lumi sections
103 +        """
104 +        settings = 0
105 +        if self.cfg_params.has_key('CMSSW.lumis_per_job'):
106 +            self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
107 +            self.limitJobLumis = True
108 +            settings += 1
109 +
110 +        if self.cfg_params.has_key('CMSSW.number_of_jobs'):
111 +            self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
112 +            self.limitNJobs = True
113 +            settings += 1
114 +
115 +        if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
116 +            self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
117 +            self.limitTotalLumis = (self.totalNLumis != -1)
118 +            settings += 1
119 +
120 +        if settings != 2:
121 +            msg = 'When splitting by lumi section you must specify two and only two of:\n'
122 +            msg += '  number_of_jobs, lumis_per_job, total_number_of_lumis'
123 +            raise CrabException(msg)
124 +        if self.limitNJobs and self.limitJobLumis:
125 +            self.limitTotalLumis = True
126 +            self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
127 +
128 +        # Has the user specified runselection?
129 +        if (self.cfg_params.has_key('CMSSW.runselection')):
130 +            common.logger.info('You have specified runselection together with splitting by lumi.')
131 +            common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
132 +        return
133 +
134 +    def ComputeSubBlockSites( self, blockSites ):
135 +        """
136 +        Keep only the blocks with at least one white-listed site."""
137 +        sub_blockSites = {}
138 +        for k,v in blockSites.iteritems():
139 +            sites=self.blackWhiteListParser.checkWhiteList(v)
140 +            if sites : sub_blockSites[k]=v
141 +        if len(sub_blockSites) < 1:
142 +            msg = 'WARNING: the sites %s are not hosting any part of the data.'%self.seWhiteList
143 +            raise CrabException(msg)
144 +        return sub_blockSites
145  
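A sketch of the two-of-three rule enforced by checkLumiSettings above, with hypothetical values: exactly two of the three CMSSW lumi-splitting keys must be set, and the third quantity is then derived.

    cfg = {'CMSSW.lumis_per_job': '50', 'CMSSW.number_of_jobs': '10'}   # made up
    keys = ('CMSSW.lumis_per_job', 'CMSSW.number_of_jobs',
            'CMSSW.total_number_of_lumis')
    settings = len([k for k in keys if cfg.has_key(k)])
    assert settings == 2        # anything else raises CrabException in the real code
    # with lumis_per_job and number_of_jobs both given, the total is implied:
    totalNLumis = int(cfg['CMSSW.lumis_per_job']) * int(cfg['CMSSW.number_of_jobs'])
    print totalNLumis           # 500
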
146   ########################################################################
147      def jobSplittingByEvent( self ):
# Line 58 | Line 157 | class JobSplitter:
157                self.list_of_args - File(s) job will run on (a list of lists)
158          """
159  
160 <        jobDestination=[]  
160 >        jobDestination=[]
161          self.checkUserSettings()
162          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
163              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
164              raise CrabException(msg)
165 <
166 <        blockSites = self.args['blockSites']
165 >
166 >        blockSites = self.args['blockSites']
167          pubdata = self.args['pubdata']
168          filesbyblock=pubdata.getFiles()
169  
# Line 78 | Line 177 | class JobSplitter:
177          self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
178          noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))
179  
180 +        if noBboundary == 1:
181 +            if self.total_number_of_events== -1:
182 >                msg = 'You are selecting no_block_boundary=1, which does not allow setting total_number_of_events=-1\n'
183 >                msg +='\tYou should get the number of events from the DBS web interface and use it in your configuration.'
184 +                raise CrabException(msg)
185 +            if len(self.seWhiteList) != 1:
186 >                msg = 'You are selecting no_block_boundary=1, which requires choosing one and only one site.\n'
187 +                msg += "\tPlease set se_white_list with the site's storage element name."
188 +                raise  CrabException(msg)
189 +            blockSites = self.ComputeSubBlockSites(blockSites)
190 +
191          # ---- Handle the possible job splitting configurations ---- #
192          if (self.selectTotalNumberEvents):
193              totalEventsRequested = self.total_number_of_events
# Line 92 | Line 202 | class JobSplitter:
202          # If user requested more events than are in the dataset
203          elif (totalEventsRequested > self.maxEvents):
204              eventsRemaining = self.maxEvents
205 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
205 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
206          # If user requested less events than are in the dataset
207          else:
208              eventsRemaining = totalEventsRequested
# Line 108 | Line 218 | class JobSplitter:
218              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
219  
220          if (self.selectNumberOfJobs):
221 <            common.logger.message("May not create the exact number_of_jobs requested.")
221 >            common.logger.info("May not create the exact number_of_jobs requested.")
222  
223 +        if (self.theNumberOfJobs < 0):
224 +            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")
225 +            
226          # old... to remove Daniele
227          totalNumberOfJobs = 999999999
228  
# Line 125 | Line 238 | class JobSplitter:
238          jobsOfBlock = {}
239  
240          parString = ""
241 +        pString = ""
242          filesEventCount = 0
243 +        msg=''
244  
245          # ---- Iterate over the blocks in the dataset until ---- #
246          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 252 | class JobSplitter:
252  
253              if self.eventsbyblock.has_key(block) :
254                  numEventsInBlock = self.eventsbyblock[block]
255 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
255 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
256  
257                  files = filesbyblock[block]
258                  numFilesInBlock = len(files)
# Line 147 | Line 262 | class JobSplitter:
262                  if noBboundary == 0: # DD
263                      # ---- New block => New job ---- #
264                      parString = ""
265 +                    pString=""
266                      # counter for number of events in files currently worked on
267                      filesEventCount = 0
268                  # flag if next while loop should touch new file
# Line 156 | Line 272 | class JobSplitter:
272  
273                  # ---- Iterate over the files in the block until we've met the requested ---- #
274                  # ---- total # of events or we've gone over all the files in this block  ---- #
275 <                pString=''
275 >                msg='\n'
276                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
277                      file = files[fileCount]
278                      if self.useParent==1:
279                          parent = self.parentFiles[file]
280 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
280 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
281                      if newFile :
282                          try:
283                              numEventsInFile = self.eventsbyfile[file]
284 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
284 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
285                              # increase filesEventCount
286                              filesEventCount += numEventsInFile
287                              # Add file to current job
288 <                            parString += '\\\"' + file + '\\\"\,'
288 >                            parString +=  file + ','
289 >                            if self.useParent==1:
290 >                                for f in parent :
291 >                                    pString += f  + ','
292                              newFile = 0
293                          except KeyError:
294 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
294 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
295  
296                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
297                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 304 | class JobSplitter:
304                              if ( fileCount == numFilesInBlock-1 ) :
305                                  # end job using last file, use remaining events in block
306                                  # close job and touch new file
307 <                                fullString = parString[:-2]
307 >                                fullString = parString[:-1]
308                                  if self.useParent==1:
309 <                                    fullParentString = pString[:-2]
310 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
309 >                                    fullParentString = pString[:-1]
310 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
311                                  else:
312 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
313 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
312 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
313 >                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
314                                  jobDestination.append(blockSites[block])
315 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
315 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
316                                  # fill jobs of block dictionary
317                                  jobsOfBlock[block].append(jobCount+1)
318                                  # reset counter
# Line 217 | Line 333 | class JobSplitter:
333                      # if events in file equal to eventsPerJobRequested
334                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
335                          # close job and touch new file
336 <                        fullString = parString[:-2]
336 >                        fullString = parString[:-1]
337                          if self.useParent==1:
338 <                            fullParentString = pString[:-2]
339 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
338 >                            fullParentString = pString[:-1]
339 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
340                          else:
341 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
342 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
341 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
342 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
343                          jobDestination.append(blockSites[block])
344 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
344 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
345                          jobsOfBlock[block].append(jobCount+1)
346                          # reset counter
347                          jobCount = jobCount + 1
# Line 242 | Line 358 | class JobSplitter:
358                      # if more events in file remain than eventsPerJobRequested
359                      else :
360                          # close job but don't touch new file
361 <                        fullString = parString[:-2]
361 >                        fullString = parString[:-1]
362                          if self.useParent==1:
363 <                            fullParentString = pString[:-2]
364 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
363 >                            fullParentString = pString[:-1]
364 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
365                          else:
366 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
367 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
366 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
367 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
368                          jobDestination.append(blockSites[block])
369 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
369 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
370                          jobsOfBlock[block].append(jobCount+1)
371                          # increase counter
372                          jobCount = jobCount + 1
# Line 261 | Line 377 | class JobSplitter:
377                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
378                          # remove all but the last file
379                          filesEventCount = self.eventsbyfile[file]
380 +                        pString_tmp=''
381                          if self.useParent==1:
382 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
383 <                        parString = '\\\"' + file + '\\\"\,'
382 >                            for f in parent : pString_tmp +=  f + ','
383 >                        pString =  pString_tmp
384 >                        parString =  file + ','
385                      pass # END if
386                  pass # END while (iterate over files in the block)
387          pass # END while (iterate over blocks in the dataset)
388 +        common.logger.debug(msg)
389          self.ncjobs = self.total_number_of_jobs = jobCount
390          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
391 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
392 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
393 <
391 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
392 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
393 >
394          # skip check on  block with no sites  DD
395          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
396  
397         # prepare dict output
398          dictOut = {}
399 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
400 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
401          dictOut['args'] = list_of_lists
402          dictOut['jobDestination'] = jobDestination
403          dictOut['njobs']=self.total_number_of_jobs
# Line 285 | Line 406 | class JobSplitter:
406  
407          # keep trace of block with no sites to print a warning at the end
408  
409 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
409 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
410          # screen output
411          screenOutput = "List of jobs and available destination sites:\n\n"
412          noSiteBlock = []
413          bloskNoSite = []
414 +        allBlock = []
415  
416          blockCounter = 0
417 +        saveFblocks =''
418          for block in blocks:
419              if block in jobsOfBlock.keys() :
420                  blockCounter += 1
421 +                allBlock.append( blockCounter )
422 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
423                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
424 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
425 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
424 >                    ', '.join(SE2CMS(sites)))
425 >                if len(sites) == 0:
426                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
427                      bloskNoSite.append( blockCounter )
428 +                else:
429 +                    saveFblocks += str(block)+'\n'
430 +        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
431  
432 <        common.logger.message(screenOutput)
432 >        common.logger.info(screenOutput)
433          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
434              msg = 'WARNING: No sites are hosting any part of the data for block:\n                '
435              virgola = ""
# Line 309 | Line 437 | class JobSplitter:
437                  virgola = ","
438              for block in bloskNoSite:
439                  msg += ' ' + str(block) + virgola
440 <            msg += '\n               Related jobs:\n                 '
440 >            msg += '\n\t\tRelated jobs:\n                 '
441              virgola = ""
442              if len(noSiteBlock) > 1:
443                  virgola = ","
444              for range_jobs in noSiteBlock:
445                  msg += str(range_jobs) + virgola
446 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
447 <            if self.cfg_params.has_key('EDG.se_white_list'):
448 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
449 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
450 <                msg += 'Please check if the dataset is available at this site!)\n'
451 <            if self.cfg_params.has_key('EDG.ce_white_list'):
452 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
453 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
454 <                msg += 'Please check if the dataset is available at this site!)\n'
455 <
456 <            common.logger.message(msg)
446 >            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
447 >            if self.cfg_params.has_key('GRID.se_white_list'):
448 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
449 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
450 >                msg += '\tPlease check if the dataset is available at this site!)'
451 >            if self.cfg_params.has_key('GRID.ce_white_list'):
452 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
453 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
454 >                msg += '\tPlease check if the dataset is available at this site!)\n'
455 >
456 >            common.logger.info(msg)
457 >
458 >        if bloskNoSite == allBlock:
459 >            msg = 'Requested jobs cannot be created!\n'
460 >            if self.cfg_params.has_key('GRID.se_white_list'):
461 >                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
462 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
463 >                msg += '\tPlease check if the dataset is available at this site!)'
464 >            if self.cfg_params.has_key('GRID.ce_white_list'):
465 >                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
466 >                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
467 >                msg += '\tPlease check if the dataset is available at this site!)\n'
468 >            raise CrabException(msg)
469  
470          return
471  
472  
473   ########################################################################
474 <    def jobSplittingByRun(self):
474 >    def jobSplittingByRun(self):
475          """
476          Split the task into jobs along run boundaries."""
337        from sets import Set  
338        from WMCore.JobSplitting.RunBased import RunBased
339        from WMCore.DataStructs.Workflow import Workflow
340        from WMCore.DataStructs.File import File
341        from WMCore.DataStructs.Fileset import Fileset
342        from WMCore.DataStructs.Subscription import Subscription
343        from WMCore.JobSplitting.SplitterFactory import SplitterFactory
344        from WMCore.DataStructs.Run import Run
477  
478          self.checkUserSettings()
479 <        blockSites = self.args['blockSites']
479 >        blockSites = self.args['blockSites']
480          pubdata = self.args['pubdata']
481  
482          if self.selectNumberOfJobs == 0 :
483              self.theNumberOfJobs = 9999999
484          blocks = {}
485 <        runList = []
485 >        runList = []
486          thefiles = Fileset(name='FilesToSplit')
487          fileList = pubdata.getListFiles()
488          for f in fileList:
357           # print f
489              block = f['Block']['Name']
490 <          #  if not blocks.has_key(block):
360 <          #      blocks[block] = reader.listFileBlockLocation(block)
361 <            try:
490 >            try:
491                  f['Block']['StorageElementList'].extend(blockSites[block])
492              except:
493                  continue
494              wmbsFile = File(f['LogicalFileName'])
495 +            if not  blockSites[block]:
496 +                msg = 'WARNING: No sites are hosting any part of the data for block: %s\n' %block
497 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
498 +                common.logger.debug(msg)
499              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
500              wmbsFile['block'] = block
501              runNum = f['RunsList'][0]['RunNumber']
502 <            runList.append(runNum)
502 >            runList.append(runNum)
503              myRun = Run(runNumber=runNum)
504              wmbsFile.addRun( myRun )
505              thefiles.addFile(
506                  wmbsFile
507                  )
508 <
508 >
509          work = Workflow()
510          subs = Subscription(
511          fileset = thefiles,
# Line 381 | Line 514 | class JobSplitter:
514          type = "Processing")
515          splitter = SplitterFactory()
516          jobfactory = splitter(subs)
517 <        
518 <        #loop over all runs
386 <        set = Set(runList)
517 >
518 >        #loop over all runs
519          list_of_lists = []
520          jobDestination = []
521 <
521 >        list_of_blocks = []
522          count = 0
523 <        for i in list(set):
523 >        for jobGroup in  jobfactory():
524              if count <  self.theNumberOfJobs:
525 <                res = self.getJobInfo(jobfactory())
526 <                parString = ''
525 >                res = self.getJobInfo(jobGroup)
526 >                parString = ''
527                  for file in res['lfns']:
528 <                    parString += '\\\"' + file + '\\\"\,'
529 <                fullString = parString[:-2]
530 <                list_of_lists.append([fullString,str(-1),str(0)])    
528 >                    parString += file + ','
529 >                list_of_blocks.append(res['block'])
530 >                fullString = parString[:-1]
531 >                blockString=','.join(list_of_blocks)
532 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
533                  #need to check single file location
534 <                jobDestination.append(res['locations'])  
534 >                jobDestination.append(res['locations'])
535                  count +=1
536 <        #print jobDestination
403 <       # prepare dict output
536 >        # prepare dict output
537          dictOut = {}
538 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
539          dictOut['args'] = list_of_lists
540          dictOut['jobDestination'] = jobDestination
541          dictOut['njobs']=count
542 +        self.cacheBlocks(list_of_blocks,jobDestination)
543  
544          return dictOut
545  
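Each entry of list_of_lists above lines up positionally with dictOut['params']; a hypothetical single job's argument list, with made-up LFNs and block name, would look like this:

    # ['InputFiles', 'MaxEvents', 'SkipEvents', 'InputBlocks']
    args = ['/store/data/fileA.root,/store/data/fileB.root',   # InputFiles
            '-1',                                              # MaxEvents: all events
            '0',                                               # SkipEvents
            '/MyPrimary/MyProcessed/AOD#0000']                 # InputBlocks
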
546      def getJobInfo( self,jobGroup ):
547          res = {}
548 <        lfns = []        
549 <        locations = []        
548 >        lfns = []
549 >        locations = []
550          tmp_check=0
551          for job in jobGroup.jobs:
552              for file in job.getFiles():
553 <                lfns.append(file['lfn'])
553 >                lfns.append(file['lfn'])
554                  for loc in file['locations']:
555                      if tmp_check < 1 :
556                          locations.append(loc)
557 <                    tmp_check = tmp_check + 1
558 <                ### the check on the locations should go here
559 <        res['lfns'] = lfns
560 <        res['locations'] = locations
561 <        return res                
562 <      
557 >                        res['block']= file['block']
558 >                tmp_check = tmp_check + 1
559 >        res['lfns'] = lfns
560 >        res['locations'] = locations
561 >        return res
562 >
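The dict returned by getJobInfo above has this shape (values are hypothetical); note that locations and block are taken from the first file only, via the tmp_check counter:

    res = {'lfns': ['/store/data/fileA.root', '/store/data/fileB.root'],
           'locations': ['srm.site-a.example'],
           'block': '/MyPrimary/MyProcessed/AOD#0000'}
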
563   ########################################################################
564 <    def jobSplittingNoInput(self):
564 >    def prepareSplittingNoInput(self):
565          """
431        Perform job splitting based on number of event per job
566          """
433        common.logger.debug(5,'Splitting per events')
434        self.checkUserSettings()
435        jobDestination=[]
436        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
437            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
438            raise CrabException(msg)
439
440        managedGenerators =self.args['managedGenerators']
441        generator = self.args['generator']
442        firstRun = self.cfg_params.get('CMSSW.first_run',None)
443
567          if (self.selectEventsPerJob):
568 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
568 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
569          if (self.selectNumberOfJobs):
570 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
570 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
571          if (self.selectTotalNumberEvents):
572 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
572 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
573  
574          if (self.total_number_of_events < 0):
575              msg='Cannot split jobs per events with "-1" as the total number of events'
# Line 463 | Line 586 | class JobSplitter:
586              self.total_number_of_jobs = self.theNumberOfJobs
587              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
588  
589 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
589 >
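A sketch of the arithmetic done in prepareSplittingNoInput above, with made-up numbers: the integer division truncates, so a remainder of events can be left over, which triggers the warning printed in jobSplittingNoInput below.

    total_number_of_events = 1000
    theNumberOfJobs = 7
    eventsPerJob = int(total_number_of_events / theNumberOfJobs)     # 142
    check = total_number_of_events - theNumberOfJobs * eventsPerJob
    print check   # 6 events that no job will generate
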
590 >    def jobSplittingNoInput(self):
591 >        """
592 >        Perform job splitting based on the number of events per job
593 >        """
594 >        common.logger.debug('Splitting per events')
595 >        self.checkUserSettings()
596 >        jobDestination=[]
597 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
598 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
599 >            raise CrabException(msg)
600 >
601 >        managedGenerators =self.args['managedGenerators']
602 >        generator = self.args['generator']
603 >        firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))
604 >
605 >        self.prepareSplittingNoInput()
606 >
607 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
608  
609          # is there any remainder?
610          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
611  
612 <        common.logger.debug(5,'Check  '+str(check))
612 >        common.logger.debug('Check  '+str(check))
613  
614 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
614 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
615          if check > 0:
616 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
616 >            common.logger.info('Warning: asked for '+str(self.total_number_of_events)+' events but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
617  
618          # argument is seed number.$i
619          self.list_of_args = []
620          for i in range(self.total_number_of_jobs):
621              ## Since there is no input, any site is good
622 <            jobDestination.append([""]) #must be empty to write correctly the xml
622 >            jobDestination.append([""]) # must be empty to correctly write the XML
623              args=[]
624 <            if (firstRun):
625 <                ## pythia first run
485 <                args.append(str(firstRun)+str(i))
624 >            if (firstLumi): # Pythia first lumi
625 >                args.append(str(int(firstLumi)+i))
626              if (generator in managedGenerators):
627 <                if (generator == 'comphep' and i == 0):
627 >               args.append(generator)
628 >               if (generator == 'comphep' and i == 0):
629                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
630                      args.append('1')
631 <                else:
631 >               else:
632                      args.append(str(i*self.eventsPerJob))
633              args.append(str(self.eventsPerJob))
634              self.list_of_args.append(args)
635         # prepare dict output
636 +
637          dictOut = {}
638 +        dictOut['params'] = ['MaxEvents']
639 +        if (firstLumi):
640 +            dictOut['params'] = ['FirstLumi','MaxEvents']
641 +            if (generator in managedGenerators):
642 +                dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
643 +        else:
644 +            if (generator in managedGenerators) :
645 +                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
646          dictOut['args'] = self.list_of_args
647          dictOut['jobDestination'] = jobDestination
648          dictOut['njobs']=self.total_number_of_jobs
# Line 509 | Line 659 | class JobSplitter:
659              msg = 'Must specify number_of_jobs.'
660              raise CrabException(msg)
661          jobDestination = []
662 <        common.logger.debug(5,'Splitting per job')
663 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
662 >        common.logger.debug('Splitting per job')
663 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
664 >
665 > #        self.total_number_of_jobs = self.theNumberOfJobs
666  
667 <        self.total_number_of_jobs = self.theNumberOfJobs
667 >        self.prepareSplittingNoInput()
668  
669 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
669 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
670  
671 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
671 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
672  
673          # argument is seed number.$i
674          self.list_of_args = []
675          for i in range(self.total_number_of_jobs):
676 +            args=[]
677              jobDestination.append([""])
678 <            self.list_of_args.append([str(i)])
678 >            if self.eventsPerJob != 0 :
679 >                args.append(str(self.eventsPerJob))
680 >                self.list_of_args.append(args)
681  
682         # prepare dict output
683          dictOut = {}
684 <        dictOut['args'] = self.list_of_args
684 >        dictOut['params'] = ['MaxEvents']
685 >        dictOut['args'] =  self.list_of_args
686          dictOut['jobDestination'] = jobDestination
687          dictOut['njobs']=self.total_number_of_jobs
688          return dictOut
533
689  
690 <    def jobSplittingByLumi(self):
690 >
691 >    def jobSplittingByLumi(self):
692          """
693 +        Split task into jobs by Lumi section paying attention to which
694 +        lumis should be run (according to the analysis dataset).
695 +        This uses WMBS job splitting which does not split files over jobs
696 +        so the job will have AT LEAST as many lumis as requested, perhaps
697 +        more
698          """
699 <        return
699 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
700 >        common.logger.debug('Splitting by Lumi')
701 >        self.checkLumiSettings()
702 >
703 >        blockSites = self.args['blockSites']
704 >        pubdata = self.args['pubdata']
705 >
706 >        lumisPerFile  = pubdata.getLumis()
707 >        self.parentFiles=pubdata.getParent()
708 >        # Make the list of WMBS files for job splitter
709 >        fileList = pubdata.getListFiles()
710 >        wmFileList = []
711 >        for jobFile in fileList:
712 >            block = jobFile['Block']['Name']
713 >            try:
714 >                jobFile['Block']['StorageElementList'].extend(blockSites[block])
715 >            except:
716 >                continue
717 >            wmbsFile = File(jobFile['LogicalFileName'])
718 >            if not  blockSites[block]:
719 >                msg = 'WARNING: No sites are hosting any part of the data for block: %s\n' %block
720 >                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
721 >                common.logger.debug(msg)
722 >               # wmbsFile['locations'].add('Nowhere')
723 >            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
724 >            wmbsFile['block'] = block
725 >            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
726 >                wmbsFile.addRun(Run(lumi[0], lumi[1]))
727 >            wmFileList.append(wmbsFile)
728 >
729 >        fileSet = set(wmFileList)
730 >        thefiles = Fileset(name='FilesToSplit', files = fileSet)
731 >
732 >        # Create the factory and workflow
733 >        work = Workflow()
734 >        subs = Subscription(fileset    = thefiles,    workflow = work,
735 >                            split_algo = 'LumiBased', type     = "Processing")
736 >        splitter = SplitterFactory()
737 >        jobFactory = splitter(subs)
738 >
739 >        list_of_lists = []
740 >        jobDestination = []
741 >        jobCount = 0
742 >        lumisCreated = 0
743 >        list_of_blocks = []
744 >        if not self.limitJobLumis:
745 >            if self.totalNLumis > 0:
746 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
747 >            else:
748 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
749 >            common.logger.info('Each job will process about %s lumis.' %
750 >                                self.lumisPerJob)
751 >
752 >        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
753 >            for job in jobGroup.jobs:
754 >                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
755 >                    common.logger.info('Requested number of jobs reached.')
756 >                    break
757 >                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
758 >                    common.logger.info('Requested number of lumis reached.')
759 >                    break
760 >                lumis = []
761 >                lfns  = []
762 >                if self.useParent==1:
763 >                 parentlfns  = []
764 >                 pString =""
765 >
766 >                locations = []
767 >                blocks = []
768 >                firstFile = True
769 >                # Collect information from all the files
770 >                for jobFile in job.getFiles():
771 >                    doFile = False
772 >                    if firstFile:  # Get locations from first file in the job
773 >                        for loc in jobFile['locations']:
774 >                            locations.append(loc)
775 >                        blocks.append(jobFile['block'])
776 >                        firstFile = False
777 >                    # Accumulate Lumis from all files
778 >                    for lumiList in jobFile['runs']:
779 >                        theRun = lumiList.run
780 >                        for theLumi in list(lumiList):
781 >                            if (not self.limitTotalLumis) or \
782 >                               (lumisCreated < self.totalNLumis):
783 >                                doFile = True
784 >                                lumisCreated += 1
785 >                                lumis.append( (theRun, theLumi) )
786 >                    if doFile:
787 >                        lfns.append(jobFile['lfn'])
788 >                        if self.useParent==1:
789 >                           parent = self.parentFiles[jobFile['lfn']]
790 >                           for p in parent :
791 >                               pString += p  + ','
792 >                fileString = ','.join(lfns)
793 >                lumiLister = LumiList(lumis = lumis)
794 >                lumiString = lumiLister.getCMSSWString()
795 >                blockString=','.join(blocks)
796 >                if self.useParent==1:
797 >                  common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
798 >                  pfileString = pString[:-1]
799 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
800 >                else:
801 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
802 >                list_of_blocks.append(blocks)
803 >                jobDestination.append(locations)
804 >                jobCount += 1
805 >                common.logger.debug('Job %s will run on %s files and %s lumis '
806 >                    % (jobCount, len(lfns), len(lumis) ))
807 >
808 >        common.logger.info('%s jobs created to run on %s lumis' %
809 >                              (jobCount, lumisCreated))
810 >
811 >        # Prepare dict output matching back to non-WMBS job creation
812 >        dictOut = {}
813 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
814 >        if self.useParent==1:
815 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
816 >        dictOut['args'] = list_of_lists
817 >        dictOut['jobDestination'] = jobDestination
818 >        dictOut['njobs'] = jobCount
819 >        self.cacheBlocks(list_of_blocks,jobDestination)
820 >
821 >        return dictOut
822 >
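A sketch of how lumisPerJob is derived above when lumis_per_job itself is not configured, with made-up numbers. Since WMBS never splits a file across jobs, each job gets AT LEAST this many lumis; the totalNLumis cap is enforced inside the job loop.

    totalNLumis, theNumberOfJobs = 95, 10
    lumisPerJob = max(totalNLumis // theNumberOfJobs, 1)   # 9 lumis per job
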
823 >    def cacheBlocks(self, blocks,destinations):
824 >
825 >        saveFblocks=''
826 >        for i in range(len(blocks)):
827 >            sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
828 >            if len(sites) != 0:
829 >                for block in blocks[i]:
830 >                    saveFblocks += str(block)+'\n'
831 >        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
832 >
833      def Algos(self):
834          """
835          Define key splittingType matrix
836          """
837 <        SplitAlogs = {
838 <                     'EventBased'           : self.jobSplittingByEvent,
837 >        SplitAlogs = {
838 >                     'EventBased'           : self.jobSplittingByEvent,
839                       'RunBased'             : self.jobSplittingByRun,
840 <                     'LumiBased'            : self.jobSplittingByLumi,
841 <                     'NoInput'              : self.jobSplittingNoInput,
840 >                     'LumiBased'            : self.jobSplittingByLumi,
841 >                     'NoInput'              : self.jobSplittingNoInput,
842                       'ForScript'            : self.jobSplittingForScript
843 <                     }  
843 >                     }
844          return SplitAlogs
845  
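A sketch of how the matrix returned by Algos is meant to be used; the instances here are hypothetical. The configured splitting type selects the bound method, which returns the dictOut structure built above.

    splitter = JobSplitter(cfg_params, args)
    algo = splitter.Algos()['LumiBased']       # -> jobSplittingByLumi
    dictOut = algo()                           # 'njobs', 'args', 'jobDestination', ...
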

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)