ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Publisher.py
Revision: 1.3
Committed: Wed Nov 21 14:40:27 2007 UTC (17 years, 5 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_3
Changes since 1.2: +7 -2 lines
Log Message:
added a control on dbs_url_for_publication entry

File Contents

# User Rev Content
1 fanzago 1.2 import getopt, string
2 slacapra 1.1 import common
3     import time, glob
4     from Actor import *
5     from crab_util import *
6     from crab_logger import Logger
7     from crab_exceptions import *
8 fanzago 1.2 from FwkJobRep.ReportParser import readJobReport
9 slacapra 1.1 from ProdCommon.MCPayloads.WorkflowSpec import WorkflowSpec
10     from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter
11     from ProdCommon.DataMgmt.DBS.DBSErrors import DBSWriterError, formatEx,DBSReaderError
12     from ProdCommon.DataMgmt.DBS.DBSReader import DBSReader
13 fanzago 1.2 from ProdCommon.DataMgmt.DBS.DBSWriter import DBSWriter,DBSWriterObjects
14     import sys
15 slacapra 1.1
class Publisher(Actor):
    """
    Publish the output files of a CRAB task in a DBS instance.

    The job reports (crab_fjr*.xml) found in the task result directory are
    parsed; the dataset they describe is created in the publication DBS and
    every reported file is then inserted into it.

    Methods report their outcome through the string status codes '0'
    (success) and '1' (failure), also stored in self.exit_status.
    """

    # Global DBS instance from which parent datasets are imported.
    # TODO: the globalDBS has to be written in the crab cfg file.
    GLOBAL_DBS_URL = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"

    def __init__(self, cfg_params):
        """
        Publisher class:

        - parses CRAB FrameworkJobReport on UI
        - returns <file> section of xml in dictionary format for each xml file in crab_0_xxxx/res directory
        - publishes output data on DBS and DLS

        cfg_params: mapping of 'SECTION.key' names to values. The keys read
                    here are USER.publish_data_name, USER.copy_data,
                    USER.dbs_url_for_publication and CMSSW.datasetpath.

        Raises CrabException if publish_data_name or dbs_url_for_publication
        is missing, or if copy_data is missing, not an integer or not 1.
        A missing CMSSW.datasetpath still raises a plain KeyError.
        """
        try:
            self.processedData = cfg_params['USER.publish_data_name']
        except KeyError:
            raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')

        # A missing or non-numeric copy_data is treated like "not enabled"
        # so that the user gets the explanatory message instead of a traceback.
        try:
            copy_data = int(cfg_params['USER.copy_data'])
        except (KeyError, ValueError):
            copy_data = 0
        if copy_data != 1:
            raise CrabException('You can not publish data because you did not selected *** copy_data = 1 *** in the crab.cfg file')

        self.resDir = common.work_space.resDir()

        try:
            self.DBSURL = cfg_params['USER.dbs_url_for_publication']
        except KeyError:
            msg = "Error. The [USER] section does not have 'dbs_url_for_publication'"
            msg = msg + " entry, necessary to publish the data"
            raise CrabException(msg)
        common.logger.message('dbs url = ' + self.DBSURL)

        self.datasetpath = cfg_params['CMSSW.datasetpath']
        self.SEName = ''
        self.CMSSW_VERSION = ''
        self.exit_status = ''
        self.time = time.strftime('%y%m%d_%H%M%S', time.localtime(time.time()))

    def importParentDataset(self, globalDBS, datasetpath):
        """
        Import datasetpath from the DBS at globalDBS into the publication
        DBS (self.DBSURL).

        Returns 0 on success. If the DBS writer raises DBSWriterError the
        problem is logged and 1 is returned.
        """
        dbsWriter = DBSWriter(self.DBSURL, level='ERROR')

        try:
            # Import the dataset that was asked for (and that is reported
            # in the error message below), not unconditionally self.datasetpath.
            dbsWriter.importDataset(globalDBS, datasetpath, self.DBSURL)
        except DBSWriterError:
            # sys.exc_info() instead of "except X, ex" / "except X as ex":
            # it is accepted by every Python version.
            ex = sys.exc_info()[1]
            msg = "Error importing dataset to be processed into local DBS\n"
            msg += "Source Dataset: %s\n" % datasetpath
            msg += "Source DBS: %s\n" % globalDBS
            msg += "Destination DBS: %s\n" % self.DBSURL
            msg += "Reason: %s\n" % ex
            common.logger.message(msg)
            return 1
        return 0

    def publishDataset(self, file):
        """
        Create in the publication DBS the primary dataset, algorithm and
        processed dataset described by the first file entry of the job
        report `file`. When self.datasetpath is not 'None' the parent
        dataset is first imported from the global DBS.

        Returns (and stores in self.exit_status) '0' on success, '1' if the
        job report cannot be used or the parent import fails.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            common.logger.message(msg)
            return self.exit_status

        if self.datasetpath != 'None':
            globalDBS = self.GLOBAL_DBS_URL
            common.logger.message("--->>> Importing parent dataset in the dbs")
            status_import = self.importParentDataset(globalDBS, self.datasetpath)
            if status_import == 1:
                common.logger.message('Problem with parent import from the global DBS ' + globalDBS + ' to the local one ' + self.DBSURL)
                self.exit_status = '1'
                ## ___ >>>>>>> comment out the next line, if you have problem with the import
                return self.exit_status
            common.logger.message("Parent import ok")

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        try:
            fileinfo = jobReport.files[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: No file to publish in xml file " + file
            common.logger.message(msg)
            return self.exit_status

        for dataset in fileinfo.dataset:
            #### to understand how to fill cfgMeta info ###############
            cfgMeta = {'name': 'usercfg', 'Type': 'user', 'annotation': 'user cfg', 'version': 'private version'}  # add real name of user cfg
            common.logger.message("PrimaryDataset = %s" % dataset['PrimaryDataset'])
            common.logger.message("ProcessedDataset = %s" % dataset['ProcessedDataset'])
            common.logger.message("--->>> Inserting primary: %s processed : %s" % (dataset['PrimaryDataset'], dataset['ProcessedDataset']))

            primary = DBSWriterObjects.createPrimaryDataset(dataset, dbswriter.dbs)
            common.logger.message("Primary: %s " % primary)

            algo = DBSWriterObjects.createAlgorithm(dataset, cfgMeta, dbswriter.dbs)
            common.logger.message("Algo: %s " % algo)

            processed = DBSWriterObjects.createProcessedDataset(primary, algo, dataset, dbswriter.dbs)
            common.logger.message("Processed: %s " % processed)

            common.logger.message("Inserted primary %s processed %s" % (primary, processed))

        return self.exit_status

    def publishAJobReport(self, file, procdataset):
        """
        Insert in the publication DBS the files listed in one job report.

        input: xml file, processedDataset

        Before the insertion the ProcessedDataset of every dataset entry is
        overwritten with procdataset and the lumisections of every file are
        emptied.

        Returns whatever DBSWriter.insertFiles returns (iterated by run() as
        the blocks to close), or None if the insertion raised DBSWriterError
        (the error is logged).
        Raises CrabException if the job report cannot be read.
        """
        try:
            jobReport = readJobReport(file)[0]
            self.exit_status = '0'
        except IndexError:
            self.exit_status = '1'
            msg = "Error: Problem with " + file + " file"
            raise CrabException(msg)

        # overwrite ProcessedDataset with user defined value
        # overwrite lumisections with no value
        for fileinfo in jobReport.files:
            fileinfo.lumisections = {}
            for ds in fileinfo.dataset:
                ds['ProcessedDataset'] = procdataset

        #// DBS to contact
        dbswriter = DBSWriter(self.DBSURL)
        # insert files
        Blocks = None
        try:
            Blocks = dbswriter.insertFiles(jobReport)
            common.logger.message("Blocks = %s" % Blocks)
        except DBSWriterError:
            ex = sys.exc_info()[1]
            common.logger.message("Insert file error: %s" % ex)
        return Blocks

    def run(self):
        """
        Publish every crab_fjr*.xml job report found in the result directory.

        The dataset is created from the first report, then the files of all
        reports are inserted and the blocks they were inserted into are
        handed to DBSWriter.manageFileBlock.

        Returns '1' if there is no job report or the dataset publication
        fails, otherwise the exit status left by the publication steps.
        NOTE: file-insertion and block-closing errors are only logged; they
        do not change the returned status.
        """
        import os

        # os.path.join gives the same pattern as plain concatenation when
        # resDir ends with a separator, and a correct one when it does not.
        file_list = glob.glob(os.path.join(self.resDir, "crab_fjr*.xml"))
        common.logger.debug(6, "file_list = " + str(file_list))
        common.logger.debug(6, "len(file_list) = " + str(len(file_list)))

        if not file_list:
            common.logger.message("--->>> " + self.resDir + " empty: no file to publish on DBS")
            self.exit_status = '1'
            return self.exit_status

        common.logger.message("--->>> Start dataset publication")
        self.exit_status = self.publishDataset(file_list[0])
        if self.exit_status == '1':
            return self.exit_status
        common.logger.message("--->>> End dataset publication")

        common.logger.message("--->>> Start files publication")
        BlocksList = []
        for file in file_list:
            common.logger.message("file = " + file)
            Blocks = self.publishAJobReport(file, self.processedData)
            if Blocks:
                BlocksList.extend(Blocks)

        # close the blocks
        common.logger.message("BlocksList = %s" % BlocksList)
        dbswriter = DBSWriter(self.DBSURL)

        for BlockName in BlocksList:
            try:
                closeBlock = dbswriter.manageFileBlock(BlockName, maxFiles=1)
                common.logger.message("closeBlock %s" % closeBlock)
            except DBSWriterError:
                ex = sys.exc_info()[1]
                common.logger.message("Close block error %s" % ex)
        common.logger.message("--->>> End files publication")
        return self.exit_status
202