ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.40.2.4
Committed: Tue Oct 13 14:01:15 2009 UTC (15 years, 6 months ago) by ewv
Content type: text/x-python
Branch: CRAB_2_6_X_br
CVS Tags: CRAB_2_6_6_pre6, CRAB_2_6_6_pre5, CRAB_2_6_6_pre4, CRAB_2_6_6_pre3, CRAB_2_6_6_pre2, CRAB_2_6_6_check, CRAB_2_6_6, CRAB_2_6_6_pre1, CRAB_2_6_5, CRAB_2_6_5_pre1, CRAB_2_6_4, CRAB_2_6_4_pre1, CRAB_2_6_3_patch_2
Changes since 1.40.2.3: +46 -41 lines
Log Message:
Disable option to use publish_with_import_all_parents so we can reevaluate

File Contents

# Content
1 import getopt, string
2 import common
3 import time, glob
4 from Actor import *
5 from crab_util import *
6 from crab_exceptions import *
7 from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 from ProdCommon.FwkJobRep.ReportState import checkSuccess
9 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11 from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12 from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14 import sys
15 from DBSAPI.dbsApiException import DbsException
16 from DBSAPI.dbsApi import DbsApi
17
18 class Publisher(Actor):
def __init__(self, cfg_params):
    """
    Publisher class:

    - parses CRAB FrameworkJobReport on UI
    - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
    - publishes output data on DBS and DLS

    Validates the publication settings found in cfg_params and caches them
    on the instance.

    cfg_params: mapping of 'SECTION.option' keys to crab.cfg values
                (assumed dict-like: it is read with get() and [] here).

    Raises CrabException when USER.publish_data_name, CMSSW.pset or
    USER.dbs_url_for_publication is missing, when copy_data and
    publish_data are not both 1, when the publication URL is one of the
    two global DBS URLs, or when publish_with_import_all_parents is 0.
    """

    self.cfg_params = cfg_params

    if 'USER.publish_data_name' not in cfg_params:
        raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
    self.userprocessedData = cfg_params['USER.publish_data_name']
    self.processedData = None

    # A missing option counts as 0, i.e. "not enabled".
    if int(cfg_params.get('USER.copy_data', 0)) != 1 or \
       int(cfg_params.get('USER.publish_data', 0)) != 1:
        msg = 'You can not publish data because you did not selected \n'
        msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
        raise CrabException(msg)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
    self.pset = cfg_params['CMSSW.pset']

    self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

    if 'USER.dbs_url_for_publication' not in cfg_params:
        msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
        msg = msg + " entry, necessary to publish the data.\n"
        msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
        raise CrabException(msg)

    self.DBSURL = cfg_params['USER.dbs_url_for_publication']
    common.logger.info('<dbs_url_for_publication> = ' + self.DBSURL)
    # User data must never be written to the global DBS instances.
    global_dbs_urls = (
        "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
        "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet",
    )
    if self.DBSURL in global_dbs_urls:
        msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
        msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
        raise CrabException(msg)

    # Read the pset once; close the handle even if read() fails.
    pset_file = open(self.pset)
    try:
        self.content = pset_file.read()
    finally:
        pset_file.close()
    self.resDir = common.work_space.resDir()

    # Datasets that have to be imported into the publication DBS first.
    self.dataset_to_import = []

    self.datasetpath = cfg_params['CMSSW.datasetpath']
    if self.datasetpath.upper() != 'NONE':
        self.dataset_to_import.append(self.datasetpath)

    ### Added PU dataset (comma separated list)
    pu_datasets = cfg_params.get('CMSSW.dataset_pu', None)
    if pu_datasets:
        for dataset in pu_datasets.split(','):
            self.dataset_to_import.append(dataset.strip())
    ###

    # Convert the flags to int, as done for copy_data/publish_data above:
    # a raw string such as '0' is truthy and never equals the integers
    # these attributes are compared with later on.
    self.import_all_parents = int(cfg_params.get('USER.publish_with_import_all_parents', 1))
    if not self.import_all_parents:
        raise CrabException("Support for publish_with_import_all_parents has been disabled." +
                            "If you think you need this, please contact CRAB Feedback HN")
    self.skipOcheck = int(cfg_params.get('CMSSW.publish_zero_event', 0))

    self.SEName = ''
    self.CMSSW_VERSION = ''
    self.exit_status = ''
    self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
    # Bookkeeping of files that were skipped during publication.
    self.problemFiles = []
    self.noEventsFiles = []
    self.noLFN = []
91
def importParentDataset(self,globalDBS, datasetpath):
    """
    Import datasetpath, with all its parent levels, from globalDBS into
    the publication DBS (self.DBSURL).

    globalDBS:   URL of the source DBS instance
    datasetpath: path of the dataset to import

    Returns 0 on success and 1 when a DBSWriterError is raised (the error
    is logged, not propagated).
    Raises CrabException when self.import_all_parents is not 1.
    """
    dbsWriter = DBSWriter(self.DBSURL,level='ERROR')

    try:
        # int() so that a flag kept as the string '1' is still accepted.
        if int(self.import_all_parents) != 1:
            # The single-level import, formerly
            #   dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL,
            #                                        datasetpath, noParentsReadOnly = True)
            # is switched off; the code that followed this raise could
            # never run and has been removed.
            raise CrabException("Support for publish_with_import_all_parents has been disabled." +
                                "If you think you need this, please contact CRAB Feedback HN")

        common.logger.info("--->>> Importing all parents level")
        start = time.time()
        common.logger.debug("start import time: " + str(start))
        ### calling dbs api directly instead of dbsWriter.importDataset
        dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
        stop = time.time()
        common.logger.debug("stop import time: " + str(stop))
        common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
    except DBSWriterError:
        # NOTE(review): the call above goes straight to the DBS api; the
        # file imports DbsException but it is not handled here - confirm
        # which exception type migrateDatasetContents raises.
        # sys.exc_info() is used so the handler parses on any interpreter.
        ex = sys.exc_info()[1]
        msg = "Error importing dataset to be processed into local DBS\n"
        msg += "Source Dataset: %s\n" % datasetpath
        msg += "Source DBS: %s\n" % globalDBS
        msg += "Destination DBS: %s\n" % self.DBSURL
        common.logger.info(msg)
        common.logger.info(str(ex))
        return 1
    return 0
132
def publishDataset(self,file):
    """
    Create in the publication DBS the primary dataset, algorithm and
    processed dataset described by the first <file> entry of one
    framework job report.

    file: path of the job report xml file

    Every dataset listed in self.dataset_to_import is imported first.
    Returns self.exit_status: '0' on success, '1' when the report cannot
    be read, a parent import fails, or the report holds no file or no
    dataset information.
    """
    verbose = 10-1  # log level just below 10

    try:
        report = readJobReport(file)[0]
        self.exit_status = '0'
    except IndexError:
        self.exit_status = '1'
        common.logger.info("Error: Problem with "+file+" file")
        return self.exit_status

    # Bring the input (parent) datasets into the publication DBS.
    for parent in self.dataset_to_import:
        common.logger.info("--->>> Importing parent dataset in the dbs: " +parent)
        if self.importParentDataset(self.globalDBS, parent) == 1:
            common.logger.info('Problem with parent '+ parent +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
            self.exit_status = '1'
            return self.exit_status
        common.logger.info('Import ok of dataset '+parent)

    #// DBS to contact
    writer = DBSWriter(self.DBSURL)
    try:
        first_file = report.files[0]
        self.exit_status = '0'
    except IndexError:
        self.exit_status = '1'
        common.logger.info("Error: No EDM file to publish in xml file"+file+" file")
        return self.exit_status

    dataset_infos = first_file.dataset
    common.logger.log(verbose,"FileInfo = " + str(first_file))
    common.logger.log(verbose,"DatasetInfo = " + str(dataset_infos))
    if len(dataset_infos) == 0:
        self.exit_status = '1'
        common.logger.info("Error: No info about dataset in the xml file "+file)
        return self.exit_status

    for info in dataset_infos:
        self.processedData = info['ProcessedDataset']
        if info['PrimaryDataset'] == 'null':
            # no primary dataset in the report (production data per the
            # original note): use the user-defined publication name
            info['PrimaryDataset'] = self.userprocessedData
        elif self.datasetpath.upper() != 'NONE':
            # add parentage from input dataset
            info['ParentDataset'] = self.datasetpath

        info['PSetContent'] = self.content
        # add real name of user cfg
        cfg_meta = {'name' : self.pset,
                    'Type' : 'user',
                    'annotation': 'user cfg',
                    'version' : 'private version'}
        common.logger.info("PrimaryDataset = %s"%info['PrimaryDataset'])
        common.logger.info("ProcessedDataset = %s"%info['ProcessedDataset'])
        user_dataset = "/"+info['PrimaryDataset']+"/"+info['ProcessedDataset']+"/USER"
        common.logger.info("<User Dataset Name> = "+user_dataset)
        # path assigned to cfg_params['USER.dataset_to_check'] by run()
        self.dataset_to_check = user_dataset

        common.logger.log(verbose,"--->>> Inserting primary: %s processed : %s"%(info['PrimaryDataset'],info['ProcessedDataset']))

        primary = DBSWriterObjects.createPrimaryDataset(info, writer.dbs)
        common.logger.log(verbose,"Primary: %s "%primary)

        algo = DBSWriterObjects.createAlgorithm(info, cfg_meta, writer.dbs)
        common.logger.log(verbose,"Algo: %s "%algo)

        processed = DBSWriterObjects.createProcessedDataset(primary, algo, info, writer.dbs)
        common.logger.log(verbose,"Processed: %s "%processed)

        common.logger.log(verbose,"Inserted primary %s processed %s"%(primary,processed))

    common.logger.log(verbose,"exit_status = %s "%self.exit_status)
    return self.exit_status
207
def publishAJobReport(self,file,procdataset):
    """
    Insert in the publication DBS the output files listed in one
    framework job report.

    input: xml file, processedDataset
    file:        path of the job report xml file
    procdataset: processed dataset name (accepted for the callers, not
                 used by this method)

    Files are skipped, and remembered on the instance, when their LFN
    contains 'copy_problems' (self.problemFiles), when their LFN is empty
    (self.noLFN) or, unless self.skipOcheck is set, when they hold zero
    events (self.noEventsFiles).

    Returns the value of dbswriter.insertFiles(), or None when no file is
    left to publish or the insertion raised DBSWriterError.
    Raises CrabException when the report cannot be read.
    """
    common.logger.debug("FJR = %s"%file)
    try:
        jobReport = readJobReport(file)[0]
        self.exit_status = '0'
    except IndexError:
        self.exit_status = '1'
        msg = "Error: Problem with "+file+" file"
        raise CrabException(msg)

    # The flag may still be the raw configuration string; '0' == 0 is
    # False, which would silently publish the zero-event files.
    check_events = int(self.skipOcheck) == 0

    filestopublish = []
    for fjr_file in jobReport.files:
        lfn = fjr_file['LFN']
        #### check for problem with copy to SE and empty lfn
        if lfn.find('copy_problems') != -1:
            self.problemFiles.append(lfn)
            continue
        if lfn == '':
            self.noLFN.append(fjr_file['PFN'])
            continue
        ### skip publication for 0 events files
        if check_events and int(fjr_file['TotalEvents']) == 0:
            self.noEventsFiles.append(lfn)
            continue

        # reset the run hash (lumi info are now in run hash)
        fjr_file.runs = {}
        for ds in fjr_file.dataset:
            ### a 'null' primary dataset is replaced by the user defined name
            if ds['PrimaryDataset'] == 'null':
                ds['PrimaryDataset'] = self.userprocessedData
        filestopublish.append(fjr_file)

    jobReport.files = filestopublish
    for fjr_file in filestopublish:
        common.logger.debug("--->>> LFN of file to publish = " + str(fjr_file['LFN']))
    ### nothing to insert if every file of the FJR was skipped
    if not filestopublish:
        return None

    #// DBS to contact
    dbswriter = DBSWriter(self.DBSURL)
    # insert files
    Blocks = None
    try:
        Blocks = dbswriter.insertFiles(jobReport)
        common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
    except DBSWriterError:
        # sys.exc_info() is used so the handler parses on any interpreter.
        ex = sys.exc_info()[1]
        common.logger.debug("--->>> Insert file error: %s"%ex)
    return Blocks
270
def run(self):
    """
    Parse all the job report xml files of the res directory and publish
    the successful ones.

    The dataset is created from the first successful report, then the
    files of every successful report are inserted, the resulting blocks
    are handed to manageFileBlock and the publication is inspected with
    InspectDBS.

    Returns self.exit_status; it is '1' when no report is found, no
    report has status "Success", or the dataset publication fails.
    """
    verbose = 10-1  # log level just below 10

    found_reports = glob.glob(self.resDir+"crab_fjr*.xml")
    if len(found_reports) == 0:
        common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
        self.exit_status = '1'
        return self.exit_status

    ## Select only those fjr that are successful
    file_list = []
    for fjr in found_reports:
        reports = readJobReport(fjr)
        if len(reports) > 0 and reports[0].status == "Success":
            file_list.append(fjr)
    ##
    common.logger.log(verbose, "file_list = "+str(file_list))
    common.logger.log(verbose, "len(file_list) = "+str(len(file_list)))

    if len(file_list) == 0:
        common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
        self.exit_status = '1'
        return self.exit_status

    common.logger.info("--->>> Start dataset publication")
    self.exit_status = self.publishDataset(file_list[0])
    if self.exit_status == '1':
        return self.exit_status
    common.logger.info("--->>> End dataset publication")

    common.logger.info("--->>> Start files publication")
    blocks_to_close = []
    for fjr in file_list:
        inserted = self.publishAJobReport(fjr, self.processedData)
        if not inserted:
            continue
        for block in inserted:
            # do not allow multiple entries of the same block
            if block not in blocks_to_close:
                blocks_to_close.append(block)

    # close the blocks
    common.logger.log(verbose, "BlocksList = %s"%blocks_to_close)
    dbswriter = DBSWriter(self.DBSURL)

    for block in blocks_to_close:
        try:
            closed = dbswriter.manageFileBlock(block, maxFiles=1)
            common.logger.log(verbose, "closeBlock %s"%closed)
        except DBSWriterError:
            failure = sys.exc_info()[1]
            common.logger.info("Close block error %s"%failure)

    # One warning section per kind of skipped file:
    # (collected names, headline, format of each listed name)
    skipped_sections = (
        (self.noEventsFiles,
         "--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:",
         "------ LFN: %s"),
        (self.noLFN,
         "--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN",
         "------ pfn: %s"),
        (self.problemFiles,
         "--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE",
         "------ LFN: %s"),
    )
    for names, headline, line_format in skipped_sections:
        if len(names) > 0:
            common.logger.info(headline)
            for name in names:
                common.logger.info(line_format%name)
    common.logger.info("--->>> End files publication")

    # Inspect what has just been published.
    self.cfg_params['USER.dataset_to_check'] = self.dataset_to_check
    from InspectDBS import InspectDBS
    inspector = InspectDBS(self.cfg_params)
    inspector.checkPublication()
    return self.exit_status
349