ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.11
Committed: Fri May 9 10:03:54 2008 UTC (16 years, 11 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_1_pre4, CRAB_2_3_1_pre3, CRAB_2_3_1_pre2, CRAB_2_3_1_pre1, CRAB_2_3_0, CRAB_2_3_0_pre6, CRAB_2_3_0_pre1, CRAB_2_2_2_pre5, CRAB_2_2_2_pre4, CRAB_2_2_2_pre3, CRAB_2_2_2_pre2, CRAB_2_2_2_pre1, CRAB_2_2_1, CRAB_2_2_1_pre6, CRAB_2_2_1_pre5, CRAB_2_2_1_pre4, PRODCOMMON_0_10_7_testCS2, CRAB_2_2_1_pre3, CRAB_2_2_1_pre2, CRAB_2_2_1_pre1, CRAB_2_2_0, CRAB_2_2_0_pre21, CRAB_2_2_0_pre19
Branch point for: CRAB_2_3_0_br
Changes since 1.10: +5 -4 lines
Log Message:
fixed problem with datasetpath none

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.9 from ProdCommon.FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output of a CRAB task in a local DBS instance.

    - parses the CRAB FrameworkJobReport xml files (crab_fjr*.xml) found in
      the task res directory
    - creates the primary dataset, algorithm and processed dataset in DBS
    - inserts the output files in DBS and manages the resulting file blocks
    """

    # Global DBS instances. The reader is the default source for the parent
    # dataset import; neither of them is accepted as publication target.
    GLOBAL_DBS_READER = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
    GLOBAL_DBS_WRITER = "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Validate the publication settings and initialise the publisher state.

        cfg_params: mapping of 'SECTION.key' names to crab.cfg values

        Raises CrabException when USER.publish_data_name, CMSSW.pset or
        USER.dbs_url_for_publication is missing, when USER.copy_data is not 1,
        or when the publication url is one of the global DBS instances.
        A missing CMSSW.datasetpath is not handled here: the lookup error of
        cfg_params itself propagates to the caller.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non-numeric copy_data is rejected exactly like any
        # value different from 1, instead of leaking a bare ValueError.
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError):
            copy_data = None
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = self.GLOBAL_DBS_READER

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Warning. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data.\n"
            msg = msg + "Use the command **crab -publish -USER.dbs_url_for_publication=dbs_url_for_publication*** \nwhere dbs_url_for_publication is your local dbs instance."
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)
        if self.DBSURL in (self.GLOBAL_DBS_READER, self.GLOBAL_DBS_WRITER):
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Read the user pset and always release the file handle.
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()

        self.resDir = common.work_space.resDir()
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Files skipped by publishAJobReport, reported at the end of run()
        self.problemFiles = []    # LFNs flagged with 'copy_problems'
        self.noEventsFiles = []   # LFNs of files with TotalEvents == 0
        self.noLFN = []           # PFNs of files with an empty LFN

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the dataset to be processed from globalDBS into the
        publication DBS (self.DBSURL), without its parentage.

        globalDBS: url of the source DBS instance
        datasetpath: path of the dataset to import

        Returns 0 on success; on DBSWriterError the problem is logged and 1
        is returned.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # import the dataset received as argument, the same one that is
            # reported in the error message below
            dbsWriter.importDatasetWithoutParentage(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() is valid syntax on every python version
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Error: %s\n" % str(ex)
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Create in the publication DBS the dataset described by a job report.

        file: path of a FrameworkJobReport xml file

        Unless the configured datasetpath is 'none' (any case), the parent
        dataset is first imported from the global DBS. Then, for each dataset
        attached to the first file of the report, the primary dataset, the
        algorithm and the processed dataset are created.

        Returns self.exit_status: '0' on success, '1' when the report cannot
        be read, contains no file, or the parent import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status
        self.exit_status = '0'

        if self.datasetpath.upper() != 'NONE':
            common.logger.message("--->>> Importing parent dataset in the dbs")
            status_import = self.importParentDataset(self.globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message("Parent import ok")

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))

        # The same for every dataset, so it is built only once.
        cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg

        for dataset in datasets:
            #### for production data
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']

            dataset['PSetContent'] = self.content
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6, "--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s "%processed)

            common.logger.debug(6, "Inserted primary %s processed %s"%(primary,processed))

        common.logger.debug(6, "exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert in the publication DBS the files listed in a job report.

        file: path of a FrameworkJobReport xml file
        procdataset: user defined name that overwrites the ProcessedDataset
                     of every published file

        Files whose LFN contains 'copy_problems', files with an empty LFN and
        files with 0 events are not published; they are recorded in
        self.problemFiles, self.noLFN and self.noEventsFiles respectively.

        Returns the value of DBSWriter.insertFiles, or None when there is
        nothing to publish or the insertion fails (the error is logged).
        Raises CrabException when no job report can be read from the file.
        """
        try:
            jobReport = readJobReport(file)[0]
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)
        self.exit_status = '0'

        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        # own loop name, so that the 'file' argument is not overwritten
        for fjr_file in jobReport.files:
            lfn = fjr_file['LFN']
            #### check for problem with copy to SE and empty lfn
            if 'copy_problems' in lfn:
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fjr_file['PFN'])
            elif int(fjr_file['TotalEvents']) == 0:
                self.noEventsFiles.append(lfn)
            else:
                fjr_file.lumisections = {}
                for ds in fjr_file.dataset:
                    ds['ProcessedDataset'] = procdataset
                    ### Fede for production
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(fjr_file)
        jobReport.files = filestopublish

        ### if no file of the FJR can be published
        if len(filestopublish) == 0:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s"%Blocks)
        except DBSWriterError:
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Publish all the crab_fjr*.xml job reports found in the res directory.

        The dataset is created from the first report, then the files of every
        report are inserted and the resulting blocks are passed to
        DBSWriter.manageFileBlock. The files that were skipped are listed at
        the end.

        Returns self.exit_status: '1' when there is no report to publish or
        the dataset publication fails, otherwise the status left by the last
        publishAJobReport call.
        """
        import os

        # os.path.join gives the right pattern with or without a trailing
        # separator in resDir
        file_list = glob.glob(os.path.join(self.resDir, "crab_fjr*.xml"))
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if len(file_list) == 0:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for fjr in file_list:
            common.logger.message("file = "+fjr)
            Blocks = self.publishAJobReport(fjr, self.processedData)
            if Blocks:
                BlocksList.extend(Blocks)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles= 1)
                common.logger.debug(6, "closeBlock %s"%closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s"%ex)

        # report the files that have not been published
        if len(self.noEventsFiles) > 0:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s"%lfn)
        if len(self.noLFN) > 0:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s"%pfn)
        if len(self.problemFiles) > 0:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s"%lfn)
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
251