root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.17
Committed: Wed Oct 15 09:28:21 2008 UTC by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0
Changes since 1.16: +3 -1 lines
Log Message:
drop the run/lumisection part, compliant with mods in PC FwkJobReport

File Contents

import getopt, string
import common
import time, glob
from Actor import *
from crab_util import *
from crab_logger import Logger
from crab_exceptions import *
from ProdCommon.FwkJobRep.ReportParser import readJobReport
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
import sys

class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses the CRAB FrameworkJobReports on the UI
        - returns the <file> section of the xml in dictionary format for each xml file in the crab_0_xxxx/res directory
        - publishes output data on DBS and DLS
        """

        try:
            userprocessedData = cfg_params['USER.publish_data_name']
            self.processedData = None
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')

        try:
            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
        except KeyError:
            raise CrabException('You cannot publish data because you did not select *** copy_data = 1 *** in the crab.cfg file')
        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
            common.logger.message('<dbs_url_for_publication> = ' + self.DBSURL)
            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
                msg = "You cannot publish your data in the global DBS = " + self.DBSURL + "\n"
                msg = msg + "Please write your local one in the [USER] section as 'dbs_url_for_publication'"
                raise CrabException(msg)
        except KeyError:
            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command *** crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication *** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        self.content = file(self.pset).read()
        self.resDir = common.work_space.resDir()

        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        tmp = cfg_params.get('CMSSW.dataset_pu', None)
        if tmp:
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset = string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

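    # For illustration only: a minimal sketch of the crab.cfg entries read by
    # __init__ above. The section/key names are the ones queried from
    # cfg_params; all the values are hypothetical placeholders.
    #
    #   [USER]
    #   publish_data_name = MyProcessedDataset
    #   copy_data = 1
    #   dbs_url_for_publication = https://my-local-dbs.example.com/servlet/DBSServlet
    #
    #   [CMSSW]
    #   pset = my_analysis_cfg.py
    #   datasetpath = /Primary/Processed/TIER
    #   dataset_pu = /PUPrimary/PUProcessed/TIER   (optional, comma-separated list)
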
    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the given parent dataset from the global DBS into the local one.
        Returns 0 on success, 1 on failure.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # use the datasetpath argument (not self.datasetpath), so that
            # PU datasets passed by the caller are imported as well
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError, ex:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            return 1
        return 0

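    # A minimal usage sketch, assuming "pub" is a Publisher built from a parsed
    # crab.cfg; the dataset path is a hypothetical placeholder:
    #
    #   status = pub.importParentDataset(pub.globalDBS, '/Primary/Processed/TIER')
    #   if status == 1:
    #       pass  # the failure has already been logged; abort the publication
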
    def publishDataset(self, file):
        """
        Publish the dataset described in the given FrameworkJobReport,
        importing the parent datasets into the local DBS if needed.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.message("--->>> Importing parent dataset in the dbs: " + dataset)
                status_import = self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.message('Problem with parent ' + dataset + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                    self.exit_status = '1'
                    return self.exit_status
                else:
                    common.logger.message('Import ok of dataset ' + dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in the xml file " + file
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
            else:  # add parentage from input dataset
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /" + dataset['PrimaryDataset'] + "/" + dataset['ProcessedDataset'] + "/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed: %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

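    # For reference: the dataset dictionaries handled above come from the FJR
    # <file> sections, and the keys used are PrimaryDataset, ProcessedDataset,
    # ParentDataset and PSetContent. The published dataset name has the form
    # (hypothetical values):
    #
    #   /MyPrimary/MyProcessedDataset/USER
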
    def publishAJobReport(self, file, procdataset):
        """
        input: xml file, processedDataset
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with the " + file + " file"
            raise CrabException(msg)
        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for files with 0 events
        filestopublish = []
        for file in jobReport.files:
            #### added check for problems with the copy to the SE and for empty lfn
            if (string.find(file['LFN'], 'copy_problems') != -1):
                self.problemFiles.append(file['LFN'])
            elif (file['LFN'] == ''):
                self.noLFN.append(file['PFN'])
            else:
                if int(file['TotalEvents']) != 0:
                    #file.lumisections = {}
                    # lumi info are now in the run hash
                    file.runs = {}
                    for ds in file.dataset:
                        ### FEDE FOR NEW LFN ###
                        #ds['ProcessedDataset'] = procdataset
                        ########################
                        ### Fede for production
                        if (ds['PrimaryDataset'] == 'null'):
                            ds['PrimaryDataset'] = procdataset
                    filestopublish.append(file)
                else:
                    self.noEventsFiles.append(file['LFN'])
        jobReport.files = filestopublish
        ### if all the files of the FJR have 0 events there is nothing to publish
        if (len(filestopublish) == 0):
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s" % Blocks)
        except DBSWriterError, ex:
            common.logger.message("Insert file error: %s" % ex)
        return Blocks

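    # A sketch of the FJR <File> fields this method relies on. The structure is
    # inferred from the keys read above; all the values are hypothetical:
    #
    #   <File>
    #     <LFN>/store/user/myuser/out_1.root</LFN>
    #     <PFN>srm://se.example.com/store/user/myuser/out_1.root</PFN>
    #     <TotalEvents>1000</TotalEvents>
    #     ... plus the dataset info (PrimaryDataset, ProcessedDataset, ...)
    #   </File>
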
    def run(self):
        """
        parse all the xml files in the res dir and publish their contents
        """

        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        common.logger.debug(6, "file_list = " + str(file_list))
        common.logger.debug(6, "len(file_list) = " + str(len(file_list)))

        if (len(file_list) > 0):
            BlocksList = []
            common.logger.message("--->>> Start dataset publication")
            self.exit_status = self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.message("--->>> End dataset publication")

            common.logger.message("--->>> Start files publication")
            for file in file_list:
                common.logger.message("file = " + file)
                Blocks = self.publishAJobReport(file, self.processedData)
                if Blocks:
                    for x in Blocks:  # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.debug(6, "BlocksList = %s" % BlocksList)
            # dbswriter = DBSWriter(self.DBSURL, level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                    common.logger.debug(6, "closeBlock %s" % closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    common.logger.message("Close block error %s" % ex)

            if (len(self.noEventsFiles) > 0):
                common.logger.message("--->>> WARNING: the following " + str(len(self.noEventsFiles)) + " files were not published because they contain 0 events:")
                for lfn in self.noEventsFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            if (len(self.noLFN) > 0):
                common.logger.message("--->>> WARNING: there are " + str(len(self.noLFN)) + " files not published because they have an empty LFN")
                for pfn in self.noLFN:
                    common.logger.message("------ PFN: %s" % pfn)
            if (len(self.problemFiles) > 0):
                common.logger.message("--->>> WARNING: " + str(len(self.problemFiles)) + " files were not published because of problems with the copy to the SE")
                for lfn in self.problemFiles:
                    common.logger.message("------ LFN: %s" % lfn)
            common.logger.message("--->>> End files publication")
            common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
            return self.exit_status

        else:
            common.logger.message("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

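
# A minimal end-to-end sketch, not part of the original file: how the class
# might be driven stand-alone. It assumes an initialized CRAB environment
# (common.logger, common.work_space); the cfg_params values are hypothetical
# placeholders, while inside CRAB the dict comes from the parsed crab.cfg.
if __name__ == '__main__':
    cfg_params = {
        'USER.publish_data_name': 'MyProcessedDataset',
        'USER.copy_data': '1',
        'USER.dbs_url_for_publication': 'https://my-local-dbs.example.com/servlet/DBSServlet',
        'CMSSW.pset': 'my_analysis_cfg.py',
        'CMSSW.datasetpath': '/Primary/Processed/TIER',
    }
    publisher = Publisher(cfg_params)
    print "exit status: %s" % publisher.run()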