ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.17 by afanfani, Wed Oct 15 09:28:21 2008 UTC vs.
Revision 1.32 by slacapra, Wed Jun 10 13:26:27 2009 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 from crab_logger import Logger
6   from crab_exceptions import *
7   from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 + from ProdCommon.FwkJobRep.ReportState import checkSuccess
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
# Line 23 | Line 23 | class Publisher(Actor):
23          - publishes output data on DBS and DLS
24          """
25  
26 <        try:
27 <            userprocessedData = cfg_params['USER.publish_data_name']
28 <            self.processedData = None
29 <        except KeyError:
26 >        self.cfg_params=cfg_params
27 >      
28 >        if not cfg_params.has_key('USER.publish_data_name'):
29              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
30 +        self.userprocessedData = cfg_params['USER.publish_data_name']
31 +        self.processedData = None
32  
33 <        try:
34 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
35 <        except KeyError:
36 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
37 <        try:
38 <            self.pset = cfg_params['CMSSW.pset']
39 <        except KeyError:
40 <            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
41 <        try:
42 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
43 <        except KeyError:
44 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
45 <        try:
45 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
46 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
47 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
48 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
49 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
50 <                raise CrabException(msg)
51 <        except KeyError:
33 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
34 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
35 >            msg  = 'You can not publish data because you did not selected \n'
36 >            msg += '\t*** copy_data = 1 or publish_data = 1  *** in the crab.cfg file'
37 >
38 >        # try:
39 >        #     self.pset = cfg_params['CMSSW.pset']
40 >        # except KeyError:
41 >        #     raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
42 >
43 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
44 >
45 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
46              msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
47              msg = msg + " entry, necessary to publish the data.\n"
48              msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
49              raise CrabException(msg)
50 +
51 +        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
52 +        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
53 +        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
54 +            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
55 +            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
56 +            raise CrabException(msg)
57              
58          self.content=file(self.pset).read()
59          self.resDir = common.work_space.resDir()
# Line 71 | Line 72 | class Publisher(Actor):
72                  dataset=string.strip(dataset)
73                  self.dataset_to_import.append(dataset)
74          ###        
75 <                
75 >            
76 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
77 >    
78          self.SEName=''
79          self.CMSSW_VERSION=''
80          self.exit_status=''
# Line 86 | Line 89 | class Publisher(Actor):
89          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
90          
91          try:
92 <            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
92 >            #dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
93 >            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
94          except DBSWriterError, ex:
95              msg = "Error importing dataset to be processed into local DBS\n"
96              msg += "Source Dataset: %s\n" % datasetpath
97              msg += "Source DBS: %s\n" % globalDBS
98              msg += "Destination DBS: %s\n" % self.DBSURL
99 <            common.logger.message(msg)
99 >            common.logger.info(msg)
100 >            common.logger.debug(str(ex))
101              return 1
102          return 0
103            
# Line 105 | Line 110 | class Publisher(Actor):
110          except IndexError:
111              self.exit_status = '1'
112              msg = "Error: Problem with "+file+" file"  
113 <            common.logger.message(msg)
113 >            common.logger.info(msg)
114              return self.exit_status
115  
116          if (len(self.dataset_to_import) != 0):
117             for dataset in self.dataset_to_import:
118 <               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
118 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
119                 status_import=self.importParentDataset(self.globalDBS, dataset)
120                 if (status_import == 1):
121 <                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
121 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
122                     self.exit_status='1'
123                     return self.exit_status
124                 else:    
125 <                   common.logger.message('Import ok of dataset '+dataset)
125 >                   common.logger.info('Import ok of dataset '+dataset)
126              
127          #// DBS to contact
128          dbswriter = DBSWriter(self.DBSURL)                        
# Line 127 | Line 132 | class Publisher(Actor):
132          except IndexError:
133              self.exit_status = '1'
134              msg = "Error: No file to publish in xml file"+file+" file"  
135 <            common.logger.message(msg)
135 >            common.logger.info(msg)
136              return self.exit_status
137  
138          datasets=fileinfo.dataset
139 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
140 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
139 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
140 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
141 >        if len(datasets)<=0:
142 >           self.exit_status = '1'
143 >           msg = "Error: No info about dataset in the xml file "+file
144 >           common.logger.info(msg)
145 >           return self.exit_status
146          for dataset in datasets:
147              #### for production data
148              self.processedData = dataset['ProcessedDataset']
149              if (dataset['PrimaryDataset'] == 'null'):
150 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
151 <            else: # add parentage from input dataset
150 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
151 >                dataset['PrimaryDataset'] = self.userprocessedData
152 >            #else: # add parentage from input dataset
153 >            elif self.datasetpath.upper() != 'NONE':
154                  dataset['ParentDataset']= self.datasetpath
155      
156              dataset['PSetContent']=self.content
157              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
158 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
159 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
160 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
158 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
159 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
160 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
161 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
162              
163 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
163 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
164              
165              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
166 <            common.logger.debug(6,"Primary:  %s "%primary)
166 >            common.logger.log(10-1,"Primary:  %s "%primary)
167              
168              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
169 <            common.logger.debug(6,"Algo:  %s "%algo)
169 >            common.logger.log(10-1,"Algo:  %s "%algo)
170  
171              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
172 <            common.logger.debug(6,"Processed:  %s "%processed)
172 >            common.logger.log(10-1,"Processed:  %s "%processed)
173              
174 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
174 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
175              
176 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
176 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
177          return self.exit_status    
178  
179      def publishAJobReport(self,file,procdataset):
# Line 185 | Line 198 | class Publisher(Actor):
198              elif (file['LFN'] == ''):
199                  self.noLFN.append(file['PFN'])
200              else:
201 <                if int(file['TotalEvents']) != 0 :
202 <                    #file.lumisections = {}
203 <                    # lumi info are now in run hash
201 >                if  self.skipOcheck==0:
202 >                    if int(file['TotalEvents']) != 0:
203 >                        #file.lumisections = {}
204 >                        # lumi info are now in run hash
205 >                        file.runs = {}
206 >                        for ds in file.dataset:
207 >                            ### Fede for production
208 >                            if (ds['PrimaryDataset'] == 'null'):
209 >                                #ds['PrimaryDataset']=procdataset
210 >                                ds['PrimaryDataset']=self.userprocessedData
211 >                        filestopublish.append(file)
212 >                    else:
213 >                        self.noEventsFiles.append(file['LFN'])
214 >                else:
215                      file.runs = {}
216                      for ds in file.dataset:
193                        ### FEDE FOR NEW LFN ###
194                        #ds['ProcessedDataset']=procdataset
195                        ########################
217                          ### Fede for production
218                          if (ds['PrimaryDataset'] == 'null'):
219 <                            ds['PrimaryDataset']=procdataset
219 >                            #ds['PrimaryDataset']=procdataset
220 >                            ds['PrimaryDataset']=self.userprocessedData
221                      filestopublish.append(file)
222 <                else:
201 <                    self.noEventsFiles.append(file['LFN'])
222 >      
223          jobReport.files = filestopublish
224          ### if all files of FJR have number of events = 0
225          if (len(filestopublish) == 0):
# Line 210 | Line 231 | class Publisher(Actor):
231          Blocks=None
232          try:
233              Blocks=dbswriter.insertFiles(jobReport)
234 <            common.logger.message("Blocks = %s"%Blocks)
234 >            common.logger.info("Inserting file in blocks = %s"%Blocks)
235          except DBSWriterError, ex:
236 <            common.logger.message("Insert file error: %s"%ex)
236 >            common.logger.info("Insert file error: %s"%ex)
237          return Blocks
238  
239      def run(self):
# Line 221 | Line 242 | class Publisher(Actor):
242          """
243          
244          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
245 <        common.logger.debug(6, "file_list = "+str(file_list))
246 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
245 >        ## Select only those fjr that are succesfull
246 >        good_list=[]
247 >        for fjr in file_list:
248 >            reports = readJobReport(fjr)
249 >            if len(reports)>0:
250 >               if reports[0].status == "Success":
251 >                  good_list.append(fjr)
252 >        file_list=good_list
253 >        ##
254 >        common.logger.log(10-1, "file_list = "+str(file_list))
255 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
256              
257          if (len(file_list)>0):
258              BlocksList=[]
259 <            common.logger.message("--->>> Start dataset publication")
259 >            common.logger.info("--->>> Start dataset publication")
260              self.exit_status=self.publishDataset(file_list[0])
261              if (self.exit_status == '1'):
262                  return self.exit_status
263 <            common.logger.message("--->>> End dataset publication")
263 >            common.logger.info("--->>> End dataset publication")
264  
265  
266 <            common.logger.message("--->>> Start files publication")
266 >            common.logger.info("--->>> Start files publication")
267              for file in file_list:
268 <                common.logger.message("file = "+file)
268 >                common.logger.debug( "file = "+file)
269                  Blocks=self.publishAJobReport(file,self.processedData)
270                  if Blocks:
271                      for x in Blocks: # do not allow multiple entries of the same block
# Line 243 | Line 273 | class Publisher(Actor):
273                             BlocksList.append(x)
274                      
275              # close the blocks
276 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
276 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
277              # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
278              dbswriter = DBSWriter(self.DBSURL)
279              
280              for BlockName in BlocksList:
281                  try:  
282                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
283 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
283 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
284                      #dbswriter.dbs.closeBlock(BlockName)
285                  except DBSWriterError, ex:
286 <                    common.logger.message("Close block error %s"%ex)
286 >                    common.logger.info("Close block error %s"%ex)
287  
288              if (len(self.noEventsFiles)>0):
289 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
289 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
290                  for lfn in self.noEventsFiles:
291 <                    common.logger.message("------ LFN: %s"%lfn)
291 >                    common.logger.info("------ LFN: %s"%lfn)
292              if (len(self.noLFN)>0):
293 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
293 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
294                  for pfn in self.noLFN:
295 <                    common.logger.message("------ pfn: %s"%pfn)
295 >                    common.logger.info("------ pfn: %s"%pfn)
296              if (len(self.problemFiles)>0):
297 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
297 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
298                  for lfn in self.problemFiles:
299 <                    common.logger.message("------ LFN: %s"%lfn)
300 <            common.logger.message("--->>> End files publication")
301 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
299 >                    common.logger.info("------ LFN: %s"%lfn)
300 >            common.logger.info("--->>> End files publication")
301 >          
302 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
303 >            from InspectDBS import InspectDBS
304 >            check=InspectDBS(self.cfg_params)
305 >            check.checkPublication()
306              return self.exit_status
307  
308          else:
309 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
309 >            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
310              self.exit_status = '1'
311              return self.exit_status
312      

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines