ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.41
Committed: Sun Feb 21 12:47:19 2010 UTC (15 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: fede_170310, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask
Branch point for: CRAB_multiout
Changes since 1.40: +21 -10 lines
Log Message:
merging LumiMask..

File Contents

# Content
1 #!/usr/bin/env python
2
3 __revision__ = "$Id: DataDiscovery.py,v 1.40.2.1 2010/02/05 16:05:29 ewv Exp $"
4 __version__ = "$Revision: 1.40.2.1 $"
5
6 import exceptions
7 import DBSAPI.dbsApi
8 from DBSAPI.dbsApiException import *
9 import common
10 from crab_util import *
11 from LumiList import LumiList
12 import os
13
14
15
class DBSError(Exception):
    """
    Generic DBS failure.

    The exception carries one pre-formatted message built from the
    error name and the error text.
    """
    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short identifier of the DBS error
        errorMessage -- descriptive text for the error
        """
        # Builtin Exception is the same object as exceptions.Exception on
        # Python 2, and does not depend on the Python-2-only module.
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # Format the stored message itself, not the whole args tuple.
        return "%s" % (self.args[0],)
25
26
27
class DBSInvalidDataTierError(Exception):
    """
    DBS failure related to an invalid data tier.

    The exception carries one pre-formatted message built from the
    error name and the error text.
    """
    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short identifier of the DBS error
        errorMessage -- descriptive text for the error
        """
        # Builtin Exception is the same object as exceptions.Exception on
        # Python 2, and does not depend on the Python-2-only module.
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # Format the stored message itself, not the whole args tuple.
        return "%s" % (self.args[0],)
37
38
39
class DBSInfoError:
    """
    Reporter for a DBS url that could not be accessed.

    This is not an exception class: constructing an instance only prints
    the offending url to standard output.
    """
    def __init__(self, url):
        """ Print an error line naming the DBS url (url must be a string) """
        # Single-argument parenthesised form: the output is the same
        # whether print is a statement (Python 2) or a function (Python 3).
        print('\nERROR accessing DBS url : '+url+'\n')
44
45
46
class DataDiscoveryError(Exception):
    """
    Raised when the data discovery query against DBS fails.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- text (or underlying exception) describing the failure
        """
        # Do NOT assign errorMessage to self.args directly: on Python 2.5+
        # and 3 the args setter converts its value to a tuple, so a string
        # is split into single characters and the message is reported as
        # "('a', 'b', ...)".  Exception.__init__ stores it as (message,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
56
57
58
class NotExistingDatasetError(Exception):
    """
    Raised when DBS returns no data for the requested dataset.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- text describing which dataset could not be found
        """
        # Do NOT assign errorMessage to self.args directly: on Python 2.5+
        # and 3 the args setter converts its value to a tuple, so a string
        # is split into single characters and the message is reported as
        # "('a', 'b', ...)".  Exception.__init__ stores it as (message,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
68
69
70
class NoDataTierinProvenanceError(Exception):
    """
    Raised when a data tier is missing from the dataset provenance.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- text describing the missing data tier
        """
        # Do NOT assign errorMessage to self.args directly: on Python 2.5+
        # and 3 the args setter converts its value to a tuple, so a string
        # is split into single characters and the message is reported as
        # "('a', 'b', ...)".  Exception.__init__ stores it as (message,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
80
81
82
class DataDiscovery:
    """
    Class to find and extract info from published data
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- '/'-separated dataset path to look up in DBS
        cfg_params   -- mapping of crab.cfg parameters ('CMSSW.*' keys are read)
        skipAnBlocks -- when true, fileblocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        self.lumiMask = None
        # Both are (re)assigned by fetchDBSInfo(); defined here so that
        # getListFiles() and queryDbs() do not raise AttributeError when
        # they are used before fetchDBSInfo() has run.
        self.files = []           # DBS output: raw file records
        self.splitByRun = 0

    def _defaultDbsUrl(self):
        """
        Return the DBS url used when crab.cfg does not set CMSSW.dbs_url.
        """
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        # Only the 'caf' scheduler has its own instance; every other
        # scheduler (glite, glite_slc5, glitecoll, condor, condor_g, glidein,
        # lsf, sge, arc, pbs) maps to the global one.  Unknown scheduler
        # names fall back to the global instance instead of raising
        # KeyError, so an explicit CMSSW.dbs_url keeps working with them.
        scheduler_overrides = {'caf': caf_url}
        schedulerName = (common.scheduler.name()).lower()
        return scheduler_overrides.get(schedulerName, global_url)

    def fetchDBSInfo(self):
        """
        Contact DBS

        Queries DBS for the files of self.datasetPath and fills the
        per-file and per-block maps exposed by the get*() methods.  The
        names of the fileblocks found are written to the analyzed-blocks
        file (CMSSW.fileblocks_file, default <shareDir>AnalyzedBlocks.txt).

        Raises CrabException when skipBlocks is set and no new fileblock is
        found, NotExistingDatasetError when no data is found at all, and
        whatever queryDbs() raises.
        """
        ## get DBS URL
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', self._defaultDbsUrl())
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        ## check if lumiMask is set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        # Wrapped in a 1-tuple so the formatting is safe whatever sequence
        # type parseRange2 returns.
        common.logger.log(10-1, "runselection is: %s" % (runselection,))

        ## service API
        args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath, runselection=runselection, useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        # NOTE(review): the nesting of this loop was reconstructed from a
        # listing without indentation -- confirm against the repository.
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']

            # asked retry the list of parent for the given child
            parList = []
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            self.parent[filename] = parList

            fileLumis = []  # List of (run, lumi section) tuples
            if self.ads or self.lumiMask:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in fileInfo['LumiList']]
            # For LumiMask, intersection of two lists.
            if self.lumiMask:
                self.lumis[filename] = lumiList.filterLumis(fileLumis)
            else:
                self.lumis[filename] = fileLumis

            # Files whose name contains '.dat' are left out of the event
            # and block bookkeeping below.
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and len(self.eventsPerBlock) == 0:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockLines = []
        for block in self.eventsPerBlock.keys():
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s " % (self.eventsPerBlock[block], block))
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if len(self.eventsPerBlock) <= 0:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the file records of a dataset and return them as a list.

        api          -- DBS API object providing listFiles / listDatasetFiles
        path         -- dataset (or analysis dataset) path given to listFiles
        runselection -- runs to query one by one; None or empty means no
                        run selection
        useParent    -- 1 to also retrieve the parent files

        A failure while querying a single selected run is logged and that
        run is skipped.  DbsBadRequest and DBSError are re-raised as
        DataDiscoveryError.
        """
        # Local import: sys.exc_info() gives access to the caught exception
        # without the 'except X, e' / 'except X as e' syntax, which differs
        # between Python versions.
        import sys

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads or self.lumiMask:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # Keyword arguments shared by every listFiles call: an analysis
        # dataset is addressed with 'analysisDataset', anything else with 'path'.
        query = {'retriveList': allowedRetriveValue}
        if self.ads:
            query['analysisDataset'] = path
        else:
            query['path'] = path

        try:
            # 'not runselection' also covers the None default, on which
            # len() would raise TypeError.
            if not runselection:
                if useParent == 1 or self.splitByRun == 1 or self.ads or self.lumiMask:
                    files = api.listFiles(**query)
                else:
                    # NOTE: this branch queries self.datasetPath, not 'path'.
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        files.extend(api.listFiles(runNumber=arun, **query))
                    except Exception:
                        # Best effort: keep going with the remaining runs.
                        # 'except Exception' instead of a bare except so
                        # that KeyboardInterrupt / SystemExit are not
                        # swallowed on Python 2.5 and later.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError):
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the list of file records obtained from DBS
        (empty until fetchDBSInfo() has been called)
        """
        return self.files