ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.2 by fanzago, Wed Nov 21 14:18:17 2007 UTC vs.
Revision 1.21 by fanzago, Tue Jan 20 11:20:46 2009 UTC

# Line 5 | Line 5 | from Actor import *
5   from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8 < from FwkJobRep.ReportParser import readJobReport
8 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 > from ProdCommon.FwkJobRep.ReportState import checkSuccess
10   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
# Line 24 | Line 25 | class Publisher(Actor):
25          """
26  
27          try:
28 <            self.processedData = cfg_params['USER.publish_data_name']
28 >            self.userprocessedData = cfg_params['USER.publish_data_name']
29 >            self.processedData = None
30          except KeyError:
31              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32  
# Line 32 | Line 34 | class Publisher(Actor):
34              if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
35          except KeyError:
36              raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
37 <
38 <        #common.logger.message('processedData = '+self.processedData)
37 >        try:
38 >            self.pset = cfg_params['CMSSW.pset']
39 >        except KeyError:
40 >            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
41 >        try:
42 >            self.globalDBS=cfg_params['CMSSW.dbs_url']
43 >        except KeyError:
44 >            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
45 >        try:
46 >            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
47 >            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
48 >            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
49 >                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
50 >                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
51 >                raise CrabException(msg)
52 >        except KeyError:
53 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
54 >            msg = msg + " entry, necessary to publish the data.\n"
55 >            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
56 >            raise CrabException(msg)
57 >            
58 >        self.content=file(self.pset).read()
59          self.resDir = common.work_space.resDir()
60 <        #common.logger.message('resDir = '+self.resDir)
61 <        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
62 <        common.logger.message('dbs url = '+self.DBSURL)
60 >        
61 >        self.dataset_to_import=[]
62 >        
63          self.datasetpath=cfg_params['CMSSW.datasetpath']
64 <        #common.logger.message('datasetpath = '+self.datasetpath)
64 >        if (self.datasetpath.upper() != 'NONE'):
65 >            self.dataset_to_import.append(self.datasetpath)
66 >        
67 >        ### Added PU dataset
68 >        tmp = cfg_params.get('CMSSW.dataset_pu',None)
69 >        if tmp :
70 >            datasets = tmp.split(',')
71 >            for dataset in datasets:
72 >                dataset=string.strip(dataset)
73 >                self.dataset_to_import.append(dataset)
74 >        ###        
75 >                
76          self.SEName=''
77          self.CMSSW_VERSION=''
78          self.exit_status=''
79          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
80 +        self.problemFiles=[]  
81 +        self.noEventsFiles=[]
82 +        self.noLFN=[]
83          
84      def importParentDataset(self,globalDBS, datasetpath):
85          """
# Line 51 | Line 87 | class Publisher(Actor):
87          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
88          
89          try:
90 <            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
90 >            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
91          except DBSWriterError, ex:
92              msg = "Error importing dataset to be processed into local DBS\n"
93              msg += "Source Dataset: %s\n" % datasetpath
# Line 72 | Line 108 | class Publisher(Actor):
108              msg = "Error: Problem with "+file+" file"  
109              common.logger.message(msg)
110              return self.exit_status
111 <            
112 <        #### the globalDBS has to be written in the crab cfg file!!!!! ###############
113 <        if (self.datasetpath != 'None'):
114 <            globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
115 <            common.logger.message("--->>> Importing parent dataset in the dbs")
116 <            status_import=self.importParentDataset(globalDBS, self.datasetpath)
117 <            if (status_import == 1):
118 <                common.logger.message('Problem with parent import from the global DBS '+globalDBS+ 'to the local one '+self.DBSURL)
119 <                self.exit_status='1'
120 <                ##  ___ >>>>>>>  comment out the next line, if you have problem with the import
121 <                return self.exit_status
86 <            common.logger.message("Parent import ok")
111 >
112 >        if (len(self.dataset_to_import) != 0):
113 >           for dataset in self.dataset_to_import:
114 >               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
115 >               status_import=self.importParentDataset(self.globalDBS, dataset)
116 >               if (status_import == 1):
117 >                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
118 >                   self.exit_status='1'
119 >                   return self.exit_status
120 >               else:    
121 >                   common.logger.message('Import ok of dataset '+dataset)
122              
123          #// DBS to contact
124          dbswriter = DBSWriter(self.DBSURL)                        
125          try:  
126              fileinfo= jobReport.files[0]
92            #fileinfo.lumisections = {}
127              self.exit_status = '0'
128          except IndexError:
129              self.exit_status = '1'
# Line 97 | Line 131 | class Publisher(Actor):
131              common.logger.message(msg)
132              return self.exit_status
133  
100        #common.logger.message("FileInfo = " + str(fileinfo))
134          datasets=fileinfo.dataset
135 <        #common.logger.message("DatasetInfo = " + str(datasets))
135 >        common.logger.debug(6,"FileInfo = " + str(fileinfo))
136 >        common.logger.debug(6,"DatasetInfo = " + str(datasets))
137 >        if len(datasets)<=0:
138 >           self.exit_status = '1'
139 >           msg = "Error: No info about dataset in the xml file "+file
140 >           common.logger.message(msg)
141 >           return self.exit_status
142          for dataset in datasets:
143 <            #### to understand how to fill cfgMeta info ###############
144 <            cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
143 >            #### for production data
144 >            self.processedData = dataset['ProcessedDataset']
145 >            if (dataset['PrimaryDataset'] == 'null'):
146 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
147 >                dataset['PrimaryDataset'] = self.userprocessedData
148 >            #else: # add parentage from input dataset
149 >            elif self.datasetpath.upper() != 'NONE':
150 >                dataset['ParentDataset']= self.datasetpath
151 >    
152 >            dataset['PSetContent']=self.content
153 >            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
154              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
155              common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
156 <            common.logger.message("--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
157 <            #common.logger.message("dataset: %s"%dataset)
156 >            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
157 >            
158 >            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
159              
160              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
161 <            common.logger.message("Primary:  %s "%primary)
161 >            common.logger.debug(6,"Primary:  %s "%primary)
162              
163              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
164 <            common.logger.message("Algo:  %s "%algo)
164 >            common.logger.debug(6,"Algo:  %s "%algo)
165  
166              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
167 <            common.logger.message("Processed:  %s "%processed)
167 >            common.logger.debug(6,"Processed:  %s "%processed)
168              
169 <            common.logger.message("Inserted primary %s processed %s"%(primary,processed))
169 >            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
170              
171 <        #common.logger.message("exit_status = %s "%self.exit_status)
171 >        common.logger.debug(6,"exit_status = %s "%self.exit_status)
172          return self.exit_status    
173  
174      def publishAJobReport(self,file,procdataset):
# Line 133 | Line 182 | class Publisher(Actor):
182              self.exit_status = '1'
183              msg = "Error: Problem with "+file+" file"
184              raise CrabException(msg)
185 <        # overwrite ProcessedDataset with user defined value
186 <        # overwrite lumisections with no value
185 >        ### overwrite ProcessedDataset with user defined value
186 >        ### overwrite lumisections with no value
187 >        ### skip publication for 0 events files
188 >        filestopublish=[]
189          for file in jobReport.files:
190 <            file.lumisections = {}
191 <            for ds in file.dataset:
192 <                ds['ProcessedDataset']=procdataset
190 >            #### added check for problem with copy to SE and empty lfn
191 >            if (string.find(file['LFN'], 'copy_problems') != -1):
192 >                self.problemFiles.append(file['LFN'])
193 >            elif (file['LFN'] == ''):
194 >                self.noLFN.append(file['PFN'])
195 >            else:
196 >                if int(file['TotalEvents']) != 0 :
197 >                    #file.lumisections = {}
198 >                    # lumi info are now in run hash
199 >                    file.runs = {}
200 >                    for ds in file.dataset:
201 >                        ### Fede for production
202 >                        if (ds['PrimaryDataset'] == 'null'):
203 >                            #ds['PrimaryDataset']=procdataset
204 >                            ds['PrimaryDataset']=self.userprocessedData
205 >                    filestopublish.append(file)
206 >                else:
207 >                    self.noEventsFiles.append(file['LFN'])
208 >        jobReport.files = filestopublish
209 >        ### if all files of FJR have number of events = 0
210 >        if (len(filestopublish) == 0):
211 >           return None
212 >          
213          #// DBS to contact
214          dbswriter = DBSWriter(self.DBSURL)
215          # insert files
216          Blocks=None
217          try:
218              Blocks=dbswriter.insertFiles(jobReport)
219 <            common.logger.message("Blocks = %s"%Blocks)
219 >            common.logger.message("Inserting file in blocks = %s"%Blocks)
220          except DBSWriterError, ex:
221 <            common.logger.message("Insert file error: %s"%ex)
221 >            common.logger.error("Insert file error: %s"%ex)
222          return Blocks
223  
224      def run(self):
# Line 156 | Line 227 | class Publisher(Actor):
227          """
228          
229          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
230 +        ## Select only those fjr that are succesfull
231 +        good_list=[]
232 +        for fjr in file_list:
233 +            reports = readJobReport(fjr)
234 +            if len(reports)>0:
235 +               if reports[0].status == "Success":
236 +                  good_list.append(fjr)
237 +        file_list=good_list
238 +        ##
239          common.logger.debug(6, "file_list = "+str(file_list))
240          common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
241              
# Line 170 | Line 250 | class Publisher(Actor):
250  
251              common.logger.message("--->>> Start files publication")
252              for file in file_list:
253 <                common.logger.message("file = "+file)
253 >                common.logger.debug(1, "file = "+file)
254                  Blocks=self.publishAJobReport(file,self.processedData)
255                  if Blocks:
256 <                    [BlocksList.append(x) for x in Blocks]
256 >                    for x in Blocks: # do not allow multiple entries of the same block
257 >                        if x not in BlocksList:
258 >                           BlocksList.append(x)
259                      
260              # close the blocks
261 <            common.logger.message("BlocksList = %s"%BlocksList)
261 >            common.logger.debug(6, "BlocksList = %s"%BlocksList)
262              # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
263              dbswriter = DBSWriter(self.DBSURL)
264              
265              for BlockName in BlocksList:
266                  try:  
267                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
268 <                    common.logger.message("closeBlock %s"%closeBlock)
268 >                    common.logger.debug(6, "closeBlock %s"%closeBlock)
269                      #dbswriter.dbs.closeBlock(BlockName)
270                  except DBSWriterError, ex:
271                      common.logger.message("Close block error %s"%ex)
272 +
273 +            if (len(self.noEventsFiles)>0):
274 +                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
275 +                for lfn in self.noEventsFiles:
276 +                    common.logger.message("------ LFN: %s"%lfn)
277 +            if (len(self.noLFN)>0):
278 +                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
279 +                for pfn in self.noLFN:
280 +                    common.logger.message("------ pfn: %s"%pfn)
281 +            if (len(self.problemFiles)>0):
282 +                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
283 +                for lfn in self.problemFiles:
284 +                    common.logger.message("------ LFN: %s"%lfn)
285              common.logger.message("--->>> End files publication")
286 +            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
287              return self.exit_status
288  
289          else:

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines