3 |
|
import time, glob |
4 |
|
from Actor import * |
5 |
|
from crab_util import * |
6 |
– |
from crab_logger import Logger |
6 |
|
from crab_exceptions import * |
7 |
|
from ProdCommon.FwkJobRep.ReportParser import readJobReport |
8 |
|
from ProdCommon.FwkJobRep.ReportState import checkSuccess |
30 |
|
raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file') |
31 |
|
|
32 |
|
try: |
33 |
< |
if (int(cfg_params['USER.copy_data']) != 1): raise KeyError |
33 |
> |
if (int(cfg_params['USER.copy_data']) != 1): |
34 |
> |
raise KeyError |
35 |
|
except KeyError: |
36 |
< |
raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file') |
36 |
> |
msg = 'You can not publish data because you did not selected \n' |
37 |
> |
msg += '\t*** copy_data = 1 or publish_data = 1 *** in the crab.cfg file' |
38 |
> |
raise CrabException(msg) |
39 |
|
try: |
40 |
|
self.pset = cfg_params['CMSSW.pset'] |
41 |
|
except KeyError: |
46 |
|
self.globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet" |
47 |
|
try: |
48 |
|
self.DBSURL=cfg_params['USER.dbs_url_for_publication'] |
49 |
< |
common.logger.message('<dbs_url_for_publication> = '+self.DBSURL) |
49 |
> |
common.logger.info('<dbs_url_for_publication> = '+self.DBSURL) |
50 |
|
if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"): |
51 |
|
msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n" |
52 |
|
msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'" |
74 |
|
dataset=string.strip(dataset) |
75 |
|
self.dataset_to_import.append(dataset) |
76 |
|
### |
77 |
< |
|
77 |
> |
|
78 |
> |
self.skipOcheck=cfg_params.get('CMSSW.pubilish_zero_event',0) |
79 |
> |
|
80 |
|
self.SEName='' |
81 |
|
self.CMSSW_VERSION='' |
82 |
|
self.exit_status='' |
91 |
|
dbsWriter = DBSWriter(self.DBSURL,level='ERROR') |
92 |
|
|
93 |
|
try: |
94 |
< |
dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL) |
94 |
> |
#dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL) |
95 |
> |
dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL) |
96 |
|
except DBSWriterError, ex: |
97 |
|
msg = "Error importing dataset to be processed into local DBS\n" |
98 |
|
msg += "Source Dataset: %s\n" % datasetpath |
99 |
|
msg += "Source DBS: %s\n" % globalDBS |
100 |
|
msg += "Destination DBS: %s\n" % self.DBSURL |
101 |
< |
common.logger.message(msg) |
101 |
> |
common.logger.info(msg) |
102 |
> |
common.logger.debug(str(ex)) |
103 |
|
return 1 |
104 |
|
return 0 |
105 |
|
|
112 |
|
except IndexError: |
113 |
|
self.exit_status = '1' |
114 |
|
msg = "Error: Problem with "+file+" file" |
115 |
< |
common.logger.message(msg) |
115 |
> |
common.logger.info(msg) |
116 |
|
return self.exit_status |
117 |
|
|
118 |
|
if (len(self.dataset_to_import) != 0): |
119 |
|
for dataset in self.dataset_to_import: |
120 |
< |
common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset) |
120 |
> |
common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset) |
121 |
|
status_import=self.importParentDataset(self.globalDBS, dataset) |
122 |
|
if (status_import == 1): |
123 |
< |
common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL) |
123 |
> |
common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL) |
124 |
|
self.exit_status='1' |
125 |
|
return self.exit_status |
126 |
|
else: |
127 |
< |
common.logger.message('Import ok of dataset '+dataset) |
127 |
> |
common.logger.info('Import ok of dataset '+dataset) |
128 |
|
|
129 |
|
#// DBS to contact |
130 |
|
dbswriter = DBSWriter(self.DBSURL) |
134 |
|
except IndexError: |
135 |
|
self.exit_status = '1' |
136 |
|
msg = "Error: No file to publish in xml file"+file+" file" |
137 |
< |
common.logger.message(msg) |
137 |
> |
common.logger.info(msg) |
138 |
|
return self.exit_status |
139 |
|
|
140 |
|
datasets=fileinfo.dataset |
141 |
< |
common.logger.debug(6,"FileInfo = " + str(fileinfo)) |
142 |
< |
common.logger.debug(6,"DatasetInfo = " + str(datasets)) |
141 |
> |
common.logger.log(10-1,"FileInfo = " + str(fileinfo)) |
142 |
> |
common.logger.log(10-1,"DatasetInfo = " + str(datasets)) |
143 |
|
if len(datasets)<=0: |
144 |
|
self.exit_status = '1' |
145 |
|
msg = "Error: No info about dataset in the xml file "+file |
146 |
< |
common.logger.message(msg) |
146 |
> |
common.logger.info(msg) |
147 |
|
return self.exit_status |
148 |
|
for dataset in datasets: |
149 |
|
#### for production data |
157 |
|
|
158 |
|
dataset['PSetContent']=self.content |
159 |
|
cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg |
160 |
< |
common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset']) |
161 |
< |
common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset']) |
162 |
< |
common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER") |
160 |
> |
common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset']) |
161 |
> |
common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset']) |
162 |
> |
common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER") |
163 |
|
|
164 |
< |
common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset'])) |
164 |
> |
common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset'])) |
165 |
|
|
166 |
|
primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs) |
167 |
< |
common.logger.debug(6,"Primary: %s "%primary) |
167 |
> |
common.logger.log(10-1,"Primary: %s "%primary) |
168 |
|
|
169 |
|
algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs) |
170 |
< |
common.logger.debug(6,"Algo: %s "%algo) |
170 |
> |
common.logger.log(10-1,"Algo: %s "%algo) |
171 |
|
|
172 |
|
processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs) |
173 |
< |
common.logger.debug(6,"Processed: %s "%processed) |
173 |
> |
common.logger.log(10-1,"Processed: %s "%processed) |
174 |
|
|
175 |
< |
common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed)) |
175 |
> |
common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed)) |
176 |
|
|
177 |
< |
common.logger.debug(6,"exit_status = %s "%self.exit_status) |
177 |
> |
common.logger.log(10-1,"exit_status = %s "%self.exit_status) |
178 |
|
return self.exit_status |
179 |
|
|
180 |
|
def publishAJobReport(self,file,procdataset): |
199 |
|
elif (file['LFN'] == ''): |
200 |
|
self.noLFN.append(file['PFN']) |
201 |
|
else: |
202 |
< |
if int(file['TotalEvents']) != 0 : |
203 |
< |
#file.lumisections = {} |
204 |
< |
# lumi info are now in run hash |
202 |
> |
if self.skipOcheck==0: |
203 |
> |
if int(file['TotalEvents']) != 0: |
204 |
> |
#file.lumisections = {} |
205 |
> |
# lumi info are now in run hash |
206 |
> |
file.runs = {} |
207 |
> |
for ds in file.dataset: |
208 |
> |
### Fede for production |
209 |
> |
if (ds['PrimaryDataset'] == 'null'): |
210 |
> |
#ds['PrimaryDataset']=procdataset |
211 |
> |
ds['PrimaryDataset']=self.userprocessedData |
212 |
> |
filestopublish.append(file) |
213 |
> |
else: |
214 |
> |
self.noEventsFiles.append(file['LFN']) |
215 |
> |
else: |
216 |
|
file.runs = {} |
217 |
|
for ds in file.dataset: |
218 |
|
### Fede for production |
220 |
|
#ds['PrimaryDataset']=procdataset |
221 |
|
ds['PrimaryDataset']=self.userprocessedData |
222 |
|
filestopublish.append(file) |
223 |
< |
else: |
207 |
< |
self.noEventsFiles.append(file['LFN']) |
223 |
> |
|
224 |
|
jobReport.files = filestopublish |
225 |
|
### if all files of FJR have number of events = 0 |
226 |
|
if (len(filestopublish) == 0): |
232 |
|
Blocks=None |
233 |
|
try: |
234 |
|
Blocks=dbswriter.insertFiles(jobReport) |
235 |
< |
common.logger.message("Inserting file in blocks = %s"%Blocks) |
235 |
> |
common.logger.info("Inserting file in blocks = %s"%Blocks) |
236 |
|
except DBSWriterError, ex: |
237 |
< |
common.logger.error("Insert file error: %s"%ex) |
237 |
> |
common.logger.info("Insert file error: %s"%ex) |
238 |
|
return Blocks |
239 |
|
|
240 |
|
def run(self): |
252 |
|
good_list.append(fjr) |
253 |
|
file_list=good_list |
254 |
|
## |
255 |
< |
common.logger.debug(6, "file_list = "+str(file_list)) |
256 |
< |
common.logger.debug(6, "len(file_list) = "+str(len(file_list))) |
255 |
> |
common.logger.log(10-1, "file_list = "+str(file_list)) |
256 |
> |
common.logger.log(10-1, "len(file_list) = "+str(len(file_list))) |
257 |
|
|
258 |
|
if (len(file_list)>0): |
259 |
|
BlocksList=[] |
260 |
< |
common.logger.message("--->>> Start dataset publication") |
260 |
> |
common.logger.info("--->>> Start dataset publication") |
261 |
|
self.exit_status=self.publishDataset(file_list[0]) |
262 |
|
if (self.exit_status == '1'): |
263 |
|
return self.exit_status |
264 |
< |
common.logger.message("--->>> End dataset publication") |
264 |
> |
common.logger.info("--->>> End dataset publication") |
265 |
|
|
266 |
|
|
267 |
< |
common.logger.message("--->>> Start files publication") |
267 |
> |
common.logger.info("--->>> Start files publication") |
268 |
|
for file in file_list: |
269 |
< |
common.logger.debug(1, "file = "+file) |
269 |
> |
common.logger.debug( "file = "+file) |
270 |
|
Blocks=self.publishAJobReport(file,self.processedData) |
271 |
|
if Blocks: |
272 |
|
for x in Blocks: # do not allow multiple entries of the same block |
274 |
|
BlocksList.append(x) |
275 |
|
|
276 |
|
# close the blocks |
277 |
< |
common.logger.debug(6, "BlocksList = %s"%BlocksList) |
277 |
> |
common.logger.log(10-1, "BlocksList = %s"%BlocksList) |
278 |
|
# dbswriter = DBSWriter(self.DBSURL,level='ERROR') |
279 |
|
dbswriter = DBSWriter(self.DBSURL) |
280 |
|
|
281 |
|
for BlockName in BlocksList: |
282 |
|
try: |
283 |
|
closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1) |
284 |
< |
common.logger.debug(6, "closeBlock %s"%closeBlock) |
284 |
> |
common.logger.log(10-1, "closeBlock %s"%closeBlock) |
285 |
|
#dbswriter.dbs.closeBlock(BlockName) |
286 |
|
except DBSWriterError, ex: |
287 |
< |
common.logger.message("Close block error %s"%ex) |
287 |
> |
common.logger.info("Close block error %s"%ex) |
288 |
|
|
289 |
|
if (len(self.noEventsFiles)>0): |
290 |
< |
common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:") |
290 |
> |
common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:") |
291 |
|
for lfn in self.noEventsFiles: |
292 |
< |
common.logger.message("------ LFN: %s"%lfn) |
292 |
> |
common.logger.info("------ LFN: %s"%lfn) |
293 |
|
if (len(self.noLFN)>0): |
294 |
< |
common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN") |
294 |
> |
common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN") |
295 |
|
for pfn in self.noLFN: |
296 |
< |
common.logger.message("------ pfn: %s"%pfn) |
296 |
> |
common.logger.info("------ pfn: %s"%pfn) |
297 |
|
if (len(self.problemFiles)>0): |
298 |
< |
common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE") |
298 |
> |
common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE") |
299 |
|
for lfn in self.problemFiles: |
300 |
< |
common.logger.message("------ LFN: %s"%lfn) |
301 |
< |
common.logger.message("--->>> End files publication") |
302 |
< |
common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>") |
300 |
> |
common.logger.info("------ LFN: %s"%lfn) |
301 |
> |
common.logger.info("--->>> End files publication") |
302 |
> |
common.logger.info("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>") |
303 |
|
return self.exit_status |
304 |
|
|
305 |
|
else: |
306 |
< |
common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS") |
306 |
> |
common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS") |
307 |
|
self.exit_status = '1' |
308 |
|
return self.exit_status |
309 |
|
|