ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.14 by spiga, Tue May 26 16:53:23 2009 UTC vs.
Revision 1.23 by spiga, Fri Jun 19 09:54:22 2009 UTC

# Line 11 | Line 11 | class JobSplitter:
11          # init BlackWhiteListParser
12          seWhiteList = cfg_params.get('GRID.se_white_list',[])
13          seBlackList = cfg_params.get('GRID.se_black_list',[])
14 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
14 >        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
15  
16  
17      def checkUserSettings(self):
# Line 57 | Line 57 | class JobSplitter:
57                self.list_of_args - File(s) job will run on (a list of lists)
58          """
59  
60 <        jobDestination=[]  
60 >        jobDestination=[]
61          self.checkUserSettings()
62          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
63              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
64              raise CrabException(msg)
65 <
66 <        blockSites = self.args['blockSites']
65 >
66 >        blockSites = self.args['blockSites']
67          pubdata = self.args['pubdata']
68          filesbyblock=pubdata.getFiles()
69  
# Line 124 | Line 124 | class JobSplitter:
124          jobsOfBlock = {}
125  
126          parString = ""
127 +        pString = ""
128          filesEventCount = 0
129 +        msg=''
130  
131          # ---- Iterate over the blocks in the dataset until ---- #
132          # ---- we've met the requested total # of events    ---- #
# Line 146 | Line 148 | class JobSplitter:
148                  if noBboundary == 0: # DD
149                      # ---- New block => New job ---- #
150                      parString = ""
151 +                    pString=""
152                      # counter for number of events in files currently worked on
153                      filesEventCount = 0
154                  # flag if next while loop should touch new file
# Line 155 | Line 158 | class JobSplitter:
158  
159                  # ---- Iterate over the files in the block until we've met the requested ---- #
160                  # ---- total # of events or we've gone over all the files in this block  ---- #
161 <                pString=''
161 >                msg='\n'
162                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
163                      file = files[fileCount]
164                      if self.useParent==1:
165                          parent = self.parentFiles[file]
163                        for f in parent :
164                            pString +=  f + ','
166                          common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
167                      if newFile :
168                          try:
# Line 171 | Line 172 | class JobSplitter:
172                              filesEventCount += numEventsInFile
173                              # Add file to current job
174                              parString +=  file + ','
175 +                            if self.useParent==1:
176 +                                for f in parent :
177 +                                    pString += f  + ','
178                              newFile = 0
179                          except KeyError:
180                              common.logger.info("File "+str(file)+" has unknown number of events: skipping")
# Line 192 | Line 196 | class JobSplitter:
196                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
197                                  else:
198                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
199 <                                common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
199 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
200                                  jobDestination.append(blockSites[block])
201 <                                common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
201 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
202                                  # fill jobs of block dictionary
203                                  jobsOfBlock[block].append(jobCount+1)
204                                  # reset counter
# Line 221 | Line 225 | class JobSplitter:
225                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
226                          else:
227                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
228 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
228 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
229                          jobDestination.append(blockSites[block])
230 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
230 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
231                          jobsOfBlock[block].append(jobCount+1)
232                          # reset counter
233                          jobCount = jobCount + 1
# Line 246 | Line 250 | class JobSplitter:
250                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
251                          else:
252                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
253 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
253 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
254                          jobDestination.append(blockSites[block])
255 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
255 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
256                          jobsOfBlock[block].append(jobCount+1)
257                          # increase counter
258                          jobCount = jobCount + 1
# Line 259 | Line 263 | class JobSplitter:
263                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
264                          # remove all but the last file
265                          filesEventCount = self.eventsbyfile[file]
266 +                        pString_tmp=''
267                          if self.useParent==1:
268 <                            for f in parent : pString +=  f + ','
268 >                            for f in parent : pString_tmp +=  f + ','
269 >                        pString =  pString_tmp
270                          parString =  file + ','
271                      pass # END if
272                  pass # END while (iterate over files in the block)
273          pass # END while (iterate over blocks in the dataset)
274 +        common.logger.debug(msg)
275          self.ncjobs = self.total_number_of_jobs = jobCount
276          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
277              common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
278          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
279 <
279 >
280          # skip check on  block with no sites  DD
281          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
282  
# Line 285 | Line 292 | class JobSplitter:
292  
293          # keep trace of block with no sites to print a warning at the end
294  
295 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
295 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
296          # screen output
297          screenOutput = "List of jobs and available destination sites:\n\n"
298          noSiteBlock = []
# Line 297 | Line 304 | class JobSplitter:
304              if block in jobsOfBlock.keys() :
305                  blockCounter += 1
306                  allBlock.append( blockCounter )
307 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
308                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
309 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
310 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
309 >                    ', '.join(SE2CMS(sites)))
310 >                if len(sites) == 0:
311                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
312                      bloskNoSite.append( blockCounter )
313  
# Line 330 | Line 338 | class JobSplitter:
338              common.logger.info(msg)
339  
340          if bloskNoSite == allBlock:
341 <            raise CrabException('No jobs created')
341 >            raise CrabException('No jobs created')
342  
343          return
344  
345  
346   ########################################################################
347 <    def jobSplittingByRun(self):
347 >    def jobSplittingByRun(self):
348          """
349          """
350 <        from sets import Set  
350 >        from sets import Set
351          from WMCore.JobSplitting.RunBased import RunBased
352          from WMCore.DataStructs.Workflow import Workflow
353          from WMCore.DataStructs.File import File
354          from WMCore.DataStructs.Fileset import Fileset
355          from WMCore.DataStructs.Subscription import Subscription
356          from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357 <        from WMCore.DataStructs.Run import Run
357 >        from WMCore.DataStructs.Run import Run
358  
359          self.checkUserSettings()
360 <        blockSites = self.args['blockSites']
360 >        blockSites = self.args['blockSites']
361          pubdata = self.args['pubdata']
362  
363          if self.selectNumberOfJobs == 0 :
364              self.theNumberOfJobs = 9999999
365          blocks = {}
366 <        runList = []
366 >        runList = []
367          thefiles = Fileset(name='FilesToSplit')
368          fileList = pubdata.getListFiles()
369          for f in fileList:
370              block = f['Block']['Name']
371 <            try:
371 >            try:
372                  f['Block']['StorageElementList'].extend(blockSites[block])
373              except:
374                  continue
# Line 368 | Line 376 | class JobSplitter:
376              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
377              wmbsFile['block'] = block
378              runNum = f['RunsList'][0]['RunNumber']
379 <            runList.append(runNum)
379 >            runList.append(runNum)
380              myRun = Run(runNumber=runNum)
381              wmbsFile.addRun( myRun )
382              thefiles.addFile(
383                  wmbsFile
384                  )
385 <
385 >
386          work = Workflow()
387          subs = Subscription(
388          fileset = thefiles,
# Line 383 | Line 391 | class JobSplitter:
391          type = "Processing")
392          splitter = SplitterFactory()
393          jobfactory = splitter(subs)
394 <        
395 <        #loop over all runs
394 >
395 >        #loop over all runs
396          set = Set(runList)
397          list_of_lists = []
398          jobDestination = []
391
399          count = 0
400 <        for i in list(set):
400 >        for jobGroup in  jobfactory():
401              if count <  self.theNumberOfJobs:
402 <                res = self.getJobInfo(jobfactory())
403 <                parString = ''
402 >                res = self.getJobInfo(jobGroup)
403 >                parString = ''
404                  for file in res['lfns']:
405                      parString += file + ','
406                  fullString = parString[:-1]
407 <                list_of_lists.append([fullString,str(-1),str(0)])    
407 >                list_of_lists.append([fullString,str(-1),str(0)])
408                  #need to check single file location
409 <                jobDestination.append(res['locations'])  
409 >                jobDestination.append(res['locations'])
410                  count +=1
411         # prepare dict output
412          dictOut = {}
# Line 412 | Line 419 | class JobSplitter:
419  
420      def getJobInfo( self,jobGroup ):
421          res = {}
422 <        lfns = []        
423 <        locations = []        
422 >        lfns = []
423 >        locations = []
424          tmp_check=0
425          for job in jobGroup.jobs:
426              for file in job.getFiles():
427 <                lfns.append(file['lfn'])
427 >                lfns.append(file['lfn'])
428                  for loc in file['locations']:
429                      if tmp_check < 1 :
430                          locations.append(loc)
431 <                tmp_check = tmp_check + 1
432 <                ### qui va messo il check per la locations
433 <        res['lfns'] = lfns
434 <        res['locations'] = locations
435 <        return res                
436 <      
431 >                tmp_check = tmp_check + 1
432 >                ### qui va messo il check per la locations
433 >        res['lfns'] = lfns
434 >        res['locations'] = locations
435 >        return res
436 >
437   ########################################################################
438 <    def jobSplittingNoInput(self):
438 >    def prepareSplittingNoInput(self):
439          """
433        Perform job splitting based on number of event per job
440          """
435        common.logger.debug('Splitting per events')
436        self.checkUserSettings()
437        jobDestination=[]
438        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
439            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
440            raise CrabException(msg)
441
442        managedGenerators =self.args['managedGenerators']
443        generator = self.args['generator']
444        firstRun = self.cfg_params.get('CMSSW.first_run',None)
445
441          if (self.selectEventsPerJob):
442              common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
443          if (self.selectNumberOfJobs):
# Line 465 | Line 460 | class JobSplitter:
460              self.total_number_of_jobs = self.theNumberOfJobs
461              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
462  
463 +
464 +    def jobSplittingNoInput(self):
465 +        """
466 +        Perform job splitting based on number of event per job
467 +        """
468 +        common.logger.debug('Splitting per events')
469 +        self.checkUserSettings()
470 +        jobDestination=[]
471 +        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
472 +            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
473 +            raise CrabException(msg)
474 +
475 +        managedGenerators =self.args['managedGenerators']
476 +        generator = self.args['generator']
477 +        firstRun = self.cfg_params.get('CMSSW.first_run',None)
478 +
479 +        self.prepareSplittingNoInput()
480 +
481          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
482  
483          # is there any remainder?
# Line 486 | Line 499 | class JobSplitter:
499                  ## pythia first run
500                  args.append(str(firstRun)+str(i))
501              if (generator in managedGenerators):
502 <                if (generator == 'comphep' and i == 0):
502 >               args.append(generator)
503 >               if (generator == 'comphep' and i == 0):
504                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
505                      args.append('1')
506 <                else:
506 >               else:
507                      args.append(str(i*self.eventsPerJob))
508              args.append(str(self.eventsPerJob))
509              self.list_of_args.append(args)
# Line 499 | Line 513 | class JobSplitter:
513          dictOut['params'] = ['MaxEvents']
514          if (firstRun):
515              dictOut['params'] = ['FirstRun','MaxEvents']
516 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
517 <        else:  
518 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
516 >            if ( generator in managedGenerators ) :
517 >                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
518 >        else:
519 >            if (generator in managedGenerators) :
520 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
521          dictOut['args'] = self.list_of_args
522          dictOut['jobDestination'] = jobDestination
523          dictOut['njobs']=self.total_number_of_jobs
# Line 521 | Line 537 | class JobSplitter:
537          common.logger.debug('Splitting per job')
538          common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
539  
540 <        self.total_number_of_jobs = self.theNumberOfJobs
540 > #        self.total_number_of_jobs = self.theNumberOfJobs
541 >
542 >        self.prepareSplittingNoInput()
543  
544          common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
545  
546          common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
547  
548          # argument is seed number.$i
549 <        #self.list_of_args = []
549 >        self.list_of_args = []
550          for i in range(self.total_number_of_jobs):
551 +            args=[]
552              jobDestination.append([""])
553 <        #   self.list_of_args.append([str(i)])
553 >            if self.eventsPerJob != 0 :
554 >                args.append(str(self.eventsPerJob))
555 >                self.list_of_args.append(args)
556  
557         # prepare dict output
558          dictOut = {}
559 <        dictOut['args'] = [] # self.list_of_args
559 >        dictOut['params'] = ['MaxEvents']
560 >        dictOut['args'] =  self.list_of_args
561          dictOut['jobDestination'] = jobDestination
562          dictOut['njobs']=self.total_number_of_jobs
563          return dictOut
542
564  
565 <    def jobSplittingByLumi(self):
565 >
566 >    def jobSplittingByLumi(self):
567          """
568          """
569          return
# Line 549 | Line 571 | class JobSplitter:
571          """
572          Define key splittingType matrix
573          """
574 <        SplitAlogs = {
575 <                     'EventBased'           : self.jobSplittingByEvent,
574 >        SplitAlogs = {
575 >                     'EventBased'           : self.jobSplittingByEvent,
576                       'RunBased'             : self.jobSplittingByRun,
577 <                     'LumiBased'            : self.jobSplittingByLumi,
578 <                     'NoInput'              : self.jobSplittingNoInput,
577 >                     'LumiBased'            : self.jobSplittingByLumi,
578 >                     'NoInput'              : self.jobSplittingNoInput,
579                       'ForScript'            : self.jobSplittingForScript
580 <                     }  
580 >                     }
581          return SplitAlogs
582  

Diff Legend

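(no marker; shown with the old revision's line number only) Removed lines
+ Added lines
< Changed lines (old revision, 1.14)
> Changed lines (new revision, 1.23)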