-import sys, getopt, string
+import getopt, string
 import common
 import time, glob
 from Actor import *
-from FwkJobRep.ReportParser import readJobReport
 from crab_util import *
 from crab_logger import Logger
 from crab_exceptions import *
+from ProdCommon.FwkJobRep.ReportParser import readJobReport
 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
 from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx, DBSReaderError
 from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
-import ProdCommon.DataMgmt.DBS.DBSWriterObjects as DBSWriterObjects
-
-
+from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter, DBSWriterObjects
+import sys
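
Note: readJobReport now comes from ProdCommon instead of the standalone
FwkJobRep package. For orientation, a minimal sketch of how this class consumes
it (the report path is hypothetical; the accessors are the ones used in the
methods below):

    from ProdCommon.FwkJobRep.ReportParser import readJobReport

    # readJobReport returns a list of report objects; CRAB uses the first one
    report = readJobReport('res/crab_fjr_1.xml')[0]
    for f in report.files:                 # one entry per output file
        print f['LFN'], f['TotalEvents']   # keys the publisher relies on
        for ds in f.dataset:               # dataset info attached to the file
            print ds['PrimaryDataset'], ds['ProcessedDataset']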

 class Publisher(Actor):
     def __init__(self, cfg_params):
...
         """

         try:
-            self.processedData = cfg_params['USER.publish_data_name']
+            userprocessedData = cfg_params['USER.publish_data_name']
+            self.processedData = None
         except KeyError:
             raise CrabException('Cannot publish output data, because you did not specify the USER.publish_data_name parameter in the crab.cfg file')

         try:
             if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
         except KeyError:
             raise CrabException('You can not publish data because you did not select *** copy_data = 1 *** in the crab.cfg file')
-
-        common.logger.message('self.processedData = '+self.processedData)
+        try:
+            self.pset = cfg_params['CMSSW.pset']
+        except KeyError:
+            raise CrabException('Cannot publish output data, because you did not specify the pset name in the [CMSSW] section of your crab.cfg file')
+        try:
+            self.globalDBS = cfg_params['CMSSW.dbs_url']
+        except KeyError:
+            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
+        try:
+            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
+            common.logger.message('<dbs_url_for_publication> = ' + self.DBSURL)
+            if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
+                msg = "You can not publish your data in the global DBS = " + self.DBSURL + "\n"
+                msg = msg + "Please set your local one in the [USER] section as 'dbs_url_for_publication'"
+                raise CrabException(msg)
+        except KeyError:
+            msg = "Warning. The [USER] section does not have the 'dbs_url_for_publication'"
+            msg = msg + " entry, necessary to publish the data.\n"
+            msg = msg + "Use the command *** crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication *** \nwhere dbs_url_for_publication is your local DBS instance."
+            raise CrabException(msg)
+
+        self.content = file(self.pset).read()
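
All publication settings are now collected up front in the constructor. A
minimal sketch of the crab.cfg entries it reads (values hypothetical, except
the global DBS default hard-coded above; CMSSW.dbs_url and CMSSW.dataset_pu
are optional):

    [USER]
    publish_data_name       = MyTestData
    copy_data               = 1
    dbs_url_for_publication = https://mydbs.example.cern.ch/servlet/DBSServlet

    [CMSSW]
    pset        = my_analysis_cfg.py
    datasetpath = /MinBias/Summer08/GEN-SIM-RAW
    dataset_pu  = /PUDatasetA/Summer08/GEN-SIM-RAW, /PUDatasetB/Summer08/GEN-SIM-RAW

Note that dbs_url_for_publication must point to a local DBS instance: the two
global endpoints are explicitly rejected.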
         self.resDir = common.work_space.resDir()
-        common.logger.message('self.resDir = '+self.resDir)
-        #old api self.DBSURL='http://cmssrv18.fnal.gov:8989/DBS/servlet/DBSServlet'
-        self.DBSURL=cfg_params['USER.dbs_url_for_publication']
-        #self.DBSURL='http://cmssrv17.fnal.gov:8989/DBS_1_0_4_pre2/servlet/DBSServlet'
-        common.logger.message('self.DBSURL = '+self.DBSURL)
+
+        self.dataset_to_import = []
+
         self.datasetpath = cfg_params['CMSSW.datasetpath']
-        common.logger.message('self.datasetpath = '+self.datasetpath)
+        if (self.datasetpath.upper() != 'NONE'):
+            self.dataset_to_import.append(self.datasetpath)
+
+        ### Added PU dataset
+        tmp = cfg_params.get('CMSSW.dataset_pu', None)
+        if tmp:
+            datasets = tmp.split(',')
+            for dataset in datasets:
+                dataset = string.strip(dataset)
+                self.dataset_to_import.append(dataset)
+        ###

         self.SEName = ''
         self.CMSSW_VERSION = ''
         self.exit_status = ''
         self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
+        self.problemFiles = []
+        self.noEventsFiles = []
+        self.noLFN = []
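
With a pile-up entry like the one sketched above, dataset_to_import ends up
holding the signal dataset plus each comma-separated PU dataset, whitespace
stripped (dataset names hypothetical):

    tmp = '/PUDatasetA/Summer08/GEN-SIM-RAW , /PUDatasetB/Summer08/GEN-SIM-RAW'
    datasets = [string.strip(d) for d in tmp.split(',')]
    # -> ['/PUDatasetA/Summer08/GEN-SIM-RAW', '/PUDatasetB/Summer08/GEN-SIM-RAW']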

-
     def importParentDataset(self, globalDBS, datasetpath):
         """
         """
         dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

         try:
-            common.logger.message("----->>>>importing parent dataset in the local dbs")
-            dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
+            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
         except DBSWriterError, ex:
             msg = "Error importing dataset to be processed into local DBS\n"
             msg += "Source Dataset: %s\n" % datasetpath
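
The parent import now goes through importDatasetWithoutParentage so that,
judging by the ProdCommon naming, only the dataset itself is registered in the
local DBS, without recursively importing its ancestry. A standalone sketch of
the call (local URL and dataset path hypothetical; the signature is the one
used above):

    dbsWriter = DBSWriter('https://mydbs.example.cern.ch/servlet/DBSServlet', level='ERROR')
    dbsWriter.importDatasetWithoutParentage(
        'http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet',  # source: global DBS
        '/MinBias/Summer08/GEN-SIM-RAW',                                     # dataset to import
        'https://mydbs.example.cern.ch/servlet/DBSServlet')                  # target: local DBS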
...
     def publishDataset(self, file):
...
         """
         try:
             jobReport = readJobReport(file)[0]
-            msg = "--->>> reading "+file+" file"
-            common.logger.message(msg)
             self.exit_status = '0'
         except IndexError:
             self.exit_status = '1'
             msg = "Error: Problem with "+file+" file"
             common.logger.message(msg)
             return self.exit_status
-        #####for parents information import #########################################
-        #### the globalDBS has to be written in the crab cfg file!!!!! ###############
-        globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
-        if (self.datasetpath != 'None'):
-            status_import=self.importParentDataset(globalDBS, self.datasetpath)
-            if (status_import == 1):
-                common.logger.message('Problem with parent import from the global DBS '+globalDBS+ 'to the local one '+self.DBSURL)
-                self.exit_status='1'
-                ###############################################################################
-                ## ___ >>>>>>> comment out the next line, if you have problem with the import
-                ###############################################################################
-                return self.exit_status
-            pass
+
+        if (len(self.dataset_to_import) != 0):
+            for dataset in self.dataset_to_import:
+                common.logger.message("--->>> Importing parent dataset in the dbs: "+dataset)
+                status_import = self.importParentDataset(self.globalDBS, dataset)
+                if (status_import == 1):
+                    common.logger.message('Problem with parent '+dataset+' import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
+                    self.exit_status = '1'
+                    return self.exit_status
+                else:
+                    common.logger.message('Import ok of dataset '+dataset)
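
A successful pass over the configured datasets logs one message pair per
dataset, roughly (dataset name hypothetical):

    --->>> Importing parent dataset in the dbs: /MinBias/Summer08/GEN-SIM-RAW
    Import ok of dataset /MinBias/Summer08/GEN-SIM-RAW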
+
         #// DBS to contact
-        dbswriter = DBSWriter(self.DBSURL,level='ERROR')
-        # publish a dataset : it should be done only once for the task
-        # and not for all the JobReport
+        dbswriter = DBSWriter(self.DBSURL)
         try:
             fileinfo = jobReport.files[0]
             self.exit_status = '0'
...
             common.logger.message(msg)
             return self.exit_status

-        common.logger.message("FileInfo = " + str(fileinfo))
         datasets = fileinfo.dataset
-        common.logger.message("DatasetInfo = " + str(datasets))
+        common.logger.debug(6, "FileInfo = " + str(fileinfo))
+        common.logger.debug(6, "DatasetInfo = " + str(datasets))
         for dataset in datasets:
-            #### to understand how to fill cfgMeta info ###############
-            cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
+            #### for production data
+            self.processedData = dataset['ProcessedDataset']
+            if (dataset['PrimaryDataset'] == 'null'):
+                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
+            else:  # add parentage from the input dataset
+                dataset['ParentDataset'] = self.datasetpath
+
+            dataset['PSetContent'] = self.content
+            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add the real name of the user cfg
             common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
             common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
-            common.logger.message("Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
+            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
+
+            common.logger.debug(6, "--->>> Inserting primary: %s processed: %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

             primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
+            common.logger.debug(6, "Primary: %s" % primary)

             algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
+            common.logger.debug(6, "Algo: %s" % algo)

             processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
+            common.logger.debug(6, "Processed: %s" % processed)
+
+            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

-        common.logger.message("Inserted primary %s processed %s"%(primary,processed))
+        common.logger.debug(6, "exit_status = %s" % self.exit_status)
         return self.exit_status
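
publishDataset therefore registers the primary dataset, the algorithm (keyed
on the user pset content) and the processed dataset once per task. The
published name always carries the USER tier; for a production-style report,
where PrimaryDataset is 'null' and collapses to the processed dataset name,
the logged name looks like (name hypothetical):

    <User Dataset Name> = /MyTestData/MyTestData/USER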

     def publishAJobReport(self, file, procdataset):
         """
+        input: xml file, processedDataset
         """
         try:
             jobReport = readJobReport(file)[0]
...
             self.exit_status = '1'
             msg = "Error: Problem with "+file+" file"
             raise CrabException(msg)
-
-        # overwite ProcessedDataset with user defined value
+        ### overwrite ProcessedDataset with the user defined value
+        ### overwrite lumisections with no value
+        ### skip publication for files with 0 events
+        filestopublish = []
         for file in jobReport.files:
-            for ds in file.dataset:
-                ds['ProcessedDataset']=procdataset
+            #### added check for problems with the copy to the SE and for empty LFNs
+            if (string.find(file['LFN'], 'copy_problems') != -1):
+                self.problemFiles.append(file['LFN'])
+            elif (file['LFN'] == ''):
+                self.noLFN.append(file['PFN'])
+            else:
+                if int(file['TotalEvents']) != 0:
+                    #file.lumisections = {}
+                    # lumi info are now in the run hash
+                    file.runs = {}
+                    for ds in file.dataset:
+                        ### FEDE FOR NEW LFN ###
+                        #ds['ProcessedDataset'] = procdataset
+                        ########################
+                        ### Fede for production
+                        if (ds['PrimaryDataset'] == 'null'):
+                            ds['PrimaryDataset'] = procdataset
+                    filestopublish.append(file)
+                else:
+                    self.noEventsFiles.append(file['LFN'])
+        jobReport.files = filestopublish
+        ### if all the files in the FJR have 0 events, there is nothing to publish
+        if (len(filestopublish) == 0):
+            return None
+
         #// DBS to contact
-        dbswriter = DBSWriter(self.DBSURL,level='ERROR')
+        dbswriter = DBSWriter(self.DBSURL)
         # insert files
         Blocks = None
         try:
             Blocks = dbswriter.insertFiles(jobReport)
-            common.logger.message("------>>>> Blocks = %s"%Blocks)
+            common.logger.message("Blocks = %s" % Blocks)
         except DBSWriterError, ex:
-            common.logger.message("insert file error: %s"%ex)
+            common.logger.message("Insert file error: %s" % ex)
         return Blocks
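
The filtering above amounts to a per-file triage; an equivalent sketch,
assuming the FJR file objects behave like dicts with the 'LFN', 'PFN' and
'TotalEvents' keys used above:

    def classify(fjr_file):
        if string.find(fjr_file['LFN'], 'copy_problems') != -1:
            return 'problem'    # stage-out to the SE failed
        if fjr_file['LFN'] == '':
            return 'no_lfn'     # the file never got a logical file name
        if int(fjr_file['TotalEvents']) == 0:
            return 'no_events'  # nothing to publish
        return 'publish'

Only files classified as 'publish' are handed to DBSWriter.insertFiles; if
none survive, the method returns None and run() skips that report entirely.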

     def run(self):
         """
         parse all the xml files in the res dir and create a dictionary
         """
-        common.logger.message("Starting data publish")
+
         file_list = glob.glob(self.resDir+"crab_fjr*.xml")
         common.logger.debug(6, "file_list = "+str(file_list))
         common.logger.debug(6, "len(file_list) = "+str(len(file_list)))
-        # FIXME:
-        # do the dataset publication self.publishDataset here
-        #
+
         if (len(file_list) > 0):
             BlocksList = []
+            common.logger.message("--->>> Start dataset publication")
+            self.exit_status = self.publishDataset(file_list[0])
+            if (self.exit_status == '1'):
+                return self.exit_status
+            common.logger.message("--->>> End dataset publication")
+
+
+            common.logger.message("--->>> Start files publication")
             for file in file_list:
                 common.logger.message("file = "+file)
-                common.logger.message("Publishing dataset")
-                self.exit_status=self.publishDataset(file)
-                if (self.exit_status == '1'):
-                    return self.exit_status
-                common.logger.message("Publishing files")
                 Blocks = self.publishAJobReport(file, self.processedData)
                 if Blocks:
-                    [BlocksList.append(x) for x in Blocks]
+                    for x in Blocks:  # do not allow multiple entries of the same block
+                        if x not in BlocksList:
+                            BlocksList.append(x)
+
             # close the blocks
-            common.logger.message("------>>>> BlocksList = %s"%BlocksList)
-            dbswriter = DBSWriter(self.DBSURL,level='ERROR')
+            common.logger.debug(6, "BlocksList = %s" % BlocksList)
+            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
+            dbswriter = DBSWriter(self.DBSURL)
+
             for BlockName in BlocksList:
                 try:
                     closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
-                    common.logger.message("------>>>> closeBlock %s"%closeBlock)
+                    common.logger.debug(6, "closeBlock %s" % closeBlock)
                     #dbswriter.dbs.closeBlock(BlockName)
                 except DBSWriterError, ex:
-                    common.logger.message("------>>>> close block error %s"%ex)
+                    common.logger.message("Close block error %s" % ex)

+            if (len(self.noEventsFiles) > 0):
+                common.logger.message("--->>> WARNING: the following "+str(len(self.noEventsFiles))+" files were not published because they contain 0 events:")
+                for lfn in self.noEventsFiles:
+                    common.logger.message("------ LFN: %s" % lfn)
+            if (len(self.noLFN) > 0):
+                common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have an empty LFN")
+                for pfn in self.noLFN:
+                    common.logger.message("------ PFN: %s" % pfn)
+            if (len(self.problemFiles) > 0):
+                common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files were not published because of problems with the copy to the SE")
+                for lfn in self.problemFiles:
+                    common.logger.message("------ LFN: %s" % lfn)
+            common.logger.message("--->>> End files publication")
+            common.logger.message("--->>> To check the data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
             return self.exit_status

         else:
-            common.logger.message(self.resDir+" empty --> No file to publish on DBS/DLS")
+            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
             self.exit_status = '1'
             return self.exit_status
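
For reference, this pass is driven from the crab front end; per the
constructor's error message, a local DBS can also be supplied on the command
line (URL hypothetical):

    crab -publish -USER.dbs_url_for_publication=https://mydbs.example.cern.ch/servlet/DBSServlet

The result can then be checked with the InspectDBS2.py command printed at the
end of the run, substituting the same DBS URL and the <User Dataset Name>
logged during publication.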