root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.40
Committed: Wed Jul 22 10:13:14 2009 UTC (15 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_3_pre2, CRAB_2_7_0_pre2, CRAB_2_6_3_pre1, test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1_pre4, CRAB_2_6_1
Branch point for: CRAB_2_6_X_br
Changes since 1.39: +6 -1 lines
Log Message:
Improved message. Added a check.

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys
from DBSAPI.dbsApiException import DbsException
from DBSAPI.dbsApi import DbsApi

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS
        """

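        # Illustrative shape of cfg_params (a sketch only: these are the keys
        # read below; the values are placeholders, not CRAB defaults):
        #
        #     cfg_params = {
        #         'USER.publish_data_name'       : 'MyProcessedData',
        #         'USER.copy_data'               : '1',
        #         'USER.publish_data'            : '1',
        #         'USER.dbs_url_for_publication' : '<local DBS write URL>',
        #         'CMSSW.pset'                   : 'pset.py',
        #         'CMSSW.datasetpath'            : '/Primary/Processed/Tier',
        #     }
        #
        # Optional keys also read below: 'CMSSW.dbs_url', 'CMSSW.dataset_pu',
        # 'USER.publish_with_import_all_parents', 'CMSSW.publish_zero_event'.
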
        self.cfg_params = cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1):
            msg = 'You cannot publish data because you did not select\n'
            msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        if not cfg_params.has_key('CMSSW.pset'):
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        self.pset = cfg_params['CMSSW.pset']

        self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg += " entry, necessary to publish the data.\n"
            msg += "Use the command *** crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication ***\n"
            msg += "where dbs_url_for_publication is the URL of your local DBS instance."
            raise CrabException(msg)

        self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = ' + self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or \
           (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
            msg += "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        # cast to int: cfg_params values may arrive as strings, and the
        # comparisons below (== 1, == 0) must not silently fail on '1'/'0'
        self.import_all_parents = int(cfg_params.get('USER.publish_with_import_all_parents', 1))
        self.skipOcheck = int(cfg_params.get('CMSSW.publish_zero_event', 0))

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the given parent dataset from the source DBS into the local
        publication DBS, with or without its own parentage depending on
        USER.publish_with_import_all_parents.
        Returns 0 on success, 1 on error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            if (self.import_all_parents == 1):
                common.logger.info("--->>> Importing all parent levels")
                dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
            else:
                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
                dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing the dataset to be processed into the local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.debug(str(ex))
            return 1
        return 0
        # Old patch for importParentDataset, kept for reference:
        #
        #     print " patch for importParentDataset: datasetpath = ", datasetpath
        #     try:
        #         args = {}
        #         args['url'] = self.DBSURL
        #         args['mode'] = 'POST'
        #         block = ""
        #         api = DbsApi(args)
        #         #api.migrateDatasetContents(srcURL, dstURL, path, block, False)
        #         api.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, block, False)
        #     except DbsException, ex:
        #         print "Caught API Exception %s: %s " % (ex.getClassName(), ex.getErrorMessage())
        #         if ex.getErrorCode() not in (None, ""):
        #             print "DBS Exception Error Code: ", ex.getErrorCode()
        #         return 1
        #     print "Done"
        #     return 0
    def publishDataset(self, file):
        """
        Import the parent datasets into the local DBS, then create the
        primary, algorithm and processed dataset entries from the first
        FrameworkJobReport.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the " + file + " file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file " + file
            common.logger.info(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.log(10-1, "FileInfo = " + str(fileinfo))
        common.logger.log(10-1, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about the dataset in the xml file " + file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.info("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")
            self.dataset_to_check = "/" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER"

            common.logger.log(10-1, "--->>> Inserting primary: %s processed: %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.log(10-1, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1, "Processed: %s " % processed)

            common.logger.log(10-1, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.log(10-1, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with the user defined value
        ### overwrite lumi sections with no value
        ### skip publication for files with 0 events
        filestopublish = []
        for file in jobReport.files:
            #### added check for problems with the copy to the SE and for an empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if self.skipOcheck == 0:
                    if int(file['TotalEvents']) != 0:
                        #file.lumisections = {}
                        # lumi info is now in the run hash
                        file.runs = {}
                        for ds in file.dataset:
                            ### Fede for production
                            if (ds['PrimaryDataset'] == 'null'):
                                #ds['PrimaryDataset'] = procdataset
                                ds['PrimaryDataset'] = self.userprocessedData
                        filestopublish.append(file)
                    else:
                        self.noEventsFiles.append(file['LFN'])
                else:
                    file.runs = {}
                    for ds in file.dataset:
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            #ds['PrimaryDataset'] = procdataset
                            ds['PrimaryDataset'] = self.userprocessedData
                    filestopublish.append(file)

        jobReport.files = filestopublish
        ### if all the files of the FJR have 0 events
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.info("Inserting files in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.info("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the xml files in the res dir and publish their file information.
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        ## Select only the fjr that are successful
        if (len(file_list) == 0):
            common.logger.info("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports) > 0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.log(10-1, "file_list = " + str(file_list))
        common.logger.log(10-1, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.info("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")

            common.logger.info("--->>> Start files publication")
            for file in file_list:
                common.logger.debug("file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL, level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.log(10-1, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.info("--->>> WARNING: the following " + str(len(self.noEventsFiles)) + " files were not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.info("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.info("--->>> WARNING: " + str(len(self.problemFiles)) + " files were not published because of problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s" % lfn)
            common.logger.info("--->>> End files publication")

            self.cfg_params['USER.dataset_to_check'] = self.dataset_to_check
            from InspectDBS import InspectDBS
            check = InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> No valid files to publish on DBS. None of your jobs reported exit code = 0")
            self.exit_status = '1'
            return self.exit_status

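# Minimal usage sketch (an assumption about the calling side, not part of
# this revision): the CRAB client drives publication by instantiating the
# Actor with the parsed crab.cfg parameters and calling run(), e.g.
#
#     publisher = Publisher(cfg_params)  # cfg_params as sketched in __init__
#     exit_status = publisher.run()      # '0' on success, '1' on failure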