
Comparing COMP/CRAB/python/Publisher.py (file contents):
Revision 1.46.2.2 by fanzago, Mon Apr 12 13:50:21 2010 UTC vs.
Revision 1.52 by fanzago, Tue Mar 22 09:52:32 2011 UTC

# Line 12 | Line 12 | from ProdCommon.DataMgmt.DBS.DBSErrors i
12   from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14   import sys
15 from DBSAPI.dbsApiException import DbsException
15   from DBSAPI.dbsApi import DbsApi
16 + from DBSAPI.dbsMigrateApi import DbsMigrateApi
17 + from DBSAPI.dbsApiException import *
18  
19   class Publisher(Actor):
20      def __init__(self, cfg_params):
# Line 24 | Line 25 | class Publisher(Actor):
25          - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
26          - publishes output data on DBS and DLS
27          """
27        self.cfg_params=cfg_params
28  
29 +        self.cfg_params=cfg_params
30 +        self.fjrDirectory = cfg_params.get('USER.outputdir' ,
31 +                                           common.work_space.resDir()) + '/'
32 +      
33          if not cfg_params.has_key('USER.publish_data_name'):
34              raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
35          self.userprocessedData = cfg_params['USER.publish_data_name']
# Line 52 | Line 56 | class Publisher(Actor):
56          self.DBSURL=cfg_params['USER.dbs_url_for_publication']
57          common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
58          if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
59 <            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
59 >            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
60              msg = msg + "Please set a local DBS instance in the [USER] section parameter 'dbs_url_for_publication'"
61              raise CrabException(msg)
62 <
62 >            
63          self.content=file(self.pset).read()
64          self.resDir = common.work_space.resDir()
65 <
65 >        
66          self.dataset_to_import=[]
67 <
67 >        
68          self.datasetpath=cfg_params['CMSSW.datasetpath']
69          if (self.datasetpath.upper() != 'NONE'):
70              self.dataset_to_import.append(self.datasetpath)
71 <
71 >        
72          ### Added PU dataset
73          tmp = cfg_params.get('CMSSW.dataset_pu',None)
74          if tmp :
# Line 72 | Line 76 | class Publisher(Actor):
76              for dataset in datasets:
77                  dataset=string.strip(dataset)
78                  self.dataset_to_import.append(dataset)
79 <        ###
80 <
77 <
79 >        ###        
80 >            
81          self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
82          
80        ### fede import parent dataset is compulsory ###
83          if ( int(self.import_all_parents) == 0 ):
84              common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
85 <        ############
86 <        
87 <        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
86 <
85 >        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',1)
86 >        if ( int(self.skipOcheck) == 0 ):
87 >            common.logger.info("WARNING: The option CMSSW.publish_zero_event has been deprecated. Files with 0 events are now also published by default")
88          self.SEName=''
89          self.CMSSW_VERSION=''
90          self.exit_status=''
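
For reference, the publication options parsed above map onto crab.cfg roughly as in the
following fragment. This is an illustrative sketch only: the dataset names, the path and
the local DBS URL are placeholders, not CRAB defaults.

    [USER]
    # mandatory: name under which the output data is published
    publish_data_name       = MyAnalysisOutput
    # must point to a local DBS instance; the two global DBS URLs are rejected above
    dbs_url_for_publication = https://<local-dbs-host>:8443/<local_dbs_writer>/servlet/DBSServlet
    # directory holding the crab_fjr*.xml reports (default: the task res dir)
    outputdir               = /path/to/crab_0_xxxx/res/
    # optional: set to 1 to strip <Inputs> from the job reports before publication
    no_inp                  = 1

    [CMSSW]
    # input dataset whose parents are imported into the local DBS ('None' to skip)
    datasetpath = /Primary/Processed/TIER
    # optional pile-up dataset, imported as well
    dataset_pu  = /PUPrimary/PUProcessed/TIER
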
# Line 92 | Line 93 | class Publisher(Actor):
93          self.noEventsFiles=[]
94          self.noLFN=[]
95  
96 +        #### FEDE to allow publication without input data in <file>
97 +        if cfg_params.has_key('USER.no_inp'):
98 +            self.no_inp = cfg_params['USER.no_inp']
99 +        else:
100 +            self.no_inp = 0
101 +        ############################################################
102      def importParentDataset(self,globalDBS, datasetpath):
103          """
104 +           Import the parent dataset into the local DBS, block by block. WARNING: it works only with DBS_2_0_9_patch_6
105          """
106 <        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
106 >
107 >        args={'url':globalDBS}
108 >        try:
109 >            api_reader = DbsApi(args)
110 >        except DbsApiException, ex:
111 >            msg = "%s\n" % formatEx(ex)
112 >            raise CrabException(msg)
113 >
114 >        args={'url':self.DBSURL}
115 >        try:
116 >            api_writer = DbsApi(args)
117 >        except DbsApiException, ex:
118 >            msg = "%s\n" % formatEx(ex)
119 >            raise CrabException(msg)
120  
121          try:
101            #if (self.import_all_parents==1):
122              common.logger.info("--->>> Importing all parents level")
123              start = time.time()
124 <            common.logger.debug("start import time: " + str(start))
125 <            ### to skip the ProdCommon api exception in the case of block without location
126 <            ### skipNoSiteError=True
127 <            #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
128 <            ### calling dbs api directly
129 <            dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
124 >            common.logger.debug("start import parents time: " + str(start))
125 >            for block in api_reader.listBlocks(datasetpath):
126 >                if (str(block['OpenForWriting']) != '1'):
127 >                    api_writer.dbsMigrateBlock(globalDBS,self.DBSURL,block['Name'] )
128 >                else:
129 >                    common.logger.debug("Skipping the import of " + block['Name'] + ": it is an open block")
130 >                    continue
131 >                ################
132              stop = time.time()
133 <            common.logger.debug("stop import time: " + str(stop))
133 >            common.logger.debug("stop import parents time: " + str(stop))
134              common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
135 <            ## still not removing the code, but TODO for the final release...
114 <            """                                                    
115 <            else:
116 <                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
117 <                start = time.time()
118 <                #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
119 <                ### calling dbs api directly
120 <                common.logger.debug("start import time: " + str(start))
121 <                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
122 <                stop = time.time()
123 <                common.logger.debug("stop import time: " + str(stop))
124 <                common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
125 <            """
126 <        except DBSWriterError, ex:
135 >        except DbsApiException, ex:
136              msg = "Error importing dataset to be processed into local DBS\n"
137              msg += "Source Dataset: %s\n" % datasetpath
138              msg += "Source DBS: %s\n" % globalDBS
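
Stripped of CRAB's logging and error handling, the block-migration logic introduced above
reduces to the sketch below. It reuses only the DbsApi calls that appear in this revision;
the URLs are placeholders, and (per the docstring) a DBS_2_0_9_patch_6 client is assumed.

    from DBSAPI.dbsApi import DbsApi
    from DBSAPI.dbsApiException import *

    def migrate_parents(global_url, local_url, datasetpath):
        # a reader on the source (global) DBS and a writer on the local instance
        api_reader = DbsApi({'url': global_url})
        api_writer = DbsApi({'url': local_url})
        for block in api_reader.listBlocks(datasetpath):
            # only closed blocks are migrated; open blocks are skipped
            if str(block['OpenForWriting']) != '1':
                api_writer.dbsMigrateBlock(global_url, local_url, block['Name'])
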
# Line 137 | Line 146 | class Publisher(Actor):
146          """
147          """
148          try:
140            ### fjr content as argument
149              jobReport = readJobReport(file)[0]
150              self.exit_status = '0'
151          except IndexError:
# Line 146 | Line 154 | class Publisher(Actor):
154              common.logger.info(msg)
155              return self.exit_status
156  
149        #print "###################################################"
150        #print "len(jobReport.files) = ", len(jobReport.files)
151        #print "jobReport.files = ", jobReport.files
152        #print "###################################################"
153        
157          if (len(self.dataset_to_import) != 0):
158             for dataset in self.dataset_to_import:
159                 common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
# Line 242 | Line 245 | class Publisher(Actor):
245              self.exit_status = '1'
246              msg = "Error: Problem with "+file+" file"
247              raise CrabException(msg)
245        ### overwrite ProcessedDataset with user defined value
246        ### overwrite lumisections with no value
248          ### skip publication for 0 events files
249          filestopublish=[]
250          for file in jobReport.files:
# Line 253 | Line 254 | class Publisher(Actor):
254              elif (file['LFN'] == ''):
255                  self.noLFN.append(file['PFN'])
256              else:
257 <                if  self.skipOcheck==0:
258 <                    if int(file['TotalEvents']) != 0:
259 <                        ### Fede to insert also run and lumi info in DBS
260 <                        #file.runs = {}
261 <                        for ds in file.dataset:
262 <                            ### Fede for production
263 <                            if (ds['PrimaryDataset'] == 'null'):
264 <                                ds['PrimaryDataset']=self.userprocessedData
264 <                        filestopublish.append(file)
265 <                    else:
266 <                        self.noEventsFiles.append(file['LFN'])
267 <                else:
268 <                    ### Fede to insert also run and lumi info in DBS
269 <                    #file.runs = {}
270 <                    for ds in file.dataset:
271 <                        ### For production
272 <                        if (ds['PrimaryDataset'] == 'null'):
273 <                            ds['PrimaryDataset']=self.userprocessedData
274 <                    filestopublish.append(file)
275 <
276 <        ### only good files will be published
257 >                if int(file['TotalEvents']) == 0:
258 >                    self.noEventsFiles.append(file['LFN'])
259 >                for ds in file.dataset:
260 >                    ### Fede for production
261 >                    if (ds['PrimaryDataset'] == 'null'):
262 >                        ds['PrimaryDataset']=self.userprocessedData
263 >                filestopublish.append(file)
264 >      
265          jobReport.files = filestopublish
278        #print "------>>> filestopublish = ", filestopublish
266          for file in filestopublish:
267              common.logger.debug("--->>> LFN of file to publish =  " + str(file['LFN']))
281            #print "--->>> LFN of file to publish = ", str(file['LFN'])
268          ### if the FJR contains no publishable files
269          if (len(filestopublish) == 0):
270              return None
271 <            
271 >          
272          #// DBS to contact
273          dbswriter = DBSWriter(self.DBSURL)
274          # insert files
# Line 290 | Line 276 | class Publisher(Actor):
276          try:
277              ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
278              Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
279 +            #Blocks=dbswriter.insertFiles(jobReport)
280              common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
281          except DBSWriterError, ex:
282              common.logger.debug("--->>> Insert file error: %s"%ex)
283          return Blocks
284  
285 +    def remove_input_from_fjr(self, list_of_good_files):
286 +            """
287 +              remove the <Inputs> section from each FJR, used when there are problems with the input LFNs
288 +            """
289 +            from xml.etree.ElementTree import ElementTree, Element
290 +            new_good_list = []              
291 +            no_inp_dir = self.fjrDirectory + 'no_inp'
292 +            if not os.path.isdir(no_inp_dir):
293 +                try:
294 +                    os.mkdir(no_inp_dir)
295 +                    common.logger.debug("no_inp_dir = " + no_inp_dir)
296 +                except OSError:
297 +                    common.logger.info("problem during no_inp_dir creation: " + no_inp_dir)
298 +            for file in  list_of_good_files:
299 +                name_of_file = os.path.basename(file)
300 +                #print "name_of_file = " , name_of_file
301 +                oldxmlfile = ElementTree()
302 +                oldxmlfile.parse(file)
303 +                newxmlfile = ElementTree(Element(oldxmlfile.getroot().tag))
304 +                self.recurse(oldxmlfile.getroot(), newxmlfile.getroot())
305 +                new_good_file = no_inp_dir + '/' + name_of_file
306 +                newxmlfile.write(new_good_file)
307 +                new_good_list.append(new_good_file)
308 +            common.logger.debug("new_good_list = " + str(new_good_list))
309 +            return new_good_list  
310 +
311 +    def recurse(self,oldnode,newnode):
312 +            """
313 +               recursively copy the FJR tree, dropping <Inputs> sections and masking <DatasetInfo> provenance
314 +            """
315 +            from xml.etree.ElementTree import ElementTree, Element
316 +            try:
317 +                newnode.text = oldnode.text
318 +            except AttributeError: pass
319 +            try:
320 +                newnode.attrib = oldnode.attrib
321 +            except AttributeError: pass
322 +            try:
323 +                newnode.tail = oldnode.tail
324 +            except AttributeError: pass
325 +
326 +            for oldi in oldnode.getchildren():
327 +                if oldi.tag == "DatasetInfo":
328 +                    newi = Element(oldi.tag)
329 +                    newtag = Element("Entry")
330 +                    newtag.attrib = {'Name':'Description'}
331 +                    newtag.text = 'Unknown provenance'
332 +                    newi.append(newtag)
333 +                    newnode.append(newi)
334 +                    self.recurse(oldi, newi)
335 +                elif oldi.tag != "Inputs":
336 +                    newi = Element(oldi.tag)
337 +                    newnode.append(newi)
338 +                    self.recurse(oldi, newi)
339 +
340      def run(self):
341          """
342          parse all FJR xml files in the res dir and create a dictionary for publication
343          """
302
303        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
304
305        ## Select only those fjr that are succesfull
306        if (len(file_list)==0):
307            common.logger.info("--->>> "+self.resDir+" empty: no fjr files in the res dir to publish on DBS")
308            self.exit_status = '1'
309            return self.exit_status
310
311        good_list=[]
344          
345 <        for fjr in file_list:
345 >        task = common._db.getTask()
346 >        good_list=[]
347 >
348 >        for job in task.getJobs():
349 >            # skip jobs whose application or wrapper returned a non-zero exit code
350 >            if (job.runningJob['applicationReturnCode']!=0 or job.runningJob['wrapperReturnCode']!=0): continue
351 >            # get FJR filename
352 >            fjr = self.fjrDirectory + job['outputFiles'][-1]
353              reports = readJobReport(fjr)
354              if len(reports)>0:
316               ### with backup-copy the wrapper_exit_code is 60308 --> failed
355                 if reports[0].status == "Success":
356                    good_list.append(fjr)
357 +        
358 +        ####################################################
359 +        if self.no_inp == 1:
360 +            file_list = self.remove_input_from_fjr(good_list)
361 +        else:
362 +            file_list=good_list
363 +        common.logger.debug("file_list = " + str(file_list))
364 +        ####################################################    
365  
320        file_list=good_list
321        #print "fjr ok for publication are good_list = ", good_list
322        ##
366          common.logger.log(10-1, "fjr with FrameworkJobReport Status='Success', file_list = "+str(file_list))
367          common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
368  
369          if (len(file_list)>0):
370              BlocksList=[]
371              common.logger.info("--->>> Start dataset publication")
329            ### first fjr found
372              self.exit_status=self.publishDataset(file_list[0])
331            #sys.exit()
373              if (self.exit_status == '1'):
374                  return self.exit_status
375              common.logger.info("--->>> End dataset publication")
376  
377  
378              common.logger.info("--->>> Start files publication")
338
339            ### file_list composed by only fjr 'success' :
379              for file in file_list:
380                  Blocks=self.publishAJobReport(file,self.processedData)
381                  if Blocks:
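
The remove_input_from_fjr/recurse pair added above rewrites each FJR by copying the XML
tree node by node, dropping every <Inputs> branch and tagging <DatasetInfo> with an
'Unknown provenance' Description entry. A condensed, standalone sketch of that
ElementTree pattern (the input and output file names are hypothetical):

    from xml.etree.ElementTree import ElementTree, Element

    def copy_without_inputs(oldnode, newnode):
        # copy the payload of the current node
        newnode.text = oldnode.text
        newnode.attrib = oldnode.attrib
        newnode.tail = oldnode.tail
        for child in list(oldnode):
            if child.tag == "Inputs":
                continue  # drop the whole <Inputs> branch
            newchild = Element(child.tag)
            if child.tag == "DatasetInfo":
                # mask the provenance of the removed input
                entry = Element("Entry", {'Name': 'Description'})
                entry.text = 'Unknown provenance'
                newchild.append(entry)
            newnode.append(newchild)
            copy_without_inputs(child, newchild)

    old = ElementTree()
    old.parse("crab_fjr_1.xml")                        # hypothetical input FJR
    new = ElementTree(Element(old.getroot().tag))
    copy_without_inputs(old.getroot(), new.getroot())
    new.write("no_inp/crab_fjr_1.xml")                 # hypothetical output path
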
# Line 356 | Line 395 | class Publisher(Actor):
395                      common.logger.info("Close block error %s"%ex)
396  
397              if (len(self.noEventsFiles)>0):
398 <                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
398 >                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" published files contain 0 events:")
399                  for lfn in self.noEventsFiles:
400                      common.logger.info("------ LFN: %s"%lfn)
401              if (len(self.noLFN)>0):
# Line 369 | Line 408 | class Publisher(Actor):
408                      common.logger.info("------ LFN: %s"%lfn)
409              common.logger.info("--->>> End files publication")
410  
411 <            #### for MULTI PUBLICATION added for ####
411 >            #### FEDE for MULTI ####
412              for dataset_to_check in self.published_datasets:
413                  self.cfg_params['USER.dataset_to_check']=dataset_to_check
414                  from InspectDBS import InspectDBS
415                  check=InspectDBS(self.cfg_params)
416                  check.checkPublication()
417 <                
417 >            #########################
418 >
419              return self.exit_status
420 <            
420 >
421          else:
422              common.logger.info("--->>> No valid files to publish on DBS: no job reported an exit code of 0")
423              self.exit_status = '1'
424              return self.exit_status
425 +    

Diff Legend

Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)