root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.23
Committed: Mon Feb 9 15:17:51 2009 UTC by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0_pre7, CRAB_2_5_0_pre6, CRAB_2_5_0_pre5, CRAB_2_5_0_pre4, CRAB_2_5_0_pre3
Changes since 1.22: +10 -11 lines
Log Message:
removed the check about number of events not 0

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_logger import Logger
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.FwkJobRep.ReportState import checkSuccess
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReports on the UI
        - returns the <file> section of the xml in dictionary format for each xml file in the crab_0_xxxx/res directory
        - publishes the output data on DBS and DLS
        """

        try:
            self.userprocessedData = cfg_params['USER.publish_data_name']
            self.processedData = None
        except KeyError:
            raise CrabException('Cannot publish output data because you did not specify the USER.publish_data_name parameter in the crab.cfg file')

        try:
            if (int(cfg_params['USER.copy_data']) != 1):
                raise KeyError
        except KeyError:
            msg = 'You cannot publish data because you did not set \n'
            msg += '\t*** copy_data = 1 or publish_data = 1 *** in the crab.cfg file'
            raise CrabException(msg)
        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
            common.logger.message('<dbs_url_for_publication> = ' + self.DBSURL)
            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
                msg = "You cannot publish your data in the globalDBS = " + self.DBSURL + "\n"
                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
                raise CrabException(msg)
        except KeyError:
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, which is necessary to publish the data.\n"
            msg = msg + "Use the command ***crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []
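    # For reference, __init__ above reads these crab.cfg entries through cfg_params:
    # USER.publish_data_name, USER.copy_data (must be 1), USER.dbs_url_for_publication,
    # CMSSW.pset and CMSSW.datasetpath are required; CMSSW.dbs_url is optional and
    # defaults to the global DBS instance, and the optional CMSSW.dataset_pu is a
    # comma-separated list of pile-up datasets that are imported as parents as well.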
    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the parent dataset "datasetpath" from the source DBS "globalDBS"
        into the local DBS used for publication. Return 0 on success, 1 on error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # import the dataset passed as argument, so pile-up parents are imported as well
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            common.logger.write(str(ex))
            return 1
        return 0
    def publishDataset(self, file):
        """
        Read the dataset description from the FrameworkJobReport "file",
        import the parent dataset(s) into the local DBS and create there
        the primary dataset, the algorithm and the processed dataset.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: problem with the " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.message("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.message('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.message('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: no file to publish in the xml file " + file
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: no info about the dataset in the xml file " + file
            common.logger.message(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed: %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: the FrameworkJobReport xml file and the processed dataset name
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: problem with the " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for files with copy problems or an empty LFN
        filestopublish = []
        for file in jobReport.files:
            #### added check for problem with copy to SE and empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                ### Fede removed the check on the number of events
                #if int(file['TotalEvents']) != 0 :
                # lumi info are now in the run hash
                file.runs = {}
                for ds in file.dataset:
                    ### Fede for production
                    if (ds['PrimaryDataset'] == 'null'):
                        ds['PrimaryDataset'] = self.userprocessedData
                filestopublish.append(file)
                #else:
                #    self.noEventsFiles.append(file['LFN'])
        jobReport.files = filestopublish
        ### nothing to publish if all the files of the FJR were skipped
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Inserting files in blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.error("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the FrameworkJobReport xml files in the res dir and publish their contents.
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        ## select only the fjr that are successful
        good_list = []
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports) > 0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list = good_list
        ##
        common.logger.debug(6, "file_list = " + str(file_list))
        common.logger.debug(6, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.message("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.message("--->>> End dataset publication")

            common.logger.message("--->>> Start files publication")
            for file in file_list:
                common.logger.debug(1, "file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.debug(6, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.debug(6, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.message("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.noEventsFiles)) + " files not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.message("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.message("------ pfn: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.problemFiles)) + " files not published because they had problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            common.logger.message("--->>> End files publication")
            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
            return self.exit_status

        else:
            common.logger.message("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status
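
For orientation, a minimal usage sketch follows. It assumes the CRAB runtime used by this class (common.logger, common.work_space) has already been initialized by the crab client, and every configuration value shown is illustrative only; in practice this class is driven through the crab -publish command rather than instantiated by hand.

# Illustrative sketch only: all values below are made up and the CRAB
# environment (common.logger, common.work_space) must already be set up.
cfg_params = {
    'USER.publish_data_name': 'MyAnalysisData',        # hypothetical dataset name
    'USER.copy_data': '1',                             # publication requires copy_data = 1
    'USER.dbs_url_for_publication': 'https://mylocaldbs.example.com/servlet/DBSServlet',  # a local DBS, not the global one
    'CMSSW.pset': 'my_pset.py',                        # file must exist; its content is attached to the dataset
    'CMSSW.datasetpath': 'none',                       # or an input dataset to be imported as parent
}

publisher = Publisher(cfg_params)
exit_status = publisher.run()   # '0' on success, '1' on failure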