ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.5
Committed: Fri Dec 7 13:26:14 2007 UTC (17 years, 4 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +26 -10 lines
Log Message:
added some check for data publication

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.2 from FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output files of a CRAB task into a local DBS instance.

    The publisher:

    - reads the CRAB FrameworkJobReport files (crab_fjr*.xml) found in the
      task result directory;
    - creates the primary dataset / algorithm / processed dataset entries
      described by the first report;
    - inserts the files listed in every report and then closes the
      resulting file blocks.

    Files that cannot be published (copy problems, empty LFN, zero events)
    are collected in ``problemFiles``, ``noLFN`` and ``noEventsFiles`` and
    are reported at the end of ``run``.
    """

    # Global DBS instance: used as the default source for the parent
    # dataset import and refused as a publication target.
    _GLOBAL_DBS_URL = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Validate the publication settings and cache what ``run`` needs.

        cfg_params -- mapping of '<SECTION>.<key>' names to the values
                      found in crab.cfg.

        Raises CrabException when a mandatory setting is missing, when
        copy_data is not set to 1, or when the publication DBS is the
        global one.  A missing 'CMSSW.datasetpath' entry still surfaces as
        a plain KeyError and an unreadable pset file as an IOError.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing key and a non-numeric value both mean "copy_data is not
        # enabled"; report them through the same CrabException instead of
        # leaking a raw ValueError.
        try:
            copy_enabled = (int(cfg_params['USER.copy_data']) == 1)
        except (KeyError, ValueError):
            copy_enabled = False
        if not copy_enabled:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        try:
            self.pset = cfg_params['CMSSW.pset']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify the psetname in [CMSSW] of your crab.cfg file')

        try:
            self.globalDBS = cfg_params['CMSSW.dbs_url']
        except KeyError:
            self.globalDBS = self._GLOBAL_DBS_URL

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data"
            raise CrabException(msg)
        common.logger.message('dbs url = ' + self.DBSURL)
        if self.DBSURL == self._GLOBAL_DBS_URL:
            msg = "You can not publish your data in the globalDBS = " + self.DBSURL + "\n"
            msg = msg + "Please write your local one in the [USER] section 'dbs_url_for_publication'"
            raise CrabException(msg)

        # Read the user configuration and release the file handle right
        # away (try/finally keeps this valid on old Python 2 interpreters).
        pset_file = open(self.pset)
        try:
            self.content = pset_file.read()
        finally:
            pset_file.close()

        self.resDir = common.work_space.resDir()
        self.datasetpath = cfg_params['CMSSW.datasetpath']
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))
        # Files skipped by publishAJobReport, grouped by reason.
        self.problemFiles = []
        self.noEventsFiles = []
        self.noLFN = []

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import the dataset that was processed into the publication DBS.

        globalDBS   -- URL of the DBS instance to import from
        datasetpath -- path of the dataset to import

        Returns 0 on success, 1 when DBSWriter reports an error (the error
        is logged, not raised).
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # Import the dataset named by the argument, so that the call and
            # the error report below always refer to the same dataset.
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() is used instead of "except X, ex" so the code
            # parses on every Python version.
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Error details: %s\n" % (sys.exc_info()[1],)
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Create the dataset entries described by one job report.

        file -- path of a FrameworkJobReport xml file

        Imports the parent dataset first unless datasetpath is 'None', then
        inserts primary dataset, algorithm and processed dataset for every
        dataset attached to the first file of the report.

        Returns the exit status as a string: '0' on success, '1' when the
        report is empty, the parent import fails or the report lists no
        file.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        if self.datasetpath != 'None':
            common.logger.message("--->>> Importing parent dataset in the dbs")
            status_import = self.importParentDataset(self.globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS ' + self.globalDBS + ' to the local one ' + self.DBSURL)
                self.exit_status = '1'
                return self.exit_status
            common.logger.message("Parent import ok")

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        datasets = fileinfo.dataset
        common.logger.debug(6, "FileInfo = " + str(fileinfo))
        common.logger.debug(6, "DatasetInfo = " + str(datasets))
        for dataset in datasets:
            dataset['PSetContent'] = self.content
            # The algorithm is registered under the name of the user pset.
            cfgMeta = {'name': self.pset, 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.debug(6, "Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.debug(6, "Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.debug(6, "Processed: %s " % processed)

            common.logger.message("Inserted primary %s processed %s" % (primary, processed))

        common.logger.debug(6, "exit_status = %s " % self.exit_status)
        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert the publishable files of one job report into DBS.

        file        -- path of a FrameworkJobReport xml file
        procdataset -- processed dataset name written into every dataset
                       of every published file

        Files whose LFN contains 'copy_problems', whose LFN is empty or
        whose TotalEvents is 0 are not published; they are recorded in
        problemFiles, noLFN and noEventsFiles respectively.

        Returns what DBSWriter.insertFiles returns, or None when nothing
        was publishable or the insertion failed (the failure is logged).
        Raises CrabException when the report contains no job report.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)

        filestopublish = []
        for fileinfo in jobReport.files:
            lfn = fileinfo['LFN']
            if 'copy_problems' in lfn:
                # the copy to the storage element failed for this file
                self.problemFiles.append(lfn)
            elif lfn == '':
                self.noLFN.append(fileinfo['PFN'])
            elif int(fileinfo['TotalEvents']) == 0:
                self.noEventsFiles.append(lfn)
            else:
                # overwrite lumisections with no value and ProcessedDataset
                # with the user defined one
                fileinfo.lumisections = {}
                for ds in fileinfo.dataset:
                    ds['ProcessedDataset'] = procdataset
                filestopublish.append(fileinfo)
        jobReport.files = filestopublish

        # Nothing left to insert (e.g. every file of the report was skipped).
        if not filestopublish:
            return None

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s" % Blocks)
        except DBSWriterError:
            common.logger.message("Insert file error: %s" % (sys.exc_info()[1],))
        return Blocks

    def _logSkippedFiles(self, skipped, headline, label):
        """
        Log a warning headline followed by one line per skipped file.

        skipped  -- list of file names; nothing is logged when it is empty
        headline -- message template with one %d slot for the file count
        label    -- tag printed in front of each file name
        """
        if not skipped:
            return
        common.logger.message(headline % len(skipped))
        for name in skipped:
            common.logger.message("------ %s: %s" % (label, name))

    def run(self):
        """
        Publish every job report found in the result directory.

        The first report is used to create the dataset entries, then the
        files of all the reports are inserted and the resulting blocks are
        passed to DBSWriter.manageFileBlock.  A summary of the files that
        were skipped is logged at the end.

        Returns the exit status as a string; '1' when the result directory
        holds no job report or the dataset publication failed.
        """
        file_list = glob.glob(self.resDir + "crab_fjr*.xml")
        common.logger.debug(6, "file_list = " + str(file_list))
        common.logger.debug(6, "len(file_list) = " + str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for file in file_list:
            common.logger.message("file = " + file)
            Blocks = self.publishAJobReport(file, self.processedData)
            if Blocks:
                BlocksList.extend(Blocks)

        # close the blocks
        common.logger.message("BlocksList = %s" % BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.message("closeBlock %s" % closeBlock)
            except DBSWriterError:
                # A block that cannot be managed is logged; the remaining
                # blocks are still processed.
                common.logger.message("Close block error %s" % (sys.exc_info()[1],))

        common.logger.message("--->>> End files publication")
        self._logSkippedFiles(
            self.noEventsFiles,
            "--->>> WARNING: %d files not published because they contain 0 events are:",
            "LFN")
        self._logSkippedFiles(
            self.noLFN,
            "--->>> WARNING: there are %d files not published because they have empty LFN",
            "pfn")
        self._logSkippedFiles(
            self.problemFiles,
            "--->>> WARNING: %d files not published because they had problem with copy to SE",
            "LFN")
        return self.exit_status
240