
Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.46 by fanzago, Thu Mar 4 14:56:16 2010 UTC vs.
Revision 1.46.2.1 by fanzago, Fri Mar 19 15:03:03 2010 UTC

# Line 18 | Line 18 | from DBSAPI.dbsApi import DbsApi
18   class Publisher(Actor):
19      def __init__(self, cfg_params):
20          """
21 <        Publisher class:
21 >        Publisher class:
22  
23          - parses the CRAB FrameworkJobReport on the UI
24          - returns the <file> section of each xml file in the crab_0_xxxx/res directory as a dictionary
25          - publishes output data on DBS and DLS
26          """
27
27          self.cfg_params=cfg_params
28 <      
28 >
29          if not cfg_params.has_key('USER.publish_data_name'):
30              raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
31 <        self.userprocessedData = cfg_params['USER.publish_data_name']
31 >        self.userprocessedData = cfg_params['USER.publish_data_name']
32          self.processedData = None
33  
34          if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
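
The checks above assume a handful of [USER] parameters in crab.cfg. A minimal sketch of the section they expect (the dataset name and the local DBS URL are placeholders, not real endpoints):

    [USER]
    copy_data               = 1
    publish_data_name       = MyAnalysis_2010A_v1
    dbs_url_for_publication = https://your-local-dbs.example.org:8443/cms_dbs_local_writer/servlet/DBSServlet
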
# Line 53 | Line 52 | class Publisher(Actor):
52          self.DBSURL=cfg_params['USER.dbs_url_for_publication']
53          common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
54          if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
55 <            msg = "You cannot publish your data to the global DBS = " + self.DBSURL + "\n"
55 >            msg = "You cannot publish your data to the global DBS = " + self.DBSURL + "\n"
56              msg = msg + "Please set your local DBS URL in the [USER] section parameter 'dbs_url_for_publication'"
57              raise CrabException(msg)
58 <            
58 >
59          self.content=file(self.pset).read()
60          self.resDir = common.work_space.resDir()
61 <        
61 >
62          self.dataset_to_import=[]
63 <        
63 >
64          self.datasetpath=cfg_params['CMSSW.datasetpath']
65          if (self.datasetpath.upper() != 'NONE'):
66              self.dataset_to_import.append(self.datasetpath)
67 <        
67 >
68          ### Added PU dataset
69          tmp = cfg_params.get('CMSSW.dataset_pu',None)
70          if tmp :
# Line 73 | Line 72 | class Publisher(Actor):
72              for dataset in datasets:
73                  dataset=string.strip(dataset)
74                  self.dataset_to_import.append(dataset)
75 <        ###        
76 <            
77 <        #self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
75 >        ###
76 >
77 >
78 >        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
79 >        
80 >        ### fede import parent dataset is compulsory ###
81 >        if ( int(self.import_all_parents) == 0 ):
82 >            common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
83 >        ############
84 >        
85          self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
86 <    
86 >
87          self.SEName=''
88          self.CMSSW_VERSION=''
89          self.exit_status=''
90          self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
91 <        self.problemFiles=[]  
91 >        self.problemFiles=[]
92          self.noEventsFiles=[]
93          self.noLFN=[]
94 <        
94 >
95      def importParentDataset(self,globalDBS, datasetpath):
96          """
97          """
98          dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
99 <        
99 >
100          try:
101              #if (self.import_all_parents==1):
102              common.logger.info("--->>> Importing all parents level")
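
The dataset_to_import list assembled in __init__ above combines CMSSW.datasetpath with any comma-separated pile-up datasets from CMSSW.dataset_pu. A standalone sketch of that parsing, under the same comma-separated convention (the function name is illustrative):

    def collect_datasets_to_import(cfg_params):
        """Gather the input dataset plus any comma-separated PU datasets."""
        to_import = []
        datasetpath = cfg_params['CMSSW.datasetpath']
        if datasetpath.upper() != 'NONE':
            to_import.append(datasetpath)
        dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
        if dataset_pu:
            for dataset in dataset_pu.split(','):
                to_import.append(dataset.strip())
        return to_import
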
# Line 131 | Line 137 | class Publisher(Actor):
137          """
138          """
139          try:
140 +            ### fjr content as argument
141              jobReport = readJobReport(file)[0]
142              self.exit_status = '0'
143          except IndexError:
144              self.exit_status = '1'
145 <            msg = "Error: Problem with the "+file+" file"
145 >            msg = "Error: Problem with the "+file+" file"
146              common.logger.info(msg)
147              return self.exit_status
148  
149 +        #print "###################################################"
150 +        #print "len(jobReport.files) = ", len(jobReport.files)
151 +        #print "jobReport.files = ", jobReport.files
152 +        #print "###################################################"
153 +        
154          if (len(self.dataset_to_import) != 0):
155             for dataset in self.dataset_to_import:
156                 common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
# Line 147 | Line 159 | class Publisher(Actor):
159                     common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
160                     self.exit_status='1'
161                     return self.exit_status
162 <               else:    
162 >               else:
163                     common.logger.info('Import ok of dataset '+dataset)
164 <            
165 <        #// DBS to contact
166 <        dbswriter = DBSWriter(self.DBSURL)                        
155 <        try:  
156 <            fileinfo= jobReport.files[0]
157 <            self.exit_status = '0'
158 <        except IndexError:
164 >
165 >        
166 >        if (len(jobReport.files) <= 0) :
167              self.exit_status = '1'
168 <            msg = "Error: No EDM file to publish in the xml file "+file
168 >            msg = "Error: No EDM file to publish in the xml file "+file
169              common.logger.info(msg)
170              return self.exit_status
171 +        else:
172 +            print "fjr contains some files to publish"
173  
174 <        datasets=fileinfo.dataset
175 <        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
176 <        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
177 <        if len(datasets)<=0:
178 <           self.exit_status = '1'
169 <           msg = "Error: No info about dataset in the xml file "+file
170 <           common.logger.info(msg)
171 <           return self.exit_status
172 <        for dataset in datasets:
173 <            #### for production data
174 <            self.processedData = dataset['ProcessedDataset']
175 <            if (dataset['PrimaryDataset'] == 'null'):
176 <                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
177 <                dataset['PrimaryDataset'] = self.userprocessedData
178 <            #else: # add parentage from input dataset
179 <            elif self.datasetpath.upper() != 'NONE':
180 <                dataset['ParentDataset']= self.datasetpath
181 <    
182 <            dataset['PSetContent']=self.content
183 <            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
184 <            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
185 <            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
186 <            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
187 <            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
188 <            
189 <            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
190 <            
191 <            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
192 <            common.logger.log(10-1,"Primary:  %s "%primary)
193 <            
194 <            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
195 <            common.logger.log(10-1,"Algo:  %s "%algo)
174 >        #### datasets creation in dbs
175 >        #// DBS to contact write and read of the same dbs
176 >        dbsReader = DBSReader(self.DBSURL,level='ERROR')
177 >        dbswriter = DBSWriter(self.DBSURL)
178 >        #####
179  
180 <            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
181 <            common.logger.log(10-1,"Processed:  %s "%processed)
182 <            
183 <            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
184 <            
180 >        self.published_datasets = []
181 >        for fileinfo in jobReport.files:
182 >            #print "--->>> in the file loop, fileinfo = ", fileinfo
183 >            datasets_info=fileinfo.dataset
184 >            #print "--->>> in the file loop, datasets_info = ", datasets_info
185 >            if len(datasets_info)<=0:
186 >                self.exit_status = '1'
187 >                msg = "Error: No info about dataset in the xml file "+file
188 >                common.logger.info(msg)
189 >                return self.exit_status
190 >            else:
191 >                for dataset in datasets_info:
192 >                    #### for production data
193 >                    self.processedData = dataset['ProcessedDataset']
194 >                    if (dataset['PrimaryDataset'] == 'null'):
195 >                        dataset['PrimaryDataset'] = self.userprocessedData
196 >                    elif self.datasetpath.upper() != 'NONE':
197 >                        dataset['ParentDataset']= self.datasetpath
198 >
199 >                    dataset['PSetContent']=self.content
200 >                    cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
201 >                    common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
202 >                    common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
203 >                    common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
204 >                    
205 >                    self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
206 >
207 >
208 >                    self.published_datasets.append(self.dataset_to_check)
209 >
210 >                    common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
211 >                    
212 >                    #### check if dataset already exists in the DBS
213 >                    result = dbsReader.matchProcessedDatasets(dataset['PrimaryDataset'], 'USER', dataset['ProcessedDataset'])
214 >                    #print "result = ",result
215 >                    if (len(result) != 0):
216 >                       result = dbsReader.listDatasetFiles(self.dataset_to_check)
217 >                    #if (len(result) == 0):
218 >                       #print "len is zero"
219 >                    #else:
220 >                       #print "len is not zero"
221 >                    #   result = dbsReader.listDatasetFiles(self.dataset_to_check)
222 >                       #print "result = ", result
223 >
224 >                    primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
225 >                    common.logger.log(10-1,"Primary:  %s "%primary)
226 >                    print "primary = ", primary
227 >
228 >                    algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
229 >                    common.logger.log(10-1,"Algo:  %s "%algo)
230 >
231 >                    processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
232 >                    common.logger.log(10-1,"Processed:  %s "%processed)
233 >                    print "processed = ", processed
234 >
235 >                    common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
236 >                    #######################################################################################
237 >                
238          common.logger.log(10-1,"exit_status = %s "%self.exit_status)
239 <        return self.exit_status    
239 >        return self.exit_status
240  
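
The per-file loop above derives the published dataset name as /PrimaryDataset/ProcessedDataset/USER, substituting USER.publish_data_name when a production FJR carries a 'null' primary and recording the input dataset as parent otherwise. A sketch of just that naming rule, detached from the DBSWriterObjects calls (the function name is illustrative):

    def user_dataset_name(dataset, user_data_name, datasetpath):
        """Return the /primary/processed/USER path for one FJR dataset entry."""
        if dataset['PrimaryDataset'] == 'null':
            # production FJRs carry no primary: fall back to publish_data_name
            dataset['PrimaryDataset'] = user_data_name
        elif datasetpath.upper() != 'NONE':
            # analysis jobs record the input dataset as the parent
            dataset['ParentDataset'] = datasetpath
        return "/" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER"
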
241      def publishAJobReport(self,file,procdataset):
242          """
243             input:  xml file, processedDataset
244          """
245 +        print "--->>> in publishAJobReport : "
246          common.logger.debug("FJR = %s"%file)
247          try:
248              jobReport = readJobReport(file)[0]
# Line 240 | Line 277 | class Publisher(Actor):
277                      ### Fede to insert also run and lumi info in DBS
278                      #file.runs = {}
279                      for ds in file.dataset:
280 <                        ### Fede for production
280 >                        ### For production data
281                          if (ds['PrimaryDataset'] == 'null'):
282                              ds['PrimaryDataset']=self.userprocessedData
283                      filestopublish.append(file)
284 <      
284 >
285 >        ### only good files will be published
286          jobReport.files = filestopublish
287 +        print "------>>> filestopublish = ", filestopublish
288          for file in filestopublish:
289              common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
290 +            print "--->>> LFN of file to publish = ", str(file['LFN'])
291          ### if all files of FJR have number of events = 0
292          if (len(filestopublish) == 0):
293              return None
294 <          
294 >            
295          #// DBS to contact
296          dbswriter = DBSWriter(self.DBSURL)
297          # insert files
# Line 259 | Line 299 | class Publisher(Actor):
299          try:
300              ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
301              Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
262            #Blocks=dbswriter.insertFiles(jobReport)
302              common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
303          except DBSWriterError, ex:
304              common.logger.debug("--->>> Insert file error: %s"%ex)
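
publishAJobReport keeps only the 'good' <file> entries before handing the report to insertFiles. A condensed sketch of that selection step, assuming each FJR file behaves like a dictionary with 'TotalEvents' and 'LFN' keys (the zero-event skip mirrors the CMSSW.publish_zero_event switch):

    def select_files_to_publish(jobReport, skipOcheck):
        """Keep publishable files; drop zero-event files unless requested."""
        filestopublish = []
        for f in jobReport.files:
            if int(skipOcheck) == 0 and int(f.get('TotalEvents', 0)) == 0:
                # zero events and publish_zero_event not set: skip this file
                continue
            filestopublish.append(f)
        return filestopublish
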
# Line 269 | Line 308 | class Publisher(Actor):
308          """
309          parses all xml files in the res dir and creates a dictionary
310          """
311 <        
311 >
312          file_list = glob.glob(self.resDir+"crab_fjr*.xml")
313 <        
313 >
314          ## Select only those fjr that are successful
315          if (len(file_list)==0):
316 <            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
316 >            common.logger.info("--->>> "+self.resDir+" empty: no fjr files in the res dir to publish on DBS")
317              self.exit_status = '1'
318              return self.exit_status
319  
320          good_list=[]
321 +        
322          for fjr in file_list:
323              reports = readJobReport(fjr)
324              if len(reports)>0:
325 +               ### with a backup copy the wrapper_exit_code is 60308 --> failed
326                 if reports[0].status == "Success":
327                    good_list.append(fjr)
328 +
329          file_list=good_list
330 +        print "fjr ok for publication, good_list = ", good_list
331          ##
332 <        common.logger.log(10-1, "file_list = "+str(file_list))
332 >        common.logger.log(10-1, "fjr with FrameworkJobReport Status='Success', file_list = "+str(file_list))
333          common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
334 <            
334 >
335          if (len(file_list)>0):
336              BlocksList=[]
337              common.logger.info("--->>> Start dataset publication")
338 +            ### first fjr found
339              self.exit_status=self.publishDataset(file_list[0])
340 +            #sys.exit()
341              if (self.exit_status == '1'):
342 <                return self.exit_status
342 >                return self.exit_status
343              common.logger.info("--->>> End dataset publication")
344  
345  
346              common.logger.info("--->>> Start files publication")
347 +
348 +            ### file_list composed by only fjr 'success' :
349              for file in file_list:
350                  Blocks=self.publishAJobReport(file,self.processedData)
351                  if Blocks:
352                      for x in Blocks: # do not allow multiple entries of the same block
353                          if x not in BlocksList:
354                             BlocksList.append(x)
355 <                    
355 >
356              # close the blocks
357              common.logger.log(10-1, "BlocksList = %s"%BlocksList)
358              dbswriter = DBSWriter(self.DBSURL)
359 <            
359 >
360              for BlockName in BlocksList:
361 <                try:  
361 >                try:
362                      closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
363                      common.logger.log(10-1, "closeBlock %s"%closeBlock)
364                  except DBSWriterError, ex:
# Line 330 | Line 377 | class Publisher(Actor):
377                  for lfn in self.problemFiles:
378                      common.logger.info("------ LFN: %s"%lfn)
379              common.logger.info("--->>> End files publication")
333          
334            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
335            from InspectDBS import InspectDBS
336            check=InspectDBS(self.cfg_params)
337            check.checkPublication()
338            return self.exit_status
380  
381 +            #### for loop added for MULTI PUBLICATION ####
382 +            for dataset_to_check in self.published_datasets:
383 +                self.cfg_params['USER.dataset_to_check']=dataset_to_check
384 +                from InspectDBS import InspectDBS
385 +                check=InspectDBS(self.cfg_params)
386 +                check.checkPublication()
387 +                
388 +            return self.exit_status
389 +            
390          else:
391              common.logger.info("--->>> No valid files to publish on DBS. None of your jobs reported exit code = 0")
392              self.exit_status = '1'
393              return self.exit_status
344    
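
Before anything touches DBS, publish() narrows crab_fjr*.xml down to reports whose top-level status is "Success". That selection, isolated as a sketch (the readJobReport import path is the one ProdCommon provides; treat it as an assumption):

    import glob
    # assumed import path, as used by CRAB's ProdCommon tools
    from ProdCommon.FwkJobRep.ReportParser import readJobReport

    def successful_fjrs(resDir):
        """Return the crab_fjr*.xml files whose status is Success."""
        good_list = []
        for fjr in glob.glob(resDir + "crab_fjr*.xml"):
            reports = readJobReport(fjr)
            if len(reports) > 0 and reports[0].status == "Success":
                good_list.append(fjr)
        return good_list
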

Diff Legend

  Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)