ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.20
Committed: Fri Jan 9 19:58:09 2009 UTC (16 years, 3 months ago) by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_4_pre2, CRAB_2_4_4_pre1
Changes since 1.19: +3 -2 lines
Log Message:
fix to deal with invalid fjr (skip them instead of let the publication fail)

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
10 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
11     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
12     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
13     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
14 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
15     import sys
16 slacapra 1.1
class Publisher(Actor):
    """
    Publisher class:

    - parses CRAB FrameworkJobReport on UI
    - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
    - publishes output data on DBS and DLS
    """

    def __init__(self, cfg_params):
        """
        Read and validate the publication-related crab.cfg parameters.

        Raises CrabException when USER.publish_data_name, CMSSW.pset or
        USER.dbs_url_for_publication is missing, when USER.copy_data is not 1,
        or when the publication DBS is one of the global DBS instances.
        """
        # Only the presence of publish_data_name is checked here; the processed
        # dataset name actually used is read from the job reports (publishDataset).
        try:
            cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
        self.processedData = None

        # A missing or non-integer copy_data is treated like copy_data != 1.
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError):
            copy_data = None
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        # DBS from which the input datasets are imported
        self.globalDBS = cfg_params.get('CMSSW.dbs_url', "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
        # Publishing user data into the global DBS is forbidden.
        globalDBSInstances = ("http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
                              "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet")
        if self.DBSURL in globalDBSInstances:
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Content of the user pset, attached to every published dataset.
        psetFile = open(self.pset)
        try:
            self.content = psetFile.read()
        finally:
            psetFile.close()
        self.resDir = common.work_space.resDir()

        # Input datasets that must be imported into the local DBS before publishing
        self.dataset_to_import = []
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if self.datasetpath.upper() != 'NONE':
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset (comma separated list)
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            for dataset in tmp.split(','):
                self.dataset_to_import.append(dataset.strip())

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Files skipped by publishAJobReport, summarised at the end of run()
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import *datasetpath* (without its parentage) from *globalDBS* into the
        local publication DBS (self.DBSURL).

        Returns 0 on success, 1 if the DBS writer raised DBSWriterError.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')
        try:
            # Import the dataset passed in: publishDataset() calls this once per
            # entry of self.dataset_to_import (main input and PU datasets).
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Import the input datasets into the local DBS, then create the primary,
        algorithm and processed dataset entries described by the first output
        file of the job report *file*.

        Sets self.processedData from the report. Returns the exit status as a
        string: '0' on success, '1' on failure.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        for dataset in self.dataset_to_import:
            common.logger.message("--->>> Importing parent dataset in the dbs: " + dataset)
            status_import = self.importParentDataset(self.globalDBS, dataset)
            if status_import == 1:
                common.logger.message('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message('Import ok of dataset ' + dataset)

        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file " + file
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        if len(datasets) <= 0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file "+file
            common.logger.message(msg)
            return self.exit_status

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        for dataset in datasets:
            self.processedData = dataset['ProcessedDataset']
            #### for production data (no primary dataset in the report)
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
            else:  # add parentage from input dataset
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset

        Insert into the local DBS the output files listed in the job report
        *file*. Files with copy problems, an empty LFN or 0 events are skipped
        and recorded in self.problemFiles / self.noLFN / self.noEventsFiles.
        For production data (PrimaryDataset == 'null') the primary dataset is
        set to *procdataset*.

        Returns the blocks returned by DBSWriter.insertFiles, or None when
        nothing is left to publish or the insertion failed.
        Raises CrabException if *file* contains no job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)

        filestopublish = []
        for outFile in jobReport.files:
            #### check for problem with copy to SE and empty lfn
            if 'copy_problems' in outFile['LFN']:
                self.problemFiles.append(outFile['LFN'])
            elif outFile['LFN'] == '':
                self.noLFN.append(outFile['PFN'])
            elif int(outFile['TotalEvents']) == 0:
                ### skip publication for 0 events files
                self.noEventsFiles.append(outFile['LFN'])
            else:
                # overwrite run/lumi info with no value (lumi info are now in the run hash)
                outFile.runs = {}
                for ds in outFile.dataset:
                    ### for production data
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(outFile)
        jobReport.files = filestopublish
        ### nothing left to publish (e.g. all files of the FJR have 0 events)
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Inserting file in blocks = %s" % Blocks)
        except DBSWriterError:
            # sys.exc_info() keeps this valid on old and new interpreters alike
            common.logger.error("Insert file error: %s" % sys.exc_info()[1])
        return Blocks

    def _successfulReports(self):
        """Return the crab_fjr*.xml files in resDir whose first report has status "Success"."""
        # invalid fjr (no parsable report) are skipped instead of failing the publication
        good_list = []
        for fjr in glob.glob(self.resDir+"crab_fjr*.xml"):
            reports = readJobReport(fjr)
            if len(reports) > 0 and reports[0].status == "Success":
                good_list.append(fjr)
        return good_list

    def _reportSkippedFiles(self):
        """Log the files skipped by publishAJobReport, grouped by reason."""
        if self.noEventsFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s" % lfn)
        if self.noLFN:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s" % pfn)
        if self.problemFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s" % lfn)

    def run(self):
        """
        Publish the output of all successful jobs found in the res directory:
        the dataset is created from the first good job report, then the files
        of every good report are inserted and the touched blocks are closed.

        Returns the exit status as a string ('1' on failure).
        """
        file_list = self._successfulReports()
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for fjr in file_list:
            common.logger.debug(1, "file = "+fjr)
            Blocks = self.publishAJobReport(fjr, self.processedData)
            if Blocks:
                for x in Blocks:  # do not allow multiple entries of the same block
                    if x not in BlocksList:
                        BlocksList.append(x)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s" % BlocksList)
        dbswriter = DBSWriter(self.DBSURL)
        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.debug(6, "closeBlock %s" % closeBlock)
            except DBSWriterError:
                common.logger.message("Close block error %s" % sys.exc_info()[1])

        self._reportSkippedFiles()
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status