ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.21
Committed: Tue Jan 20 11:20:46 2009 UTC (16 years, 3 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0_pre1, CRAB_2_4_4, CRAB_2_4_4_pre6, CRAB_2_4_4_pre5, CRAB_2_4_4_pre4, CRAB_2_4_4_pre3
Changes since 1.20: +7 -7 lines
Log Message:
shorter dataset name in case of production, fixed problem with datasetpath NONE

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
10 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
13     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
14 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
15     import sys
16 slacapra 1.1
class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS

        Raises CrabException when any mandatory crab.cfg parameter is
        missing or when the publication DBS is the global (read-only) one.
        """
        # User-chosen name for the published data; mandatory for publication.
        try:
            self.userprocessedData = cfg_params['USER.publish_data_name']
            self.processedData = None
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # Publication only makes sense if the output was copied to an SE.
        try:
            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
        except KeyError:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')
        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
        # Source DBS used to import parent datasets; defaults to the global one.
        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        # Destination DBS: must be a local instance, never the global DBS.
        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
                raise CrabException(msg)
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        # Read the user pset once; its content is attached to every published
        # dataset. Fix: use open() and close the handle instead of the
        # deprecated file() builtin, which leaked the file descriptor.
        fin = open(self.pset)
        try:
            self.content = fin.read()
        finally:
            fin.close()
        self.resDir = common.work_space.resDir()

        # Datasets to import into the local DBS before publication:
        # the input dataset (if any) plus any pile-up datasets.
        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            # Comma-separated list; idiomatic str.strip() replaces the
            # archaic string.strip() module function (same behavior).
            for dataset in tmp.split(','):
                self.dataset_to_import.append(dataset.strip())
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Bookkeeping of output files excluded from publication.
        self.problemFiles = []    # files with copy-to-SE problems
        self.noEventsFiles = []   # files containing zero events
        self.noLFN = []           # files with an empty LFN (PFN recorded)
83 slacapra 1.1
84     def importParentDataset(self,globalDBS, datasetpath):
85     """
86     """
87     dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
88    
89     try:
90 afanfani 1.10 dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
91 slacapra 1.1 except DBSWriterError, ex:
92     msg = "Error importing dataset to be processed into local DBS\n"
93     msg += "Source Dataset: %s\n" % datasetpath
94     msg += "Source DBS: %s\n" % globalDBS
95     msg += "Destination DBS: %s\n" % self.DBSURL
96     common.logger.message(msg)
97     return 1
98     return 0
99    
100     def publishDataset(self,file):
101     """
102     """
103     try:
104     jobReport = readJobReport(file)[0]
105     self.exit_status = '0'
106     except IndexError:
107     self.exit_status = '1'
108     msg = "Error: Problem with "+file+" file"
109     common.logger.message(msg)
110     return self.exit_status
111 fanzago 1.12
112     if (len(self.dataset_to_import) != 0):
113     for dataset in self.dataset_to_import:
114     common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
115     status_import=self.importParentDataset(self.globalDBS, dataset)
116     if (status_import == 1):
117     common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
118     self.exit_status='1'
119     return self.exit_status
120     else:
121     common.logger.message('Import ok of dataset '+dataset)
122 fanzago 1.2
123 slacapra 1.1 #// DBS to contact
124 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
125 slacapra 1.1 try:
126     fileinfo= jobReport.files[0]
127     self.exit_status = '0'
128     except IndexError:
129     self.exit_status = '1'
130     msg = "Error: No file to publish in xml file"+file+" file"
131     common.logger.message(msg)
132     return self.exit_status
133    
134     datasets=fileinfo.dataset
135 fanzago 1.4 common.logger.debug(6,"FileInfo = " + str(fileinfo))
136     common.logger.debug(6,"DatasetInfo = " + str(datasets))
137 afanfani 1.19 if len(datasets)<=0:
138     self.exit_status = '1'
139     msg = "Error: No info about dataset in the xml file "+file
140     common.logger.message(msg)
141     return self.exit_status
142 slacapra 1.1 for dataset in datasets:
143 fanzago 1.6 #### for production data
144 afanfani 1.16 self.processedData = dataset['ProcessedDataset']
145 fanzago 1.6 if (dataset['PrimaryDataset'] == 'null'):
146 fanzago 1.21 #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
147     dataset['PrimaryDataset'] = self.userprocessedData
148     #else: # add parentage from input dataset
149     elif self.datasetpath.upper() != 'NONE':
150 afanfani 1.14 dataset['ParentDataset']= self.datasetpath
151    
152 fanzago 1.4 dataset['PSetContent']=self.content
153     cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
154 slacapra 1.1 common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
155     common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
156 fanzago 1.6 common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
157    
158     common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
159 slacapra 1.1
160     primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
161 fanzago 1.4 common.logger.debug(6,"Primary: %s "%primary)
162 slacapra 1.1
163     algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
164 fanzago 1.4 common.logger.debug(6,"Algo: %s "%algo)
165 slacapra 1.1
166     processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
167 fanzago 1.4 common.logger.debug(6,"Processed: %s "%processed)
168 slacapra 1.1
169 fanzago 1.6 common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
170 fanzago 1.2
171 fanzago 1.4 common.logger.debug(6,"exit_status = %s "%self.exit_status)
172 slacapra 1.1 return self.exit_status
173    
174     def publishAJobReport(self,file,procdataset):
175     """
176 fanzago 1.2 input: xml file, processedDataset
177 slacapra 1.1 """
178     try:
179     jobReport = readJobReport(file)[0]
180     self.exit_status = '0'
181     except IndexError:
182     self.exit_status = '1'
183     msg = "Error: Problem with "+file+" file"
184     raise CrabException(msg)
185 fanzago 1.4 ### overwrite ProcessedDataset with user defined value
186     ### overwrite lumisections with no value
187     ### skip publication for 0 events files
188     filestopublish=[]
189 slacapra 1.1 for file in jobReport.files:
190 fanzago 1.5 #### added check for problem with copy to SE and empty lfn
191     if (string.find(file['LFN'], 'copy_problems') != -1):
192     self.problemFiles.append(file['LFN'])
193     elif (file['LFN'] == ''):
194     self.noLFN.append(file['PFN'])
195 fanzago 1.4 else:
196 fanzago 1.5 if int(file['TotalEvents']) != 0 :
197 afanfani 1.17 #file.lumisections = {}
198     # lumi info are now in run hash
199     file.runs = {}
200 fanzago 1.5 for ds in file.dataset:
201 fanzago 1.6 ### Fede for production
202     if (ds['PrimaryDataset'] == 'null'):
203 fanzago 1.21 #ds['PrimaryDataset']=procdataset
204     ds['PrimaryDataset']=self.userprocessedData
205 fanzago 1.5 filestopublish.append(file)
206     else:
207     self.noEventsFiles.append(file['LFN'])
208 fanzago 1.4 jobReport.files = filestopublish
209     ### if all files of FJR have number of events = 0
210     if (len(filestopublish) == 0):
211     return None
212    
213 slacapra 1.1 #// DBS to contact
214 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
215 slacapra 1.1 # insert files
216     Blocks=None
217     try:
218     Blocks=dbswriter.insertFiles(jobReport)
219 mcinquil 1.18 common.logger.message("Inserting file in blocks = %s"%Blocks)
220 slacapra 1.1 except DBSWriterError, ex:
221 mcinquil 1.18 common.logger.error("Insert file error: %s"%ex)
222 slacapra 1.1 return Blocks
223    
224     def run(self):
225     """
226     parse of all xml file on res dir and creation of distionary
227     """
228 fanzago 1.2
229 slacapra 1.1 file_list = glob.glob(self.resDir+"crab_fjr*.xml")
230 afanfani 1.19 ## Select only those fjr that are succesfull
231     good_list=[]
232     for fjr in file_list:
233     reports = readJobReport(fjr)
234 afanfani 1.20 if len(reports)>0:
235     if reports[0].status == "Success":
236     good_list.append(fjr)
237 afanfani 1.19 file_list=good_list
238     ##
239 slacapra 1.1 common.logger.debug(6, "file_list = "+str(file_list))
240     common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
241 fanzago 1.2
242 slacapra 1.1 if (len(file_list)>0):
243     BlocksList=[]
244 fanzago 1.2 common.logger.message("--->>> Start dataset publication")
245     self.exit_status=self.publishDataset(file_list[0])
246     if (self.exit_status == '1'):
247     return self.exit_status
248     common.logger.message("--->>> End dataset publication")
249    
250    
251     common.logger.message("--->>> Start files publication")
252 slacapra 1.1 for file in file_list:
253 mcinquil 1.18 common.logger.debug(1, "file = "+file)
254 slacapra 1.1 Blocks=self.publishAJobReport(file,self.processedData)
255     if Blocks:
256 afanfani 1.14 for x in Blocks: # do not allow multiple entries of the same block
257     if x not in BlocksList:
258     BlocksList.append(x)
259 fanzago 1.2
260 slacapra 1.1 # close the blocks
261 fanzago 1.6 common.logger.debug(6, "BlocksList = %s"%BlocksList)
262 fanzago 1.2 # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
263     dbswriter = DBSWriter(self.DBSURL)
264    
265 slacapra 1.1 for BlockName in BlocksList:
266     try:
267     closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
268 fanzago 1.6 common.logger.debug(6, "closeBlock %s"%closeBlock)
269 slacapra 1.1 #dbswriter.dbs.closeBlock(BlockName)
270     except DBSWriterError, ex:
271 fanzago 1.2 common.logger.message("Close block error %s"%ex)
272 fanzago 1.4
273 fanzago 1.5 if (len(self.noEventsFiles)>0):
274     common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
275     for lfn in self.noEventsFiles:
276     common.logger.message("------ LFN: %s"%lfn)
277     if (len(self.noLFN)>0):
278     common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
279     for pfn in self.noLFN:
280     common.logger.message("------ pfn: %s"%pfn)
281     if (len(self.problemFiles)>0):
282     common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
283     for lfn in self.problemFiles:
284 fanzago 1.4 common.logger.message("------ LFN: %s"%lfn)
285 fanzago 1.6 common.logger.message("--->>> End files publication")
286 fanzago 1.7 common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
287 slacapra 1.1 return self.exit_status
288    
289     else:
290 fanzago 1.2 common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
291 slacapra 1.1 self.exit_status = '1'
292     return self.exit_status
293