ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.7 by spiga, Wed Feb 11 22:13:03 2009 UTC vs.
Revision 1.21 by spiga, Thu Jun 11 09:30:19 2009 UTC

# Line 1 | Line 1
1   import common
2 from crab_logger import Logger
2   from crab_exceptions import *
3   from crab_util import *
4   from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
# Line 10 | Line 9 | class JobSplitter:
9          self.args=args
10          #self.maxEvents
11          # init BlackWhiteListParser
12 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
13 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
14 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
12 >        seWhiteList = cfg_params.get('GRID.se_white_list',[])
13 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
14 >        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
15  
16  
17      def checkUserSettings(self):
# Line 92 | Line 91 | class JobSplitter:
91          # If user requested more events than are in the dataset
92          elif (totalEventsRequested > self.maxEvents):
93              eventsRemaining = self.maxEvents
94 <            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
94 >            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
95          # If user requested less events than are in the dataset
96          else:
97              eventsRemaining = totalEventsRequested
# Line 108 | Line 107 | class JobSplitter:
107              eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
108  
109          if (self.selectNumberOfJobs):
110 <            common.logger.message("May not create the exact number_of_jobs requested.")
110 >            common.logger.info("May not create the exact number_of_jobs requested.")
111  
112          # old... to remove Daniele
113          totalNumberOfJobs = 999999999
# Line 125 | Line 124 | class JobSplitter:
124          jobsOfBlock = {}
125  
126          parString = ""
127 +        pString = ""
128          filesEventCount = 0
129 +        msg=''
130  
131          # ---- Iterate over the blocks in the dataset until ---- #
132          # ---- we've met the requested total # of events    ---- #
# Line 137 | Line 138 | class JobSplitter:
138  
139              if self.eventsbyblock.has_key(block) :
140                  numEventsInBlock = self.eventsbyblock[block]
141 <                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
141 >                common.logger.debug('Events in Block File '+str(numEventsInBlock))
142  
143                  files = filesbyblock[block]
144                  numFilesInBlock = len(files)
# Line 147 | Line 148 | class JobSplitter:
148                  if noBboundary == 0: # DD
149                      # ---- New block => New job ---- #
150                      parString = ""
151 +                    pString=""
152                      # counter for number of events in files currently worked on
153                      filesEventCount = 0
154                  # flag if next while loop should touch new file
# Line 156 | Line 158 | class JobSplitter:
158  
159                  # ---- Iterate over the files in the block until we've met the requested ---- #
160                  # ---- total # of events or we've gone over all the files in this block  ---- #
161 <                pString=''
161 >                msg='\n'
162                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
163                      file = files[fileCount]
164                      if self.useParent==1:
165                          parent = self.parentFiles[file]
166 <                        for f in parent :
165 <                            pString += '\\\"' + f + '\\\"\,'
166 <                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
167 <                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
166 >                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
167                      if newFile :
168                          try:
169                              numEventsInFile = self.eventsbyfile[file]
170 <                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
170 >                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
171                              # increase filesEventCount
172                              filesEventCount += numEventsInFile
173                              # Add file to current job
174 <                            parString += '\\\"' + file + '\\\"\,'
174 >                            parString +=  file + ','
175 >                            if self.useParent==1:
176 >                                for f in parent :
177 >                                    pString += f  + ','
178                              newFile = 0
179                          except KeyError:
180 <                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
180 >                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")
181  
182                      eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
183                      # if less events in file remain than eventsPerJobRequested
# Line 188 | Line 190 | class JobSplitter:
190                              if ( fileCount == numFilesInBlock-1 ) :
191                                  # end job using last file, use remaining events in block
192                                  # close job and touch new file
193 <                                fullString = parString[:-2]
193 >                                fullString = parString[:-1]
194                                  if self.useParent==1:
195 <                                    fullParentString = pString[:-2]
195 >                                    fullParentString = pString[:-1]
196                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
197                                  else:
198                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
199 <                                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
199 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
200                                  jobDestination.append(blockSites[block])
201 <                                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
201 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
202                                  # fill jobs of block dictionary
203                                  jobsOfBlock[block].append(jobCount+1)
204                                  # reset counter
# Line 217 | Line 219 | class JobSplitter:
219                      # if events in file equal to eventsPerJobRequested
220                      elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
221                          # close job and touch new file
222 <                        fullString = parString[:-2]
222 >                        fullString = parString[:-1]
223                          if self.useParent==1:
224 <                            fullParentString = pString[:-2]
224 >                            fullParentString = pString[:-1]
225                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
226                          else:
227                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
228 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
228 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
229                          jobDestination.append(blockSites[block])
230 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
230 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
231                          jobsOfBlock[block].append(jobCount+1)
232                          # reset counter
233                          jobCount = jobCount + 1
# Line 242 | Line 244 | class JobSplitter:
244                      # if more events in file remain than eventsPerJobRequested
245                      else :
246                          # close job but don't touch new file
247 <                        fullString = parString[:-2]
247 >                        fullString = parString[:-1]
248                          if self.useParent==1:
249 <                            fullParentString = pString[:-2]
249 >                            fullParentString = pString[:-1]
250                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
251                          else:
252                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
253 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
253 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
254                          jobDestination.append(blockSites[block])
255 <                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
255 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
256                          jobsOfBlock[block].append(jobCount+1)
257                          # increase counter
258                          jobCount = jobCount + 1
# Line 261 | Line 263 | class JobSplitter:
263                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
264                          # remove all but the last file
265                          filesEventCount = self.eventsbyfile[file]
266 +                        pString_tmp=''
267                          if self.useParent==1:
268 <                            for f in parent : pString += '\\\"' + f + '\\\"\,'
269 <                        parString = '\\\"' + file + '\\\"\,'
268 >                            for f in parent : pString_tmp +=  f + ','
269 >                        pString =  pString_tmp
270 >                        parString =  file + ','
271                      pass # END if
272                  pass # END while (iterate over files in the block)
273          pass # END while (iterate over blocks in the dataset)
274 +        common.logger.debug(msg)
275          self.ncjobs = self.total_number_of_jobs = jobCount
276          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
277 <            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
278 <        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
277 >            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
278 >        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
279  
280          # skip check on  block with no sites  DD
281          if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)
282  
283         # prepare dict output
284          dictOut = {}
285 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
286 +        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
287          dictOut['args'] = list_of_lists
288          dictOut['jobDestination'] = jobDestination
289          dictOut['njobs']=self.total_number_of_jobs
# Line 290 | Line 297 | class JobSplitter:
297          screenOutput = "List of jobs and available destination sites:\n\n"
298          noSiteBlock = []
299          bloskNoSite = []
300 +        allBlock = []
301  
302          blockCounter = 0
303          for block in blocks:
304              if block in jobsOfBlock.keys() :
305                  blockCounter += 1
306 +                allBlock.append( blockCounter )
307 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
308                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
309 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
310 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
309 >                    ', '.join(SE2CMS(sites)))
310 >                if len(sites) == 0:
311                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
312                      bloskNoSite.append( blockCounter )
313  
314 <        common.logger.message(screenOutput)
314 >        common.logger.info(screenOutput)
315          if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
316              msg = 'WARNING: No sites are hosting any part of data for block:\n                '
317              virgola = ""
# Line 316 | Line 326 | class JobSplitter:
326              for range_jobs in noSiteBlock:
327                  msg += str(range_jobs) + virgola
328              msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
329 <            if self.cfg_params.has_key('EDG.se_white_list'):
330 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
329 >            if self.cfg_params.has_key('GRID.se_white_list'):
330 >                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
331                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
332                  msg += 'Please check if the dataset is available at this site!)\n'
333 <            if self.cfg_params.has_key('EDG.ce_white_list'):
334 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
333 >            if self.cfg_params.has_key('GRID.ce_white_list'):
334 >                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
335                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
336                  msg += 'Please check if the dataset is available at this site!)\n'
337  
338 <            common.logger.message(msg)
338 >            common.logger.info(msg)
339 >
340 >        if bloskNoSite == allBlock:
341 >            raise CrabException('No jobs created')
342  
343          return
344  
# Line 354 | Line 367 | class JobSplitter:
367          thefiles = Fileset(name='FilesToSplit')
368          fileList = pubdata.getListFiles()
369          for f in fileList:
357           # print f
370              block = f['Block']['Name']
359          #  if not blocks.has_key(block):
360          #      blocks[block] = reader.listFileBlockLocation(block)
371              try:
372                  f['Block']['StorageElementList'].extend(blockSites[block])
373              except:
# Line 386 | Line 396 | class JobSplitter:
396          set = Set(runList)
397          list_of_lists = []
398          jobDestination = []
389
399          count = 0
400 <        for i in list(set):
400 >        for jobGroup in  jobfactory():
401              if count <  self.theNumberOfJobs:
402 <                res = self.getJobInfo(jobfactory())
402 >                res = self.getJobInfo(jobGroup)
403                  parString = ''
404                  for file in res['lfns']:
405 <                    parString += '\\\"' + file + '\\\"\,'
406 <                fullString = parString[:-2]
405 >                    parString += file + ','
406 >                fullString = parString[:-1]
407                  list_of_lists.append([fullString,str(-1),str(0)])    
408                  #need to check single file location
409                  jobDestination.append(res['locations'])  
410                  count +=1
402        #print jobDestination
411         # prepare dict output
412          dictOut = {}
413 +        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents']
414          dictOut['args'] = list_of_lists
415          dictOut['jobDestination'] = jobDestination
416          dictOut['njobs']=count
# Line 419 | Line 428 | class JobSplitter:
428                  for loc in file['locations']:
429                      if tmp_check < 1 :
430                          locations.append(loc)
431 <                    tmp_check = tmp_check + 1
431 >                tmp_check = tmp_check + 1
432                  ### qui va messo il check per la locations
433          res['lfns'] = lfns
434          res['locations'] = locations
# Line 430 | Line 439 | class JobSplitter:
439          """
440          Perform job splitting based on number of event per job
441          """
442 <        common.logger.debug(5,'Splitting per events')
442 >        common.logger.debug('Splitting per events')
443          self.checkUserSettings()
444          jobDestination=[]
445          if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
# Line 442 | Line 451 | class JobSplitter:
451          firstRun = self.cfg_params.get('CMSSW.first_run',None)
452  
453          if (self.selectEventsPerJob):
454 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
454 >            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
455          if (self.selectNumberOfJobs):
456 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
456 >            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
457          if (self.selectTotalNumberEvents):
458 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
458 >            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
459  
460          if (self.total_number_of_events < 0):
461              msg='Cannot split jobs per Events with "-1" as total number of events'
# Line 463 | Line 472 | class JobSplitter:
472              self.total_number_of_jobs = self.theNumberOfJobs
473              self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
474  
475 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
475 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
476  
477          # is there any remainder?
478          check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
479  
480 <        common.logger.debug(5,'Check  '+str(check))
480 >        common.logger.debug('Check  '+str(check))
481  
482 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
482 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
483          if check > 0:
484 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
484 >            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
485  
486          # argument is seed number.$i
487          self.list_of_args = []
# Line 492 | Line 501 | class JobSplitter:
501              args.append(str(self.eventsPerJob))
502              self.list_of_args.append(args)
503         # prepare dict output
504 +
505          dictOut = {}
506 +        dictOut['params'] = ['MaxEvents']
507 +        if (firstRun):
508 +            dictOut['params'] = ['FirstRun','MaxEvents']
509 +            if ( generator in managedGenerators ) : dictOut['params'] = ['FirstRun', 'FirstEvent', 'MaxEvents']
510 +        else:  
511 +            if (generator in managedGenerators) : dictOut['params'] = ['FirstEvent', 'MaxEvents']
512          dictOut['args'] = self.list_of_args
513          dictOut['jobDestination'] = jobDestination
514          dictOut['njobs']=self.total_number_of_jobs
# Line 509 | Line 525 | class JobSplitter:
525              msg = 'must specify  number_of_jobs.'
526              raise crabexception(msg)
527          jobDestination = []
528 <        common.logger.debug(5,'Splitting per job')
529 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
528 >        common.logger.debug('Splitting per job')
529 >        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
530  
531          self.total_number_of_jobs = self.theNumberOfJobs
532  
533 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
533 >        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))
534  
535 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
535 >        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
536  
537          # argument is seed number.$i
538 <        self.list_of_args = []
538 >        #self.list_of_args = []
539          for i in range(self.total_number_of_jobs):
540              jobDestination.append([""])
541 <            self.list_of_args.append([str(i)])
541 >        #   self.list_of_args.append([str(i)])
542  
543         # prepare dict output
544          dictOut = {}
545 <        dictOut['args'] = self.list_of_args
545 >        dictOut['args'] = [] # self.list_of_args
546          dictOut['jobDestination'] = jobDestination
547          dictOut['njobs']=self.total_number_of_jobs
548          return dictOut

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines