ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.34
Committed: Thu Aug 20 09:25:58 2009 UTC (15 years, 8 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_6_pre6, CRAB_2_6_6_pre5, CRAB_2_6_6_pre4, CRAB_2_6_6_pre3, CRAB_2_6_6_pre2, CRAB_2_6_6_check, CRAB_2_6_6, CRAB_2_6_6_pre1, CRAB_2_6_5, CRAB_2_6_5_pre1, CRAB_2_6_4, CRAB_2_6_4_pre1, CRAB_2_6_3_patch_2, CRAB_2_6_3_patch_2_pre2, CRAB_2_6_3_patch_2_pre1, CRAB_2_6_3_patch_1, CRAB_2_7_0_pre4, CRAB_2_7_0_pre3, CRAB_2_6_3, CRAB_2_6_3_pre5, CRAB_2_6_3_pre4, CRAB_2_6_3_pre3, CRAB_2_6_3_pre2, CRAB_2_7_0_pre2, CRAB_2_6_3_pre1, test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1
Branch point for: CRAB_2_6_X_br
Changes since 1.33: +3 -3 lines
Log Message:
fix for ads VS dataset discriminator

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 spiga 1.34 __revision__ = "$Id: DataDiscovery.py,v 1.33 2009/07/30 18:45:44 ewv Exp $"
4     __version__ = "$Revision: 1.33 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.32 import os
12    
13 afanfani 1.1
14 afanfani 1.3
class DBSError(Exception):
    """
    Error reported while talking to DBS.

    The exception message is built from errorName and errorMessage in the
    form "ERROR DBS <errorName> : <errorMessage>", wrapped in newlines.
    """

    def __init__(self, errorName, errorMessage):
        # The builtin Exception is the very same object as
        # exceptions.Exception, so deriving from it keeps every existing
        # "except" clause working without needing the "exceptions" module.
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple (message,), which is exactly
        # what the single %s placeholder consumes.
        return "%s" % (self.args)
24    
25 ewv 1.32
26    
class DBSInvalidDataTierError(Exception):
    """
    Error raised for an invalid DBS data tier.

    The exception message is built from errorName and errorMessage in the
    form "ERROR DBS <errorName> : <errorMessage>", wrapped in newlines.
    """

    def __init__(self, errorName, errorMessage):
        # The builtin Exception is the very same object as
        # exceptions.Exception, so deriving from it keeps every existing
        # "except" clause working without needing the "exceptions" module.
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple (message,), which is exactly
        # what the single %s placeholder consumes.
        return "%s" % (self.args)
36    
37 ewv 1.32
38    
class DBSInfoError(Exception):
    """
    Error accessing a DBS url.

    Constructing the object prints the error message to standard output
    (historical behaviour, kept for existing callers).  The class now
    derives from Exception, like the other *Error classes of this module,
    so that it can also be raised and caught, and it keeps the message.
    """

    def __init__(self, url):
        message = '\nERROR accessing DBS url : ' + url + '\n'
        Exception.__init__(self, message)
        # A single parenthesised expression prints identically with the
        # Python 2 print statement and the Python 3 print function.
        print(message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple (message,).
        return "%s" % (self.args)
43    
44 ewv 1.32
45    
class DataDiscoveryError(Exception):
    """
    Generic failure while discovering data in DBS.

    errorMessage may be a string or any object (for instance another
    exception); it is stored as the single exception argument.
    """

    def __init__(self, errorMessage):
        # Do NOT assign self.args directly: on Python >= 2.5 the args
        # setter converts its value to a tuple, which splits a string into
        # single characters and fails outright on non-iterable objects.
        # Handing the message to the base class stores it as
        # (errorMessage,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % (self.args)
55 afanfani 1.3
56 ewv 1.32
57    
class NotExistingDatasetError(Exception):
    """
    Raised when DBS returns no data for the requested dataset.

    errorMessage is stored as the single exception argument.
    """

    def __init__(self, errorMessage):
        # Do NOT assign self.args directly: on Python >= 2.5 the args
        # setter converts its value to a tuple, which splits a string into
        # single characters.  Handing the message to the base class stores
        # it as (errorMessage,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % (self.args)
67 afanfani 1.1
68 ewv 1.32
69    
class NoDataTierinProvenanceError(Exception):
    """
    Raised when a data tier is missing from the provenance information.

    errorMessage is stored as the single exception argument.
    """

    def __init__(self, errorMessage):
        # Do NOT assign self.args directly: on Python >= 2.5 the args
        # setter converts its value to a tuple, which splits a string into
        # single characters.  Handing the message to the base class stores
        # it as (errorMessage,).
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % (self.args)
79 afanfani 1.1
80 ewv 1.32
81    
class DataDiscovery:
    """
    Class to find and extract info from published data
    """

    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        Store the query parameters and create the (empty) result containers.

        datasetPath  -- dataset path string, split on "/" to detect an
                        analysis dataset
        cfg_params   -- configuration mapping, read with .get() and "in"
        skipAnBlocks -- when true, fetchDBSInfo() skips the file blocks
                        listed in the analyzed-blocks file
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file

        # Both attributes are (re)assigned by fetchDBSInfo().  Giving them
        # a value here keeps getListFiles() and queryDbs() usable before
        # fetchDBSInfo() has run instead of failing with AttributeError.
        self.files = []           # DBS output: raw list of file records
        self.splitByRun = 0       # value of CMSSW.split_by_run

    def fetchDBSInfo(self):
        """
        Contact DBS

        Queries DBS for the files of self.datasetPath and fills
        eventsPerBlock, eventsPerFile, blocksinfo, parent, lumis, maxEvents
        and maxLumis.  The names of the blocks found are handed to
        writeTXTfile() together with the analyzed-blocks file name.

        Raises CrabException when block skipping is enabled and no new
        block was found, and NotExistingDatasetError when no block with
        events was found at all.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        dbs_url_map = {
            'glite': global_url,
            'glitecoll': global_url,
            'condor': global_url,
            'condor_g': global_url,
            'glidein': global_url,
            'lsf': global_url,
            'caf': caf_url,
            'sge': global_url,
            'arc': global_url,
        }

        # A scheduler missing from the map falls back to the global
        # instance instead of aborting with a KeyError (which used to
        # happen even when CMSSW.dbs_url was set explicitly).
        scheduler_name = common.scheduler.name().lower()
        dbs_url_default = dbs_url_map.get(scheduler_name, global_url)
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', dbs_url_default)
        common.logger.debug("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        common.logger.log(10-1, "runselection is: %s" % runselection)

        ## service API
        api_args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(
            self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(api_args)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection,
                                   useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']
            parList = []
            lumiList = []  # List of (run, lumi section) tuples
            # parents are only available when they were requested
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            # lumi information is only requested for analysis datasets
            if self.ads:
                lumiList = [(x['RunNumber'], x['LumiSectionNumber'])
                            for x in fileInfo['LumiList']]
            self.parent[filename] = parList
            self.lumis[filename] = lumiList

            # files whose name contains '.dat' keep their parent/lumi
            # entries but are left out of every event/block count
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = \
                self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # totals
            self.maxEvents += events
            self.maxLumis += len(lumiList)

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockLines = []
        for block in self.eventsPerBlock:
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s "
                              % (self.eventsPerBlock[block], block))
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the list of files of a dataset.

        api          -- DBS API object; its listFiles() and
                        listDatasetFiles() methods are used
        path         -- dataset path to query (default: self.datasetPath)
        runselection -- sequence of run numbers; when empty or None the
                        whole dataset is queried, otherwise one query is
                        made per run
        useParent    -- 1 to also retrieve the parents of each file

        Returns the list of file records given back by the API.  A failing
        query for a single run is logged and skipped.  Raises
        DataDiscoveryError when a DbsBadRequest or DBSError escapes the
        query.
        """
        # Local import: sys.exc_info() gives access to the active exception
        # with a syntax accepted by every Python version.
        import sys

        if path is None:
            path = self.datasetPath

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # analysis datasets and plain datasets are addressed through a
        # different keyword of listFiles()
        if self.ads:
            pathKeyword = 'analysisDataset'
        else:
            pathKeyword = 'path'

        try:
            if not runselection:
                if useParent == 1 or self.splitByRun == 1:
                    query = {pathKeyword: path,
                             'retriveList': allowedRetriveValue}
                    files = api.listFiles(**query)
                else:
                    files = api.listDatasetFiles(path)
            else:
                files = []
                for arun in runselection:
                    query = {pathKeyword: path,
                             'retriveList': allowedRetriveValue,
                             'runNumber': arun}
                    try:
                        files.extend(api.listFiles(**query))
                    except Exception:
                        # Best effort: a run that cannot be queried is
                        # reported and skipped.  Only Exception is caught
                        # so that Ctrl-C / SystemExit are not swallowed.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
                        common.logger.debug("DBS error for run %s: %s"
                                            % (arun, sys.exc_info()[1]))
        except (DbsBadRequest, DBSError):
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        Return the total number of events found
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        Return the map fileblock -> number of events
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        Return the map file -> number of events
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        Return the files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        Return the parents grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        Return the lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        Return the raw list of file records obtained from DBS
        (empty until fetchDBSInfo() has been called)
        """
        return self.files