root/cvsroot/COMP/CRAB/python/cms_cmssw.py

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.32 by mkirn, Fri Jul 28 18:04:01 2006 UTC vs.
Revision 1.49 by slacapra, Thu Oct 5 15:32:20 2006 UTC

# Line 6 | Line 6 | import math
6   import common
7   import PsetManipulator  
8  
9 < import DBSInfo_EDM
10 < import DataDiscovery_EDM
11 < import DataLocation_EDM
9 > import DBSInfo
10 > import DataDiscovery
11 > import DataLocation
12   import Scram
13  
14 < import os, string, re
14 > import glob, os, string, re
15  
16   class Cmssw(JobType):
17 <    def __init__(self, cfg_params):
17 >    def __init__(self, cfg_params, ncjobs):
18          JobType.__init__(self, 'CMSSW')
19          common.logger.debug(3,'CMSSW::__init__')
20  
21        self.analisys_common_info = {}
21          # Marco.
22          self._params = {}
23          self.cfg_params = cfg_params
24 +
25 +        # number of jobs requested to be created, limit obj splitting
26 +        self.ncjobs = ncjobs
27 +
28          log = common.logger
29          
30          self.scram = Scram.Scram(cfg_params)
# Line 30 | Line 33 | class Cmssw(JobType):
33          self.scriptExe = ''
34          self.executable = ''
35          self.tgz_name = 'default.tgz'
36 <
36 >        self.pset = ''      #script use case Da
37 >        self.datasetPath = '' #script use case Da
38  
39          self.version = self.scram.getSWVersion()
40          self.setParam_('application', self.version)
37        common.analisys_common_info['sw_version'] = self.version
38        ### FEDE
39        common.analisys_common_info['copy_input_data'] = 0
40        common.analisys_common_info['events_management'] = 1
41  
42          ### collect Data cards
43          try:
# Line 85 | Line 85 | class Cmssw(JobType):
85          try:
86              self.pset = cfg_params['CMSSW.pset']
87              log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
88 <            if (not os.path.exists(self.pset)):
89 <                raise CrabException("User defined PSet file "+self.pset+" does not exist")
88 >            if self.pset.lower() != 'none' :
89 >                if (not os.path.exists(self.pset)):
90 >                    raise CrabException("User defined PSet file "+self.pset+" does not exist")
91 >            else:
92 >                self.pset = None
93          except KeyError:
94              raise CrabException("PSet file missing. Cannot run cmsRun ")
95  
# Line 113 | Line 116 | class Cmssw(JobType):
116          # script_exe file as additional file in inputSandbox
117          try:
118              self.scriptExe = cfg_params['USER.script_exe']
116            self.additional_inbox_files.append(self.scriptExe)
119              if self.scriptExe != '':
120                 if not os.path.isfile(self.scriptExe):
121                    msg ="WARNING. file "+self.scriptExe+" not found"
122                    raise CrabException(msg)
123 +               self.additional_inbox_files.append(string.strip(self.scriptExe))
124          except KeyError:
125 <           pass
126 <                  
125 >            self.scriptExe = ''
126 >        #CarlosDaniele
127 >        if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
128 >           msg ="WARNING. script_exe  not defined"
129 >           raise CrabException(msg)
130 >
131          ## additional input files
132          try:
133              tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
134              for tmp in tmpAddFiles:
135 <                if not os.path.exists(tmp):
136 <                    raise CrabException("Additional input file not found: "+tmp)
137 <                self.additional_inbox_files.append(string.strip(tmp))
135 >                dirname = ''
136 >                if not tmp[0]=="/": dirname = "."
137 >                files = glob.glob(os.path.join(dirname, tmp))
138 >                for file in files:
139 >                    if not os.path.exists(file):
140 >                        raise CrabException("Additional input file not found: "+file)
141 >                    pass
142 >                    self.additional_inbox_files.append(string.strip(file))
143                  pass
144              pass
145 +            common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
146          except KeyError:
147              pass
148  
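For illustration only (not part of the diff), a minimal standalone sketch of the new glob-based expansion of USER.additional_input_files; the helper name and the patterns are hypothetical:

    import glob, os

    def expand_additional_files(patterns):
        # resolve each comma-separated pattern, treating non-absolute patterns
        # as relative to the current directory, and collect every matching file
        expanded = []
        for pattern in patterns:
            pattern = pattern.strip()
            dirname = '' if pattern.startswith('/') else '.'
            for match in glob.glob(os.path.join(dirname, pattern)):
                expanded.append(match)
        return expanded

    # e.g. expand_additional_files(['extra_*.txt', '/abs/path/data.root'])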
149          # files per job
150          try:
151 <            self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
152 <            self.selectFilesPerJob = 1
151 >            if (cfg_params['CMSSW.files_per_jobs']):
152 >                raise CrabException("files_per_jobs no longer supported.  Quitting.")
153          except KeyError:
154 <            self.filesPerJob = 0
142 <            self.selectFilesPerJob = 0
154 >            pass
155  
156          ## Events per job
157          try:
# Line 157 | Line 169 | class Cmssw(JobType):
169              self.theNumberOfJobs = 0
170              self.selectNumberOfJobs = 0
171  
172 +        try:
173 +            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
174 +            self.selectTotalNumberEvents = 1
175 +        except KeyError:
176 +            self.total_number_of_events = 0
177 +            self.selectTotalNumberEvents = 0
178 +
179 +        if self.pset != None: #CarlosDaniele
180 +             if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
181 +                 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
182 +                 raise CrabException(msg)
183 +        else:
184 +             if (self.selectNumberOfJobs == 0):
185 +                 msg = 'Must specify  number_of_jobs.'
186 +                 raise CrabException(msg)
187 +
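A small sketch of the splitting-parameter rule enforced above (the helper name is illustrative, and absent config keys are assumed to arrive as None): when a pset is given, exactly two of the three parameters must be set.

    def check_splitting_params(total_number_of_events, events_per_job, number_of_jobs):
        # count how many of the three CMSSW.* splitting keys were supplied
        given = sum(1 for p in (total_number_of_events, events_per_job, number_of_jobs)
                    if p is not None)
        if given != 2:
            raise ValueError('Must define exactly two of total_number_of_events, '
                             'events_per_job, or number_of_jobs.')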
188          ## source seed for pythia
189          try:
190              self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
# Line 169 | Line 197 | class Cmssw(JobType):
197          except KeyError:
198              self.sourceSeedVtx = None
199              common.logger.debug(5,"No vertex seed given")
200 <
201 <        if not (self.selectFilesPerJob + self.selectEventsPerJob + self.selectNumberOfJobs == 1 ):
174 <            msg = 'Must define either files_per_jobs or events_per_job or number_of_jobs'
175 <            raise CrabException(msg)
176 <
177 <        try:
178 <            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
179 <        except KeyError:
180 <            msg = 'Must define total_number_of_events'
181 <            raise CrabException(msg)
182 <        
183 <        CEBlackList = []
184 <        try:
185 <            tmpBad = string.split(cfg_params['EDG.ce_black_list'],',')
186 <            for tmp in tmpBad:
187 <                tmp=string.strip(tmp)
188 <                CEBlackList.append(tmp)
189 <        except KeyError:
190 <            pass
191 <
192 <        self.reCEBlackList=[]
193 <        for bad in CEBlackList:
194 <            self.reCEBlackList.append(re.compile( bad ))
195 <
196 <        common.logger.debug(5,'CEBlackList: '+str(CEBlackList))
197 <
198 <        CEWhiteList = []
199 <        try:
200 <            tmpGood = string.split(cfg_params['EDG.ce_white_list'],',')
201 <            for tmp in tmpGood:
202 <                tmp=string.strip(tmp)
203 <                CEWhiteList.append(tmp)
204 <        except KeyError:
205 <            pass
206 <
207 <        #print 'CEWhiteList: ',CEWhiteList
208 <        self.reCEWhiteList=[]
209 <        for Good in CEWhiteList:
210 <            self.reCEWhiteList.append(re.compile( Good ))
211 <
212 <        common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))
213 <
214 <        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
200 >        if self.pset != None: #CarlosDaniele
201 >            self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
202  
203          #DBSDLS-start
204          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
205          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
206          self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
207 +        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
208          ## Perform the data location and discovery (based on DBS/DLS)
209          ## SL: Don't if NONE is specified as input (pythia use case)
210 <        common.analisys_common_info['sites']=None
210 >        blockSites = {}
211          if self.datasetPath:
212 <            self.DataDiscoveryAndLocation(cfg_params)
212 >            blockSites = self.DataDiscoveryAndLocation(cfg_params)
213          #DBSDLS-end          
214  
215          self.tgzNameWithPath = self.getTarBall(self.executable)
216      
217          ## Select Splitting
218 <        if self.selectNoInput: self.jobSplittingNoInput()
219 <        elif self.selectFilesPerJob or self.selectEventsPerJob or self.selectNumberOfJobs: self.jobSplittingPerFiles()
220 <        else:
221 <            msg = 'Don\'t know how to split...'
222 <            raise CrabException(msg)
218 >        if self.selectNoInput:
219 >            if self.pset == None: #CarlosDaniele
220 >                self.jobSplittingForScript()
221 >            else:
222 >                self.jobSplittingNoInput()
223 >        else: self.jobSplittingByBlocks(blockSites)
224  
225          # modify Pset
226 <        try:
227 <            if (self.datasetPath): # standard job
228 <                #self.PsetEdit.maxEvent(self.eventsPerJob)
229 <                # always process all events in a file
230 <                self.PsetEdit.maxEvent("-1")
231 <                self.PsetEdit.inputModule("INPUT")
232 <
233 <            else:  # pythia like job
234 <                self.PsetEdit.maxEvent(self.eventsPerJob)
235 <                if (self.sourceSeed) :
236 <                    self.PsetEdit.pythiaSeed("INPUT")
237 <                    if (self.sourceSeedVtx) :
238 <                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
239 <            self.PsetEdit.psetWriter(self.configFilename())
240 <        except:
241 <            msg='Error while manipulating ParameterSet: exiting...'
242 <            raise CrabException(msg)
226 >        if self.pset != None: #CarlosDaniele
227 >            try:
228 >                if (self.datasetPath): # standard job
229 >                    # allow processing a fraction of events in a file
230 >                    self.PsetEdit.inputModule("INPUT")
231 >                    self.PsetEdit.maxEvent("INPUTMAXEVENTS")
232 >                    self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
233 >                else:  # pythia like job
234 >                    self.PsetEdit.maxEvent(self.eventsPerJob)
235 >                    if (self.sourceSeed) :
236 >                        self.PsetEdit.pythiaSeed("INPUT")
237 >                        if (self.sourceSeedVtx) :
238 >                            self.PsetEdit.pythiaSeedVtx("INPUTVTX")
239 >                self.PsetEdit.psetWriter(self.configFilename())
240 >            except:
241 >                msg='Error while manipulating ParameterSet: exiting...'
242 >                raise CrabException(msg)
243  
244      def DataDiscoveryAndLocation(self, cfg_params):
245  
# Line 263 | Line 252 | class Cmssw(JobType):
252          dataTiers = dataTiersList.split(',')
253  
254          ## Contact the DBS
255 +        common.logger.message("Contacting DBS...")
256          try:
257 <            self.pubdata=DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
257 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
258              self.pubdata.fetchDBSInfo()
259  
260 <        except DataDiscovery_EDM.NotExistingDatasetError, ex :
260 >        except DataDiscovery.NotExistingDatasetError, ex :
261              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
262              raise CrabException(msg)
263  
264 <        except DataDiscovery_EDM.NoDataTierinProvenanceError, ex :
264 >        except DataDiscovery.NoDataTierinProvenanceError, ex :
265              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
266              raise CrabException(msg)
267 <        except DataDiscovery_EDM.DataDiscoveryError, ex:
267 >        except DataDiscovery.DataDiscoveryError, ex:
268              msg = 'ERROR ***: failed Data Discovery in DBS  %s'%ex.getErrorMessage()
269              raise CrabException(msg)
270  
# Line 282 | Line 272 | class Cmssw(JobType):
272          ## self.DBSPaths=self.pubdata.getDBSPaths()
273          common.logger.message("Required data are :"+self.datasetPath)
274  
275 <        filesbyblock=self.pubdata.getFiles()
276 < #        print filesbyblock
277 <        self.AllInputFiles=filesbyblock.values()
278 <        self.files = self.AllInputFiles        
275 >        self.filesbyblock=self.pubdata.getFiles()
276 >        self.eventsbyblock=self.pubdata.getEventsPerBlock()
277 >        self.eventsbyfile=self.pubdata.getEventsPerFile()
278 >        # print str(self.filesbyblock)
279 >        # print 'self.eventsbyfile',len(self.eventsbyfile)
280 >        # print str(self.eventsbyfile)
281  
282          ## get max number of events
291        #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
283          self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
284 <        common.logger.message("\nThe number of available events is %s"%self.maxEvents)
284 >        common.logger.message("The number of available events is %s\n"%self.maxEvents)
285  
286 +        common.logger.message("Contacting DLS...")
287          ## Contact the DLS and build a list of sites hosting the fileblocks
288          try:
289 <            dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
289 >            dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
290              dataloc.fetchDLSInfo()
291 <        except DataLocation_EDM.DataLocationError , ex:
291 >        except DataLocation.DataLocationError , ex:
292              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
293              raise CrabException(msg)
294          
303        allsites=dataloc.getSites()
304        common.logger.debug(5,"sites are %s"%allsites)
305        sites=self.checkBlackList(allsites)
306        common.logger.debug(5,"sites are (after black list) %s"%sites)
307        sites=self.checkWhiteList(sites)
308        common.logger.debug(5,"sites are (after white list) %s"%sites)
295  
296 <        if len(sites)==0:
297 <            msg = 'No sites hosting all the needed data! Exiting... '
298 <            raise CrabException(msg)
296 >        sites = dataloc.getSites()
297 >        allSites = []
298 >        listSites = sites.values()
299 >        for list in listSites:
300 >            for oneSite in list:
301 >                allSites.append(oneSite)
302 >        allSites = self.uniquelist(allSites)
303  
304 <        common.logger.message("List of Sites ("+str(len(sites))+") hosting the data : "+str(sites))
305 <        common.logger.debug(6, "List of Sites: "+str(sites))
306 <        common.analisys_common_info['sites']=sites    ## used in SchedulerEdg.py in createSchScript
317 <        self.setParam_('TargetCE', ','.join(sites))
318 <        return
304 >        common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
305 >        common.logger.debug(6, "List of Sites: "+str(allSites))
306 >        return sites
307      
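DataDiscoveryAndLocation now returns the per-block site map itself; the flattened, de-duplicated site list is built only for the log message. A self-contained sketch of that flattening (block and CE names are made up):

    block_sites = {
        'blockA': ['ce1.example.org', 'ce2.example.org'],
        'blockB': ['ce1.example.org'],
    }
    all_sites = []
    for site_list in block_sites.values():
        for site in site_list:
            if site not in all_sites:   # same effect as uniquelist()
                all_sites.append(site)
    print(all_sites)   # ['ce1.example.org', 'ce2.example.org']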
308 <    def jobSplittingPerFiles(self):
308 >    def jobSplittingByBlocks(self, blockSites):
309          """
310 <        Perform job splitting based on number of files to be accessed per job
311 <        """
312 <        common.logger.debug(5,'Splitting per input files')
313 <        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
314 <        common.logger.message('Available '+str(self.maxEvents)+' events in total ')
315 <        common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
316 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
317 <        common.logger.message('Required '+str(self.eventsPerJob)+' events per job')
318 <
319 <        ## if asked to process all events, do it
320 <        if self.total_number_of_events == -1:
321 <            self.total_number_of_events=self.maxEvents
310 >        Perform job splitting. Jobs run over an integer number of files
311 >        and no more than one block.
312 >        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
313 >        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
314 >                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
315 >                  self.maxEvents, self.filesbyblock
316 >        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
317 >              self.total_number_of_jobs - Total # of jobs
318 >              self.list_of_args - File(s) job will run on (a list of lists)
319 >        """
320 >
321 >        # ---- Handle the possible job splitting configurations ---- #
322 >        if (self.selectTotalNumberEvents):
323 >            totalEventsRequested = self.total_number_of_events
324 >        if (self.selectEventsPerJob):
325 >            eventsPerJobRequested = self.eventsPerJob
326 >            if (self.selectNumberOfJobs):
327 >                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
328 >
329 >        # If user requested all the events in the dataset
330 >        if (totalEventsRequested == -1):
331 >            eventsRemaining=self.maxEvents
332 >        # If user requested more events than are in the dataset
333 >        elif (totalEventsRequested > self.maxEvents):
334 >            eventsRemaining = self.maxEvents
335 >            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
336 >        # If user requested less events than are in the dataset
337          else:
338 <            if self.total_number_of_events>self.maxEvents:
336 <                common.logger.message("Asked "+str(self.total_number_of_events)+" but only "+str(self.maxEvents)+" available.")
337 <                self.total_number_of_events=self.maxEvents
338 <            pass
338 >            eventsRemaining = totalEventsRequested
339  
340 <        ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
341 <        n_tot_files = (len(self.files[0]))
342 <        ## SL: this is wrong if the files have different number of events
343 <        evPerFile = int(self.maxEvents)/n_tot_files
344 <
345 <        common.logger.debug(5,'Events per File '+str(evPerFile))
346 <
347 <        ## compute job splitting parameters: filesPerJob, eventsPerJob and theNumberOfJobs
348 <        if self.selectFilesPerJob:
349 <            ## user define files per event.
350 <            filesPerJob = self.filesPerJob
351 <            eventsPerJob = filesPerJob*evPerFile
352 <            theNumberOfJobs = int(self.total_number_of_events*1./eventsPerJob)
353 <            check = int(self.total_number_of_events) - (theNumberOfJobs*eventsPerJob)
354 <            if check > 0:
355 <                theNumberOfJobs +=1
356 <                filesLastJob = int(check*1./evPerFile+0.5)
357 <                common.logger.message('Warning: last job will be created with '+str(check)+' files')
358 <            else:
359 <                filesLastJob = filesPerJob
340 >        # If user requested more events per job than are in the dataset
341 >        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
342 >            eventsPerJobRequested = self.maxEvents
343  
344 <        elif self.selectNumberOfJobs:
345 <            ## User select the number of jobs: last might be bigger to match request of events
363 <            theNumberOfJobs =  self.theNumberOfJobs
364 <
365 <            eventsPerJob = self.total_number_of_events/theNumberOfJobs
366 <            filesPerJob = int(eventsPerJob/evPerFile)
367 <            if (filesPerJob==0) : filesPerJob=1
368 <            check = int(self.total_number_of_events) - (int(theNumberOfJobs)*filesPerJob*evPerFile)
369 <            if not check == 0:
370 <                if check<0:
371 <                    missingFiles = int(check/evPerFile)
372 <                    additionalJobs = int(missingFiles/filesPerJob)
373 <                    #print missingFiles, additionalJobs
374 <                    theNumberOfJobs+=additionalJobs
375 <                    common.logger.message('Warning: will create only '+str(theNumberOfJobs)+' jobs')
376 <                    check = int(self.total_number_of_events) - (int(theNumberOfJobs)*filesPerJob*evPerFile)
377 <                    
378 <                if check >0 :
379 <                    filesLastJob = filesPerJob+int(check*1./evPerFile+0.5)
380 <                    common.logger.message('Warning: last job will be created with '+str(filesLastJob*evPerFile)+' events')
381 <                else:
382 <                    filesLastJob = filesPerJob
383 <            else:
384 <                filesLastJob = filesPerJob
385 <        elif self.selectEventsPerJob:
386 <            # SL case if asked events per job
387 <            ## estimate the number of files per job to match the user requirement
388 <            filesPerJob = int(float(self.eventsPerJob)/float(evPerFile))
389 <            if filesPerJob==0: filesPerJob=1
390 <            common.logger.debug(5,"filesPerJob "+str(filesPerJob))
391 <            if (filesPerJob==0): filesPerJob=1
392 <            eventsPerJob=filesPerJob*evPerFile
393 <            theNumberOfJobs = int(self.total_number_of_events)/int(eventsPerJob)
394 <            check = int(self.total_number_of_events) - (int(theNumberOfJobs)*eventsPerJob)
395 <            if not check == 0:
396 <                missingFiles = int(check/evPerFile)
397 <                additionalJobs = int(missingFiles/filesPerJob)
398 <                if ( additionalJobs>0) : theNumberOfJobs+=additionalJobs
399 <                check = int(self.total_number_of_events) - (int(theNumberOfJobs)*eventsPerJob)
400 <                if not check == 0:
401 <                    if (check <0 ):
402 <                        filesLastJob = filesPerJob+int(check*1./evPerFile-0.5)
403 <                    else:
404 <                        theNumberOfJobs+=1
405 <                        filesLastJob = int(check*1./evPerFile+0.5)
344 >        # For user info at end
345 >        totalEventCount = 0
346  
347 <                    common.logger.message('Warning: last job will be created with '+str(filesLastJob*evPerFile)+' events')
348 <                else:
409 <                    filesLastJob = filesPerJob
410 <            else:
411 <                filesLastJob = filesPerJob
412 <        
413 <        self.total_number_of_jobs = theNumberOfJobs
347 >        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
348 >            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
349  
350 <        totalEventsToBeUsed=theNumberOfJobs*filesPerJob*evPerFile
351 <        if not check == 0:
417 <        #    print (theNumberOfJobs-1)*filesPerJob*evPerFile,filesLastJob*evPerFile
418 <            totalEventsToBeUsed=(theNumberOfJobs-1)*filesPerJob*evPerFile+filesLastJob*evPerFile
350 >        if (self.selectNumberOfJobs):
351 >            common.logger.message("May not create the exact number_of_jobs requested.")
352  
353 <        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created, each for '+str(filesPerJob*evPerFile)+' events, for a total of '+str(totalEventsToBeUsed)+' events')
353 >        if ( self.ncjobs == 'all' ) :
354 >            totalNumberOfJobs = 999999999
355 >        else :
356 >            totalNumberOfJobs = self.ncjobs
357 >            
358  
359 <        totalFilesToBeUsed=filesPerJob*(theNumberOfJobs-1)+filesLastJob
359 >        blocks = blockSites.keys()
360 >        blockCount = 0
361 >        # Backup variable in case self.maxEvents counted events in a non-included block
362 >        numBlocksInDataset = len(blocks)
363  
364 <        ## set job arguments (files)
364 >        jobCount = 0
365          list_of_lists = []
366 <        lastFile=0
367 <        for i in range(0, int(totalFilesToBeUsed), filesPerJob)[:-1]:
368 <            parString = "\\{"
369 <            
370 <            lastFile=i+filesPerJob
371 <            params = self.files[0][i: lastFile]
432 <            for i in range(len(params) - 1):
433 <                parString += '\\\"' + params[i] + '\\\"\,'
366 >
367 >        # ---- Iterate over the blocks in the dataset until ---- #
368 >        # ---- we've met the requested total # of events    ---- #
369 >        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
370 >            block = blocks[blockCount]
371 >            blockCount += 1
372              
435            parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
436            list_of_lists.append([parString])
437            pass
373  
374 <        ## last job
375 <        parString = "\\{"
376 <        
377 <        params = self.files[0][lastFile: lastFile+filesLastJob]
378 <        for i in range(len(params) - 1):
379 <            parString += '\\\"' + params[i] + '\\\"\,'
374 >            numEventsInBlock = self.eventsbyblock[block]
375 >            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
376 >            
377 >            files = self.filesbyblock[block]
378 >            numFilesInBlock = len(files)
379 >            if (numFilesInBlock <= 0):
380 >                continue
381 >            fileCount = 0
382 >
383 >            # ---- New block => New job ---- #
384 >            parString = "\\{"
385 >            # counter for number of events in files currently worked on
386 >            filesEventCount = 0
387 >            # flag if next while loop should touch new file
388 >            newFile = 1
389 >            # job event counter
390 >            jobSkipEventCount = 0
391 >            
392 >            # ---- Iterate over the files in the block until we've met the requested ---- #
393 >            # ---- total # of events or we've gone over all the files in this block  ---- #
394 >            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
395 >                file = files[fileCount]
396 >                if newFile :
397 >                    try:
398 >                        numEventsInFile = self.eventsbyfile[file]
399 >                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
400 >                        # increase filesEventCount
401 >                        filesEventCount += numEventsInFile
402 >                        # Add file to current job
403 >                        parString += '\\\"' + file + '\\\"\,'
404 >                        newFile = 0
405 >                    except KeyError:
406 >                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")
407 >                        
408 >
409 >                # if less events in file remain than eventsPerJobRequested
410 >                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
411 >                    # if last file in block
412 >                    if ( fileCount == numFilesInBlock-1 ) :
413 >                        # end job using last file, use remaining events in block
414 >                        # close job and touch new file
415 >                        fullString = parString[:-2]
416 >                        fullString += '\\}'
417 >                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
418 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
419 >                        self.jobDestination.append(blockSites[block])
420 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
421 >                        # reset counter
422 >                        jobCount = jobCount + 1
423 >                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
424 >                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
425 >                        jobSkipEventCount = 0
426 >                        # reset file
427 >                        parString = "\\{"
428 >                        filesEventCount = 0
429 >                        newFile = 1
430 >                        fileCount += 1
431 >                    else :
432 >                        # go to next file
433 >                        newFile = 1
434 >                        fileCount += 1
435 >                # if events in file equal to eventsPerJobRequested
436 >                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
437 >                    # close job and touch new file
438 >                    fullString = parString[:-2]
439 >                    fullString += '\\}'
440 >                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
441 >                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
442 >                    self.jobDestination.append(blockSites[block])
443 >                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
444 >                    # reset counter
445 >                    jobCount = jobCount + 1
446 >                    totalEventCount = totalEventCount + eventsPerJobRequested
447 >                    eventsRemaining = eventsRemaining - eventsPerJobRequested
448 >                    jobSkipEventCount = 0
449 >                    # reset file
450 >                    parString = "\\{"
451 >                    filesEventCount = 0
452 >                    newFile = 1
453 >                    fileCount += 1
454 >                    
455 >                # if more events in file remain than eventsPerJobRequested
456 >                else :
457 >                    # close job but don't touch new file
458 >                    fullString = parString[:-2]
459 >                    fullString += '\\}'
460 >                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
461 >                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
462 >                    self.jobDestination.append(blockSites[block])
463 >                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
464 >                    # increase counter
465 >                    jobCount = jobCount + 1
466 >                    totalEventCount = totalEventCount + eventsPerJobRequested
467 >                    eventsRemaining = eventsRemaining - eventsPerJobRequested
468 >                    # calculate skip events for last file
469 >                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
470 >                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
471 >                    # remove all but the last file
472 >                    filesEventCount = self.eventsbyfile[file]
473 >                    parString = "\\{"
474 >                    parString += '\\\"' + file + '\\\"\,'
475 >                pass # END if
476 >            pass # END while (iterate over files in the block)
477 >        pass # END while (iterate over blocks in the dataset)
478 >        self.ncjobs = self.total_number_of_jobs = jobCount
479 >        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
480 >            common.logger.message("Could not run on all requested events because some blocks are not hosted at allowed sites.")
481 >        common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
482          
446        parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
447        list_of_lists.append([parString])
448        pass
449
483          self.list_of_args = list_of_lists
451        # print self.list_of_args[0]
484          return
485  
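The block/file bookkeeping above is intricate; the following is a much simplified, self-contained sketch of the same idea (not the exact CRAB logic), returning for each job its file list, the number of events to process and the number of events to skip in the job's first file:

    def split_block(events_by_file, events_per_job):
        # events_by_file: ordered list of (filename, n_events) pairs for one block
        offsets, pos = [], 0
        for fname, nev in events_by_file:
            offsets.append((fname, pos, nev))    # start offset of each file in the block
            pos += nev
        jobs, first, total = [], 0, pos
        while first < total:
            last = min(first + events_per_job, total)    # exclusive end of this job
            in_job = [(f, s) for (f, s, n) in offsets if s < last and s + n > first]
            skip = first - in_job[0][1]                  # events to skip in the first file
            jobs.append(([f for (f, s) in in_job], last - first, skip))
            first = last
        return jobs

    # e.g. split_block([('file_1.root', 100), ('file_2.root', 50)], 60) gives
    # [(['file_1.root'], 60, 0),
    #  (['file_1.root', 'file_2.root'], 60, 60),
    #  (['file_2.root'], 30, 20)]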
486      def jobSplittingNoInput(self):
# Line 477 | Line 509 | class Cmssw(JobType):
509  
510          common.logger.debug(5,'Check  '+str(check))
511  
512 <        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
512 >        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
513          if check > 0:
514 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but will do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
483 <
514 >            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
515  
516          # argument is seed number.$i
517          self.list_of_args = []
518          for i in range(self.total_number_of_jobs):
519 +            ## Since there is no input, any site is good
520 +           # self.jobDestination.append(["Any"])
521 +            self.jobDestination.append([""]) #must be empty to write correctly the xml
522              if (self.sourceSeed):
523                  if (self.sourceSeedVtx):
524                      ## pythia + vtx random seed
# Line 502 | Line 536 | class Cmssw(JobType):
536  
537          return
538  
539 +
540 +    def jobSplittingForScript(self):#CarlosDaniele
541 +        """
542 +        Perform job splitting based on number of jobs
543 +        """
544 +        common.logger.debug(5,'Splitting per job')
545 +        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
546 +
547 +        self.total_number_of_jobs = self.theNumberOfJobs
548 +
549 +        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
550 +
551 +        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
552 +
553 +        # argument is the job number $i
554 +        self.list_of_args = []
555 +        for i in range(self.total_number_of_jobs):
556 +            ## Since there is no input, any site is good
557 +           # self.jobDestination.append(["Any"])
558 +            self.jobDestination.append([""])
559 +            ## no random seed
560 +            self.list_of_args.append([str(i)])
561 +        return
562 +
563      def split(self, jobParams):
564  
565          common.jobDB.load()
# Line 517 | Line 575 | class Cmssw(JobType):
575              # print str(arglist[job])
576              # print jobParams[job]
577              common.jobDB.setArguments(job, jobParams[job])
578 +            common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
579 +            common.jobDB.setDestination(job, self.jobDestination[job])
580  
581          common.jobDB.save()
582          return
# Line 531 | Line 591 | class Cmssw(JobType):
591          # Fabio
592          return self.total_number_of_jobs
593  
534    def checkBlackList(self, allSites):
535        if len(self.reCEBlackList)==0: return allSites
536        sites = []
537        for site in allSites:
538            common.logger.debug(10,'Site '+site)
539            good=1
540            for re in self.reCEBlackList:
541                if re.search(site):
542                    common.logger.message('CE in black list, skipping site '+site)
543                    good=0
544                pass
545            if good: sites.append(site)
546        if len(sites) == 0:
547            common.logger.debug(3,"No sites found after BlackList")
548        return sites
549
550    def checkWhiteList(self, allSites):
551
552        if len(self.reCEWhiteList)==0: return allSites
553        sites = []
554        for site in allSites:
555            good=0
556            for re in self.reCEWhiteList:
557                if re.search(site):
558                    common.logger.debug(5,'CE in white list, adding site '+site)
559                    good=1
560                if not good: continue
561                sites.append(site)
562        if len(sites) == 0:
563            common.logger.message("No sites found after WhiteList\n")
564        else:
565            common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n")
566        return sites
567
594      def getTarBall(self, exe):
595          """
596          Return the TarBall with lib and exe
# Line 660 | Line 686 | class Cmssw(JobType):
686          txt += 'if [ $middleware == LCG ]; then \n'
687          txt += self.wsSetupCMSLCGEnvironment_()
688          txt += 'elif [ $middleware == OSG ]; then\n'
689 <        txt += '    time=`date -u +"%s"`\n'
690 <        txt += '    WORKING_DIR=$OSG_WN_TMP/cms_$time\n'
665 <        txt += '    echo "Creating working directory: $WORKING_DIR"\n'
666 <        txt += '    /bin/mkdir -p $WORKING_DIR\n'
689 >        txt += '    WORKING_DIR=`/bin/mktemp  -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
690 >        txt += '    echo "Created working directory: $WORKING_DIR"\n'
691          txt += '    if [ ! -d $WORKING_DIR ] ;then\n'
692          txt += '        echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
693          txt += '        echo "JOB_EXIT_STATUS = 10016"\n'
# Line 721 | Line 745 | class Cmssw(JobType):
745          txt += "## number of arguments (first argument always jobnumber)\n"
746          txt += "\n"
747   #        txt += "narg=$#\n"
724 # Malina fix
748          txt += "if [ $nargs -lt 2 ]\n"
749          txt += "then\n"
750 <        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
750 >        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
751          txt += '    echo "JOB_EXIT_STATUS = 50113"\n'
752          txt += '    echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
753          txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
# Line 752 | Line 775 | class Cmssw(JobType):
775  
776          # Prepare job-specific part
777          job = common.job_list[nj]
778 <        pset = os.path.basename(job.configFilename())
779 <        txt += '\n'
780 <        if (self.datasetPath): # standard job
781 <            #txt += 'InputFiles=$2\n'
782 <            txt += 'InputFiles=${args[1]}\n'
783 <            txt += 'echo "Inputfiles:<$InputFiles>"\n'
784 <            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
785 <        else:  # pythia like job
786 <            if (self.sourceSeed):
787 <                txt += 'Seed=$2\n'
788 <                txt += 'echo "Seed: <$Seed>"\n'
789 <                txt += 'sed "s#\<INPUT\>#$Seed#" $RUNTIME_AREA/'+pset+' > tmp.cfg\n'
790 <                if (self.sourceSeedVtx):
791 <                    txt += 'VtxSeed=$3\n'
792 <                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
793 <                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp.cfg > pset.cfg\n'
778 >        if self.pset != None: #CarlosDaniele
779 >            pset = os.path.basename(job.configFilename())
780 >            txt += '\n'
781 >            if (self.datasetPath): # standard job
782 >                #txt += 'InputFiles=$2\n'
783 >                txt += 'InputFiles=${args[1]}\n'
784 >                txt += 'MaxEvents=${args[2]}\n'
785 >                txt += 'SkipEvents=${args[3]}\n'
786 >                txt += 'echo "Inputfiles:<$InputFiles>"\n'
787 >                txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
788 >                txt += 'echo "MaxEvents:<$MaxEvents>"\n'
789 >                txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
790 >                txt += 'echo "SkipEvents:<$SkipEvents>"\n'
791 >                txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
792 >            else:  # pythia like job
793 >                if (self.sourceSeed):
794 > #                    txt += 'Seed=$2\n'
795 >                    txt += 'Seed=${args[1]}\n'
796 >                    txt += 'echo "Seed: <$Seed>"\n'
797 >                    txt += 'sed "s#\<INPUT\>#$Seed#" $RUNTIME_AREA/'+pset+' > tmp.cfg\n'
798 >                    if (self.sourceSeedVtx):
799 > #                        txt += 'VtxSeed=$3\n'
800 >                        txt += 'VtxSeed=${args[2]}\n'
801 >                        txt += 'echo "VtxSeed: <$VtxSeed>"\n'
802 >                        txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp.cfg > pset.cfg\n'
803 >                    else:
804 >                        txt += 'mv tmp.cfg pset.cfg\n'
805                  else:
806 <                    txt += 'mv tmp.cfg pset.cfg\n'
807 <            else:
774 <                txt += '# Copy untouched pset\n'
775 <                txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n'
806 >                    txt += '# Copy untouched pset\n'
807 >                    txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n'
808  
809  
810          if len(self.additional_inbox_files) > 0:
# Line 784 | Line 816 | class Cmssw(JobType):
816                  txt += 'fi\n'
817              pass
818  
819 <        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
820 <
821 <        txt += '\n'
822 <        txt += 'echo "***** cat pset.cfg *********"\n'
823 <        txt += 'cat pset.cfg\n'
824 <        txt += 'echo "****** end pset.cfg ********"\n'
825 <        txt += '\n'
826 <        # txt += 'echo "***** cat pset1.cfg *********"\n'
827 <        # txt += 'cat pset1.cfg\n'
828 <        # txt += 'echo "****** end pset1.cfg ********"\n'
819 >        if self.pset != None: #CarlosDaniele
820 >            txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
821 >        
822 >            txt += '\n'
823 >            txt += 'echo "***** cat pset.cfg *********"\n'
824 >            txt += 'cat pset.cfg\n'
825 >            txt += 'echo "****** end pset.cfg ********"\n'
826 >            txt += '\n'
827 >            # txt += 'echo "***** cat pset1.cfg *********"\n'
828 >            # txt += 'cat pset1.cfg\n'
829 >            # txt += 'echo "****** end pset1.cfg ********"\n'
830          return txt
831  
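The sed chain in the generated wrapper fills the INPUT, INPUTMAXEVENTS and INPUTSKIPEVENTS placeholders that PsetManipulator left in the templated pset. A rough Python equivalent of that substitution step (file names and values are illustrative):

    def fill_pset(template_text, input_files, max_events, skip_events):
        # emulate the wrapper's sed substitutions on the templated pset.cfg
        txt = template_text.replace("{'INPUT'}", input_files)
        txt = txt.replace('INPUTMAXEVENTS', str(max_events))
        return txt.replace('INPUTSKIPEVENTS', str(skip_events))

    # e.g. fill_pset(open('pset.cfg').read(), '{"file_1.root","file_2.root"}', 500, 250)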
832      def wsBuildExe(self, nj):
# Line 842 | Line 875 | class Cmssw(JobType):
875          """
876          
877      def executableName(self):
878 <        return self.executable
878 >        if self.pset == None: #CarlosDaniele
879 >            return "sh "
880 >        else:
881 >            return self.executable
882  
883      def executableArgs(self):
884 <        return " -p pset.cfg"
884 >        if self.pset == None:#CarlosDaniele
885 >            return   self.scriptExe + " $NJob"
886 >        else:
887 >            return " -p pset.cfg"
888  
889      def inputSandbox(self, nj):
890          """
# Line 858 | Line 897 | class Cmssw(JobType):
897          if os.path.isfile(self.tgzNameWithPath):
898              inp_box.append(self.tgzNameWithPath)
899          ## config
900 <        inp_box.append(common.job_list[nj].configFilename())
900 >        if not self.pset is None: #CarlosDaniele
901 >            inp_box.append(common.job_list[nj].configFilename())
902          ## additional input files
903          #for file in self.additional_inbox_files:
904          #    inp_box.append(common.work_space.cwdDir()+file)
# Line 952 | Line 992 | class Cmssw(JobType):
992          # add "_txt"
993          if len(p)>1:
994            ext = p[len(p)-1]
955          #result = name + '_' + str(txt) + "." + ext
995            result = name + '_' + txt + "." + ext
996          else:
958          #result = name + '_' + str(txt)
997            result = name + '_' + txt
998          
999          return result
# Line 965 | Line 1003 | class Cmssw(JobType):
1003          return job requirements to add to jdl files
1004          """
1005          req = ''
1006 <        if common.analisys_common_info['sw_version']:
1006 >        if self.version:
1007              req='Member("VO-cms-' + \
1008 <                 common.analisys_common_info['sw_version'] + \
1008 >                 self.version + \
1009                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1010 <        if common.analisys_common_info['sites']:
1011 <            if len(common.analisys_common_info['sites'])>0:
1012 <                req = req + ' && ('
975 <                for i in range(len(common.analisys_common_info['sites'])):
976 <                    req = req + 'other.GlueCEInfoHostName == "' \
977 <                         + common.analisys_common_info['sites'][i] + '"'
978 <                    if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
979 <                        req = req + ' || '
980 <            req = req + ')'
981 <        #print "req = ", req
1010 >
1011 >        req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1012 >
1013          return req
1014  
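With the site list dropped from the JDL (site matching is now carried per job in jobDestination), the requirement keeps only the software and outbound-IP clauses. For a hypothetical release name the resulting expression would be:

    version = 'CMSSW_1_0_6'   # hypothetical release name
    req = ('Member("VO-cms-' + version + '", '
           'other.GlueHostApplicationSoftwareRunTimeEnvironment)'
           ' && (other.GlueHostNetworkAdapterOutboundIP)')
    print(req)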
1015      def configFilename(self):
# Line 996 | Line 1027 | class Cmssw(JobType):
1027          txt += '   if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
1028          txt += '      # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
1029          txt += '       source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
1030 <        txt += '   elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n'
1031 <        txt += '      # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n'
1032 <        txt += '       source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n'
1030 >        txt += '   elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
1031 >        txt += '      # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
1032 >        txt += '       source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
1033          txt += '   else\n'
1034 <        txt += '       echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
1034 >        txt += '       echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1035          txt += '       echo "JOB_EXIT_STATUS = 10020"\n'
1036          txt += '       echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1037          txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
# Line 1013 | Line 1044 | class Cmssw(JobType):
1044          txt += '       cd $RUNTIME_AREA\n'
1045          txt += '       /bin/rm -rf $WORKING_DIR\n'
1046          txt += '       if [ -d $WORKING_DIR ] ;then\n'
1047 <        txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
1047 >        txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1048          txt += '            echo "JOB_EXIT_STATUS = 10017"\n'
1049          txt += '            echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
1050          txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
# Line 1106 | Line 1137 | class Cmssw(JobType):
1137          
1138      def getTaskid(self):
1139          return self._taskId
1140 +
1141 + #######################################################################
1142 +    def uniquelist(self, old):
1143 +        """
1144 +        remove duplicates from a list
1145 +        """
1146 +        nd={}
1147 +        for e in old:
1148 +            nd[e]=0
1149 +        return nd.keys()

Diff Legend

  (unmarked)  Removed lines
  +           Added lines
  <           Changed lines (old revision)
  >           Changed lines (new revision)