ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.13
Committed: Thu Jun 19 14:29:18 2008 UTC (16 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_test
Changes since 1.12: +1 -1 lines
Log Message:
dataset_pu moved from USER to CMSSW section

File Contents

# Content
1 import getopt, string
2 import common
3 import time, glob
4 from Actor import *
5 from crab_util import *
6 from crab_logger import Logger
7 from crab_exceptions import *
8 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11 from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12 from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14 import sys
15
class Publisher(Actor):
    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS

        Raises CrabException whenever a crab.cfg parameter required for
        publication is missing or inconsistent.
        """

        # Name under which the output data will be published (mandatory).
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # Publication only makes sense if the output was copied to an SE:
        # copy_data must be present AND equal to 1.  Re-raising KeyError on a
        # wrong value deliberately funnels both failure modes into one message.
        try:
            if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
        except KeyError:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')
        # The parameter-set file name is needed both to read its content and
        # as metadata for the DBS "algorithm" record.
        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
        # Source DBS instance (where parent datasets live); defaults to the
        # CMS global DBS read server.
        try:
            self.globalDBS=cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        # Destination DBS instance for publication (mandatory) — publishing
        # straight into the global DBS instances is explicitly forbidden.
        try:
            self.DBSURL=cfg_params['USER.dbs_url_for_publication']
            common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
                msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
                msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
                raise CrabException(msg)
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)

        # Full text of the user parameter set, stored later in each dataset
        # record as 'PSetContent'.
        self.content=file(self.pset).read()
        self.resDir = common.work_space.resDir()

        # Datasets that must exist in the local DBS before file insertion:
        # the input dataset (if any) plus any pile-up datasets.
        self.dataset_to_import=[]

        self.datasetpath=cfg_params['CMSSW.datasetpath']
        if (self.datasetpath.upper() != 'NONE'):
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset (comma-separated list in CMSSW.dataset_pu)
        tmp = cfg_params.get('CMSSW.dataset_pu',None)
        if tmp :
            datasets = tmp.split(',')
            for dataset in datasets:
                dataset=string.strip(dataset)
                self.dataset_to_import.append(dataset)
        ###

        # Bookkeeping for the final publication report produced by run().
        self.SEName=''
        self.CMSSW_VERSION=''
        self.exit_status=''
        self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
        self.problemFiles=[]    # LFNs whose stage-out to the SE failed
        self.noEventsFiles=[]   # LFNs with TotalEvents == 0, skipped
        self.noLFN=[]           # PFNs of files that have an empty LFN
82 def importParentDataset(self,globalDBS, datasetpath):
83 """
84 """
85 dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
86
87 try:
88 dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
89 except DBSWriterError, ex:
90 msg = "Error importing dataset to be processed into local DBS\n"
91 msg += "Source Dataset: %s\n" % datasetpath
92 msg += "Source DBS: %s\n" % globalDBS
93 msg += "Destination DBS: %s\n" % self.DBSURL
94 common.logger.message(msg)
95 return 1
96 return 0
97
    def publishDataset(self,file):
        """
        Create the dataset-level DBS records (primary dataset, algorithm,
        processed dataset) described by the first FJR xml `file`, after
        importing any parent/pile-up datasets into the local DBS.

        Returns self.exit_status: '0' on success, '1' on any failure
        (the error is logged, not raised).
        """
        try:
            # readJobReport returns a list; an unreadable/empty report
            # yields an empty list and hence IndexError.
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        # Make sure every parent dataset (input + PU, collected in __init__)
        # exists in the local DBS before creating the new dataset.
        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
                status_import=self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
                    self.exit_status='1'
                    return self.exit_status
                else:
                    common.logger.message('Import ok of dataset '+dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # Dataset metadata is taken from the first <file> entry of the FJR.
        try:
            fileinfo= jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file"+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets=fileinfo.dataset
        common.logger.debug(6,"FileInfo = " + str(fileinfo))
        common.logger.debug(6,"DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data: no primary dataset in the FJR, so the
            #### processed dataset name is reused as primary dataset name.
            if (dataset['PrimaryDataset'] == 'null'):
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']

            # Attach the user parameter set so DBS records how the data
            # was produced.
            dataset['PSetContent']=self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            # DBS object creation order matters: primary dataset first,
            # then the algorithm, then the processed dataset tying them.
            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
            common.logger.debug(6,"Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6,"Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6,"Processed: %s "%processed)

            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))

        common.logger.debug(6,"exit_status = %s "%self.exit_status)
        return self.exit_status
161
162 def publishAJobReport(self,file,procdataset):
163 """
164 input: xml file, processedDataset
165 """
166 try:
167 jobReport = readJobReport(file)[0]
168 self.exit_status = '0'
169 except IndexError:
170 self.exit_status = '1'
171 msg = "Error: Problem with "+file+" file"
172 raise CrabException(msg)
173 ### overwrite ProcessedDataset with user defined value
174 ### overwrite lumisections with no value
175 ### skip publication for 0 events files
176 filestopublish=[]
177 for file in jobReport.files:
178 #### added check for problem with copy to SE and empty lfn
179 if (string.find(file['LFN'], 'copy_problems') != -1):
180 self.problemFiles.append(file['LFN'])
181 elif (file['LFN'] == ''):
182 self.noLFN.append(file['PFN'])
183 else:
184 if int(file['TotalEvents']) != 0 :
185 file.lumisections = {}
186 for ds in file.dataset:
187 ds['ProcessedDataset']=procdataset
188 ### Fede for production
189 if (ds['PrimaryDataset'] == 'null'):
190 ds['PrimaryDataset']=procdataset
191 filestopublish.append(file)
192 else:
193 self.noEventsFiles.append(file['LFN'])
194 jobReport.files = filestopublish
195 ### if all files of FJR have number of events = 0
196 if (len(filestopublish) == 0):
197 return None
198
199 #// DBS to contact
200 dbswriter = DBSWriter(self.DBSURL)
201 # insert files
202 Blocks=None
203 try:
204 Blocks=dbswriter.insertFiles(jobReport)
205 common.logger.message("Blocks = %s"%Blocks)
206 except DBSWriterError, ex:
207 common.logger.message("Insert file error: %s"%ex)
208 return Blocks
209
210 def run(self):
211 """
212 parse of all xml file on res dir and creation of distionary
213 """
214
215 file_list = glob.glob(self.resDir+"crab_fjr*.xml")
216 common.logger.debug(6, "file_list = "+str(file_list))
217 common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
218
219 if (len(file_list)>0):
220 BlocksList=[]
221 common.logger.message("--->>> Start dataset publication")
222 self.exit_status=self.publishDataset(file_list[0])
223 if (self.exit_status == '1'):
224 return self.exit_status
225 common.logger.message("--->>> End dataset publication")
226
227
228 common.logger.message("--->>> Start files publication")
229 for file in file_list:
230 common.logger.message("file = "+file)
231 Blocks=self.publishAJobReport(file,self.processedData)
232 if Blocks:
233 [BlocksList.append(x) for x in Blocks]
234
235 # close the blocks
236 common.logger.debug(6, "BlocksList = %s"%BlocksList)
237 # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
238 dbswriter = DBSWriter(self.DBSURL)
239
240 for BlockName in BlocksList:
241 try:
242 closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
243 common.logger.debug(6, "closeBlock %s"%closeBlock)
244 #dbswriter.dbs.closeBlock(BlockName)
245 except DBSWriterError, ex:
246 common.logger.message("Close block error %s"%ex)
247
248 if (len(self.noEventsFiles)>0):
249 common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
250 for lfn in self.noEventsFiles:
251 common.logger.message("------ LFN: %s"%lfn)
252 if (len(self.noLFN)>0):
253 common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
254 for pfn in self.noLFN:
255 common.logger.message("------ pfn: %s"%pfn)
256 if (len(self.problemFiles)>0):
257 common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
258 for lfn in self.problemFiles:
259 common.logger.message("------ LFN: %s"%lfn)
260 common.logger.message("--->>> End files publication")
261 common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
262 return self.exit_status
263
264 else:
265 common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
266 self.exit_status = '1'
267 return self.exit_status
268