ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.9
Committed: Wed Jan 9 16:05:10 2008 UTC (17 years, 3 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre5, CRAB_2_1_0_pre4, CRAB_2_1_0_pre3
Changes since 1.8: +1 -1 lines
Log Message:
removed ProdAgent API dependencies for DBS publication

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output of a CRAB task on the DBS instance given by
    USER.dbs_url_for_publication.

    - parses CRAB FrameworkJobReport on UI
    - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
    - publishes output data on DBS and DLS

    Status values handled by this class are the strings '0' (ok) and
    '1' (failure), stored in self.exit_status.
    """

    # Global DBS URLs. The first one is the default source for the parent
    # dataset import; publishing user data to either of them is refused.
    _GLOBAL_DBS_URL = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
    _GLOBAL_DBS_WRITER_URL = "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Read and validate the publication settings.

        cfg_params: mapping of 'SECTION.key' names to values from crab.cfg.

        Raises CrabException when USER.publish_data_name, CMSSW.pset or
        USER.dbs_url_for_publication is missing, when USER.copy_data is
        not 1, or when the publication URL is one of the global DBS URLs.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non-numeric copy_data is treated like "not enabled"
        # so the user always gets the CrabException and never a raw
        # ValueError traceback.
        try:
            copy_enabled = (int(cfg_params['USER.copy_data']) == 1)
        except (KeyError, ValueError):
            copy_enabled = False
        if not copy_enabled:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = self._GLOBAL_DBS_URL

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data"
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
        if self.DBSURL in (self._GLOBAL_DBS_URL, self._GLOBAL_DBS_WRITER_URL):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Read the user configuration file; the handle is closed explicitly
        # instead of being left to the garbage collector.
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()

        self.resDir = common.work_space.resDir()
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Files skipped during publication, reported at the end of run().
        self.problemFiles = []    # LFNs containing 'copy_problems'
        self.noEventsFiles = []   # LFNs of files with TotalEvents == 0
        self.noLFN = []           # PFNs of files whose LFN is empty

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import datasetpath from the globalDBS instance into the
        publication DBS (self.DBSURL).

        Returns 0 on success; on DBSWriterError the problem is logged
        and 1 is returned.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')
        try:
            # Import the dataset that was asked for: the original call used
            # self.datasetpath while the error message reported the argument.
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() keeps this valid on old and new interpreters.
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Error: %s\n" % str(ex)
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Create the primary dataset, algorithm and processed dataset
        entries in the publication DBS, using the datasets attached to
        the first file of the given job report.

        file: path of a framework job report xml file.

        Returns self.exit_status: '0' on success, '1' when the report
        cannot be read, contains no file, or the parent import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        # The string 'None' (not the None object) marks a task with no
        # input dataset, hence nothing to import.
        if self.datasetpath != 'None':
            common.logger.message("--->>> Importing parent dataset in the dbs")
            status_import = self.importParentDataset(self.globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message("Parent import ok")

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s "%processed)

            common.logger.debug(6, "Inserted primary %s processed %s"%(primary, processed))

        common.logger.debug(6, "exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert the files of one job report into the publication DBS.

        file: path of a framework job report xml file.
        procdataset: value written into ProcessedDataset of every
            published file (and into PrimaryDataset when that is 'null').

        Files whose LFN contains 'copy_problems', whose LFN is empty or
        that have 0 events are not published; they are recorded in
        self.problemFiles, self.noLFN and self.noEventsFiles.

        Returns the value of DBSWriter.insertFiles, or None when nothing
        was publishable or the insertion raised DBSWriterError.
        Raises CrabException when the report cannot be read.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)

        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        # A distinct loop name keeps the 'file' argument (the report path)
        # intact for the whole method.
        for fileinfo in jobReport.files:
            lfn = fileinfo['LFN']
            #### added check for problem with copy to SE and empty lfn
            if 'copy_problems' in lfn:
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fileinfo['PFN'])
            elif int(fileinfo['TotalEvents']) == 0:
                self.noEventsFiles.append(lfn)
            else:
                fileinfo.lumisections = {}
                for ds in fileinfo.dataset:
                    ds['ProcessedDataset'] = procdataset
                    ### Fede for production
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(fileinfo)
        jobReport.files = filestopublish

        ### if all files of FJR have number of events = 0
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
        except DBSWriterError:
            common.logger.message("Insert file error: %s"%sys.exc_info()[1])
        else:
            common.logger.message("Blocks = %s"%Blocks)
        return Blocks

    def _logSkippedFiles(self, header, label, names):
        """
        Log header followed by one 'label: name' line per entry of
        names; nothing is logged when names is empty.
        """
        if not names:
            return
        common.logger.message(header)
        for name in names:
            common.logger.message("------ %s: %s"%(label, name))

    def run(self):
        """
        Publish every crab_fjr*.xml job report found in the res directory.

        The first report is used to create the dataset entries, then the
        files of every report are inserted and the resulting blocks are
        passed to DBSWriter.manageFileBlock. Files that were skipped are
        listed at the end.

        Returns self.exit_status: '1' when no report is found or dataset
        publication fails, otherwise the status left by the last step.
        """
        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for fjr_path in file_list:
            common.logger.message("file = "+fjr_path)
            Blocks = self.publishAJobReport(fjr_path, self.processedData)
            # Different job reports may report the same block: keep each
            # block once (first-seen order) so it is handled a single time.
            for block_name in Blocks or []:
                if block_name not in BlocksList:
                    BlocksList.append(block_name)

        # close the blocks
        # NOTE(review): maxFiles=1 presumably makes manageFileBlock close any
        # block holding at least one file -- confirm against DBSWriter.
        common.logger.debug(6, "BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL)
        for block_name in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(block_name, maxFiles=1)
            except DBSWriterError:
                common.logger.message("Close block error %s"%sys.exc_info()[1])
            else:
                common.logger.debug(6, "closeBlock %s"%closeBlock)

        self._logSkippedFiles(
            "--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:",
            "LFN", self.noEventsFiles)
        self._logSkippedFiles(
            "--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN",
            "pfn", self.noLFN)
        self._logSkippedFiles(
            "--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE",
            "LFN", self.problemFiles)

        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
250