ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.4 by fanzago, Wed Dec 5 11:21:05 2007 UTC vs.
Revision 1.11.4.1 by fanzago, Fri Jul 4 14:01:01 2008 UTC

# Line 5 | Line 5 | from Actor import *
5   from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8 < from FwkJobRep.ReportParser import readJobReport
8 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
# Line 27 | Line 27 | class Publisher(Actor):
27              self.processedData = cfg_params['USER.publish_data_name']
28          except KeyError:
29              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
30 +
31          try:
32              if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
33          except KeyError:
# Line 41 | Line 42 | class Publisher(Actor):
42              self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
43          try:
44              self.DBSURL=cfg_params['USER.dbs_url_for_publication']
45 <            common.logger.message('dbs url = '+self.DBSURL)
46 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"):
45 >            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
46 >            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
47                  msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
48                  msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
49                  raise CrabException(msg)
50          except KeyError:
51 <            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
52 <            msg = msg + " entry, necessary to publish the data"
51 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
52 >            msg = msg + " entry, necessary to publish the data.\n"
53 >            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
54              raise CrabException(msg)
55              
56          self.content=file(self.pset).read()
57          self.resDir = common.work_space.resDir()
58 +        
59 +        self.dataset_to_import=[]
60 +        
61          self.datasetpath=cfg_params['CMSSW.datasetpath']
62 +        if (self.datasetpath.upper() != 'NONE'):
63 +            self.dataset_to_import.append(self.datasetpath)
64 +        
65 +        ### Added PU dataset
66 +        tmp = cfg_params.get('CMSSW.dataset_pu',None)
67 +        if tmp :
68 +            datasets = tmp.split(',')
69 +            for dataset in datasets:
70 +                dataset=string.strip(dataset)
71 +                self.dataset_to_import.append(dataset)
72 +        ###        
73 +                
74          self.SEName=''
75          self.CMSSW_VERSION=''
76          self.exit_status=''
77          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
78 <        self.emptyLFNs=[]
78 >        self.problemFiles=[]  
79 >        self.noEventsFiles=[]
80 >        self.noLFN=[]
81          
82      def importParentDataset(self,globalDBS, datasetpath):
83          """
# Line 66 | Line 85 | class Publisher(Actor):
85          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
86          
87          try:
88 <            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
88 >            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
89          except DBSWriterError, ex:
90              msg = "Error importing dataset to be processed into local DBS\n"
91              msg += "Source Dataset: %s\n" % datasetpath
# Line 87 | Line 106 | class Publisher(Actor):
106              msg = "Error: Problem with "+file+" file"  
107              common.logger.message(msg)
108              return self.exit_status
109 <            
110 <        if (self.datasetpath != 'None'):
111 <            common.logger.message("--->>> Importing parent dataset in the dbs")
112 <            status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
113 <            if (status_import == 1):
114 <                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
115 <                self.exit_status='1'
116 <                return self.exit_status
117 <            common.logger.message("Parent import ok")
109 >
110 >        if (len(self.dataset_to_import) != 0):
111 >           for dataset in self.dataset_to_import:
112 >               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
113 >               status_import=self.importParentDataset(self.globalDBS, dataset)
114 >               if (status_import == 1):
115 >                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
116 >                   self.exit_status='1'
117 >                   return self.exit_status
118 >               else:    
119 >                   common.logger.message('Import ok of dataset '+dataset)
120              
121          #// DBS to contact
122          dbswriter = DBSWriter(self.DBSURL)                        
# Line 112 | Line 133 | class Publisher(Actor):
133          common.logger.debug(6,"FileInfo = " + str(fileinfo))
134          common.logger.debug(6,"DatasetInfo = " + str(datasets))
135          for dataset in datasets:
136 +            #### for production data
137 +            if (dataset['PrimaryDataset'] == 'null'):
138 +                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
139 +                
140              dataset['PSetContent']=self.content
116            #cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
141              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
142              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
143              common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
144 <            common.logger.message("--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
144 >            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
145 >            
146 >            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
147              
148              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
149              common.logger.debug(6,"Primary:  %s "%primary)
# Line 128 | Line 154 | class Publisher(Actor):
154              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
155              common.logger.debug(6,"Processed:  %s "%processed)
156              
157 <            common.logger.message("Inserted primary %s processed %s"%(primary,processed))
157 >            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
158              
159          common.logger.debug(6,"exit_status = %s "%self.exit_status)
160          return self.exit_status    
# Line 149 | Line 175 | class Publisher(Actor):
175          ### skip publication for 0 events files
176          filestopublish=[]
177          for file in jobReport.files:
178 <            if int(file['TotalEvents']) != 0 :
179 <                file.lumisections = {}
180 <                for ds in file.dataset:
181 <                    ds['ProcessedDataset']=procdataset
182 <                filestopublish.append(file)
178 >            #### added check for problem with copy to SE and empty lfn
179 >            if (string.find(file['LFN'], 'copy_problems') != -1):
180 >                self.problemFiles.append(file['LFN'])
181 >            elif (file['LFN'] == ''):
182 >                self.noLFN.append(file['PFN'])
183              else:
184 <                self.emptyLFNs.append(file['LFN'])
184 >                if int(file['TotalEvents']) != 0 :
185 >                    file.lumisections = {}
186 >                    for ds in file.dataset:
187 >                        ds['ProcessedDataset']=procdataset
188 >                        ### Fede for production
189 >                        if (ds['PrimaryDataset'] == 'null'):
190 >                            ds['PrimaryDataset']=procdataset
191 >                    filestopublish.append(file)
192 >                else:
193 >                    self.noEventsFiles.append(file['LFN'])
194          jobReport.files = filestopublish
195          ### if all files of FJR have number of events = 0
196          if (len(filestopublish) == 0):
# Line 198 | Line 233 | class Publisher(Actor):
233                      [BlocksList.append(x) for x in Blocks]
234                      
235              # close the blocks
236 <            common.logger.message("BlocksList = %s"%BlocksList)
236 >            common.logger.debug(6, "BlocksList = %s"%BlocksList)
237              # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
238              dbswriter = DBSWriter(self.DBSURL)
239              
240              for BlockName in BlocksList:
241                  try:  
242                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
243 <                    common.logger.message("closeBlock %s"%closeBlock)
243 >                    common.logger.debug(6, "closeBlock %s"%closeBlock)
244                      #dbswriter.dbs.closeBlock(BlockName)
245                  except DBSWriterError, ex:
246                      common.logger.message("Close block error %s"%ex)
247  
248 <            common.logger.message("--->>> End files publication")
249 <            if (len(self.emptyLFNs)>0):
250 <                common.logger.message("--->>> WARNING: files not published because they contain 0 events are:")
216 <                for lfn in self.emptyLFNs:
248 >            if (len(self.noEventsFiles)>0):
249 >                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
250 >                for lfn in self.noEventsFiles:
251                      common.logger.message("------ LFN: %s"%lfn)
252 +            if (len(self.noLFN)>0):
253 +                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
254 +                for pfn in self.noLFN:
255 +                    common.logger.message("------ pfn: %s"%pfn)
256 +            if (len(self.problemFiles)>0):
257 +                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
258 +                for lfn in self.problemFiles:
259 +                    common.logger.message("------ LFN: %s"%lfn)
260 +            common.logger.message("--->>> End files publication")
261 +            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
262              return self.exit_status
263  
264          else:

Diff Legend

  Removed lines (present only in the older revision)
+ Added lines (present only in the newer revision)
< Changed lines — old revision's version
> Changed lines — new revision's version