ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.44
Committed: Wed May 26 19:46:12 2010 UTC (14 years, 11 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3_beta
Changes since 1.43: +7 -3 lines
Log Message:
Add runselection along with lumi_mask. Needs Splitter, LumiList, and DataDiscovery

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 ewv 1.44 __revision__ = "$Id: DataDiscovery.py,v 1.43 2010/05/04 15:49:40 spiga Exp $"
4     __version__ = "$Revision: 1.43 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 spiga 1.41 from LumiList import LumiList
12 ewv 1.32 import os
13    
14 afanfani 1.1
15 afanfani 1.3
class DBSError(Exception):
    """Generic DBS failure carrying an error name and an error message.

    The two constructor arguments are folded into one formatted string,
    which becomes the only element of ``args``.
    """

    def __init__(self, errorName, errorMessage):
        """Build the formatted message from *errorName* and *errorMessage*."""
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same class the old ``exceptions``
        # module exposed, and it is available on every Python version.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # Index explicitly rather than relying on ``args`` being a 1-tuple
        # that ``%`` happens to unpack.
        return "%s" % (self.args[0],)
25    
26 ewv 1.32
27    
class DBSInvalidDataTierError(Exception):
    """DBS error for an invalid data tier (going by the class name; it is
    not raised anywhere in the visible part of this module).

    The two constructor arguments are folded into one formatted string,
    which becomes the only element of ``args``.
    """

    def __init__(self, errorName, errorMessage):
        """Build the formatted message from *errorName* and *errorMessage*."""
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same class the old ``exceptions``
        # module exposed, and it is available on every Python version.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # Index explicitly rather than relying on ``args`` being a 1-tuple
        # that ``%`` happens to unpack.
        return "%s" % (self.args[0],)
37    
38 ewv 1.32
39    
class DBSInfoError(Exception):
    """Error signalling that a DBS url could not be accessed.

    Constructing the object still prints the error text to stdout, as the
    original plain class did. It now derives from ``Exception`` so it can
    be raised and caught like the other errors in this module, and it
    remembers the url and the message.
    """

    def __init__(self, url):
        """Record *url*, build the error text and print it."""
        self.url = url
        message = '\nERROR accessing DBS url : ' + url + '\n'
        Exception.__init__(self, message)
        # Parenthesised single-argument print gives the same output whether
        # ``print`` is a statement (Python 2) or a function (Python 3).
        print(message)

    def getErrorMessage(self):
        """ Return error message """
        return "%s" % (self.args[0],)
44    
45 ewv 1.32
46    
class DataDiscoveryError(Exception):
    """Error raised by ``DataDiscovery.queryDbs`` when a DBS request fails."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the single element of ``args``."""
        # Do not assign ``self.args = errorMessage``: on new-style exceptions
        # the ``args`` setter converts its value to a tuple, so a string is
        # split into individual characters and the message is lost.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
56 afanfani 1.3
57 ewv 1.32
58    
class NotExistingDatasetError(Exception):
    """Error raised by ``DataDiscovery.fetchDBSInfo`` when DBS returns no
    data for the requested dataset path."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the single element of ``args``."""
        # Do not assign ``self.args = errorMessage``: on new-style exceptions
        # the ``args`` setter converts its value to a tuple, so a string is
        # split into individual characters and the message is lost.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
68 afanfani 1.1
69 ewv 1.32
70    
class NoDataTierinProvenanceError(Exception):
    """Error for a data tier missing from the provenance (going by the
    class name; it is not raised anywhere in the visible part of this
    module)."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the single element of ``args``."""
        # Do not assign ``self.args = errorMessage``: on new-style exceptions
        # the ``args`` setter converts its value to a tuple, so a string is
        # split into individual characters and the message is lost.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
80 afanfani 1.1
81 ewv 1.32
82    
class DataDiscovery:
    """
    Class to find and extract info from published data

    Typical use: build the object, call ``fetchDBSInfo()`` once to query
    DBS and fill the per-block / per-file maps, then read the results
    through the ``get*`` accessors.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset path string, split on "/" to detect an
                        analysis dataset
        cfg_params   -- dict-like configuration, read with .get() and
                        'CMSSW.*' keys
        skipAnBlocks -- when true, blocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        self.lumiMask = None

        # These are (re)assigned by fetchDBSInfo(). They are initialised
        # here so that queryDbs() and getListFiles() do not fail with
        # AttributeError when used before fetchDBSInfo() has run.
        self.lumiParams = None
        self.splitByRun = 0
        self.files = []

    def fetchDBSInfo(self):
        """
        Contact DBS

        Reads the 'CMSSW.*' options from cfg_params, queries DBS through
        queryDbs() and fills eventsPerBlock, eventsPerFile, blocksinfo,
        parent, lumis, maxEvents and maxLumis. The names of the blocks that
        were found are written to the analyzed-blocks file.

        Raises CrabException when block skipping is on and no new block is
        left, and NotExistingDatasetError when no block with events was
        found at all.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', global_url)
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        ## check if various lumi parameters are set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis', None) or \
                          self.cfg_params.get('CMSSW.lumis_per_job', None)

        lumiList = None
        runList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)
        if runselection:
            runList = LumiList(runs=runselection)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        common.logger.log(10-1, "runselection is: %s" % runselection)
        ## service API
        args = {}
        args['url'] = dbs_url
        args['level'] = 'CRITICAL'

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath, runselection=runselection, useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # Lumi information is only present in the DBS answer when one of
        # these is set (queryDbs() uses the same condition).
        needLumis = self.ads or self.lumiMask or self.lumiParams

        # parse files and fill arrays
        for dbsFile in self.files:
            # skip already analyzed blocks
            fileblock = dbsFile['Block']['Name']
            if fileblock in anFileBlocks:
                continue

            filename = dbsFile['LogicalFileName']
            parList = []
            fileLumis = []  # List of (run, lumi section) tuples
            # asked retry the list of parent for the given child
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in dbsFile['ParentList']]
            if needLumis:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in dbsFile['LumiList']]
            self.parent[filename] = parList

            # For LumiMask, intersection of two lists.
            if self.lumiMask:
                self.lumis[filename] = lumiList.filterLumis(fileLumis)
                if runselection:
                    self.lumis[filename] = runList.filterLumis(self.lumis[filename])
            else:
                self.lumis[filename] = fileLumis

            # Files whose name contains '.dat' are not counted.
            if filename.find('.dat') < 0:
                events = dbsFile['NumberOfEvents']
                # Count number of events per block
                self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
                # Number of events per file
                self.eventsPerFile[filename] = events
                # List of files per block
                self.blocksinfo.setdefault(fileblock, []).append(filename)
                # total number of events and lumis
                self.maxEvents += events
                self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and len(self.eventsPerBlock) == 0:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockLines = []
        for block in self.eventsPerBlock.keys():
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s " % (self.eventsPerBlock[block], block))
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if len(self.eventsPerBlock) <= 0:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the files of a dataset and return the resulting list.

        api          -- DBS API object (listFiles / listDatasetFiles are used)
        path         -- dataset (or analysis dataset) path handed to DBS
        runselection -- list of run numbers; None or empty means no selection
        useParent    -- 1 to also retrieve parent information

        Raises DataDiscoveryError when DBS reports DbsBadRequest or DBSError.
        """
        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads or self.lumiMask or self.lumiParams:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        try:
            # ``not runselection`` also covers the default None, on which
            # the former len() call raised TypeError.
            if not runselection or self.ads or self.lumiMask:
                if useParent == 1 or self.splitByRun == 1 or self.ads or self.lumiMask or self.lumiParams:
                    if self.ads:
                        files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
                    else:
                        files = api.listFiles(path=path, retriveList=allowedRetriveValue)
                else:
                    # NOTE(review): uses self.datasetPath rather than the
                    # ``path`` argument -- confirm this is intended.
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        if self.ads:
                            filesinrun = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue, runNumber=arun)
                        else:
                            filesinrun = api.listFiles(path=path, retriveList=allowedRetriveValue, runNumber=arun)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: a failing run is logged and skipped.
                        # ``Exception`` (not a bare except) lets
                        # KeyboardInterrupt and SystemExit propagate.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError) as err:
            raise DataDiscoveryError(err)

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the list of files as returned by queryDbs()
        (empty until fetchDBSInfo() has been called)
        """
        return self.files