ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.45
Committed: Thu May 27 18:54:45 2010 UTC (14 years, 11 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1
Changes since 1.44: +27 -9 lines
Log Message:
Enforce split by lumi for data, enable split by lumi with runselection but no lumi mask

File Contents

# Content
#!/usr/bin/env python
"""
Data discovery for CRAB: query DBS for the files of a dataset and collect
per-block / per-file event counts, lumi sections and parent files.
"""

# CVS keyword strings, filled in by the version-control system.
__revision__ = "$Id: DataDiscovery.py,v 1.42 2010/03/22 21:17:15 ewv Exp $"
__version__ = "$Revision: 1.42 $"
5
6 import exceptions
7 import DBSAPI.dbsApi
8 from DBSAPI.dbsApiException import *
9 import common
10 from crab_util import *
11 from LumiList import LumiList
12 import os
13
14
15
class DBSError(Exception):
    """
    Error carrying a DBS error name and message.

    The text "ERROR DBS <errorName> : <errorMessage> " framed by newlines is
    stored as the single element of self.args.
    """
    def __init__(self, errorName, errorMessage):
        fields = (errorName, errorMessage)
        formatted = '\nERROR DBS %s : %s \n' % fields
        Exception.__init__(self, formatted)

    def getErrorMessage(self):
        """ Return the formatted error text """
        return "%s" % self.args
25
26
27
class DBSInvalidDataTierError(Exception):
    """
    DBS error raised for an invalid data tier.

    Like DBSError, the text "ERROR DBS <errorName> : <errorMessage> " framed
    by newlines becomes the single element of self.args.
    """
    def __init__(self, errorName, errorMessage):
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return the formatted error text """
        return "%s" % self.args
37
38
39
class DBSInfoError:
    """
    Report a failure to access a DBS url by printing it to stdout.

    NOTE(review): this class does not derive from Exception, so it cannot be
    raised on Python 3 (and is a classic class on Python 2); confirm callers
    only instantiate it for its printing side effect.
    """
    def __init__(self, url):
        banner = '\nERROR accessing DBS url : ' + url + '\n'
        print(banner)
44
45
46
class DataDiscoveryError(Exception):
    """
    Raised (e.g. by DataDiscovery.queryDbs) when a DBS query fails.

    errorMessage may be a string or any object with a meaningful str(),
    such as a wrapped DBS exception; it is kept as the single element of
    self.args.
    """
    def __init__(self, errorMessage):
        # Do not assign self.args before calling the base initialiser: on
        # Python 2.5+ and 3.x the BaseException.args setter converts the value
        # with tuple(), which splits a string message into single characters
        # and garbles both str() and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args)
56
57
58
class NotExistingDatasetError(Exception):
    """
    Raised when DBS returns no usable data for the requested dataset path.

    The message is kept as the single element of self.args.
    """
    def __init__(self, errorMessage):
        # Pass the message straight through: assigning self.args first would
        # run it through tuple() (Python 2.5+/3.x args setter) and turn a
        # string into a tuple of characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args)
68
69
70
class NoDataTierinProvenanceError(Exception):
    """
    Error signalling that a data tier is missing from the provenance.

    The message is kept as the single element of self.args.
    """
    def __init__(self, errorMessage):
        # Assigning self.args directly would tuple() the message on
        # Python 2.5+/3.x (string -> tuple of characters); let the base
        # class store it instead.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args)
80
81
82
class DataDiscovery:
    """
    Class to find and extract info from published data.

    fetchDBSInfo() queries DBS for the files of a dataset, honouring the run
    selection, lumi mask, lumi/run splitting and parent options found in
    cfg_params, and fills the per-block / per-file bookkeeping exposed by
    the get*() accessors.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset path; more than three '/'-separated parts
                        (primary/processed/tier/definition) marks an
                        analysis dataset
        cfg_params   -- CRAB configuration mapping ('SECTION.key' -> value)
        skipAnBlocks -- if true, fileblocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: total number of events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        self.files = []           # DBS output: raw file records of the last query
        # Lumi / splitting settings. fetchDBSInfo() recomputes them from
        # cfg_params; they are initialised here so that queryDbs() and the
        # accessors do not hit AttributeError when used beforehand.
        self.lumiMask = None
        self.lumiParams = None
        self.splitByLumi = False
        self.splitByRun = 0

    def fetchDBSInfo(self):
        """
        Contact DBS and fill the per-block / per-file bookkeeping.

        Raises:
          CrabException           -- data is neither split by lumi nor by run,
                                     or skipping analyzed blocks leaves none
          NotExistingDatasetError -- DBS knows no data for the dataset path
          DataDiscoveryError      -- the DBS query itself failed (queryDbs)
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', global_url)
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        ## check if various lumi parameters are set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis', None) or \
                          self.cfg_params.get('CMSSW.lumis_per_job', None)

        # Filters applied to each file's (run, lumi) list below.
        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)
        runList = None
        if runselection:
            runList = LumiList(runs=runselection)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
        common.logger.log(10-1, "runselection is: %s" % runselection)

        # Splitting by run takes precedence; otherwise any lumi-related
        # setting (or an analysis dataset) switches on splitting by lumi.
        if not self.splitByRun:
            self.splitByLumi = self.lumiMask or self.lumiParams or self.ads

        ## service API
        args = {}
        args['url'] = dbs_url
        args['level'] = 'CRITICAL'

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath, runselection=runselection, useParent=useparent)

        # Check to see what the dataset is
        pdsName = self.datasetPath.split("/")[1]
        primDSs = api.listPrimaryDatasets(pdsName)
        if not primDSs:
            # Unknown primary dataset: report it like any other missing
            # dataset instead of failing with an IndexError below.
            raise self._noDataError()
        dataType = primDSs[0]['Type']
        common.logger.debug("Datatype is %s" % dataType)
        if dataType == 'data' and not (self.splitByRun or self.splitByLumi):
            msg = 'Data must be split by lumi or by run. ' \
                  'Please see crab -help for the correct settings'
            raise CrabException(msg)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for dbsFile in self.files:
            # skip already analyzed blocks
            fileblock = dbsFile['Block']['Name']
            if fileblock in anFileBlocks:
                continue
            filename = dbsFile['LogicalFileName']

            # asked retry the list of parent for the given child
            parList = []
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in dbsFile['ParentList']]
            fileLumis = []  # List of (run, lumi) tuples
            if self.splitByLumi:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in dbsFile['LumiList']]
            self.parent[filename] = parList

            # Lumi mask first, then run selection: with both set the result
            # is the intersection of the two.
            if self.lumiMask:
                fileLumis = lumiList.filterLumis(fileLumis)
            if runselection:
                fileLumis = runList.filterLumis(fileLumis)
            self.lumis[filename] = fileLumis

            # '.dat' files carry no event information and are not counted.
            if '.dat' in filename:
                continue
            events = dbsFile['NumberOfEvents']
            # Count number of events per block
            if fileblock in self.eventsPerBlock:
                self.eventsPerBlock[fileblock] += events
            else:
                self.eventsPerBlock[fileblock] = events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        for block in self.eventsPerBlock:
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s " % (self.eventsPerBlock[block], block))
        saveFblocks = ''.join([str(block) + '\n' for block in self.eventsPerBlock])
        writeTXTfile(self, fileBlocks_FileName, saveFblocks)

        if len(self.eventsPerBlock) <= 0:
            raise self._noDataError()

    def _noDataError(self):
        """ Build the NotExistingDatasetError reported for a dataset without data """
        return NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                        + " dataset path variables in crab.cfg")
                                       % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Query DBS for the file records of a dataset.

        api          -- DBS API object (DBSAPI.dbsApi.DbsApi)
        path         -- dataset path (analysis dataset path if self.ads)
        runselection -- list of runs; when non-empty and not splitting by
                        lumi, files are fetched run by run. None is treated
                        as "no run selection".
        useParent    -- 1 to also retrieve parent files

        Returns the list of file records. DbsBadRequest and DBSError are
        re-raised as DataDiscoveryError; failures for a single run in the
        per-run query are logged and that run is skipped.
        """
        import sys

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads or self.lumiMask or self.lumiParams:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)
        try:
            if not runselection or self.splitByLumi:
                # Whole dataset at once; run filtering (if any) is applied
                # to the lumis afterwards by fetchDBSInfo().
                if useParent == 1 or self.splitByRun == 1 or self.splitByLumi:
                    if self.ads:
                        files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
                    else:
                        files = api.listFiles(path=path, retriveList=allowedRetriveValue)
                else:
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        if self.ads:
                            filesinrun = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue, runNumber=arun)
                        else:
                            filesinrun = api.listFiles(path=path, retriveList=allowedRetriveValue, runNumber=arun)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: a failing run is reported and skipped,
                        # but KeyboardInterrupt/SystemExit still propagate.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError):
            # sys.exc_info() keeps this portable to interpreters without
            # the 'except X as e' syntax.
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        Return the total number of events found in the dataset
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        Return the number of events in each fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        Return the number of events in each file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        Return the file names grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        Return the parent files grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        Return the (run, lumi) sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        Return the raw file records of the last DBS query
        (empty before fetchDBSInfo() has been called)
        """
        return self.files