root/cvsroot/COMP/CRAB/python/Publisher.py

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.17 by afanfani, Wed Oct 15 09:28:21 2008 UTC vs.
Revision 1.40.2.4 by ewv, Tue Oct 13 14:01:15 2009 UTC

# Line 3 | Line 3 | import common
3   import time, glob
4   from Actor import *
5   from crab_util import *
6 - from crab_logger import Logger
6   from crab_exceptions import *
7   from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 + from ProdCommon.FwkJobRep.ReportState import checkSuccess
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 + from DBSAPI.dbsApiException import DbsException
16 + from DBSAPI.dbsApi import DbsApi
17  
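
The new revision pulls the DBS client API in directly (DbsApi, DbsException) alongside the ProdCommon wrappers, and adds checkSuccess from ReportState. A minimal sketch of the success test this supports, assuming readJobReport returns a list of report objects exposing a .status string, as the run() method further down relies on:

    from ProdCommon.FwkJobRep.ReportParser import readJobReport

    def is_successful_fjr(path):
        # keep only framework job reports whose first report says "Success",
        # mirroring the good_list filter in run() below
        reports = readJobReport(path)
        return len(reports) > 0 and reports[0].status == "Success"
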
18   class Publisher(Actor):
19      def __init__(self, cfg_params):
20          """
21 <        Publisher class:
21 >        Publisher class:
22  
23          - parses CRAB FrameworkJobReport on UI
24          - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
25          - publishes output data on DBS and DLS
26          """
27  
28 <        try:
29 <            userprocessedData = cfg_params['USER.publish_data_name']
30 <            self.processedData = None
29 <        except KeyError:
28 >        self.cfg_params=cfg_params
29 >
30 >        if not cfg_params.has_key('USER.publish_data_name'):
31              raise CrabException('Cannot publish output data because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
32 +        self.userprocessedData = cfg_params['USER.publish_data_name']
33 +        self.processedData = None
34  
35 <        try:
36 <            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
37 <        except KeyError:
38 <            raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
39 <        try:
40 <            self.pset = cfg_params['CMSSW.pset']
41 <        except KeyError:
35 >        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
36 >            (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
37 >            msg  = 'You cannot publish data because you did not select \n'
38 >            msg += '\t*** copy_data = 1 and publish_data = 1  *** in the crab.cfg file'
39 >            raise CrabException(msg)
40 >
41 >        if not cfg_params.has_key('CMSSW.pset'):
42              raise CrabException('Cannot publish output data because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
43 <        try:
44 <            self.globalDBS=cfg_params['CMSSW.dbs_url']
45 <        except KeyError:
46 <            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
47 <        try:
45 <            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
46 <            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
47 <            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
48 <                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
49 <                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
50 <                raise CrabException(msg)
51 <        except KeyError:
43 >        self.pset = cfg_params['CMSSW.pset']
44 >
45 >        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
46 >
47 >        if not cfg_params.has_key('USER.dbs_url_for_publication'):
48              msg = "Warning. The [USER] section does not have a 'dbs_url_for_publication'"
49              msg = msg + " entry, which is necessary to publish the data.\n"
50              msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local DBS instance."
51              raise CrabException(msg)
52 <            
52 >
53 >        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
54 >        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
55 >        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
56 >            msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
57 >            msg = msg + "Please set your local one in the [USER] section 'dbs_url_for_publication'"
58 >            raise CrabException(msg)
59 >
60          self.content=file(self.pset).read()
61          self.resDir = common.work_space.resDir()
62 <        
62 >
63          self.dataset_to_import=[]
64 <        
64 >
65          self.datasetpath=cfg_params['CMSSW.datasetpath']
66          if (self.datasetpath.upper() != 'NONE'):
67              self.dataset_to_import.append(self.datasetpath)
68 <        
68 >
69          ### Added PU dataset
70          tmp = cfg_params.get('CMSSW.dataset_pu',None)
71          if tmp :
# Line 70 | Line 73 | class Publisher(Actor):
73              for dataset in datasets:
74                  dataset=string.strip(dataset)
75                  self.dataset_to_import.append(dataset)
76 <        ###        
77 <                
76 >        ###
77 >
78 >        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
79 >        if not self.import_all_parents:
80 >                raise CrabException("Support for publish_with_import_all_parents has been disabled. " +
81 >                                    "If you think you need this, please contact CRAB Feedback HN")
82 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
83 >
84          self.SEName=''
85          self.CMSSW_VERSION=''
86          self.exit_status=''
87          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
88 <        self.problemFiles=[]  
88 >        self.problemFiles=[]
89          self.noEventsFiles=[]
90          self.noLFN=[]
91 <        
91 >
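
The rewritten constructor drops the try/except KeyError pattern in favour of explicit has_key() checks for mandatory keys and cfg_params.get() defaults for optional ones. A reduced sketch of that idiom (require() is a hypothetical helper, not part of the patch; Python 2 style, with CrabException from crab_exceptions as imported above):

    def require(cfg_params, key, msg):
        # mandatory key: fail early with a CrabException, as __init__ does
        if not cfg_params.has_key(key):
            raise CrabException(msg)
        return cfg_params[key]

    # optional key with a default, as done for CMSSW.dbs_url above:
    # globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
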
92      def importParentDataset(self,globalDBS, datasetpath):
93          """
94 <        """
94 >        """
95          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
96 <        
96 >
97          try:
98 <            dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
98 >            if (self.import_all_parents==1):
99 >                common.logger.info("--->>> Importing all parent levels")
100 >                start = time.time()
101 >                common.logger.debug("start import time: " + str(start))
102 >                ### to skip the ProdCommon API exception in the case of a block without location
103 >                ### skipNoSiteError=True
104 >                #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
105 >                ### calling dbs api directly
106 >                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
107 >                stop = time.time()
108 >                common.logger.debug("stop import time: " + str(stop))
109 >                common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
110 >
111 >            else:
112 >                raise CrabException("Support for publish_with_import_all_parents has been disabled. " +
113 >                                    "If you think you need this, please contact CRAB Feedback HN")
114 >                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
115 >                start = time.time()
116 >                #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
117 >                ### calling dbs api directly
118 >                common.logger.debug("start import time: " + str(start))
119 >                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
120 >                stop = time.time()
121 >                common.logger.debug("stop import time: " + str(stop))
122 >                common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
123          except DBSWriterError, ex:
124              msg = "Error importing dataset to be processed into local DBS\n"
125              msg += "Source Dataset: %s\n" % datasetpath
126              msg += "Source DBS: %s\n" % globalDBS
127              msg += "Destination DBS: %s\n" % self.DBSURL
128 <            common.logger.message(msg)
128 >            common.logger.info(msg)
129 >            common.logger.info(str(ex))
130              return 1
131          return 0
132 <          
132 >
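
importParentDataset() now bypasses the ProdCommon importDataset wrapper and calls the DBS API through dbsWriter.dbs, timing the migration; note the else branch is unreachable after its raise and is the disabled first-level-only import kept for reference. A reduced sketch of the surviving path (timing and logging stripped; argument order as in the patch):

    from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter

    def import_parents(global_dbs_url, local_dbs_url, datasetpath):
        # migrate the dataset and its full parentage from the global DBS
        # into the local publication DBS
        writer = DBSWriter(local_dbs_url, level='ERROR')
        writer.dbs.migrateDatasetContents(global_dbs_url, local_dbs_url, datasetpath)
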
133      def publishDataset(self,file):
134          """
135          """
# Line 104 | Line 138 | class Publisher(Actor):
138              self.exit_status = '0'
139          except IndexError:
140              self.exit_status = '1'
141 <            msg = "Error: Problem with "+file+" file"  
142 <            common.logger.message(msg)
141 >            msg = "Error: Problem with "+file+" file"
142 >            common.logger.info(msg)
143              return self.exit_status
144  
145          if (len(self.dataset_to_import) != 0):
146             for dataset in self.dataset_to_import:
147 <               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
147 >               common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
148                 status_import=self.importParentDataset(self.globalDBS, dataset)
149                 if (status_import == 1):
150 <                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
150 >                   common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
151                     self.exit_status='1'
152                     return self.exit_status
153 <               else:    
154 <                   common.logger.message('Import ok of dataset '+dataset)
155 <            
153 >               else:
154 >                   common.logger.info('Import ok of dataset '+dataset)
155 >
156          #// DBS to contact
157 <        dbswriter = DBSWriter(self.DBSURL)                        
158 <        try:  
157 >        dbswriter = DBSWriter(self.DBSURL)
158 >        try:
159              fileinfo= jobReport.files[0]
160              self.exit_status = '0'
161          except IndexError:
162              self.exit_status = '1'
163 <            msg = "Error: No file to publish in xml file"+file+" file"  
164 <            common.logger.message(msg)
163 >            msg = "Error: No EDM file to publish in the xml file "+file
164 >            common.logger.info(msg)
165              return self.exit_status
166  
167 <        datasets=fileinfo.dataset
168 <        common.logger.debug(6,"FileInfo = " + str(fileinfo))
169 <        common.logger.debug(6,"DatasetInfo = " + str(datasets))
167 >        datasets=fileinfo.dataset
168 >        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
169 >        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
170 >        if len(datasets)<=0:
171 >           self.exit_status = '1'
172 >           msg = "Error: No info about dataset in the xml file "+file
173 >           common.logger.info(msg)
174 >           return self.exit_status
175          for dataset in datasets:
176              #### for production data
177              self.processedData = dataset['ProcessedDataset']
178              if (dataset['PrimaryDataset'] == 'null'):
179 <                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
180 <            else: # add parentage from input dataset
179 >                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
180 >                dataset['PrimaryDataset'] = self.userprocessedData
181 >            #else: # add parentage from input dataset
182 >            elif self.datasetpath.upper() != 'NONE':
183                  dataset['ParentDataset']= self.datasetpath
184 <    
184 >
185              dataset['PSetContent']=self.content
186              cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
187 <            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
188 <            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
189 <            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
190 <            
191 <            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
192 <            
187 >            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
188 >            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
189 >            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
190 >            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
191 >
192 >            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
193 >
194              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
195 <            common.logger.debug(6,"Primary:  %s "%primary)
196 <            
195 >            common.logger.log(10-1,"Primary:  %s "%primary)
196 >
197              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
198 <            common.logger.debug(6,"Algo:  %s "%algo)
198 >            common.logger.log(10-1,"Algo:  %s "%algo)
199  
200              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
201 <            common.logger.debug(6,"Processed:  %s "%processed)
202 <            
203 <            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
204 <            
205 <        common.logger.debug(6,"exit_status = %s "%self.exit_status)
206 <        return self.exit_status    
201 >            common.logger.log(10-1,"Processed:  %s "%processed)
202 >
203 >            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
204 >
205 >        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
206 >        return self.exit_status
207  
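
publishDataset() substitutes publish_data_name for a 'null' primary dataset, attaches the input dataset as parent only when a datasetpath was given, and records the resulting name in self.dataset_to_check for the post-publication check. A sketch of how that name is assembled, with 'USER' as the fixed data tier:

    def user_dataset_name(primary, processed):
        # '/<PrimaryDataset>/<ProcessedDataset>/USER', as logged above
        return "/%s/%s/USER" % (primary, processed)
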
208      def publishAJobReport(self,file,procdataset):
209          """
210             input:  xml file, processedDataset
211          """
212 +        common.logger.debug("FJR = %s"%file)
213          try:
214              jobReport = readJobReport(file)[0]
215              self.exit_status = '0'
# Line 185 | Line 228 | class Publisher(Actor):
228              elif (file['LFN'] == ''):
229                  self.noLFN.append(file['PFN'])
230              else:
231 <                if int(file['TotalEvents']) != 0 :
232 <                    #file.lumisections = {}
233 <                    # lumi info are now in run hash
231 >                if  self.skipOcheck==0:
232 >                    if int(file['TotalEvents']) != 0:
233 >                        #file.lumisections = {}
234 >                        # lumi info are now in run hash
235 >                        file.runs = {}
236 >                        for ds in file.dataset:
237 >                            ### Fede for production
238 >                            if (ds['PrimaryDataset'] == 'null'):
239 >                                #ds['PrimaryDataset']=procdataset
240 >                                ds['PrimaryDataset']=self.userprocessedData
241 >                        filestopublish.append(file)
242 >                    else:
243 >                        self.noEventsFiles.append(file['LFN'])
244 >                else:
245                      file.runs = {}
246                      for ds in file.dataset:
193 -                      ### FEDE FOR NEW LFN ###
194 -                      #ds['ProcessedDataset']=procdataset
195 -                      ########################
247                          ### Fede for production
248                          if (ds['PrimaryDataset'] == 'null'):
249 <                            ds['PrimaryDataset']=procdataset
249 >                            #ds['PrimaryDataset']=procdataset
250 >                            ds['PrimaryDataset']=self.userprocessedData
251                      filestopublish.append(file)
252 <                else:
201 <                    self.noEventsFiles.append(file['LFN'])
252 >
253          jobReport.files = filestopublish
254 +        for file in filestopublish:
255 +            common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
256          ### if all files of the FJR have 0 events there is nothing to publish
257          if (len(filestopublish) == 0):
258 <           return None
259 <          
258 >            return None
259 >
260          #// DBS to contact
261          dbswriter = DBSWriter(self.DBSURL)
262          # insert files
263          Blocks=None
264          try:
265              Blocks=dbswriter.insertFiles(jobReport)
266 <            common.logger.message("Blocks = %s"%Blocks)
266 >            common.logger.debug("--->>> Inserting files in blocks = %s"%Blocks)
267          except DBSWriterError, ex:
268 <            common.logger.message("Insert file error: %s"%ex)
268 >            common.logger.debug("--->>> Insert file error: %s"%ex)
269          return Blocks
270  
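
publishAJobReport() filters each report's files before insertion: files with an empty LFN are set aside in self.noLFN, and zero-event files are skipped unless CMSSW.publish_zero_event is set. A condensed sketch of that decision (the copy-to-SE problem branch preceding it in the method is omitted; FJR file objects are assumed dictionary-like, as above):

    def publishable(file, skipOcheck):
        if file['LFN'] == '':
            return False        # no LFN: recorded in noLFN instead
        if skipOcheck == 0 and int(file['TotalEvents']) == 0:
            return False        # zero events: recorded in noEventsFiles
        return True
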
271      def run(self):
272          """
273          parse all xml files in the res dir and create a dictionary
274          """
275 <        
275 >
276          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
277 <        common.logger.debug(6, "file_list = "+str(file_list))
278 <        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
279 <            
277 >
278 >        ## Select only those fjr that are successful
279 >        if (len(file_list)==0):
280 >            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
281 >            self.exit_status = '1'
282 >            return self.exit_status
283 >
284 >        good_list=[]
285 >        for fjr in file_list:
286 >            reports = readJobReport(fjr)
287 >            if len(reports)>0:
288 >               if reports[0].status == "Success":
289 >                  good_list.append(fjr)
290 >        file_list=good_list
291 >        ##
292 >        common.logger.log(10-1, "file_list = "+str(file_list))
293 >        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
294 >
295          if (len(file_list)>0):
296              BlocksList=[]
297 <            common.logger.message("--->>> Start dataset publication")
297 >            common.logger.info("--->>> Start dataset publication")
298              self.exit_status=self.publishDataset(file_list[0])
299              if (self.exit_status == '1'):
300 <                return self.exit_status
301 <            common.logger.message("--->>> End dataset publication")
300 >                return self.exit_status
301 >            common.logger.info("--->>> End dataset publication")
302  
303  
304 <            common.logger.message("--->>> Start files publication")
304 >            common.logger.info("--->>> Start files publication")
305              for file in file_list:
238 -              common.logger.message("file = "+file)
306                  Blocks=self.publishAJobReport(file,self.processedData)
307                  if Blocks:
308                      for x in Blocks: # do not allow multiple entries of the same block
309                          if x not in BlocksList:
310                             BlocksList.append(x)
311 <                    
311 >
312              # close the blocks
313 <            common.logger.debug(6, "BlocksList = %s"%BlocksList)
313 >            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
314              # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
315              dbswriter = DBSWriter(self.DBSURL)
316 <            
316 >
317              for BlockName in BlocksList:
318 <                try:  
318 >                try:
319                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
320 <                    common.logger.debug(6, "closeBlock %s"%closeBlock)
320 >                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
321                      #dbswriter.dbs.closeBlock(BlockName)
322                  except DBSWriterError, ex:
323 <                    common.logger.message("Close block error %s"%ex)
323 >                    common.logger.info("Close block error %s"%ex)
324  
325              if (len(self.noEventsFiles)>0):
326 <                common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
326 >                common.logger.info("--->>> WARNING: the following "+str(len(self.noEventsFiles))+" files were not published because they contain 0 events:")
327                  for lfn in self.noEventsFiles:
328 <                    common.logger.message("------ LFN: %s"%lfn)
328 >                    common.logger.info("------ LFN: %s"%lfn)
329              if (len(self.noLFN)>0):
330 <                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
330 >                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have an empty LFN")
331                  for pfn in self.noLFN:
332 <                    common.logger.message("------ pfn: %s"%pfn)
332 >                    common.logger.info("------ pfn: %s"%pfn)
333              if (len(self.problemFiles)>0):
334 <                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
334 >                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problems with the copy to the SE")
335                  for lfn in self.problemFiles:
336 <                    common.logger.message("------ LFN: %s"%lfn)
337 <            common.logger.message("--->>> End files publication")
338 <            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
336 >                    common.logger.info("------ LFN: %s"%lfn)
337 >            common.logger.info("--->>> End files publication")
338 >
339 >            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
340 >            from InspectDBS import InspectDBS
341 >            check=InspectDBS(self.cfg_params)
342 >            check.checkPublication()
343              return self.exit_status
344  
345          else:
346 <            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
346 >            common.logger.info("--->>> No valid files to publish on DBS: none of your jobs reported exit code = 0")
347              self.exit_status = '1'
348              return self.exit_status
349 <    
349 >
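
Taken together, run() globs the crab_fjr*.xml reports, keeps only the successful ones, publishes the dataset once from the first report, publishes the files of every report, closes the resulting blocks, and finishes with InspectDBS.checkPublication() on the recorded dataset. A hypothetical end-to-end use, assuming a cfg_params dictionary carrying the [USER] and [CMSSW] keys validated in __init__:

    publisher = Publisher(cfg_params)
    status = publisher.run()    # string exit codes as above: '0' success, '1' failure
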

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old)
> Changed lines (new)