root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.48
Committed: Tue May 4 16:35:43 2010 UTC by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3_beta
Changes since 1.47: +47 -53 lines
Log Message:
merge 2.7.1_branch

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
import sys
from DBSAPI.dbsApi import DbsApi
from DBSAPI.dbsMigrateApi import DbsMigrateApi
from DBSAPI.dbsApiException import *

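# ProdCommon supplies the high-level DBSWriter/DBSWriterObjects abstractions
# used for the publication itself; the low-level DBSAPI client (DbsApi) is
# used only for the block-by-block import of parent datasets in
# importParentDataset below.
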
class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReports on the UI
        - returns the <file> section of the xml in dictionary format for each xml file in the crab_0_xxxx/res directory
        - publishes the output data on DBS and DLS
        """

        self.cfg_params=cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
            msg = 'You cannot publish data because you did not select \n'
            msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        if not cfg_params.has_key('CMSSW.pset'):
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        self.pset = cfg_params['CMSSW.pset']

        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning: the [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, which is necessary to publish the data.\n"
            msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        self.content=file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import=[]

        self.datasetpath=cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu',None)
        if tmp :
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset=string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###
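        # The PU block above treats CMSSW.dataset_pu as a comma-separated list
        # of dataset paths; each entry is queued for import into the local DBS
        # together with the main input dataset.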

        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)

        if ( int(self.import_all_parents) == 0 ):
            common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',1)
        if ( int(self.skipOcheck) == 0 ):
            common.logger.info("WARNING: The option CMSSW.publish_zero_event has been deprecated. The publication is done by default also for files with 0 events")
        self.SEName=''
        self.CMSSW_VERSION=''
        self.exit_status=''
        self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
        self.problemFiles=[]
        self.noEventsFiles=[]
        self.noLFN=[]

    def importParentDataset(self,globalDBS, datasetpath):
        """
        WARNING: it works only with DBS_2_0_9_patch_6
        """

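        # A reader DbsApi is pointed at the source (global) DBS and a writer
        # DbsApi at the local publication DBS; each block of the parent
        # dataset is then migrated with dbsMigrateBlock, so that parentage
        # information is available locally when the user files are inserted.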
        args={'url':globalDBS}
        try:
            api_reader = DbsApi(args)
        except DbsApiException, ex:
            msg = "%s\n" % formatEx(ex)
            raise CrabException(msg)

        args={'url':self.DBSURL}
        try:
            api_writer = DbsApi(args)
        except DbsApiException, ex:
            msg = "%s\n" % formatEx(ex)
            raise CrabException(msg)

        try:
            common.logger.info("--->>> Importing all parent levels")
            start = time.time()
            common.logger.debug("start import parents time: " + str(start))
            for block in api_reader.listBlocks(datasetpath):
                common.logger.debug("blockName = %s" % block['Name'])
                api_writer.dbsMigrateBlock(globalDBS, self.DBSURL, block['Name'])
            stop = time.time()
            common.logger.debug("stop import parents time: " + str(stop))
            common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
        except DbsApiException, ex:
            msg = "Error importing the dataset to be processed into the local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.info(str(ex))
            return 1
        return 0

    def publishDataset(self,file):
        """
        Import the parent datasets into the local DBS, then publish the dataset
        definition (primary dataset, algorithm, processed dataset) extracted
        from one framework job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the "+file+" file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset into the dbs: " +dataset)
                status_import=self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
                    self.exit_status='1'
                    return self.exit_status
                else:
                    common.logger.info('Import of dataset '+dataset+' OK')

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo= jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No EDM file to publish in the xml file "+file
            common.logger.info(msg)
            return self.exit_status

        datasets=fileinfo.dataset
        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
        if len(datasets)<=0:
            self.exit_status = '1'
            msg = "Error: No info about the dataset in the xml file "+file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset']= self.datasetpath

            dataset['PSetContent']=self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"

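            # The DBS entities must be created in this order: the primary
            # dataset first, then the algorithm (tied to the user cfg), and
            # finally the processed dataset, which references both.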
            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
            common.logger.log(10-1,"Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1,"Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1,"Processed: %s "%processed)

            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))

        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self,file,procdataset):
        """
        input: xml file, processedDataset
        """
        common.logger.debug("FJR = %s"%file)
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the "+file+" file"
            raise CrabException(msg)
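        # Classify the files of the report before insertion: files whose copy
        # to the SE failed and files with an empty LFN are excluded from the
        # publication and only recorded for the final summary.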
        ### 0-event files are published as well; the old skip logic is kept below, commented out
        filestopublish=[]
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                #if self.skipOcheck==0:
                #    if int(file['TotalEvents']) != 0:
                #        for ds in file.dataset:
                #            ### Fede for production
                #            if (ds['PrimaryDataset'] == 'null'):
                #                ds['PrimaryDataset']=self.userprocessedData
                #        filestopublish.append(file)
                #    else:
                #        self.noEventsFiles.append(file['LFN'])
                #else:
                if int(file['TotalEvents']) == 0:
                    self.noEventsFiles.append(file['LFN'])
                for ds in file.dataset:
                    ### Fede for production
                    if (ds['PrimaryDataset'] == 'null'):
                        ds['PrimaryDataset']=self.userprocessedData
                filestopublish.append(file)

        jobReport.files = filestopublish
        for file in filestopublish:
            common.logger.debug("--->>> LFN of file to publish = " + str(file['LFN']))
        ### if all files of the FJR have been excluded from publication
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks=None
        try:
            ### FEDE: insertDetectorData=True propagates the run and lumi information into DBS
            Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
            #Blocks=dbswriter.insertFiles(jobReport)
            common.logger.debug("--->>> Inserting files in blocks = %s"%Blocks)
        except DBSWriterError, ex:
            common.logger.debug("--->>> Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Parse all the fjr xml files in the res dir and publish the dataset and its files in DBS.
        """

        file_list = glob.glob(self.resDir+"crab_fjr*.xml")

        ## Select only those fjr that are successful
        if (len(file_list)==0):
            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        good_list=[]
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports)>0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list=good_list
        ##
        common.logger.log(10-1, "file_list = "+str(file_list))
        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))

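        # Publication proceeds in three steps: publish the dataset definition
        # from the first good report, insert the files of every report
        # (collecting the affected blocks), and finally close those blocks.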
        if (len(file_list)>0):
            BlocksList=[]
            common.logger.info("--->>> Start dataset publication")
            self.exit_status=self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")


            common.logger.info("--->>> Start files publication")
            for file in file_list:
                Blocks=self.publishAJobReport(file,self.processedData)
                if Blocks:
                    for x in Blocks: # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles=1)
                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s"%ex)

            if (len(self.noEventsFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" published files contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            if (len(self.noLFN)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.noLFN))+" files were not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ PFN: %s"%pfn)
            if (len(self.problemFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files were not published because of problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            common.logger.info("--->>> End files publication")

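            # Final cross-check of the publication: InspectDBS looks up the
            # just-published dataset in the local publication DBS.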
            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
            from InspectDBS import InspectDBS
            check=InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> No valid files to publish on DBS. None of your jobs reported exit code = 0")
            self.exit_status = '1'
            return self.exit_status
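
# Minimal usage sketch (hypothetical values; assumes cfg_params is the
# dictionary of crab.cfg options that CRAB hands to every Actor, with at
# least the keys checked in Publisher.__init__):
#
#   cfg_params = {
#       'USER.publish_data_name': 'MyAnalysis_v1',
#       'USER.copy_data': '1',
#       'USER.publish_data': '1',
#       'USER.dbs_url_for_publication': 'https://your-local-dbs/servlet/DBSServlet',
#       'CMSSW.pset': 'pset.py',
#       'CMSSW.datasetpath': 'none',
#   }
#   publisher = Publisher(cfg_params)
#   exit_status = publisher.run()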