ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.10 by afanfani, Fri Feb 15 09:53:14 2008 UTC vs.
Revision 1.19 by afanfani, Mon Nov 17 13:02:07 2008 UTC

# Line 6 | Line 6 | from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8   from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 + from ProdCommon.FwkJobRep.ReportState import checkSuccess
10   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
# Line 24 | Line 25 | class Publisher(Actor):
25          """
26  
27          try:
28 <            self.processedData = cfg_params['USER.publish_data_name']
28 >            userprocessedData = cfg_params['USER.publish_data_name']
29 >            self.processedData = None
30          except KeyError:
31              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32  
# Line 48 | Line 50 | class Publisher(Actor):
50                  msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
51                  raise CrabException(msg)
52          except KeyError:
53 <            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
54 <            msg = msg + " entry, necessary to publish the data"
53 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
54 >            msg = msg + " entry, necessary to publish the data.\n"
55 >            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
56              raise CrabException(msg)
57              
58          self.content=file(self.pset).read()
59          self.resDir = common.work_space.resDir()
60 +        
61 +        self.dataset_to_import=[]
62 +        
63          self.datasetpath=cfg_params['CMSSW.datasetpath']
64 +        if (self.datasetpath.upper() != 'NONE'):
65 +            self.dataset_to_import.append(self.datasetpath)
66 +        
67 +        ### Added PU dataset
68 +        tmp = cfg_params.get('CMSSW.dataset_pu',None)
69 +        if tmp :
70 +            datasets = tmp.split(',')
71 +            for dataset in datasets:
72 +                dataset=string.strip(dataset)
73 +                self.dataset_to_import.append(dataset)
74 +        ###        
75 +                
76          self.SEName=''
77          self.CMSSW_VERSION=''
78          self.exit_status=''
# Line 90 | Line 108 | class Publisher(Actor):
108              msg = "Error: Problem with "+file+" file"  
109              common.logger.message(msg)
110              return self.exit_status
111 <            
112 <        if (self.datasetpath != 'None'):
113 <            common.logger.message("--->>> Importing parent dataset in the dbs")
114 <            status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
115 <            if (status_import == 1):
116 <                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
117 <                self.exit_status='1'
118 <                return self.exit_status
119 <            common.logger.message("Parent import ok")
111 >
112 >        if (len(self.dataset_to_import) != 0):
113 >           for dataset in self.dataset_to_import:
114 >               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
115 >               status_import=self.importParentDataset(self.globalDBS, dataset)
116 >               if (status_import == 1):
117 >                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
118 >                   self.exit_status='1'
119 >                   return self.exit_status
120 >               else:    
121 >                   common.logger.message('Import ok of dataset '+dataset)
122              
123          #// DBS to contact
124          dbswriter = DBSWriter(self.DBSURL)                        
# Line 114 | Line 134 | class Publisher(Actor):
134          datasets=fileinfo.dataset
135          common.logger.debug(6,"FileInfo = " + str(fileinfo))
136          common.logger.debug(6,"DatasetInfo = " + str(datasets))
137 +        if len(datasets)<=0:
138 +           self.exit_status = '1'
139 +           msg = "Error: No info about dataset in the xml file "+file
140 +           common.logger.message(msg)
141 +           return self.exit_status
142          for dataset in datasets:
143              #### for production data
144 +            self.processedData = dataset['ProcessedDataset']
145              if (dataset['PrimaryDataset'] == 'null'):
146                  dataset['PrimaryDataset'] = dataset['ProcessedDataset']
147 <                
147 >            else: # add parentage from input dataset
148 >                dataset['ParentDataset']= self.datasetpath
149 >    
150              dataset['PSetContent']=self.content
151              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
152              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
# Line 164 | Line 192 | class Publisher(Actor):
192                  self.noLFN.append(file['PFN'])
193              else:
194                  if int(file['TotalEvents']) != 0 :
195 <                    file.lumisections = {}
195 >                    #file.lumisections = {}
196 >                    # lumi info are now in run hash
197 >                    file.runs = {}
198                      for ds in file.dataset:
199 <                        ds['ProcessedDataset']=procdataset
199 >                        ### FEDE FOR NEW LFN ###
200 >                        #ds['ProcessedDataset']=procdataset
201 >                        ########################
202                          ### Fede for production
203                          if (ds['PrimaryDataset'] == 'null'):
204                              ds['PrimaryDataset']=procdataset
# Line 184 | Line 216 | class Publisher(Actor):
216          Blocks=None
217          try:
218              Blocks=dbswriter.insertFiles(jobReport)
219 <            common.logger.message("Blocks = %s"%Blocks)
219 >            common.logger.message("Inserting file in blocks = %s"%Blocks)
220          except DBSWriterError, ex:
221 <            common.logger.message("Insert file error: %s"%ex)
221 >            common.logger.error("Insert file error: %s"%ex)
222          return Blocks
223  
224      def run(self):
# Line 195 | Line 227 | class Publisher(Actor):
227          """
228          
229          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
230 +        ## Select only those fjr that are successful
231 +        good_list=[]
232 +        for fjr in file_list:
233 +            reports = readJobReport(fjr)
234 +            if reports[0].status == "Success":
235 +               good_list.append(fjr)
236 +        file_list=good_list
237 +        ##
238          common.logger.debug(6, "file_list = "+str(file_list))
239          common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
240              
# Line 209 | Line 249 | class Publisher(Actor):
249  
250              common.logger.message("--->>> Start files publication")
251              for file in file_list:
252 <                common.logger.message("file = "+file)
252 >                common.logger.debug(1, "file = "+file)
253                  Blocks=self.publishAJobReport(file,self.processedData)
254                  if Blocks:
255 <                    [BlocksList.append(x) for x in Blocks]
255 >                    for x in Blocks: # do not allow multiple entries of the same block
256 >                        if x not in BlocksList:
257 >                           BlocksList.append(x)
258                      
259              # close the blocks
260              common.logger.debug(6, "BlocksList = %s"%BlocksList)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines