ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.14 by spiga, Tue May 26 16:53:23 2009 UTC

# Line 1 | Line 1
1   import common
2 from crab_logger import Logger
2   from crab_exceptions import *
3   from crab_util import *
4   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
# Line 10 | Line 9 | class JobSplitter:
9          self.args=args
10          #self.maxEvents
11          # init BlackWhiteListParser
12 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
13 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
12 >        seWhiteList = cfg_params.get('GRID.se_white_list',[])
13 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
14          self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
15  
16  
# Line 92 | Line 91 | class JobSplitter:
91          # If user requested more events than are in the dataset
92          elif (totalEventsRequested > self.maxEvents):
93              eventsRemaining = self.maxEvents
94 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
94 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
95          # If user requested less events than are in the dataset
96          else:
97              eventsRemaining = totalEventsRequested
# Line 108 | Line 107 | class JobSplitter:
107              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
108  
109          if (self.selectNumberOfJobs):
110 <            common.logger.message("May not create the exact number_of_jobs requested.")
110 >            common.logger.info("May not create the exact number_of_jobs requested.")
111  
112          # old... to remove Daniele
113          totalNumberOfJobs = 999999999
# Line 137 | Line 136 | class JobSplitter:
136  
137              if self.eventsbyblock.has_key(block) :
138                  numEventsInBlock = self.eventsbyblock[block]
139 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
139 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
140  
141                  files = filesbyblock[block]
142                  numFilesInBlock = len(files)
# Line 162 | Line 161 | class JobSplitter:
161                      if self.useParent==1:
162                          parent = self.parentFiles[file]
163                          for f in parent :
164 <                            pString += '\\\"' + f + '\\\"\,'
165 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
164 >                            pString +=  f + ','
165 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
166                      if newFile :
167                          try:
168                              numEventsInFile = self.eventsbyfile[file]
169 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
169 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
170                              # increase filesEventCount
171                              filesEventCount += numEventsInFile
172                              # Add file to current job
173 <                            parString += '\\\"' + file + '\\\"\,'
173 >                            parString +=  file + ','
174                              newFile = 0
175                          except KeyError:
176 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
176 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
177  
178                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
179                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 186 | class JobSplitter:
186                              if ( fileCount == numFilesInBlock-1 ) :
187                                  # end job using last file, use remaining events in block
188                                  # close job and touch new file
189 <                                fullString = parString[:-2]
189 >                                fullString = parString[:-1]
190                                  if self.useParent==1:
191 <                                    fullParentString = pString[:-2]
191 >                                    fullParentString = pString[:-1]
192                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
193                                  else:
194                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
195 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
195 >                                common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
196                                  jobDestination.append(blockSites[block])
197 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
197 >                                common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
198                                  # fill jobs of block dictionary
199                                  jobsOfBlock[block].append(jobCount+1)
200                                  # reset counter
# Line 217 | Line 215 | class JobSplitter:
215                      # if events in file equal to eventsPerJobRequested
216                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
217                          # close job and touch new file
218 <                        fullString = parString[:-2]
218 >                        fullString = parString[:-1]
219                          if self.useParent==1:
220 <                            fullParentString = pString[:-2]
220 >                            fullParentString = pString[:-1]
221                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
222                          else:
223                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
224 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
224 >                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
225                          jobDestination.append(blockSites[block])
226 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
226 >                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
227                          jobsOfBlock[block].append(jobCount+1)
228                          # reset counter
229                          jobCount = jobCount + 1
# Line 242 | Line 240 | class JobSplitter:
240                      # if more events in file remain than eventsPerJobRequested
241                      else :
242                          # close job but don't touch new file
243 <                        fullString = parString[:-2]
243 >                        fullString = parString[:-1]
244                          if self.useParent==1:
245 <                            fullParentString = pString[:-2]
245 >                            fullParentString = pString[:-1]
246                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
247                          else:
248                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
249 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
249 >                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
250                          jobDestination.append(blockSites[block])
251 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
251 >                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
252                          jobsOfBlock[block].append(jobCount+1)
253                          # increase counter
254                          jobCount = jobCount + 1
# Line 262 | Line 260 | class JobSplitter:
260                          # remove all but the last file
261                          filesEventCount = self.eventsbyfile[file]
262                          if self.useParent==1:
263 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
264 <                        parString = '\\\"' + file + '\\\"\,'
263 >                            for f in parent : pString +=  f + ','
264 >                        parString =  file + ','
265                      pass # END if
266                  pass # END while (iterate over files in the block)
267          pass # END while (iterate over blocks in the dataset)
268          self.ncjobs = self.total_number_of_jobs = jobCount
269          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
270 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
271 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
270 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
271 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
272  
273          # skip check on  block with no sites  DD
274          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
275  
276         # prepare dict output
277          dictOut = {}
278 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
279 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
280          dictOut['args'] = list_of_lists
281          dictOut['jobDestination'] = jobDestination
282          dictOut['njobs']=self.total_number_of_jobs
# Line 290 | Line 290 | class JobSplitter:
290          screenOutput = "List of jobs and available destination sites:\n\n"
291          noSiteBlock = []
292          bloskNoSite = []
293 +        allBlock = []
294  
295          blockCounter = 0
296          for block in blocks:
297              if block in jobsOfBlock.keys() :
298                  blockCounter += 1
299 +                allBlock.append( blockCounter )
300                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
301                      ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
302                  if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
303                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
304                      bloskNoSite.append( blockCounter )
305  
306 <        common.logger.message(screenOutput)
306 >        common.logger.info(screenOutput)
307          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
308              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
309              virgola = ""
# Line 316 | Line 318 | class JobSplitter:
318              for range_jobs in noSiteBlock:
319                  msg += str(range_jobs) + virgola
320              msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
321 <            if self.cfg_params.has_key('EDG.se_white_list'):
322 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
321 >            if self.cfg_params.has_key('GRID.se_white_list'):
322 >                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
323                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
324                  msg += 'Please check if the dataset is available at this site!)\n'
325 <            if self.cfg_params.has_key('EDG.ce_white_list'):
326 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
325 >            if self.cfg_params.has_key('GRID.ce_white_list'):
326 >                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
327                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
328                  msg += 'Please check if the dataset is available at this site!)\n'
329  
330 <            common.logger.message(msg)
330 >            common.logger.info(msg)
331 >
332 >        if bloskNoSite == allBlock:
333 >            raise CrabException('No jobs created')
334  
335          return
336  
# Line 354 | Line 359 | class JobSplitter:
359          thefiles = Fileset(name='FilesToSplit')
360          fileList = pubdata.getListFiles()
361          for f in fileList:
357           # print f
362              block = f['Block']['Name']
359          #  if not blocks.has_key(block):
360          #      blocks[block] = reader.listFileBlockLocation(block)
363              try:
364                  f['Block']['StorageElementList'].extend(blockSites[block])
365              except:
# Line 393 | Line 395 | class JobSplitter:
395                  res = self.getJobInfo(jobfactory())
396                  parString = ''
397                  for file in res['lfns']:
398 <                    parString += '\\\"' + file + '\\\"\,'
399 <                fullString = parString[:-2]
398 >                    parString += file + ','
399 >                fullString = parString[:-1]
400                  list_of_lists.append([fullString,str(-1),str(0)])    
401                  #need to check single file location
402                  jobDestination.append(res['locations'])  
403                  count +=1
402        #print jobDestination
404         # prepare dict output
405          dictOut = {}
406 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
407          dictOut['args'] = list_of_lists
408          dictOut['jobDestination'] = jobDestination
409          dictOut['njobs']=count
# Line 419 | Line 421 | class JobSplitter:
421                  for loc in file['locations']:
422                      if tmp_check < 1 :
423                          locations.append(loc)
424 <                    tmp_check = tmp_check + 1
424 >                tmp_check = tmp_check + 1
425                  ### qui va messo il check per la locations
426          res['lfns'] = lfns
427          res['locations'] = locations
# Line 430 | Line 432 | class JobSplitter:
432          """
433          Perform job splitting based on number of event per job
434          """
435 <        common.logger.debug(5,'Splitting per events')
435 >        common.logger.debug('Splitting per events')
436          self.checkUserSettings()
437          jobDestination=[]
438          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
# Line 442 | Line 444 | class JobSplitter:
444          firstRun = self.cfg_params.get('CMSSW.first_run',None)
445  
446          if (self.selectEventsPerJob):
447 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
447 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
448          if (self.selectNumberOfJobs):
449 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
449 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
450          if (self.selectTotalNumberEvents):
451 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
451 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
452  
453          if (self.total_number_of_events < 0):
454              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 465 | class JobSplitter:
465              self.total_number_of_jobs = self.theNumberOfJobs
466              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
467  
468 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
468 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
469  
470          # is there any remainder?
471          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
472  
473 <        common.logger.debug(5,'Check  '+str(check))
473 >        common.logger.debug('Check  '+str(check))
474  
475 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
475 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
476          if check > 0:
477 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
477 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
478  
479          # argument is seed number.$i
480          self.list_of_args = []
# Line 492 | Line 494 | class JobSplitter:
494              args.append(str(self.eventsPerJob))
495              self.list_of_args.append(args)
496         # prepare dict output
497 +
498          dictOut = {}
499 +        dictOut['params'] = ['MaxEvents']
500 +        if (firstRun):
501 +            dictOut['params'] = ['FirstRun','MaxEvents']
502 +            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
503 +        else:  
504 +            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
505          dictOut['args'] = self.list_of_args
506          dictOut['jobDestination'] = jobDestination
507          dictOut['njobs']=self.total_number_of_jobs
# Line 509 | Line 518 | class JobSplitter:
518              msg = 'must specify  number_of_jobs.'
519              raise crabexception(msg)
520          jobDestination = []
521 <        common.logger.debug(5,'Splitting per job')
522 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
521 >        common.logger.debug('Splitting per job')
522 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
523  
524          self.total_number_of_jobs = self.theNumberOfJobs
525  
526 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
526 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
527  
528 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
528 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
529  
530          # argument is seed number.$i
531 <        self.list_of_args = []
531 >        #self.list_of_args = []
532          for i in range(self.total_number_of_jobs):
533              jobDestination.append([""])
534 <            self.list_of_args.append([str(i)])
534 >        #   self.list_of_args.append([str(i)])
535  
536         # prepare dict output
537          dictOut = {}
538 <        dictOut['args'] = self.list_of_args
538 >        dictOut['args'] = [] # self.list_of_args
539          dictOut['jobDestination'] = jobDestination
540          dictOut['njobs']=self.total_number_of_jobs
541          return dictOut

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines