ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.45
Committed: Tue Oct 27 13:01:23 2009 UTC (15 years, 6 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_7_1_pre2, CRAB_2_7_1_pre1, CRAB_2_7_0, CRAB_2_7_0_pre8, CRAB_2_7_0_pre7, CRAB_2_7_0_pre6
Branch point for: CRAB_2_7_1_branch, Lumi2_8
Changes since 1.44: +16 -14 lines
Log Message:
Removing deprecated import_all_parents=0 feature

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_exceptions import *
7 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
8 afanfani 1.19 from ProdCommon.FwkJobRep.ReportState import checkSuccess
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 fanzago 1.35 from DBSAPI.dbsApiException import DbsException
16     from DBSAPI.dbsApi import DbsApi
17 slacapra 1.1
18     class Publisher(Actor):
19     def __init__(self, cfg_params):
20     """
21     Publisher class:
22    
23     - parses CRAB FrameworkJobReport on UI
24     - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
25     - publishes output data on DBS and DLS
26     """
27    
28 fanzago 1.28 self.cfg_params=cfg_params
29    
30 slacapra 1.32 if not cfg_params.has_key('USER.publish_data_name'):
31 slacapra 1.1 raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
32 slacapra 1.30 self.userprocessedData = cfg_params['USER.publish_data_name']
33     self.processedData = None
34 slacapra 1.8
35 slacapra 1.30 if (not cfg_params.has_key('USER.copy_data') or int(cfg_params['USER.copy_data']) != 1 ) or \
36     (not cfg_params.has_key('USER.publish_data') or int(cfg_params['USER.publish_data']) != 1 ):
37 spiga 1.22 msg = 'You can not publish data because you did not selected \n'
38 slacapra 1.34 msg += '\t*** copy_data = 1 and publish_data = 1 *** in the crab.cfg file'
39     raise CrabException(msg)
40 slacapra 1.30
41 slacapra 1.33 if not cfg_params.has_key('CMSSW.pset'):
42     raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')
43     self.pset = cfg_params['CMSSW.pset']
44 slacapra 1.30
45     self.globalDBS=cfg_params.get('CMSSW.dbs_url',"http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet")
46    
47     if not cfg_params.has_key('USER.dbs_url_for_publication'):
48 fanzago 1.11 msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
49     msg = msg + " entry, necessary to publish the data.\n"
50     msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
51 fanzago 1.3 raise CrabException(msg)
52 slacapra 1.30
53     self.DBSURL=cfg_params['USER.dbs_url_for_publication']
54     common.logger.info('<dbs_url_for_publication> = '+self.DBSURL)
55     if (self.DBSURL == "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet") or (self.DBSURL == "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"):
56     msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
57     msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
58     raise CrabException(msg)
59 fanzago 1.4
60     self.content=file(self.pset).read()
61     self.resDir = common.work_space.resDir()
62 fanzago 1.12
63     self.dataset_to_import=[]
64    
65 slacapra 1.1 self.datasetpath=cfg_params['CMSSW.datasetpath']
66 fanzago 1.12 if (self.datasetpath.upper() != 'NONE'):
67     self.dataset_to_import.append(self.datasetpath)
68    
69     ### Added PU dataset
70 spiga 1.13 tmp = cfg_params.get('CMSSW.dataset_pu',None)
71 fanzago 1.12 if tmp :
72     datasets = tmp.split(',')
73     for dataset in datasets:
74     dataset=string.strip(dataset)
75     self.dataset_to_import.append(dataset)
76     ###
77 spiga 1.24
78 mcinquil 1.45 #self.import_all_parents = cfg_params.get('USER.publish_with_import_all_parents',1)
79 fanzago 1.29 self.skipOcheck=cfg_params.get('CMSSW.publish_zero_event',0)
80 spiga 1.24
81 slacapra 1.1 self.SEName=''
82     self.CMSSW_VERSION=''
83     self.exit_status=''
84     self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
85 fanzago 1.5 self.problemFiles=[]
86     self.noEventsFiles=[]
87     self.noLFN=[]
88 slacapra 1.1
89     def importParentDataset(self,globalDBS, datasetpath):
90     """
91 fanzago 1.35 """
92 slacapra 1.1 dbsWriter = DBSWriter(self.DBSURL,level='ERROR')
93    
94     try:
95 mcinquil 1.45 #if (self.import_all_parents==1):
96     common.logger.info("--->>> Importing all parents level")
97     start = time.time()
98     common.logger.debug("start import time: " + str(start))
99     ### to skip the ProdCommon api exception in the case of block without location
100     ### skipNoSiteError=True
101     #dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
102     ### calling dbs api directly
103     dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath)
104     stop = time.time()
105     common.logger.debug("stop import time: " + str(stop))
106     common.logger.info("--->>> duration of all parents import (sec): "+str(stop - start))
107     ## still not removing the code, but TODO for the final release...
108     """
109 fanzago 1.37 else:
110     common.logger.info("--->>> Importing only the datasetpath " + datasetpath)
111 fanzago 1.43 start = time.time()
112     #dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL, skipNoSiteError=True)
113     ### calling dbs api directly
114     common.logger.debug("start import time: " + str(start))
115     dbsWriter.dbs.migrateDatasetContents(globalDBS, self.DBSURL, datasetpath, noParentsReadOnly = True )
116     stop = time.time()
117     common.logger.debug("stop import time: " + str(stop))
118     common.logger.info("--->>> duration of first level parent import (sec): "+str(stop - start))
119 mcinquil 1.45 """
120 slacapra 1.1 except DBSWriterError, ex:
121     msg = "Error importing dataset to be processed into local DBS\n"
122     msg += "Source Dataset: %s\n" % datasetpath
123     msg += "Source DBS: %s\n" % globalDBS
124     msg += "Destination DBS: %s\n" % self.DBSURL
125 spiga 1.27 common.logger.info(msg)
126 fanzago 1.42 common.logger.info(str(ex))
127 slacapra 1.1 return 1
128     return 0
129 fanzago 1.43
    def publishDataset(self,file):
        """
        Create the dataset entries (primary, algorithm, processed) in the
        local DBS from the first framework job report, importing parent
        datasets first when any are configured.

        Returns self.exit_status: '0' on success, '1' on failure.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            # readJobReport returned an empty list: unusable report file.
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.info(msg)
            return self.exit_status

        # Import input (and PU) datasets into the local DBS before
        # registering the user dataset; abort on the first failure.
        if (len(self.dataset_to_import) != 0):
            for dataset in self.dataset_to_import:
                common.logger.info("--->>> Importing parent dataset in the dbs: " +dataset)
                status_import=self.importParentDataset(self.globalDBS, dataset)
                if (status_import == 1):
                    common.logger.info('Problem with parent '+ dataset +' import from the global DBS '+self.globalDBS+ 'to the local one '+self.DBSURL)
                    self.exit_status='1'
                    return self.exit_status
                else:
                    common.logger.info('Import ok of dataset '+dataset)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            # Dataset info is taken from the first <file> entry only.
            fileinfo= jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No EDM file to publish in xml file"+file+" file"
            common.logger.info(msg)
            return self.exit_status

        datasets=fileinfo.dataset
        common.logger.log(10-1,"FileInfo = " + str(fileinfo))
        common.logger.log(10-1,"DatasetInfo = " + str(datasets))
        if len(datasets)<=0:
            self.exit_status = '1'
            msg = "Error: No info about dataset in the xml file "+file
            common.logger.info(msg)
            return self.exit_status
        for dataset in datasets:
            #### for production data: a 'null' PrimaryDataset is replaced
            #### by the user-chosen publish_data_name.
            self.processedData = dataset['ProcessedDataset']
            if (dataset['PrimaryDataset'] == 'null'):
                #dataset['PrimaryDataset'] = dataset['ProcessedDataset']
                dataset['PrimaryDataset'] = self.userprocessedData
            #else: # add parentage from input dataset
            elif self.datasetpath.upper() != 'NONE':
                dataset['ParentDataset']= self.datasetpath

            dataset['PSetContent']=self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.info("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.info("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.info("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")
            # Remembered for the post-publication check in run().
            self.dataset_to_check="/"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER"

            common.logger.log(10-1,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
            common.logger.log(10-1,"Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.log(10-1,"Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.log(10-1,"Processed: %s "%processed)

            common.logger.log(10-1,"Inserted primary %s processed %s"%(primary,processed))

        common.logger.log(10-1,"exit_status = %s "%self.exit_status)
        return self.exit_status
204    
205     def publishAJobReport(self,file,procdataset):
206     """
207 fanzago 1.2 input: xml file, processedDataset
208 slacapra 1.1 """
209 fanzago 1.44 common.logger.debug("FJR = %s"%file)
210 slacapra 1.1 try:
211     jobReport = readJobReport(file)[0]
212     self.exit_status = '0'
213     except IndexError:
214     self.exit_status = '1'
215     msg = "Error: Problem with "+file+" file"
216     raise CrabException(msg)
217 fanzago 1.4 ### overwrite ProcessedDataset with user defined value
218     ### overwrite lumisections with no value
219     ### skip publication for 0 events files
220     filestopublish=[]
221 slacapra 1.1 for file in jobReport.files:
222 fanzago 1.5 #### added check for problem with copy to SE and empty lfn
223     if (string.find(file['LFN'], 'copy_problems') != -1):
224     self.problemFiles.append(file['LFN'])
225     elif (file['LFN'] == ''):
226     self.noLFN.append(file['PFN'])
227 fanzago 1.4 else:
228 spiga 1.24 if self.skipOcheck==0:
229     if int(file['TotalEvents']) != 0:
230     #file.lumisections = {}
231     # lumi info are now in run hash
232     file.runs = {}
233     for ds in file.dataset:
234     ### Fede for production
235     if (ds['PrimaryDataset'] == 'null'):
236     #ds['PrimaryDataset']=procdataset
237     ds['PrimaryDataset']=self.userprocessedData
238     filestopublish.append(file)
239     else:
240     self.noEventsFiles.append(file['LFN'])
241     else:
242     file.runs = {}
243     for ds in file.dataset:
244     ### Fede for production
245     if (ds['PrimaryDataset'] == 'null'):
246     #ds['PrimaryDataset']=procdataset
247     ds['PrimaryDataset']=self.userprocessedData
248     filestopublish.append(file)
249    
250 fanzago 1.4 jobReport.files = filestopublish
251 fanzago 1.44 for file in filestopublish:
252     common.logger.debug("--->>> LFN of file to publish = " + str(file['LFN']))
253 fanzago 1.4 ### if all files of FJR have number of events = 0
254     if (len(filestopublish) == 0):
255 fanzago 1.44 return None
256 fanzago 1.4
257 slacapra 1.1 #// DBS to contact
258 fanzago 1.2 dbswriter = DBSWriter(self.DBSURL)
259 slacapra 1.1 # insert files
260     Blocks=None
261     try:
262     Blocks=dbswriter.insertFiles(jobReport)
263 fanzago 1.44 common.logger.debug("--->>> Inserting file in blocks = %s"%Blocks)
264 slacapra 1.1 except DBSWriterError, ex:
265 fanzago 1.44 common.logger.debug("--->>> Insert file error: %s"%ex)
266 slacapra 1.1 return Blocks
267    
    def run(self):
        """
        Parse all crab_fjr*.xml files in the res dir, publish the dataset
        and the successful files to the local DBS, close the blocks, and
        verify the publication.

        Returns self.exit_status: '0' on success, '1' on failure.
        """

        file_list = glob.glob(self.resDir+"crab_fjr*.xml")

        ## Select only those fjr that are succesfull
        if (len(file_list)==0):
            common.logger.info("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        good_list=[]
        for fjr in file_list:
            reports = readJobReport(fjr)
            if len(reports)>0:
                if reports[0].status == "Success":
                    good_list.append(fjr)
        file_list=good_list
        ##
        common.logger.log(10-1, "file_list = "+str(file_list))
        common.logger.log(10-1, "len(file_list) = "+str(len(file_list)))

        if (len(file_list)>0):
            BlocksList=[]
            common.logger.info("--->>> Start dataset publication")
            # Dataset-level registration uses the first report only.
            self.exit_status=self.publishDataset(file_list[0])
            if (self.exit_status == '1'):
                return self.exit_status
            common.logger.info("--->>> End dataset publication")


            common.logger.info("--->>> Start files publication")
            for file in file_list:
                Blocks=self.publishAJobReport(file,self.processedData)
                if Blocks:
                    for x in Blocks: # do not allow multiple entries of the same block
                        if x not in BlocksList:
                            BlocksList.append(x)

            # close the blocks
            common.logger.log(10-1, "BlocksList = %s"%BlocksList)
            # dbswriter = DBSWriter(self.DBSURL,level='ERROR')
            dbswriter = DBSWriter(self.DBSURL)

            for BlockName in BlocksList:
                try:
                    closeBlock=dbswriter.manageFileBlock(BlockName,maxFiles= 1)
                    common.logger.log(10-1, "closeBlock %s"%closeBlock)
                    #dbswriter.dbs.closeBlock(BlockName)
                except DBSWriterError, ex:
                    # Best-effort: a block that fails to close is only logged.
                    common.logger.info("Close block error %s"%ex)

            # Summary of files that were skipped during publication.
            if (len(self.noEventsFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
                for lfn in self.noEventsFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            if (len(self.noLFN)>0):
                common.logger.info("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
                for pfn in self.noLFN:
                    common.logger.info("------ pfn: %s"%pfn)
            if (len(self.problemFiles)>0):
                common.logger.info("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
                for lfn in self.problemFiles:
                    common.logger.info("------ LFN: %s"%lfn)
            common.logger.info("--->>> End files publication")

            # Final verification of what actually landed in the local DBS.
            self.cfg_params['USER.dataset_to_check']=self.dataset_to_check
            from InspectDBS import InspectDBS
            check=InspectDBS(self.cfg_params)
            check.checkPublication()
            return self.exit_status

        else:
            common.logger.info("--->>> No valid files to publish on DBS. Your jobs do not report exit codes = 0")
            self.exit_status = '1'
            return self.exit_status
346