 from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
 import sys
-from DBSAPI.dbsApiException import DbsException
 from DBSAPI.dbsApi import DbsApi
+from DBSAPI.dbsMigrateApi import DbsMigrateApi
+from DBSAPI.dbsApiException import *

 class Publisher(Actor):
     def __init__(self, cfg_params):
[...]
         - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
         - publishes output data on DBS and DLS
         """
-        self.cfg_params=cfg_params

+        self.cfg_params=cfg_params
+        self.fjrDirectory = cfg_params.get('USER.outputdir' ,
+                                           common.work_space.resDir()) + '/'
+
         if not cfg_params.has_key('USER.publish_data_name'):
             raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
         self.userprocessedData = cfg_params['USER.publish_data_name']
[...]
         self.DBSURL=cfg_params['USER.dbs_url_for_publication']
         common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
         if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
             msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
             msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
             raise CrabException(msg)

         self.content=file(self.pset).read()
         self.resDir = common.work_space.resDir()

         self.dataset_to_import=[]

         self.datasetpath=cfg_params['CMSSW.datasetpath']
         if (self.datasetpath.upper() != 'NONE'):
             self.dataset_to_import.append(self.datasetpath)

         ### Added PU dataset
         tmp = cfg_params.get('CMSSW.dataset_pu',None)
         if tmp :
[...]
             for dataset in datasets:
                 dataset=string.strip(dataset)
                 self.dataset_to_import.append(dataset)
         ###

         self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)

-        ### fede import parent dataset is compulsory ###
         if ( int(self.import_all_parents) == 0 ):
             common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
-        ############
-
-        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
-
+        self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',1)
+        if ( int(self.skipOcheck) == 0 ):
+            common.logger.info("WARNING: The option CMSSW.publish_zero_event has been deprecated. The publication is done by default also for files with 0 events")
         self.SEName=''
         self.CMSSW_VERSION=''
         self.exit_status=''
[...]
         self.noEventsFiles=[]
         self.noLFN=[]

+        #### FEDE to allow publication without input data in <file>
+        if cfg_params.has_key('USER.no_inp'):
+            self.no_inp = cfg_params['USER.no_inp']
+        else:
+            self.no_inp = 0
+        ############################################################
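
A note on the new USER.no_inp switch read above: a user who wants the <Inputs> sections stripped from the FrameworkJobReports before publication would presumably enable it in the [USER] section of crab.cfg. The fragment below is only a sketch; the parameter names are the ones the code reads, but the values are placeholders and are not taken from this patch.

    [USER]
    publish_data_name       = <name_of_your_processed_dataset>
    dbs_url_for_publication = <URL_of_your_local_DBS_writer_instance>
    no_inp                  = 1

With no_inp left at its default of 0, the job reports are published as they are.
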
     def importParentDataset(self,globalDBS, datasetpath):
         """
+        WARNING: it works only with DBS_2_0_9_patch_6
         """
-        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
+
+        args={'url':globalDBS}
+        try:
+            api_reader = DbsApi(args)
+        except DbsApiException, ex:
+            msg = "%s\n" % formatEx(ex)
+            raise CrabException(msg)
+
+        args={'url':self.DBSURL}
+        try:
+            api_writer = DbsApi(args)
+        except DbsApiException, ex:
+            msg = "%s\n" % formatEx(ex)
+            raise CrabException(msg)

         try:
-            #if (self.import_all_parents==1):
             common.logger.info("--->>> Importing all parents level")
             start = time.time()
-            common.logger.debug("start import time: " + str(start))
-            ### to skip the ProdCommon api exception in the case of block without location
-            ### skipNoSiteError=True
-            #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
-            ### calling dbs api directly
-            dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
+            common.logger.debug("start import parents time: " + str(start))
+            for block in api_reader.listBlocks(datasetpath):
+                if (str(block['OpenForWriting']) != '1'):
+                    api_writer.dbsMigrateBlock(globalDBS,self.DBSURL,block['Name'] )
+                else:
+                    common.logger.debug("Skipping the import of " + block['Name'] + " it is an open block")
+                    continue
+            ################
             stop = time.time()
-            common.logger.debug("stop import time: " + str(stop))
+            common.logger.debug("stop import parents time: " + str(stop))
             common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
-            ## still not removing the code, but TODO for the final release...
-            """
-            else:
-                common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
-                start = time.time()
-                #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
-                ### calling dbs api directly
-                common.logger.debug("start import time: " + str(start))
-                dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
-                stop = time.time()
-                common.logger.debug("stop import time: " + str(stop))
-                common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
-            """
-        except DBSWriterError, ex:
+        except DbsApiException, ex:
             msg = "Error importing dataset to be processed into local DBS\n"
             msg += "Source Dataset: %s\n" % datasetpath
             msg += "Source DBS: %s\n" % globalDBS
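
For readers unfamiliar with the DBS2 client, the block-level migration introduced above can be sketched in isolation roughly as follows. This is an illustration only, not part of the patch: the URLs and dataset path are placeholders, only the calls that appear in the patch (DbsApi, listBlocks, dbsMigrateBlock) are used, and error handling is omitted.

    # illustrative sketch only, not part of the patch
    from DBSAPI.dbsApi import DbsApi

    source_url  = 'http://<global-dbs-reader>/servlet/DBSServlet'    # DBS to read from (placeholder)
    target_url  = 'https://<local-dbs-writer>/servlet/DBSServlet'    # local DBS to publish into (placeholder)
    datasetpath = '/<Primary>/<Processed>/<Tier>'                    # parent dataset to import (placeholder)

    api_reader = DbsApi({'url': source_url})
    api_writer = DbsApi({'url': target_url})

    for block in api_reader.listBlocks(datasetpath):
        # as in the patch, only closed blocks are migrated; open blocks are skipped
        if str(block['OpenForWriting']) != '1':
            api_writer.dbsMigrateBlock(source_url, target_url, block['Name'])
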
[...]
         """
         """
         try:
-            ### fjr content as argument
             jobReport = readJobReport(file)[0]
             self.exit_status = '0'
         except IndexError:
[...]
             common.logger.info(msg)
             return self.exit_status

-        #print "###################################################"
-        #print "len(jobReport.files) = ", len(jobReport.files)
-        #print "jobReport.files = ", jobReport.files
-        #print "###################################################"
-
         if (len(self.dataset_to_import) != 0):
             for dataset in self.dataset_to_import:
                 common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
[...]
             self.exit_status = '1'
             msg = "Error: Problem with "+file+" file"
             raise CrabException(msg)
-        ### overwrite ProcessedDataset with user defined value
-        ### overwrite lumisections with no value
         ### skip publication for 0 events files
         filestopublish=[]
         for file in jobReport.files:
[...]
             elif (file['LFN'] == ''):
                 self.noLFN.append(file['PFN'])
             else:
-                if self.skipOcheck==0:
-                    if int(file['TotalEvents']) != 0:
-                        ### Fede to insert also run and lumi info in DBS
-                        #file.runs = {}
-                        for ds in file.dataset:
-                            ### Fede for production
-                            if (ds['PrimaryDataset'] == 'null'):
-                                ds['PrimaryDataset']=self.userprocessedData
-                        filestopublish.append(file)
-                    else:
-                        self.noEventsFiles.append(file['LFN'])
-                else:
-                    ### Fede to insert also run and lumi info in DBS
-                    #file.runs = {}
-                    for ds in file.dataset:
-                        ### For production
-                        if (ds['PrimaryDataset'] == 'null'):
-                            ds['PrimaryDataset']=self.userprocessedData
-                    filestopublish.append(file)
-
-        ### only good files will be published
+                if int(file['TotalEvents']) == 0:
+                    self.noEventsFiles.append(file['LFN'])
+                for ds in file.dataset:
+                    ### Fede for production
+                    if (ds['PrimaryDataset'] == 'null'):
+                        ds['PrimaryDataset']=self.userprocessedData
+                filestopublish.append(file)
+
         jobReport.files = filestopublish
-        #print "------>>> filestopublish = ", filestopublish
         for file in filestopublish:
             common.logger.debug("--->>> LFN of file to publish = " + str(file['LFN']))
-            #print "--->>> LFN of file to publish = ", str(file['LFN'])
         ### if all files of FJR have number of events = 0
         if (len(filestopublish) == 0):
             return None

         #// DBS to contact
         dbswriter = DBSWriter(self.DBSURL)
         # insert files
         try:
             ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
             Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
+            #Blocks=dbswriter.insertFiles(jobReport)
             common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
         except DBSWriterError, ex:
             common.logger.debug("--->>> Insert file error: %s"%ex)
         return Blocks

+    def remove_input_from_fjr(self, list_of_good_files):
+        """
+        to remove the input file from fjr in the case of problem with lfn
+        """
+        from xml.etree.ElementTree import ElementTree, Element
+        new_good_list = []
+        no_inp_dir = self.fjrDirectory + 'no_inp'
+        if not os.path.isdir(no_inp_dir):
+            try:
+                os.mkdir(no_inp_dir)
+                print "no_inp_dir = ", no_inp_dir
+            except:
+                print "problem during no_inp_dir creation: ", no_inp_dir
+        for file in list_of_good_files:
+            name_of_file = os.path.basename(file)
+            #print "name_of_file = " , name_of_file
+            oldxmlfile = ElementTree()
+            oldxmlfile.parse(file)
+            newxmlfile = ElementTree(Element(oldxmlfile.getroot().tag))
+            self.recurse(oldxmlfile.getroot(), newxmlfile.getroot())
+            new_good_file = no_inp_dir + '/' + name_of_file
+            newxmlfile.write(new_good_file)
+            new_good_list.append(new_good_file)
+        print "new_good_list = ", new_good_list
+        return new_good_list
+
+    def recurse(self,oldnode,newnode):
+        """
+        recursive function to remove
+        """
+        from xml.etree.ElementTree import ElementTree, Element
+        try:
+            newnode.text = oldnode.text
+        except AttributeError: pass
+        try:
+            newnode.attrib = oldnode.attrib
+        except AttributeError: pass
+        try:
+            newnode.tail = oldnode.tail
+        except AttributeError: pass
+
+        for oldi in oldnode.getchildren():
+            if oldi.tag != "Inputs" and oldi.tag == "DatasetInfo":
+                newi = Element(oldi.tag)
+                newtag = Element("Entry")
+                newtag.attrib = {'Name':'Description'}
+                newtag.text = 'Unknown provenance'
+                newi.append(newtag)
+                newnode.append(newi)
+                self.recurse(oldi, newi)
+            elif oldi.tag != "Inputs" and oldi.tag != "DatasetInfo":
+                newi = Element(oldi.tag)
+                newnode.append(newi)
+                self.recurse(oldi, newi)
+
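
To illustrate what the added remove_input_from_fjr/recurse pair does to a FrameworkJobReport, the fragments below are schematic only; apart from Inputs, DatasetInfo and Entry, which the code tests for explicitly, the surrounding element names are just an example of typical report structure. An input fragment such as

    <File>
      <Inputs> ...parent file entries... </Inputs>
      <Datasets>
        <DatasetInfo>
          <Entry Name="PrimaryDataset">...</Entry>
        </DatasetInfo>
      </Datasets>
    </File>

is rewritten with the whole <Inputs> subtree dropped and a Description entry prepended to each DatasetInfo, the rest of the tree being copied unchanged:

    <File>
      <Datasets>
        <DatasetInfo>
          <Entry Name="Description">Unknown provenance</Entry>
          <Entry Name="PrimaryDataset">...</Entry>
        </DatasetInfo>
      </Datasets>
    </File>
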
     def run(self):
         """
         parse of all xml file on res dir and creation of distionary
         """
-
-        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
-
-        ## Select only those fjr that are succesfull
-        if (len(file_list)==0):
-            common.logger.info("--->>> "+self.resDir+" empty: no fjr files in the res dir to publish on DBS")
-            self.exit_status = '1'
-            return self.exit_status
-
-        good_list=[]

-        for fjr in file_list:
+        task = common._db.getTask()
+        good_list=[]
+
+        for job in task.getJobs():
+            fjr = self.fjrDirectory + job['outputFiles'][-1]
+            if (job.runningJob['applicationReturnCode']!=0 or job.runningJob['wrapperReturnCode']!=0): continue
+            # get FJR filename
+            fjr = self.fjrDirectory + job['outputFiles'][-1]
             reports = readJobReport(fjr)
             if len(reports)>0:
-                ### with backup-copy the wrapper_exit_code is 60308 --> failed
                 if reports[0].status == "Success":
                     good_list.append(fjr)
+
+        ####################################################
+        if self.no_inp == 1:
+            file_list = self.remove_input_from_fjr(good_list)
+        else:
+            file_list=good_list
+        print "file_list = ", file_list
+        ####################################################

-        file_list=good_list
-        #print "fjr ok for publication are good_list = ", good_list
-        ##
         common.logger.log(10-1, "fjr with FrameworkJobReport Status='Success', file_list = "+str(file_list))
         common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))

         if (len(file_list)>0):
             BlocksList=[]
             common.logger.info("--->>> Start dataset publication")
-            ### primo fjr trovato
             self.exit_status=self.publishDataset(file_list[0])
-            #sys.exit()
             if (self.exit_status == '1'):
                 return self.exit_status
             common.logger.info("--->>> End dataset publication")


             common.logger.info("--->>> Start files publication")
-
-            ### file_list composed by only fjr 'success' :
             for file in file_list:
                 Blocks=self.publishAJobReport(file,self.processedData)
                 if Blocks:
[...]
                 common.logger.info("Close block error %s"%ex)

             if (len(self.noEventsFiles)>0):
-                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
+                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" published files contain 0 events are:")
                 for lfn in self.noEventsFiles:
                     common.logger.info("------ LFN: %s"%lfn)
             if (len(self.noLFN)>0):
[...]
                     common.logger.info("------ LFN: %s"%lfn)
             common.logger.info("--->>> End files publication")

-            #### for MULTI PUBLICATION added for ####
+            #### FEDE for MULTI ####
             for dataset_to_check in self.published_datasets:
                 self.cfg_params['USER.dataset_to_check']=dataset_to_check
                 from InspectDBS import InspectDBS
                 check=InspectDBS(self.cfg_params)
                 check.checkPublication()
+            #########################

             return self.exit_status

         else:
             common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
             self.exit_status = '1'
             return self.exit_status