ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
(Generate patch)

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.1 by slacapra, Fri Nov 16 11:09:31 2007 UTC vs.
Revision 1.2 by fanzago, Wed Nov 21 14:18:17 2007 UTC

# Line 1 | Line 1
1 < import sys, getopt, string
1 > import getopt, string
2   import common
3   import time, glob
4   from Actor import *
5 from FwkJobRep.ReportParser import readJobReport
5   from crab_util import *
6   from crab_logger import Logger
7   from crab_exceptions import *
8 + from FwkJobRep.ReportParser import readJobReport
9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 < import ProdCommon.DataMgmt.DBS.DBSWriterObjects as DBSWriterObjects
14 <
15 <
13 > from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14 > import sys
15  
16   class Publisher(Actor):
17      def __init__(self, cfg_params):
# Line 34 | Line 33 | class Publisher(Actor):
33          except KeyError:
34              raise CrabException('You can not publish data because you did not selected *** copy_data = 1  *** in the crab.cfg file')
35  
36 <        common.logger.message('self.processedData = '+self.processedData)
36 >        #common.logger.message('processedData = '+self.processedData)
37          self.resDir = common.work_space.resDir()
38 <        common.logger.message('self.resDir = '+self.resDir)
40 <        #old api self.DBSURL='http://cmssrv18.fnal.gov:8989/DBS/servlet/DBSServlet'
38 >        #common.logger.message('resDir = '+self.resDir)
39          self.DBSURL=cfg_params['USER.dbs_url_for_publication']
40 <        #self.DBSURL='http://cmssrv17.fnal.gov:8989/DBS_1_0_4_pre2/servlet/DBSServlet'
43 <        common.logger.message('self.DBSURL = '+self.DBSURL)
40 >        common.logger.message('dbs url = '+self.DBSURL)
41          self.datasetpath=cfg_params['CMSSW.datasetpath']
42 <        common.logger.message('self.datasetpath = '+self.datasetpath)
42 >        #common.logger.message('datasetpath = '+self.datasetpath)
43          self.SEName=''
44          self.CMSSW_VERSION=''
45          self.exit_status=''
46          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
47          
51    
48      def importParentDataset(self,globalDBS, datasetpath):
49          """
50          """
51          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
52          
53          try:
58            common.logger.message("----->>>>importing parent dataset in the local dbs")
54              dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
55          except DBSWriterError, ex:
56              msg = "Error importing dataset to be processed into local DBS\n"
# Line 71 | Line 66 | class Publisher(Actor):
66          """
67          try:
68              jobReport = readJobReport(file)[0]
74            msg = "--->>> reading "+file+" file"  
75            common.logger.message(msg)
69              self.exit_status = '0'
70          except IndexError:
71              self.exit_status = '1'
72              msg = "Error: Problem with "+file+" file"  
73              common.logger.message(msg)
74              return self.exit_status
75 <        #####for parents information import #########################################
75 >            
76          #### the globalDBS has to be written in the crab cfg file!!!!! ###############
84        globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
77          if (self.datasetpath != 'None'):
78 +            globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
79 +            common.logger.message("--->>> Importing parent dataset in the dbs")
80              status_import=self.importParentDataset(globalDBS, self.datasetpath)
81              if (status_import == 1):
82                  common.logger.message('Problem with parent import from the global DBS '+globalDBS+ 'to the local one '+self.DBSURL)
83                  self.exit_status='1'
90                ###############################################################################
84                  ##  ___ >>>>>>>  comment out the next line, if you have problem with the import
92                ###############################################################################
85                  return self.exit_status
86 <            pass
86 >            common.logger.message("Parent import ok")
87 >            
88          #// DBS to contact
89 <        dbswriter = DBSWriter(self.DBSURL,level='ERROR')                        
97 <        # publish a dataset : it should be done only once for the task
98 <        #                     and not for all the JobReport
89 >        dbswriter = DBSWriter(self.DBSURL)                        
90          try:  
91              fileinfo= jobReport.files[0]
92 +            #fileinfo.lumisections = {}
93              self.exit_status = '0'
94          except IndexError:
95              self.exit_status = '1'
# Line 105 | Line 97 | class Publisher(Actor):
97              common.logger.message(msg)
98              return self.exit_status
99  
100 <        common.logger.message("FileInfo = " + str(fileinfo))
100 >        #common.logger.message("FileInfo = " + str(fileinfo))
101          datasets=fileinfo.dataset
102 <        common.logger.message("DatasetInfo = " + str(datasets))
102 >        #common.logger.message("DatasetInfo = " + str(datasets))
103          for dataset in datasets:
104              #### to understand how to fill cfgMeta info ###############
105              cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
106              common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
107              common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
108 <            common.logger.message("Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
108 >            common.logger.message("--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
109 >            #common.logger.message("dataset: %s"%dataset)
110              
111              primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
112 +            common.logger.message("Primary:  %s "%primary)
113              
114              algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
115 +            common.logger.message("Algo:  %s "%algo)
116  
117              processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
118 +            common.logger.message("Processed:  %s "%processed)
119              
120              common.logger.message("Inserted primary %s processed %s"%(primary,processed))
121 +            
122 +        #common.logger.message("exit_status = %s "%self.exit_status)
123          return self.exit_status    
124  
125      def publishAJobReport(self,file,procdataset):
126          """
127 +           input:  xml file, processedDataset
128          """
129          try:
130              jobReport = readJobReport(file)[0]
# Line 134 | Line 133 | class Publisher(Actor):
133              self.exit_status = '1'
134              msg = "Error: Problem with "+file+" file"
135              raise CrabException(msg)
136 <
137 <        # overwite ProcessedDataset with user defined value
136 >        # overwrite ProcessedDataset with user defined value
137 >        # overwrite lumisections with no value
138          for file in jobReport.files:
139 +            file.lumisections = {}
140              for ds in file.dataset:
141                  ds['ProcessedDataset']=procdataset
142          #// DBS to contact
143 <        dbswriter = DBSWriter(self.DBSURL,level='ERROR')
143 >        dbswriter = DBSWriter(self.DBSURL)
144          # insert files
145          Blocks=None
146          try:
147              Blocks=dbswriter.insertFiles(jobReport)
148 <            common.logger.message("------>>>> Blocks = %s"%Blocks)
148 >            common.logger.message("Blocks = %s"%Blocks)
149          except DBSWriterError, ex:
150 <            common.logger.message("insert file error: %s"%ex)
150 >            common.logger.message("Insert file error: %s"%ex)
151          return Blocks
152  
153      def run(self):
154          """
155          parse of all xml file on res dir and creation of distionary
156          """
157 <        common.logger.message("Starting data publish")
157 >        
158          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
159          common.logger.debug(6, "file_list = "+str(file_list))
160          common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
161 <        # FIXME:
162 <        #  do the dataset publication self.publishDataset here
163 <        #
161 >            
162          if (len(file_list)>0):
163              BlocksList=[]
164 +            common.logger.message("--->>> Start dataset publication")
165 +            self.exit_status=self.publishDataset(file_list[0])
166 +            if (self.exit_status == '1'):
167 +                return self.exit_status
168 +            common.logger.message("--->>> End dataset publication")
169 +
170 +
171 +            common.logger.message("--->>> Start files publication")
172              for file in file_list:
173                  common.logger.message("file = "+file)
168                common.logger.message("Publishing dataset")
169                self.exit_status=self.publishDataset(file)
170                if (self.exit_status == '1'):
171                    return self.exit_status
172                common.logger.message("Publishing files")
174                  Blocks=self.publishAJobReport(file,self.processedData)
175                  if Blocks:
176                      [BlocksList.append(x) for x in Blocks]
177 +                    
178              # close the blocks
179 <            common.logger.message("------>>>> BlocksList = %s"%BlocksList)
180 <            dbswriter = DBSWriter(self.DBSURL,level='ERROR')
179 >            common.logger.message("BlocksList = %s"%BlocksList)
180 >            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
181 >            dbswriter = DBSWriter(self.DBSURL)
182 >            
183              for BlockName in BlocksList:
184                  try:  
185                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
186 <                    common.logger.message("------>>>> closeBlock %s"%closeBlock)
186 >                    common.logger.message("closeBlock %s"%closeBlock)
187                      #dbswriter.dbs.closeBlock(BlockName)
188                  except DBSWriterError, ex:
189 <                    common.logger.message("------>>>> close block error %s"%ex)
190 <
189 >                    common.logger.message("Close block error %s"%ex)
190 >            common.logger.message("--->>> End files publication")
191              return self.exit_status
192  
193          else:
194 <            common.logger.message(self.resDir+" empty --> No file to publish on DBS/DLS")
194 >            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
195              self.exit_status = '1'
196              return self.exit_status
197      

Diff Legend

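Removed lines (shown with the old revision's line number and no marker symbol)
+ Added lines
< Changed lines (text as in the older revision, 1.1)
> Changed lines (text as in the newer revision, 1.2)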