ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.46
Committed: Wed Jun 2 13:55:14 2010 UTC (14 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta
Changes since 1.45: +2 -7 lines
Log Message:
first change for -extend. Cache blocks only after B/W filtering

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 spiga 1.46 __revision__ = "$Id: DataDiscovery.py,v 1.45 2010/05/27 18:54:45 ewv Exp $"
4     __version__ = "$Revision: 1.45 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 spiga 1.41 from LumiList import LumiList
12 ewv 1.32 import os
13    
14 afanfani 1.1
15 afanfani 1.3
class DBSError(Exception):
    """Exception describing a failure reported by DBS."""

    def __init__(self, errorName, errorMessage):
        """
        Store a single formatted message built from the error name and
        the error message.
        """
        formatted = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, formatted)

    def getErrorMessage(self):
        """ Return the formatted error message """
        # self.args is the one-element tuple set by Exception.__init__,
        # so it can be used directly as the formatting argument.
        return "%s" % self.args
25    
26 ewv 1.32
27    
class DBSInvalidDataTierError(Exception):
    """Exception carrying a formatted 'ERROR DBS <name> : <message>' text."""

    def __init__(self, errorName, errorMessage):
        """
        Format the error name and message into one string and hand it
        to the base exception as its only argument.
        """
        formatted = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, formatted)

    def getErrorMessage(self):
        """ Return the formatted error message """
        # self.args holds exactly the one string stored by __init__.
        return "%s" % self.args
37    
38 ewv 1.32
39    
class DBSInfoError:
    """
    Object whose construction prints a DBS access error for the given url.

    It is not an exception subclass; creating it only writes to stdout.
    """

    def __init__(self, url):
        """ Print the access error for url (url must be a string). """
        report = '\nERROR accessing DBS url : ' + url + '\n'
        print(report)
44    
45 ewv 1.32
46    
class DataDiscoveryError(Exception):
    """
    Generic data discovery failure; its only argument is the error message
    (or the underlying exception object it wraps).
    """

    def __init__(self, errorMessage):
        """
        errorMessage: text (or exception object) describing the failure.
        """
        # Pass the message straight to the base class. Assigning it to
        # self.args first makes the interpreter coerce it with tuple(),
        # which splits a string into single characters and garbles the
        # text later returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
56 afanfani 1.3
57 ewv 1.32
58    
class NotExistingDatasetError(Exception):
    """
    Raised when no data can be found for the requested dataset; its only
    argument is the error message.
    """

    def __init__(self, errorMessage):
        """
        errorMessage: text describing which dataset could not be found.
        """
        # Pass the message straight to the base class. Assigning it to
        # self.args first makes the interpreter coerce it with tuple(),
        # which splits a string into single characters and garbles the
        # text later returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
68 afanfani 1.1
69 ewv 1.32
70    
class NoDataTierinProvenanceError(Exception):
    """
    Exception whose only argument is the error message.

    NOTE(review): judging by the name it signals a data tier missing from
    the dataset provenance; nothing in this module raises it - confirm
    against callers.
    """

    def __init__(self, errorMessage):
        """
        errorMessage: text describing the failure.
        """
        # Pass the message straight to the base class. Assigning it to
        # self.args first makes the interpreter coerce it with tuple(),
        # which splits a string into single characters and garbles the
        # text later returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
