ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.19
Committed: Mon Nov 17 13:02:07 2008 UTC (16 years, 5 months ago) by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3, CRAB_2_4_3_pre8, CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1
Changes since 1.18: +14 -0 lines
Log Message:
Take into account only successful fjr. See bug #44121

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
10 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
13     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
14 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
15     import sys
16 slacapra 1.1
17     class Publisher(Actor):
18     def __init__(self, cfg_params):
19     """
20     Publisher class:
21    
22     - parses CRAB FrameworkJobReport on UI
23     - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
24     - publishes output data on DBS and DLS
25     """
26    
27     try:
28 afanfani 1.16 userprocessedData = cfg_params['USER.publish_data_name']
29     self.processedData = None
30 slacapra 1.1 except KeyError:
31     raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 slacapra 1.8
33 slacapra 1.1 try:
34     if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
35     except KeyError:
36     raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')
37 fanzago 1.4 try:
38     self.pset = cfg_params['CMSSW.pset']
39     except KeyError:
40     raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
41     try:
42     self.globalDBS=cfg_params['CMSSW.dbs_url']
43     except KeyError:
44     self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
45 fanzago 1.3 try:
46     self.DBSURL=cfg_params['USER.dbs_url_for_publication']
47 fanzago 1.6 common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
48     if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
49 fanzago 1.4 msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
50     msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
51     raise CrabException(msg)
52 fanzago 1.3 except KeyError:
53 fanzago 1.11 msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
54     msg = msg + " entry, necessary to publish the data.\n"
55     msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
56 fanzago 1.3 raise CrabException(msg)
57 fanzago 1.4
58     self.content=file(self.pset).read()
59     self.resDir = common.work_space.resDir()
60 fanzago 1.12
61     self.dataset_to_import=[]
62    
63 slacapra 1.1 self.datasetpath=cfg_params['CMSSW.datasetpath']
64 fanzago 1.12 if (self.datasetpath.upper() != 'NONE'):
65     self.dataset_to_import.append(self.datasetpath)
66    
67     ### Added PU dataset
68 spiga 1.13 tmp = cfg_params.get('CMSSW.dataset_pu',None)
69 fanzago 1.12 if tmp :
70     datasets = tmp.split(',')
71     for dataset in datasets:
72     dataset=string.strip(dataset)
73     self.dataset_to_import.append(dataset)
74     ###
75    
76 slacapra 1.1 self.SEName=''
77     self.CMSSW_VERSION=''
78     self.exit_status=''
79     self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
80 fanzago 1.5 self.problemFiles=[]
81     self.noEventsFiles=[]
82     self.noLFN=[]
83 slacapra 1.1
84     def importParentDataset(self,globalDBS, datasetpath):
85     """
86     """
87     dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
88    
89     try:
90 afanfani 1.10 dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
91 slacapra 1.1 except DBSWriterError, ex:
92     msg = "Error importing dataset to be processed into local DBS\n"
93     msg += "Source Dataset: %s\n" % datasetpath
94     msg += "Source DBS: %s\n" % globalDBS
95     msg += "Destination DBS: %s\n" % self.DBSURL
96     common.logger.message(msg)
97     return 1
98     return 0
99    
100     def publishDataset(self,file):
101     """
102     """
103     try:
104     jobReport = readJobReport(file)[0]
105     self.exit_status = '0'
106     except IndexError:
107     self.exit_status = '1'
108     msg = "Error: Problem with "+file+" file"
109     common.logger.message(msg)
110     return self.exit_status
111 fanzago 1.12
112     if (len(self.dataset_to_import) != 0):
113     for dataset in self.dataset_to_import:
114     common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
115     status_import=self.importParentDataset(self.globalDBS, dataset)
116     if (status_import == 1):
117     common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
118     self.exit_status='1'
119     return self.exit_status
120     else:
121     common.logger.message('Import ok of dataset '+dataset)
122 fanzago 1.2
123 slacapra 1.1 #// DBS to contact
124 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
125 slacapra 1.1 try:
126     fileinfo= jobReport.files[0]
127     self.exit_status = '0'
128     except IndexError:
129     self.exit_status = '1'
130     msg = "Error: No file to publish in xml file"+file+" file"
131     common.logger.message(msg)
132     return self.exit_status
133    
134     datasets=fileinfo.dataset
135 fanzago 1.4 common.logger.debug(6,"FileInfo = " + str(fileinfo))
136     common.logger.debug(6,"DatasetInfo = " + str(datasets))
137 afanfani 1.19 if len(datasets)<=0:
138     self.exit_status = '1'
139     msg = "Error: No info about dataset in the xml file "+file
140     common.logger.message(msg)
141     return self.exit_status
142 slacapra 1.1 for dataset in datasets:
143 fanzago 1.6 #### for production data
144 afanfani 1.16 self.processedData = dataset['ProcessedDataset']
145 fanzago 1.6 if (dataset['PrimaryDataset'] == 'null'):
146     dataset['PrimaryDataset'] = dataset['ProcessedDataset']
147 afanfani 1.14 else: # add parentage from input dataset
148     dataset['ParentDataset']= self.datasetpath
149    
150 fanzago 1.4 dataset['PSetContent']=self.content
151     cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
152 slacapra 1.1 common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
153     common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
154 fanzago 1.6 common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
155    
156     common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
157 slacapra 1.1
158     primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
159 fanzago 1.4 common.logger.debug(6,"Primary: %s "%primary)
160 slacapra 1.1
161     algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
162 fanzago 1.4 common.logger.debug(6,"Algo: %s "%algo)
163 slacapra 1.1
164     processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
165 fanzago 1.4 common.logger.debug(6,"Processed: %s "%processed)
166 slacapra 1.1
167 fanzago 1.6 common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))
168 fanzago 1.2
169 fanzago 1.4 common.logger.debug(6,"exit_status = %s "%self.exit_status)
170 slacapra 1.1 return self.exit_status
171    
172     def publishAJobReport(self,file,procdataset):
173     """
174 fanzago 1.2 input: xml file, processedDataset
175 slacapra 1.1 """
176     try:
177     jobReport = readJobReport(file)[0]
178     self.exit_status = '0'
179     except IndexError:
180     self.exit_status = '1'
181     msg = "Error: Problem with "+file+" file"
182     raise CrabException(msg)
183 fanzago 1.4 ### overwrite ProcessedDataset with user defined value
184     ### overwrite lumisections with no value
185     ### skip publication for 0 events files
186     filestopublish=[]
187 slacapra 1.1 for file in jobReport.files:
188 fanzago 1.5 #### added check for problem with copy to SE and empty lfn
189     if (string.find(file['LFN'], 'copy_problems') != -1):
190     self.problemFiles.append(file['LFN'])
191     elif (file['LFN'] == ''):
192     self.noLFN.append(file['PFN'])
193 fanzago 1.4 else:
194 fanzago 1.5 if int(file['TotalEvents']) != 0 :
195 afanfani 1.17 #file.lumisections = {}
196     # lumi info are now in run hash
197     file.runs = {}
198 fanzago 1.5 for ds in file.dataset:
199 fanzago 1.15 ### FEDE FOR NEW LFN ###
200     #ds['ProcessedDataset']=procdataset
201     ########################
202 fanzago 1.6 ### Fede for production
203     if (ds['PrimaryDataset'] == 'null'):
204     ds['PrimaryDataset']=procdataset
205 fanzago 1.5 filestopublish.append(file)
206     else:
207     self.noEventsFiles.append(file['LFN'])
208 fanzago 1.4 jobReport.files = filestopublish
209     ### if all files of FJR have number of events = 0
210     if (len(filestopublish) == 0):
211     return None
212    
213 slacapra 1.1 #// DBS to contact
214 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
215 slacapra 1.1 # insert files
216     Blocks=None
217     try:
218     Blocks=dbswriter.insertFiles(jobReport)
219 mcinquil 1.18 common.logger.message("Inserting file in blocks = %s"%Blocks)
220 slacapra 1.1 except DBSWriterError, ex:
221 mcinquil 1.18 common.logger.error("Insert file error: %s"%ex)
222 slacapra 1.1 return Blocks
223    
224     def run(self):
225     """
226     parse of all xml file on res dir and creation of distionary
227     """
228 fanzago 1.2
229 slacapra 1.1 file_list = glob.glob(self.resDir+"crab_fjr*.xml")
230 afanfani 1.19 ## Select only those fjr that are succesfull
231     good_list=[]
232     for fjr in file_list:
233     reports = readJobReport(fjr)
234     if reports[0].status == "Success":
235     good_list.append(fjr)
236     file_list=good_list
237     ##
238 slacapra 1.1 common.logger.debug(6, "file_list = "+str(file_list))
239     common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
240 fanzago 1.2
241 slacapra 1.1 if (len(file_list)>0):
242     BlocksList=[]
243 fanzago 1.2 common.logger.message("--->>> Start dataset publication")
244     self.exit_status=self.publishDataset(file_list[0])
245     if (self.exit_status == '1'):
246     return self.exit_status
247     common.logger.message("--->>> End dataset publication")
248    
249    
250     common.logger.message("--->>> Start files publication")
251 slacapra 1.1 for file in file_list:
252 mcinquil 1.18 common.logger.debug(1, "file = "+file)
253 slacapra 1.1 Blocks=self.publishAJobReport(file,self.processedData)
254     if Blocks:
255 afanfani 1.14 for x in Blocks: # do not allow multiple entries of the same block
256     if x not in BlocksList:
257     BlocksList.append(x)
258 fanzago 1.2
259 slacapra 1.1 # close the blocks
260 fanzago 1.6 common.logger.debug(6, "BlocksList = %s"%BlocksList)
261 fanzago 1.2 # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
262     dbswriter = DBSWriter(self.DBSURL)
263    
264 slacapra 1.1 for BlockName in BlocksList:
265     try:
266     closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
267 fanzago 1.6 common.logger.debug(6, "closeBlock %s"%closeBlock)
268 slacapra 1.1 #dbswriter.dbs.closeBlock(BlockName)
269     except DBSWriterError, ex:
270 fanzago 1.2 common.logger.message("Close block error %s"%ex)
271 fanzago 1.4
272 fanzago 1.5 if (len(self.noEventsFiles)>0):
273     common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
274     for lfn in self.noEventsFiles:
275     common.logger.message("------ LFN: %s"%lfn)
276     if (len(self.noLFN)>0):
277     common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
278     for pfn in self.noLFN:
279     common.logger.message("------ pfn: %s"%pfn)
280     if (len(self.problemFiles)>0):
281     common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
282     for lfn in self.problemFiles:
283 fanzago 1.4 common.logger.message("------ LFN: %s"%lfn)
284 fanzago 1.6 common.logger.message("--->>> End files publication")
285 fanzago 1.7 common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
286 slacapra 1.1 return self.exit_status
287    
288     else:
289 fanzago 1.2 common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
290 slacapra 1.1 self.exit_status = '1'
291     return self.exit_status
292