root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.40.2.3
Committed: Tue Sep 29 16:08:41 2009 UTC by fanzago
Content type: text/x-python
Branch: CRAB_2_6_X_br
CVS Tags: CRAB_2_6_3_patch_2_pre2, CRAB_2_6_3_patch_2_pre1, CRAB_2_6_3_patch_1, CRAB_2_6_3, CRAB_2_6_3_pre5
Changes since 1.40.2.2: +7 -4 lines
Log Message:
added some debug info

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys
from DBSAPI.dbsApiException import DbsException
from DBSAPI.dbsApi import DbsApi

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReports on the UI
        - returns the <file> section of the xml in dictionary format for each xml file in the crab_0_xxxx/res directory
        - publishes the output data on DBS and DLS
        """

        self.cfg_params = cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1):
            msg = 'You cannot publish data because you did not select\n'
            msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        if not cfg_params.has_key('CMSSW.pset'):
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        self.pset = cfg_params['CMSSW.pset']

        self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg += " entry, which is necessary to publish the data.\n"
            msg += "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication***\n"
            msg += "where dbs_url_for_publication is your local DBS instance."
            raise CrabException(msg)

        self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = ' + self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or \
           (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
            msg += "Please write your local one in the [USER] section entry 'dbs_url_for_publication'"
            raise CrabException(msg)

        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents', 1)
        self.skipOcheck = cfg_params.get('CMSSW.publish_zero_event', 0)

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

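    # ------------------------------------------------------------------
    # Illustrative note (not part of the original file): a minimal sketch
    # of the crab.cfg entries that the checks above require before
    # publication can run; the values shown are placeholders.
    #
    #   [USER]
    #   copy_data               = 1
    #   publish_data            = 1
    #   publish_data_name       = MyProcessedDataName
    #   dbs_url_for_publication = https://your.local.dbs/servlet/DBSServlet
    #
    #   [CMSSW]
    #   pset        = your_pset.py
    #   datasetpath = /Primary/Processed/Tier   (or 'None' for MC production)
    # ------------------------------------------------------------------
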
    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the parent dataset (datasetpath) from the global DBS into
        the local DBS used for publication.
        Returns 0 on success, 1 on error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            if (self.import_all_parents == 1):
                common.logger.info("--->>> Importing all parents level")
                start = time.time()
                common.logger.debug("start import time: " + str(start))
                ### to skip the ProdCommon api exception in the case of block without location
                ### skipNoSiteError=True
                #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
                ### calling dbs api directly
                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
                stop = time.time()
                common.logger.debug("stop import time: " + str(stop))
                common.logger.info("--->>> duration of all parents import (sec): " + str(stop - start))
            else:
                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
                start = time.time()
                #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
                ### calling dbs api directly
                common.logger.debug("start import time: " + str(start))
                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly=True)
                stop = time.time()
                common.logger.debug("stop import time: " + str(stop))
                common.logger.info("--->>> duration of first level parent import (sec): " + str(stop - start))
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.info(str(ex))
            return 1
        return 0

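    # ------------------------------------------------------------------
    # Illustrative note (not part of the original file): the two branches
    # above boil down to the following DBS migration calls, sketched with
    # placeholder names for the URLs and dataset path.
    #
    #   writer = DBSWriter(local_dbs_url, level='ERROR')
    #   # USER.publish_with_import_all_parents = 1: full parentage chain
    #   writer.dbs.migrateDatasetContents(global_dbs_url, local_dbs_url, datasetpath)
    #   # USER.publish_with_import_all_parents = 0: dataset only, parents read-only
    #   writer.dbs.migrateDatasetContents(global_dbs_url, local_dbs_url, datasetpath,
    #                                     noParentsReadOnly=True)
    # ------------------------------------------------------------------
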
    def publishDataset(self, file):
        """
        Create in the local DBS the primary dataset, algorithm and
        processed dataset described in one framework job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No EDM file to publish in the xml file " + file
            common.logger.info(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.log(10-1, "FileInfo = " + str(fileinfo))
        common.logger.log(10-1, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file " + file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.info("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")
            self.dataset_to_check = "/" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER"

            common.logger.log(10-1, "--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.log(10-1, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1, "Processed: %s " % processed)

            common.logger.log(10-1, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.log(10-1, "exit_status = %s " % self.exit_status)
        return self.exit_status

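    # ------------------------------------------------------------------
    # Illustrative note (not part of the original file): the dataset
    # entries read from the job report are plain dictionaries; the keys
    # used above look roughly like the sketch below (placeholder values).
    #
    #   dataset = {
    #       'PrimaryDataset':   'MyPrimary',       # 'null' for MC production
    #       'ProcessedDataset': 'MyProcessed',
    #       'ParentDataset':    '/Primary/Processed/Tier',  # set only if datasetpath != None
    #       'PSetContent':      '<full text of the user pset>',
    #   }
    #
    # and the published name announced to the user is
    #   /<PrimaryDataset>/<ProcessedDataset>/USER
    # ------------------------------------------------------------------
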
    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
        common.logger.debug("FJR = %s" % file)
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if self.skipOcheck == 0:
                    if int(file['TotalEvents']) != 0:
                        #file.lumisections = {}
                        # lumi info are now in run hash
                        file.runs = {}
                        for ds in file.dataset:
                            ### Fede for production
                            if (ds['PrimaryDataset'] == 'null'):
                                #ds['PrimaryDataset']=procdataset
                                ds['PrimaryDataset'] = self.userprocessedData
                        filestopublish.append(file)
                    else:
                        self.noEventsFiles.append(file['LFN'])
                else:
                    file.runs = {}
                    for ds in file.dataset:
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            #ds['PrimaryDataset']=procdataset
                            ds['PrimaryDataset'] = self.userprocessedData
                    filestopublish.append(file)

        jobReport.files = filestopublish
        for file in filestopublish:
            common.logger.debug("--->>> LFN of file to publish = " + str(file['LFN']))
        ### if all files of the FJR have number of events = 0
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.debug("--->>> Inserting file in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.debug("--->>> Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the framework job report xml files in the res dir and
        publish their contents in the local DBS.
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")

        ## Select only those fjr that are successful
        if (len(file_list) == 0):
            common.logger.info("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports) > 0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.log(10-1, "file_list = " + str(file_list))
        common.logger.log(10-1, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.info("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")

            common.logger.info("--->>> Start files publication")
            for file in file_list:
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.log(10-1, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.noEventsFiles)) + " files not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.info("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.problemFiles)) + " files not published because they had problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            common.logger.info("--->>> End files publication")

            self.cfg_params['USER.dataset_to_check'] = self.dataset_to_check
            from InspectDBS import InspectDBS
            check = InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> No valid files to publish on DBS. Your jobs did not report exit code = 0")
            self.exit_status = '1'
            return self.exit_status
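
# ----------------------------------------------------------------------
# Illustrative usage sketch (not part of the original file): CRAB drives
# this class through "crab -publish"; used standalone it would look
# roughly like the lines below, assuming cfg_params carries the crab.cfg
# entries checked in __init__ and that crab_fjr*.xml files are already in
# the res directory.
#
#   publisher = Publisher(cfg_params)
#   exit_status = publisher.run()   # '0' on success, '1' on failure
# ----------------------------------------------------------------------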