ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.46 by spiga, Mon Sep 27 08:56:08 2010 UTC vs.
Revision 1.60 by belforte, Wed May 22 15:57:41 2013 UTC

# Line 33 | Line 33 | class JobSplitter:
33          #self.maxEvents
34          # init BlackWhiteListParser
35          self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36 +        if type(self.seWhiteList) == type("string"):
37 +            self.seWhiteList = self.seWhiteList.split(',')
38          seBlackList = cfg_params.get('GRID.se_black_list',[])
39 +        if type(seBlackList) == type("string"):
40 +            seBlackList = seBlackList.split(',')
41 +        if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
42 +            # use central black list
43 +            removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
44 +            blackAnaOps = None
45 +            if int(removeBList) == 0:
46 +                blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
47 +                result = blacklist.config("site_black_list.conf").strip().split(',')
48 +                if result != None:
49 +                    blackAnaOps = result
50 +                    common.logger.debug("Enforced black list: %s "%blackAnaOps)
51 +                else:
52 +                    common.logger.info("WARNING: Skipping default black list!")
53 +                if int(removeBList) == 0 and blackAnaOps:
54 +                    seBlackList += blackAnaOps
55 +
56 +        if seBlackList != []:
57 +            common.logger.info("SE black list applied to data location: %s" %\
58 +                           seBlackList)
59 +        if self.seWhiteList != []:
60 +            common.logger.info("SE white list applied to data location: %s" %\
61 +                           self.seWhiteList)
62          self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
63  
64          ## check if has been asked for a non default file to store/read analyzed fileBlocks
# Line 157 | Line 182 | class JobSplitter:
182                  msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
183                  msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
184                  raise CrabException(msg)
185 <            if len(self.seWhiteList) == 0 or  len(self.seWhiteList.split(',')) != 1:
185 >            if len(self.seWhiteList) != 1:
186                  msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
187                  msg += "\tPlease set se_white_list with the site's storage element name."
188                  raise  CrabException(msg)
# Line 195 | Line 220 | class JobSplitter:
220          if (self.selectNumberOfJobs):
221              common.logger.info("May not create the exact number_of_jobs requested.")
222  
223 +        if (self.theNumberOfJobs < 0):
224 +            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")
225 +            
226          # old... to remove Daniele
227          totalNumberOfJobs = 999999999
228  
# Line 279 | Line 307 | class JobSplitter:
307                                  fullString = parString[:-1]
308                                  if self.useParent==1:
309                                      fullParentString = pString[:-1]
310 <                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
310 >                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
311                                  else:
312 <                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
312 >                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
313                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
314                                  jobDestination.append(blockSites[block])
315                                  msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 308 | Line 336 | class JobSplitter:
336                          fullString = parString[:-1]
337                          if self.useParent==1:
338                              fullParentString = pString[:-1]
339 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
339 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
340                          else:
341 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
341 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
342                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
343                          jobDestination.append(blockSites[block])
344                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 333 | Line 361 | class JobSplitter:
361                          fullString = parString[:-1]
362                          if self.useParent==1:
363                              fullParentString = pString[:-1]
364 <                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
364 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
365                          else:
366 <                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
366 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
367                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
368                          jobDestination.append(blockSites[block])
369                          msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
# Line 368 | Line 396 | class JobSplitter:
396  
397         # prepare dict output
398          dictOut = {}
399 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
400 <        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
399 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
400 >        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
401          dictOut['args'] = list_of_lists
402          dictOut['jobDestination'] = jobDestination
403          dictOut['njobs']=self.total_number_of_jobs
# Line 428 | Line 456 | class JobSplitter:
456              common.logger.info(msg)
457  
458          if bloskNoSite == allBlock:
459 <            msg += 'Requested jobs cannot be Created! \n'
459 >            msg = 'Requested jobs cannot be Created! \n'
460              if self.cfg_params.has_key('GRID.se_white_list'):
461                  msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
462                  msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
# Line 464 | Line 492 | class JobSplitter:
492              except:
493                  continue
494              wmbsFile = File(f['LogicalFileName'])
495 +            if not  blockSites[block]:
496 +                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block                
497 +                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
498 +                common.logger.debug(msg)
499              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
500              wmbsFile['block'] = block
501              runNum = f['RunsList'][0]['RunNumber']
# Line 494 | Line 526 | class JobSplitter:
526                  parString = ''
527                  for file in res['lfns']:
528                      parString += file + ','
529 +                list_of_blocks.append(res['block'])
530                  fullString = parString[:-1]
531 <                list_of_lists.append([fullString,str(-1),str(0)])
531 >                blockString=','.join(list_of_blocks)
532 >                list_of_lists.append([fullString,str(-1),str(0),blockString])
533                  #need to check single file location
534                  jobDestination.append(res['locations'])
501                list_of_blocks.append(res['block'])
535                  count +=1
536          # prepare dict output
537          dictOut = {}
538 <        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
538 >        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
539          dictOut['args'] = list_of_lists
540          dictOut['jobDestination'] = jobDestination
541          dictOut['njobs']=count
509
542          self.cacheBlocks(list_of_blocks,jobDestination)
543  
544          return dictOut
# Line 568 | Line 600 | class JobSplitter:
600  
601          managedGenerators =self.args['managedGenerators']
602          generator = self.args['generator']
603 <        firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
603 >        firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))
604  
605          self.prepareSplittingNoInput()
606  
# Line 684 | Line 716 | class JobSplitter:
716                  continue
717              wmbsFile = File(jobFile['LogicalFileName'])
718              if not  blockSites[block]:
719 <                wmbsFile['locations'].add('Nowhere')
719 >                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
720 >                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
721 >                common.logger.debug(msg)
722 >               # wmbsFile['locations'].add('Nowhere')
723              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
724              wmbsFile['block'] = block
725              for lumi in lumisPerFile[jobFile['LogicalFileName']]:
# Line 707 | Line 742 | class JobSplitter:
742          lumisCreated = 0
743          list_of_blocks = []
744          if not self.limitJobLumis:
745 <            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
745 >            if self.totalNLumis > 0:
746 >                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
747 >            else:
748 >                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
749              common.logger.info('Each job will process about %s lumis.' %
750                                  self.lumisPerJob)
751  
# Line 722 | Line 760 | class JobSplitter:
760                  lumis = []
761                  lfns  = []
762                  if self.useParent==1:
763 <                 parentlfns  = []  
763 >                 parentlfns  = []
764                   pString =""
765  
766                  locations = []
# Line 754 | Line 792 | class JobSplitter:
792                  fileString = ','.join(lfns)
793                  lumiLister = LumiList(lumis = lumis)
794                  lumiString = lumiLister.getCMSSWString()
795 +                blockString=','.join(blocks)
796                  if self.useParent==1:
797                    common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
798                    pfileString = pString[:-1]
799 <                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString])
800 <                else:
801 <                 list_of_lists.append([fileString, str(-1), str(0), lumiString])
799 >                  list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
800 >                else:
801 >                 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
802                  list_of_blocks.append(blocks)
803                  jobDestination.append(locations)
804                  jobCount += 1
# Line 771 | Line 810 | class JobSplitter:
810  
811          # Prepare dict output matching back to non-WMBS job creation
812          dictOut = {}
813 <        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
814 <        if self.useParent==1:  
815 <         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis']
813 >        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
814 >        if self.useParent==1:
815 >         dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
816          dictOut['args'] = list_of_lists
817          dictOut['jobDestination'] = jobDestination
818          dictOut['njobs'] = jobCount
780
819          self.cacheBlocks(list_of_blocks,jobDestination)
820  
821          return dictOut

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines