ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.27 by spiga, Thu Jul 6 17:50:33 2006 UTC vs.
Revision 1.41 by slacapra, Wed Sep 20 17:29:52 2006 UTC

# Line 6 | Line 6 | import math
6   import common
7   import PsetManipulator  
8  
9 < import DBSInfo_EDM
10 < import DataDiscovery_EDM
11 < import DataLocation_EDM
9 > import DBSInfo
10 > import DataDiscovery
11 > import DataLocation
12   import Scram
13  
14   import os, string, re
15  
16   class Cmssw(JobType):
17 <    def __init__(self, cfg_params):
17 >    def __init__(self, cfg_params, ncjobs):
18          JobType.__init__(self, 'CMSSW')
19          common.logger.debug(3,'CMSSW::__init__')
20  
# Line 22 | Line 22 | class Cmssw(JobType):
22          # Marco.
23          self._params = {}
24          self.cfg_params = cfg_params
25 +
26 +        # number of jobs requested to be created, limit job splitting
27 +        self.ncjobs = ncjobs
28 +
29          log = common.logger
30          
31          self.scram = Scram.Scram(cfg_params)
# Line 123 | Line 127 | class Cmssw(JobType):
127                    
128          ## additional input files
129          try:
130 <            tmpAddFiles = string.split(cfg_params['CMSSW.additional_input_files'],',')
130 >            tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
131              for tmp in tmpAddFiles:
132                  if not os.path.exists(tmp):
133                      raise CrabException("Additional input file not found: "+tmp)
134 <                tmp=string.strip(tmp)
131 <                self.additional_inbox_files.append(tmp)
134 >                self.additional_inbox_files.append(string.strip(tmp))
135                  pass
136              pass
137          except KeyError:
# Line 136 | Line 139 | class Cmssw(JobType):
139  
140          # files per job
141          try:
142 <            self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
143 <            self.selectFilesPerJob = 1
142 >            if (cfg_params['CMSSW.files_per_jobs']):
143 >                raise CrabException("files_per_jobs no longer supported.  Quitting.")
144          except KeyError:
145 <            self.filesPerJob = 0
143 <            self.selectFilesPerJob = 0
145 >            pass
146  
147          ## Events per job
148          try:
# Line 158 | Line 160 | class Cmssw(JobType):
160              self.theNumberOfJobs = 0
161              self.selectNumberOfJobs = 0
162  
161        ## source seed for pythia
163          try:
164 <            self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
164 >            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
165 >            self.selectTotalNumberEvents = 1
166          except KeyError:
167 <            self.sourceSeed = None
168 <            common.logger.debug(5,"No seed given")
167 >            self.total_number_of_events = 0
168 >            self.selectTotalNumberEvents = 0
169  
170 <        if not (self.selectFilesPerJob + self.selectEventsPerJob + self.selectNumberOfJobs == 1 ):
171 <            msg = 'Must define either files_per_jobs or events_per_job or number_of_jobs'
170 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
171 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
172              raise CrabException(msg)
173  
174 +        ## source seed for pythia
175          try:
176 <            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
174 <        except KeyError:
175 <            msg = 'Must define total_number_of_events'
176 <            raise CrabException(msg)
177 <        
178 <        CEBlackList = []
179 <        try:
180 <            tmpBad = string.split(cfg_params['EDG.ce_black_list'],',')
181 <            for tmp in tmpBad:
182 <                tmp=string.strip(tmp)
183 <                CEBlackList.append(tmp)
176 >            self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
177          except KeyError:
178 <            pass
179 <
187 <        self.reCEBlackList=[]
188 <        for bad in CEBlackList:
189 <            self.reCEBlackList.append(re.compile( bad ))
190 <
191 <        common.logger.debug(5,'CEBlackList: '+str(CEBlackList))
178 >            self.sourceSeed = None
179 >            common.logger.debug(5,"No seed given")
180  
193        CEWhiteList = []
181          try:
182 <            tmpGood = string.split(cfg_params['EDG.ce_white_list'],',')
196 <            for tmp in tmpGood:
197 <                tmp=string.strip(tmp)
198 <                CEWhiteList.append(tmp)
182 >            self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
183          except KeyError:
184 <            pass
185 <
202 <        #print 'CEWhiteList: ',CEWhiteList
203 <        self.reCEWhiteList=[]
204 <        for Good in CEWhiteList:
205 <            self.reCEWhiteList.append(re.compile( Good ))
206 <
207 <        common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))
184 >            self.sourceSeedVtx = None
185 >            common.logger.debug(5,"No vertex seed given")
186  
187          self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
188  
# Line 212 | Line 190 | class Cmssw(JobType):
190          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
191          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
192          self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
193 +        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
194          ## Perform the data location and discovery (based on DBS/DLS)
195          ## SL: Don't if NONE is specified as input (pythia use case)
196 <        common.analisys_common_info['sites']=None
196 >        blockSites = {}
197          if self.datasetPath:
198 <            self.DataDiscoveryAndLocation(cfg_params)
198 >            blockSites = self.DataDiscoveryAndLocation(cfg_params)
199          #DBSDLS-end          
200  
201          self.tgzNameWithPath = self.getTarBall(self.executable)
202      
203          ## Select Splitting
204          if self.selectNoInput: self.jobSplittingNoInput()
205 <        elif self.selectFilesPerJob or self.selectEventsPerJob or self.selectNumberOfJobs: self.jobSplittingPerFiles()
227 <        else:
228 <            msg = 'Don\'t know how to split...'
229 <            raise CrabException(msg)
205 >        else: self.jobSplittingByBlocks(blockSites)
206  
207          # modify Pset
208          try:
209              if (self.datasetPath): # standard job
210 <                #self.PsetEdit.maxEvent(self.eventsPerJob)
235 <                # always process all events in a file
236 <                self.PsetEdit.maxEvent("-1")
210 >                # allow processing a fraction of events in a file
211                  self.PsetEdit.inputModule("INPUT")
212 +                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
213 +                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
214  
215              else:  # pythia like job
216                  self.PsetEdit.maxEvent(self.eventsPerJob)
217                  if (self.sourceSeed) :
218 <                    self.PsetEdit.pythiaSeed("INPUT","INPUTVTX")
218 >                    self.PsetEdit.pythiaSeed("INPUT")
219 >                    if (self.sourceSeedVtx) :
220 >                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
221              self.PsetEdit.psetWriter(self.configFilename())
222          except:
223              msg='Error while manipuliating ParameterSet: exiting...'
# Line 256 | Line 234 | class Cmssw(JobType):
234          dataTiers = dataTiersList.split(',')
235  
236          ## Contact the DBS
237 +        common.logger.message("Contacting DBS...")
238          try:
239 <            self.pubdata=DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
239 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
240              self.pubdata.fetchDBSInfo()
241  
242 <        except DataDiscovery_EDM.NotExistingDatasetError, ex :
242 >        except DataDiscovery.NotExistingDatasetError, ex :
243              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
244              raise CrabException(msg)
245  
246 <        except DataDiscovery_EDM.NoDataTierinProvenanceError, ex :
246 >        except DataDiscovery.NoDataTierinProvenanceError, ex :
247              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
248              raise CrabException(msg)
249 <        except DataDiscovery_EDM.DataDiscoveryError, ex:
249 >        except DataDiscovery.DataDiscoveryError, ex:
250              msg = 'ERROR ***: failed Data Discovery in DBS  %s'%ex.getErrorMessage()
251              raise CrabException(msg)
252  
# Line 275 | Line 254 | class Cmssw(JobType):
254          ## self.DBSPaths=self.pubdata.getDBSPaths()
255          common.logger.message("Required data are :"+self.datasetPath)
256  
257 <        filesbyblock=self.pubdata.getFiles()
258 < #        print filesbyblock
259 <        self.AllInputFiles=filesbyblock.values()
260 <        self.files = self.AllInputFiles        
257 >        self.filesbyblock=self.pubdata.getFiles()
258 >        self.eventsbyblock=self.pubdata.getEventsPerBlock()
259 >        self.eventsbyfile=self.pubdata.getEventsPerFile()
260 >        # print str(self.filesbyblock)
261 >        # print 'self.eventsbyfile',len(self.eventsbyfile)
262 >        # print str(self.eventsbyfile)
263  
264          ## get max number of events
284        #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
265          self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
266          common.logger.message("\nThe number of available events is %s"%self.maxEvents)
267  
268 +        common.logger.message("Contacting DLS...")
269          ## Contact the DLS and build a list of sites hosting the fileblocks
270          try:
271 <            dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
271 >            dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
272              dataloc.fetchDLSInfo()
273 <        except DataLocation_EDM.DataLocationError , ex:
273 >        except DataLocation.DataLocationError , ex:
274              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
275              raise CrabException(msg)
276          
296        allsites=dataloc.getSites()
297        common.logger.debug(5,"sites are %s"%allsites)
298        sites=self.checkBlackList(allsites)
299        common.logger.debug(5,"sites are (after black list) %s"%sites)
300        sites=self.checkWhiteList(sites)
301        common.logger.debug(5,"sites are (after white list) %s"%sites)
277  
278 <        if len(sites)==0:
279 <            msg = 'No sites hosting all the needed data! Exiting... '
280 <            raise CrabException(msg)
278 >        sites = dataloc.getSites()
279 >        allSites = []
280 >        listSites = sites.values()
281 >        for list in listSites:
282 >            for oneSite in list:
283 >                allSites.append(oneSite)
284 >        allSites = self.uniquelist(allSites)
285  
286 <        common.logger.message("List of Sites ("+str(len(sites))+") hosting the data : "+str(sites))
287 <        common.logger.debug(6, "List of Sites: "+str(sites))
288 <        common.analisys_common_info['sites']=sites    ## used in SchedulerEdg.py in createSchScript
310 <        self.setParam_('TargetCE', ','.join(sites))
311 <        return
286 >        common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
287 >        common.logger.debug(6, "List of Sites: "+str(allSites))
288 >        return sites
289      
290 <    def jobSplittingPerFiles(self):
314 <        """
315 <        Perform job splitting based on number of files to be accessed per job
290 >    def jobSplittingByBlocks(self, blockSites):
291          """
292 <        common.logger.debug(5,'Splitting per input files')
293 <        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
294 <        common.logger.message('Available '+str(self.maxEvents)+' events in total ')
295 <        common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
296 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
297 <        common.logger.message('Required '+str(self.eventsPerJob)+' events per job')
298 <
299 <        ## if asked to process all events, do it
300 <        if self.total_number_of_events == -1:
301 <            self.total_number_of_events=self.maxEvents
292 >        Perform job splitting. Jobs run over an integer number of files
293 >        and no more than one block.
294 >        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
295 >        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
296 >                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
297 >                  self.maxEvents, self.filesbyblock
298 >        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
299 >              self.total_number_of_jobs - Total # of jobs
300 >              self.list_of_args - File(s) job will run on (a list of lists)
301 >        """
302 >
303 >        # ---- Handle the possible job splitting configurations ---- #
304 >        if (self.selectTotalNumberEvents):
305 >            totalEventsRequested = self.total_number_of_events
306 >        if (self.selectEventsPerJob):
307 >            eventsPerJobRequested = self.eventsPerJob
308 >            if (self.selectNumberOfJobs):
309 >                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
310 >
311 >        # If user requested all the events in the dataset
312 >        if (totalEventsRequested == -1):
313 >            eventsRemaining=self.maxEvents
314 >        # If user requested more events than are in the dataset
315 >        elif (totalEventsRequested > self.maxEvents):
316 >            eventsRemaining = self.maxEvents
317 >            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
318 >        # If user requested less events than are in the dataset
319          else:
320 <            if self.total_number_of_events>self.maxEvents:
329 <                common.logger.message("Asked "+str(self.total_number_of_events)+" but only "+str(self.maxEvents)+" available.")
330 <                self.total_number_of_events=self.maxEvents
331 <            pass
320 >            eventsRemaining = totalEventsRequested
321  
322 <        ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
323 <        n_tot_files = (len(self.files[0]))
324 <        ## SL: this is wrong if the files have different number of events
336 <        evPerFile = int(self.maxEvents)/n_tot_files
337 <
338 <        common.logger.debug(5,'Events per File '+str(evPerFile))
339 <
340 <        ## compute job splitting parameters: filesPerJob, eventsPerJob and theNumberOfJobs
341 <        if self.selectFilesPerJob:
342 <            ## user define files per event.
343 <            filesPerJob = self.filesPerJob
344 <            eventsPerJob = filesPerJob*evPerFile
345 <            theNumberOfJobs = int(self.total_number_of_events*1./eventsPerJob)
346 <            check = int(self.total_number_of_events) - (theNumberOfJobs*eventsPerJob)
347 <            if check > 0:
348 <                theNumberOfJobs +=1
349 <                filesLastJob = int(check*1./evPerFile+0.5)
350 <                common.logger.message('Warning: last job will be created with '+str(check)+' files')
351 <            else:
352 <                filesLastJob = filesPerJob
322 >        # If user requested more events per job than are in the dataset
323 >        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
324 >            eventsPerJobRequested = self.maxEvents
325  
326 <        elif self.selectNumberOfJobs:
327 <            ## User select the number of jobs: last might be bigger to match request of events
356 <            theNumberOfJobs =  self.theNumberOfJobs
357 <
358 <            eventsPerJob = self.total_number_of_events/theNumberOfJobs
359 <            filesPerJob = int(eventsPerJob/evPerFile)
360 <            if (filesPerJob==0) : filesPerJob=1
361 <            check = int(self.total_number_of_events) - (int(theNumberOfJobs)*filesPerJob*evPerFile)
362 <            if not check == 0:
363 <                if check<0:
364 <                    missingFiles = int(check/evPerFile)
365 <                    additionalJobs = int(missingFiles/filesPerJob)
366 <                    #print missingFiles, additionalJobs
367 <                    theNumberOfJobs+=additionalJobs
368 <                    common.logger.message('Warning: will create only '+str(theNumberOfJobs)+' jobs')
369 <                    check = int(self.total_number_of_events) - (int(theNumberOfJobs)*filesPerJob*evPerFile)
370 <                    
371 <                if check >0 :
372 <                    filesLastJob = filesPerJob+int(check*1./evPerFile+0.5)
373 <                    common.logger.message('Warning: last job will be created with '+str(filesLastJob*evPerFile)+' events')
374 <                else:
375 <                    filesLastJob = filesPerJob
376 <            else:
377 <                filesLastJob = filesPerJob
378 <        elif self.selectEventsPerJob:
379 <            # SL case if asked events per job
380 <            ## estimate the number of files per job to match the user requirement
381 <            filesPerJob = int(float(self.eventsPerJob)/float(evPerFile))
382 <            if filesPerJob==0: filesPerJob=1
383 <            common.logger.debug(5,"filesPerJob "+str(filesPerJob))
384 <            if (filesPerJob==0): filesPerJob=1
385 <            eventsPerJob=filesPerJob*evPerFile
386 <            theNumberOfJobs = int(self.total_number_of_events)/int(eventsPerJob)
387 <            check = int(self.total_number_of_events) - (int(theNumberOfJobs)*eventsPerJob)
388 <            if not check == 0:
389 <                missingFiles = int(check/evPerFile)
390 <                additionalJobs = int(missingFiles/filesPerJob)
391 <                if ( additionalJobs>0) : theNumberOfJobs+=additionalJobs
392 <                check = int(self.total_number_of_events) - (int(theNumberOfJobs)*eventsPerJob)
393 <                if not check == 0:
394 <                    if (check <0 ):
395 <                        filesLastJob = filesPerJob+int(check*1./evPerFile-0.5)
396 <                    else:
397 <                        theNumberOfJobs+=1
398 <                        filesLastJob = int(check*1./evPerFile+0.5)
326 >        # For user info at end
327 >        totalEventCount = 0
328  
329 <                    common.logger.message('Warning: last job will be created with '+str(filesLastJob*evPerFile)+' events')
330 <                else:
402 <                    filesLastJob = filesPerJob
403 <            else:
404 <                filesLastJob = filesPerJob
405 <        
406 <        self.total_number_of_jobs = theNumberOfJobs
329 >        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
330 >            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
331  
332 <        totalEventsToBeUsed=theNumberOfJobs*filesPerJob*evPerFile
333 <        if not check == 0:
410 <        #    print (theNumberOfJobs-1)*filesPerJob*evPerFile,filesLastJob*evPerFile
411 <            totalEventsToBeUsed=(theNumberOfJobs-1)*filesPerJob*evPerFile+filesLastJob*evPerFile
332 >        if (self.selectNumberOfJobs):
333 >            common.logger.message("May not create the exact number_of_jobs requested.")
334  
335 <        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created, each for '+str(filesPerJob*evPerFile)+' events, for a total of '+str(totalEventsToBeUsed)+' events')
335 >        if ( self.ncjobs == 'all' ) :
336 >            totalNumberOfJobs = 999999999
337 >        else :
338 >            totalNumberOfJobs = self.ncjobs
339 >            
340  
341 <        totalFilesToBeUsed=filesPerJob*(theNumberOfJobs-1)+filesLastJob
341 >        blocks = blockSites.keys()
342 >        blockCount = 0
343 >        # Backup variable in case self.maxEvents counted events in a non-included block
344 >        numBlocksInDataset = len(blocks)
345  
346 <        ## set job arguments (files)
346 >        jobCount = 0
347          list_of_lists = []
348 <        lastFile=0
349 <        for i in range(0, int(totalFilesToBeUsed), filesPerJob)[:-1]:
350 <            parString = "\\{"
348 >
349 >        # ---- Iterate over the blocks in the dataset until ---- #
350 >        # ---- we've met the requested total # of events    ---- #
351 >        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
352 >            block = blocks[blockCount]
353 >
354 >
355 >            evInBlock = self.eventsbyblock[block]
356 >            common.logger.debug(5,'Events in Block File '+str(evInBlock))
357 >
358 >            #Correct - switch to this when DBS up
359 >            #numEventsInBlock = self.eventsbyblock[block]
360 >            numEventsInBlock = evInBlock
361              
362 <            lastFile=i+filesPerJob
363 <            params = self.files[0][i: lastFile]
364 <            for i in range(len(params) - 1):
365 <                parString += '\\\"' + params[i] + '\\\"\,'
362 >            files = self.filesbyblock[block]
363 >            numFilesInBlock = len(files)
364 >            if (numFilesInBlock <= 0):
365 >                continue
366 >            fileCount = 0
367 >
368 >            # ---- New block => New job ---- #
369 >            parString = "\\{"
370 >            # counter for number of events in files currently worked on
371 >            filesEventCount = 0
372 >            # flag if next while loop should touch new file
373 >            newFile = 1
374 >            # job event counter
375 >            jobSkipEventCount = 0
376              
377 <            parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
378 <            list_of_lists.append([parString])
379 <            pass
380 <
381 <        ## last job
382 <        parString = "\\{"
383 <        
384 <        params = self.files[0][lastFile: lastFile+filesLastJob]
385 <        for i in range(len(params) - 1):
386 <            parString += '\\\"' + params[i] + '\\\"\,'
377 >            # ---- Iterate over the files in the block until we've met the requested ---- #
378 >            # ---- total # of events or we've gone over all the files in this block  ---- #
379 >            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
380 >                file = files[fileCount]
381 >                if newFile :
382 >                    try:
383 >                        numEventsInFile = self.eventsbyfile[file]
384 >                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
385 >                        # increase filesEventCount
386 >                        filesEventCount += numEventsInFile
387 >                        # Add file to current job
388 >                        parString += '\\\"' + file + '\\\"\,'
389 >                        newFile = 0
390 >                    except KeyError:
391 >                        common.logger.message("File "+str(file)+" has unknown numbe of events: skipping")
392 >                        
393 >
394 >                # if less events in file remain than eventsPerJobRequested
395 >                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
396 >                    # if last file in block
397 >                    if ( fileCount == numFilesInBlock ) :
398 >                        # end job using last file, use remaining events in block
399 >                        # close job and touch new file
400 >                        fullString = parString[:-2]
401 >                        fullString += '\\}'
402 >                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
403 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
404 >                        self.jobDestination.append(blockSites[block])
405 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
406 >                        # reset counter
407 >                        jobCount = jobCount + 1
408 >                        totalEventCount = totalEventCount + eventsPerJobRequested
409 >                        eventsRemaining = eventsRemaining - eventsPerJobRequested
410 >                        jobSkipEventCount = 0
411 >                        # reset file
412 >                        parString = "\\{"
413 >                        filesEventCount = 0
414 >                        newFile = 1
415 >                        fileCount += 1
416 >                    else :
417 >                        # go to next file
418 >                        newFile = 1
419 >                        fileCount += 1
420 >                # if events in file equal to eventsPerJobRequested
421 >                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
422 >                    # close job and touch new file
423 >                    fullString = parString[:-2]
424 >                    fullString += '\\}'
425 >                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
426 >                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
427 >                    self.jobDestination.append(blockSites[block])
428 >                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
429 >                    # reset counter
430 >                    jobCount = jobCount + 1
431 >                    totalEventCount = totalEventCount + eventsPerJobRequested
432 >                    eventsRemaining = eventsRemaining - eventsPerJobRequested
433 >                    jobSkipEventCount = 0
434 >                    # reset file
435 >                    parString = "\\{"
436 >                    filesEventCount = 0
437 >                    newFile = 1
438 >                    fileCount += 1
439 >                    
440 >                # if more events in file remain than eventsPerJobRequested
441 >                else :
442 >                    # close job but don't touch new file
443 >                    fullString = parString[:-2]
444 >                    fullString += '\\}'
445 >                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
446 >                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
447 >                    self.jobDestination.append(blockSites[block])
448 >                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
449 >                    # increase counter
450 >                    jobCount = jobCount + 1
451 >                    totalEventCount = totalEventCount + eventsPerJobRequested
452 >                    eventsRemaining = eventsRemaining - eventsPerJobRequested
453 >                    # calculate skip events for last file
454 >                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
455 >                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
456 >                    # remove all but the last file
457 >                    filesEventCount = self.eventsbyfile[file]
458 >                    parString = "\\{"
459 >                    parString += '\\\"' + file + '\\\"\,'
460 >                pass # END if
461 >            pass # END while (iterate over files in the block)
462 >        pass # END while (iterate over blocks in the dataset)
463 >        self.ncjobs = self.total_number_of_jobs = jobCount
464 >        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
465 >            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
466 >        common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
467          
439        parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
440        list_of_lists.append([parString])
441        pass
442
468          self.list_of_args = list_of_lists
444        # print self.list_of_args[0]
469          return
470  
471      def jobSplittingNoInput(self):
# Line 470 | Line 494 | class Cmssw(JobType):
494  
495          common.logger.debug(5,'Check  '+str(check))
496  
497 <        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
497 >        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
498          if check > 0:
499 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but will do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
476 <
499 >            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
500  
501          # argument is seed number.$i
502          self.list_of_args = []
503          for i in range(self.total_number_of_jobs):
504 +            ## Since there is no input, any site is good
505 +            self.jobDestination.append(["Any"])
506              if (self.sourceSeed):
507 <                self.list_of_args.append([(str(self.sourceSeed)+str(i))])
507 >                if (self.sourceSeedVtx):
508 >                    ## pythia + vtx random seed
509 >                    self.list_of_args.append([
510 >                                              str(self.sourceSeed)+str(i),
511 >                                              str(self.sourceSeedVtx)+str(i)
512 >                                              ])
513 >                else:
514 >                    ## only pythia random seed
515 >                    self.list_of_args.append([(str(self.sourceSeed)+str(i))])
516              else:
517 +                ## no random seed
518                  self.list_of_args.append([str(i)])
519          #print self.list_of_args
520  
# Line 501 | Line 535 | class Cmssw(JobType):
535              # print str(arglist[job])
536              # print jobParams[job]
537              common.jobDB.setArguments(job, jobParams[job])
538 +            common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
539 +            common.jobDB.setDestination(job, self.jobDestination[job])
540  
541          common.jobDB.save()
542          return
# Line 515 | Line 551 | class Cmssw(JobType):
551          # Fabio
552          return self.total_number_of_jobs
553  
518    def checkBlackList(self, allSites):
519        if len(self.reCEBlackList)==0: return allSites
520        sites = []
521        for site in allSites:
522            common.logger.debug(10,'Site '+site)
523            good=1
524            for re in self.reCEBlackList:
525                if re.search(site):
526                    common.logger.message('CE in black list, skipping site '+site)
527                    good=0
528                pass
529            if good: sites.append(site)
530        if len(sites) == 0:
531            common.logger.debug(3,"No sites found after BlackList")
532        return sites
533
534    def checkWhiteList(self, allSites):
535
536        if len(self.reCEWhiteList)==0: return allSites
537        sites = []
538        for site in allSites:
539            good=0
540            for re in self.reCEWhiteList:
541                if re.search(site):
542                    common.logger.debug(5,'CE in white list, adding site '+site)
543                    good=1
544                if not good: continue
545                sites.append(site)
546        if len(sites) == 0:
547            common.logger.message("No sites found after WhiteList\n")
548        else:
549            common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n")
550        return sites
551
554      def getTarBall(self, exe):
555          """
556          Return the TarBall with lib and exe
# Line 704 | Line 706 | class Cmssw(JobType):
706          txt += "\n"
707          txt += "## number of arguments (first argument always jobnumber)\n"
708          txt += "\n"
709 <        txt += "narg=$#\n"
710 <        txt += "if [ $narg -lt 2 ]\n"
709 > #        txt += "narg=$#\n"
710 >        txt += "if [ $nargs -lt 2 ]\n"
711          txt += "then\n"
712 <        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
712 >        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
713          txt += '    echo "JOB_EXIT_STATUS = 50113"\n'
714          txt += '    echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
715          txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
# Line 738 | Line 740 | class Cmssw(JobType):
740          pset = os.path.basename(job.configFilename())
741          txt += '\n'
742          if (self.datasetPath): # standard job
743 <            txt += 'InputFiles=$2\n'
743 >            #txt += 'InputFiles=$2\n'
744 >            txt += 'InputFiles=${args[1]}\n'
745 >            txt += 'MaxEvents=${args[2]}\n'
746 >            txt += 'SkipEvents=${args[3]}\n'
747              txt += 'echo "Inputfiles:<$InputFiles>"\n'
748 <            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
748 >            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
749 >            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
750 >            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" $RUNTIME_AREA/ pset_tmp_1.cfg > pset_tmp_2.cfg\n'
751 >            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
752 >            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" $RUNTIME_AREA/ pset_tmp_2.cfg > pset.cfg\n'
753          else:  # pythia like job
754              if (self.sourceSeed):
755 <                txt += 'Seed=$2\n'
755 > #                txt += 'Seed=$2\n'
756 >                txt += 'Seed=${args[1]}\n'
757                  txt += 'echo "Seed: <$Seed>"\n'
758 <                txt += 'sed "s#INPUT#$Seed#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
758 >                txt += 'sed "s#\<INPUT\>#$Seed#" $RUNTIME_AREA/'+pset+' > tmp.cfg\n'
759 >                if (self.sourceSeedVtx):
760 > #                    txt += 'VtxSeed=$3\n'
761 >                    txt += 'VtxSeed=${args[2]}\n'
762 >                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
763 >                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp.cfg > pset.cfg\n'
764 >                else:
765 >                    txt += 'mv tmp.cfg pset.cfg\n'
766              else:
767                  txt += '# Copy untouched pset\n'
768                  txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n'
# Line 753 | Line 770 | class Cmssw(JobType):
770  
771          if len(self.additional_inbox_files) > 0:
772              for file in self.additional_inbox_files:
773 <                txt += 'if [ -e $RUNTIME_AREA/'+file+' ] ; then\n'
774 <                txt += '   cp $RUNTIME_AREA/'+file+' .\n'
775 <                txt += '   chmod +x '+file+'\n'
773 >                relFile = file.split("/")[-1]
774 >                txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
775 >                txt += '   cp $RUNTIME_AREA/'+relFile+' .\n'
776 >                txt += '   chmod +x '+relFile+'\n'
777                  txt += 'fi\n'
778              pass
779  
# Line 927 | Line 945 | class Cmssw(JobType):
945          # add "_txt"
946          if len(p)>1:
947            ext = p[len(p)-1]
930          #result = name + '_' + str(txt) + "." + ext
948            result = name + '_' + txt + "." + ext
949          else:
933          #result = name + '_' + str(txt)
950            result = name + '_' + txt
951          
952          return result
# Line 944 | Line 960 | class Cmssw(JobType):
960              req='Member("VO-cms-' + \
961                   common.analisys_common_info['sw_version'] + \
962                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
963 <        if common.analisys_common_info['sites']:
964 <            if len(common.analisys_common_info['sites'])>0:
965 <                req = req + ' && ('
950 <                for i in range(len(common.analisys_common_info['sites'])):
951 <                    req = req + 'other.GlueCEInfoHostName == "' \
952 <                         + common.analisys_common_info['sites'][i] + '"'
953 <                    if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
954 <                        req = req + ' || '
955 <            req = req + ')'
956 <        #print "req = ", req
963 >
964 >        req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
965 >
966          return req
967  
968      def configFilename(self):
# Line 971 | Line 980 | class Cmssw(JobType):
980          txt += '   if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
981          txt += '      # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
982          txt += '       source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
983 <        txt += '   elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n'
984 <        txt += '      # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n'
985 <        txt += '       source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n'
983 >        txt += '   elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
984 >        txt += '      # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
985 >        txt += '       source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
986          txt += '   else\n'
987 <        txt += '       echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
987 >        txt += '       echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
988          txt += '       echo "JOB_EXIT_STATUS = 10020"\n'
989          txt += '       echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
990          txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
# Line 988 | Line 997 | class Cmssw(JobType):
997          txt += '       cd $RUNTIME_AREA\n'
998          txt += '       /bin/rm -rf $WORKING_DIR\n'
999          txt += '       if [ -d $WORKING_DIR ] ;then\n'
1000 <        txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
1000 >        txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1001          txt += '            echo "JOB_EXIT_STATUS = 10017"\n'
1002          txt += '            echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
1003          txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
# Line 1081 | Line 1090 | class Cmssw(JobType):
1090          
1091      def getTaskid(self):
1092          return self._taskId
1093 +
1094 + #######################################################################
1095 +    def uniquelist(self, old):
1096 +        """
1097 +        remove duplicates from a list
1098 +        """
1099 +        nd={}
1100 +        for e in old:
1101 +            nd[e]=0
1102 +        return nd.keys()

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines