
Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.1 by slacapra, Fri Nov 16 11:09:31 2007 UTC vs.
Revision 1.17 by afanfani, Wed Oct 15 09:28:21 2008 UTC

# Line 1 | Line 1
1 < import sys, getopt, string
1 > import getopt, string
2   import common
3   import time, glob
4   from Actor import *
5 from FwkJobRep.ReportParser import readJobReport
5   from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8 + from ProdCommon.FwkJobRep.ReportParser import readJobReport
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 < import ProdCommon.DataMgmt.DBS.DBSWriterObjects as DBSWriterObjects
14 <
15 <
13 > from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14 > import sys
15  
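
The report parser now lives under ProdCommon instead of the standalone FwkJobRep package. A minimal sketch of the API this module relies on, with a hypothetical report filename (attribute and key names follow their usage later in this file):

    # Minimal sketch, assuming the ProdCommon layout imported above.
    # readJobReport() returns a list of report objects; this module
    # always works with the first entry.
    from ProdCommon.FwkJobRep.ReportParser import readJobReport

    reports = readJobReport('crab_fjr_1.xml')   # hypothetical FJR file
    if reports:
        jobReport = reports[0]
        for f in jobReport.files:               # entries expose LFN/PFN/TotalEvents
            print f['LFN'], f['TotalEvents'], f.dataset
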
16   class Publisher(Actor):
17      def __init__(self, cfg_params):
# Line 25 | Line 24 | class Publisher(Actor):
24          """
25  
26          try:
27 <            self.processedData = cfg_params['USER.publish_data_name']
27 >            userprocessedData = cfg_params['USER.publish_data_name']
28 >            self.processedData = None
29          except KeyError:
30              raise CrabException('Cannot publish output data because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
31  
# Line 33 | Line 33 | class Publisher(Actor):
33              if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
34          except KeyError:
35              raise CrabException('You cannot publish data because you did not select *** copy_data = 1 *** in the crab.cfg file')
36 <
37 <        common.logger.message('self.processedData = '+self.processedData)
36 >        try:
37 >            self.pset = cfg_params['CMSSW.pset']
38 >        except KeyError:
39 >            raise CrabException('Cannot publish output data because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
40 >        try:
41 >            self.globalDBS=cfg_params['CMSSW.dbs_url']
42 >        except KeyError:
43 >            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
44 >        try:
45 >            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
46 >            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
47 >            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
48 >                msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
49 >                msg = msg + "Please set your local DBS instance in the [USER] section 'dbs_url_for_publication'"
50 >                raise CrabException(msg)
51 >        except KeyError:
52 >            msg = "Warning: the [USER] section does not have a 'dbs_url_for_publication'"
53 >            msg = msg + " entry, which is necessary to publish the data.\n"
54 >            msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local DBS instance."
55 >            raise CrabException(msg)
56 >            
57 >        self.content=file(self.pset).read()
58          self.resDir = common.work_space.resDir()
59 <        common.logger.message('self.resDir = '+self.resDir)
60 <        #old api self.DBSURL='http://cmssrv18.fnal.gov:8989/DBS/servlet/DBSServlet'
61 <        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
42 <        #self.DBSURL='http://cmssrv17.fnal.gov:8989/DBS_1_0_4_pre2/servlet/DBSServlet'
43 <        common.logger.message('self.DBSURL = '+self.DBSURL)
59 >        
60 >        self.dataset_to_import=[]
61 >        
62          self.datasetpath=cfg_params['CMSSW.datasetpath']
63 <        common.logger.message('self.datasetpath = '+self.datasetpath)
63 >        if (self.datasetpath.upper() != 'NONE'):
64 >            self.dataset_to_import.append(self.datasetpath)
65 >        
66 >        ### Added PU dataset
67 >        tmp = cfg_params.get('CMSSW.dataset_pu',None)
68 >        if tmp :
69 >            datasets = tmp.split(',')
70 >            for dataset in datasets:
71 >                dataset=string.strip(dataset)
72 >                self.dataset_to_import.append(dataset)
73 >        ###        
74 >                
75          self.SEName=''
76          self.CMSSW_VERSION=''
77          self.exit_status=''
78          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
79 +        self.problemFiles=[]  
80 +        self.noEventsFiles=[]
81 +        self.noLFN=[]
82          
51    
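
Taken together, the constructor expects a handful of crab.cfg entries. A minimal illustrative fragment is sketched below; all values are hypothetical placeholders, not recommendations:

    [USER]
    publish_data_name       = MyTestData
    copy_data               = 1
    # local DBS instance, never the global one
    dbs_url_for_publication = https://mydbs.example/servlet/DBSServlet

    [CMSSW]
    pset        = myconfig.cfg
    datasetpath = /Primary/Processed/TIER
    # optional, comma-separated pile-up datasets
    dataset_pu  = /PUa/Proc/TIER,/PUb/Proc/TIER
    # optional source DBS; defaults to the global production instance
    dbs_url     = http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
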
83      def importParentDataset(self,globalDBS, datasetpath):
84          """ Import the given dataset from globalDBS into the local DBS at self.DBSURL.
85          """
86          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
87          
88          try:
89 <            common.logger.message("----->>>>importing parent dataset in the local dbs")
59 <            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
89 >            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
90          except DBSWriterError, ex:
91              msg = "Error importing dataset to be processed into local DBS\n"
92              msg += "Source Dataset: %s\n" % datasetpath
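
Per the method names, revision 1.1 imported the dataset together with its parentage, while revision 1.17 copies the dataset alone into the local DBS. A sketch of the call with hypothetical placeholder values, mirroring the signatures shown in this hunk:

    from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter

    globalDBS   = 'http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet'
    localDBSURL = 'https://mydbs.example/servlet/DBSServlet'   # hypothetical local DBS
    datasetpath = '/Primary/Processed/TIER'                    # hypothetical dataset

    dbsWriter = DBSWriter(localDBSURL, level='ERROR')
    # rev 1.1 imported the dataset together with its parents:
    #   dbsWriter.importDataset(globalDBS, datasetpath, localDBSURL)
    # rev 1.17 imports the dataset alone:
    dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, localDBSURL)
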
# Line 71 | Line 101 | class Publisher(Actor):
101          """
102          try:
103              jobReport = readJobReport(file)[0]
74            msg = "--->>> reading "+file+" file"  
75            common.logger.message(msg)
104              self.exit_status = '0'
105          except IndexError:
106              self.exit_status = '1'
107              msg = "Error: Problem with "+file+" file"  
108              common.logger.message(msg)
109              return self.exit_status
110 <        #####for parents information import #########################################
111 <        #### the globalDBS has to be written in the crab cfg file!!!!! ###############
112 <        globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
113 <        if (self.datasetpath != 'None'):
114 <            status_import=self.importParentDataset(globalDBS, self.datasetpath)
115 <            if (status_import == 1):
116 <                common.logger.message('Problem with parent import from the global DBS '+globalDBS+ 'to the local one '+self.DBSURL)
117 <                self.exit_status='1'
118 <                ###############################################################################
119 <                ##  ___ >>>>>>>  comment out the next line, if you have problem with the import
120 <                ###############################################################################
121 <                return self.exit_status
94 <            pass
110 >
111 >        if (len(self.dataset_to_import) != 0):
112 >           for dataset in self.dataset_to_import:
113 >               common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
114 >               status_import=self.importParentDataset(self.globalDBS, dataset)
115 >               if (status_import == 1):
116 >                   common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
117 >                   self.exit_status='1'
118 >                   return self.exit_status
119 >               else:    
120 >                   common.logger.message('Import ok of dataset '+dataset)
121 >            
122          #// DBS to contact
123 <        dbswriter = DBSWriter(self.DBSURL,level='ERROR')                        
97 <        # publish a dataset : it should be done only once for the task
98 <        #                     and not for all the JobReport
123 >        dbswriter = DBSWriter(self.DBSURL)                        
124          try:  
125              fileinfo= jobReport.files[0]
126              self.exit_status = '0'
# Line 105 | Line 130 | class Publisher(Actor):
130              common.logger.message(msg)
131              return self.exit_status
132  
108        common.logger.message("FileInfo = " + str(fileinfo))
133          datasets=fileinfo.dataset
134 <        common.logger.message("DatasetInfo = " + str(datasets))
134 >        common.logger.debug(6,"FileInfo = " + str(fileinfo))
135 >        common.logger.debug(6,"DatasetInfo = " + str(datasets))
136          for dataset in datasets:
137 <            #### to understand how to fill cfgMeta info ###############
138 <            cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
137 >            #### for production data
138 >            self.processedData = dataset['ProcessedDataset']
139 >            if (dataset['PrimaryDataset'] == 'null'):
140 >                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
141 >            else: # add parentage from input dataset
142 >                dataset['ParentDataset']= self.datasetpath
143 >    
144 >            dataset['PSetContent']=self.content
145 >            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
146              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
147              common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
148 <            common.logger.message("Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
148 >            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
149 >            
150 >            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
151              
152              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
153 +            common.logger.debug(6,"Primary:  %s "%primary)
154              
155              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
156 +            common.logger.debug(6,"Algo:  %s "%algo)
157  
158              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
159 +            common.logger.debug(6,"Processed:  %s "%processed)
160 +            
161 +            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
162              
163 <            common.logger.message("Inserted primary %s processed %s"%(primary,processed))
163 >        common.logger.debug(6,"exit_status = %s "%self.exit_status)
164          return self.exit_status    
165  
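
Stripped of logging, the registration above is a three-step DBSWriterObjects sequence for each dataset dictionary found in the job report. A condensed sketch, assuming dataset stands for one entry of fileinfo.dataset and using hypothetical placeholder values:

    from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects

    dbswriter = DBSWriter('https://mydbs.example/servlet/DBSServlet')  # hypothetical local DBS
    # One entry of fileinfo.dataset; real entries carry more keys than shown here.
    dataset = {'PrimaryDataset': 'MyTestData', 'ProcessedDataset': 'MyTestData-v1'}
    cfgMeta = {'name': 'myconfig.cfg', 'Type': 'user',
               'annotation': 'user cfg', 'version': 'private version'}

    # 1) primary dataset, 2) algorithm (user cfg), 3) processed dataset
    primary   = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
    algo      = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
    processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
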
166      def publishAJobReport(self,file,procdataset):
167          """
168 +           input:  xml file, processedDataset
169          """
170          try:
171              jobReport = readJobReport(file)[0]
# Line 134 | Line 174 | class Publisher(Actor):
174              self.exit_status = '1'
175              msg = "Error: Problem with "+file+" file"
176              raise CrabException(msg)
177 <
178 <        # overwite ProcessedDataset with user defined value
177 >        ### overwrite ProcessedDataset with user defined value
178 >        ### overwrite lumisections with no value
179 >        ### skip publication for 0 events files
180 >        filestopublish=[]
181          for file in jobReport.files:
182 <            for ds in file.dataset:
183 <                ds['ProcessedDataset']=procdataset
182 >            #### added check for problem with copy to SE and empty lfn
183 >            if (string.find(file['LFN'], 'copy_problems') != -1):
184 >                self.problemFiles.append(file['LFN'])
185 >            elif (file['LFN'] == ''):
186 >                self.noLFN.append(file['PFN'])
187 >            else:
188 >                if int(file['TotalEvents']) != 0 :
189 >                    #file.lumisections = {}
190 >                    # lumi info are now in run hash
191 >                    file.runs = {}
192 >                    for ds in file.dataset:
193 >                        ### FEDE FOR NEW LFN ###
194 >                        #ds['ProcessedDataset']=procdataset
195 >                        ########################
196 >                        ### Fede for production
197 >                        if (ds['PrimaryDataset'] == 'null'):
198 >                            ds['PrimaryDataset']=procdataset
199 >                    filestopublish.append(file)
200 >                else:
201 >                    self.noEventsFiles.append(file['LFN'])
202 >        jobReport.files = filestopublish
203 >        ### if all the files in the FJR have 0 events there is nothing to publish
204 >        if (len(filestopublish) == 0):
205 >           return None
206 >          
207          #// DBS to contact
208 <        dbswriter = DBSWriter(self.DBSURL,level='ERROR')
208 >        dbswriter = DBSWriter(self.DBSURL)
209          # insert files
210          Blocks=None
211          try:
212              Blocks=dbswriter.insertFiles(jobReport)
213 <            common.logger.message("------>>>> Blocks = %s"%Blocks)
213 >            common.logger.message("Blocks = %s"%Blocks)
214          except DBSWriterError, ex:
215 <            common.logger.message("insert file error: %s"%ex)
215 >            common.logger.message("Insert file error: %s"%ex)
216          return Blocks
217  
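
Before anything is sent to DBS, the loop above triages the report files into three skip lists. The same logic as a standalone sketch, assuming file entries expose the keys used above:

    def select_publishable(files, problemFiles, noLFN, noEventsFiles):
        # Mirrors the triage in publishAJobReport; 'files' would be jobReport.files.
        filestopublish = []
        for f in files:
            if f['LFN'].find('copy_problems') != -1:
                problemFiles.append(f['LFN'])    # stage-out flagged a copy problem
            elif f['LFN'] == '':
                noLFN.append(f['PFN'])           # no LFN assigned, cannot publish
            elif int(f['TotalEvents']) == 0:
                noEventsFiles.append(f['LFN'])   # empty file, skip publication
            else:
                filestopublish.append(f)
        return filestopublish
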
218      def run(self):
219          """
220          parse all the fjr xml files in the res dir and publish the datasets, files and blocks they describe
221          """
222 <        common.logger.message("Starting data publish")
222 >        
223          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
224          common.logger.debug(6, "file_list = "+str(file_list))
225          common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
226 <        # FIXME:
162 <        #  do the dataset publication self.publishDataset here
163 <        #
226 >            
227          if (len(file_list)>0):
228              BlocksList=[]
229 +            common.logger.message("--->>> Start dataset publication")
230 +            self.exit_status=self.publishDataset(file_list[0])
231 +            if (self.exit_status == '1'):
232 +                return self.exit_status
233 +            common.logger.message("--->>> End dataset publication")
234 +
235 +
236 +            common.logger.message("--->>> Start files publication")
237              for file in file_list:
238                  common.logger.message("file = "+file)
168                common.logger.message("Publishing dataset")
169                self.exit_status=self.publishDataset(file)
170                if (self.exit_status == '1'):
171                    return self.exit_status
172                common.logger.message("Publishing files")
239                  Blocks=self.publishAJobReport(file,self.processedData)
240                  if Blocks:
241 <                    [BlocksList.append(x) for x in Blocks]
241 >                    for x in Blocks: # do not allow multiple entries of the same block
242 >                        if x not in BlocksList:
243 >                           BlocksList.append(x)
244 >                    
245              # close the blocks
246 <            common.logger.message("------>>>> BlocksList = %s"%BlocksList)
247 <            dbswriter = DBSWriter(self.DBSURL,level='ERROR')
246 >            common.logger.debug(6, "BlocksList = %s"%BlocksList)
247 >            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
248 >            dbswriter = DBSWriter(self.DBSURL)
249 >            
250              for BlockName in BlocksList:
251                  try:  
252                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
253 <                    common.logger.message("------>>>> closeBlock %s"%closeBlock)
253 >                    common.logger.debug(6, "closeBlock %s"%closeBlock)
254                      #dbswriter.dbs.closeBlock(BlockName)
255                  except DBSWriterError, ex:
256 <                    common.logger.message("------>>>> close block error %s"%ex)
256 >                    common.logger.message("Close block error %s"%ex)
257  
258 +            if (len(self.noEventsFiles)>0):
259 +                common.logger.message("--->>> WARNING: the "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
260 +                for lfn in self.noEventsFiles:
261 +                    common.logger.message("------ LFN: %s"%lfn)
262 +            if (len(self.noLFN)>0):
263 +                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have an empty LFN")
264 +                for pfn in self.noLFN:
265 +                    common.logger.message("------ PFN: %s"%pfn)
266 +            if (len(self.problemFiles)>0):
267 +                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problems with the copy to the SE")
268 +                for lfn in self.problemFiles:
269 +                    common.logger.message("------ LFN: %s"%lfn)
270 +            common.logger.message("--->>> End files publication")
271 +            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
272              return self.exit_status
273  
274          else:
275 <            common.logger.message(self.resDir+" empty --> No file to publish on DBS/DLS")
275 >            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
276              self.exit_status = '1'
277              return self.exit_status
278      

Diff Legend

Removed lines
+ Added lines
< Changed lines (revision 1.1)
> Changed lines (revision 1.17)