ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.7 by fanzago, Tue Dec 18 10:58:20 2007 UTC vs.
Revision 1.23 by fanzago, Mon Feb 9 15:17:51 2009 UTC

# Line 5 | Line 5 | from Actor import *
5   from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8 < from FwkJobRep.ReportParser import readJobReport
8 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 > from ProdCommon.FwkJobRep.ReportState import checkSuccess
10   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
# Line 24 | Line 25 | class Publisher(Actor):
25          """
26  
27          try:
28 <            self.processedData = cfg_params['USER.publish_data_name']
28 >            self.userprocessedData = cfg_params['USER.publish_data_name']
29 >            self.processedData = None
30          except KeyError:
31              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 +
33          try:
34 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
34 >            if (int(cfg_params['USER.copy_data']) != 1):
35 >                raise KeyError
36          except KeyError:
37 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
37 >            msg  = 'You can not publish data because you did not selected \n'
38 >            msg += '\t*** copy_data = 1 or publish_data = 1  *** in the crab.cfg file'
39 >            raise CrabException(msg)
40          try:
41              self.pset = cfg_params['CMSSW.pset']
42          except KeyError:
# Line 47 | Line 53 | class Publisher(Actor):
53                  msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
54                  raise CrabException(msg)
55          except KeyError:
56 <            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
57 <            msg = msg + " entry, necessary to publish the data"
56 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
57 >            msg = msg + " entry, necessary to publish the data.\n"
58 >            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
59              raise CrabException(msg)
60              
61          self.content=file(self.pset).read()
62          self.resDir = common.work_space.resDir()
63 +        
64 +        self.dataset_to_import=[]
65 +        
66          self.datasetpath=cfg_params['CMSSW.datasetpath']
67 +        if (self.datasetpath.upper() != 'NONE'):
68 +            self.dataset_to_import.append(self.datasetpath)
69 +        
70 +        ### Added PU dataset
71 +        tmp = cfg_params.get('CMSSW.dataset_pu',None)
72 +        if tmp :
73 +            datasets = tmp.split(',')
74 +            for dataset in datasets:
75 +                dataset=string.strip(dataset)
76 +                self.dataset_to_import.append(dataset)
77 +        ###        
78 +                
79          self.SEName=''
80          self.CMSSW_VERSION=''
81          self.exit_status=''
# Line 68 | Line 90 | class Publisher(Actor):
90          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
91          
92          try:
93 <            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
93 >            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
94          except DBSWriterError, ex:
95              msg = "Error importing dataset to be processed into local DBS\n"
96              msg += "Source Dataset: %s\n" % datasetpath
97              msg += "Source DBS: %s\n" % globalDBS
98              msg += "Destination DBS: %s\n" % self.DBSURL
99              common.logger.message(msg)
100 +            common.logger.write(str(ex))
101              return 1
102          return 0
103            
# Line 89 | Line 112 | class Publisher(Actor):
112              msg = "Error: Problem with "+file+" file"  
113              common.logger.message(msg)
114              return self.exit_status
115 <            
116 <        if (self.datasetpath != 'None'):
117 <            common.logger.message("--->>> Importing parent dataset in the dbs")
118 <            status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
119 <            if (status_import == 1):
120 <                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
121 <                self.exit_status='1'
122 <                return self.exit_status
123 <            common.logger.message("Parent import ok")
115 >
116 >        if (len(self.dataset_to_import) != 0):
117 >           for dataset in self.dataset_to_import:
118 >               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
119 >               status_import=self.importParentDataset(self.globalDBS, dataset)
120 >               if (status_import == 1):
121 >                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
122 >                   self.exit_status='1'
123 >                   return self.exit_status
124 >               else:    
125 >                   common.logger.message('Import ok of dataset '+dataset)
126              
127          #// DBS to contact
128          dbswriter = DBSWriter(self.DBSURL)                        
# Line 113 | Line 138 | class Publisher(Actor):
138          datasets=fileinfo.dataset
139          common.logger.debug(6,"FileInfo = " + str(fileinfo))
140          common.logger.debug(6,"DatasetInfo = " + str(datasets))
141 +        if len(datasets)<=0:
142 +           self.exit_status = '1'
143 +           msg = "Error: No info about dataset in the xml file "+file
144 +           common.logger.message(msg)
145 +           return self.exit_status
146          for dataset in datasets:
147              #### for production data
148 +            self.processedData = dataset['ProcessedDataset']
149              if (dataset['PrimaryDataset'] == 'null'):
150 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
151 <                
150 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
151 >                dataset['PrimaryDataset'] = self.userprocessedData
152 >            #else: # add parentage from input dataset
153 >            elif self.datasetpath.upper() != 'NONE':
154 >                dataset['ParentDataset']= self.datasetpath
155 >    
156              dataset['PSetContent']=self.content
157              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
158              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
# Line 162 | Line 197 | class Publisher(Actor):
197              elif (file['LFN'] == ''):
198                  self.noLFN.append(file['PFN'])
199              else:
200 <                if int(file['TotalEvents']) != 0 :
201 <                    file.lumisections = {}
202 <                    for ds in file.dataset:
203 <                        ds['ProcessedDataset']=procdataset
204 <                        ### Fede for production
205 <                        if (ds['PrimaryDataset'] == 'null'):
206 <                            ds['PrimaryDataset']=procdataset
207 <                    filestopublish.append(file)
208 <                else:
209 <                    self.noEventsFiles.append(file['LFN'])
200 >                ### Fede removed check about number of events
201 >                #if int(file['TotalEvents']) != 0 :
202 >                    # lumi info are now in run hash
203 >                file.runs = {}
204 >                for ds in file.dataset:
205 >                    ### Fede for production
206 >                    if (ds['PrimaryDataset'] == 'null'):
207 >                        ds['PrimaryDataset']=self.userprocessedData
208 >                filestopublish.append(file)
209 >                #else:
210 >                #    self.noEventsFiles.append(file['LFN'])
211          jobReport.files = filestopublish
212          ### if all files of FJR have number of events = 0
213          if (len(filestopublish) == 0):
# Line 183 | Line 219 | class Publisher(Actor):
219          Blocks=None
220          try:
221              Blocks=dbswriter.insertFiles(jobReport)
222 <            common.logger.message("Blocks = %s"%Blocks)
222 >            common.logger.message("Inserting file in blocks = %s"%Blocks)
223          except DBSWriterError, ex:
224 <            common.logger.message("Insert file error: %s"%ex)
224 >            common.logger.error("Insert file error: %s"%ex)
225          return Blocks
226  
227      def run(self):
# Line 194 | Line 230 | class Publisher(Actor):
230          """
231          
232          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
233 +        ## Select only those fjr that are succesfull
234 +        good_list=[]
235 +        for fjr in file_list:
236 +            reports = readJobReport(fjr)
237 +            if len(reports)>0:
238 +               if reports[0].status == "Success":
239 +                  good_list.append(fjr)
240 +        file_list=good_list
241 +        ##
242          common.logger.debug(6, "file_list = "+str(file_list))
243          common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
244              
# Line 208 | Line 253 | class Publisher(Actor):
253  
254              common.logger.message("--->>> Start files publication")
255              for file in file_list:
256 <                common.logger.message("file = "+file)
256 >                common.logger.debug(1, "file = "+file)
257                  Blocks=self.publishAJobReport(file,self.processedData)
258                  if Blocks:
259 <                    [BlocksList.append(x) for x in Blocks]
259 >                    for x in Blocks: # do not allow multiple entries of the same block
260 >                        if x not in BlocksList:
261 >                           BlocksList.append(x)
262                      
263              # close the blocks
264              common.logger.debug(6, "BlocksList = %s"%BlocksList)

Diff Legend

− Removed lines
+ Added lines
< Changed lines
> Changed lines