ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.10 by afanfani, Fri Feb 15 09:53:14 2008 UTC vs.
Revision 1.37 by fanzago, Wed Jun 24 16:46:25 2009 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 from crab_logger import Logger
6   from crab_exceptions import *
7   from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 + from ProdCommon.FwkJobRep.ReportState import checkSuccess
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 + from DBSAPI.dbsApiException import DbsException
16 + from DBSAPI.dbsApi import DbsApi
17  
18   class Publisher(Actor):
19      def __init__(self, cfg_params):
# Line 23 | Line 25 | class Publisher(Actor):
25          - publishes output data on DBS and DLS
26          """
27  
28 <        try:
29 <            self.processedData = cfg_params['USER.publish_data_name']
30 <        except KeyError:
28 >        self.cfg_params=cfg_params
29 >      
30 >        if not cfg_params.has_key('USER.publish_data_name'):
31              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 +        self.userprocessedData = cfg_params['USER.publish_data_name']
33 +        self.processedData = None
34  
35 <        try:
36 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
37 <        except KeyError:
38 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
39 <        try:
40 <            self.pset = cfg_params['CMSSW.pset']
41 <        except KeyError:
35 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
36 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
36 >            msg  = 'You can not publish data because you did not select \n'
38 >            msg += '\t*** copy_data = 1 and publish_data = 1  *** in the crab.cfg file'
39 >            raise CrabException(msg)
40 >
41 >        if not cfg_params.has_key('CMSSW.pset'):
42              raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
43 <        try:
44 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
45 <        except KeyError:
46 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
47 <        try:
48 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
49 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
50 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
51 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
52 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
53 <                raise CrabException(msg)
54 <        except KeyError:
55 <            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
56 <            msg = msg + " entry, necessary to publish the data"
43 >        self.pset = cfg_params['CMSSW.pset']
44 >
45 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
46 >
47 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
48 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
49 >            msg = msg + " entry, necessary to publish the data.\n"
50 >            msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
51 >            raise CrabException(msg)
52 >
53 >        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
54 >        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
55 >        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
56 >            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
57 >            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
58              raise CrabException(msg)
59              
60          self.content=file(self.pset).read()
61          self.resDir = common.work_space.resDir()
62 +        
63 +        self.dataset_to_import=[]
64 +        
65          self.datasetpath=cfg_params['CMSSW.datasetpath']
66 +        if (self.datasetpath.upper() != 'NONE'):
67 +            self.dataset_to_import.append(self.datasetpath)
68 +        
69 +        ### Added PU dataset
70 +        tmp = cfg_params.get('CMSSW.dataset_pu',None)
71 +        if tmp :
72 +            datasets = tmp.split(',')
73 +            for dataset in datasets:
74 +                dataset=string.strip(dataset)
75 +                self.dataset_to_import.append(dataset)
76 +        ###        
77 +            
78 +        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',0)
79 +        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
80 +    
81          self.SEName=''
82          self.CMSSW_VERSION=''
83          self.exit_status=''
# Line 65 | Line 88 | class Publisher(Actor):
88          
89      def importParentDataset(self,globalDBS, datasetpath):
90          """
91 <        """
91 >        """
92          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
93          
94          try:
95 <            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
95 >            if (self.import_all_parents=='1'):
96 >                common.logger.info("--->>> Importing all parents level")
97 >                dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
98 >            else:
99 >                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
100 >                dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
101          except DBSWriterError, ex:
102              msg = "Error importing dataset to be processed into local DBS\n"
103              msg += "Source Dataset: %s\n" % datasetpath
104              msg += "Source DBS: %s\n" % globalDBS
105              msg += "Destination DBS: %s\n" % self.DBSURL
106 <            common.logger.message(msg)
106 >            common.logger.info(msg)
107 >            common.logger.debug(str(ex))
108 >            return 1
109 >        return 0
110 >        """
111 >        print " patch for importParentDataset: datasetpath = ", datasetpath
112 >        try:
113 >            args={}
114 >            args['url']=self.DBSURL
115 >            args['mode']='POST'
116 >            block = ""
117 >            api = DbsApi(args)
118 >            #api.migrateDatasetContents(srcURL, dstURL, path, block , False)
119 >            api.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, block , False)
120 >
121 >        except DbsException, ex:
122 >            print "Caught API Exception %s: %s "  % (ex.getClassName(), ex.getErrorMessage() )
123 >            if ex.getErrorCode() not in (None, ""):
124 >                print "DBS Exception Error Code: ", ex.getErrorCode()
125              return 1
126 +        print "Done"
127          return 0
128 <          
128 >        """  
129      def publishDataset(self,file):
130          """
131          """
# Line 88 | Line 135 | class Publisher(Actor):
135          except IndexError:
136              self.exit_status = '1'
137              msg = "Error: Problem with "+file+" file"  
138 <            common.logger.message(msg)
138 >            common.logger.info(msg)
139              return self.exit_status
140 <            
141 <        if (self.datasetpath != 'None'):
142 <            common.logger.message("--->>> Importing parent dataset in the dbs")
143 <            status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
144 <            if (status_import == 1):
145 <                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
146 <                self.exit_status='1'
147 <                return self.exit_status
148 <            common.logger.message("Parent import ok")
140 >
141 >        if (len(self.dataset_to_import) != 0):
142 >           for dataset in self.dataset_to_import:
143 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
144 >               status_import=self.importParentDataset(self.globalDBS, dataset)
145 >               if (status_import == 1):
146 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ ' to the local one '+self.DBSURL)
147 >                   self.exit_status='1'
148 >                   return self.exit_status
149 >               else:    
150 >                   common.logger.info('Import ok of dataset '+dataset)
151              
152          #// DBS to contact
153          dbswriter = DBSWriter(self.DBSURL)                        
# Line 108 | Line 157 | class Publisher(Actor):
157          except IndexError:
158              self.exit_status = '1'
159              msg = "Error: No file to publish in xml file"+file+" file"  
160 <            common.logger.message(msg)
160 >            common.logger.info(msg)
161              return self.exit_status
162  
163          datasets=fileinfo.dataset
164 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
165 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
164 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
165 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
166 >        if len(datasets)<=0:
167 >           self.exit_status = '1'
168 >           msg = "Error: No info about dataset in the xml file "+file
169 >           common.logger.info(msg)
170 >           return self.exit_status
171          for dataset in datasets:
172              #### for production data
173 +            self.processedData = dataset['ProcessedDataset']
174              if (dataset['PrimaryDataset'] == 'null'):
175 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
176 <                
175 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
176 >                dataset['PrimaryDataset'] = self.userprocessedData
177 >            #else: # add parentage from input dataset
178 >            elif self.datasetpath.upper() != 'NONE':
179 >                dataset['ParentDataset']= self.datasetpath
180 >    
181              dataset['PSetContent']=self.content
182              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
183 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
184 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
185 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
183 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
184 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
185 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
186 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
187              
188 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
188 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
189              
190              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
191 <            common.logger.debug(6,"Primary:  %s "%primary)
191 >            common.logger.log(10-1,"Primary:  %s "%primary)
192              
193              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
194 <            common.logger.debug(6,"Algo:  %s "%algo)
194 >            common.logger.log(10-1,"Algo:  %s "%algo)
195  
196              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
197 <            common.logger.debug(6,"Processed:  %s "%processed)
197 >            common.logger.log(10-1,"Processed:  %s "%processed)
198              
199 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
199 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
200              
201 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
201 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
202          return self.exit_status    
203  
204      def publishAJobReport(self,file,procdataset):
# Line 163 | Line 223 | class Publisher(Actor):
223              elif (file['LFN'] == ''):
224                  self.noLFN.append(file['PFN'])
225              else:
226 <                if int(file['TotalEvents']) != 0 :
227 <                    file.lumisections = {}
226 >                if  self.skipOcheck==0:
227 >                    if int(file['TotalEvents']) != 0:
228 >                        #file.lumisections = {}
229 >                        # lumi info are now in run hash
230 >                        file.runs = {}
231 >                        for ds in file.dataset:
232 >                            ### Fede for production
233 >                            if (ds['PrimaryDataset'] == 'null'):
234 >                                #ds['PrimaryDataset']=procdataset
235 >                                ds['PrimaryDataset']=self.userprocessedData
236 >                        filestopublish.append(file)
237 >                    else:
238 >                        self.noEventsFiles.append(file['LFN'])
239 >                else:
240 >                    file.runs = {}
241                      for ds in file.dataset:
169                        ds['ProcessedDataset']=procdataset
242                          ### Fede for production
243                          if (ds['PrimaryDataset'] == 'null'):
244 <                            ds['PrimaryDataset']=procdataset
244 >                            #ds['PrimaryDataset']=procdataset
245 >                            ds['PrimaryDataset']=self.userprocessedData
246                      filestopublish.append(file)
247 <                else:
175 <                    self.noEventsFiles.append(file['LFN'])
247 >      
248          jobReport.files = filestopublish
249          ### if all files of FJR have number of events = 0
250          if (len(filestopublish) == 0):
# Line 184 | Line 256 | class Publisher(Actor):
256          Blocks=None
257          try:
258              Blocks=dbswriter.insertFiles(jobReport)
259 <            common.logger.message("Blocks = %s"%Blocks)
259 >            common.logger.info("Inserting file in blocks = %s"%Blocks)
260          except DBSWriterError, ex:
261 <            common.logger.message("Insert file error: %s"%ex)
261 >            common.logger.info("Insert file error: %s"%ex)
262          return Blocks
263  
264      def run(self):
# Line 195 | Line 267 | class Publisher(Actor):
267          """
268          
269          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
270 <        common.logger.debug(6, "file_list = "+str(file_list))
271 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
270 >        ## Select only those fjr that are successful
271 >        good_list=[]
272 >        for fjr in file_list:
273 >            reports = readJobReport(fjr)
274 >            if len(reports)>0:
275 >               if reports[0].status == "Success":
276 >                  good_list.append(fjr)
277 >        file_list=good_list
278 >        ##
279 >        common.logger.log(10-1, "file_list = "+str(file_list))
280 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
281              
282          if (len(file_list)>0):
283              BlocksList=[]
284 <            common.logger.message("--->>> Start dataset publication")
284 >            common.logger.info("--->>> Start dataset publication")
285              self.exit_status=self.publishDataset(file_list[0])
286              if (self.exit_status == '1'):
287                  return self.exit_status
288 <            common.logger.message("--->>> End dataset publication")
288 >            common.logger.info("--->>> End dataset publication")
289  
290  
291 <            common.logger.message("--->>> Start files publication")
291 >            common.logger.info("--->>> Start files publication")
292              for file in file_list:
293 <                common.logger.message("file = "+file)
293 >                common.logger.debug( "file = "+file)
294                  Blocks=self.publishAJobReport(file,self.processedData)
295                  if Blocks:
296 <                    [BlocksList.append(x) for x in Blocks]
296 >                    for x in Blocks: # do not allow multiple entries of the same block
297 >                        if x not in BlocksList:
298 >                           BlocksList.append(x)
299                      
300              # close the blocks
301 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
301 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
302              # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
303              dbswriter = DBSWriter(self.DBSURL)
304              
305              for BlockName in BlocksList:
306                  try:  
307                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
308 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
308 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
309                      #dbswriter.dbs.closeBlock(BlockName)
310                  except DBSWriterError, ex:
311 <                    common.logger.message("Close block error %s"%ex)
311 >                    common.logger.info("Close block error %s"%ex)
312  
313              if (len(self.noEventsFiles)>0):
314 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
314 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
315                  for lfn in self.noEventsFiles:
316 <                    common.logger.message("------ LFN: %s"%lfn)
316 >                    common.logger.info("------ LFN: %s"%lfn)
317              if (len(self.noLFN)>0):
318 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
318 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
319                  for pfn in self.noLFN:
320 <                    common.logger.message("------ pfn: %s"%pfn)
320 >                    common.logger.info("------ pfn: %s"%pfn)
321              if (len(self.problemFiles)>0):
322 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
322 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
323                  for lfn in self.problemFiles:
324 <                    common.logger.message("------ LFN: %s"%lfn)
325 <            common.logger.message("--->>> End files publication")
326 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
324 >                    common.logger.info("------ LFN: %s"%lfn)
325 >            common.logger.info("--->>> End files publication")
326 >          
327 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
328 >            from InspectDBS import InspectDBS
329 >            check=InspectDBS(self.cfg_params)
330 >            check.checkPublication()
331              return self.exit_status
332  
333          else:
334 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
334 >            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
335              self.exit_status = '1'
336              return self.exit_status
337      

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines