ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.16
Committed: Fri Sep 12 19:23:12 2008 UTC (16 years, 7 months ago) by afanfani
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_pre9, CRAB_2_4_0_pre8, CRAB_2_4_0_pre7, CRAB_2_4_0_pre6, CRAB_2_4_0_pre5, CRAB_2_4_0_pre4, CRAB_2_4_0_pre3, CRAB_2_4_0_pre2, CRAB_2_4_0_pre1
Changes since 1.15: +3 -1 lines
Log Message:
Fix to use the correct processed Dataset in case of datasetpath=None

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output of a CRAB task into a user (local) DBS instance.

    The publisher reads the ``crab_fjr*.xml`` framework job reports found in
    the task results directory and, through ``DBSWriter``:

    - imports the input (and pile-up) datasets from the global DBS,
    - creates the primary dataset, algorithm and processed dataset,
    - inserts the output files and then asks DBS to manage the file blocks.

    Status is reported through ``self.exit_status`` as the strings ``'0'``
    (success) and ``'1'`` (failure).
    """

    # Global DBS instances. Parent datasets are read from the global DBS, but
    # publishing into either of these URLs is refused in __init__.
    _GLOBAL_DBS_URL = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
    _GLOBAL_DBS_WRITER_URL = "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Validate the publication settings and initialise the publisher state.

        cfg_params: mapping of crab.cfg entries keyed as 'SECTION.name'.

        Raises CrabException when USER.publish_data_name, USER.copy_data = 1,
        CMSSW.pset or USER.dbs_url_for_publication is missing/invalid, or
        when the publication URL is one of the global DBS instances.
        """
        try:
            # Only the presence of the key is checked here: the processed
            # dataset name is taken later from the job report.
            cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
        self.processedData = None

        try:
            copy_enabled = (int(cfg_params['USER.copy_data']) == 1)
        except (KeyError, ValueError):
            # A missing or non-numeric value means copy_data is not enabled.
            copy_enabled = False
        if not copy_enabled:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        # DBS instance the parent datasets are imported from.
        self.globalDBS = cfg_params.get('CMSSW.dbs_url', self._GLOBAL_DBS_URL)

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
        if self.DBSURL in (self._GLOBAL_DBS_URL, self._GLOBAL_DBS_WRITER_URL):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Keep the full text of the user configuration: it is attached to the
        # published dataset as 'PSetContent'. Close the handle explicitly.
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()
        self.resDir = common.work_space.resDir()

        # Datasets to import into the local DBS before publishing.
        self.dataset_to_import = []

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        if self.datasetpath.upper() != 'NONE':
            self.dataset_to_import.append(self.datasetpath)

        # Pile-up datasets (comma separated list) are imported as well.
        pileup = cfg_params.get('CMSSW.dataset_pu', None)
        if pileup:
            for pu_dataset in pileup.split(','):
                self.dataset_to_import.append(pu_dataset.strip())

        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Files skipped during publication, reported at the end of run().
        self.problemFiles = []    # LFN flags a failed copy to the SE
        self.noEventsFiles = []   # files containing 0 events
        self.noLFN = []           # PFNs of files with an empty LFN

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import one dataset (without its parentage) into the local DBS.

        globalDBS:   URL of the DBS instance to import from.
        datasetpath: path of the dataset to import.

        Returns 0 on success, 1 when DBSWriter reports an error (the error
        context is logged).
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # Import the dataset that was asked for: this method is called
            # once per entry of dataset_to_import (input and pile-up datasets).
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Create the dataset entries in the local DBS from one job report.

        file: path of a framework job report (xml).

        Imports every dataset listed in dataset_to_import, then for each
        dataset of the first file in the report creates the primary dataset,
        the algorithm and the processed dataset. The processed dataset name
        found in the report is stored in self.processedData.

        Returns self.exit_status: '0' on success, '1' when the report cannot
        be read, contains no file, or a parent import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status
        self.exit_status = '0'

        for parent in self.dataset_to_import:
            common.logger.message("--->>> Importing parent dataset in the dbs: " + parent)
            if self.importParentDataset(self.globalDBS, parent) == 1:
                common.logger.message('Problem with parent '+ parent +' import from the global DBS '+self.globalDBS+ ' to the local one '+self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message('Import ok of dataset '+parent)

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file "+file+" file"
            common.logger.message(msg)
            return self.exit_status
        self.exit_status = '0'

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            # Remember the processed dataset name written in the report.
            self.processedData = dataset['ProcessedDataset']
            if dataset['PrimaryDataset'] == 'null':
                # Production data: reuse the processed name as primary name.
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']
            else:
                # Add parentage from the input dataset.
                dataset['ParentDataset'] = self.datasetpath

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s "%processed)

            common.logger.debug(6, "Inserted primary %s processed %s"%(primary,processed))

        common.logger.debug(6, "exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert the files described by one job report into the local DBS.

        file:        path of a framework job report (xml).
        procdataset: name used as PrimaryDataset for datasets whose primary
                     name is 'null' in the report.

        Files are skipped (and recorded for the final summary) when their
        LFN contains 'copy_problems', when their LFN is empty, or when they
        hold 0 events. Lumi sections of published files are reset to {}.

        Returns the blocks reported by DBSWriter.insertFiles, or None when
        nothing is left to publish or the insertion fails (error is logged).
        Raises CrabException when the report cannot be read.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)
        self.exit_status = '0'

        filestopublish = []
        for fjr_file in jobReport.files:
            lfn = fjr_file['LFN']
            if 'copy_problems' in lfn:
                # The copy to the SE failed for this file.
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fjr_file['PFN'])
            elif int(fjr_file['TotalEvents']) == 0:
                self.noEventsFiles.append(lfn)
            else:
                fjr_file.lumisections = {}
                for ds in fjr_file.dataset:
                    # ProcessedDataset is left as written in the report;
                    # only a 'null' primary name is replaced (production).
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(fjr_file)
        jobReport.files = filestopublish
        # Every file of the report was skipped: nothing to insert.
        if len(filestopublish) == 0:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s"%Blocks)
        except DBSWriterError:
            # sys.exc_info() is used instead of the "except X, ex" form so
            # the statement parses on both old and new interpreters.
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Publish every job report found in the results directory.

        The first report drives the dataset creation (publishDataset); then
        the files of each report are inserted (publishAJobReport) and every
        distinct block returned is handed to DBSWriter.manageFileBlock.
        A summary of the skipped files is logged at the end.

        Returns self.exit_status: '1' when no report is found or the dataset
        publication fails, '0' otherwise. Insert-file and close-block errors
        are only logged and do not change the status.
        """
        file_list = glob.glob(self.resDir+"crab_fjr*.xml")
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if len(file_list) == 0:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        BlocksList = []
        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        for report in file_list:
            common.logger.message("file = "+report)
            Blocks = self.publishAJobReport(report, self.processedData)
            if Blocks:
                # Do not allow multiple entries of the same block.
                for block in Blocks:
                    if block not in BlocksList:
                        BlocksList.append(block)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.debug(6, "closeBlock %s"%closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s"%ex)

        # Summary of the files that were not published.
        if len(self.noEventsFiles) > 0:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s"%lfn)
        if len(self.noLFN) > 0:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s"%pfn)
        if len(self.problemFiles) > 0:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s"%lfn)
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
276