ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.7 by fanzago, Tue Dec 18 10:58:20 2007 UTC vs.
Revision 1.45.4.2 by fanzago, Mon Apr 19 16:24:10 2010 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 from crab_logger import Logger
6   from crab_exceptions import *
7 < from FwkJobRep.ReportParser import readJobReport
7 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 > from ProdCommon.FwkJobRep.ReportState import checkSuccess
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 + from DBSAPI.dbsApi import DbsApi
16 + from DBSAPI.dbsMigrateApi import DbsMigrateApi
17 + from DBSAPI.dbsApiException import *
18  
19   class Publisher(Actor):
20      def __init__(self, cfg_params):
# Line 23 | Line 26 | class Publisher(Actor):
26          - publishes output data on DBS and DLS
27          """
28  
29 <        try:
30 <            self.processedData = cfg_params['USER.publish_data_name']
31 <        except KeyError:
29 >        self.cfg_params=cfg_params
30 >      
31 >        if not cfg_params.has_key('USER.publish_data_name'):
32              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
33 <        try:
34 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
35 <        except KeyError:
36 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
37 <        try:
38 <            self.pset = cfg_params['CMSSW.pset']
39 <        except KeyError:
33 >        self.userprocessedData = cfg_params['USER.publish_data_name']
34 >        self.processedData = None
35 >
36 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
37 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
38 >            msg  = 'You can not publish data because you did not selected \n'
39 >            msg += '\t*** copy_data = 1 and publish_data = 1  *** in the crab.cfg file'
40 >            raise CrabException(msg)
41 >
42 >        if not cfg_params.has_key('CMSSW.pset'):
43              raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
44 <        try:
45 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
46 <        except KeyError:
47 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
48 <        try:
49 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
50 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
51 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
52 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
53 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
54 <                raise CrabException(msg)
55 <        except KeyError:
56 <            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
57 <            msg = msg + " entry, necessary to publish the data"
44 >        self.pset = cfg_params['CMSSW.pset']
45 >
46 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
47 >
48 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
49 >            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
50 >            msg = msg + " entry, necessary to publish the data.\n"
51 >            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
52 >            raise CrabException(msg)
53 >
54 >        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
55 >        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
56 >        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
57 >            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
58 >            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
59              raise CrabException(msg)
60              
61          self.content=file(self.pset).read()
62          self.resDir = common.work_space.resDir()
63 +        
64 +        self.dataset_to_import=[]
65 +        
66          self.datasetpath=cfg_params['CMSSW.datasetpath']
67 +        if (self.datasetpath.upper() != 'NONE'):
68 +            self.dataset_to_import.append(self.datasetpath)
69 +        
70 +        ### Added PU dataset
71 +        tmp = cfg_params.get('CMSSW.dataset_pu',None)
72 +        if tmp :
73 +            datasets = tmp.split(',')
74 +            for dataset in datasets:
75 +                dataset=string.strip(dataset)
76 +                self.dataset_to_import.append(dataset)
77 +        ###        
78 +            
79 +        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
80 +        
81 +        ### fede import parent dataset is compulsory ###
82 +        if ( int(self.import_all_parents) == 0 ):
83 +            common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
84 +        ############
85 +        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
86 +    
87          self.SEName=''
88          self.CMSSW_VERSION=''
89          self.exit_status=''
# Line 61 | Line 91 | class Publisher(Actor):
91          self.problemFiles=[]  
92          self.noEventsFiles=[]
93          self.noLFN=[]
94 <        
94 >
95      def importParentDataset(self,globalDBS, datasetpath):
96          """
97 <        """
98 <        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
99 <        
97 >           WARNING: it works only with DBS_2_0_9_patch_6
98 >        """
99 >
100 >        args={'url':globalDBS}
101          try:
102 <            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
103 <        except DBSWriterError, ex:
102 >            api_reader = DbsApi(args)
103 >        except DbsApiException, ex:
104 >            msg = "%s\n" % formatEx(ex)
105 >            raise CrabException(msg)
106 >
107 >        args={'url':self.DBSURL}
108 >        try:
109 >            api_writer = DbsApi(args)
110 >        except DbsApiException, ex:
111 >            msg = "%s\n" % formatEx(ex)
112 >            raise CrabException(msg)
113 >
114 >        try:
115 >            common.logger.info("--->>> Importing all parents level")
116 >            start = time.time()
117 >            common.logger.debug("start import parents time: " + str(start))
118 >            for block in api_reader.listBlocks(datasetpath):
119 >                print "blockName = ", block['Name']
120 >                api_writer.dbsMigrateBlock(globalDBS,self.DBSURL,block['Name'] )
121 >            stop = time.time()
122 >            common.logger.debug("stop import parents time: " + str(stop))
123 >            common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
124 >        except DbsApiException, ex:
125              msg = "Error importing dataset to be processed into local DBS\n"
126              msg += "Source Dataset: %s\n" % datasetpath
127              msg += "Source DBS: %s\n" % globalDBS
128              msg += "Destination DBS: %s\n" % self.DBSURL
129 <            common.logger.message(msg)
129 >            common.logger.info(msg)
130 >            common.logger.info(str(ex))
131              return 1
132          return 0
133 <          
133 >        
134      def publishDataset(self,file):
135          """
136          """
# Line 87 | Line 140 | class Publisher(Actor):
140          except IndexError:
141              self.exit_status = '1'
142              msg = "Error: Problem with "+file+" file"  
143 <            common.logger.message(msg)
143 >            common.logger.info(msg)
144              return self.exit_status
145 <            
146 <        if (self.datasetpath != 'None'):
147 <            common.logger.message("--->>> Importing parent dataset in the dbs")
148 <            status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
149 <            if (status_import == 1):
150 <                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
151 <                self.exit_status='1'
152 <                return self.exit_status
153 <            common.logger.message("Parent import ok")
145 >
146 >        if (len(self.dataset_to_import) != 0):
147 >           for dataset in self.dataset_to_import:
148 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
149 >               status_import=self.importParentDataset(self.globalDBS, dataset)
150 >               if (status_import == 1):
151 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
152 >                   self.exit_status='1'
153 >                   return self.exit_status
154 >               else:    
155 >                   common.logger.info('Import ok of dataset '+dataset)
156              
157          #// DBS to contact
158          dbswriter = DBSWriter(self.DBSURL)                        
# Line 106 | Line 161 | class Publisher(Actor):
161              self.exit_status = '0'
162          except IndexError:
163              self.exit_status = '1'
164 <            msg = "Error: No file to publish in xml file"+file+" file"  
165 <            common.logger.message(msg)
164 >            msg = "Error: No EDM file to publish in xml file"+file+" file"  
165 >            common.logger.info(msg)
166              return self.exit_status
167  
168          datasets=fileinfo.dataset
169 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
170 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
169 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
170 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
171 >        if len(datasets)<=0:
172 >           self.exit_status = '1'
173 >           msg = "Error: No info about dataset in the xml file "+file
174 >           common.logger.info(msg)
175 >           return self.exit_status
176          for dataset in datasets:
177              #### for production data
178 +            self.processedData = dataset['ProcessedDataset']
179              if (dataset['PrimaryDataset'] == 'null'):
180 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
181 <                
180 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
181 >                dataset['PrimaryDataset'] = self.userprocessedData
182 >            #else: # add parentage from input dataset
183 >            elif self.datasetpath.upper() != 'NONE':
184 >                dataset['ParentDataset']= self.datasetpath
185 >    
186              dataset['PSetContent']=self.content
187              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
188 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
189 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
190 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
188 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
189 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
190 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
191 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
192              
193 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
193 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
194              
195              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
196 <            common.logger.debug(6,"Primary:  %s "%primary)
196 >            common.logger.log(10-1,"Primary:  %s "%primary)
197              
198              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
199 <            common.logger.debug(6,"Algo:  %s "%algo)
199 >            common.logger.log(10-1,"Algo:  %s "%algo)
200  
201              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
202 <            common.logger.debug(6,"Processed:  %s "%processed)
202 >            common.logger.log(10-1,"Processed:  %s "%processed)
203              
204 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
204 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
205              
206 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
206 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
207          return self.exit_status    
208  
209      def publishAJobReport(self,file,procdataset):
210          """
211             input:  xml file, processedDataset
212          """
213 +        common.logger.debug("FJR = %s"%file)
214          try:
215              jobReport = readJobReport(file)[0]
216              self.exit_status = '0'
# Line 162 | Line 229 | class Publisher(Actor):
229              elif (file['LFN'] == ''):
230                  self.noLFN.append(file['PFN'])
231              else:
232 <                if int(file['TotalEvents']) != 0 :
233 <                    file.lumisections = {}
232 >                if  self.skipOcheck==0:
233 >                    if int(file['TotalEvents']) != 0:
234 >                        ### Fede to insert also run and lumi info in DBS
235 >                        #file.runs = {}
236 >                        for ds in file.dataset:
237 >                            ### Fede for production
238 >                            if (ds['PrimaryDataset'] == 'null'):
239 >                                ds['PrimaryDataset']=self.userprocessedData
240 >                        filestopublish.append(file)
241 >                    else:
242 >                        self.noEventsFiles.append(file['LFN'])
243 >                else:
244 >                    ### Fede to insert also run and lumi info in DBS
245 >                    #file.runs = {}
246                      for ds in file.dataset:
168                        ds['ProcessedDataset']=procdataset
247                          ### Fede for production
248                          if (ds['PrimaryDataset'] == 'null'):
249 <                            ds['PrimaryDataset']=procdataset
249 >                            ds['PrimaryDataset']=self.userprocessedData
250                      filestopublish.append(file)
251 <                else:
174 <                    self.noEventsFiles.append(file['LFN'])
251 >      
252          jobReport.files = filestopublish
253 +        for file in filestopublish:
254 +            common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
255          ### if all files of FJR have number of events = 0
256          if (len(filestopublish) == 0):
257 <           return None
257 >            return None
258            
259          #// DBS to contact
260          dbswriter = DBSWriter(self.DBSURL)
261          # insert files
262          Blocks=None
263          try:
264 <            Blocks=dbswriter.insertFiles(jobReport)
265 <            common.logger.message("Blocks = %s"%Blocks)
264 >            ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
265 >            Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
266 >            #Blocks=dbswriter.insertFiles(jobReport)
267 >            common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
268          except DBSWriterError, ex:
269 <            common.logger.message("Insert file error: %s"%ex)
269 >            common.logger.debug("--->>> Insert file error: %s"%ex)
270          return Blocks
271  
272      def run(self):
# Line 194 | Line 275 | class Publisher(Actor):
275          """
276          
277          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
278 <        common.logger.debug(6, "file_list = "+str(file_list))
279 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
278 >        
279 >        ## Select only those fjr that are succesfull
280 >        if (len(file_list)==0):
281 >            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
282 >            self.exit_status = '1'
283 >            return self.exit_status
284 >
285 >        good_list=[]
286 >        for fjr in file_list:
287 >            reports = readJobReport(fjr)
288 >            if len(reports)>0:
289 >               if reports[0].status == "Success":
290 >                  good_list.append(fjr)
291 >        file_list=good_list
292 >        ##
293 >        common.logger.log(10-1, "file_list = "+str(file_list))
294 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
295              
296          if (len(file_list)>0):
297              BlocksList=[]
298 <            common.logger.message("--->>> Start dataset publication")
298 >            common.logger.info("--->>> Start dataset publication")
299              self.exit_status=self.publishDataset(file_list[0])
300              if (self.exit_status == '1'):
301                  return self.exit_status
302 <            common.logger.message("--->>> End dataset publication")
302 >            common.logger.info("--->>> End dataset publication")
303  
304  
305 <            common.logger.message("--->>> Start files publication")
305 >            common.logger.info("--->>> Start files publication")
306              for file in file_list:
211                common.logger.message("file = "+file)
307                  Blocks=self.publishAJobReport(file,self.processedData)
308                  if Blocks:
309 <                    [BlocksList.append(x) for x in Blocks]
309 >                    for x in Blocks: # do not allow multiple entries of the same block
310 >                        if x not in BlocksList:
311 >                           BlocksList.append(x)
312                      
313              # close the blocks
314 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
218 <            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
314 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
315              dbswriter = DBSWriter(self.DBSURL)
316              
317              for BlockName in BlocksList:
318                  try:  
319                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
320 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
225 <                    #dbswriter.dbs.closeBlock(BlockName)
320 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
321                  except DBSWriterError, ex:
322 <                    common.logger.message("Close block error %s"%ex)
322 >                    common.logger.info("Close block error %s"%ex)
323  
324              if (len(self.noEventsFiles)>0):
325 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
325 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
326                  for lfn in self.noEventsFiles:
327 <                    common.logger.message("------ LFN: %s"%lfn)
327 >                    common.logger.info("------ LFN: %s"%lfn)
328              if (len(self.noLFN)>0):
329 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
329 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
330                  for pfn in self.noLFN:
331 <                    common.logger.message("------ pfn: %s"%pfn)
331 >                    common.logger.info("------ pfn: %s"%pfn)
332              if (len(self.problemFiles)>0):
333 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
333 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
334                  for lfn in self.problemFiles:
335 <                    common.logger.message("------ LFN: %s"%lfn)
336 <            common.logger.message("--->>> End files publication")
337 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
335 >                    common.logger.info("------ LFN: %s"%lfn)
336 >            common.logger.info("--->>> End files publication")
337 >          
338 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
339 >            from InspectDBS import InspectDBS
340 >            check=InspectDBS(self.cfg_params)
341 >            check.checkPublication()
342              return self.exit_status
343  
344          else:
345 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
345 >            common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
346              self.exit_status = '1'
347              return self.exit_status
348      

Diff Legend

Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)