ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.1
Committed: Fri Nov 16 11:09:31 2007 UTC (17 years, 5 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_3_pre1, CRAB_2_0_2, CRAB_2_0_2_pre6
Log Message:
remove support for DBS1, and remove DBS2 string from  files/class

File Contents

# User Rev Content
1 slacapra 1.1 import sys, getopt, string
2     import common
3     import time, glob
4     from Actor import *
5     from FwkJobRep.ReportParser import readJobReport
6     from crab_util import *
7     from crab_logger import Logger
8     from crab_exceptions import *
9     from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13     import ProdCommon.DataMgmt.DBS.DBSWriterObjects as DBSWriterObjects
14    
15    
16    
class Publisher(Actor):
    """
    Publish the output of a CRAB task into a DBS instance.

    - parses the CRAB FrameworkJobReport files (crab_fjr*.xml) found in the
      task's res directory
    - registers the primary/processed dataset and algorithm described by each
      report in the DBS at USER.dbs_url_for_publication
    - inserts the files listed in each report and closes the resulting blocks

    Status is reported through ``self.exit_status``: the string '0' on
    success, '1' on failure.
    """

    # Global DBS instance parent datasets are imported from.
    # TODO: this should be read from crab.cfg instead of being hard-coded.
    GLOBAL_DBS_URL = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Read the publication settings from cfg_params.

        cfg_params -- mapping of crab.cfg 'SECTION.key' names to string values.

        Raises CrabException if USER.publish_data_name is missing or if
        USER.copy_data is missing, not a number, or different from 1.
        A missing USER.dbs_url_for_publication or CMSSW.datasetpath is not
        handled here and propagates as a KeyError.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non-numeric copy_data is treated like "not 1": either
        # way the user gets the explanatory CrabException rather than a
        # bare KeyError/ValueError traceback.
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError):
            copy_data = 0
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        common.logger.message('self.processedData = '+self.processedData)
        self.resDir = common.work_space.resDir()
        common.logger.message('self.resDir = '+self.resDir)
        self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        common.logger.message('self.DBSURL = '+self.DBSURL)
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        common.logger.message('self.datasetpath = '+self.datasetpath)
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import dataset `datasetpath` from the DBS at `globalDBS` into the
        publication DBS (self.DBSURL).

        Returns 0 on success, 1 if the DBS writer raised DBSWriterError
        (the failure is logged, not re-raised).
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            common.logger.message("----->>>>importing parent dataset in the local dbs")
            # Import the dataset that was asked for (the argument), which is
            # also the one named in the error message below.
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() instead of "except X, ex" / "except X as ex":
            # this spelling is valid on every Python 2 and Python 3 release.
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Error: %s\n" % ex
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Register in DBS the dataset(s) described by the job report `file`.

        file -- path of a FrameworkJobReport xml file.

        Unless the configured datasetpath is 'None' (any letter case), the
        parent dataset is first imported from GLOBAL_DBS_URL. Then, for every
        dataset entry of the first file listed in the report, the primary
        dataset, algorithm and processed dataset are created.

        Returns self.exit_status: '0' on success, '1' if the report cannot be
        read, lists no files, or the parent import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            common.logger.message("Error: Problem with "+file+" file")
            return self.exit_status
        common.logger.message("--->>> reading "+file+" file")
        self.exit_status = '0'

        ##### parent information import #####
        globalDBS = self.GLOBAL_DBS_URL
        # str() also copes with a datasetpath that was set to None itself.
        if str(self.datasetpath).upper() != 'NONE':
            status_import = self.importParentDataset(globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS '+globalDBS+' to the local one '+self.DBSURL)
                self.exit_status = '1'
                ###############################################################################
                ## ___ >>>>>>> comment out the next line, if you have problem with the import
                ###############################################################################
                return self.exit_status

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL, level='ERROR')
        # publish a dataset : it should be done only once for the task
        # and not for all the JobReport
        try:
            fileinfo = jobReport.files[0]
        except IndexError:
            self.exit_status = '1'
            common.logger.message("Error: No file to publish in xml file "+file)
            return self.exit_status
        self.exit_status = '0'

        common.logger.message("FileInfo = " + str(fileinfo))
        datasets = fileinfo.dataset
        common.logger.message("DatasetInfo = " + str(datasets))

        #### to understand how to fill cfgMeta info ###############
        # Same placeholder metadata for every dataset, so build it once.
        cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
        for dataset in datasets:
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)

            common.logger.message("Inserted primary %s processed %s"%(primary,processed))
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert into DBS the files listed in the job report `file`.

        file        -- path of a FrameworkJobReport xml file.
        procdataset -- value written over the 'ProcessedDataset' entry of
                       every dataset of every file in the report.

        Returns whatever DBSWriter.insertFiles returned (run() treats it as a
        collection of block names), or None if the insertion raised
        DBSWriterError (logged, not re-raised).
        Raises CrabException if the report cannot be read.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            raise CrabException("Error: Problem with "+file+" file")
        self.exit_status = '0'

        # overwrite ProcessedDataset with the user defined value
        for reported_file in jobReport.files:
            for ds in reported_file.dataset:
                ds['ProcessedDataset'] = procdataset

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL, level='ERROR')
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("------>>>> Blocks = %s"%Blocks)
        except DBSWriterError:
            ex = sys.exc_info()[1]
            common.logger.message("insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Publish every crab_fjr*.xml job report found in the res directory.

        For each report the dataset is registered (publishDataset) and its
        files are inserted (publishAJobReport); afterwards each distinct
        block returned by the insertions is passed once to
        DBSWriter.manageFileBlock.

        Returns self.exit_status: '1' if no report is found or as soon as a
        dataset registration fails (the remaining reports are skipped and no
        block is closed), otherwise the status left by the last report.
        """
        common.logger.message("Starting data publish")
        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message(self.resDir+" empty --> No file to publish on DBS/DLS")
            self.exit_status = '1'
            return self.exit_status

        # FIXME:
        # the dataset publication (self.publishDataset) should be done once
        # for the task here, not once per job report in the loop below
        BlocksList = []
        for report_path in file_list:
            common.logger.message("file = "+report_path)
            common.logger.message("Publishing dataset")
            self.exit_status = self.publishDataset(report_path)
            if self.exit_status == '1':
                return self.exit_status
            common.logger.message("Publishing files")
            Blocks = self.publishAJobReport(report_path, self.processedData)
            if Blocks:
                # Several reports usually feed the same block: remember each
                # block once (first-seen order) so it is closed only once.
                for block in Blocks:
                    if block not in BlocksList:
                        BlocksList.append(block)

        # close the blocks
        common.logger.message("------>>>> BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL, level='ERROR')
        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.message("------>>>> closeBlock %s"%closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("------>>>> close block error %s"%ex)

        return self.exit_status
193