ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.50
Committed: Thu Sep 12 13:45:22 2013 UTC (11 years, 7 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, HEAD
Changes since 1.49: +6 -2 lines
Log Message:
protect against huge runselection range: https://savannah.cern.ch/bugs/index.php?95734

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 belforte 1.50 __revision__ = "$Id: DataDiscovery.py,v 1.49 2010/08/30 10:36:33 ewv Exp $"
4     __version__ = "$Revision: 1.49 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.47 try: # Can remove when CMSSW 3.7 and earlier are dropped
12     from FWCore.PythonUtilities.LumiList import LumiList
13     except ImportError:
14     from LumiList import LumiList
15    
16 ewv 1.32 import os
17    
18 afanfani 1.1
19 afanfani 1.3
class DBSError(Exception):
    """
    Generic DBS error.

    The exception message is built from an error name and an error
    message as '\\nERROR DBS <errorName> : <errorMessage> \\n'.
    """

    def __init__(self, errorName, errorMessage):
        # Use the builtin Exception directly: the Python 2 'exceptions'
        # module only re-exports the very same class.
        args = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, args)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the 1-tuple holding the formatted message
        return "%s" % (self.args)
29    
30 ewv 1.32
31    
class DBSInvalidDataTierError(Exception):
    """
    DBS error whose message is built from an error name and an error
    message as '\\nERROR DBS <errorName> : <errorMessage> \\n'.
    """

    def __init__(self, errorName, errorMessage):
        # Use the builtin Exception directly: the Python 2 'exceptions'
        # module only re-exports the very same class.
        args = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, args)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the 1-tuple holding the formatted message
        return "%s" % (self.args)
41    
42 ewv 1.32
43    
class DBSInfoError:
    """
    Reports a failure to access a DBS url.

    Note that this class is not an Exception subclass: constructing it
    only prints the error text to standard output.
    """

    def __init__(self, url):
        # Single parenthesised argument: prints the same text whether
        # 'print' is the Python 2 statement or the Python 3 function.
        print('\nERROR accessing DBS url : ' + url + '\n')
48    
49 ewv 1.32
50    
class DataDiscoveryError(Exception):
    """
    Error carrying a single message; DataDiscovery.queryDbs raises it
    when a DBS request fails.
    """

    def __init__(self, errorMessage):
        # Do NOT assign 'self.args = errorMessage': BaseException turns
        # whatever is assigned to 'args' into a tuple, so a string
        # message would be exploded into a tuple of single characters
        # and str(e) would show "('N', 'o', ...)" instead of the text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is (errorMessage,); wrap in a tuple so that a message
        # which is itself a tuple is still formatted as one value.
        return "%s" % (self.args[0],)
60 afanfani 1.3
61 ewv 1.32
62    
class NotExistingDatasetError(Exception):
    """
    Error carrying a single message; DataDiscovery.fetchDBSInfo raises
    it when no data is found in DBS for the requested dataset path.
    """

    def __init__(self, errorMessage):
        # Do NOT assign 'self.args = errorMessage': BaseException turns
        # whatever is assigned to 'args' into a tuple, so a string
        # message would be exploded into a tuple of single characters
        # and str(e) would show "('N', 'o', ...)" instead of the text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is (errorMessage,); wrap in a tuple so that a message
        # which is itself a tuple is still formatted as one value.
        return "%s" % (self.args[0],)
72 afanfani 1.1
73 ewv 1.32
74    
class NoDataTierinProvenanceError(Exception):
    """
    Error carrying a single message string.
    """

    def __init__(self, errorMessage):
        # Do NOT assign 'self.args = errorMessage': BaseException turns
        # whatever is assigned to 'args' into a tuple, so a string
        # message would be exploded into a tuple of single characters
        # and str(e) would show "('N', 'o', ...)" instead of the text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is (errorMessage,); wrap in a tuple so that a message
        # which is itself a tuple is still formatted as one value.
        return "%s" % (self.args[0],)
84 afanfani 1.1
85 ewv 1.32
86    
class DataDiscovery:
    """
    Class to find and extract info from published data.

    Typical use: build the object, call fetchDBSInfo() once to query
    DBS and fill the internal maps, then read the results through the
    get*() accessors.
    """

    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset path string, split on '/'
        cfg_params   -- mapping of 'CMSSW.*' configuration keys
        skipAnBlocks -- when true, blocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}   # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}    # DBS output: map files-events
        self.blocksinfo = {}       # DBS output: map fileblocks-files
        self.maxEvents = 0         # DBS output: max events
        self.maxLumis = 0          # DBS output: total number of lumis
        self.parent = {}           # DBS output: parents of each file
        self.lumis = {}            # DBS output: lumis in each file
        self.files = []            # DBS output: raw file records from queryDbs()
        self.lumiMask = None
        self.lumiParams = None
        self.splitByLumi = False
        # Initialised here so that queryDbs() and getListFiles() do not
        # raise AttributeError when used before fetchDBSInfo().
        self.splitByRun = 0
        self.splitDataByEvent = 0

    def fetchDBSInfo(self):
        """
        Contact DBS and fill the per-block / per-file maps.

        Reads the run selection, lumi and splitting options from
        cfg_params, queries DBS through queryDbs() and accumulates
        events, lumis, parents and file lists for every file whose
        block has not been analyzed already.

        Raises CrabException when split_by_run is requested without a
        run selection, when real data is not split by run, lumi or
        event, or when skipping analyzed blocks leaves nothing new.
        Raises NotExistingDatasetError when DBS returns no data for
        the dataset. Errors raised by queryDbs() propagate.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', global_url)
        common.logger.info("Accessing DBS at: " + dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])
            # protect against huge runselection range
            if len(runselection) > 1000000:
                common.logger.info("ERROR: runselection range has more then 1M numbers")
                common.logger.info("ERROR: Too large. runselection is ignored")
                runselection = []

        ## check if various lumi parameters are set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        self.lumiParams = (self.cfg_params.get('CMSSW.total_number_of_lumis', None)
                           or self.cfg_params.get('CMSSW.lumis_per_job', None))

        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)
        runList = None
        if runselection:
            runList = LumiList(runs=runselection)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
        self.splitDataByEvent = int(self.cfg_params.get('CMSSW.split_by_event', 0))
        # NOTE(review): level 10-1 is presumably "just below DEBUG" - confirm
        common.logger.log(10-1, "runselection is: %s" % runselection)

        if not self.splitByRun:
            self.splitByLumi = self.lumiMask or self.lumiParams or self.ads

        if self.splitByRun and not runselection:
            msg = "Error: split_by_run must be combined with a runselection"
            raise CrabException(msg)

        ## service API
        args = {}
        args['url'] = dbs_url
        args['level'] = 'CRITICAL'

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir() + 'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(
            self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection, useParent=useparent)

        # Check to see what the dataset is
        pdsName = self.datasetPath.split("/")[1]
        primDSs = api.listPrimaryDatasets(pdsName)
        if not primDSs:
            # An empty answer would otherwise surface as a bare IndexError
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)
        dataType = primDSs[0]['Type']
        common.logger.debug("Datatype is %s" % dataType)
        if dataType == 'data' and not \
           (self.splitByRun or self.splitByLumi or self.splitDataByEvent):
            msg = 'Data must be split by lumi or by run. ' \
                  'Please see crab -help for the correct settings'
            raise CrabException(msg)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']
            parList = []
            fileLumis = []  # List of (run, lumi section) tuples
            # asked retry the list of parent for the given child
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            if self.splitByLumi:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in fileInfo['LumiList']]
            self.parent[filename] = parList

            # For LumiMask, intersection of two lists.
            if self.lumiMask and runselection:
                self.lumis[filename] = runList.filterLumis(lumiList.filterLumis(fileLumis))
            elif runselection:
                self.lumis[filename] = runList.filterLumis(fileLumis)
            elif self.lumiMask:
                self.lumis[filename] = lumiList.filterLumis(fileLumis)
            else:
                self.lumis[filename] = fileLumis

            # Files whose name contains '.dat' get parent/lumi entries
            # above but are left out of the event and block accounting.
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            if fileblock in self.eventsPerBlock:
                self.eventsPerBlock[fileblock] += events
            else:
                self.eventsPerBlock[fileblock] = events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: " + str(self.datasetPath)
            raise CrabException(msg)

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the files of the dataset and return them.

        api          -- DBS API object used for the queries
        path         -- dataset (or analysis dataset) path to list
        runselection -- runs to query one by one when splitting by run
        useParent    -- 1 to also retrieve the parents of each file

        When splitting by run, a failure for a single run is logged
        and that run is skipped. DbsBadRequest and DBSError are
        re-raised as DataDiscoveryError.
        """
        allowedRetriveValue = []
        if self.splitByLumi or self.splitByRun or useParent == 1:
            allowedRetriveValue.extend(['retrive_block', 'retrive_run'])
        if self.splitByLumi:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        try:
            if self.splitByRun:
                files = []
                # 'or []' tolerates the default runselection=None
                for arun in runselection or []:
                    try:
                        if self.ads:
                            filesinrun = api.listFiles(analysisDataset=path,
                                                       retriveList=allowedRetriveValue,
                                                       runNumber=arun)
                        else:
                            filesinrun = api.listFiles(path=path,
                                                       retriveList=allowedRetriveValue,
                                                       runNumber=arun)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort per run, but let KeyboardInterrupt
                        # and SystemExit through (a bare except would not).
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
            elif allowedRetriveValue:
                if self.ads:
                    files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
                else:
                    files = api.listFiles(path=path, retriveList=allowedRetriveValue)
            else:
                files = api.listDatasetFiles(self.datasetPath)
        except (DbsBadRequest, DBSError) as msg:
            raise DataDiscoveryError(msg)

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the list of file records obtained from DBS
        (empty until fetchDBSInfo has been called)
        """
        return self.files