root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.43
Committed: Mon Sep 28 14:34:39 2009 UTC (15 years, 7 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.42: +18 -21 lines
Log Message:
Import of the parent dataset using the DBS API directly
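In short, this revision replaces the ProdCommon DBSWriter.importDataset wrappers with direct calls to the underlying DBS API. A minimal sketch of the new call pattern used in importParentDataset below (it assumes a ProdCommon installation; DBSURL, globalDBS and datasetpath are placeholders for the local publication DBS, the source DBS and the dataset to migrate):

    from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter

    dbsWriter = DBSWriter(DBSURL, level='ERROR')
    # previously: dbsWriter.importDataset(globalDBS, datasetpath, DBSURL, skipNoSiteError=True)
    # import all parent levels
    dbsWriter.dbs.migrateDatasetContents(globalDBS, DBSURL, datasetpath)
    # previously: dbsWriter.importDatasetWithoutParentage(...); import only the datasetpath
    dbsWriter.dbs.migrateDatasetContents(globalDBS, DBSURL, datasetpath, noParentsReadOnly=True)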

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_exceptions import *
7 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 fanzago 1.35 from DBSAPI.dbsApiException import DbsException
16     from DBSAPI.dbsApi import DbsApi
17 slacapra 1.1
18     class Publisher(Actor):
19     def __init__(self, cfg_params):
20     """
21     Publisher class:
22    
23     - parses CRAB FrameworkJobReport on UI
24     - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
25     - publishes output data on DBS and DLS
26     """
27    
28 fanzago 1.28 self.cfg_params=cfg_params
29    
30 slacapra 1.32 if not cfg_params.has_key('USER.publish_data_name'):
31 slacapra 1.1 raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
32 slacapra 1.30 self.userprocessedData = cfg_params['USER.publish_data_name']
33     self.processedData = None
34 slacapra 1.8
35 slacapra 1.30 if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
36     (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
37 spiga 1.22 msg = 'You cannot publish data because you did not select \n'
38 slacapra 1.34 msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
39     raise CrabException(msg)
40 slacapra 1.30
41 slacapra 1.33 if not cfg_params.has_key('CMSSW.pset'):
42     raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
43     self.pset = cfg_params['CMSSW.pset']
44 slacapra 1.30
45     self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
46    
47     if not cfg_params.has_key('USER.dbs_url_for_publication'):
48 fanzago 1.11 msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
49     msg = msg + " entry, necessary to publish the data.\n"
50     msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
51 fanzago 1.3 raise CrabException(msg)
52 slacapra 1.30
53     self.DBSURL=cfg_params['USER.dbs_url_for_publication']
54     common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
55     if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
56     msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
57     msg = msg + "Please put your local DBS instance in the [USER] section parameter 'dbs_url_for_publication'"
58     raise CrabException(msg)
59 fanzago 1.4
60     self.content=file(self.pset).read()
61     self.resDir = common.work_space.resDir()
62 fanzago 1.12
63     self.dataset_to_import=[]
64    
65 slacapra 1.1 self.datasetpath=cfg_params['CMSSW.datasetpath']
66 fanzago 1.12 if (self.datasetpath.upper() != 'NONE'):
67     self.dataset_to_import.append(self.datasetpath)
68    
69     ### Added PU dataset
70 spiga 1.13 tmp = cfg_params.get('CMSSW.dataset_pu',None)
71 fanzago 1.12 if tmp :
72     datasets = tmp.split(',')
73     for dataset in datasets:
74     dataset=string.strip(dataset)
75     self.dataset_to_import.append(dataset)
76     ###
77 spiga 1.24
78 fanzago 1.38 self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
79 fanzago 1.29 self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
80 spiga 1.24
81 slacapra 1.1 self.SEName=''
82     self.CMSSW_VERSION=''
83     self.exit_status=''
84     self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
85 fanzago 1.5 self.problemFiles=[]
86     self.noEventsFiles=[]
87     self.noLFN=[]
88 slacapra 1.1
89     def importParentDataset(self,globalDBS, datasetpath):
90     """
91 fanzago 1.35 Import datasetpath from globalDBS into the local DBS (self.DBSURL); return 0 on success, 1 on error. """
92 slacapra 1.1 dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
93    
94     try:
95 fanzago 1.39 if (self.import_all_parents==1):
96 fanzago 1.37 common.logger.info("--->>> Importing all parent levels")
97 fanzago 1.43 start = time.time()
98     common.logger.debug("start import time: " + str(start))
99 fanzago 1.41 ### to skip the ProdCommon api exception in the case of block without location
100     ### skipNoSiteError=True
101 fanzago 1.43 #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
102     ### calling dbs api directly
103     dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
104     stop = time.time()
105     common.logger.debug("stop import time: " + str(stop))
106     common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
107    
108 fanzago 1.37 else:
109     common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
110 fanzago 1.43 start = time.time()
111     #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
112     ### calling dbs api directly
113     common.logger.debug("start import time: " + str(start))
114     dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
115     stop = time.time()
116     common.logger.debug("stop import time: " + str(stop))
117     common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
118 slacapra 1.1 except DBSWriterError, ex:
119     msg = "Error importing dataset to be processed into local DBS\n"
120     msg += "Source Dataset: %s\n" % datasetpath
121     msg += "Source DBS: %s\n" % globalDBS
122     msg += "Destination DBS: %s\n" % self.DBSURL
123 spiga 1.27 common.logger.info(msg)
124 fanzago 1.42 common.logger.info(str(ex))
125 slacapra 1.1 return 1
126     return 0
127 fanzago 1.43
128 slacapra 1.1 def publishDataset(self,file):
129     """
130     Read the job report, import the parent dataset(s) and create the primary/processed dataset in the local DBS. """
131     try:
132     jobReport = readJobReport(file)[0]
133     self.exit_status = '0'
134     except IndexError:
135     self.exit_status = '1'
136     msg = "Error: Problem with "+file+" file"
137 spiga 1.27 common.logger.info(msg)
138 slacapra 1.1 return self.exit_status
139 fanzago 1.12
140     if (len(self.dataset_to_import) != 0):
141     for dataset in self.dataset_to_import:
142 spiga 1.27 common.logger.info("--->>> Importing parent dataset into the DBS: " +dataset)
143 fanzago 1.12 status_import=self.importParentDataset(self.globalDBS, dataset)
144     if (status_import == 1):
145 spiga 1.27 common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
146 fanzago 1.12 self.exit_status='1'
147     return self.exit_status
148     else:
149 spiga 1.27 common.logger.info('Import ok of dataset '+dataset)
150 fanzago 1.2
151 slacapra 1.1 #// DBS to contact
152 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
153 slacapra 1.1 try:
154     fileinfo= jobReport.files[0]
155     self.exit_status = '0'
156     except IndexError:
157     self.exit_status = '1'
158 fanzago 1.42 msg = "Error: No EDM file to publish in xml file"+file+" file"
159 spiga 1.27 common.logger.info(msg)
160 slacapra 1.1 return self.exit_status
161    
162     datasets=fileinfo.dataset
163 spiga 1.27 common.logger.log(10-1,"FileInfo = " + str(fileinfo))
164     common.logger.log(10-1,"DatasetInfo = " + str(datasets))
165 afanfani 1.19 if len(datasets)<=0:
166     self.exit_status = '1'
167     msg = "Error: No info about dataset in the xml file "+file
168 spiga 1.27 common.logger.info(msg)
169 afanfani 1.19 return self.exit_status
170 slacapra 1.1 for dataset in datasets:
171 fanzago 1.6 #### for production data
172 afanfani 1.16 self.processedData = dataset['ProcessedDataset']
173 fanzago 1.6 if (dataset['PrimaryDataset'] == 'null'):
174 fanzago 1.21 #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
175     dataset['PrimaryDataset'] = self.userprocessedData
176     #else: # add parentage from input dataset
177     elif self.datasetpath.upper() != 'NONE':
178 afanfani 1.14 dataset['ParentDataset']= self.datasetpath
179    
180 fanzago 1.4 dataset['PSetContent']=self.content
181     cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
182 spiga 1.27 common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
183     common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
184     common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
185 fanzago 1.28 self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
186 fanzago 1.6
187 spiga 1.27 common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
188 slacapra 1.1
189     primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
190 spiga 1.27 common.logger.log(10-1,"Primary: %s "%primary)
191 slacapra 1.1
192     algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
193 spiga 1.27 common.logger.log(10-1,"Algo: %s "%algo)
194 slacapra 1.1
195     processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
196 spiga 1.27 common.logger.log(10-1,"Processed: %s "%processed)
197 slacapra 1.1
198 spiga 1.27 common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
199 fanzago 1.2
200 spiga 1.27 common.logger.log(10-1,"exit_status = %s "%self.exit_status)
201 slacapra 1.1 return self.exit_status
202    
203     def publishAJobReport(self,file,procdataset):
204     """
205 fanzago 1.2 input: xml file, processedDataset
206 slacapra 1.1 """
207     try:
208     jobReport = readJobReport(file)[0]
209     self.exit_status = '0'
210     except IndexError:
211     self.exit_status = '1'
212     msg = "Error: Problem with "+file+" file"
213     raise CrabException(msg)
214 fanzago 1.4 ### overwrite ProcessedDataset with user defined value
215     ### overwrite lumisections with no value
216     ### skip publication for 0 events files
217     filestopublish=[]
218 slacapra 1.1 for file in jobReport.files:
219 fanzago 1.5 #### added check for problem with copy to SE and empty lfn
220     if (string.find(file['LFN'], 'copy_problems') != -1):
221     self.problemFiles.append(file['LFN'])
222     elif (file['LFN'] == ''):
223     self.noLFN.append(file['PFN'])
224 fanzago 1.4 else:
225 spiga 1.24 if self.skipOcheck==0:
226     if int(file['TotalEvents']) != 0:
227     #file.lumisections = {}
228     # lumi info are now in run hash
229     file.runs = {}
230     for ds in file.dataset:
231     ### Fede for production
232     if (ds['PrimaryDataset'] == 'null'):
233     #ds['PrimaryDataset']=procdataset
234     ds['PrimaryDataset']=self.userprocessedData
235     filestopublish.append(file)
236     else:
237     self.noEventsFiles.append(file['LFN'])
238     else:
239     file.runs = {}
240     for ds in file.dataset:
241     ### Fede for production
242     if (ds['PrimaryDataset'] == 'null'):
243     #ds['PrimaryDataset']=procdataset
244     ds['PrimaryDataset']=self.userprocessedData
245     filestopublish.append(file)
246    
247 fanzago 1.4 jobReport.files = filestopublish
248     ### if all files of FJR have number of events = 0
249     if (len(filestopublish) == 0):
250     return None
251    
252 slacapra 1.1 #// DBS to contact
253 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
254 slacapra 1.1 # insert files
255     Blocks=None
256     try:
257     Blocks=dbswriter.insertFiles(jobReport)
258 spiga 1.27 common.logger.info("Inserting file in blocks = %s"%Blocks)
259 slacapra 1.1 except DBSWriterError, ex:
260 spiga 1.27 common.logger.info("Insert file error: %s"%ex)
261 slacapra 1.1 return Blocks
262    
263     def run(self):
264     """
265     Parse all the fjr xml files in the res dir and create the corresponding dictionaries.
266     """
267 fanzago 1.2
268 slacapra 1.1 file_list = glob.glob(self.resDir+"crab_fjr*.xml")
269 afanfani 1.19 ## Select only those fjr that are successful
270 spiga 1.40 if (len(file_list)==0):
271     common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
272     self.exit_status = '1'
273     return self.exit_status
274    
275 afanfani 1.19 good_list=[]
276     for fjr in file_list:
277     reports = readJobReport(fjr)
278 afanfani 1.20 if len(reports)>0:
279     if reports[0].status == "Success":
280     good_list.append(fjr)
281 afanfani 1.19 file_list=good_list
282     ##
283 spiga 1.27 common.logger.log(10-1, "file_list = "+str(file_list))
284     common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
285 fanzago 1.2
286 slacapra 1.1 if (len(file_list)>0):
287     BlocksList=[]
288 spiga 1.27 common.logger.info("--->>> Start dataset publication")
289 fanzago 1.2 self.exit_status=self.publishDataset(file_list[0])
290     if (self.exit_status == '1'):
291     return self.exit_status
292 spiga 1.27 common.logger.info("--->>> End dataset publication")
293 fanzago 1.2
294    
295 spiga 1.27 common.logger.info("--->>> Start files publication")
296 slacapra 1.1 for file in file_list:
297 spiga 1.27 common.logger.debug( "file = "+file)
298 slacapra 1.1 Blocks=self.publishAJobReport(file,self.processedData)
299     if Blocks:
300 afanfani 1.14 for x in Blocks: # do not allow multiple entries of the same block
301     if x not in BlocksList:
302     BlocksList.append(x)
303 fanzago 1.2
304 slacapra 1.1 # close the blocks
305 spiga 1.27 common.logger.log(10-1, "BlocksList = %s"%BlocksList)
306 fanzago 1.2 # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
307     dbswriter = DBSWriter(self.DBSURL)
308    
309 slacapra 1.1 for BlockName in BlocksList:
310     try:
311     closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
312 spiga 1.27 common.logger.log(10-1, "closeBlock %s"%closeBlock)
313 slacapra 1.1 #dbswriter.dbs.closeBlock(BlockName)
314     except DBSWriterError, ex:
315 spiga 1.27 common.logger.info("Close block error %s"%ex)
316 fanzago 1.4
317 fanzago 1.5 if (len(self.noEventsFiles)>0):
318 spiga 1.27 common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
319 fanzago 1.5 for lfn in self.noEventsFiles:
320 spiga 1.27 common.logger.info("------ LFN: %s"%lfn)
321 fanzago 1.5 if (len(self.noLFN)>0):
322 spiga 1.27 common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
323 fanzago 1.5 for pfn in self.noLFN:
324 spiga 1.27 common.logger.info("------ pfn: %s"%pfn)
325 fanzago 1.5 if (len(self.problemFiles)>0):
326 spiga 1.27 common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
327 fanzago 1.5 for lfn in self.problemFiles:
328 spiga 1.27 common.logger.info("------ LFN: %s"%lfn)
329     common.logger.info("--->>> End files publication")
330 fanzago 1.28
331     self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
332     from InspectDBS import InspectDBS
333     check=InspectDBS(self.cfg_params)
334     check.checkPublication()
335 slacapra 1.1 return self.exit_status
336    
337     else:
338 spiga 1.40 common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
339 slacapra 1.1 self.exit_status = '1'
340     return self.exit_status
341
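
For reference, a minimal hypothetical driver sketch showing how Publisher is typically invoked from the CRAB client. The cfg_params keys are the ones __init__ checks; the dataset names, URL and pset file name are placeholders, and the snippet assumes a working CRAB/ProdCommon installation with common.work_space and common.logger initialized and a res directory containing crab_fjr_*.xml reports:

    from Publisher import Publisher   # assumes the CRAB python directory is on PYTHONPATH

    cfg_params = {
        'USER.publish_data_name'      : 'MyProcessedData',                 # placeholder
        'USER.copy_data'              : '1',
        'USER.publish_data'           : '1',
        'USER.dbs_url_for_publication': 'https://mylocaldbs.example.com/DBSServlet',  # placeholder, must not be the global DBS
        'CMSSW.pset'                  : 'my_cfg.py',                       # placeholder user configuration file
        'CMSSW.datasetpath'           : '/MyPrimary/MyProcessed/RECO',     # placeholder input dataset
    }

    publisher = Publisher(cfg_params)   # raises CrabException if a mandatory key is missing
    exit_status = publisher.run()       # returns '0' on success, '1' otherwise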