
Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.235 by spiga, Fri Aug 29 15:06:41 2008 UTC vs.
Revision 1.254 by ewv, Thu Oct 30 16:25:24 2008 UTC

# Line 2 | Line 2 | from JobType import JobType
2   from crab_logger import Logger
3   from crab_exceptions import *
4   from crab_util import *
5 < from BlackWhiteListParser import SEBlackWhiteListParser
5 > from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
6   import common
7   import Scram
8 from LFNBaseName import *
8  
9   import os, string, glob
10  
# Line 19 | Line 18 | class Cmssw(JobType):
18  
19          self._params = {}
20          self.cfg_params = cfg_params
21 +
22          # init BlackWhiteListParser
23 <        self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params)
23 >        seWhiteList = cfg_params.get('EDG.se_white_list',[])
24 >        seBlackList = cfg_params.get('EDG.se_black_list',[])
25 >        self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
26  
27          ### Temporary patch to automatically skip the ISB size check:
28          server=self.cfg_params.get('CRAB.server_name',None)
29 <        size = 9.5
30 <        if server: size = 99999
29 >        size = 9.5
30 >        if server or common.scheduler.name().upper() in ['LSF','CAF']: size = 99999
31          ### D.S.
32          self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))
33  
# Line 71 | Line 73 | class Cmssw(JobType):
73  
74          tmp =  cfg_params['CMSSW.datasetpath']
75          log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
76 <        if string.lower(tmp)=='none':
76 >
77 >        if tmp =='':
78 >            msg = "Error: datasetpath not defined "
79 >            raise CrabException(msg)
80 >        elif string.lower(tmp)=='none':
81              self.datasetPath = None
82              self.selectNoInput = 1
83          else:
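
The new guard separates a missing datasetpath (now a hard error) from the literal 'none' used for MC-style jobs with no input dataset. A compact sketch of the same three-way decision, with CrabException stubbed out for illustration:

    class CrabException(Exception):
        """Stand-in for crab_exceptions.CrabException."""
        pass

    def resolve_datasetpath(value):
        """Mirror the patched logic: '' is an error, 'none' means no input."""
        if value == '':
            raise CrabException("Error: datasetpath not defined")
        elif value.lower() == 'none':
            return None, 1        # datasetPath, selectNoInput
        else:
            return value, 0

    # resolve_datasetpath('none')   -> (None, 1)
    # resolve_datasetpath('/A/B/C') -> ('/A/B/C', 0)
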
# Line 235 | Line 241 | class Cmssw(JobType):
241                  self.jobSplittingForScript()
242              else:
243                  self.jobSplittingNoInput()
244 +        elif (cfg_params.get('CMSSW.noblockboundary',0)):
245 +            self.jobSplittingNoBlockBoundary(blockSites)
246          else:
247              self.jobSplittingByBlocks(blockSites)
248  
# Line 580 | Line 588 | class Cmssw(JobType):
588          self.list_of_args = list_of_lists
589          return
590  
591 +    def jobSplittingNoBlockBoundary(self,blockSites):
592 +        """Split into jobs by events, letting a job's input files cross block boundaries.
593 +        """
594 +        # ---- Handle the possible job splitting configurations ---- #
595 +        if (self.selectTotalNumberEvents):
596 +            totalEventsRequested = self.total_number_of_events
597 +        if (self.selectEventsPerJob):
598 +            eventsPerJobRequested = self.eventsPerJob
599 +            if (self.selectNumberOfJobs):
600 +                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
601 +
602 +        # If user requested all the events in the dataset
603 +        if (totalEventsRequested == -1):
604 +            eventsRemaining=self.maxEvents
605 +        # If user requested more events than are in the dataset
606 +        elif (totalEventsRequested > self.maxEvents):
607 +            eventsRemaining = self.maxEvents
608 +            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
609 +        # If user requested fewer events than are in the dataset
610 +        else:
611 +            eventsRemaining = totalEventsRequested
612 +
613 +        # If user requested more events per job than are in the dataset
614 +        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
615 +            eventsPerJobRequested = self.maxEvents
616 +
617 +        # For user info at end
618 +        totalEventCount = 0
619 +
620 +        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
621 +            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
622 +
623 +        if (self.selectNumberOfJobs):
624 +            common.logger.message("May not create the exact number_of_jobs requested.")
625 +
626 +        if ( self.ncjobs == 'all' ) :
627 +            totalNumberOfJobs = 999999999
628 +        else :
629 +            totalNumberOfJobs = self.ncjobs
630 +
631 +        blocks = blockSites.keys()
632 +        blockCount = 0
633 +        # Backup variable in case self.maxEvents counted events in a non-included block
634 +        numBlocksInDataset = len(blocks)
635 +
636 +        jobCount = 0
637 +        list_of_lists = []
638 +
639 +        #AF
640 +        #AF do not reset input files and event count on block boundary
641 +        #AF
642 +        parString=""
643 +        filesEventCount = 0
644 +        #AF
645 +
646 +        # dictionary tracking which jobs belong to which block
647 +        jobsOfBlock = {}
648 +        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
649 +            block = blocks[blockCount]
650 +            blockCount += 1
651 +            if block not in jobsOfBlock.keys() :
652 +                jobsOfBlock[block] = []
653 +
654 +            if self.eventsbyblock.has_key(block) :
655 +                numEventsInBlock = self.eventsbyblock[block]
656 +                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
657 +                files = self.filesbyblock[block]
658 +                numFilesInBlock = len(files)
659 +                if (numFilesInBlock <= 0):
660 +                    continue
661 +                fileCount = 0
662 +                #AF
663 +                #AF do not reset input files and event count on block boundary
664 +                #AF
665 +                ## ---- New block => New job ---- #
666 +                #parString = ""
667 +                # counter for number of events in files currently worked on
668 +                #filesEventCount = 0
669 +                #AF
670 +                # flag if next while loop should touch new file
671 +                newFile = 1
672 +                # job event counter
673 +                jobSkipEventCount = 0
674 +
675 +                # ---- Iterate over the files in the block until we've met the requested ---- #
676 +                # ---- total # of events or we've gone over all the files in this block  ---- #
677 +                pString=''
678 +                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
679 +                    file = files[fileCount]
680 +                    if self.useParent:
681 +                        parent = self.parentFiles[file]
682 +                        for f in parent :
683 +                            pString += '\\\"' + f + '\\\"\,'
684 +                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
685 +                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
686 +                    if newFile :
687 +                        try:
688 +                            numEventsInFile = self.eventsbyfile[file]
689 +                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
690 +                            # increase filesEventCount
691 +                            filesEventCount += numEventsInFile
692 +                            # Add file to current job
693 +                            parString += '\\\"' + file + '\\\"\,'
694 +                            newFile = 0
695 +                        except KeyError:
696 +                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
697 +                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
698 +                    #common.logger.message("AF filesEventCount %s - jobSkipEventCount %s "%(filesEventCount,jobSkipEventCount))
699 +                    # if fewer events remain in the file than eventsPerJobRequested
700 +                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
701 +                      #AF
702 +                      #AF skip fileboundary part
703 +                      #AF
704 +                            # go to next file
705 +                            newFile = 1
706 +                            fileCount += 1
707 +                    # if events in file equal to eventsPerJobRequested
708 +                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
709 +                        # close job and touch new file
710 +                        fullString = parString[:-2]
711 +                        if self.useParent:
712 +                            fullParentString = pString[:-2]
713 +                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
714 +                        else:
715 +                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
716 +                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
717 +                        self.jobDestination.append(blockSites[block])
718 +                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
719 +                        jobsOfBlock[block].append(jobCount+1)
720 +                        # reset counter
721 +                        jobCount = jobCount + 1
722 +                        totalEventCount = totalEventCount + eventsPerJobRequested
723 +                        eventsRemaining = eventsRemaining - eventsPerJobRequested
724 +                        jobSkipEventCount = 0
725 +                        # reset file
726 +                        pString = ""
727 +                        parString = ""
728 +                        filesEventCount = 0
729 +                        newFile = 1
730 +                        fileCount += 1
731 +
732 +                    # if more events in file remain than eventsPerJobRequested
733 +                    else :
734 +                        # close job but don't touch new file
735 +                        fullString = parString[:-2]
736 +                        if self.useParent:
737 +                            fullParentString = pString[:-2]
738 +                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
739 +                        else:
740 +                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
741 +                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
742 +                        self.jobDestination.append(blockSites[block])
743 +                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
744 +                        jobsOfBlock[block].append(jobCount+1)
745 +                        # increase counter
746 +                        jobCount = jobCount + 1
747 +                        totalEventCount = totalEventCount + eventsPerJobRequested
748 +                        eventsRemaining = eventsRemaining - eventsPerJobRequested
749 +                        # calculate skip events for last file
750 +                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequested
751 +                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
752 +                        # remove all but the last file
753 +                        filesEventCount = self.eventsbyfile[file]
754 +                        if self.useParent:
755 +                            for f in parent : pString += '\\\"' + f + '\\\"\,'
756 +                        parString = '\\\"' + file + '\\\"\,'
757 +                    pass # END if
758 +                pass # END while (iterate over files in the block)
759 +        pass # END while (iterate over blocks in the dataset)
760 +        self.ncjobs = self.total_number_of_jobs = jobCount
761 +        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
762 +            common.logger.message("eventsRemaining "+str(eventsRemaining))
763 +            common.logger.message("jobCount "+str(jobCount))
764 +            common.logger.message(" totalNumberOfJobs "+str(totalNumberOfJobs))
765 +            common.logger.message("Could not run on all requested events because some blocks are not hosted at allowed sites.")
766 +        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
767 +
768 +        # screen output
769 +        screenOutput = "List of jobs and available destination sites:\n\n"
770 +
771 +        #AF
772 +        #AF   skip the check on blocks with no sites
773 +        #AF
774 +        self.list_of_args = list_of_lists
775 +
776 +        return
777 +
778 +
779 +
780      def jobSplittingNoInput(self):
781          """
782          Perform job splitting based on number of events per job
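
Stripped of the escaped-quote string building and the eventsRemaining/job-count limits, the core idea of the new jobSplittingNoBlockBoundary above is: keep accumulating files and their event counts across block boundaries, close a job whenever eventsPerJobRequested events are available, and remember how many events of the last file the next job must skip. A simplified standalone sketch of that bookkeeping (not the CRAB implementation; input is assumed to be an ordered mapping of block name to (file, events) pairs, and trailing events that do not fill a whole job are simply left over):

    def split_no_block_boundary(files_by_block, events_per_job):
        """Group files into jobs of events_per_job events, letting a job's file
        list keep growing across block boundaries instead of being reset."""
        jobs = []                                  # (files, events, skip_events)
        current_files, files_event_count, skip = [], 0, 0
        for block, files in files_by_block.items():
            for fname, nev in files:
                current_files.append(fname)
                files_event_count += nev
                # close as many jobs as this file makes possible
                while files_event_count - skip >= events_per_job:
                    jobs.append((list(current_files), events_per_job, skip))
                    if files_event_count - skip == events_per_job:
                        # job ends exactly at a file boundary: start the next job clean
                        current_files, files_event_count, skip = [], 0, 0
                        break
                    # job ends mid-file: keep only the last file and record how many
                    # of its events the next job must skip (same formula as the patch)
                    skip = events_per_job - (files_event_count - skip - nev)
                    current_files = [fname]
                    files_event_count = nev
        return jobs

    # Hypothetical example, 50 events per job:
    #   split_no_block_boundary({'b1': [('f1', 30)], 'b2': [('f2', 40), ('f3', 60)]}, 50)
    #   -> [(['f1', 'f2'], 50, 0), (['f2', 'f3'], 50, 20)]   (last 30 events of f3 left over)
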
# Line 690 | Line 887 | class Cmssw(JobType):
887          """
888          Return the TarBall with lib and exe
889          """
890 <        self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
890 >        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
891          if os.path.exists(self.tgzNameWithPath):
892              return self.tgzNameWithPath
893  
# Line 775 | Line 972 | class Cmssw(JobType):
972              ## Add ProdCommon dir to tar
973              prodcommonDir = './'
974              prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
975 <            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools','ProdCommon/Core','ProdCommon/MCPayloads', 'IMProv']
975 >            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
976 >                           'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage']
977              for file in neededStuff:
978                  tar.add(prodcommonPath+file,prodcommonDir+file)
979              common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
# Line 788 | Line 986 | class Cmssw(JobType):
986              common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
987  
988              ##### Utils
989 <            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py']
989 >            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
990              for file in Utils_file_list:
991                  tar.add(path+file,file)
992              common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
993  
994              ##### AdditionalFiles
995 +            tar.dereference=True
996              for file in self.additional_inbox_files:
997                  tar.add(file,string.split(file,'/')[-1])
998 +            tar.dereference=False
999              common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
1000  
1001              tar.close()
1002 <        except IOError:
1002 >        except IOError, exc:
1003 >            common.logger.write(str(exc))
1004              raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1005 <        except tarfile.TarError:
1005 >        except tarfile.TarError, exc:
1006 >            common.logger.write(str(exc))
1007              raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1008  
1009          ## check for tarball size
1010          tarballinfo = os.stat(self.tgzNameWithPath)
1011          if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
1012 <            raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
1012 >            msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
1013 >               +' MB input sandbox limit \n'
1014 >            msg += '      and not supported by the direct GRID submission system.\n'
1015 >            msg += '      Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
1016 >            msg += '      For further info please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
1017 >            raise CrabException(msg)
1018  
1019          ## create tar-ball with ML stuff
1020  
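
One of the quieter changes above is the dereference toggle wrapped around the additional inbox files: with tarfile's dereference flag set, tar.add() stores the file a symbolic link points to rather than the link itself, so symlinked user files still arrive intact on the worker node. A minimal sketch of the pattern, with a hypothetical file name:

    import tarfile

    extra_files = ['my_extra_config.txt']       # hypothetical additional_inbox_files

    tar = tarfile.open('default.tgz', 'w:gz')
    try:
        tar.dereference = True                  # follow symlinks while adding
        for path in extra_files:
            tar.add(path, path.split('/')[-1])
        tar.dereference = False
    finally:
        tar.close()
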
# Line 877 | Line 1084 | class Cmssw(JobType):
1084          # Prepare job-specific part
1085          job = common.job_list[nj]
1086          if (self.datasetPath):
1087 +            self.primaryDataset = self.datasetPath.split("/")[1]
1088 +            DataTier = self.datasetPath.split("/")[2]
1089              txt += '\n'
1090              txt += 'DatasetPath='+self.datasetPath+'\n'
1091  
1092 <            datasetpath_split = self.datasetPath.split("/")
1093 <            ### FEDE FOR NEW LFN ###
885 <            self.primaryDataset = datasetpath_split[1]
886 <            ########################
887 <            txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
888 <            txt += 'DataTier='+datasetpath_split[2]+'\n'
1092 >            txt += 'PrimaryDataset='+self.primaryDataset +'\n'
1093 >            txt += 'DataTier='+DataTier+'\n'
1094              txt += 'ApplicationFamily=cmsRun\n'
1095  
1096          else:
1097 +            self.primaryDataset = 'null'
1098              txt += 'DatasetPath=MCDataTier\n'
893            ### FEDE FOR NEW LFN ###
894            self.primaryDataset = 'null'
895            ########################
1099              txt += 'PrimaryDataset=null\n'
1100              txt += 'DataTier=null\n'
1101              txt += 'ApplicationFamily=MCDataTier\n'
# Line 1039 | Line 1242 | class Cmssw(JobType):
1242          inp_box = []
1243          if os.path.isfile(self.tgzNameWithPath):
1244              inp_box.append(self.tgzNameWithPath)
1245 <        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
1043 <        inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper)
1245 >        inp_box.append(common.work_space.jobDir() + self.scriptName)
1246          return inp_box
1247  
1248      def outputSandbox(self, nj):
# Line 1090 | Line 1292 | class Cmssw(JobType):
1292              txt += 'fi\n'
1293          file_list = []
1294          for fileWithSuffix in (self.output_file):
1295 <             file_list.append(numberFile(fileWithSuffix, '$NJob'))
1295 >             file_list.append(numberFile('$SOFTWARE_DIR/'+fileWithSuffix, '$NJob'))
1296  
1297 <        txt += 'file_list="'+string.join(file_list,' ')+'"\n'
1297 >        txt += 'file_list="'+string.join(file_list,',')+'"\n'
1298          txt += '\n'
1299          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1300          txt += 'echo ">>> current directory content:"\n'
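
The output file list handed to the wrapper now carries the full $SOFTWARE_DIR path for each file and is comma- rather than space-separated. A small sketch of the new construction; number_file below is only a stand-in for crab_util.numberFile, assumed here to insert the job index before the extension (the real helper may differ):

    def number_file(name, njob):
        """Stand-in for crab_util.numberFile (assumed behaviour)."""
        base, dot, ext = name.rpartition('.')
        return '%s_%s.%s' % (base, njob, ext) if dot else '%s_%s' % (name, njob)

    output_file = ['output.root', 'summary.txt']     # hypothetical output files
    file_list = [number_file('$SOFTWARE_DIR/' + f, '$NJob') for f in output_file]

    # Comma-separated, as in the patched script fragment
    txt = 'file_list="' + ','.join(file_list) + '"\n'
    # -> file_list="$SOFTWARE_DIR/output_$NJob.root,$SOFTWARE_DIR/summary_$NJob.txt"
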
# Line 1189 | Line 1391 | class Cmssw(JobType):
1391          txt += '    echo "==> setup cms environment ok"\n'
1392          return txt
1393  
1394 <    def modifyReport(self, nj):
1394 >    def wsModifyReport(self, nj):
1395          """
1396          insert the part of the script that modifies the FrameworkJob Report
1397          """
1398 <        txt = '\n#Written by cms_cmssw::modifyReport\n'
1398 >        txt = '\n#Written by cms_cmssw::wsModifyReport\n'
1399          publish_data = int(self.cfg_params.get('USER.publish_data',0))
1400          if (publish_data == 1):
1401 +
1402              processedDataset = self.cfg_params['USER.publish_data_name']
1403 <            if (self.primaryDataset == 'null'):
1404 <                 self.primaryDataset = processedDataset
1405 <            if (common.scheduler.name().upper() == "CAF" or common.scheduler.name().upper() == "LSF"):
1203 <                ### FEDE FOR NEW LFN ###
1204 <                LFNBaseName = LFNBase(self.primaryDataset, processedDataset, LocalUser=True)
1205 <                self.user = getUserName(LocalUser=True)
1206 <                ########################
1207 <            else :
1208 <                ### FEDE FOR NEW LFN ###
1209 <                LFNBaseName = LFNBase(self.primaryDataset, processedDataset)
1210 <                self.user = getUserName()
1211 <                ########################
1212 <
1213 <            txt += 'if [ $copy_exit_status -eq 0 ]; then\n'
1214 <            ### FEDE FOR NEW LFN ###
1215 <            #txt += '    FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName)
1216 <            txt += '    FOR_LFN=%s/${PSETHASH}/\n'%(LFNBaseName)
1217 <            ########################
1403 >
1404 >            txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
1405 >            txt += '    FOR_LFN=$LFNBaseName\n'
1406              txt += 'else\n'
1407              txt += '    FOR_LFN=/copy_problems/ \n'
1408              txt += '    SE=""\n'
# Line 1224 | Line 1412 | class Cmssw(JobType):
1412              txt += 'echo ">>> Modify Job Report:" \n'
1413              txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1414              txt += 'ProcessedDataset='+processedDataset+'\n'
1415 +            #txt += 'ProcessedDataset=$procDataset \n'
1416              txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1417              txt += 'echo "SE = $SE"\n'
1418              txt += 'echo "SE_PATH = $SE_PATH"\n'
1419              txt += 'echo "FOR_LFN = $FOR_LFN" \n'
1420              txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
1421 <            ### FEDE FOR NEW LFN ###
1422 <            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
1423 <            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
1424 <            ########################
1421 >            args = '$RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' \
1422 >                   '$USER-$ProcessedDataset-$PSETHASH $ApplicationFamily '+ \
1423 >                    '  $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'
1424 >            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'"\n'
1425 >            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'\n'
1426              txt += 'modifyReport_result=$?\n'
1427              txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
1428              txt += '    modifyReport_result=70500\n'
# Line 1269 | Line 1459 | class Cmssw(JobType):
1459          txt += '        echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1460          txt += '    fi\n'
1461            #### Patch to check input data reading for CMSSW16x. Hopefully we'll remove it asap
1272
1462          txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1463          txt += '      echo ">>> Executable succeeded  $executable_exit_status"\n'
1464          if (self.datasetPath and not (self.dataset_pu or self.useParent)) :
# Line 1299 | Line 1488 | class Cmssw(JobType):
1488              txt += '      fi\n'
1489          txt += '    elif [ $executable_exit_status -ne 0 ] || [ $executable_exit_status -ne 50015 ] || [ $executable_exit_status -ne 50017 ];then\n'
1490          txt += '      echo ">>> Executable failed  $executable_exit_status"\n'
1491 +        txt += '      echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1492 +        txt += '      echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1493 +        txt += '      job_exit_code=$executable_exit_status\n'
1494          txt += '      func_exit\n'
1495          txt += '    fi\n'
1496          txt += '\n'

Diff Legend

  Removed lines (no marker)
+ Added lines
< Changed lines (revision 1.235)
> Changed lines (revision 1.254)