ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.65 by spiga, Sat Jan 20 11:16:45 2007 UTC vs.
Revision 1.73 by gutsche, Sun Apr 8 18:39:51 2007 UTC

# Line 4 | Line 4 | from crab_exceptions import *
4   from crab_util import *
5   import common
6   import PsetManipulator  
7
7   import DataDiscovery
8 + import DataDiscovery_DBS2
9   import DataLocation
10   import Scram
11  
12 < import os, string, re, shutil
12 > import os, string, re, shutil, glob
13  
14   class Cmssw(JobType):
15      def __init__(self, cfg_params, ncjobs):
# Line 20 | Line 20 | class Cmssw(JobType):
20          self._params = {}
21          self.cfg_params = cfg_params
22  
23 +        try:
24 +            self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
25 +        except KeyError:
26 +            self.MaxTarBallSize = 100.0
27 +
28          # number of jobs requested to be created, limit obj splitting
29          self.ncjobs = ncjobs
30  
# Line 29 | Line 34 | class Cmssw(JobType):
34          self.additional_inbox_files = []
35          self.scriptExe = ''
36          self.executable = ''
37 +        self.executable_arch = self.scram.getArch()
38          self.tgz_name = 'default.tgz'
39          self.scriptName = 'CMSSW.sh'
40          self.pset = ''      #scrip use case Da  
# Line 42 | Line 48 | class Cmssw(JobType):
48          self.setParam_('application', self.version)
49  
50          ### collect Data cards
51 +
52 +        ## get DBS mode
53 +        try:
54 +            self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
55 +        except KeyError:
56 +            self.use_dbs_2 = 0
57 +            
58          try:
59              tmp =  cfg_params['CMSSW.datasetpath']
60              log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
# Line 131 | Line 144 | class Cmssw(JobType):
144                 self.additional_inbox_files.append(string.strip(self.scriptExe))
145          except KeyError:
146              self.scriptExe = ''
147 +
148          #CarlosDaniele
149          if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
150 <           msg ="WARNING. script_exe  not defined"
150 >           msg ="Error. script_exe  not defined"
151             raise CrabException(msg)
152  
153          ## additional input files
154          try:
155              tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
156 <            common.logger.debug(5,"Additional input files: "+str(tmpAddFiles))
157 <            for tmpFile in tmpAddFiles:
158 <                tmpFile = string.strip(tmpFile)
159 <                if not os.path.exists(tmpFile):
160 <                    raise CrabException("Additional input file not found: "+tmpFile)
156 >            for tmp in tmpAddFiles:
157 >                tmp = string.strip(tmp)
158 >                dirname = ''
159 >                if not tmp[0]=="/": dirname = "."
160 >                files = glob.glob(os.path.join(dirname, tmp))
161 >                for file in files:
162 >                    if not os.path.exists(file):
163 >                        raise CrabException("Additional input file not found: "+file)
164                      pass
165 <                storedFile = common.work_space.shareDir()+ tmpFile
166 <                shutil.copyfile(tmpFile, storedFile)
167 <                self.additional_inbox_files.append(string.strip(storedFile))
165 >                    storedFile = common.work_space.shareDir()+file
166 >                    shutil.copyfile(file, storedFile)
167 >                    self.additional_inbox_files.append(string.strip(storedFile))
168                  pass
152            common.logger.debug(5,"Inbox files so far : "+str(self.additional_inbox_files))
169              pass
170 +            common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
171          except KeyError:
172              pass
173  
# Line 265 | Line 282 | class Cmssw(JobType):
282  
283          datasetPath=self.datasetPath
284  
268        ## TODO
269        dataTiersList = ""
270        dataTiers = dataTiersList.split(',')
271
285          ## Contact the DBS
286          common.logger.message("Contacting DBS...")
287          try:
288 <            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
288 >
289 >            if self.use_dbs_2 == 1 :
290 >                self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
291 >            else :
292 >                self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
293              self.pubdata.fetchDBSInfo()
294  
295          except DataDiscovery.NotExistingDatasetError, ex :
296              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
297              raise CrabException(msg)
281
298          except DataDiscovery.NoDataTierinProvenanceError, ex :
299              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
300              raise CrabException(msg)
301          except DataDiscovery.DataDiscoveryError, ex:
302 <            msg = 'ERROR ***: failed Data Discovery in DBS  %s'%ex.getErrorMessage()
302 >            msg = 'ERROR ***: failed Data Discovery in DBS :  %s'%ex.getErrorMessage()
303 >            raise CrabException(msg)
304 >        except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
305 >            msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
306 >            raise CrabException(msg)
307 >        except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
308 >            msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
309 >            raise CrabException(msg)
310 >        except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
311 >            msg = 'ERROR ***: failed Data Discovery in DBS :  %s'%ex.getErrorMessage()
312              raise CrabException(msg)
313  
314          ## get list of all required data in the form of dbs paths  (dbs path = /dataset/datatier/owner)
290        ## self.DBSPaths=self.pubdata.getDBSPaths()
315          common.logger.message("Required data are :"+self.datasetPath)
316  
317          self.filesbyblock=self.pubdata.getFiles()
318          self.eventsbyblock=self.pubdata.getEventsPerBlock()
319          self.eventsbyfile=self.pubdata.getEventsPerFile()
296        # print str(self.filesbyblock)
297        # print 'self.eventsbyfile',len(self.eventsbyfile)
298        # print str(self.eventsbyfile)
320  
321          ## get max number of events
322          self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
# Line 388 | Line 409 | class Cmssw(JobType):
409              block = blocks[blockCount]
410              blockCount += 1
411              
412 <
413 <            numEventsInBlock = self.eventsbyblock[block]
414 <            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
412 >            if self.eventsbyblock.has_key(block) :
413 >                numEventsInBlock = self.eventsbyblock[block]
414 >                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
415              
416 <            files = self.filesbyblock[block]
417 <            numFilesInBlock = len(files)
418 <            if (numFilesInBlock <= 0):
419 <                continue
420 <            fileCount = 0
421 <
422 <            # ---- New block => New job ---- #
423 <            parString = "\\{"
424 <            # counter for number of events in files currently worked on
425 <            filesEventCount = 0
426 <            # flag if next while loop should touch new file
427 <            newFile = 1
428 <            # job event counter
429 <            jobSkipEventCount = 0
416 >                files = self.filesbyblock[block]
417 >                numFilesInBlock = len(files)
418 >                if (numFilesInBlock <= 0):
419 >                    continue
420 >                fileCount = 0
421 >
422 >                # ---- New block => New job ---- #
423 >                parString = "\\{"
424 >                # counter for number of events in files currently worked on
425 >                filesEventCount = 0
426 >                # flag if next while loop should touch new file
427 >                newFile = 1
428 >                # job event counter
429 >                jobSkipEventCount = 0
430              
431 <            # ---- Iterate over the files in the block until we've met the requested ---- #
432 <            # ---- total # of events or we've gone over all the files in this block  ---- #
433 <            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
434 <                file = files[fileCount]
435 <                if newFile :
436 <                    try:
437 <                        numEventsInFile = self.eventsbyfile[file]
438 <                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
439 <                        # increase filesEventCount
440 <                        filesEventCount += numEventsInFile
441 <                        # Add file to current job
442 <                        parString += '\\\"' + file + '\\\"\,'
443 <                        newFile = 0
444 <                    except KeyError:
445 <                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")
431 >                # ---- Iterate over the files in the block until we've met the requested ---- #
432 >                # ---- total # of events or we've gone over all the files in this block  ---- #
433 >                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
434 >                    file = files[fileCount]
435 >                    if newFile :
436 >                        try:
437 >                            numEventsInFile = self.eventsbyfile[file]
438 >                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
439 >                            # increase filesEventCount
440 >                            filesEventCount += numEventsInFile
441 >                            # Add file to current job
442 >                            parString += '\\\"' + file + '\\\"\,'
443 >                            newFile = 0
444 >                        except KeyError:
445 >                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
446                          
447  
448 <                # if less events in file remain than eventsPerJobRequested
449 <                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
450 <                    # if last file in block
451 <                    if ( fileCount == numFilesInBlock-1 ) :
452 <                        # end job using last file, use remaining events in block
448 >                    # if less events in file remain than eventsPerJobRequested
449 >                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
450 >                        # if last file in block
451 >                        if ( fileCount == numFilesInBlock-1 ) :
452 >                            # end job using last file, use remaining events in block
453 >                            # close job and touch new file
454 >                            fullString = parString[:-2]
455 >                            fullString += '\\}'
456 >                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
457 >                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
458 >                            self.jobDestination.append(blockSites[block])
459 >                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
460 >                            # reset counter
461 >                            jobCount = jobCount + 1
462 >                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
463 >                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
464 >                            jobSkipEventCount = 0
465 >                            # reset file
466 >                            parString = "\\{"
467 >                            filesEventCount = 0
468 >                            newFile = 1
469 >                            fileCount += 1
470 >                        else :
471 >                            # go to next file
472 >                            newFile = 1
473 >                            fileCount += 1
474 >                    # if events in file equal to eventsPerJobRequested
475 >                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
476                          # close job and touch new file
477                          fullString = parString[:-2]
478                          fullString += '\\}'
479 <                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
480 <                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
479 >                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
480 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
481                          self.jobDestination.append(blockSites[block])
482                          common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
483                          # reset counter
484                          jobCount = jobCount + 1
485 <                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
486 <                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
485 >                        totalEventCount = totalEventCount + eventsPerJobRequested
486 >                        eventsRemaining = eventsRemaining - eventsPerJobRequested
487                          jobSkipEventCount = 0
488                          # reset file
489                          parString = "\\{"
490                          filesEventCount = 0
491                          newFile = 1
492                          fileCount += 1
493 +                        
494 +                    # if more events in file remain than eventsPerJobRequested
495                      else :
496 <                        # go to next file
497 <                        newFile = 1
498 <                        fileCount += 1
499 <                # if events in file equal to eventsPerJobRequested
500 <                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
501 <                    # close job and touch new file
502 <                    fullString = parString[:-2]
503 <                    fullString += '\\}'
504 <                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
505 <                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
506 <                    self.jobDestination.append(blockSites[block])
507 <                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
508 <                    # reset counter
509 <                    jobCount = jobCount + 1
510 <                    totalEventCount = totalEventCount + eventsPerJobRequested
511 <                    eventsRemaining = eventsRemaining - eventsPerJobRequested
512 <                    jobSkipEventCount = 0
513 <                    # reset file
514 <                    parString = "\\{"
515 <                    filesEventCount = 0
470 <                    newFile = 1
471 <                    fileCount += 1
472 <                    
473 <                # if more events in file remain than eventsPerJobRequested
474 <                else :
475 <                    # close job but don't touch new file
476 <                    fullString = parString[:-2]
477 <                    fullString += '\\}'
478 <                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
479 <                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
480 <                    self.jobDestination.append(blockSites[block])
481 <                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
482 <                    # increase counter
483 <                    jobCount = jobCount + 1
484 <                    totalEventCount = totalEventCount + eventsPerJobRequested
485 <                    eventsRemaining = eventsRemaining - eventsPerJobRequested
486 <                    # calculate skip events for last file
487 <                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
488 <                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
489 <                    # remove all but the last file
490 <                    filesEventCount = self.eventsbyfile[file]
491 <                    parString = "\\{"
492 <                    parString += '\\\"' + file + '\\\"\,'
493 <                pass # END if
494 <            pass # END while (iterate over files in the block)
496 >                        # close job but don't touch new file
497 >                        fullString = parString[:-2]
498 >                        fullString += '\\}'
499 >                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
500 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
501 >                        self.jobDestination.append(blockSites[block])
502 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
503 >                        # increase counter
504 >                        jobCount = jobCount + 1
505 >                        totalEventCount = totalEventCount + eventsPerJobRequested
506 >                        eventsRemaining = eventsRemaining - eventsPerJobRequested
507 >                        # calculate skip events for last file
508 >                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
509 >                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
510 >                        # remove all but the last file
511 >                        filesEventCount = self.eventsbyfile[file]
512 >                        parString = "\\{"
513 >                        parString += '\\\"' + file + '\\\"\,'
514 >                    pass # END if
515 >                pass # END while (iterate over files in the block)
516          pass # END while (iterate over blocks in the dataset)
517          self.ncjobs = self.total_number_of_jobs = jobCount
518          if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
# Line 666 | Line 687 | class Cmssw(JobType):
687          try: # create tar ball
688              tar = tarfile.open(self.tgzNameWithPath, "w:gz")
689              ## First find the executable
690 <            if (self.executable != ''):
690 >            if (executable != ''):
691                  exeWithPath = self.scram.findFile_(executable)
692                  if ( not exeWithPath ):
693                      raise CrabException('User executable '+executable+' not found')
# Line 677 | Line 698 | class Cmssw(JobType):
698                      common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
699                      path = swArea+'/'
700                      exe = string.replace(exeWithPath, path,'')
701 <                    tar.add(path+exe,exe)
701 >                    tar.add(path+exe,executable)
702                      pass
703                  else:
704                      # the exe is from release, we'll find it on WN
# Line 713 | Line 734 | class Cmssw(JobType):
734              tar.close()
735          except :
736              raise CrabException('Could not create tar-ball')
737 <        
737 >
738 >        ## check for tarball size
739 >        tarballinfo = os.stat(self.tgzNameWithPath)
740 >        if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
741 >            raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
742 >
743          ## create tar-ball with ML stuff
744          self.MLtgzfile =  common.work_space.pathForTgz()+'share/MLfiles.tgz'
745          try:
# Line 791 | Line 817 | class Cmssw(JobType):
817          txt += '   exit 1 \n'
818          txt += 'fi \n'
819          txt += 'echo "CMSSW_VERSION =  '+self.version+'"\n'
820 +        txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
821          txt += 'cd '+self.version+'\n'
822          ### needed grep for bug in scramv1 ###
823          txt += scram+' runtime -sh\n'
# Line 949 | Line 976 | class Cmssw(JobType):
976          """
977          
978      def executableName(self):
979 <        if self.pset == None: #CarlosDaniele
979 >        if self.scriptExe: #CarlosDaniele
980              return "sh "
981          else:
982              return self.executable
983  
984      def executableArgs(self):
985 <        if self.pset == None:#CarlosDaniele
985 >        if self.scriptExe:#CarlosDaniele
986              return   self.scriptExe + " $NJob"
987          else:
988              return " -p pset.cfg"
# Line 973 | Line 1000 | class Cmssw(JobType):
1000          if os.path.isfile(self.MLtgzfile):
1001              inp_box.append(self.MLtgzfile)
1002          ## config
1003 <        if not self.pset is None: #CarlosDaniele
1003 >        if not self.pset is None:
1004              inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
1005          ## additional input files
1006 <        #for file in self.additional_inbox_files:
1007 <        #    inp_box.append(common.work_space.cwdDir()+file)
1006 >        for file in self.additional_inbox_files:
1007 >            inp_box.append(file)
1008          return inp_box
1009  
1010      def outputSandbox(self, nj):
# Line 1178 | Line 1205 | class Cmssw(JobType):
1205          txt += '       fi\n'
1206          txt += '   fi\n'
1207          txt += '   \n'
1181        txt += '   string=`cat /etc/redhat-release`\n'
1182        txt += '   echo $string\n'
1183        txt += '   if [[ $string = *alhalla* ]]; then\n'
1184        txt += '       echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1185        txt += '   elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n'
1186        txt += '       export SCRAM_ARCH=slc3_ia32_gcc323\n'
1187        txt += '       echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1188        txt += '   else\n'
1189        txt += '       echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n'
1190        txt += '       echo "JOB_EXIT_STATUS = 10033"\n'
1191        txt += '       echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
1192        txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
1193        txt += '       rm -f $RUNTIME_AREA/$repo \n'
1194        txt += '       echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1195        txt += '       echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1196        txt += '       exit 1\n'
1197        txt += '   fi\n'
1208          txt += '   echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1209          txt += '   echo "### END SETUP CMS LCG ENVIRONMENT ###"\n'
1210          return txt

Diff Legend

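(no marker, old-revision line number only) Removed lines
+ Added lines
< Changed lines (text as in the old revision, 1.65)
> Changed lines (text as in the new revision, 1.73)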