 18   class Publisher(Actor):
 19       def __init__(self, cfg_params):
 20           """
 21 <         Publisher class:
 21 >         Publisher class:
 22
 23           - parses CRAB FrameworkJobReport on UI
 24           - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
 25           - publishes output data on DBS and DLS
 26           """
 27 -
 27           self.cfg_params=cfg_params
 28 <
 28 >
 29           if not cfg_params.has_key('USER.publish_data_name'):
 30               raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
 31 <         self.userprocessedData = cfg_params['USER.publish_data_name']
 31 >         self.userprocessedData = cfg_params['USER.publish_data_name']
 32           self.processedData = None
 33
 34           if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
 52           self.DBSURL=cfg_params['USER.dbs_url_for_publication']
 53           common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
 54           if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
 55 <             msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
 55 >             msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
 56               msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
 57               raise CrabException(msg)
 58 <
 58 >
 59           self.content=file(self.pset).read()
 60           self.resDir = common.work_space.resDir()
 61 <
 61 >
 62           self.dataset_to_import=[]
 63 <
 63 >
 64           self.datasetpath=cfg_params['CMSSW.datasetpath']
 65           if (self.datasetpath.upper() != 'NONE'):
 66               self.dataset_to_import.append(self.datasetpath)
 67 <
 67 >
 68           ### Added PU dataset
 69           tmp = cfg_params.get('CMSSW.dataset_pu',None)
 70           if tmp :
 72               for dataset in datasets:
 73                   dataset=string.strip(dataset)
 74                   self.dataset_to_import.append(dataset)
 75 <         ###
 76 <
 77 <         #self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
 75 >         ###
 76 >
 77 >
 78 >         self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
 79 >
 80 >         ### fede import parent dataset is compulsory ###
 81 >         if ( int(self.import_all_parents) == 0 ):
 82 >             common.logger.info("WARNING: The option USER.publish_with_import_all_parents=0 has been deprecated. The import of parents is compulsory and done by default")
 83 >         ############
 84 >
 85           self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
 86 <
 86 >
 87           self.SEName=''
 88           self.CMSSW_VERSION=''
 89           self.exit_status=''
 90           self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
 91 <         self.problemFiles=[]
 91 >         self.problemFiles=[]
 92           self.noEventsFiles=[]
 93           self.noLFN=[]
 94 <
 94 >
 95       def importParentDataset(self,globalDBS, datasetpath):
 96           """
 97           """
 98           dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
 99 <
 99 >
100           try:
101               #if (self.import_all_parents==1):
102               common.logger.info("--->>> Importing all parents level")
137           """
138           """
139           try:
140 +             ### fjr content as argument
141               jobReport = readJobReport(file)[0]
142               self.exit_status = '0'
143           except IndexError:
144               self.exit_status = '1'
145 <             msg = "Error: Problem with "+file+" file"
145 >             msg = "Error: Problem with "+file+" file"
146               common.logger.info(msg)
147               return self.exit_status
148
149 +         #print "###################################################"
150 +         #print "len(jobReport.files) = ", len(jobReport.files)
151 +         #print "jobReport.files = ", jobReport.files
152 +         #print "###################################################"
153 +
154           if (len(self.dataset_to_import) != 0):
155               for dataset in self.dataset_to_import:
156                   common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
159                       common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
160                       self.exit_status='1'
161                       return self.exit_status
162 <                 else:
162 >                 else:
163                       common.logger.info('Import ok of dataset '+dataset)
164 <
165 <         #// DBS to contact
166 <         dbswriter = DBSWriter(self.DBSURL)
155 <         try:
156 <             fileinfo= jobReport.files[0]
157 <             self.exit_status = '0'
158 <         except IndexError:
164 >
165 >
166 >         if (len(jobReport.files) <= 0) :
167               self.exit_status = '1'
168 <             msg = "Error: No EDM file to publish in xml file"+file+" file"
168 >             msg = "Error: No EDM file to publish in xml file"+file+" file"
169               common.logger.info(msg)
170               return self.exit_status
171 +         else:
172 +             print " fjr contains some files to publish"
173
174 <         datasets=fileinfo.dataset
175 <         common.logger.log(10-1,"FileInfo = " + str(fileinfo))
176 <         common.logger.log(10-1,"DatasetInfo = " + str(datasets))
177 <         if len(datasets)<=0:
178 <             self.exit_status = '1'
169 <             msg = "Error: No info about dataset in the xml file "+file
170 <             common.logger.info(msg)
171 <             return self.exit_status
172 <         for dataset in datasets:
173 <             #### for production data
174 <             self.processedData = dataset['ProcessedDataset']
175 <             if (dataset['PrimaryDataset'] == 'null'):
176 <                 #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
177 <                 dataset['PrimaryDataset'] = self.userprocessedData
178 <             #else: # add parentage from input dataset
179 <             elif self.datasetpath.upper() != 'NONE':
180 <                 dataset['ParentDataset']= self.datasetpath
181 <
182 <             dataset['PSetContent']=self.content
183 <             cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
184 <             common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
185 <             common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
186 <             common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
187 <             self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
188 <
189 <             common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
190 <
191 <             primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
192 <             common.logger.log(10-1,"Primary: %s "%primary)
193 <
194 <             algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
195 <             common.logger.log(10-1,"Algo: %s "%algo)
174 >         #### datasets creation in dbs
175 >         #// DBS to contact: write and read of the same dbs
176 >         dbsReader = DBSReader(self.DBSURL,level='ERROR')
177 >         dbswriter = DBSWriter(self.DBSURL)
178 >         #####
179
180 <             processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
181 <             common.logger.log(10-1,"Processed: %s "%processed)
182 <
183 <             common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
184 <
180 >         self.published_datasets = []
181 >         for fileinfo in jobReport.files:
182 >             #print "--->>> in the for loop: fileinfo = ", fileinfo
183 >             datasets_info=fileinfo.dataset
184 >             #print "--->>> in the for loop: datasets_info = ", datasets_info
185 >             if len(datasets_info)<=0:
186 >                 self.exit_status = '1'
187 >                 msg = "Error: No info about dataset in the xml file "+file
188 >                 common.logger.info(msg)
189 >                 return self.exit_status
190 >             else:
191 >                 for dataset in datasets_info:
192 >                     #### for production data
193 >                     self.processedData = dataset['ProcessedDataset']
194 >                     if (dataset['PrimaryDataset'] == 'null'):
195 >                         dataset['PrimaryDataset'] = self.userprocessedData
196 >                     elif self.datasetpath.upper() != 'NONE':
197 >                         dataset['ParentDataset']= self.datasetpath
198 >
199 >                     dataset['PSetContent']=self.content
200 >                     cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
201 >                     common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
202 >                     common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
203 >                     common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
204 >
205 >                     self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"
206 >
207 >
208 >                     self.published_datasets.append(self.dataset_to_check)
209 >
210 >                     common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))
211 >
212 >                     #### check if dataset already exists in the DBS
213 >                     result = dbsReader.matchProcessedDatasets(dataset['PrimaryDataset'], 'USER', dataset['ProcessedDataset'])
214 >                     #print "result = ",result
215 >                     if (len(result) != 0):
216 >                         result = dbsReader.listDatasetFiles(self.dataset_to_check)
217 >                     #if (len(result) == 0):
218 >                     #    print "len is zero"
219 >                     #else:
220 >                     #    print "len is not zero"
221 >                     #    result = dbsReader.listDatasetFiles(self.dataset_to_check)
222 >                     #print "result = ", result
223 >
224 >                     primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
225 >                     common.logger.log(10-1,"Primary: %s "%primary)
226 >                     print "primary = ", primary
227 >
228 >                     algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
229 >                     common.logger.log(10-1,"Algo: %s "%algo)
230 >
231 >                     processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
232 >                     common.logger.log(10-1,"Processed: %s "%processed)
233 >                     print "processed = ", processed
234 >
235 >                     common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))
236 >                     #######################################################################################
237 >
238           common.logger.log(10-1,"exit_status = %s "%self.exit_status)
239 <         return self.exit_status
239 >         return self.exit_status
240
241       def publishAJobReport(self,file,procdataset):
242           """
243           input: xml file, processedDataset
244           """
245 +         print "--->>> in publishAJobReport : "
246           common.logger.debug("FJR = %s"%file)
247           try:
248               jobReport = readJobReport(file)[0]
277               ### Fede to insert also run and lumi info in DBS
278               #file.runs = {}
279               for ds in file.dataset:
280 <                 ### Fede for production
280 >                 ### For production
281                   if (ds['PrimaryDataset'] == 'null'):
282                       ds['PrimaryDataset']=self.userprocessedData
283               filestopublish.append(file)
284 <
284 >
285 >         ### only good files will be published
286           jobReport.files = filestopublish
287 +         print "------>>> filestopublish = ", filestopublish
288           for file in filestopublish:
289               common.logger.debug("--->>> LFN of file to publish = " + str(file['LFN']))
290 +             print "--->>> LFN of file to publish = ", str(file['LFN'])
291           ### if all files of FJR have number of events = 0
292           if (len(filestopublish) == 0):
293               return None
294 <
294 >
295           #// DBS to contact
296           dbswriter = DBSWriter(self.DBSURL)
297           # insert files
299           try:
300               ### FEDE added insertDetectorData = True to propagate in DBS info about run and lumi
301               Blocks=dbswriter.insertFiles(jobReport, insertDetectorData = True)
262 -             #Blocks=dbswriter.insertFiles(jobReport)
302               common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
303           except DBSWriterError, ex:
304               common.logger.debug("--->>> Insert file error: %s"%ex)
308           """
309           parse of all xml files in the res dir and creation of a dictionary
310           """
311 <
311 >
312           file_list = glob.glob(self.resDir+"crab_fjr*.xml")
313 <
313 >
314           ## Select only those fjr that are successful
315           if (len(file_list)==0):
316 <             common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
316 >             common.logger.info("--->>> "+self.resDir+" empty: no fjr files in the res dir to publish on DBS")
317               self.exit_status = '1'
318               return self.exit_status
319
320           good_list=[]
321 +
322           for fjr in file_list:
323               reports = readJobReport(fjr)
324               if len(reports)>0:
325 +                 ### with backup-copy the wrapper_exit_code is 60308 --> failed
326                   if reports[0].status == "Success":
327                       good_list.append(fjr)
328 +
329           file_list=good_list
330 +         print "fjr ok for publication are good_list = ", good_list
331           ##
332 <         common.logger.log(10-1, "file_list = "+str(file_list))
332 >         common.logger.log(10-1, "fjr with FrameworkJobReport Status='Success', file_list = "+str(file_list))
333           common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))
334 <
334 >
335           if (len(file_list)>0):
336               BlocksList=[]
337               common.logger.info("--->>> Start dataset publication")
338 +             ### first fjr found
339               self.exit_status=self.publishDataset(file_list[0])
340 +             #sys.exit()
341               if (self.exit_status == '1'):
342 <                 return self.exit_status
342 >                 return self.exit_status
343               common.logger.info("--->>> End dataset publication")
344
345
346               common.logger.info("--->>> Start files publication")
347 +
348 +             ### file_list composed only of 'success' fjr :
349               for file in file_list:
350                   Blocks=self.publishAJobReport(file,self.processedData)
351                   if Blocks:
352                       for x in Blocks: # do not allow multiple entries of the same block
353                           if x not in BlocksList:
354                               BlocksList.append(x)
355 <
355 >
356               # close the blocks
357               common.logger.log(10-1, "BlocksList = %s"%BlocksList)
358               dbswriter = DBSWriter(self.DBSURL)
359 <
359 >
360               for BlockName in BlocksList:
361 <                 try:
361 >                 try:
362                       closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
363                       common.logger.log(10-1, "closeBlock %s"%closeBlock)
364                   except DBSWriterError, ex:
377               for lfn in self.problemFiles:
378                   common.logger.info("------ LFN: %s"%lfn)
379               common.logger.info("--->>> End files publication")
333 -
334 -             self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
335 -             from InspectDBS import InspectDBS
336 -             check=InspectDBS(self.cfg_params)
337 -             check.checkPublication()
338 -             return self.exit_status
380
381 +             #### added for MULTI PUBLICATION ####
382 +             for dataset_to_check in self.published_datasets:
383 +                 self.cfg_params['USER.dataset_to_check']=dataset_to_check
384 +                 from InspectDBS import InspectDBS
385 +                 check=InspectDBS(self.cfg_params)
386 +                 check.checkPublication()
387 +
388 +             return self.exit_status
389 +
390           else:
391               common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
392               self.exit_status = '1'
393               return self.exit_status
344 -