80 afanfani 1.1
81 ewv 1.32
82    
class DataDiscovery:
    """
    Class to find and extract info from published data

    Typical use: build the object, call fetchDBSInfo() once, then read the
    collected maps through the get*() accessors.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset path string, split on "/" to derive names
        cfg_params   -- mapping of 'CMSSW.*' configuration keys to values
        skipAnBlocks -- when true, file blocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        self.lumiMask = None
        self.splitByLumi = False
        # The next three are (re)computed by fetchDBSInfo(); defining them
        # here keeps queryDbs() and getListFiles() usable on a fresh object
        # instead of failing with AttributeError.
        self.lumiParams = None
        self.splitByRun = 0
        self.files = []           # DBS output: raw file records

    def fetchDBSInfo(self):
        """
        Contact DBS

        Reads the 'CMSSW.*' settings from cfg_params, queries DBS for the
        files of the dataset and fills the per-block / per-file maps.

        Raises CrabException when the primary dataset type is 'data' and
        neither run nor lumi splitting is active, or when skipBlocks is set
        and no new file block is left; raises NotExistingDatasetError when
        DBS has no data for the dataset.  DataDiscoveryError may propagate
        from queryDbs().
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', global_url)
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        ## check if various lumi parameters are set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis', None) or \
                          self.cfg_params.get('CMSSW.lumis_per_job', None)

        # Each filter stays None when the matching selection is not requested.
        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)
        runList = None
        if runselection:
            runList = LumiList(runs=runselection)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
        # 10-1: one level below the standard logging DEBUG value (10)
        common.logger.log(10-1, "runselection is: %s" % runselection)

        if not self.splitByRun:
            self.splitByLumi = self.lumiMask or self.lumiParams or self.ads

        ## service API
        apiArgs = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(
            self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(apiArgs)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection, useParent=useparent)

        noDataMsg = ("\nNo data for %s in DBS\nPlease check"
                     + " dataset path variables in crab.cfg") % self.datasetPath

        # Check to see what the dataset is
        pdsName = self.datasetPath.split("/")[1]
        primDSs = api.listPrimaryDatasets(pdsName)
        if not primDSs:
            # An empty answer would otherwise surface as a bare IndexError.
            raise NotExistingDatasetError(noDataMsg)
        dataType = primDSs[0]['Type']
        common.logger.debug("Datatype is %s" % dataType)
        if dataType == 'data' and not (self.splitByRun or self.splitByLumi):
            msg = 'Data must be split by lumi or by run. ' \
                  'Please see crab -help for the correct settings'
            raise CrabException(msg)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']

            # parents of the file, only when explicitly requested
            parList = []
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            self.parent[filename] = parList

            fileLumis = []  # List of (run, lumi section) tuples
            if self.splitByLumi:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in fileInfo['LumiList']]
            # Apply the lumi mask first, then the run selection; with both
            # active the result is the intersection of the two lists.
            if lumiList is not None:
                fileLumis = lumiList.filterLumis(fileLumis)
            if runList is not None:
                fileLumis = runList.filterLumis(fileLumis)
            self.lumis[filename] = fileLumis

            # files whose name contains '.dat' are left out of the counts
            if '.dat' not in filename:
                self._addFileToBlock(fileblock, filename,
                                     fileInfo['NumberOfEvents'])
                self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and len(self.eventsPerBlock) == 0:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        if len(self.eventsPerBlock) <= 0:
            raise NotExistingDatasetError(noDataMsg)

    def _addFileToBlock(self, fileblock, filename, events):
        """ Account one file and its events in the per-block/per-file maps """
        # Count number of events per block
        self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
        # Number of events per file
        self.eventsPerFile[filename] = events
        # List of files per block
        self.blocksinfo.setdefault(fileblock, []).append(filename)
        # total number of events
        self.maxEvents += events

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the file records of the dataset and return them.

        api          -- DBS API object providing listFiles/listDatasetFiles
        path         -- dataset (or analysis dataset) path to query
        runselection -- sequence of run numbers; None or empty means all runs
        useParent    -- 1 to retrieve parent information as well

        With a run selection (and no lumi splitting) DBS is queried once per
        run; a run whose query fails is logged and skipped.

        Raises DataDiscoveryError wrapping DbsBadRequest / DBSError.
        """
        # Local import: only needed to fetch the exception being handled.
        import sys

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads or self.lumiMask or self.lumiParams:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # Analysis datasets are passed under a different keyword than
        # ordinary dataset paths.
        if self.ads:
            query = {'analysisDataset': path}
        else:
            query = {'path': path}
        query['retriveList'] = allowedRetriveValue

        try:
            # "not runselection" also covers the None default.
            if not runselection or self.splitByLumi:
                if useParent == 1 or self.splitByRun == 1 or self.splitByLumi:
                    files = api.listFiles(**query)
                else:
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        files.extend(api.listFiles(runNumber=arun, **query))
                    except Exception:
                        # Best effort per run: report and go on with the
                        # next one.  Not a bare except, so interpreter-exit
                        # requests are not swallowed.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError):
            # sys.exc_info() instead of a bound except clause: the binding
            # syntax differs between old and new Python versions.
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        return the total number of events counted by fetchDBSInfo
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        return the map fileblock -> number of events
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        return the map file -> number of events
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the raw file records obtained from DBS
        (empty list until fetchDBSInfo has run)
        """
        return self.files