root/cvsroot/COMP/CRAB/python/Publisher.py

Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.37 by fanzago, Wed Jun 24 16:46:25 2009 UTC vs.
Revision 1.51 by fanzago, Tue Jan 4 14:03:06 2011 UTC

# Line 12 | Line 12 | from ProdCommon.DataMgmt.DBS.DBSErrors i
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 from DBSAPI.dbsApiException import DbsException
15   from DBSAPI.dbsApi import DbsApi
16 + from DBSAPI.dbsMigrateApi import DbsMigrateApi
17 + from DBSAPI.dbsApiException import *
18  
19   class Publisher(Actor):
20      def __init__(self, cfg_params):
21          """
22 <        Publisher class:
22 >        Publisher class:
23  
24          - parses CRAB FrameworkJobReport on UI
25          - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
# Line 26 | Line 27 | class Publisher(Actor):
27          """
28  
29          self.cfg_params=cfg_params
30 +        self.fjrDirectory = cfg_params.get('USER.outputdir' ,
31 +                                           common.work_space.resDir()) + '/'
32        
33          if not cfg_params.has_key('USER.publish_data_name'):
34              raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
35 <        self.userprocessedData = cfg_params['USER.publish_data_name']
35 >        self.userprocessedData = cfg_params['USER.publish_data_name']
36          self.processedData = None
37  
38          if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
# Line 75 | Line 78 | class Publisher(Actor):
78                  self.dataset_to_import.append(dataset)
79          ###        
80              
81 <        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',0)
82 <        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
83 <    
81 >        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
82 >        
83 >        if ( int(self.import_all_parents) == 0 ):
84 >            common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
85 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',1)
86 >        if ( int(self.skipOcheck) == 0 ):
 87 >            common.logger.info("WARNING: The option CMSSW.publish_zero_event has been deprecated. Publication is now done by default, also for files with 0 events")
88          self.SEName=''
89          self.CMSSW_VERSION=''
90          self.exit_status=''
91          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
92 <        self.problemFiles=[]  
92 >        self.problemFiles=[]
93          self.noEventsFiles=[]
94          self.noLFN=[]
95 <        
95 >
96      def importParentDataset(self,globalDBS, datasetpath):
97          """
98 +           WARNING: it works only with DBS_2_0_9_patch_6
99          """
100 <        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
101 <        
100 >
101 >        args={'url':globalDBS}
102          try:
103 <            if (self.import_all_parents=='1'):
104 <                common.logger.info("--->>> Importing all parents level")
105 <                dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
106 <            else:
107 <                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
108 <                dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
109 <        except DBSWriterError, ex:
103 >            api_reader = DbsApi(args)
104 >        except DbsApiException, ex:
105 >            msg = "%s\n" % formatEx(ex)
106 >            raise CrabException(msg)
107 >
108 >        args={'url':self.DBSURL}
109 >        try:
110 >            api_writer = DbsApi(args)
111 >        except DbsApiException, ex:
112 >            msg = "%s\n" % formatEx(ex)
113 >            raise CrabException(msg)
114 >
115 >        try:
116 >            common.logger.info("--->>> Importing all parents level")
117 >            start = time.time()
118 >            common.logger.debug("start import parents time: " + str(start))
119 >            for block in api_reader.listBlocks(datasetpath):
120 >                if (str(block['OpenForWriting']) != '1'):
121 >                    api_writer.dbsMigrateBlock(globalDBS,self.DBSURL,block['Name'] )
122 >                else:
123 >                    common.logger.debug("Skipping the import of " + block['Name'] + " it is an open block")
124 >                    continue
125 >                ################
126 >            stop = time.time()
127 >            common.logger.debug("stop import parents time: " + str(stop))
128 >            common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
129 >        except DbsApiException, ex:
130              msg = "Error importing dataset to be processed into local DBS\n"
131              msg += "Source Dataset: %s\n" % datasetpath
132              msg += "Source DBS: %s\n" % globalDBS
133              msg += "Destination DBS: %s\n" % self.DBSURL
134              common.logger.info(msg)
135 <            common.logger.debug(str(ex))
135 >            common.logger.info(str(ex))
136              return 1
137          return 0
138 <        """
111 <        print " patch for importParentDataset: datasetpath = ", datasetpath
112 <        try:
113 <            args={}
114 <            args['url']=self.DBSURL
115 <            args['mode']='POST'
116 <            block = ""
117 <            api = DbsApi(args)
118 <            #api.migrateDatasetContents(srcURL, dstURL, path, block , False)
119 <            api.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, block , False)
120 <
121 <        except DbsException, ex:
122 <            print "Caught API Exception %s: %s "  % (ex.getClassName(), ex.getErrorMessage() )
123 <            if ex.getErrorCode() not in (None, ""):
124 <                print "DBS Exception Error Code: ", ex.getErrorCode()
125 <            return 1
126 <        print "Done"
127 <        return 0
128 <        """  
138 >
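
The rewritten importParentDataset above replaces the old DBSWriter-based import with a block-by-block migration through the DBS2 client API. Reduced to a standalone sketch that uses only the calls appearing in the revision (the helper name migrate_parent_blocks is illustrative):

    from DBSAPI.dbsApi import DbsApi
    from DBSAPI.dbsApiException import DbsApiException, formatEx

    def migrate_parent_blocks(globalDBS, localDBS, datasetpath):
        # One API handle per endpoint: read from the global DBS, write locally.
        api_reader = DbsApi({'url': globalDBS})
        api_writer = DbsApi({'url': localDBS})
        try:
            for block in api_reader.listBlocks(datasetpath):
                # Blocks still open for writing cannot be migrated; skip them.
                if str(block['OpenForWriting']) == '1':
                    continue
                api_writer.dbsMigrateBlock(globalDBS, localDBS, block['Name'])
        except DbsApiException, ex:
            print "Migration failed: %s" % formatEx(ex)
            return 1  # the caller treats any non-zero return as a failed import
        return 0
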
139      def publishDataset(self,file):
140          """
141          """
# Line 134 | Line 144 | class Publisher(Actor):
144              self.exit_status = '0'
145          except IndexError:
146              self.exit_status = '1'
147 <            msg = "Error: Problem with "+file+" file"  
147 >            msg = "Error: Problem with "+file+" file"
148              common.logger.info(msg)
149              return self.exit_status
150  
# Line 146 | Line 156 | class Publisher(Actor):
156                    common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
157                     self.exit_status='1'
158                     return self.exit_status
159 <               else:    
159 >               else:
160                     common.logger.info('Import ok of dataset '+dataset)
161 <            
162 <        #// DBS to contact
163 <        dbswriter = DBSWriter(self.DBSURL)                        
154 <        try:  
155 <            fileinfo= jobReport.files[0]
156 <            self.exit_status = '0'
157 <        except IndexError:
161 >
162 >        
163 >        if (len(jobReport.files) <= 0) :
164              self.exit_status = '1'
165 <            msg = "Error: No file to publish in xml file"+file+" file"  
165 >            msg = "Error: No EDM file to publish in xml file "+file
166              common.logger.info(msg)
167              return self.exit_status
168 +        else:
169 +            msg = "fjr contains some files to publish"
170 +            common.logger.debug(msg)
171  
172 <        datasets=fileinfo.dataset
173 <        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
174 <        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
175 <        if len(datasets)<=0:
176 <           self.exit_status = '1'
168 <           msg = "Error: No info about dataset in the xml file "+file
169 <           common.logger.info(msg)
170 <           return self.exit_status
171 <        for dataset in datasets:
172 <            #### for production data
173 <            self.processedData = dataset['ProcessedDataset']
174 <            if (dataset['PrimaryDataset'] == 'null'):
175 <                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
176 <                dataset['PrimaryDataset'] = self.userprocessedData
177 <            #else: # add parentage from input dataset
178 <            elif self.datasetpath.upper() != 'NONE':
179 <                dataset['ParentDataset']= self.datasetpath
180 <    
181 <            dataset['PSetContent']=self.content
182 <            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
183 <            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
184 <            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
185 <            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
186 <            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
187 <            
188 <            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
189 <            
190 <            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
191 <            common.logger.log(10-1,"Primary:  %s "%primary)
192 <            
193 <            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
194 <            common.logger.log(10-1,"Algo:  %s "%algo)
172 >        #### dataset creation in DBS
173 >        #// DBS to contact: read and write on the same DBS
174 >        dbsReader = DBSReader(self.DBSURL,level='ERROR')
175 >        dbswriter = DBSWriter(self.DBSURL)
176 >        #####
177  
178 <            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
179 <            common.logger.log(10-1,"Processed:  %s "%processed)
180 <            
181 <            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
182 <            
178 >        self.published_datasets = []
179 >        for fileinfo in jobReport.files:
180 >            datasets_info=fileinfo.dataset
181 >            if len(datasets_info)<=0:
182 >                self.exit_status = '1'
183 >                msg = "Error: No info about dataset in the xml file "+file
184 >                common.logger.info(msg)
185 >                return self.exit_status
186 >            else:
187 >                for dataset in datasets_info:
188 >                    #### for production data
189 >                    self.processedData = dataset['ProcessedDataset']
190 >                    if (dataset['PrimaryDataset'] == 'null'):
191 >                        dataset['PrimaryDataset'] = self.userprocessedData
192 >                    elif self.datasetpath.upper() != 'NONE':
193 >                        dataset['ParentDataset']= self.datasetpath
194 >
195 >                    dataset['PSetContent']=self.content
196 >                    cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
197 >                    common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
198 >                    common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
199 >                    common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
200 >                    
201 >                    self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
202 >
203 >
204 >                    self.published_datasets.append(self.dataset_to_check)
205 >
206 >                    common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
207 >                    
208 >                    #### check if dataset already exists in the DBS
209 >                    result = dbsReader.matchProcessedDatasets(dataset['PrimaryDataset'], 'USER', dataset['ProcessedDataset'])
210 >                    if (len(result) != 0):
211 >                       result = dbsReader.listDatasetFiles(self.dataset_to_check)
212 >
213 >                    primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
214 >                    common.logger.log(10-1,"Primary:  %s "%primary)
216 >
217 >                    algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
218 >                    common.logger.log(10-1,"Algo:  %s "%algo)
219 >
220 >                    processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
221 >                    common.logger.log(10-1,"Processed:  %s "%processed)
223 >
224 >                    common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
225 >                    #######################################################################################
226 >                
227          common.logger.log(10-1,"exit_status = %s "%self.exit_status)
228 <        return self.exit_status    
228 >        return self.exit_status
229  
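
The per-dataset bookkeeping in publishDataset comes down to building the user dataset path and probing the local DBS for it. A minimal sketch using only calls that appear above (the function name and its return shape are illustrative):

    from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader

    def user_dataset_exists(dbs_url, primary, processed):
        # User datasets always take the /<PrimaryDataset>/<ProcessedDataset>/USER form.
        dataset_path = "/" + primary + "/" + processed + "/USER"
        reader = DBSReader(dbs_url, level='ERROR')
        # matchProcessedDatasets returns the already-registered matches, if any.
        matches = reader.matchProcessedDatasets(primary, 'USER', processed)
        return len(matches) != 0, dataset_path
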
230      def publishAJobReport(self,file,procdataset):
231          """
232             input:  xml file, processedDataset
233          """
234 +        common.logger.debug("FJR = %s"%file)
235          try:
236              jobReport = readJobReport(file)[0]
237              self.exit_status = '0'
# Line 212 | Line 239 | class Publisher(Actor):
239              self.exit_status = '1'
240              msg = "Error: Problem with "+file+" file"
241              raise CrabException(msg)
215        ### overwrite ProcessedDataset with user defined value
216        ### overwrite lumisections with no value
242          ### skip publication for 0 events files
243          filestopublish=[]
244          for file in jobReport.files:
# Line 223 | Line 248 | class Publisher(Actor):
248              elif (file['LFN'] == ''):
249                  self.noLFN.append(file['PFN'])
250              else:
251 <                if  self.skipOcheck==0:
252 <                    if int(file['TotalEvents']) != 0:
253 <                        #file.lumisections = {}
254 <                        # lumi info are now in run hash
255 <                        file.runs = {}
256 <                        for ds in file.dataset:
257 <                            ### Fede for production
233 <                            if (ds['PrimaryDataset'] == 'null'):
234 <                                #ds['PrimaryDataset']=procdataset
235 <                                ds['PrimaryDataset']=self.userprocessedData
236 <                        filestopublish.append(file)
237 <                    else:
238 <                        self.noEventsFiles.append(file['LFN'])
239 <                else:
240 <                    file.runs = {}
241 <                    for ds in file.dataset:
242 <                        ### Fede for production
243 <                        if (ds['PrimaryDataset'] == 'null'):
244 <                            #ds['PrimaryDataset']=procdataset
245 <                            ds['PrimaryDataset']=self.userprocessedData
246 <                    filestopublish.append(file)
251 >                if int(file['TotalEvents']) == 0:
252 >                    self.noEventsFiles.append(file['LFN'])
253 >                for ds in file.dataset:
254 >                    ### Fede for production
255 >                    if (ds['PrimaryDataset'] == 'null'):
256 >                        ds['PrimaryDataset']=self.userprocessedData
257 >                filestopublish.append(file)
258        
259          jobReport.files = filestopublish
260 +        for file in filestopublish:
261 +            common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
262          ### if all files of FJR have number of events = 0
263          if (len(filestopublish) == 0):
264 <           return None
264 >            return None
265            
266          #// DBS to contact
267          dbswriter = DBSWriter(self.DBSURL)
268          # insert files
269          Blocks=None
270          try:
271 <            Blocks=dbswriter.insertFiles(jobReport)
272 <            common.logger.info("Inserting file in blocks = %s"%Blocks)
271 >            ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
272 >            Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
273 >            #Blocks=dbswriter.insertFiles(jobReport)
274 >            common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
275          except DBSWriterError, ex:
276 <            common.logger.info("Insert file error: %s"%ex)
276 >            common.logger.debug("--->>> Insert file error: %s"%ex)
277          return Blocks
278  
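
The publication itself is the insertFiles call, which revision 1.51 invokes with insertDetectorData=True so that run and lumi information reaches DBS. Isolated into a sketch (DBSWriter and DBSWriterError come from the ProdCommon imports at the top of the file; the wrapper function is illustrative):

    from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError
    from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter

    def insert_fjr_files(dbs_url, jobReport):
        # Returns the list of blocks the files landed in, or None on error.
        dbswriter = DBSWriter(dbs_url)
        try:
            # insertDetectorData=True propagates run/lumi info into DBS.
            return dbswriter.insertFiles(jobReport, insertDetectorData=True)
        except DBSWriterError, ex:
            return None
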
279      def run(self):
# Line 266 | Line 281 | class Publisher(Actor):
281          parse all xml files in the res dir and create a dictionary
282          """
283          
284 <        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
270 <        ## Select only those fjr that are succesfull
284 >        task = common._db.getTask()
285          good_list=[]
286 <        for fjr in file_list:
286 >
287 >        for job in task.getJobs():
288 >            fjr = self.fjrDirectory + job['outputFiles'][-1]
289 >            if (job.runningJob['applicationReturnCode']!=0 or job.runningJob['wrapperReturnCode']!=0): continue
290 >            # get FJR filename
291 >            fjr = self.fjrDirectory + job['outputFiles'][-1]
292              reports = readJobReport(fjr)
293              if len(reports)>0:
294                 if reports[0].status == "Success":
295                    good_list.append(fjr)
296          file_list=good_list
297 +
298 +        #file_list = glob.glob(self.resDir+"crab_fjr*.xml")
299 +        
300 +        ## Select only those fjr that are successful
301 +        #if (len(file_list)==0):
302 +        #    common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
303 +        #    self.exit_status = '1'
304 +        #    return self.exit_status
305 +
306 +        #good_list=[]
307 +        #for fjr in file_list:
308 +        #    reports = readJobReport(fjr)
309 +        #    if len(reports)>0:
310 +        #       if reports[0].status == "Success":
311 +        #          good_list.append(fjr)
312 +        #file_list=good_list
313          ##
314 <        common.logger.log(10-1, "file_list = "+str(file_list))
314 >        common.logger.log(10-1, "fjr with FrameworkJobReport Status='Success', file_list = "+str(file_list))
315          common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
316 <            
316 >
317          if (len(file_list)>0):
318              BlocksList=[]
319              common.logger.info("--->>> Start dataset publication")
320              self.exit_status=self.publishDataset(file_list[0])
321              if (self.exit_status == '1'):
322 <                return self.exit_status
322 >                return self.exit_status
323              common.logger.info("--->>> End dataset publication")
324  
325  
326              common.logger.info("--->>> Start files publication")
327              for file in file_list:
293                common.logger.debug( "file = "+file)
328                  Blocks=self.publishAJobReport(file,self.processedData)
329                  if Blocks:
330                      for x in Blocks: # do not allow multiple entries of the same block
331                          if x not in BlocksList:
332                             BlocksList.append(x)
333 <                    
333 >
334              # close the blocks
335              common.logger.log(10-1, "BlocksList = %s"%BlocksList)
302            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
336              dbswriter = DBSWriter(self.DBSURL)
337 <            
337 >
338              for BlockName in BlocksList:
339 <                try:  
339 >                try:
340                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
341                      common.logger.log(10-1, "closeBlock %s"%closeBlock)
309                    #dbswriter.dbs.closeBlock(BlockName)
342                  except DBSWriterError, ex:
343                      common.logger.info("Close block error %s"%ex)
344  
345              if (len(self.noEventsFiles)>0):
346 <                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
346 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" published files contain 0 events. They are:")
347                  for lfn in self.noEventsFiles:
348                      common.logger.info("------ LFN: %s"%lfn)
349              if (len(self.noLFN)>0):
# Line 323 | Line 355 | class Publisher(Actor):
355                  for lfn in self.problemFiles:
356                      common.logger.info("------ LFN: %s"%lfn)
357              common.logger.info("--->>> End files publication")
358 <          
359 <            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
360 <            from InspectDBS import InspectDBS
361 <            check=InspectDBS(self.cfg_params)
362 <            check.checkPublication()
358 >
359 >            #### FEDE for MULTI ####
360 >            for dataset_to_check in self.published_datasets:
361 >                self.cfg_params['USER.dataset_to_check']=dataset_to_check
362 >                from InspectDBS import InspectDBS
363 >                check=InspectDBS(self.cfg_params)
364 >                check.checkPublication()
365 >            #########################
366 >
367              return self.exit_status
368  
369          else:
370 <            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
370 >            common.logger.info("--->>> No valid files to publish on DBS: none of your jobs reported exit code 0")
371              self.exit_status = '1'
372              return self.exit_status
373      
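
For orientation, Publisher is driven by CRAB itself rather than called directly; a rough sketch of the call pattern, with illustrative cfg_params values (the real dictionary is built from crab.cfg and must also satisfy the USER.copy_data and storage checks in __init__):

    # Hypothetical, minimal configuration; real values come from crab.cfg.
    cfg_params = {
        'USER.publish_data_name': 'MyProcessedData_v1',  # required, else CrabException
        'USER.copy_data': '1',                           # publication needs copied output
        'USER.outputdir': '/path/to/crab_0_xxxx/res',    # where the FJR xml files live
    }
    publisher = Publisher(cfg_params)
    status = publisher.run()  # '0' on success, '1' if nothing could be published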

Diff Legend

(no marker)  Removed lines
+            Added lines
<            Changed lines (old revision)
>            Changed lines (new revision)