ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.15
Committed: Wed Aug 20 12:43:43 2008 UTC (16 years, 8 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_DLS_PHED1, CRAB_DLS_PHED, CRAB_2_3_2_Fnal, CRAB_2_3_2, CRAB_2_3_2_pre7, CRAB_2_3_2_pre5, CRAB_2_3_2_pre4, CRAB_2_3_2_pre3
Changes since 1.14: +3 -1 lines
Log Message:
changes related to the new LFN and new datasetpath to publish (task 7471)

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output of a CRAB task in a local DBS instance.

    - parses the CRAB FrameworkJobReport files (crab_fjr*.xml) found in the
      res directory of the task
    - imports the input dataset(s) from the global DBS into the local DBS
    - inserts the output datasets, their files and blocks in the local DBS
    """

    def __init__(self, cfg_params):
        """
        Read the publication settings from the crab.cfg parameters.

        cfg_params -- mapping of 'SECTION.key' entries. Mandatory keys:
                      USER.publish_data_name, USER.copy_data (must be 1),
                      CMSSW.pset, USER.dbs_url_for_publication and
                      CMSSW.datasetpath. Optional keys: CMSSW.dbs_url and
                      CMSSW.dataset_pu (comma separated list).

        Raises CrabException when a mandatory publication setting is missing
        or when the publication DBS is one of the global DBS instances.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non-numeric copy_data is handled like copy_data != 1,
        # so the user always gets the CrabException instead of a raw
        # ValueError coming from int().
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError, TypeError):
            copy_data = 0
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        # DBS instance the input datasets are imported from
        default_global_dbs = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        self.globalDBS = cfg_params.get('CMSSW.dbs_url', default_global_dbs)

        # DBS instance the user data are published in
        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)

        # publication in the global DBS (reader or writer url) is refused
        forbidden_dbs = (
            "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
            "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet",
        )
        if self.DBSURL in forbidden_dbs:
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # content of the user pset, stored with the published dataset;
        # the file is closed explicitly instead of being left to the
        # garbage collector
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()
        self.resDir = common.work_space.resDir()

        # datasets to import in the local DBS before publishing
        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if self.datasetpath.upper() != 'NONE':
            self.dataset_to_import.append(self.datasetpath)

        ### Added PU dataset
        pu_datasets = cfg_params.get('CMSSW.dataset_pu', None)
        if pu_datasets:
            for pu_dataset in pu_datasets.split(','):
                pu_dataset = pu_dataset.strip()
                # skip empty entries produced e.g. by a trailing comma
                if pu_dataset:
                    self.dataset_to_import.append(pu_dataset)
        ###

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # files skipped by publishAJobReport, reported at the end of run()
        self.problemFiles = []    # LFN of files with problems in the copy to SE
        self.noEventsFiles = []   # LFN of files with 0 events
        self.noLFN = []           # PFN of files with empty LFN

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import datasetpath (without its parentage) from globalDBS into the
        local DBS used for the publication.

        Returns 0 if the import succeeded, 1 if DBSWriter reported an error.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # import the dataset received as argument: using
            # self.datasetpath here would silently skip the PU datasets
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() keeps this block independent from the
            # 'except X, e' / 'except X as e' syntax of the interpreter
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            common.logger.message(str(ex))
            return 1
        return 0

    def publishDataset(self, file):
        """
        Import the input datasets and create in the local DBS the primary
        dataset, the algorithm and the processed dataset described by the
        first file of the job report.

        file -- path of a FrameworkJobReport xml file

        Returns the exit status as a string: '0' ok, '1' error.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        # the publication stops at the first dataset that cannot be imported
        for parent in self.dataset_to_import:
            common.logger.message("--->>> Importing parent dataset in the dbs: " + parent)
            status_import = self.importParentDataset(self.globalDBS, parent)
            if status_import == 1:
                common.logger.message('Problem with parent ' + parent + ' import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message('Import ok of dataset ' + parent)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file"+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
            else:
                # add parentage from input dataset
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            # add real name of user cfg
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.debug(6, "Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert in the local DBS the files listed in one job report.

        file        -- path of a FrameworkJobReport xml file
        procdataset -- name used as PrimaryDataset when the report has
                       PrimaryDataset == 'null'

        Files with problems in the copy to the SE, with an empty LFN or with
        0 events are not published: they are recorded in self.problemFiles,
        self.noLFN and self.noEventsFiles.

        Returns the value of DBSWriter.insertFiles, or None if there is
        nothing to publish or the insertion failed.
        Raises CrabException if the job report cannot be read.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)

        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        # the loop variable must not rebind the 'file' argument
        for fjr_file in jobReport.files:
            lfn = fjr_file['LFN']
            #### added check for problem with copy to SE and empty lfn
            if 'copy_problems' in lfn:
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fjr_file['PFN'])
            elif int(fjr_file['TotalEvents']) != 0:
                fjr_file.lumisections = {}
                for ds in fjr_file.dataset:
                    # with the new LFN the ProcessedDataset of the report is
                    # kept as it is (no more overwritten with procdataset)
                    ### Fede for production
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(fjr_file)
            else:
                self.noEventsFiles.append(lfn)
        jobReport.files = filestopublish
        ### if all files of FJR have number of events = 0
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s" % Blocks)
        except DBSWriterError:
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Parse all the job report xml files of the res directory and publish
        their content: first the dataset (from the first report), then the
        files of every report, finally the blocks are closed.

        Returns the exit status as a string: '0' ok, '1' error.
        """
        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for fjr in file_list:
            common.logger.message("file = "+fjr)
            Blocks = self.publishAJobReport(fjr, self.processedData)
            if Blocks:
                # do not allow multiple entries of the same block
                for block in Blocks:
                    if block not in BlocksList:
                        BlocksList.append(block)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s" % BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.debug(6, "closeBlock %s" % closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s" % ex)

        # summary of the files that have not been published
        if self.noEventsFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s" % lfn)
        if self.noLFN:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s" % pfn)
        if self.problemFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s" % lfn)
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
274