ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.19 by afanfani, Mon Nov 17 13:02:07 2008 UTC vs.
Revision 1.48 by spiga, Tue May 4 16:35:43 2010 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 from crab_logger import Logger
6   from crab_exceptions import *
7   from ProdCommon.FwkJobRep.ReportParser import readJobReport
8   from ProdCommon.FwkJobRep.ReportState import checkSuccess
# Line 13 | Line 12 | from ProdCommon.DataMgmt.DBS.DBSErrors i
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 + from DBSAPI.dbsApi import DbsApi
16 + from DBSAPI.dbsMigrateApi import DbsMigrateApi
17 + from DBSAPI.dbsApiException import *
18  
19   class Publisher(Actor):
20      def __init__(self, cfg_params):
# Line 24 | Line 26 | class Publisher(Actor):
26          - publishes output data on DBS and DLS
27          """
28  
29 <        try:
30 <            userprocessedData = cfg_params['USER.publish_data_name']
31 <            self.processedData = None
30 <        except KeyError:
29 >        self.cfg_params=cfg_params
30 >      
31 >        if not cfg_params.has_key('USER.publish_data_name'):
32              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
33 +        self.userprocessedData = cfg_params['USER.publish_data_name']
34 +        self.processedData = None
35  
36 <        try:
37 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
38 <        except KeyError:
39 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
40 <        try:
41 <            self.pset = cfg_params['CMSSW.pset']
42 <        except KeyError:
36 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
37 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
38 >            msg  = 'You can not publish data because you did not selected \n'
39 >            msg += '\t*** copy_data = 1 and publish_data = 1  *** in the crab.cfg file'
40 >            raise CrabException(msg)
41 >
42 >        if not cfg_params.has_key('CMSSW.pset'):
43              raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
44 <        try:
45 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
46 <        except KeyError:
47 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
48 <        try:
46 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
47 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
48 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
49 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
50 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
51 <                raise CrabException(msg)
52 <        except KeyError:
44 >        self.pset = cfg_params['CMSSW.pset']
45 >
46 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
47 >
48 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
49              msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
50              msg = msg + " entry, necessary to publish the data.\n"
51              msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
52              raise CrabException(msg)
53 +
54 +        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
55 +        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
56 +        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
57 +            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
58 +            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
59 +            raise CrabException(msg)
60              
61          self.content=file(self.pset).read()
62          self.resDir = common.work_space.resDir()
# Line 72 | Line 75 | class Publisher(Actor):
75                  dataset=string.strip(dataset)
76                  self.dataset_to_import.append(dataset)
77          ###        
78 <                
78 >            
79 >        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
80 >        
81 >        if ( int(self.import_all_parents) == 0 ):
82 >            common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
83 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',1)
84 >        if ( int(self.skipOcheck) == 0 ):
85 >            common.logger.info("WARNING: The option CMSSW.publish_zero_event has been deprecated. The publication is done by default also for files with 0 events")
86          self.SEName=''
87          self.CMSSW_VERSION=''
88          self.exit_status=''
# Line 80 | Line 90 | class Publisher(Actor):
90          self.problemFiles=[]  
91          self.noEventsFiles=[]
92          self.noLFN=[]
93 <        
93 >
94      def importParentDataset(self,globalDBS, datasetpath):
95          """
96 <        """
97 <        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
98 <        
96 >           WARNING: it works only with DBS_2_0_9_patch_6
97 >        """
98 >
99 >        args={'url':globalDBS}
100          try:
101 <            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
102 <        except DBSWriterError, ex:
101 >            api_reader = DbsApi(args)
102 >        except DbsApiException, ex:
103 >            msg = "%s\n" % formatEx(ex)
104 >            raise CrabException(msg)
105 >
106 >        args={'url':self.DBSURL}
107 >        try:
108 >            api_writer = DbsApi(args)
109 >        except DbsApiException, ex:
110 >            msg = "%s\n" % formatEx(ex)
111 >            raise CrabException(msg)
112 >
113 >        try:
114 >            common.logger.info("--->>> Importing all parents level")
115 >            start = time.time()
116 >            common.logger.debug("start import parents time: " + str(start))
117 >            for block in api_reader.listBlocks(datasetpath):
118 >                print "blockName = ", block['Name']
119 >                api_writer.dbsMigrateBlock(globalDBS,self.DBSURL,block['Name'] )
120 >            stop = time.time()
121 >            common.logger.debug("stop import parents time: " + str(stop))
122 >            common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
123 >        except DbsApiException, ex:
124              msg = "Error importing dataset to be processed into local DBS\n"
125              msg += "Source Dataset: %s\n" % datasetpath
126              msg += "Source DBS: %s\n" % globalDBS
127              msg += "Destination DBS: %s\n" % self.DBSURL
128 <            common.logger.message(msg)
128 >            common.logger.info(msg)
129 >            common.logger.info(str(ex))
130              return 1
131          return 0
132 <          
132 >        
133      def publishDataset(self,file):
134          """
135          """
# Line 106 | Line 139 | class Publisher(Actor):
139          except IndexError:
140              self.exit_status = '1'
141              msg = "Error: Problem with "+file+" file"  
142 <            common.logger.message(msg)
142 >            common.logger.info(msg)
143              return self.exit_status
144  
145          if (len(self.dataset_to_import) != 0):
146             for dataset in self.dataset_to_import:
147 <               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
147 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
148                 status_import=self.importParentDataset(self.globalDBS, dataset)
149                 if (status_import == 1):
150 <                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
150 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
151                     self.exit_status='1'
152                     return self.exit_status
153                 else:    
154 <                   common.logger.message('Import ok of dataset '+dataset)
154 >                   common.logger.info('Import ok of dataset '+dataset)
155              
156          #// DBS to contact
157          dbswriter = DBSWriter(self.DBSURL)                        
# Line 127 | Line 160 | class Publisher(Actor):
160              self.exit_status = '0'
161          except IndexError:
162              self.exit_status = '1'
163 <            msg = "Error: No file to publish in xml file"+file+" file"  
164 <            common.logger.message(msg)
163 >            msg = "Error: No EDM file to publish in xml file"+file+" file"  
164 >            common.logger.info(msg)
165              return self.exit_status
166  
167          datasets=fileinfo.dataset
168 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
169 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
168 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
169 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
170          if len(datasets)<=0:
171             self.exit_status = '1'
172             msg = "Error: No info about dataset in the xml file "+file
173 <           common.logger.message(msg)
173 >           common.logger.info(msg)
174             return self.exit_status
175          for dataset in datasets:
176              #### for production data
177              self.processedData = dataset['ProcessedDataset']
178              if (dataset['PrimaryDataset'] == 'null'):
179 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
180 <            else: # add parentage from input dataset
179 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
180 >                dataset['PrimaryDataset'] = self.userprocessedData
181 >            #else: # add parentage from input dataset
182 >            elif self.datasetpath.upper() != 'NONE':
183                  dataset['ParentDataset']= self.datasetpath
184      
185              dataset['PSetContent']=self.content
186              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
187 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
188 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
189 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
187 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
188 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
189 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
190 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
191              
192 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
192 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
193              
194              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
195 <            common.logger.debug(6,"Primary:  %s "%primary)
195 >            common.logger.log(10-1,"Primary:  %s "%primary)
196              
197              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
198 <            common.logger.debug(6,"Algo:  %s "%algo)
198 >            common.logger.log(10-1,"Algo:  %s "%algo)
199  
200              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
201 <            common.logger.debug(6,"Processed:  %s "%processed)
201 >            common.logger.log(10-1,"Processed:  %s "%processed)
202              
203 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
203 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
204              
205 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
205 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
206          return self.exit_status    
207  
208      def publishAJobReport(self,file,procdataset):
209          """
210             input:  xml file, processedDataset
211          """
212 +        common.logger.debug("FJR = %s"%file)
213          try:
214              jobReport = readJobReport(file)[0]
215              self.exit_status = '0'
# Line 180 | Line 217 | class Publisher(Actor):
217              self.exit_status = '1'
218              msg = "Error: Problem with "+file+" file"
219              raise CrabException(msg)
183        ### overwrite ProcessedDataset with user defined value
184        ### overwrite lumisections with no value
220          ### skip publication for 0 events files
221          filestopublish=[]
222          for file in jobReport.files:
# Line 191 | Line 226 | class Publisher(Actor):
226              elif (file['LFN'] == ''):
227                  self.noLFN.append(file['PFN'])
228              else:
229 <                if int(file['TotalEvents']) != 0 :
230 <                    #file.lumisections = {}
231 <                    # lumi info are now in run hash
232 <                    file.runs = {}
233 <                    for ds in file.dataset:
234 <                        ### FEDE FOR NEW LFN ###
235 <                        #ds['ProcessedDataset']=procdataset
236 <                        ########################
237 <                        ### Fede for production
238 <                        if (ds['PrimaryDataset'] == 'null'):
239 <                            ds['PrimaryDataset']=procdataset
205 <                    filestopublish.append(file)
206 <                else:
229 >                #if  self.skipOcheck==0:
230 >                #    if int(file['TotalEvents']) != 0:
231 >                #        for ds in file.dataset:
232 >                #            ### Fede for production
233 >                #            if (ds['PrimaryDataset'] == 'null'):
234 >                #                ds['PrimaryDataset']=self.userprocessedData
235 >                #        filestopublish.append(file)
236 >                #    else:
237 >                #        self.noEventsFiles.append(file['LFN'])
238 >                #else:
239 >                if int(file['TotalEvents']) == 0:
240                      self.noEventsFiles.append(file['LFN'])
241 +                for ds in file.dataset:
242 +                    ### Fede for production
243 +                    if (ds['PrimaryDataset'] == 'null'):
244 +                        ds['PrimaryDataset']=self.userprocessedData
245 +                filestopublish.append(file)
246 +      
247          jobReport.files = filestopublish
248 +        for file in filestopublish:
249 +            common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
250          ### if all files of FJR have number of events = 0
251          if (len(filestopublish) == 0):
252 <           return None
252 >            return None
253            
254          #// DBS to contact
255          dbswriter = DBSWriter(self.DBSURL)
256          # insert files
257          Blocks=None
258          try:
259 <            Blocks=dbswriter.insertFiles(jobReport)
260 <            common.logger.message("Inserting file in blocks = %s"%Blocks)
259 >            ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
260 >            Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
261 >            #Blocks=dbswriter.insertFiles(jobReport)
262 >            common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
263          except DBSWriterError, ex:
264 <            common.logger.error("Insert file error: %s"%ex)
264 >            common.logger.debug("--->>> Insert file error: %s"%ex)
265          return Blocks
266  
267      def run(self):
# Line 227 | Line 270 | class Publisher(Actor):
270          """
271          
272          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
273 +        
274          ## Select only those fjr that are succesfull
275 +        if (len(file_list)==0):
276 +            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
277 +            self.exit_status = '1'
278 +            return self.exit_status
279 +
280          good_list=[]
281          for fjr in file_list:
282              reports = readJobReport(fjr)
283 <            if reports[0].status == "Success":
284 <               good_list.append(fjr)
283 >            if len(reports)>0:
284 >               if reports[0].status == "Success":
285 >                  good_list.append(fjr)
286          file_list=good_list
287          ##
288 <        common.logger.debug(6, "file_list = "+str(file_list))
289 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
288 >        common.logger.log(10-1, "file_list = "+str(file_list))
289 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
290              
291          if (len(file_list)>0):
292              BlocksList=[]
293 <            common.logger.message("--->>> Start dataset publication")
293 >            common.logger.info("--->>> Start dataset publication")
294              self.exit_status=self.publishDataset(file_list[0])
295              if (self.exit_status == '1'):
296                  return self.exit_status
297 <            common.logger.message("--->>> End dataset publication")
297 >            common.logger.info("--->>> End dataset publication")
298  
299  
300 <            common.logger.message("--->>> Start files publication")
300 >            common.logger.info("--->>> Start files publication")
301              for file in file_list:
252                common.logger.debug(1, "file = "+file)
302                  Blocks=self.publishAJobReport(file,self.processedData)
303                  if Blocks:
304                      for x in Blocks: # do not allow multiple entries of the same block
# Line 257 | Line 306 | class Publisher(Actor):
306                             BlocksList.append(x)
307                      
308              # close the blocks
309 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
261 <            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
309 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
310              dbswriter = DBSWriter(self.DBSURL)
311              
312              for BlockName in BlocksList:
313                  try:  
314                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
315 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
268 <                    #dbswriter.dbs.closeBlock(BlockName)
315 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
316                  except DBSWriterError, ex:
317 <                    common.logger.message("Close block error %s"%ex)
317 >                    common.logger.info("Close block error %s"%ex)
318  
319              if (len(self.noEventsFiles)>0):
320 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
320 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" published files contain 0 events are:")
321                  for lfn in self.noEventsFiles:
322 <                    common.logger.message("------ LFN: %s"%lfn)
322 >                    common.logger.info("------ LFN: %s"%lfn)
323              if (len(self.noLFN)>0):
324 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
324 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
325                  for pfn in self.noLFN:
326 <                    common.logger.message("------ pfn: %s"%pfn)
326 >                    common.logger.info("------ pfn: %s"%pfn)
327              if (len(self.problemFiles)>0):
328 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
328 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
329                  for lfn in self.problemFiles:
330 <                    common.logger.message("------ LFN: %s"%lfn)
331 <            common.logger.message("--->>> End files publication")
332 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
330 >                    common.logger.info("------ LFN: %s"%lfn)
331 >            common.logger.info("--->>> End files publication")
332 >          
333 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
334 >            from InspectDBS import InspectDBS
335 >            check=InspectDBS(self.cfg_params)
336 >            check.checkPublication()
337              return self.exit_status
338  
339          else:
340 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
340 >            common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
341              self.exit_status = '1'
342              return self.exit_status
343      

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines