root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.31
Committed: Wed Jun 10 13:23:38 2009 UTC (15 years, 10 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.30: +0 -1 lines
Log Message:
typo

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS
        """

        self.cfg_params = cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
            msg  = 'You can not publish data because you did not select \n'
            msg += '\t*** copy_data = 1 or publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = ' + self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

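        # Configuration summary: publication requires the [USER] parameters
        # publish_data_name, copy_data = 1, publish_data = 1 and
        # dbs_url_for_publication (a local, writable DBS instance), plus
        # [CMSSW] pset and datasetpath; CMSSW.dbs_url, CMSSW.dataset_pu and
        # CMSSW.publish_zero_event are optional.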
        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        self.skipOcheck = cfg_params.get('CMSSW.publish_zero_event', 0)

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        """
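        # Import the parent dataset from the source DBS (globalDBS) into the local
        # publication DBS (self.DBSURL); returns 0 on success, 1 on failure.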
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            #dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.debug(str(ex))
            return 1
        return 0

    def publishDataset(self, file):
        """
        """
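        # Import any parent datasets into the local DBS, then register the primary
        # dataset, the algorithm and the processed dataset described in the framework
        # job report; the resulting /PrimaryDataset/ProcessedDataset/USER path is
        # stored in self.dataset_to_check.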
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file " + file
            common.logger.info(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.log(10-1, "FileInfo = " + str(fileinfo))
        common.logger.log(10-1, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file " + file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'}  # add real name of user cfg
            common.logger.info("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")
            self.dataset_to_check = "/" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER"
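            # dataset_to_check is later handed to InspectDBS (see run()) to verify
            # that the dataset was actually published.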

            common.logger.log(10-1, "--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.log(10-1, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1, "Processed: %s " % processed)

            common.logger.log(10-1, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.log(10-1, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
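        # Filter the FJR <file> entries (see the selection below), insert the
        # surviving files into DBS and return the list of affected blocks, or
        # None if nothing is left to publish.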
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
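        # Selection of the files to publish: LFNs flagged with 'copy_problems' go to
        # self.problemFiles, files with an empty LFN go to self.noLFN, and files with
        # 0 events are skipped unless CMSSW.publish_zero_event is set.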
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if self.skipOcheck == 0:
                    if int(file['TotalEvents']) != 0:
                        #file.lumisections = {}
                        # lumi info are now in run hash
                        file.runs = {}
                        for ds in file.dataset:
                            ### Fede for production
                            if (ds['PrimaryDataset'] == 'null'):
                                #ds['PrimaryDataset']=procdataset
                                ds['PrimaryDataset'] = self.userprocessedData
                        filestopublish.append(file)
                    else:
                        self.noEventsFiles.append(file['LFN'])
                else:
                    file.runs = {}
                    for ds in file.dataset:
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            #ds['PrimaryDataset']=procdataset
                            ds['PrimaryDataset'] = self.userprocessedData
                    filestopublish.append(file)

        jobReport.files = filestopublish
        ### if all files of FJR have number of events = 0
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.info("Inserting file in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.info("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        parse all the xml files in the res dir and create the dictionary
        """
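        # Flow: collect the successful crab_fjr*.xml reports, publish the dataset
        # from the first report, publish the files of every report, close the
        # resulting blocks and finally verify the publication with InspectDBS.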

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        ## Select only those fjr that are successful
        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports) > 0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.log(10-1, "file_list = " + str(file_list))
        common.logger.log(10-1, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.info("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")


            common.logger.info("--->>> Start files publication")
            for file in file_list:
                common.logger.debug("file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.log(10-1, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.noEventsFiles)) + " files were not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.info("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.problemFiles)) + " files were not published because of problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            common.logger.info("--->>> End files publication")

            self.cfg_params['USER.dataset_to_check'] = self.dataset_to_check
            from InspectDBS import InspectDBS
            check = InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status
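
# Usage sketch (illustrative only): the CRAB client normally drives this class via
# "crab -publish"; in essence it builds a flat 'SECTION.option' dictionary from
# crab.cfg and calls run(). The parameter values below are placeholders.
#
#   cfg_params = {
#       'USER.publish_data_name': 'MyProcessedData',
#       'USER.copy_data': '1',
#       'USER.publish_data': '1',
#       'USER.dbs_url_for_publication': 'https://your-local-dbs/servlet/DBSServlet',
#       'CMSSW.pset': 'my_pset.py',
#       'CMSSW.datasetpath': '/Primary/Processed/TIER',
#   }
#   publisher = Publisher(cfg_params)
#   exit_status = publisher.run()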