  5   from crab_util import *
  6   from crab_logger import Logger
  7   from crab_exceptions import *
  8 < from FwkJobRep.ReportParser import readJobReport
  8 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
  9   from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
 10   from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
 11   from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
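The job-report parser now lives under the ProdCommon package. For orientation, a minimal sketch of how such a report is consumed, assuming readJobReport(path) yields report objects whose .files entries carry the PFN and TotalEvents fields used later in this diff (the field names are taken from the code below; the exact report API is an assumption):

    from ProdCommon.FwkJobRep.ReportParser import readJobReport

    def publishable_pfns(fjr_path):
        # Collect PFNs of non-empty output files (illustrative only).
        pfns = []
        for report in readJobReport(fjr_path):
            for f in report.files:
                if int(f['TotalEvents']) != 0:   # skip empty files, as publishAJobReport does
                    pfns.append(f['PFN'])
        return pfns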
 24       """
 25
 26       try:
 27 <         self.processedData = cfg_params['USER.publish_data_name']
 27 >         userprocessedData = cfg_params['USER.publish_data_name']
 28 >         self.processedData = None
 29       except KeyError:
 30           raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
 31 +
 32       try:
 33           if (int(cfg_params['USER.copy_data']) != 1): raise KeyError
 34       except KeyError:

 49               msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
 50               raise CrabException(msg)
 51       except KeyError:
 52 <         msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
 53 <         msg = msg + " entry, necessary to publish the data"
 52 >         msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
 53 >         msg = msg + " entry, necessary to publish the data.\n"
 54 >         msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
 55           raise CrabException(msg)
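The publish name is now parked in userprocessedData and self.processedData stays None until the job report supplies the real ProcessedDataset (see the line 138 hunk below). A standalone sketch of the two config lookups, with a plain dict standing in for cfg_params and ValueError for CrabException:

    def read_publish_params(cfg_params):
        # publish_data_name is mandatory: without it nothing can be published
        try:
            user_processed_data = cfg_params['USER.publish_data_name']
        except KeyError:
            raise ValueError('Cannot publish output data: USER.publish_data_name '
                             'is missing from crab.cfg')
        # a missing dbs_url_for_publication is now reported with a hint about
        # the command-line override instead of a bare error
        try:
            dbs_url = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            raise ValueError("The [USER] section has no 'dbs_url_for_publication' entry; "
                             "use crab -publish -USER.dbs_url_for_publication=<your local DBS URL>")
        return user_processed_data, dbs_url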
 56
 57       self.content=file(self.pset).read()
 58       self.resDir = common.work_space.resDir()
 59 +
 60 +     self.dataset_to_import=[]
 61 +
 62       self.datasetpath=cfg_params['CMSSW.datasetpath']
 63 +     if (self.datasetpath.upper() != 'NONE'):
 64 +         self.dataset_to_import.append(self.datasetpath)
 65 +
 66 +     ### Added PU dataset
 67 +     tmp = cfg_params.get('CMSSW.dataset_pu',None)
 68 +     if tmp :
 69 +         datasets = tmp.split(',')
 70 +         for dataset in datasets:
 71 +             dataset=string.strip(dataset)
 72 +             self.dataset_to_import.append(dataset)
 73 +     ###
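The new block builds the list of datasets to import: the input dataset plus any comma-separated pileup datasets from CMSSW.dataset_pu. An equivalent standalone sketch (the dataset names in the usage line are made up):

    def datasets_to_import(cfg_params):
        to_import = []
        datasetpath = cfg_params['CMSSW.datasetpath']
        if datasetpath.upper() != 'NONE':        # 'None' means MC production, nothing to import
            to_import.append(datasetpath)
        # pileup datasets arrive as one comma-separated string
        pu = cfg_params.get('CMSSW.dataset_pu', None)
        if pu:
            to_import.extend(s.strip() for s in pu.split(','))
        return to_import

    cfg = {'CMSSW.datasetpath': '/PrimDS/ProcDS/RECO',
           'CMSSW.dataset_pu': '/MinBias/PU1/GEN-SIM, /MinBias/PU2/GEN-SIM'}
    # datasets_to_import(cfg) ->
    # ['/PrimDS/ProcDS/RECO', '/MinBias/PU1/GEN-SIM', '/MinBias/PU2/GEN-SIM']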
 74 +
 75       self.SEName=''
 76       self.CMSSW_VERSION=''
 77       self.exit_status=''

 86       dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
 87
 88       try:
 89 <         dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL)
 89 >         dbsWriter.importDatasetWithoutParentage(globalDBS, self.datasetpath, self.DBSURL)
 90       except DBSWriterError, ex:
 91           msg = "Error importing dataset to be processed into local DBS\n"
 92           msg += "Source Dataset: %s\n" % datasetpath

107           msg = "Error: Problem with "+file+" file"
108           common.logger.message(msg)
109           return self.exit_status
110 <
111 <     if (self.datasetpath != 'None'):
112 <         common.logger.message("--->>> Importing parent dataset in the dbs")
113 <         status_import=self.importParentDataset(self.globalDBS, self.datasetpath)
114 <         if (status_import == 1):
115 <             common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
116 <             self.exit_status='1'
117 <             return self.exit_status
118 <         common.logger.message("Parent import ok")
110 >
111 >     if (len(self.dataset_to_import) != 0):
112 >         for dataset in self.dataset_to_import:
113 >             common.logger.message("--->>> Importing parent dataset in the dbs: " +dataset)
114 >             status_import=self.importParentDataset(self.globalDBS, dataset)
115 >             if (status_import == 1):
116 >                 common.logger.message('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
117 >                 self.exit_status='1'
118 >                 return self.exit_status
119 >             else:
120 >                 common.logger.message('Import ok of dataset '+dataset)
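The replacement loop imports every entry of dataset_to_import and aborts on the first failure. A sketch of just the control flow, with import_one standing in for importParentDataset (hypothetical helper, returns 1 on failure like the real method):

    def import_parents(datasets, import_one):
        for dataset in datasets:
            print("--->>> Importing parent dataset in the dbs: " + dataset)
            if import_one(dataset) == 1:
                print("Problem with parent " + dataset + " import")
                return '1'              # first failure aborts, like exit_status above
            print('Import ok of dataset ' + dataset)
        return ''                       # empty exit_status means success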
121
122       #// DBS to contact
123       dbswriter = DBSWriter(self.DBSURL)

135       common.logger.debug(6,"DatasetInfo = " + str(datasets))
136       for dataset in datasets:
137           #### for production data
138 +         self.processedData = dataset['ProcessedDataset']
139           if (dataset['PrimaryDataset'] == 'null'):
140               dataset['PrimaryDataset'] = dataset['ProcessedDataset']
141 <
141 >         else: # add parentage from input dataset
142 >             dataset['ParentDataset']= self.datasetpath
143 >
144           dataset['PSetContent']=self.content
145           cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
146           common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
186               self.noLFN.append(file['PFN'])
187           else:
188               if int(file['TotalEvents']) != 0 :
189 <                 file.lumisections = {}
189 >                 #file.lumisections = {}
190 >                 # lumi info are now in run hash
191 >                 file.runs = {}
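Lumi sections used to be tracked in a flat file.lumisections map; they now hang off a per-run hash. The diff only creates the hash empty, so the following shape is a hypothetical illustration:

    # hypothetical shape of the per-file run hash:
    # run number -> lumi sections seen in that run
    file_runs = {
        1234: [1, 2, 3],
        1235: [7, 8],
    }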
192                   for ds in file.dataset:
193 <                     ds['ProcessedDataset']=procdataset
193 >                     ### FEDE FOR NEW LFN ###
194 >                     #ds['ProcessedDataset']=procdataset
195 >                     ########################
196                       ### Fede for production
197                       if (ds['PrimaryDataset'] == 'null'):
198                           ds['PrimaryDataset']=procdataset

210       Blocks=None
211       try:
212           Blocks=dbswriter.insertFiles(jobReport)
213 <         common.logger.message("Blocks = %s"%Blocks)
213 >         common.logger.message("Inserting file in blocks = %s"%Blocks)
214       except DBSWriterError, ex:
215 <         common.logger.message("Insert file error: %s"%ex)
215 >         common.logger.error("Insert file error: %s"%ex)
216       return Blocks
217
218   def run(self):

235
236       common.logger.message("--->>> Start files publication")
237       for file in file_list:
238 <         common.logger.message("file = "+file)
238 >         common.logger.debug(1, "file = "+file)
239           Blocks=self.publishAJobReport(file,self.processedData)
240           if Blocks:
241 <             [BlocksList.append(x) for x in Blocks]
241 >             for x in Blocks: # do not allow multiple entries of the same block
242 >                 if x not in BlocksList:
243 >                     BlocksList.append(x)
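The old list comprehension appended every block, duplicates included; the new loop keeps BlocksList duplicate-free but pays a linear scan per block. An order-preserving variant with a set, for O(1) membership tests, would be (sample block names are made up):

    all_blocks = ['blockA', 'blockB', 'blockA']
    seen = set()
    BlocksList = []
    for x in all_blocks:
        if x not in seen:        # O(1) set lookup instead of scanning the list
            seen.add(x)
            BlocksList.append(x)
    # BlocksList -> ['blockA', 'blockB']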
244
245       # close the blocks
246       common.logger.debug(6, "BlocksList = %s"%BlocksList)