ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
(Generate patch)

Comparing COMP/CRAB/python/Splitter.py (file contents):
Revision 1.13 by spiga, Tue May 26 10:23:01 2009 UTC vs.
Revision 1.21 by spiga, Thu Jun 11 09:30:19 2009 UTC

# Line 9 | Line 9 | class JobSplitter:
9          self.args=args
10          #self.maxEvents
11          # init BlackWhiteListParser
12 <        seWhiteList = cfg_params.get('EDG.se_white_list',[])
13 <        seBlackList = cfg_params.get('EDG.se_black_list',[])
14 <        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
12 >        seWhiteList = cfg_params.get('GRID.se_white_list',[])
13 >        seBlackList = cfg_params.get('GRID.se_black_list',[])
14 >        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger())
15  
16  
17      def checkUserSettings(self):
# Line 124 | Line 124 | class JobSplitter:
124          jobsOfBlock = {}
125  
126          parString = ""
127 +        pString = ""
128          filesEventCount = 0
129 +        msg=''
130  
131          # ---- Iterate over the blocks in the dataset until ---- #
132          # ---- we've met the requested total # of events    ---- #
# Line 146 | Line 148 | class JobSplitter:
148                  if noBboundary == 0: # DD
149                      # ---- New block => New job ---- #
150                      parString = ""
151 +                    pString=""
152                      # counter for number of events in files currently worked on
153                      filesEventCount = 0
154                  # flag if next while loop should touch new file
# Line 155 | Line 158 | class JobSplitter:
158  
159                  # ---- Iterate over the files in the block until we've met the requested ---- #
160                  # ---- total # of events or we've gone over all the files in this block  ---- #
161 <                pString=''
161 >                msg='\n'
162                  while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
163                      file = files[fileCount]
164                      if self.useParent==1:
165                          parent = self.parentFiles[file]
163                        for f in parent :
164                            pString +=  f + ','
166                          common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
167                      if newFile :
168                          try:
# Line 171 | Line 172 | class JobSplitter:
172                              filesEventCount += numEventsInFile
173                              # Add file to current job
174                              parString +=  file + ','
175 +                            if self.useParent==1:
176 +                                for f in parent :
177 +                                    pString += f  + ','
178                              newFile = 0
179                          except KeyError:
180                              common.logger.info("File "+str(file)+" has unknown number of events: skipping")
# Line 192 | Line 196 | class JobSplitter:
196                                      list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
197                                  else:
198                                      list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
199 <                                common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
199 >                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
200                                  jobDestination.append(blockSites[block])
201 <                                common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
201 >                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
202                                  # fill jobs of block dictionary
203                                  jobsOfBlock[block].append(jobCount+1)
204                                  # reset counter
# Line 221 | Line 225 | class JobSplitter:
225                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
226                          else:
227                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
228 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
228 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
229                          jobDestination.append(blockSites[block])
230 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
230 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
231                          jobsOfBlock[block].append(jobCount+1)
232                          # reset counter
233                          jobCount = jobCount + 1
# Line 246 | Line 250 | class JobSplitter:
250                              list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
251                          else:
252                              list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
253 <                        common.logger.debug("Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
253 >                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
254                          jobDestination.append(blockSites[block])
255 <                        common.logger.debug("Job "+str(jobCount+1)+" Destination: "+str(jobDestination[jobCount]))
255 >                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
256                          jobsOfBlock[block].append(jobCount+1)
257                          # increase counter
258                          jobCount = jobCount + 1
# Line 259 | Line 263 | class JobSplitter:
263                          jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
264                          # remove all but the last file
265                          filesEventCount = self.eventsbyfile[file]
266 +                        pString_tmp=''
267                          if self.useParent==1:
268 <                            for f in parent : pString +=  f + ','
268 >                            for f in parent : pString_tmp +=  f + ','
269 >                        pString =  pString_tmp
270                          parString =  file + ','
271                      pass # END if
272                  pass # END while (iterate over files in the block)
273          pass # END while (iterate over blocks in the dataset)
274 +        common.logger.debug(msg)
275          self.ncjobs = self.total_number_of_jobs = jobCount
276          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
277              common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
# Line 297 | Line 304 | class JobSplitter:
304              if block in jobsOfBlock.keys() :
305                  blockCounter += 1
306                  allBlock.append( blockCounter )
307 +                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
308                  screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
309 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])))
310 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])) == 0:
309 >                    ', '.join(SE2CMS(sites)))
310 >                if len(sites) == 0:
311                      noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
312                      bloskNoSite.append( blockCounter )
313  
# Line 318 | Line 326 | class JobSplitter:
326              for range_jobs in noSiteBlock:
327                  msg += str(range_jobs) + virgola
328              msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
329 <            if self.cfg_params.has_key('EDG.se_white_list'):
330 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
329 >            if self.cfg_params.has_key('GRID.se_white_list'):
330 >                msg += 'WARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
331                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
332                  msg += 'Please check if the dataset is available at this site!)\n'
333 <            if self.cfg_params.has_key('EDG.ce_white_list'):
334 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
333 >            if self.cfg_params.has_key('GRID.ce_white_list'):
334 >                msg += 'WARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
335                  msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
336                  msg += 'Please check if the dataset is available at this site!)\n'
337  
# Line 388 | Line 396 | class JobSplitter:
396          set = Set(runList)
397          list_of_lists = []
398          jobDestination = []
391
399          count = 0
400 <        for i in list(set):
400 >        for jobGroup in  jobfactory():
401              if count <  self.theNumberOfJobs:
402 <                res = self.getJobInfo(jobfactory())
402 >                res = self.getJobInfo(jobGroup)
403                  parString = ''
404                  for file in res['lfns']:
405                      parString += file + ','

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines