ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.12
Committed: Wed Jun 18 14:02:11 2008 UTC (16 years, 10 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.11: +26 -9 lines
Log Message:
added the import of PU dataset in the local DBS

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output of a CRAB task into a user-chosen (local) DBS instance.

    The work is driven by run(): it looks for crab_fjr*.xml job reports in
    the task result directory, registers the dataset described by the first
    report (publishDataset) and then inserts the files of every report
    (publishAJobReport), finally asking DBS to manage/close the touched
    file blocks.
    """

    # DBS instances that must never be used as publication target.
    _GLOBAL_DBS_READER = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
    _GLOBAL_DBS_WRITER = "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS

        cfg_params is a dict-like object keyed by 'SECTION.option' strings.
        Keys read here: USER.publish_data_name, USER.copy_data, CMSSW.pset,
        CMSSW.dbs_url (optional), USER.dbs_url_for_publication,
        CMSSW.datasetpath and USER.dataset_pu (optional).

        Raises CrabException when a mandatory option is missing, when
        copy_data is not 1, or when the publication URL is one of the
        global DBS instances.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing option and a value that is not an integer are both
        # reported with the same configuration error instead of letting a
        # bare ValueError escape.
        try:
            copy_enabled = (int(cfg_params['USER.copy_data']) == 1)
        except (KeyError, ValueError):
            copy_enabled = False
        if not copy_enabled:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        # Source DBS for parent datasets; falls back to the global instance.
        self.globalDBS = cfg_params.get('CMSSW.dbs_url', self._GLOBAL_DBS_READER)

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
        if self.DBSURL in (self._GLOBAL_DBS_READER, self._GLOBAL_DBS_WRITER):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Read the user configuration file and release the handle right away
        # (try/finally rather than 'with' to stay valid on old interpreters).
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()
        self.resDir = common.work_space.resDir()

        # Datasets that have to be imported into the local DBS before the
        # user dataset is registered.
        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if self.datasetpath.upper() != 'NONE':
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        # Comma separated list; blank entries (e.g. from a trailing comma)
        # are dropped so that no empty dataset path is ever imported.
        pu_datasets = cfg_params.get('USER.dataset_pu', None)
        if pu_datasets:
            for pu_name in pu_datasets.split(','):
                pu_name = pu_name.strip()
                if pu_name:
                    self.dataset_to_import.append(pu_name)
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Per-run bookkeeping of files skipped by publishAJobReport.
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import datasetpath from globalDBS into the publication DBS
        (self.DBSURL), without its parentage.

        Returns 0 on success and 1 when DBSWriter reports an error; in the
        latter case a description of the failed import is logged.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # Import the dataset that was asked for. Using self.datasetpath
            # here would re-import the main dataset for every entry of
            # self.dataset_to_import and never import the PU datasets.
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Register in the publication DBS the dataset(s) described by the
        first file entry of the job report xml `file`.

        Before that, every dataset listed in self.dataset_to_import is
        imported from self.globalDBS. Returns self.exit_status: '0' on
        success, '1' when the report cannot be used or an import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        # Stop at the first dataset that cannot be imported.
        for parent in self.dataset_to_import:
            common.logger.message("--->>> Importing parent dataset in the dbs: " +parent)
            status_import = self.importParentDataset(self.globalDBS, parent)
            if status_import == 1:
                common.logger.message('Problem with parent '+ parent +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message('Import ok of dataset '+parent)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file"+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data
            # A 'null' primary dataset is replaced by the processed name.
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s "%processed)

            common.logger.debug(6, "Inserted primary %s processed %s"%(primary,processed))

        common.logger.debug(6, "exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset

        Insert into the publication DBS the files listed in the job report
        xml `file`, after forcing their ProcessedDataset to procdataset.

        Files are skipped, and remembered on the instance, when their LFN
        contains 'copy_problems' (self.problemFiles), when their LFN is
        empty (self.noLFN) or when they hold 0 events (self.noEventsFiles).

        Returns whatever DBSWriter.insertFiles returns, or None when no
        file is left to publish or the insertion raised DBSWriterError.
        Raises CrabException when the report contains no job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)

        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        for fjr_file in jobReport.files:
            lfn = fjr_file['LFN']
            #### added check for problem with copy to SE and empty lfn
            if 'copy_problems' in lfn:
                self.problemFiles.append(lfn)
                continue
            if lfn == '':
                self.noLFN.append(fjr_file['PFN'])
                continue
            if int(fjr_file['TotalEvents']) == 0:
                self.noEventsFiles.append(lfn)
                continue

            fjr_file.lumisections = {}
            for ds in fjr_file.dataset:
                ds['ProcessedDataset'] = procdataset
                ### Fede for production
                if ds['PrimaryDataset'] == 'null':
                    ds['PrimaryDataset'] = procdataset
            filestopublish.append(fjr_file)

        jobReport.files = filestopublish
        ### if all files of FJR have number of events = 0
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s"%Blocks)
        except DBSWriterError:
            # sys.exc_info() instead of "except X, ex" / "except X as ex":
            # this spelling parses on every Python version.
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        parse of all xml file on res dir and creation of dictionary

        Publishes the dataset taken from the first crab_fjr*.xml report
        found in self.resDir, then the files of every report, then asks
        DBS to manage each returned block, and finally logs the files that
        were skipped. Returns self.exit_status ('1' when there is no report
        or the dataset publication failed).
        """
        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        BlocksList = []
        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        for fjr_path in file_list:
            common.logger.message("file = "+fjr_path)
            Blocks = self.publishAJobReport(fjr_path, self.processedData)
            if Blocks:
                BlocksList.extend(Blocks)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles= 1)
                common.logger.debug(6, "closeBlock %s"%closeBlock)
            except DBSWriterError:
                # A failure on one block is logged and the loop goes on.
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s"%ex)

        if self.noEventsFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s"%lfn)
        if self.noLFN:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s"%pfn)
        if self.problemFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s"%lfn)
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
268