ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.21 by fanzago, Tue Jan 20 11:20:46 2009 UTC vs.
Revision 1.46 by fanzago, Thu Mar 4 14:56:16 2010 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 from crab_logger import Logger
6   from crab_exceptions import *
7   from ProdCommon.FwkJobRep.ReportParser import readJobReport
8   from ProdCommon.FwkJobRep.ReportState import checkSuccess
# Line 13 | Line 12 | from ProdCommon.DataMgmt.DBS.DBSErrors i
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 + from DBSAPI.dbsApiException import DbsException
16 + from DBSAPI.dbsApi import DbsApi
17  
18   class Publisher(Actor):
19      def __init__(self, cfg_params):
# Line 24 | Line 25 | class Publisher(Actor):
25          - publishes output data on DBS and DLS
26          """
27  
28 <        try:
29 <            self.userprocessedData = cfg_params['USER.publish_data_name']
30 <            self.processedData = None
30 <        except KeyError:
28 >        self.cfg_params=cfg_params
29 >      
30 >        if not cfg_params.has_key('USER.publish_data_name'):
31              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 +        self.userprocessedData = cfg_params['USER.publish_data_name']
33 +        self.processedData = None
34  
35 <        try:
36 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
37 <        except KeyError:
38 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
39 <        try:
40 <            self.pset = cfg_params['CMSSW.pset']
41 <        except KeyError:
35 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
36 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
37 >            msg  = 'You can not publish data because you did not selected \n'
38 >            msg += '\t*** copy_data = 1 and publish_data = 1  *** in the crab.cfg file'
39 >            raise CrabException(msg)
40 >
41 >        if not cfg_params.has_key('CMSSW.pset'):
42              raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
43 <        try:
44 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
45 <        except KeyError:
46 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
47 <        try:
46 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
47 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
48 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
49 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
50 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
51 <                raise CrabException(msg)
52 <        except KeyError:
43 >        self.pset = cfg_params['CMSSW.pset']
44 >
45 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
46 >
47 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
48              msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
49              msg = msg + " entry, necessary to publish the data.\n"
50              msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
51              raise CrabException(msg)
52 +
53 +        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
54 +        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
55 +        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
56 +            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
57 +            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
58 +            raise CrabException(msg)
59              
60          self.content=file(self.pset).read()
61          self.resDir = common.work_space.resDir()
# Line 72 | Line 74 | class Publisher(Actor):
74                  dataset=string.strip(dataset)
75                  self.dataset_to_import.append(dataset)
76          ###        
77 <                
77 >            
78 >        #self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
79 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
80 >    
81          self.SEName=''
82          self.CMSSW_VERSION=''
83          self.exit_status=''
# Line 83 | Line 88 | class Publisher(Actor):
88          
89      def importParentDataset(self,globalDBS, datasetpath):
90          """
91 <        """
91 >        """
92          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
93          
94          try:
95 <            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
95 >            #if (self.import_all_parents==1):
96 >            common.logger.info("--->>> Importing all parents level")
97 >            start = time.time()
98 >            common.logger.debug("start import time: " + str(start))
99 >            ### to skip the ProdCommon api exception in the case of block without location
100 >            ### skipNoSiteError=True
101 >            #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
102 >            ### calling dbs api directly
103 >            dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
104 >            stop = time.time()
105 >            common.logger.debug("stop import time: " + str(stop))
106 >            common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
107 >            ## still not removing the code, but TODO for the final release...
108 >            """                                                    
109 >            else:
110 >                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
111 >                start = time.time()
112 >                #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
113 >                ### calling dbs api directly
114 >                common.logger.debug("start import time: " + str(start))
115 >                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
116 >                stop = time.time()
117 >                common.logger.debug("stop import time: " + str(stop))
118 >                common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
119 >            """
120          except DBSWriterError, ex:
121              msg = "Error importing dataset to be processed into local DBS\n"
122              msg += "Source Dataset: %s\n" % datasetpath
123              msg += "Source DBS: %s\n" % globalDBS
124              msg += "Destination DBS: %s\n" % self.DBSURL
125 <            common.logger.message(msg)
125 >            common.logger.info(msg)
126 >            common.logger.info(str(ex))
127              return 1
128          return 0
129 <          
129 >
130      def publishDataset(self,file):
131          """
132          """
# Line 106 | Line 136 | class Publisher(Actor):
136          except IndexError:
137              self.exit_status = '1'
138              msg = "Error: Problem with "+file+" file"  
139 <            common.logger.message(msg)
139 >            common.logger.info(msg)
140              return self.exit_status
141  
142          if (len(self.dataset_to_import) != 0):
143             for dataset in self.dataset_to_import:
144 <               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
144 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
145                 status_import=self.importParentDataset(self.globalDBS, dataset)
146                 if (status_import == 1):
147 <                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
147 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
148                     self.exit_status='1'
149                     return self.exit_status
150                 else:    
151 <                   common.logger.message('Import ok of dataset '+dataset)
151 >                   common.logger.info('Import ok of dataset '+dataset)
152              
153          #// DBS to contact
154          dbswriter = DBSWriter(self.DBSURL)                        
# Line 127 | Line 157 | class Publisher(Actor):
157              self.exit_status = '0'
158          except IndexError:
159              self.exit_status = '1'
160 <            msg = "Error: No file to publish in xml file"+file+" file"  
161 <            common.logger.message(msg)
160 >            msg = "Error: No EDM file to publish in xml file"+file+" file"  
161 >            common.logger.info(msg)
162              return self.exit_status
163  
164          datasets=fileinfo.dataset
165 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
166 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
165 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
166 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
167          if len(datasets)<=0:
168             self.exit_status = '1'
169             msg = "Error: No info about dataset in the xml file "+file
170 <           common.logger.message(msg)
170 >           common.logger.info(msg)
171             return self.exit_status
172          for dataset in datasets:
173              #### for production data
# Line 151 | Line 181 | class Publisher(Actor):
181      
182              dataset['PSetContent']=self.content
183              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
184 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
185 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
186 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
184 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
185 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
186 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
187 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
188              
189 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
189 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
190              
191              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
192 <            common.logger.debug(6,"Primary:  %s "%primary)
192 >            common.logger.log(10-1,"Primary:  %s "%primary)
193              
194              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
195 <            common.logger.debug(6,"Algo:  %s "%algo)
195 >            common.logger.log(10-1,"Algo:  %s "%algo)
196  
197              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
198 <            common.logger.debug(6,"Processed:  %s "%processed)
198 >            common.logger.log(10-1,"Processed:  %s "%processed)
199              
200 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
200 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
201              
202 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
202 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
203          return self.exit_status    
204  
205      def publishAJobReport(self,file,procdataset):
206          """
207             input:  xml file, processedDataset
208          """
209 +        common.logger.debug("FJR = %s"%file)
210          try:
211              jobReport = readJobReport(file)[0]
212              self.exit_status = '0'
# Line 193 | Line 225 | class Publisher(Actor):
225              elif (file['LFN'] == ''):
226                  self.noLFN.append(file['PFN'])
227              else:
228 <                if int(file['TotalEvents']) != 0 :
229 <                    #file.lumisections = {}
230 <                    # lumi info are now in run hash
231 <                    file.runs = {}
228 >                if  self.skipOcheck==0:
229 >                    if int(file['TotalEvents']) != 0:
230 >                        ### Fede to insert also run and lumi info in DBS
231 >                        #file.runs = {}
232 >                        for ds in file.dataset:
233 >                            ### Fede for production
234 >                            if (ds['PrimaryDataset'] == 'null'):
235 >                                ds['PrimaryDataset']=self.userprocessedData
236 >                        filestopublish.append(file)
237 >                    else:
238 >                        self.noEventsFiles.append(file['LFN'])
239 >                else:
240 >                    ### Fede to insert also run and lumi info in DBS
241 >                    #file.runs = {}
242                      for ds in file.dataset:
243                          ### Fede for production
244                          if (ds['PrimaryDataset'] == 'null'):
203                            #ds['PrimaryDataset']=procdataset
245                              ds['PrimaryDataset']=self.userprocessedData
246                      filestopublish.append(file)
247 <                else:
207 <                    self.noEventsFiles.append(file['LFN'])
247 >      
248          jobReport.files = filestopublish
249 +        for file in filestopublish:
250 +            common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
251          ### if all files of FJR have number of events = 0
252          if (len(filestopublish) == 0):
253 <           return None
253 >            return None
254            
255          #// DBS to contact
256          dbswriter = DBSWriter(self.DBSURL)
257          # insert files
258          Blocks=None
259          try:
260 <            Blocks=dbswriter.insertFiles(jobReport)
261 <            common.logger.message("Inserting file in blocks = %s"%Blocks)
260 >            ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
261 >            Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
262 >            #Blocks=dbswriter.insertFiles(jobReport)
263 >            common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
264          except DBSWriterError, ex:
265 <            common.logger.error("Insert file error: %s"%ex)
265 >            common.logger.debug("--->>> Insert file error: %s"%ex)
266          return Blocks
267  
268      def run(self):
# Line 227 | Line 271 | class Publisher(Actor):
271          """
272          
273          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
274 +        
275          ## Select only those fjr that are successful
276 +        if (len(file_list)==0):
277 +            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
278 +            self.exit_status = '1'
279 +            return self.exit_status
280 +
281          good_list=[]
282          for fjr in file_list:
283              reports = readJobReport(fjr)
# Line 236 | Line 286 | class Publisher(Actor):
286                    good_list.append(fjr)
287          file_list=good_list
288          ##
289 <        common.logger.debug(6, "file_list = "+str(file_list))
290 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
289 >        common.logger.log(10-1, "file_list = "+str(file_list))
290 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
291              
292          if (len(file_list)>0):
293              BlocksList=[]
294 <            common.logger.message("--->>> Start dataset publication")
294 >            common.logger.info("--->>> Start dataset publication")
295              self.exit_status=self.publishDataset(file_list[0])
296              if (self.exit_status == '1'):
297                  return self.exit_status
298 <            common.logger.message("--->>> End dataset publication")
298 >            common.logger.info("--->>> End dataset publication")
299  
300  
301 <            common.logger.message("--->>> Start files publication")
301 >            common.logger.info("--->>> Start files publication")
302              for file in file_list:
253                common.logger.debug(1, "file = "+file)
303                  Blocks=self.publishAJobReport(file,self.processedData)
304                  if Blocks:
305                      for x in Blocks: # do not allow multiple entries of the same block
# Line 258 | Line 307 | class Publisher(Actor):
307                             BlocksList.append(x)
308                      
309              # close the blocks
310 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
262 <            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
310 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
311              dbswriter = DBSWriter(self.DBSURL)
312              
313              for BlockName in BlocksList:
314                  try:  
315                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
316 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
269 <                    #dbswriter.dbs.closeBlock(BlockName)
316 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
317                  except DBSWriterError, ex:
318 <                    common.logger.message("Close block error %s"%ex)
318 >                    common.logger.info("Close block error %s"%ex)
319  
320              if (len(self.noEventsFiles)>0):
321 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
321 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
322                  for lfn in self.noEventsFiles:
323 <                    common.logger.message("------ LFN: %s"%lfn)
323 >                    common.logger.info("------ LFN: %s"%lfn)
324              if (len(self.noLFN)>0):
325 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
325 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
326                  for pfn in self.noLFN:
327 <                    common.logger.message("------ pfn: %s"%pfn)
327 >                    common.logger.info("------ pfn: %s"%pfn)
328              if (len(self.problemFiles)>0):
329 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
329 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
330                  for lfn in self.problemFiles:
331 <                    common.logger.message("------ LFN: %s"%lfn)
332 <            common.logger.message("--->>> End files publication")
333 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
331 >                    common.logger.info("------ LFN: %s"%lfn)
332 >            common.logger.info("--->>> End files publication")
333 >          
334 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
335 >            from InspectDBS import InspectDBS
336 >            check=InspectDBS(self.cfg_params)
337 >            check.checkPublication()
338              return self.exit_status
339  
340          else:
341 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
341 >            common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
342              self.exit_status = '1'
343              return self.exit_status
344      

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines