ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.32
Committed: Wed Jul 29 21:41:50 2009 UTC (15 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.31: +88 -63 lines
Log Message:
Detect ADS, not user set, and fetch lumis for ADS

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 slacapra 1.18 import exceptions
3     import DBSAPI.dbsApi
4 ewv 1.32 from DBSAPI.dbsApiException import *
5 slacapra 1.18 import common
6     from crab_util import *
7 ewv 1.32 import os
8    
9 afanfani 1.1
10 afanfani 1.3
class DBSError(Exception):
    """Error raised for a failure reported by DBS.

    The exception text is built from an error name and an error message.
    """

    def __init__(self, errorName, errorMessage):
        """Format *errorName* and *errorMessage* into the exception text."""
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the 1-tuple holding the formatted text built above.
        return "%s" % self.args
20    
21 ewv 1.32
22    
class DBSInvalidDataTierError(Exception):
    """Error raised for an invalid data tier reported by DBS.

    The exception text is built from an error name and an error message.
    """

    def __init__(self, errorName, errorMessage):
        """Format *errorName* and *errorMessage* into the exception text."""
        text = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, text)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the 1-tuple holding the formatted text built above.
        return "%s" % self.args
32    
33 ewv 1.32
34    
class DBSInfoError:
    """Reports a failure to access a DBS url by printing it to stdout.

    Constructing an instance prints the error text; no state is stored.
    """

    def __init__(self, url):
        """Print the access error for *url*."""
        message = '\nERROR accessing DBS url : ' + url + '\n'
        print(message)
39    
40 ewv 1.32
41    
class DataDiscoveryError(Exception):
    """Generic data-discovery failure carrying a single error message."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the only exception argument."""
        # Pass the message straight to the base class.  Assigning a string
        # to ``self.args`` first would make new-style exceptions split it
        # into a tuple of characters and garble the reported text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
51 afanfani 1.3
52 ewv 1.32
53    
class NotExistingDatasetError(Exception):
    """Raised when no data is found for the requested dataset."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the only exception argument."""
        # Pass the message straight to the base class.  Assigning a string
        # to ``self.args`` first would make new-style exceptions split it
        # into a tuple of characters and garble the reported text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
63 afanfani 1.1
64 ewv 1.32
65    
class NoDataTierinProvenanceError(Exception):
    """Data-discovery error carrying a single error message."""

    def __init__(self, errorMessage):
        """Store *errorMessage* as the only exception argument."""
        # Pass the message straight to the base class.  Assigning a string
        # to ``self.args`` first would make new-style exceptions split it
        # into a tuple of characters and garble the reported text.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
75 afanfani 1.1
76 ewv 1.32
77    
class DataDiscovery:
    """
    Class to find and extract info from published data.

    fetchDBSInfo() queries DBS for the files of ``datasetPath`` and fills
    the per-block / per-file maps that the get*() accessors return.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset (or analysis dataset) path to look up
        cfg_params   -- configuration mapping, read through 'CMSSW.*' keys
        skipAnBlocks -- if true, fileblocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        #       Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition.  Only the
        # non-empty components are counted, so the leading "/" of a regular
        # "/primary/processed/tier" path is not taken for a fourth field.
        pathFields = [part for part in self.datasetPath.split("/") if part]
        self.ads = len(pathFields) > 3
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        # Both are (re)set by fetchDBSInfo(); defined here so getListFiles()
        # and queryDbs() are usable on a freshly built object.
        self.files = []           # DBS output: raw file list from queryDbs()
        self.splitByRun = 0       # value of CMSSW.split_by_run

    def fetchDBSInfo(self):
        """
        Contact DBS

        Queries DBS for the files of the dataset, fills the event, lumi,
        parent and block maps, and writes the names of the fileblocks found
        to the analyzed-blocks file.

        Raises CrabException when block skipping is on and no new block is
        found, and NotExistingDatasetError when no events were collected.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        dbs_url_map = {
            'glite': global_url,
            'glitecoll': global_url,
            'condor': global_url,
            'condor_g': global_url,
            'glidein': global_url,
            'lsf': global_url,
            'caf': caf_url,
            'sge': global_url,
            'arc': global_url,
        }

        # The scheduler picks the default instance; CMSSW.dbs_url overrides it.
        dbs_url_default = dbs_url_map[(common.scheduler.name()).lower()]
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', dbs_url_default)
        common.logger.debug("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        common.logger.log(10-1,"runselection is: %s"%runselection)
        ## service API
        args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(
            self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection,
                                   useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']
            parList = []
            lumiList = []  # List of (run, lumi section) tuples
            # parents are only collected when explicitly requested
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            if self.ads:
                lumiList = [(x['RunNumber'], x['LumiSectionNumber'])
                            for x in fileInfo['LumiList']]
            self.parent[filename] = parList
            self.lumis[filename] = lumiList

            # Files whose name contains '.dat' get no event bookkeeping.
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = \
                self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(lumiList)

        if self.skipBlocks and len(self.eventsPerBlock) == 0:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        # One block name per line, in the same order as the log lines.
        blockLines = []
        for block in self.eventsPerBlock:
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1,"DBSInfo: total nevts %i in block %s "%(self.eventsPerBlock[block],block))
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if len(self.eventsPerBlock) <= 0:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the list of files of a dataset.

        api          -- DBS API object providing listFiles/listDatasetFiles
        path         -- dataset (or analysis dataset) path to query
        runselection -- runs to restrict the query to; None or empty means
                        no run restriction
        useParent    -- 1 to also retrieve the parents of each file

        Returns the list of files as given by the DBS API.  A failure for a
        single run is logged and that run is skipped.  DbsBadRequest and
        DBSError are re-raised as DataDiscoveryError.
        """
        # The default (None) means "no run restriction", same as an empty list.
        if runselection is None:
            runselection = []

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # Analysis datasets and plain datasets use a different listFiles keyword.
        if self.ads:
            target = {'analysisDataset': path}
        else:
            target = {'path': path}

        try:
            if not runselection:
                if useParent == 1 or self.splitByRun == 1:
                    files = api.listFiles(retriveList=allowedRetriveValue, **target)
                else:
                    # NOTE(review): this branch is also taken for analysis
                    # datasets, where no 'retrive_lumi' is requested -- confirm
                    # that listDatasetFiles returns what fetchDBSInfo needs.
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        filesinrun = api.listFiles(retriveList=allowedRetriveValue,
                                                   runNumber=arun, **target)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: a failing run is reported and skipped.
                        msg = "WARNING: problem extracting info from DBS for run %s "%arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError) as err:
            raise DataDiscoveryError(err)

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the raw list of files obtained from DBS
        (empty until fetchDBSInfo() has run)
        """
        return self.files