ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.9 by slacapra, Tue Jun 20 15:39:17 2006 UTC vs.
Revision 1.35 by gutsche, Thu Aug 3 22:44:40 2006 UTC

# Line 2 | Line 2 | from JobType import JobType
2   from crab_logger import Logger
3   from crab_exceptions import *
4   from crab_util import *
5 + import math
6   import common
7   import PsetManipulator  
8  
# Line 44 | Line 45 | class Cmssw(JobType):
45              log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
46              if string.lower(tmp)=='none':
47                  self.datasetPath = None
48 +                self.selectNoInput = 1
49              else:
50                  self.datasetPath = tmp
51 +                self.selectNoInput = 0
52          except KeyError:
53              msg = "Error: datasetpath not defined "  
54              raise CrabException(msg)
# Line 109 | Line 112 | class Cmssw(JobType):
112  
113          # script_exe file as additional file in inputSandbox
114          try:
115 <           self.scriptExe = cfg_params['USER.script_exe']
116 <           self.additional_inbox_files.append(self.scriptExe)
115 >            self.scriptExe = cfg_params['USER.script_exe']
116 >            self.additional_inbox_files.append(self.scriptExe)
117 >            if self.scriptExe != '':
118 >               if not os.path.isfile(self.scriptExe):
119 >                  msg ="WARNING. file "+self.scriptExe+" not found"
120 >                  raise CrabException(msg)
121          except KeyError:
122             pass
116        if self.scriptExe != '':
117           if os.path.isfile(self.scriptExe):
118              pass
119           else:
120              log.message("WARNING. file "+self.scriptExe+" not found")
121              sys.exit()
123                    
124          ## additional input files
125          try:
126 <            tmpAddFiles = string.split(cfg_params['CMSSW.additional_input_files'],',')
126 >            tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
127              for tmp in tmpAddFiles:
128                  if not os.path.exists(tmp):
129                      raise CrabException("Additional input file not found: "+tmp)
130 <                tmp=string.strip(tmp)
130 <                self.additional_inbox_files.append(tmp)
130 >                self.additional_inbox_files.append(string.strip(tmp))
131                  pass
132              pass
133          except KeyError:
# Line 135 | Line 135 | class Cmssw(JobType):
135  
136          # files per job
137          try:
138 <            self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
139 <            self.selectFilesPerJob = 1
138 >            if (cfg_params['CMSSW.files_per_jobs']):
139 >                raise CrabException("files_per_jobs no longer supported.  Quitting.")
140          except KeyError:
141 <            self.filesPerJob = 0
142 <            self.selectFilesPerJob = 0
141 >            pass
142  
143          ## Events per job
144          try:
145 <            self.eventsPerJob =int( cfg_params['CMSSW.event_per_job'])
145 >            self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
146              self.selectEventsPerJob = 1
147          except KeyError:
148              self.eventsPerJob = -1
149              self.selectEventsPerJob = 0
150      
151 <        if (self.selectFilesPerJob == self.selectEventsPerJob):
152 <            msg = 'Must define either files_per_jobs or event_per_job'
153 <            raise CrabException(msg)
151 >        ## number of jobs
152 >        try:
153 >            self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
154 >            self.selectNumberOfJobs = 1
155 >        except KeyError:
156 >            self.theNumberOfJobs = 0
157 >            self.selectNumberOfJobs = 0
158  
159          try:
160              self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
161 +            self.selectTotalNumberEvents = 1
162          except KeyError:
163 <            msg = 'Must define total_number_of_events'
163 >            self.total_number_of_events = 0
164 >            self.selectTotalNumberEvents = 0
165 >
166 >        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
167 >            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
168              raise CrabException(msg)
169 <        
170 <        CEBlackList = []
169 >
170 >        ## source seed for pythia
171          try:
172 <            tmpBad = string.split(cfg_params['EDG.ce_black_list'],',')
165 <            for tmp in tmpBad:
166 <                tmp=string.strip(tmp)
167 <                CEBlackList.append(tmp)
172 >            self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
173          except KeyError:
174 <            pass
174 >            self.sourceSeed = None
175 >            common.logger.debug(5,"No seed given")
176  
177 <        self.reCEBlackList=[]
178 <        for bad in CEBlackList:
173 <            self.reCEBlackList.append(re.compile( bad ))
174 <
175 <        common.logger.debug(5,'CEBlackList: '+str(CEBlackList))
176 <
177 <        CEWhiteList = []
178 <        try:
179 <            tmpGood = string.split(cfg_params['EDG.ce_white_list'],',')
180 <            for tmp in tmpGood:
181 <                tmp=string.strip(tmp)
182 <                CEWhiteList.append(tmp)
177 >        try:
178 >            self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
179          except KeyError:
180 <            pass
181 <
186 <        #print 'CEWhiteList: ',CEWhiteList
187 <        self.reCEWhiteList=[]
188 <        for Good in CEWhiteList:
189 <            self.reCEWhiteList.append(re.compile( Good ))
190 <
191 <        common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))
180 >            self.sourceSeedVtx = None
181 >            common.logger.debug(5,"No vertex seed given")
182  
183          self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
184  
# Line 196 | Line 186 | class Cmssw(JobType):
186          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
187          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
188          self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
189 +        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
190          ## Perform the data location and discovery (based on DBS/DLS)
191          ## SL: Skip this if NONE is specified as input (pythia use case)
192 <        common.analisys_common_info['sites']=None
192 >        blockSites = {}
193          if self.datasetPath:
194 <            self.DataDiscoveryAndLocation(cfg_params)
194 >            blockSites = self.DataDiscoveryAndLocation(cfg_params)
195          #DBSDLS-end          
196  
197          self.tgzNameWithPath = self.getTarBall(self.executable)
198 <
198 >    
199          ## Select Splitting
200 <        if self.selectFilesPerJob: self.jobSplittingPerFiles()
201 <        elif self.selectEventsPerJob: self.jobSplittingPerEvents()
211 <        else:
212 <            msg = 'Don\'t know how to split...'
213 <            raise CrabException(msg)
214 <        
215 <        self.PsetEdit.maxEvent(self.eventsPerJob) #Daniele  
216 <        self.PsetEdit.inputModule("INPUT") #Daniele  
217 <        self.PsetEdit.psetWriter(self.configFilename())
200 >        if self.selectNoInput: self.jobSplittingNoInput()
201 >        else: self.jobSplittingByBlocks(blockSites)
202  
203 +        # modify Pset
204 +        try:
205 +            if (self.datasetPath): # standard job
206 +                # always process all events in a file
207 +                self.PsetEdit.maxEvent("-1")
208 +                self.PsetEdit.inputModule("INPUT")
209 +
210 +            else:  # pythia like job
211 +                self.PsetEdit.maxEvent(self.eventsPerJob)
212 +                if (self.sourceSeed) :
213 +                    self.PsetEdit.pythiaSeed("INPUT")
214 +                    if (self.sourceSeedVtx) :
215 +                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
216 +            self.PsetEdit.psetWriter(self.configFilename())
217 +        except:
218 +            msg='Error while manipuliating ParameterSet: exiting...'
219 +            raise CrabException(msg)
220  
221      def DataDiscoveryAndLocation(self, cfg_params):
222  
# Line 247 | Line 248 | class Cmssw(JobType):
248          ## self.DBSPaths=self.pubdata.getDBSPaths()
249          common.logger.message("Required data are :"+self.datasetPath)
250  
251 <        filesbyblock=self.pubdata.getFiles()
252 <        self.AllInputFiles=filesbyblock.values()
253 <        self.files = self.AllInputFiles        
254 <
254 <        ## TEMP
255 <    #    self.filesTmp = filesbyblock.values()
256 <    #    self.files = []
257 <    #    locPath='rfio:cmsbose2.bo.infn.it:/flatfiles/SE00/cms/fanfani/ProdTest/'
258 <    #    locPath=''
259 <    #    tmp = []
260 <    #    for file in self.filesTmp[0]:
261 <    #        tmp.append(locPath+file)
262 <    #    self.files.append(tmp)
263 <        ## END TEMP
251 >        self.filesbyblock=self.pubdata.getFiles()
252 >        self.eventsbyblock=self.pubdata.getEVC()
253 >        ## SL we probably don't need this
254 >        self.files = self.filesbyblock.values()
255  
256          ## get max number of events
266        #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
257          self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
258          common.logger.message("\nThe number of available events is %s"%self.maxEvents)
259  
260          ## Contact the DLS and build a list of sites hosting the fileblocks
261          try:
262 <            dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
262 >            dataloc=DataLocation_EDM.DataLocation_EDM(self.filesbyblock.keys(),cfg_params)
263              dataloc.fetchDLSInfo()
264          except DataLocation_EDM.DataLocationError , ex:
265              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
266              raise CrabException(msg)
267          
278        allsites=dataloc.getSites()
279        common.logger.debug(5,"sites are %s"%allsites)
280        sites=self.checkBlackList(allsites)
281        common.logger.debug(5,"sites are (after black list) %s"%sites)
282        sites=self.checkWhiteList(sites)
283        common.logger.debug(5,"sites are (after white list) %s"%sites)
268  
269 <        if len(sites)==0:
270 <            msg = 'No sites hosting all the needed data! Exiting... '
271 <            raise CrabException(msg)
269 >        sites = dataloc.getSites()
270 >        allSites = []
271 >        listSites = sites.values()
272 >        for list in listSites:
273 >            for oneSite in list:
274 >                allSites.append(oneSite)
275 >        allSites = self.uniquelist(allSites)
276  
277 <        common.logger.message("List of Sites hosting the data : "+str(sites))
278 <        common.logger.debug(6, "List of Sites: "+str(sites))
291 <        common.analisys_common_info['sites']=sites    ## used in SchedulerEdg.py in createSchScript
277 >        common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
278 >        common.logger.debug(6, "List of Sites: "+str(allSites))
279          self.setParam_('TargetCE', ','.join(sites))
280 <        return
280 >        return sites
281      
282 <    def jobSplittingPerFiles(self):
296 <        """
297 <        Perform job splitting based on number of files to be accessed per job
282 >    def jobSplittingByBlocks(self, blockSites):
283          """
284 <        common.logger.debug(5,'Splitting per input files')
285 <        common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
286 <        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
287 <
288 <        ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
289 <        n_tot_files = (len(self.files[0]))
290 <        ## SL: this is wrong if the files have different number of events
291 <        evPerFile = int(self.maxEvents)/n_tot_files
292 <        
293 <        common.logger.debug(5,'Events per File '+str(evPerFile))
294 <
295 <        ## if asked to process all events, do it
296 <        if self.total_number_of_events == -1:
297 <            self.total_number_of_events=self.maxEvents
298 <            self.total_number_of_jobs = int(n_tot_files)*1/int(self.filesPerJob)
299 <            common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for all available events '+str(self.total_number_of_events)+' events')
300 <        
284 >        Perform job splitting. Jobs run over an integer number of files
285 >        and no more than one block.
286 >        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
287 >        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
288 >                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
289 >                  self.maxEvents, self.filesbyblock
290 >        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
291 >              self.total_number_of_jobs - Total # of jobs
292 >              self.list_of_args - File(s) job will run on (a list of lists)
293 >        """
294 >
295 >        # ---- Handle the possible job splitting configurations ---- #
296 >        if (self.selectTotalNumberEvents):
297 >            totalEventsRequested = self.total_number_of_events
298 >        if (self.selectEventsPerJob):
299 >            eventsPerJobRequested = self.eventsPerJob
300 >            if (self.selectNumberOfJobs):
301 >                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
302 >
303 >        # If user requested all the events in the dataset
304 >        if (totalEventsRequested == -1):
305 >            eventsRemaining=self.maxEvents
306 >        # If user requested more events than are in the dataset
307 >        elif (totalEventsRequested > self.maxEvents):
308 >            eventsRemaining = self.maxEvents
309 >            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
310 >        # If user requested less events than are in the dataset
311          else:
312 <            self.total_number_of_files = int(self.total_number_of_events/evPerFile)
318 <            ## SL: if ask for less event than what is computed to be available on a
319 <            ##     file, process the first file anyhow.
320 <            if self.total_number_of_files == 0:
321 <                self.total_number_of_files = self.total_number_of_files + 1
312 >            eventsRemaining = totalEventsRequested
313  
314 <            common.logger.debug(5,'N files  '+str(self.total_number_of_files))
314 >        # For user info at end
315 >        totalEventCount = 0
316  
317 <            check = 0
318 <            
319 <            ## Compute the number of jobs
320 <            #self.total_number_of_jobs = int(n_tot_files)*1/int(self.filesPerJob)
321 <            self.total_number_of_jobs = int(self.total_number_of_files/self.filesPerJob)
330 <            common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
317 >        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
318 >            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
319 >
320 >        if (self.selectNumberOfJobs):
321 >            common.logger.message("May not create the exact number_of_jobs requested.")
322  
323 <            ## is there any remainder?
324 <            check = int(self.total_number_of_files) - (int(self.total_number_of_jobs)*self.filesPerJob)
323 >        blocks = blockSites.keys()
324 >        blockCount = 0
325 >        # Backup variable in case self.maxEvents counted events in a non-included block
326 >        numBlocksInDataset = len(blocks)
327  
328 <            common.logger.debug(5,'Check  '+str(check))
328 >        jobCount = 0
329 >        list_of_lists = []
330  
331 <            if check > 0:
332 <                self.total_number_of_jobs =  self.total_number_of_jobs + 1
333 <                common.logger.message('Warning: last job will be created with '+str(check)+' files')
331 >        # ---- Iterate over the blocks in the dataset until ---- #
332 >        # ---- we've met the requested total # of events    ---- #
333 >        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) ):
334 >            block = blocks[blockCount]
335  
341            common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str((self.total_number_of_jobs-1)*self.filesPerJob*evPerFile + check*evPerFile)+' events')
342            pass
336  
337 <        list_of_lists = []
338 <        for i in xrange(0, int(n_tot_files), self.filesPerJob):
339 <            parString = "\\{"
337 >            evInBlock = self.eventsbyblock[block]
338 >            common.logger.debug(5,'Events in Block File '+str(evInBlock))
339 >
340 >            #Correct - switch to this when DBS up
341 >            #numEventsInBlock = self.eventsbyblock[block]
342 >            numEventsInBlock = evInBlock
343              
344 <            params = self.files[0][i: i+self.filesPerJob]
345 <            for i in range(len(params) - 1):
346 <                parString += '\\\"' + params[i] + '\\\"\,'
344 >            files = self.filesbyblock[block]
345 >            numFilesInBlock = len(files)
346 >            if (numFilesInBlock <= 0):
347 >                continue
348 >            fileCount = 0
349 >
350 >            # ---- New block => New job ---- #
351 >            parString = "\\{"
352 >            jobEventCount = 0
353              
354 <            parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
355 <            list_of_lists.append(parString)
356 <            pass
357 <
354 >            # ---- Iterate over the files in the block until we've met the requested ---- #
355 >            # ---- total # of events or we've gone over all the files in this block  ---- #
356 >            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) ):
357 >                file = files[fileCount]
358 >                fileCount = fileCount + 1
359 >                #numEventsInFile = numberEventsInFile(file)
360 >                # !!!!!!!!!!!!!!!!!           Need to get the # of events in each file.         !!!!!!!!!!!!!!!!!!!!!!!!!
361 >                # For now, I'm assuming that all files in a block
362 >                # have the same number of events
363 >                numEventsInFile = numEventsInBlock/numFilesInBlock
364 >                common.logger.debug(5,"Estimated # of events in the file: "+str(numEventsInFile))
365 >                numEventsInFile = int(numEventsInFile)
366 >                common.logger.debug(5,"After rounding down: "+str(numEventsInFile))
367 >                # Add file to current job
368 >                parString += '\\\"' + file + '\\\"\,'
369 >                jobEventCount = jobEventCount + numEventsInFile
370 >                totalEventCount = totalEventCount + numEventsInFile
371 >                eventsRemaining = eventsRemaining - numEventsInFile
372 >                if (jobEventCount >= eventsPerJobRequested):
373 >                    # ---- This job has at least CMSSW.events_per_job => End of job ---- #
374 >                    # Don't need the last \,
375 >                    fullString = parString[:-2]
376 >                    fullString += '\\}'
377 >                    list_of_lists.append([fullString])
378 >                    common.logger.message("Job "+str(jobCount+1)+" can run over approximately "+str(jobEventCount)+" events.")
379 >
380 >                    #self.jobDestination[jobCount] = blockSites[block]
381 >                    self.jobDestination.append(blockSites[block])
382 >                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
383 >                    if ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) ):
384 >                        # ---- Still need CMSSW.total_number_of_events ---- #
385 >                        # ---- and not about to jump into a new block  ---- #
386 >                        # ---- => New job                              ---- #
387 >                        parString = "\\{"
388 >                        jobEventCount = 0
389 >                        jobCount = jobCount + 1
390 >                    pass # END if
391 >                pass # END if
392 >            pass # END while (iterate over files in the block)
393 >            if (jobEventCount < eventsPerJobRequested):
394 >                # ---- Job ending prematurely due to end of block => End of job ---- #
395 >                # Don't need the last \,
396 >                fullString = parString[:-2]
397 >                fullString += '\\}'
398 >                list_of_lists.append([fullString])
399 >                common.logger.message("Job "+str(jobCount+1)+" can run over approximately "+str(jobEventCount)+" events.")
400 >                #self.jobDestination[jobCount] = blockSites[block]
401 >                self.jobDestination.append(blockSites[block])
402 >                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
403 >            pass # END if
404 >            blockCount = blockCount + 1
405 >            jobCount = jobCount + 1
406 >        pass # END while (iterate over blocks in the dataset)
407 >        self.total_number_of_jobs = jobCount
408 >        if (eventsRemaining > 0):
409 >            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
410 >        common.logger.message("\n"+str(jobCount)+" job(s) can run on approximately "+str(totalEventCount)+" events.\n")
411 >        
412          self.list_of_args = list_of_lists
357        print self.list_of_args
413          return
414  
415 <    def jobSplittingPerEvents(self):
415 >    def jobSplittingNoInput(self):
416          """
417          Perform job splitting based on the number of events per job
418          """
419          common.logger.debug(5,'Splitting per events')
420          common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
421 +        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
422          common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
423  
424 <        self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
425 <        
424 >        if (self.total_number_of_events < 0):
425 >            msg='Cannot split jobs per Events with "-1" as total number of events'
426 >            raise CrabException(msg)
427 >
428 >        if (self.selectEventsPerJob):
429 >            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
430 >        elif (self.selectNumberOfJobs) :
431 >            self.total_number_of_jobs = self.theNumberOfJobs
432 >            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
433 >
434          common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
435  
436          # is there any remainder?
# Line 374 | Line 438 | class Cmssw(JobType):
438  
439          common.logger.debug(5,'Check  '+str(check))
440  
441 +        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
442          if check > 0:
443 <            common.logger.message('Warning: asked '+self.total_number_of_events+' but will do only '+(int(self.total_number_of_jobs)*self.eventsPerJob))
379 <
380 <        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
443 >            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
444  
445 +        # argument is seed number.$i
446          self.list_of_args = []
447          for i in range(self.total_number_of_jobs):
448 <            self.list_of_args.append(i)
449 <        print self.list_of_args
448 >            ## Since there is no input, any site is good
449 >            self.jobDestination.append(["Any"])
450 >            if (self.sourceSeed):
451 >                if (self.sourceSeedVtx):
452 >                    ## pythia + vtx random seed
453 >                    self.list_of_args.append([
454 >                                              str(self.sourceSeed)+str(i),
455 >                                              str(self.sourceSeedVtx)+str(i)
456 >                                              ])
457 >                else:
458 >                    ## only pythia random seed
459 >                    self.list_of_args.append([(str(self.sourceSeed)+str(i))])
460 >            else:
461 >                ## no random seed
462 >                self.list_of_args.append([str(i)])
463 >        #print self.list_of_args
464  
465          return
466  
# Line 397 | Line 475 | class Cmssw(JobType):
475              jobParams.append("")
476          
477          for job in range(njobs):
478 <            jobParams[job] = str(arglist[job])
478 >            jobParams[job] = arglist[job]
479 >            # print str(arglist[job])
480 >            # print jobParams[job]
481              common.jobDB.setArguments(job, jobParams[job])
482 +            common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
483 +            common.jobDB.setDestination(job, self.jobDestination[job])
484  
485          common.jobDB.save()
486          return
487      
488      def getJobTypeArguments(self, nj, sched):
489 <        return common.jobDB.arguments(nj)
489 >        result = ''
490 >        for i in common.jobDB.arguments(nj):
491 >            result=result+str(i)+" "
492 >        return result
493    
494      def numberOfJobs(self):
495          # Fabio
496          return self.total_number_of_jobs
497  
413    def checkBlackList(self, allSites):
414        if len(self.reCEBlackList)==0: return allSites
415        sites = []
416        for site in allSites:
417            common.logger.debug(10,'Site '+site)
418            good=1
419            for re in self.reCEBlackList:
420                if re.search(site):
421                    common.logger.message('CE in black list, skipping site '+site)
422                    good=0
423                pass
424            if good: sites.append(site)
425        if len(sites) == 0:
426            common.logger.debug(3,"No sites found after BlackList")
427        return sites
428
429    def checkWhiteList(self, allSites):
430
431        if len(self.reCEWhiteList)==0: return allSites
432        sites = []
433        for site in allSites:
434            good=0
435            for re in self.reCEWhiteList:
436                if re.search(site):
437                    common.logger.debug(5,'CE in white list, adding site '+site)
438                    good=1
439                if not good: continue
440                sites.append(site)
441        if len(sites) == 0:
442            common.logger.message("No sites found after WhiteList\n")
443        else:
444            common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n")
445        return sites
446
498      def getTarBall(self, exe):
499          """
500          Return the TarBall with lib and exe
# Line 548 | Line 599 | class Cmssw(JobType):
599          txt += '        echo "JOB_EXIT_STATUS = 10016"\n'
600          txt += '        echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
601          txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
602 +        txt += '        rm -f $RUNTIME_AREA/$repo \n'
603 +        txt += '        echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
604 +        txt += '        echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
605          txt += '        exit 1\n'
606          txt += '    fi\n'
607          txt += '\n'
# Line 567 | Line 621 | class Cmssw(JobType):
621          txt += '   echo "JOB_EXIT_STATUS = 10034"\n'
622          txt += '   echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
623          txt += '   dumpStatus $RUNTIME_AREA/$repo\n'
624 +        txt += '   rm -f $RUNTIME_AREA/$repo \n'
625 +        txt += '   echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
626 +        txt += '   echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
627          ## OLI_Daniele
628          txt += '    if [ $middleware == OSG ]; then \n'
629          txt += '        echo "Remove working directory: $WORKING_DIR"\n'
# Line 577 | Line 634 | class Cmssw(JobType):
634          txt += '            echo "JOB_EXIT_STATUS = 10018"\n'
635          txt += '            echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
636          txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
637 +        txt += '            rm -f $RUNTIME_AREA/$repo \n'
638 +        txt += '            echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
639 +        txt += '            echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
640          txt += '        fi\n'
641          txt += '    fi \n'
642          txt += '   exit 1 \n'
# Line 590 | Line 650 | class Cmssw(JobType):
650          txt += "\n"
651          txt += "## number of arguments (first argument always jobnumber)\n"
652          txt += "\n"
653 <        txt += "narg=$#\n"
654 <        txt += "if [ $narg -lt 2 ]\n"
653 > #        txt += "narg=$#\n"
654 >        txt += "if [ $nargs -lt 2 ]\n"
655          txt += "then\n"
656 <        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
656 >        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
657          txt += '    echo "JOB_EXIT_STATUS = 50113"\n'
658          txt += '    echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
659          txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
660 +        txt += '    rm -f $RUNTIME_AREA/$repo \n'
661 +        txt += '    echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
662 +        txt += '    echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
663          ## OLI_Daniele
664          txt += '    if [ $middleware == OSG ]; then \n'
665          txt += '        echo "Remove working directory: $WORKING_DIR"\n'
# Line 607 | Line 670 | class Cmssw(JobType):
670          txt += '            echo "JOB_EXIT_STATUS = 50114"\n'
671          txt += '            echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
672          txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
673 +        txt += '            rm -f $RUNTIME_AREA/$repo \n'
674 +        txt += '            echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
675 +        txt += '            echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
676          txt += '        fi\n'
677          txt += '    fi \n'
678          txt += "    exit 1\n"
# Line 617 | Line 683 | class Cmssw(JobType):
683          job = common.job_list[nj]
684          pset = os.path.basename(job.configFilename())
685          txt += '\n'
686 <        txt += 'InputFiles=$2\n'
687 <        txt += 'echo "<$InputFiles>"\n'
688 <        #txt += 'echo sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' \n'
689 <        txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
690 <        #txt += 'sed "s#{\'INPUT\'}#${InputFiles}#" $RUNTIME_AREA/'+pset+' > pset1.cfg\n'
686 >        if (self.datasetPath): # standard job
687 >            #txt += 'InputFiles=$2\n'
688 >            txt += 'InputFiles=${args[1]}\n'
689 >            txt += 'echo "Inputfiles:<$InputFiles>"\n'
690 >            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
691 >        else:  # pythia like job
692 >            if (self.sourceSeed):
693 > #                txt += 'Seed=$2\n'
694 >                txt += 'Seed=${args[1]}\n'
695 >                txt += 'echo "Seed: <$Seed>"\n'
696 >                txt += 'sed "s#\<INPUT\>#$Seed#" $RUNTIME_AREA/'+pset+' > tmp.cfg\n'
697 >                if (self.sourceSeedVtx):
698 > #                    txt += 'VtxSeed=$3\n'
699 >                    txt += 'VtxSeed=${args[2]}\n'
700 >                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
701 >                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp.cfg > pset.cfg\n'
702 >                else:
703 >                    txt += 'mv tmp.cfg pset.cfg\n'
704 >            else:
705 >                txt += '# Copy untouched pset\n'
706 >                txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n'
707 >
708  
709          if len(self.additional_inbox_files) > 0:
710              for file in self.additional_inbox_files:
711 <                txt += 'if [ -e $RUNTIME_AREA/'+file+' ] ; then\n'
712 <                txt += '   cp $RUNTIME_AREA/'+file+' .\n'
713 <                txt += '   chmod +x '+file+'\n'
711 >                relFile = file.split("/")[-1]
712 >                txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
713 >                txt += '   cp $RUNTIME_AREA/'+relFile+' .\n'
714 >                txt += '   chmod +x '+relFile+'\n'
715                  txt += 'fi\n'
716              pass
717  
# Line 664 | Line 748 | class Cmssw(JobType):
748              txt += '       cd $RUNTIME_AREA\n'
749              txt += '       /bin/rm -rf $WORKING_DIR\n'
750              txt += '       if [ -d $WORKING_DIR ] ;then\n'
751 <            txt += '        echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
752 <            txt += '        echo "JOB_EXIT_STATUS = 50999"\n'
753 <            txt += '        echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
754 <            txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
751 >            txt += '           echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
752 >            txt += '           echo "JOB_EXIT_STATUS = 50999"\n'
753 >            txt += '           echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
754 >            txt += '           dumpStatus $RUNTIME_AREA/$repo\n'
755 >            txt += '           rm -f $RUNTIME_AREA/$repo \n'
756 >            txt += '           echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
757 >            txt += '           echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
758              txt += '       fi\n'
759              txt += '   fi \n'
760              txt += '   \n'
# Line 745 | Line 832 | class Cmssw(JobType):
832              txt += '\n'
833              txt += '# check output file\n'
834              txt += 'ls '+fileWithSuffix+'\n'
835 <            txt += 'exe_result=$?\n'
836 <            txt += 'if [ $exe_result -ne 0 ] ; then\n'
837 <            txt += '   echo "ERROR: No output file to manage"\n'
838 <            txt += '   echo "JOB_EXIT_STATUS = $exe_result"\n'
839 <            txt += '   echo "JobExitCode=60302" | tee -a $RUNTIME_AREA/$repo\n'
840 <            txt += '   dumpStatus $RUNTIME_AREA/$repo\n'
835 >            txt += 'ls_result=$?\n'
836 >            #txt += 'exe_result=$?\n'
837 >            txt += 'if [ $ls_result -ne 0 ] ; then\n'
838 >            txt += '   echo "ERROR: Problem with output file"\n'
839 >            #txt += '   echo "JOB_EXIT_STATUS = $exe_result"\n'
840 >            #txt += '   echo "JobExitCode=60302" | tee -a $RUNTIME_AREA/$repo\n'
841 >            #txt += '   dumpStatus $RUNTIME_AREA/$repo\n'
842              ### OLI_DANIELE
843              if common.scheduler.boss_scheduler_name == 'condor_g':
844                  txt += '    if [ $middleware == OSG ]; then \n'
# Line 764 | Line 852 | class Cmssw(JobType):
852          txt += 'cd $RUNTIME_AREA\n'
853          file_list=file_list[:-1]
854          txt += 'file_list="'+file_list+'"\n'
855 +        txt += 'cd $RUNTIME_AREA\n'
856          ### OLI_DANIELE
857          txt += 'if [ $middleware == OSG ]; then\n'  
858          txt += '    cd $RUNTIME_AREA\n'
# Line 774 | Line 863 | class Cmssw(JobType):
863          txt += '        echo "JOB_EXIT_STATUS = 60999"\n'
864          txt += '        echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
865          txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
866 +        txt += '        rm -f $RUNTIME_AREA/$repo \n'
867 +        txt += '        echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
868 +        txt += '        echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
869          txt += '    fi\n'
870          txt += 'fi\n'
871          txt += '\n'
# Line 799 | Line 891 | class Cmssw(JobType):
891          
892          return result
893  
894 <    def getRequirements(self):
894 >    def getRequirements(self, nj):
895          """
896          return job requirements to add to jdl files
897          """
898          req = ''
899 <        if common.analisys_common_info['sites']:
900 <            if common.analisys_common_info['sw_version']:
901 <                req='Member("VO-cms-' + \
902 <                     common.analisys_common_info['sw_version'] + \
903 <                     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
904 <            if len(common.analisys_common_info['sites'])>0:
905 <                req = req + ' && ('
906 <                for i in range(len(common.analisys_common_info['sites'])):
907 <                    req = req + 'other.GlueCEInfoHostName == "' \
908 <                         + common.analisys_common_info['sites'][i] + '"'
909 <                    if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
910 <                        req = req + ' || '
911 <            req = req + ')'
912 <        #print "req = ", req
899 >        if common.analisys_common_info['sw_version']:
900 >            req='Member("VO-cms-' + \
901 >                 common.analisys_common_info['sw_version'] + \
902 >                 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
903 >
904 >        req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
905 >
906 >        ## here we should get the requirement for job nj
907 >        sites = common.jobDB.destination(nj)
908 >
909 >        # check for "Any" site, in case no requirement for site
910 >        if len(sites)>0 and sites[0]!="Any":
911 >            req = req + ' && anyMatch(other.storage.CloseSEs, ('
912 >            for site in sites:
913 >                #req = req + 'other.GlueCEInfoHostName == "' + site + '" || '
914 >                req = req + 'target.GlueSEUniqueID=="' + site + '" || '
915 >                pass
916 >            # remove last ||
917 >            req = req[0:-4]
918 >            req = req + '))'
919 >
920          return req
921  
922      def configFilename(self):
# Line 843 | Line 942 | class Cmssw(JobType):
942          txt += '       echo "JOB_EXIT_STATUS = 10020"\n'
943          txt += '       echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
944          txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
945 +        txt += '       rm -f $RUNTIME_AREA/$repo \n'
946 +        txt += '       echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
947 +        txt += '       echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
948          txt += '       exit 1\n'
949          txt += '\n'
950          txt += '       echo "Remove working directory: $WORKING_DIR"\n'
# Line 853 | Line 955 | class Cmssw(JobType):
955          txt += '            echo "JOB_EXIT_STATUS = 10017"\n'
956          txt += '            echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
957          txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
958 +        txt += '            rm -f $RUNTIME_AREA/$repo \n'
959 +        txt += '            echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
960 +        txt += '            echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
961          txt += '       fi\n'
962          txt += '\n'
963          txt += '       exit 1\n'
# Line 876 | Line 981 | class Cmssw(JobType):
981          txt += '       echo "JOB_EXIT_STATUS = 10031" \n'
982          txt += '       echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
983          txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
984 +        txt += '       rm -f $RUNTIME_AREA/$repo \n'
985 +        txt += '       echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
986 +        txt += '       echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
987          txt += '       exit 1\n'
988          txt += '   else\n'
989          txt += '       echo "Sourcing environment... "\n'
# Line 884 | Line 992 | class Cmssw(JobType):
992          txt += '           echo "JOB_EXIT_STATUS = 10020"\n'
993          txt += '           echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
994          txt += '           dumpStatus $RUNTIME_AREA/$repo\n'
995 +        txt += '           rm -f $RUNTIME_AREA/$repo \n'
996 +        txt += '           echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
997 +        txt += '           echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
998          txt += '           exit 1\n'
999          txt += '       fi\n'
1000          txt += '       echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
# Line 894 | Line 1005 | class Cmssw(JobType):
1005          txt += '           echo "JOB_EXIT_STATUS = 10032"\n'
1006          txt += '           echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
1007          txt += '           dumpStatus $RUNTIME_AREA/$repo\n'
1008 +        txt += '           rm -f $RUNTIME_AREA/$repo \n'
1009 +        txt += '           echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1010 +        txt += '           echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1011          txt += '           exit 1\n'
1012          txt += '       fi\n'
1013          txt += '   fi\n'
# Line 910 | Line 1024 | class Cmssw(JobType):
1024          txt += '       echo "JOB_EXIT_STATUS = 10033"\n'
1025          txt += '       echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
1026          txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
1027 +        txt += '       rm -f $RUNTIME_AREA/$repo \n'
1028 +        txt += '       echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1029 +        txt += '       echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1030          txt += '       exit 1\n'
1031          txt += '   fi\n'
1032          txt += '   echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
# Line 927 | Line 1044 | class Cmssw(JobType):
1044          
1045      def getTaskid(self):
1046          return self._taskId
1047 +
1048 + #######################################################################
1049 +    def uniquelist(self, old):
1050 +        """
1051 +        remove duplicates from a list
1052 +        """
1053 +        nd={}
1054 +        for e in old:
1055 +            nd[e]=0
1056 +        return nd.keys()

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines