root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.19
Committed: Mon Nov 17 13:02:07 2008 UTC by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3, CRAB_2_4_3_pre8, CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1
Changes since 1.18: +14 -0 lines
Log Message:
Take into account only successful fjr. See bug #44121

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_logger import Logger
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
import sys

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReport on the UI
        - returns the <file> section of each xml file in the crab_0_xxxx/res directory in dictionary format
        - publishes the output data on DBS and DLS
        """

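        # Configuration sanity checks follow: publication requires
        # USER.publish_data_name, USER.copy_data = 1, CMSSW.pset and a local
        # USER.dbs_url_for_publication; CMSSW.dbs_url and CMSSW.dataset_pu
        # are optional.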
        try:
            userprocessedData = cfg_params['USER.publish_data_name'] # presence check only
            self.processedData = None
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')

        try:
            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
        except KeyError:
            raise CrabException('You cannot publish data because you did not select *** copy_data = 1 *** in the crab.cfg file')
        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
            common.logger.message('<dbs_url_for_publication> = ' + self.DBSURL)
            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
                msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
                msg = msg + "Please write your local one in the [USER] section as 'dbs_url_for_publication'"
                raise CrabException(msg)
        except KeyError:
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command *** crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication *** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []
    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import datasetpath from the global DBS (globalDBS) into the local
        DBS (self.DBSURL), without parentage.
        Returns 0 on success, 1 on error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Import the parent datasets into the local DBS and publish the
        dataset information taken from the given framework job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.message("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.message('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.message('Import of dataset ' + dataset + ' succeeded')
        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file " + file
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about the dataset in the xml file " + file
            common.logger.message(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
            else: # add parentage from input dataset
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed: %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for files with 0 events
        filestopublish = []
        for file in jobReport.files:
            #### added check for problems with the copy to the SE and for empty LFNs
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if int(file['TotalEvents']) != 0:
                    #file.lumisections = {}
                    # lumi info is now in the run hash
                    file.runs = {}
                    for ds in file.dataset:
                        ### FEDE FOR NEW LFN ###
                        #ds['ProcessedDataset']=procdataset
                        ########################
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            ds['PrimaryDataset'] = procdataset
                    filestopublish.append(file)
                else:
                    self.noEventsFiles.append(file['LFN'])
        jobReport.files = filestopublish
        ### if all the files of the FJR have 0 events
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Inserting files in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.error("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the xml files in the res dir and create the dictionary.
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        ## Select only the successful fjr
        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if reports[0].status == "Success":
                good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.debug(6, "file_list = " + str(file_list))
        common.logger.debug(6, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.message("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.message("--->>> End dataset publication")

            common.logger.message("--->>> Start files publication")
            for file in file_list:
                common.logger.debug(1, "file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks: # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.debug(6, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL, level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.debug(6, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.message("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.noEventsFiles)) + " files were not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.noLFN)) + " files were not published because they have an empty LFN:")
                for pfn in self.noLFN:
                    common.logger.message("------ PFN: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.problemFiles)) + " files were not published because of problems with the copy to the SE:")
                for lfn in self.problemFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            common.logger.message("--->>> End files publication")
            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
            return self.exit_status

        else:
            common.logger.message("--->>> " + self.resDir + " is empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status
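
# Minimal usage sketch (a hypothetical driver; in CRAB this class is normally
# run through the command line as ``crab -publish``). It assumes cfg_params is
# a dict-like object holding the crab.cfg entries checked in __init__, and
# that the pset file exists on disk:
#
#   cfg_params = {
#       'USER.publish_data_name'       : 'MyProcessedData',
#       'USER.copy_data'               : '1',
#       'CMSSW.pset'                   : 'my_pset.cfg',
#       'USER.dbs_url_for_publication' : 'https://my_local_dbs/servlet/DBSServlet',
#       'CMSSW.datasetpath'            : '/MyPrimary/MyProcessed/TIER',
#   }
#   publisher = Publisher(cfg_params)
#   exit_status = publisher.run()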