ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.7
Committed: Tue Dec 18 10:58:20 2007 UTC (17 years, 4 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_4, CRAB_2_0_4_pre2
Changes since 1.6: +1 -1 lines
Log Message:
removed a / from the final message

File Contents

# User Rev Content
import getopt, string
import time, glob
import os
import sys

import common
from Actor import *
from crab_util import *
from crab_logger import Logger
from crab_exceptions import *
from FwkJobRep.ReportParser import readJobReport
from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
15 slacapra 1.1
class Publisher(Actor):
    """
    Publisher class:

    - parses the CRAB FrameworkJobReport xml files (crab_fjr*.xml) found
      in the res directory of the task
    - creates primary dataset, algorithm and processed dataset in the DBS
      selected for publication
    - inserts the files listed in every report and manages their blocks
    """

    def __init__(self, cfg_params):
        """
        Read and validate the parameters needed for the publication.

        cfg_params: mapping with the crab.cfg values, indexed by
                    'SECTION.key' strings.

        Raises CrabException if publish_data_name, pset or
        dbs_url_for_publication is missing, if copy_data is not 1, or if
        the publication url is one of the two global DBS urls.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non numeric copy_data is handled like "not selected",
        # so the user always gets the same CrabException.
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError):
            copy_data = 0
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        # Source DBS for the parent import; fall back to the default url.
        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data"
            raise CrabException(msg)
        common.logger.message('<dbs_url_for_publication> = '+self.DBSURL)

        # Publication is refused when the target is one of these two urls.
        forbidden_urls = (
            "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet",
            "https://cmsdbsprod.cern.ch:8443/cms_dbs_prod_global_writer/servlet/DBSServlet",
        )
        if self.DBSURL in forbidden_urls:
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Keep the text of the user pset: it is attached to every dataset
        # as 'PSetContent'.  The handle is closed even if read() fails.
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()

        self.resDir = common.work_space.resDir()
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S',time.localtime(time.time()))
        # Files skipped by publishAJobReport, reported at the end of run().
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self,globalDBS, datasetpath):
        """
        Import a dataset from globalDBS into the DBS used for publication.

        globalDBS:   url of the source DBS
        datasetpath: path of the dataset to import

        Returns 0 if the import succeeded, 1 if DBSWriterError was raised
        (the problem is written to the logger).
        """
        dbsWriter = DBSWriter(self.DBSURL,level='ERROR')

        try:
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() instead of "except X, ex": same object, but
            # the syntax is accepted by every python version.
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Reason: %s\n" % ex
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self,file):
        """
        Create in the publication DBS the datasets described by the first
        file entry of a job report.

        file: path of a FrameworkJobReport xml file

        Returns the exit status as a string: '0' if everything went fine,
        '1' if the report is empty, has no file entry, or the import of
        the parent dataset failed.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        # datasetpath is the string 'None' when there is no input dataset.
        if self.datasetpath != 'None':
            common.logger.message("--->>> Importing parent dataset in the dbs")
            status_import = self.importParentDataset(self.globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS '+self.globalDBS+' to the local one '+self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message("Parent import ok")

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file "+file+" file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6,"FileInfo = " + str(fileinfo))
        common.logger.debug(6,"DatasetInfo = " + str(datasets))
        for dataset in datasets:
            #### for production data
            if dataset['PrimaryDataset'] == 'null':
                dataset['PrimaryDataset'] = dataset['ProcessedDataset']

            dataset['PSetContent'] = self.content
            cfgMeta = {'name' : self.pset , 'Type' : 'user' , 'annotation': 'user cfg', 'version' : 'private version'} # add real name of user cfg
            common.logger.message("PrimaryDataset = %s"%dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s"%dataset['ProcessedDataset'])
            common.logger.message("<User Dataset Name> = /"+dataset['PrimaryDataset']+"/"+dataset['ProcessedDataset']+"/USER")

            common.logger.debug(6,"--->>> Inserting primary: %s processed : %s"%(dataset['PrimaryDataset'],dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset( dataset, dbswriter.dbs)
            common.logger.debug(6,"Primary: %s "%primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6,"Algo: %s "%algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6,"Processed: %s "%processed)

            common.logger.debug(6,"Inserted primary %s processed %s"%(primary,processed))

        common.logger.debug(6,"exit_status = %s "%self.exit_status)
        return self.exit_status

    def publishAJobReport(self,file,procdataset):
        """
        Insert in the publication DBS the files listed in one job report.

        file:        path of a FrameworkJobReport xml file
        procdataset: name written as ProcessedDataset of every published
                     file (and as PrimaryDataset when that is 'null')

        Files are skipped, and remembered for the final summary, when their
        LFN contains 'copy_problems', when the LFN is empty, or when they
        hold 0 events.

        Returns what DBSWriter.insertFiles returns, or None if no file is
        left to publish or the insertion raised DBSWriterError.
        Raises CrabException if the report contains no job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with "+file+" file"
            raise CrabException(msg)

        ### overwrite ProcessedDataset with user defined value
        ### overwrite lumisections with no value
        ### skip publication for 0 events files
        filestopublish = []
        for fjr_file in jobReport.files:
            lfn = fjr_file['LFN']
            #### check for problem with copy to SE and empty lfn
            if lfn.find('copy_problems') != -1:
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fjr_file['PFN'])
            elif int(fjr_file['TotalEvents']) != 0:
                fjr_file.lumisections = {}
                for ds in fjr_file.dataset:
                    ds['ProcessedDataset'] = procdataset
                    ### for production
                    if ds['PrimaryDataset'] == 'null':
                        ds['PrimaryDataset'] = procdataset
                filestopublish.append(fjr_file)
            else:
                self.noEventsFiles.append(lfn)
        jobReport.files = filestopublish

        ### every file of the FJR has been skipped
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s"%Blocks)
        except DBSWriterError:
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s"%ex)
        return Blocks

    def run(self):
        """
        Publish every crab_fjr*.xml report found in the res directory.

        The first report is used to create the dataset, then the files of
        all the reports are inserted and the affected blocks are handed to
        DBSWriter.manageFileBlock.  A summary of the skipped files is
        logged at the end.

        Returns the exit status as a string ('1' if there is no report or
        the dataset publication failed).
        """
        # os.path.join gives the same pattern as plain concatenation when
        # resDir ends with a separator, and a valid one when it does not.
        file_list = glob.glob(os.path.join(self.resDir, "crab_fjr*.xml"))
        common.logger.debug(6, "file_list = "+str(file_list))
        common.logger.debug(6, "len(file_list) = "+str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> "+self.resDir+" empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for fjr in file_list:
            common.logger.message("file = "+fjr)
            Blocks = self.publishAJobReport(fjr,self.processedData)
            if Blocks:
                # Several reports can touch the same block: keep one entry
                # per block so that it is not managed more than once.
                for block in Blocks:
                    if block not in BlocksList:
                        BlocksList.append(block)

        # close the blocks
        common.logger.debug(6, "BlocksList = %s"%BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                # NOTE(review): maxFiles=1 presumably makes DBSWriter close
                # any block that holds at least one file - confirm against
                # DBSWriter.manageFileBlock.
                closeBlock = dbswriter.manageFileBlock(BlockName,maxFiles= 1)
                common.logger.debug(6, "closeBlock %s"%closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s"%ex)

        # Summary of the files skipped by publishAJobReport.
        if self.noEventsFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.noEventsFiles))+" files not published because they contain 0 events are:")
            for lfn in self.noEventsFiles:
                common.logger.message("------ LFN: %s"%lfn)
        if self.noLFN:
            common.logger.message("--->>> WARNING: there are "+str(len(self.noLFN))+" files not published because they have empty LFN")
            for pfn in self.noLFN:
                common.logger.message("------ pfn: %s"%pfn)
        if self.problemFiles:
            common.logger.message("--->>> WARNING: "+str(len(self.problemFiles))+" files not published because they had problem with copy to SE")
            for lfn in self.problemFiles:
                common.logger.message("------ LFN: %s"%lfn)
        common.logger.message("--->>> End files publication")
        common.logger.message("--->>> To check data publication please use: InspectDBS2.py --DBSURL=<dbs_url_for_publication> --datasetPath=<User Dataset Name>")
        return self.exit_status
249