ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.48
Committed: Tue Jul 6 16:31:55 2010 UTC (14 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, CRAB_2_7_4_pre3, CRAB_2_7_4_pre2
Changes since 1.47: +24 -15 lines
Log Message:
Cleaner logic on retriveValue and throw exception for split by run with no runselection

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 ewv 1.48 __revision__ = "$Id: DataDiscovery.py,v 1.47 2010/06/29 17:46:42 ewv Exp $"
4     __version__ = "$Revision: 1.47 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.47 try: # Can remove when CMSSW 3.7 and earlier are dropped
12     from FWCore.PythonUtilities.LumiList import LumiList
13     except ImportError:
14     from LumiList import LumiList
15    
16 ewv 1.32 import os
17    
18 afanfani 1.1
19 afanfani 1.3
class DBSError(Exception):
    """
    Generic DBS failure.

    The error name and message are folded into one formatted text, which
    becomes the only argument of the exception.
    """

    def __init__(self, errorName, errorMessage):
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return the formatted error text """
        # self.args is the one-element tuple built in __init__, so it can
        # feed the single %s placeholder directly.
        return "%s" % self.args
29    
30 ewv 1.32
31    
class DBSInvalidDataTierError(Exception):
    """
    DBS failure related to an invalid data tier.

    Carries the same formatted "ERROR DBS <name> : <message>" text as its
    only argument.
    """

    def __init__(self, errorName, errorMessage):
        formatted = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, formatted)

    def getErrorMessage(self):
        """ Return the formatted error text """
        # args holds exactly one item (the formatted text), matching the
        # single %s placeholder.
        return "%s" % self.args
41    
42 ewv 1.32
43    
class DBSInfoError:
    """
    Reports a DBS access problem by printing the offending URL.

    Note that this is a plain class, not an Exception subclass, and that it
    keeps no state: constructing it is the whole report.
    """

    def __init__(self, url):
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function.
        print('\nERROR accessing DBS url : ' + url + '\n')
48    
49 ewv 1.32
50    
class DataDiscoveryError(Exception):
    """
    Error raised when discovering data in DBS fails.

    errorMessage may be a plain string or another exception instance; it is
    stored unchanged as the single exception argument.
    """

    def __init__(self, errorMessage):
        # Hand the message to the base class rather than assigning
        # self.args directly: the args setter coerces its value to a tuple,
        # which splits a string into single characters and garbles the
        # reported message.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # Wrap in a one-element tuple so that a message which is itself a
        # tuple is not unpacked by the % operator.
        return "%s" % (self.args[0],)
60 afanfani 1.3
61 ewv 1.32
62    
class NotExistingDatasetError(Exception):
    """
    Error raised when no data is found in DBS for the requested dataset.

    errorMessage is stored unchanged as the single exception argument.
    """

    def __init__(self, errorMessage):
        # Do not assign self.args directly: the args setter turns its value
        # into a tuple, so a string message would be split into characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # One-element tuple keeps a tuple-valued message from being
        # unpacked by the % operator.
        return "%s" % (self.args[0],)
72 afanfani 1.1
73 ewv 1.32
74    
class NoDataTierinProvenanceError(Exception):
    """
    Error carrying a single message.

    NOTE(review): the name suggests a missing data tier in the dataset
    provenance; nothing in this module raises it, so confirm against the
    callers.
    """

    def __init__(self, errorMessage):
        # Do not assign self.args directly: the args setter turns its value
        # into a tuple, so a string message would be split into characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # One-element tuple keeps a tuple-valued message from being
        # unpacked by the % operator.
        return "%s" % (self.args[0],)
84 afanfani 1.1
85 ewv 1.32
86    
class DataDiscovery:
    """
    Class to find and extract info from published data

    Typical use: build the object, call fetchDBSInfo() once, then read the
    collected maps through the get*() accessors.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- '/'-separated dataset path
        cfg_params   -- dictionary-like CRAB configuration ('CMSSW.*' keys)
        skipAnBlocks -- when true, blocks listed in the analyzed-blocks file
                        are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}   # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}    # DBS output: map files-events
        self.blocksinfo = {}       # DBS output: map fileblocks-files
        self.maxEvents = 0         # DBS output: max events
        self.maxLumis = 0          # DBS output: total number of lumis
        self.parent = {}           # DBS output: parents of each file
        self.lumis = {}            # DBS output: lumis in each file
        self.lumiMask = None
        self.lumiParams = None
        self.splitByLumi = False
        # Defined here so that queryDbs() and getListFiles() are usable
        # even before fetchDBSInfo() has run.
        self.splitByRun = 0
        self.files = []            # DBS output: file records as returned by the query

    def fetchDBSInfo(self):
        """
        Contact DBS

        Reads the 'CMSSW.*' settings, queries DBS for the files of the
        dataset and fills the per-block / per-file maps.

        Raises CrabException for inconsistent splitting settings or when
        every block was already analyzed, NotExistingDatasetError when DBS
        has no data for the dataset, and DataDiscoveryError (from queryDbs)
        when the DBS query is rejected.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', global_url)
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        ## check if various lumi parameters are set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis', None) or \
                          self.cfg_params.get('CMSSW.lumis_per_job', None)

        # Filters are only built when requested; each is used below under
        # the same condition that created it.
        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)
        runList = None
        if runselection:
            runList = LumiList(runs=runselection)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
        # 10-1: one below level 10 (DEBUG in the standard logging module)
        common.logger.log(10-1, "runselection is: %s" % runselection)

        # Splitting by run takes precedence over splitting by lumi.
        if not self.splitByRun:
            self.splitByLumi = self.lumiMask or self.lumiParams or self.ads

        if self.splitByRun and not runselection:
            msg = "Error: split_by_run must be combined with a runselection"
            raise CrabException(msg)

        ## service API
        args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection, useParent=useparent)

        noDataMsg = ("\nNo data for %s in DBS\nPlease check"
                     + " dataset path variables in crab.cfg") % self.datasetPath

        # Check to see what the dataset is
        pdsName = self.datasetPath.split("/")[1]
        primDSs = api.listPrimaryDatasets(pdsName)
        if not primDSs:
            # An empty answer would otherwise surface as a bare IndexError.
            raise NotExistingDatasetError(noDataMsg)
        dataType = primDSs[0]['Type']
        common.logger.debug("Datatype is %s" % dataType)
        if dataType == 'data' and not (self.splitByRun or self.splitByLumi):
            msg = 'Data must be split by lumi or by run. ' \
                  'Please see crab -help for the correct settings'
            raise CrabException(msg)

        # presumably the names of the blocks analyzed by earlier
        # submissions -- verify against crab_util.readTXTfile
        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for dbsFile in self.files:
            fileblock = dbsFile['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = dbsFile['LogicalFileName']

            # parents of the file, only when asked for
            parList = []
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in dbsFile['ParentList']]
            self.parent[filename] = parList

            # (run, lumi section) tuples of the file, only when splitting by lumi
            fileLumis = []
            if self.splitByLumi:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in dbsFile['LumiList']]
            # Lumi mask first, then run selection: with both present the
            # result is the intersection of the two lists.
            if self.lumiMask:
                fileLumis = lumiList.filterLumis(fileLumis)
            if runselection:
                fileLumis = runList.filterLumis(fileLumis)
            self.lumis[filename] = fileLumis

            # Files with '.dat' in their name are left out of all the
            # event/block bookkeeping below.
            if '.dat' in filename:
                continue

            events = dbsFile['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # totals
            self.maxEvents += events
            self.maxLumis += len(fileLumis)

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(noDataMsg)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the file records of a dataset and return them as a list.

        api          -- DBS API object (listFiles / listDatasetFiles)
        path         -- dataset path handed to listFiles
        runselection -- runs to query one by one when splitting by run
        useParent    -- 1 to retrieve parent information as well

        A failure for a single run is logged and that run is skipped.
        Raises DataDiscoveryError when DBS rejects the request.
        """
        # Function-scope import: sys.exc_info() gives access to the active
        # exception without relying on any particular 'except' binding
        # syntax, so the module parses on old and new interpreters alike.
        import sys

        allowedRetriveValue = []
        if self.splitByLumi or self.splitByRun or useParent == 1:
            allowedRetriveValue.extend(['retrive_block', 'retrive_run'])
        if self.splitByLumi:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # Analysis datasets are addressed through a different keyword.
        if self.ads:
            datasetArg = {'analysisDataset': path}
        else:
            datasetArg = {'path': path}

        try:
            if self.splitByRun:
                files = []
                # 'or []' covers the default runselection=None
                for arun in runselection or []:
                    try:
                        filesinrun = api.listFiles(retriveList=allowedRetriveValue,
                                                   runNumber=arun, **datasetArg)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: one failing run must not abort the
                        # whole discovery.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
                        common.logger.debug("DBS error for run %s: %s" % (arun, sys.exc_info()[1]))
            elif allowedRetriveValue:
                files = api.listFiles(retriveList=allowedRetriveValue, **datasetArg)
            else:
                # NOTE(review): uses self.datasetPath rather than the 'path'
                # argument; the only caller in this module passes the same value.
                files = api.listDatasetFiles(self.datasetPath)
        except (DbsBadRequest, DBSError):
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the file records as obtained from the DBS query
        (empty list until fetchDBSInfo() has run)
        """
        return self.files