root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.33
Committed: Wed Jun 10 13:31:42 2009 UTC by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.32: +3 -4 lines
Log Message:
re-add self.pset

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys

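# Typical use, as a sketch (the actual call site lives elsewhere in CRAB,
# e.g. behind the crab -publish action; the names below are illustrative only):
#   publisher = Publisher(cfg_params)   # cfg_params: the parsed crab.cfg entries
#   status = publisher.run()            # returns '0' on success, '1' on failure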
class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReports on the UI
        - returns the <file> section of the xml in dictionary format for each xml file in the crab_0_xxxx/res directory
        - publishes the output data on DBS and DLS
        """

        self.cfg_params = cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1):
            msg = 'You can not publish data because you did not select \n'
            msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        if not cfg_params.has_key('CMSSW.pset'):
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        self.pset = cfg_params['CMSSW.pset']

        self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local DBS instance."
            raise CrabException(msg)

        self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = ' + self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

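        # Keep the full text of the user's pset: publishDataset() below attaches it
        # to each dataset as 'PSetContent' before registration in DBS.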
        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

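        # CMSSW.publish_zero_event steers publishAJobReport(): with the default 0,
        # output files reporting TotalEvents == 0 are skipped; with 1 they are
        # published anyway.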
        self.skipOcheck = cfg_params.get('CMSSW.publish_zero_event', 0)

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the given parent dataset from the source DBS into the local DBS
        used for publication. Returns 0 on success, 1 on error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            #dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
            # import the dataset that was passed in (the input dataset or a PU dataset)
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.debug(str(ex))
            return 1
        return 0

    def publishDataset(self, file):
        """
        Publish to the local DBS the dataset(s) described in the given framework
        job report, importing the parent datasets first if needed.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file " + file
            common.logger.info(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.log(10-1, "FileInfo = " + str(fileinfo))
        common.logger.log(10-1, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file " + file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.info("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")
            self.dataset_to_check = "/" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER"

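            # Registration in DBS happens in three steps via DBSWriterObjects:
            # the primary dataset, then the algorithm (the user cfg), then the
            # processed dataset that ties the two together.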
            common.logger.log(10-1, "--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.log(10-1, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1, "Processed: %s " % processed)

            common.logger.log(10-1, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.log(10-1, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if self.skipOcheck == 0:
                    if int(file['TotalEvents']) != 0:
                        #file.lumisections = {}
                        # lumi info are now in run hash
                        file.runs = {}
                        for ds in file.dataset:
                            ### Fede for production
                            if (ds['PrimaryDataset'] == 'null'):
                                #ds['PrimaryDataset']=procdataset
                                ds['PrimaryDataset'] = self.userprocessedData
                        filestopublish.append(file)
                    else:
                        self.noEventsFiles.append(file['LFN'])
                else:
                    file.runs = {}
                    for ds in file.dataset:
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            #ds['PrimaryDataset']=procdataset
                            ds['PrimaryDataset'] = self.userprocessedData
                    filestopublish.append(file)

        jobReport.files = filestopublish
        ### if all files of FJR have number of events = 0
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.info("Inserting file in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.info("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the framework job report xml files in the res dir and publish
        the output files they describe.
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        ## Select only those fjr that are successful
        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports) > 0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.log(10-1, "file_list = " + str(file_list))
        common.logger.log(10-1, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.info("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")

            common.logger.info("--->>> Start files publication")
            for file in file_list:
                common.logger.debug("file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

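            # Ask DBS to close each block; maxFiles=1 is meant to let a block be
            # closed as soon as it contains at least one file.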
            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.log(10-1, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.noEventsFiles)) + " files were not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.info("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.problemFiles)) + " files were not published because of problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            common.logger.info("--->>> End files publication")

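            # Cross-check the publication: InspectDBS is run on the dataset that
            # was just inserted and checks what is actually registered in the local DBS.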
            self.cfg_params['USER.dataset_to_check'] = self.dataset_to_check
            from InspectDBS import InspectDBS
            check = InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status