1 |
< |
import sys, getopt, string |
1 |
> |
import getopt, string |
2 |
|
import common |
3 |
|
import time, glob |
4 |
|
from Actor import * |
5 |
– |
from FwkJobRep.ReportParser import readJobReport |
5 |
|
from crab_util import * |
6 |
|
from crab_logger import Logger |
7 |
|
from crab_exceptions import * |
8 |
+ |
from FwkJobRep.ReportParser import readJobReport |
9 |
|
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec |
10 |
|
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter |
11 |
|
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError |
12 |
|
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader |
13 |
< |
import ProdCommon.DataMgmt.DBS.DBSWriterObjects as DBSWriterObjects |
14 |
< |
|
15 |
< |
|
13 |
> |
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects |
14 |
> |
import sys |
15 |
|
|
16 |
|
class Publisher(Actor): |
17 |
|
def __init__(self, cfg_params): |
33 |
|
except KeyError: |
34 |
|
raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file') |
35 |
|
|
36 |
< |
common.logger.message('self.processedData = '+self.processedData) |
36 |
> |
#common.logger.message('processedData = '+self.processedData) |
37 |
|
self.resDir = common.work_space.resDir() |
38 |
< |
common.logger.message('self.resDir = '+self.resDir) |
40 |
< |
#old api self.DBSURL='http://cmssrv18.fnal.gov:8989/DBS/servlet/DBSServlet' |
38 |
> |
#common.logger.message('resDir = '+self.resDir) |
39 |
|
self.DBSURL=cfg_params['USER.dbs_url_for_publication'] |
40 |
< |
#self.DBSURL='http://cmssrv17.fnal.gov:8989/DBS_1_0_4_pre2/servlet/DBSServlet' |
43 |
< |
common.logger.message('self.DBSURL = '+self.DBSURL) |
40 |
> |
common.logger.message('dbs url = '+self.DBSURL) |
41 |
|
self.datasetpath=cfg_params['CMSSW.datasetpath'] |
42 |
< |
common.logger.message('self.datasetpath = '+self.datasetpath) |
42 |
> |
#common.logger.message('datasetpath = '+self.datasetpath) |
43 |
|
self.SEName='' |
44 |
|
self.CMSSW_VERSION='' |
45 |
|
self.exit_status='' |
46 |
|
self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time())) |
47 |
|
|
51 |
– |
|
48 |
|
def importParentDataset(self,globalDBS, datasetpath): |
49 |
|
""" |
50 |
|
""" |
51 |
|
dbsWriter = DBSWriter(self.DBSURL,level='ERROR') |
52 |
|
|
53 |
|
try: |
58 |
– |
common.logger.message("----->>>>importing parent dataset in the local dbs") |
54 |
|
dbsWriter.importDataset(globalDBS, self.datasetpath, self.DBSURL) |
55 |
|
except DBSWriterError, ex: |
56 |
|
msg = "Error importing dataset to be processed into local DBS\n" |
66 |
|
""" |
67 |
|
try: |
68 |
|
jobReport = readJobReport(file)[0] |
74 |
– |
msg = "--->>> reading "+file+" file" |
75 |
– |
common.logger.message(msg) |
69 |
|
self.exit_status = '0' |
70 |
|
except IndexError: |
71 |
|
self.exit_status = '1' |
72 |
|
msg = "Error: Problem with "+file+" file" |
73 |
|
common.logger.message(msg) |
74 |
|
return self.exit_status |
75 |
< |
#####for parents information import ######################################### |
75 |
> |
|
76 |
|
#### the globalDBS has to be written in the crab cfg file!!!!! ############### |
84 |
– |
globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet" |
77 |
|
if (self.datasetpath != 'None'): |
78 |
+ |
globalDBS="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet" |
79 |
+ |
common.logger.message("--->>> Importing parent dataset in the dbs") |
80 |
|
status_import=self.importParentDataset(globalDBS, self.datasetpath) |
81 |
|
if (status_import == 1): |
82 |
|
common.logger.message('Problem with parent import from the global DBS '+globalDBS+ 'to the local one '+self.DBSURL) |
83 |
|
self.exit_status='1' |
90 |
– |
############################################################################### |
84 |
|
## ___ >>>>>>> comment out the next line if you have problems with the import |
92 |
– |
############################################################################### |
85 |
|
return self.exit_status |
86 |
< |
pass |
86 |
> |
common.logger.message("Parent import ok") |
87 |
> |
|
88 |
|
#// DBS to contact |
89 |
< |
dbswriter = DBSWriter(self.DBSURL,level='ERROR') |
97 |
< |
# publish a dataset : it should be done only once for the task |
98 |
< |
# and not for all the JobReport |
89 |
> |
dbswriter = DBSWriter(self.DBSURL) |
90 |
|
try: |
91 |
|
fileinfo= jobReport.files[0] |
92 |
+ |
#fileinfo.lumisections = {} |
93 |
|
self.exit_status = '0' |
94 |
|
except IndexError: |
95 |
|
self.exit_status = '1' |
97 |
|
common.logger.message(msg) |
98 |
|
return self.exit_status |
99 |
|
|
100 |
< |
common.logger.message("FileInfo = " + str(fileinfo)) |
100 |
> |
#common.logger.message("FileInfo = " + str(fileinfo)) |
101 |
|
datasets=fileinfo.dataset |
102 |
< |
common.logger.message("DatasetInfo = " + str(datasets)) |
102 |
> |
#common.logger.message("DatasetInfo = " + str(datasets)) |
103 |
|
for dataset in datasets: |
104 |
|
#### to understand how to fill cfgMeta info ############### |
105 |
|
cfgMeta = {'name' : 'usercfg' , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg |
106 |
|
common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset']) |
107 |
|
common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset']) |
108 |
< |
common.logger.message("Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset'])) |
108 |
> |
common.logger.message("--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset'])) |
109 |
> |
#common.logger.message("dataset: %s"%dataset) |
110 |
|
|
111 |
|
primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs) |
112 |
+ |
common.logger.message("Primary: %s "%primary) |
113 |
|
|
114 |
|
algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs) |
115 |
+ |
common.logger.message("Algo: %s "%algo) |
116 |
|
|
117 |
|
processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs) |
118 |
+ |
common.logger.message("Processed: %s "%processed) |
119 |
|
|
120 |
|
common.logger.message("Inserted primary %s processed %s"%(primary,processed)) |
121 |
+ |
|
122 |
+ |
#common.logger.message("exit_status = %s "%self.exit_status) |
123 |
|
return self.exit_status |
124 |
|
|
125 |
|
def publishAJobReport(self,file,procdataset): |
126 |
|
""" |
127 |
+ |
input: xml file, processedDataset |
128 |
|
""" |
129 |
|
try: |
130 |
|
jobReport = readJobReport(file)[0] |
133 |
|
self.exit_status = '1' |
134 |
|
msg = "Error: Problem with "+file+" file" |
135 |
|
raise CrabException(msg) |
136 |
< |
|
137 |
< |
# overwite ProcessedDataset with user defined value |
136 |
> |
# overwrite ProcessedDataset with user defined value |
137 |
> |
# overwrite lumisections with no value |
138 |
|
for file in jobReport.files: |
139 |
+ |
file.lumisections = {} |
140 |
|
for ds in file.dataset: |
141 |
|
ds['ProcessedDataset']=procdataset |
142 |
|
#// DBS to contact |
143 |
< |
dbswriter = DBSWriter(self.DBSURL,level='ERROR') |
143 |
> |
dbswriter = DBSWriter(self.DBSURL) |
144 |
|
# insert files |
145 |
|
Blocks=None |
146 |
|
try: |
147 |
|
Blocks=dbswriter.insertFiles(jobReport) |
148 |
< |
common.logger.message("------>>>> Blocks = %s"%Blocks) |
148 |
> |
common.logger.message("Blocks = %s"%Blocks) |
149 |
|
except DBSWriterError, ex: |
150 |
< |
common.logger.message("insert file error: %s"%ex) |
150 |
> |
common.logger.message("Insert file error: %s"%ex) |
151 |
|
return Blocks |
152 |
|
|
153 |
|
def run(self): |
154 |
|
""" |
155 |
|
parse all xml files in the res dir and build a dictionary |
156 |
|
""" |
157 |
< |
common.logger.message("Starting data publish") |
157 |
> |
|
158 |
|
file_list = glob.glob(self.resDir+"crab_fjr*.xml") |
159 |
|
common.logger.debug(6, "file_list = "+str(file_list)) |
160 |
|
common.logger.debug(6, "len(file_list) = "+str(len(file_list))) |
161 |
< |
# FIXME: |
162 |
< |
# do the dataset publication self.publishDataset here |
163 |
< |
# |
161 |
> |
|
162 |
|
if (len(file_list)>0): |
163 |
|
BlocksList=[] |
164 |
+ |
common.logger.message("--->>> Start dataset publication") |
165 |
+ |
self.exit_status=self.publishDataset(file_list[0]) |
166 |
+ |
if (self.exit_status == '1'): |
167 |
+ |
return self.exit_status |
168 |
+ |
common.logger.message("--->>> End dataset publication") |
169 |
+ |
|
170 |
+ |
|
171 |
+ |
common.logger.message("--->>> Start files publication") |
172 |
|
for file in file_list: |
173 |
|
common.logger.message("file = "+file) |
168 |
– |
common.logger.message("Publishing dataset") |
169 |
– |
self.exit_status=self.publishDataset(file) |
170 |
– |
if (self.exit_status == '1'): |
171 |
– |
return self.exit_status |
172 |
– |
common.logger.message("Publishing files") |
174 |
|
Blocks=self.publishAJobReport(file,self.processedData) |
175 |
|
if Blocks: |
176 |
|
[BlocksList.append(x) for x in Blocks] |
177 |
+ |
|
178 |
|
# close the blocks |
179 |
< |
common.logger.message("------>>>> BlocksList = %s"%BlocksList) |
180 |
< |
dbswriter = DBSWriter(self.DBSURL,level='ERROR') |
179 |
> |
common.logger.message("BlocksList = %s"%BlocksList) |
180 |
> |
# dbswriter = DBSWriter(self.DBSURL,level='ERROR') |
181 |
> |
dbswriter = DBSWriter(self.DBSURL) |
182 |
> |
|
183 |
|
for BlockName in BlocksList: |
184 |
|
try: |
185 |
|
closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1) |
186 |
< |
common.logger.message("------>>>> closeBlock %s"%closeBlock) |
186 |
> |
common.logger.message("closeBlock %s"%closeBlock) |
187 |
|
#dbswriter.dbs.closeBlock(BlockName) |
188 |
|
except DBSWriterError, ex: |
189 |
< |
common.logger.message("------>>>> close block error %s"%ex) |
190 |
< |
|
189 |
> |
common.logger.message("Close block error %s"%ex) |
190 |
> |
common.logger.message("--->>> End files publication") |
191 |
|
return self.exit_status |
192 |
|
|
193 |
|
else: |
194 |
< |
common.logger.message(self.resDir+" empty --> No file to publish on DBS/DLS") |
194 |
> |
common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS") |
195 |
|
self.exit_status = '1' |
196 |
|
return self.exit_status |
197 |
|
|