ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.25
Committed: Wed Mar 25 11:35:30 2009 UTC (16 years, 1 month ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre2, CRAB_2_6_0_pre1, CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1
Changes since 1.24: +1 -1 lines
Log Message:
changed logger.error to logger.message

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
10 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
13     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
14 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
15     import sys
16 slacapra 1.1
17     class Publisher(Actor):
18     def __init__(self, cfg_params):
19     """
20     Publisher class:
21    
22     - parses CRAB FrameworkJobReport on UI
23     - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
24     - publishes output data on DBS and DLS
25     """
26    
27     try:
28 fanzago 1.21 self.userprocessedData = cfg_params['USER.publish_data_name']
29 afanfani 1.16 self.processedData = None
30 slacapra 1.1 except KeyError:
31     raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 slacapra 1.8
33 slacapra 1.1 try:
34 spiga 1.22 if (int(cfg_params['USER.copy_data']) != 1):
35     raise KeyError
36 slacapra 1.1 except KeyError:
37 spiga 1.22 msg = 'You can not publish data because you did not selected \n'
38     msg += '\t*** copy_data = 1 or publish_data = 1 *** in the crab.cfg file'
39     raise CrabException(msg)
40 fanzago 1.4 try:
41     self.pset = cfg_params['CMSSW.pset']
42     except KeyError:
43     raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
44     try:
45     self.globalDBS=cfg_params['CMSSW.dbs_url']
46     except KeyError:
47     self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
48 fanzago 1.3 try:
49     self.DBSURL=cfg_params['USER.dbs_url_for_publication']
50 fanzago 1.6 common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
51     if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
52 fanzago 1.4 msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
53     msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
54     raise CrabException(msg)
55 fanzago 1.3 except KeyError:
56 fanzago 1.11 msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
57     msg = msg + " entry, necessary to publish the data.\n"
58     msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
59 fanzago 1.3 raise CrabException(msg)
60 fanzago 1.4
61     self.content=file(self.pset).read()
62     self.resDir = common.work_space.resDir()
63 fanzago 1.12
64     self.dataset_to_import=[]
65    
66 slacapra 1.1 self.datasetpath=cfg_params['CMSSW.datasetpath']
67 fanzago 1.12 if (self.datasetpath.upper() != 'NONE'):
68     self.dataset_to_import.append(self.datasetpath)
69    
70     ### Added PU dataset
71 spiga 1.13 tmp = cfg_params.get('CMSSW.dataset_pu',None)
72 fanzago 1.12 if tmp :
73     datasets = tmp.split(',')
74     for dataset in datasets:
75     dataset=string.strip(dataset)
76     self.dataset_to_import.append(dataset)
77     ###
78 spiga 1.24
79     self.skipOcheck=cfg_params.get('CMSSW.pubilish_zero_event',0)
80    
81 slacapra 1.1 self.SEName=''
82     self.CMSSW_VERSION=''
83     self.exit_status=''
84     self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
85 fanzago 1.5 self.problemFiles=[]
86     self.noEventsFiles=[]
87     self.noLFN=[]
88 slacapra 1.1
89     def importParentDataset(self,globalDBS, datasetpath):
90     """
91     """
92     dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
93    
94     try:
95 afanfani 1.10 dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
96 slacapra 1.1 except DBSWriterError, ex:
97     msg = "Error importing dataset to be processed into local DBS\n"
98     msg += "Source Dataset: %s\n" % datasetpath
99     msg += "Source DBS: %s\n" % globalDBS
100     msg += "Destination DBS: %s\n" % self.DBSURL
101     common.logger.message(msg)
102 spiga 1.22 common.logger.write(str(ex))
103 slacapra 1.1 return 1
104     return 0
105    
106     def publishDataset(self,file):
107     """
108     """
109     try:
110     jobReport = readJobReport(file)[0]
111     self.exit_status = '0'
112     except IndexError:
113     self.exit_status = '1'
114     msg = "Error: Problem with "+file+" file"
115     common.logger.message(msg)
116     return self.exit_status
117 fanzago 1.12
118     if (len(self.dataset_to_import) != 0):
119     for dataset in self.dataset_to_import:
120     common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
121     status_import=self.importParentDataset(self.globalDBS, dataset)
122     if (status_import == 1):
123     common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
124     self.exit_status='1'
125     return self.exit_status
126     else:
127     common.logger.message('Import ok of dataset '+dataset)
128 fanzago 1.2
129 slacapra 1.1 #// DBS to contact
130 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
131 slacapra 1.1 try:
132     fileinfo= jobReport.files[0]
133     self.exit_status = '0'
134     except IndexError:
135     self.exit_status = '1'
136     msg = "Error: No file to publish in xml file"+file+" file"
137     common.logger.message(msg)
138     return self.exit_status
139    
140     datasets=fileinfo.dataset
141 fanzago 1.4 common.logger.debug(6,"FileInfo = " + str(fileinfo))
142     common.logger.debug(6,"DatasetInfo = " + str(datasets))
143 afanfani 1.19 if len(datasets)<=0:
144     self.exit_status = '1'
145     msg = "Error: No info about dataset in the xml file "+file
146     common.logger.message(msg)
147     return self.exit_status
148 slacapra 1.1 for dataset in datasets:
149 fanzago 1.6 #### for production data
150 afanfani 1.16 self.processedData = dataset['ProcessedDataset']
151 fanzago 1.6 if (dataset['PrimaryDataset'] == 'null'):
152 fanzago 1.21 #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
153     dataset['PrimaryDataset'] = self.userprocessedData
154     #else: # add parentage from input dataset
155     elif self.datasetpath.upper() != 'NONE':
156 afanfani 1.14 dataset['ParentDataset']= self.datasetpath
157    
158 fanzago 1.4 dataset['PSetContent']=self.content
159     cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
160 slacapra 1.1 common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
161     common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
162 fanzago 1.6 common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
163    
164     common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
165 slacapra 1.1
166     primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
167 fanzago 1.4 common.logger.debug(6,"Primary: %s "%primary)
168 slacapra 1.1
169     algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
170 fanzago 1.4 common.logger.debug(6,"Algo: %s "%algo)
171 slacapra 1.1
172     processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
173 fanzago 1.4 common.logger.debug(6,"Processed: %s "%processed)
174 slacapra 1.1
175 fanzago 1.6 common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
176 fanzago 1.2
177 fanzago 1.4 common.logger.debug(6,"exit_status = %s "%self.exit_status)
178 slacapra 1.1 return self.exit_status
179    
180     def publishAJobReport(self,file,procdataset):
181     """
182 fanzago 1.2 input: xml file, processedDataset
183 slacapra 1.1 """
184     try:
185     jobReport = readJobReport(file)[0]
186     self.exit_status = '0'
187     except IndexError:
188     self.exit_status = '1'
189     msg = "Error: Problem with "+file+" file"
190     raise CrabException(msg)
191 fanzago 1.4 ### overwrite ProcessedDataset with user defined value
192     ### overwrite lumisections with no value
193     ### skip publication for 0 events files
194     filestopublish=[]
195 slacapra 1.1 for file in jobReport.files:
196 fanzago 1.5 #### added check for problem with copy to SE and empty lfn
197     if (string.find(file['LFN'], 'copy_problems') != -1):
198     self.problemFiles.append(file['LFN'])
199     elif (file['LFN'] == ''):
200     self.noLFN.append(file['PFN'])
201 fanzago 1.4 else:
202 spiga 1.24 if self.skipOcheck==0:
203     if int(file['TotalEvents']) != 0:
204     #file.lumisections = {}
205     # lumi info are now in run hash
206     file.runs = {}
207     for ds in file.dataset:
208     ### Fede for production
209     if (ds['PrimaryDataset'] == 'null'):
210     #ds['PrimaryDataset']=procdataset
211     ds['PrimaryDataset']=self.userprocessedData
212     filestopublish.append(file)
213     else:
214     self.noEventsFiles.append(file['LFN'])
215     else:
216     file.runs = {}
217     for ds in file.dataset:
218     ### Fede for production
219     if (ds['PrimaryDataset'] == 'null'):
220     #ds['PrimaryDataset']=procdataset
221     ds['PrimaryDataset']=self.userprocessedData
222     filestopublish.append(file)
223    
224 fanzago 1.4 jobReport.files = filestopublish
225     ### if all files of FJR have number of events = 0
226     if (len(filestopublish) == 0):
227     return None
228    
229 slacapra 1.1 #// DBS to contact
230 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
231 slacapra 1.1 # insert files
232     Blocks=None
233     try:
234     Blocks=dbswriter.insertFiles(jobReport)
235 mcinquil 1.18 common.logger.message("Inserting file in blocks = %s"%Blocks)
236 slacapra 1.1 except DBSWriterError, ex:
237 fanzago 1.25 common.logger.message("Insert file error: %s"%ex)
238 slacapra 1.1 return Blocks
239    
240     def run(self):
241     """
242     parse of all xml file on res dir and creation of distionary
243     """
244 fanzago 1.2
245 slacapra 1.1 file_list = glob.glob(self.resDir+"crab_fjr*.xml")
246 afanfani 1.19 ## Select only those fjr that are succesfull
247     good_list=[]
248     for fjr in file_list:
249     reports = readJobReport(fjr)
250 afanfani 1.20 if len(reports)>0:
251     if reports[0].status == "Success":
252     good_list.append(fjr)
253 afanfani 1.19 file_list=good_list
254     ##
255 slacapra 1.1 common.logger.debug(6, "file_list = "+str(file_list))
256     common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
257 fanzago 1.2
258 slacapra 1.1 if (len(file_list)>0):
259     BlocksList=[]
260 fanzago 1.2 common.logger.message("--->>> Start dataset publication")
261     self.exit_status=self.publishDataset(file_list[0])
262     if (self.exit_status == '1'):
263     return self.exit_status
264     common.logger.message("--->>> End dataset publication")
265    
266    
267     common.logger.message("--->>> Start files publication")
268 slacapra 1.1 for file in file_list:
269 mcinquil 1.18 common.logger.debug(1, "file = "+file)
270 slacapra 1.1 Blocks=self.publishAJobReport(file,self.processedData)
271     if Blocks:
272 afanfani 1.14 for x in Blocks: # do not allow multiple entries of the same block
273     if x not in BlocksList:
274     BlocksList.append(x)
275 fanzago 1.2
276 slacapra 1.1 # close the blocks
277 fanzago 1.6 common.logger.debug(6, "BlocksList = %s"%BlocksList)
278 fanzago 1.2 # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
279     dbswriter = DBSWriter(self.DBSURL)
280    
281 slacapra 1.1 for BlockName in BlocksList:
282     try:
283     closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
284 fanzago 1.6 common.logger.debug(6, "closeBlock %s"%closeBlock)
285 slacapra 1.1 #dbswriter.dbs.closeBlock(BlockName)
286     except DBSWriterError, ex:
287 fanzago 1.2 common.logger.message("Close block error %s"%ex)
288 fanzago 1.4
289 fanzago 1.5 if (len(self.noEventsFiles)>0):
290     common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
291     for lfn in self.noEventsFiles:
292     common.logger.message("------ LFN: %s"%lfn)
293     if (len(self.noLFN)>0):
294     common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
295     for pfn in self.noLFN:
296     common.logger.message("------ pfn: %s"%pfn)
297     if (len(self.problemFiles)>0):
298     common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
299     for lfn in self.problemFiles:
300 fanzago 1.4 common.logger.message("------ LFN: %s"%lfn)
301 fanzago 1.6 common.logger.message("--->>> End files publication")
302 fanzago 1.7 common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
303 slacapra 1.1 return self.exit_status
304    
305     else:
306 fanzago 1.2 common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
307 slacapra 1.1 self.exit_status = '1'
308     return self.exit_status
309