ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.18 by slacapra, Tue Jun 9 13:12:06 2009 UTC vs.
Revision 1.22 by ewv, Wed Jun 17 20:58:07 2009 UTC

# Line 57 | Line 57 | class JobSplitter:
57                self.list_of_args - File(s) job will run on (a list of lists)
58          """
59  
60 <        jobDestination=[]  
60 >        jobDestination=[]
61          self.checkUserSettings()
62          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
63              msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
64              raise CrabException(msg)
65 <
66 <        blockSites = self.args['blockSites']
65 >
66 >        blockSites = self.args['blockSites']
67          pubdata = self.args['pubdata']
68          filesbyblock=pubdata.getFiles()
69  
# Line 126 | Line 126 | class JobSplitter:
126          parString = ""
127          pString = ""
128          filesEventCount = 0
129 +        msg=''
130  
131          # ---- Iterate over the blocks in the dataset until ---- #
132          # ---- we've met the requested total # of events    ---- #
# Line 197 | Line 198 | class JobSplitter:
198                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
199                                  msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
200                                  jobDestination.append(blockSites[block])
201 <                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(jobDestination[jobCount]))
201 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
202                                  # fill jobs of block dictionary
203                                  jobsOfBlock[block].append(jobCount+1)
204                                  # reset counter
# Line 226 | Line 227 | class JobSplitter:
227                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
228                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
229                          jobDestination.append(blockSites[block])
230 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(jobDestination[jobCount]))
230 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
231                          jobsOfBlock[block].append(jobCount+1)
232                          # reset counter
233                          jobCount = jobCount + 1
# Line 251 | Line 252 | class JobSplitter:
252                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
253                          msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
254                          jobDestination.append(blockSites[block])
255 <                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(jobDestination[jobCount]))
255 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
256                          jobsOfBlock[block].append(jobCount+1)
257                          # increase counter
258                          jobCount = jobCount + 1
# Line 265 | Line 266 | class JobSplitter:
266                          pString_tmp=''
267                          if self.useParent==1:
268                              for f in parent : pString_tmp +=  f + ','
269 <                        pString =  pString_tmp
269 >                        pString =  pString_tmp
270                          parString =  file + ','
271                      pass # END if
272                  pass # END while (iterate over files in the block)
# Line 275 | Line 276 | class JobSplitter:
276          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
277              common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
278          common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
279 <
279 >
280          # skip check on  block with no sites  DD
281          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
282  
# Line 291 | Line 292 | class JobSplitter:
292  
293          # keep trace of block with no sites to print a warning at the end
294  
295 <    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):  
295 >    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
296          # screen output
297          screenOutput = "List of jobs and available destination sites:\n\n"
298          noSiteBlock = []
# Line 303 | Line 304 | class JobSplitter:
304              if block in jobsOfBlock.keys() :
305                  blockCounter += 1
306                  allBlock.append( blockCounter )
307 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
308                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
309 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
310 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
309 >                    ', '.join(SE2CMS(sites)))
310 >                if len(sites) == 0:
311                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
312                      bloskNoSite.append( blockCounter )
313  
# Line 336 | Line 338 | class JobSplitter:
338              common.logger.info(msg)
339  
340          if bloskNoSite == allBlock:
341 <            raise CrabException('No jobs created')
341 >            raise CrabException('No jobs created')
342  
343          return
344  
345  
346   ########################################################################
347 <    def jobSplittingByRun(self):
347 >    def jobSplittingByRun(self):
348          """
349          """
350 <        from sets import Set  
350 >        from sets import Set
351          from WMCore.JobSplitting.RunBased import RunBased
352          from WMCore.DataStructs.Workflow import Workflow
353          from WMCore.DataStructs.File import File
354          from WMCore.DataStructs.Fileset import Fileset
355          from WMCore.DataStructs.Subscription import Subscription
356          from WMCore.JobSplitting.SplitterFactory import SplitterFactory
357 <        from WMCore.DataStructs.Run import Run
357 >        from WMCore.DataStructs.Run import Run
358  
359          self.checkUserSettings()
360 <        blockSites = self.args['blockSites']
360 >        blockSites = self.args['blockSites']
361          pubdata = self.args['pubdata']
362  
363          if self.selectNumberOfJobs == 0 :
364              self.theNumberOfJobs = 9999999
365          blocks = {}
366 <        runList = []
366 >        runList = []
367          thefiles = Fileset(name='FilesToSplit')
368          fileList = pubdata.getListFiles()
369          for f in fileList:
370              block = f['Block']['Name']
371 <            try:
371 >            try:
372                  f['Block']['StorageElementList'].extend(blockSites[block])
373              except:
374                  continue
# Line 374 | Line 376 | class JobSplitter:
376              [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
377              wmbsFile['block'] = block
378              runNum = f['RunsList'][0]['RunNumber']
379 <            runList.append(runNum)
379 >            runList.append(runNum)
380              myRun = Run(runNumber=runNum)
381              wmbsFile.addRun( myRun )
382              thefiles.addFile(
383                  wmbsFile
384                  )
385 <
385 >
386          work = Workflow()
387          subs = Subscription(
388          fileset = thefiles,
# Line 389 | Line 391 | class JobSplitter:
391          type = "Processing")
392          splitter = SplitterFactory()
393          jobfactory = splitter(subs)
394 <        
395 <        #loop over all runs
394 >
395 >        #loop over all runs
396          set = Set(runList)
397          list_of_lists = []
398          jobDestination = []
# Line 398 | Line 400 | class JobSplitter:
400          for jobGroup in  jobfactory():
401              if count <  self.theNumberOfJobs:
402                  res = self.getJobInfo(jobGroup)
403 <                parString = ''
403 >                parString = ''
404                  for file in res['lfns']:
405                      parString += file + ','
406                  fullString = parString[:-1]
407 <                list_of_lists.append([fullString,str(-1),str(0)])    
407 >                list_of_lists.append([fullString,str(-1),str(0)])
408                  #need to check single file location
409 <                jobDestination.append(res['locations'])  
409 >                jobDestination.append(res['locations'])
410                  count +=1
411         # prepare dict output
412          dictOut = {}
# Line 417 | Line 419 | class JobSplitter:
419  
420      def getJobInfo( self,jobGroup ):
421          res = {}
422 <        lfns = []        
423 <        locations = []        
422 >        lfns = []
423 >        locations = []
424          tmp_check=0
425          for job in jobGroup.jobs:
426              for file in job.getFiles():
427 <                lfns.append(file['lfn'])
427 >                lfns.append(file['lfn'])
428                  for loc in file['locations']:
429                      if tmp_check < 1 :
430                          locations.append(loc)
431 <                tmp_check = tmp_check + 1
432 <                ### qui va messo il check per la locations
433 <        res['lfns'] = lfns
434 <        res['locations'] = locations
435 <        return res                
436 <      
431 >                tmp_check = tmp_check + 1
432 >                ### qui va messo il check per la locations
433 >        res['lfns'] = lfns
434 >        res['locations'] = locations
435 >        return res
436 >
437   ########################################################################
438      def jobSplittingNoInput(self):
439          """
# Line 491 | Line 493 | class JobSplitter:
493                  ## pythia first run
494                  args.append(str(firstRun)+str(i))
495              if (generator in managedGenerators):
496 <                if (generator == 'comphep' and i == 0):
496 >               args.append(generator)
497 >               if (generator == 'comphep' and i == 0):
498                      # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
499                      args.append('1')
500 <                else:
500 >               else:
501                      args.append(str(i*self.eventsPerJob))
502              args.append(str(self.eventsPerJob))
503              self.list_of_args.append(args)
# Line 504 | Line 507 | class JobSplitter:
507          dictOut['params'] = ['MaxEvents']
508          if (firstRun):
509              dictOut['params'] = ['FirstRun','MaxEvents']
510 <            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
511 <        else:  
512 <            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
510 >            if ( generator in managedGenerators ) :
511 >                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
512 >        else:
513 >            if (generator in managedGenerators) :
514 >                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
515          dictOut['args'] = self.list_of_args
516          dictOut['jobDestination'] = jobDestination
517          dictOut['njobs']=self.total_number_of_jobs
# Line 544 | Line 549 | class JobSplitter:
549          dictOut['jobDestination'] = jobDestination
550          dictOut['njobs']=self.total_number_of_jobs
551          return dictOut
547
552  
553 <    def jobSplittingByLumi(self):
553 >
554 >    def jobSplittingByLumi(self):
555          """
556          """
557          return
# Line 554 | Line 559 | class JobSplitter:
559          """
560          Define key splittingType matrix
561          """
562 <        SplitAlogs = {
563 <                     'EventBased'           : self.jobSplittingByEvent,
562 >        SplitAlogs = {
563 >                     'EventBased'           : self.jobSplittingByEvent,
564                       'RunBased'             : self.jobSplittingByRun,
565 <                     'LumiBased'            : self.jobSplittingByLumi,
566 <                     'NoInput'              : self.jobSplittingNoInput,
565 >                     'LumiBased'            : self.jobSplittingByLumi,
566 >                     'NoInput'              : self.jobSplittingNoInput,
567                       'ForScript'            : self.jobSplittingForScript
568 <                     }  
568 >                     }
569          return SplitAlogs
570  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines