ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.49
Committed: Mon Aug 30 10:36:33 2010 UTC (14 years, 8 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1
Changes since 1.48: +6 -3 lines
Log Message:
Allow split by event for data if split_by_event is set

File Contents

# User Rev Content
#!/usr/bin/env python
# Module identification strings in CVS keyword form ($Revision$ / $Id$);
# presumably kept up to date by CVS keyword expansion, not edited by hand.
__version__ = "$Revision: 1.48 $"
__revision__ = "$Id: DataDiscovery.py,v 1.48 2010/07/06 16:31:55 ewv Exp $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.47 try: # Can remove when CMSSW 3.7 and earlier are dropped
12     from FWCore.PythonUtilities.LumiList import LumiList
13     except ImportError:
14     from LumiList import LumiList
15    
16 ewv 1.32 import os
17    
18 afanfani 1.1
19 afanfani 1.3
class DBSError(Exception):
    """
    Exception carrying a DBS error name together with its message text.
    """
    def __init__(self, errorName, errorMessage):
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return the formatted error message """
        return "%s" % self.args
29    
30 ewv 1.32
31    
class DBSInvalidDataTierError(Exception):
    """
    Exception for an invalid data tier, carrying a DBS error name and text.
    """
    def __init__(self, errorName, errorMessage):
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return the formatted error message """
        return "%s" % self.args
41    
42 ewv 1.32
43    
class DBSInfoError(Exception):
    """
    Error reporting that a DBS url could not be accessed.

    The message is printed as soon as the object is built, as before, and
    is now also stored as the exception argument. Previously this was a
    classic class outside the Exception hierarchy, so `except Exception`
    could not catch it. Now it behaves like the other errors in this module
    and offers getErrorMessage().
    """
    def __init__(self, url):
        msg = '\nERROR accessing DBS url : ' + url + '\n'
        # Single parenthesised argument: prints identically on Python 2 and 3.
        print(msg)
        Exception.__init__(self, msg)

    def getErrorMessage(self):
        """ Return error message """
        return "%s" % self.args
48    
49 ewv 1.32
50    
class DataDiscoveryError(Exception):
    """
    Generic failure while discovering data in DBS.

    errorMessage -- message text, or another exception object, to report
    """
    def __init__(self, errorMessage):
        # Pass the message straight to the base class.
        # Assigning self.args = <string> first (as done previously) goes
        # through the BaseException.args setter on Python 2.5+. That setter
        # turns the string into a tuple of characters, which mangled str(e)
        # and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % self.args
60 afanfani 1.3
61 ewv 1.32
62    
class NotExistingDatasetError(Exception):
    """
    Error for a dataset for which DBS returned no data.

    errorMessage -- message text to report
    """
    def __init__(self, errorMessage):
        # Pass the message straight to the base class.
        # Assigning self.args = <string> first (as done previously) goes
        # through the BaseException.args setter on Python 2.5+. That setter
        # turns the string into a tuple of characters, which mangled str(e)
        # and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % self.args
72 afanfani 1.1
73 ewv 1.32
74    
class NoDataTierinProvenanceError(Exception):
    """
    Error about a data tier missing from the dataset provenance.

    errorMessage -- message text to report
    """
    def __init__(self, errorMessage):
        # Pass the message straight to the base class.
        # Assigning self.args = <string> first (as done previously) goes
        # through the BaseException.args setter on Python 2.5+. That setter
        # turns the string into a tuple of characters, which mangled str(e)
        # and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % self.args
84 afanfani 1.1
85 ewv 1.32
86    
87 afanfani 1.1 class DataDiscovery:
88 ewv 1.32 """
89     Class to find and extact info from published data
90     """
91 spiga 1.22 def __init__(self, datasetPath, cfg_params, skipAnBlocks):
92 afanfani 1.1
93 slacapra 1.18 # Attributes
94 slacapra 1.11 self.datasetPath = datasetPath
95 ewv 1.32 # Analysis dataset is primary/processed/tier/definition
96 spiga 1.34 self.ads = len(self.datasetPath.split("/")) > 4
97 afanfani 1.1 self.cfg_params = cfg_params
98 spiga 1.22 self.skipBlocks = skipAnBlocks
99 afanfani 1.1
100 slacapra 1.11 self.eventsPerBlock = {} # DBS output: map fileblocks-events for collection
101     self.eventsPerFile = {} # DBS output: map files-events
102 ewv 1.32 # self.lumisPerBlock = {} # DBS output: number of lumis in each block
103     # self.lumisPerFile = {} # DBS output: number of lumis in each file
104     self.blocksinfo = {} # DBS output: map fileblocks-files
105 slacapra 1.18 self.maxEvents = 0 # DBS output: max events
106 ewv 1.32 self.maxLumis = 0 # DBS output: total number of lumis
107     self.parent = {} # DBS output: parents of each file
108     self.lumis = {} # DBS output: lumis in each file
109 spiga 1.41 self.lumiMask = None
110 ewv 1.45 self.splitByLumi = False
111 ewv 1.49 self.splitDataByEvent = 0
112 afanfani 1.1
113     def fetchDBSInfo(self):
114     """
115     Contact DBS
116     """
117 slacapra 1.11 ## get DBS URL
118 spiga 1.25 global_url="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
119 spiga 1.43 dbs_url= self.cfg_params.get('CMSSW.dbs_url', global_url)
120 spiga 1.36 common.logger.info("Accessing DBS at: "+dbs_url)
121 slacapra 1.18
122     ## check if runs are selected
123 slacapra 1.19 runselection = []
124     if (self.cfg_params.has_key('CMSSW.runselection')):
125 slacapra 1.18 runselection = parseRange2(self.cfg_params['CMSSW.runselection'])
126    
127 ewv 1.42 ## check if various lumi parameters are set
128 spiga 1.41 self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask',None)
129 ewv 1.42 self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis',None) or \
130     self.cfg_params.get('CMSSW.lumis_per_job',None)
131    
132 spiga 1.41 lumiList = None
133     if self.lumiMask:
134     lumiList = LumiList(filename=self.lumiMask)
135 ewv 1.44 if runselection:
136     runList = LumiList(runs = runselection)
137 spiga 1.26
138     self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
139 ewv 1.49 self.splitDataByEvent = int(self.cfg_params.get('CMSSW.split_by_event', 0))
140 ewv 1.45 common.logger.log(10-1,"runselection is: %s"%runselection)
141    
142     if not self.splitByRun:
143     self.splitByLumi = self.lumiMask or self.lumiParams or self.ads
144 spiga 1.26
145 ewv 1.48 if self.splitByRun and not runselection:
146     msg = "Error: split_by_run must be combined with a runselection"
147     raise CrabException(msg)
148    
149 slacapra 1.18 ## service API
150     args = {}
151     args['url'] = dbs_url
152     args['level'] = 'CRITICAL'
153    
154 spiga 1.21 ## check if has been requested to use the parent info
155 spiga 1.26 useparent = int(self.cfg_params.get('CMSSW.use_parent',0))
156 spiga 1.21
157 ewv 1.32 ## check if has been asked for a non default file to store/read analyzed fileBlocks
158     defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
159 spiga 1.22 fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
160 ewv 1.32
161 slacapra 1.18 api = DBSAPI.dbsApi.DbsApi(args)
162 spiga 1.27 self.files = self.queryDbs(api,path=self.datasetPath,runselection=runselection,useParent=useparent)
163 slacapra 1.11
164 ewv 1.45 # Check to see what the dataset is
165     pdsName = self.datasetPath.split("/")[1]
166     primDSs = api.listPrimaryDatasets(pdsName)
167     dataType = primDSs[0]['Type']
168     common.logger.debug("Datatype is %s" % dataType)
169 ewv 1.49 if dataType == 'data' and not \
170     (self.splitByRun or self.splitByLumi or self.splitDataByEvent):
171 ewv 1.45 msg = 'Data must be split by lumi or by run. ' \
172     'Please see crab -help for the correct settings'
173     raise CrabException(msg)
174    
175    
176    
177 spiga 1.22 anFileBlocks = []
178 ewv 1.32 if self.skipBlocks: anFileBlocks = readTXTfile(self, fileBlocks_FileName)
179 spiga 1.22
180 slacapra 1.18 # parse files and fill arrays
181 spiga 1.26 for file in self.files :
182 ewv 1.32 parList = []
183 spiga 1.41 fileLumis = [] # List of tuples
184 spiga 1.22 # skip already analyzed blocks
185     fileblock = file['Block']['Name']
186     if fileblock not in anFileBlocks :
187     filename = file['LogicalFileName']
188 ewv 1.32 # asked retry the list of parent for the given child
189     if useparent==1:
190     parList = [x['LogicalFileName'] for x in file['ParentList']]
191 ewv 1.45 if self.splitByLumi:
192 spiga 1.41 fileLumis = [ (x['RunNumber'], x['LumiSectionNumber'])
193 ewv 1.32 for x in file['LumiList'] ]
194     self.parent[filename] = parList
195 spiga 1.41 # For LumiMask, intersection of two lists.
196 ewv 1.45 if self.lumiMask and runselection:
197     self.lumis[filename] = runList.filterLumis(lumiList.filterLumis(fileLumis))
198     elif runselection:
199     self.lumis[filename] = runList.filterLumis(fileLumis)
200     elif self.lumiMask:
201 spiga 1.41 self.lumis[filename] = lumiList.filterLumis(fileLumis)
202     else:
203     self.lumis[filename] = fileLumis
204 spiga 1.22 if filename.find('.dat') < 0 :
205     events = file['NumberOfEvents']
206 ewv 1.32 # Count number of events and lumis per block
207 spiga 1.22 if fileblock in self.eventsPerBlock.keys() :
208     self.eventsPerBlock[fileblock] += events
209     else :
210     self.eventsPerBlock[fileblock] = events
211 ewv 1.32 # Number of events per file
212 spiga 1.22 self.eventsPerFile[filename] = events
213 ewv 1.32
214     # List of files per block
215 spiga 1.22 if fileblock in self.blocksinfo.keys() :
216     self.blocksinfo[fileblock].append(filename)
217     else :
218     self.blocksinfo[fileblock] = [filename]
219 ewv 1.32
220 spiga 1.22 # total number of events
221     self.maxEvents += events
222 spiga 1.41 self.maxLumis += len(self.lumis[filename])
223 ewv 1.32
224 spiga 1.22 if self.skipBlocks and len(self.eventsPerBlock.keys()) == 0:
225     msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
226 ewv 1.32 raise CrabException(msg)
227 slacapra 1.11
228 ewv 1.32
229 slacapra 1.11 if len(self.eventsPerBlock) <= 0:
230 slacapra 1.18 raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
231 slacapra 1.11 + " dataset path variables in crab.cfg")
232 slacapra 1.18 % self.datasetPath)
233 afanfani 1.1
234    
235 ewv 1.32 def queryDbs(self,api,path=None,runselection=None,useParent=None):
236 spiga 1.26
237 ewv 1.48
238     allowedRetriveValue = []
239     if self.splitByLumi or self.splitByRun or useParent == 1:
240     allowedRetriveValue.extend(['retrive_block', 'retrive_run'])
241     if self.splitByLumi:
242 spiga 1.41 allowedRetriveValue.append('retrive_lumi')
243 ewv 1.48 if useParent == 1:
244     allowedRetriveValue.append('retrive_parent')
245 ewv 1.32 common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)
246 spiga 1.26 try:
247 ewv 1.48 if self.splitByRun:
248     files = []
249 spiga 1.26 for arun in runselection:
250     try:
251 ewv 1.32 if self.ads:
252     filesinrun = api.listFiles(analysisDataset=path,retriveList=allowedRetriveValue,runNumber=arun)
253     else:
254     filesinrun = api.listFiles(path=path,retriveList=allowedRetriveValue,runNumber=arun)
255 spiga 1.26 files.extend(filesinrun)
256     except:
257     msg="WARNING: problem extracting info from DBS for run %s "%arun
258 spiga 1.31 common.logger.info(msg)
259 spiga 1.26 pass
260    
261 ewv 1.48 else:
262     if allowedRetriveValue:
263     if self.ads:
264     files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
265     else :
266     files = api.listFiles(path=path, retriveList=allowedRetriveValue)
267     else:
268     files = api.listDatasetFiles(self.datasetPath)
269    
270 spiga 1.26 except DbsBadRequest, msg:
271     raise DataDiscoveryError(msg)
272     except DBSError, msg:
273     raise DataDiscoveryError(msg)
274    
275     return files
276    
277 ewv 1.32
278 afanfani 1.1 def getMaxEvents(self):
279     """
280 ewv 1.32 max events
281 afanfani 1.1 """
282 slacapra 1.18 return self.maxEvents
283 afanfani 1.1
284 ewv 1.32
285 ewv 1.33 def getMaxLumis(self):
286     """
287     Return the number of lumis in the dataset
288     """
289     return self.maxLumis
290    
291    
292 slacapra 1.11 def getEventsPerBlock(self):
293 afanfani 1.1 """
294 ewv 1.32 list the event collections structure by fileblock
295 afanfani 1.1 """
296 slacapra 1.11 return self.eventsPerBlock
297 afanfani 1.1
298 ewv 1.32
299 slacapra 1.11 def getEventsPerFile(self):
300 afanfani 1.1 """
301 ewv 1.32 list the event collections structure by file
302 afanfani 1.1 """
303 slacapra 1.11 return self.eventsPerFile
304 afanfani 1.1
305 ewv 1.32
306 slacapra 1.11 def getFiles(self):
307 afanfani 1.1 """
308 ewv 1.32 return files grouped by fileblock
309 afanfani 1.1 """
310 ewv 1.32 return self.blocksinfo
311    
312 afanfani 1.1
313 spiga 1.21 def getParent(self):
314     """
315 ewv 1.32 return parent grouped by file
316     """
317     return self.parent
318    
319    
320     def getLumis(self):
321     """
322     return lumi sections grouped by file
323 spiga 1.21 """
324 ewv 1.32 return self.lumis
325    
326 spiga 1.21
327 spiga 1.26 def getListFiles(self):
328     """
329 ewv 1.32 return parent grouped by file
330 spiga 1.26 """
331 ewv 1.32 return self.files