root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.37
Committed: Wed Jun 24 16:46:25 2009 UTC by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre14, CRAB_2_6_0_pre13
Changes since 1.36: +7 -2 lines
Log Message:
Added the key publish_with_import_all_parents, to publish user data in the local DBS importing the complete parent tree.
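
For reference, the new key is read from cfg_params in Publisher.__init__ below, together with the other publication settings. A minimal sketch of the corresponding entries follows; the dataset name, pset file, local DBS URL and input dataset path are placeholders, not values taken from this revision:

    cfg_params = {
        'USER.copy_data'                       : '1',
        'USER.publish_data'                    : '1',
        'USER.publish_data_name'               : 'MyProcessedData',       # placeholder name
        'USER.dbs_url_for_publication'         : 'https://<your_local_dbs>/servlet/DBSServlet',  # placeholder URL
        'USER.publish_with_import_all_parents' : '1',   # '1' imports the complete parent tree
        'CMSSW.pset'                           : 'my_cfg.py',             # placeholder pset
        'CMSSW.datasetpath'                    : '/Primary/Processed/TIER',  # placeholder input dataset
    }

Any value other than the string '1' for publish_with_import_all_parents makes importParentDataset fall back to importDatasetWithoutParentage, i.e. only the dataset itself is imported into the local DBS.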

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys
from DBSAPI.dbsApiException import DbsException
from DBSAPI.dbsApi import DbsApi

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS
        """

        self.cfg_params=cfg_params

        if not cfg_params.has_key('USER.publish_data_name'):
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')
        self.userprocessedData = cfg_params['USER.publish_data_name']
        self.processedData = None

        if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
           (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
            msg = 'You cannot publish data because you did not select\n'
            msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)

        if not cfg_params.has_key('CMSSW.pset'):
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        self.pset = cfg_params['CMSSW.pset']

        self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        if not cfg_params.has_key('USER.dbs_url_for_publication'):
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, which is necessary to publish the data.\n"
            msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication***\nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
        common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
        if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
            msg = "You cannot publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        self.content=file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import=[]

        self.datasetpath=cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu',None)
        if tmp :
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset=string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        # the cfg value arrives as a string: '1' means import the full parentage tree
        self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',0)
        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)

        self.SEName=''
        self.CMSSW_VERSION=''
        self.exit_status=''
        self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
        self.problemFiles=[]
        self.noEventsFiles=[]
        self.noLFN=[]

    def importParentDataset(self,globalDBS, datasetpath):
        """
        Import the parent dataset from the source DBS into the local DBS used
        for publication, with or without its full parentage tree depending on
        USER.publish_with_import_all_parents.
        """
        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')

        try:
            if (self.import_all_parents=='1'):
                common.logger.info("--->>> Importing all parents level")
                dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
            else:
                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
                dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.info(msg)
            common.logger.debug(str(ex))
            return 1
        return 0
        # alternative DbsApi-based patch, disabled (kept inside the triple-quoted string below)
        """
        print " patch for importParentDataset: datasetpath = ", datasetpath
        try:
            args={}
            args['url']=self.DBSURL
            args['mode']='POST'
            block = ""
            api = DbsApi(args)
            #api.migrateDatasetContents(srcURL, dstURL, path, block , False)
            api.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, block , False)

        except DbsException, ex:
            print "Caught API Exception %s: %s " % (ex.getClassName(), ex.getErrorMessage() )
            if ex.getErrorCode() not in (None, ""):
                print "DBS Exception Error Code: ", ex.getErrorCode()
            return 1
        print "Done"
        return 0
        """
    def publishDataset(self,file):
        """
        Import the parent datasets into the local DBS and create the primary,
        algorithm and processed dataset entries from the first FrameworkJobReport.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.info(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
                status_import=self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
                    self.exit_status='1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset '+dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo= jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file "+file
            common.logger.info(msg)
            return self.exit_status

        datasets=fileinfo.dataset
        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
        if len(datasets)<=0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file "+file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset']= self.datasetpath

            dataset['PSetContent']=self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"

            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
            common.logger.log(10-1,"Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1,"Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1,"Processed: %s "%processed)

            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))

        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self,file,procdataset):
        """
        input: xml file, processedDataset
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish=[]
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if self.skipOcheck==0:
                    if int(file['TotalEvents']) != 0:
                        #file.lumisections = {}
                        # lumi info are now in run hash
                        file.runs = {}
                        for ds in file.dataset:
                            ### Fede for production
                            if (ds['PrimaryDataset'] == 'null'):
                                #ds['PrimaryDataset']=procdataset
                                ds['PrimaryDataset']=self.userprocessedData
                        filestopublish.append(file)
                    else:
                        self.noEventsFiles.append(file['LFN'])
                else:
                    file.runs = {}
                    for ds in file.dataset:
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            #ds['PrimaryDataset']=procdataset
                            ds['PrimaryDataset']=self.userprocessedData
                    filestopublish.append(file)

        jobReport.files = filestopublish
        ### if all files of FJR have number of events = 0
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks=None
        try:
            Blocks=dbswriter.insertFiles(jobReport)
            common.logger.info("Inserting file in blocks = %s"%Blocks)
        except DBSWriterError, ex:
            common.logger.info("Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Parse all the xml files in the res dir and create the publication dictionary
        """

        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        ## Select only those fjr that are successful
        good_list=[]
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports)>0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list=good_list
        ##
        common.logger.log(10-1, "file_list = "+str(file_list))
        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))

        if (len(file_list)>0):
            BlocksList=[]
            common.logger.info("--->>> Start dataset publication")
            self.exit_status=self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")


            common.logger.info("--->>> Start files publication")
            for file in file_list:
                common.logger.debug( "file = "+file)
                Blocks=self.publishAJobReport(file,self.processedData)
                if Blocks:
                    for x in Blocks: # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.info("Close block error %s"%ex)

            if (len(self.noEventsFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            if (len(self.noLFN)>0):
                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have an empty LFN:")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s"%pfn)
            if (len(self.problemFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because of problems with the copy to the SE:")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            common.logger.info("--->>> End files publication")

            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
            from InspectDBS import InspectDBS
            check=InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status