ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.40
Committed: Thu Feb 4 16:35:25 2010 UTC (15 years, 2 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5
Branch point for: CRAB_2_7_1_branch, LumiMask
Changes since 1.39: +8 -19 lines
Log Message:
Revert accidental changes

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 ewv 1.39 __revision__ = "$Id: DataDiscovery.py,v 1.38 2010/01/21 16:17:13 ewv Exp $"
4     __version__ = "$Revision: 1.38 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.32 import os
12    
13 afanfani 1.1
14 afanfani 1.3
class DBSError(Exception):
    """
    Error raised for a failure reported by DBS.

    The exception carries one pre-formatted string built from the error
    name and the error message supplied by the caller.
    """

    def __init__(self, errorName, errorMessage):
        """
        Build the formatted text and store it as the only exception argument.

        errorName    -- short label of the DBS error
        errorMessage -- detail text of the DBS error
        """
        Exception.__init__(
            self, '\nERROR DBS %s : %s \n' % (errorName, errorMessage))

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the 1-tuple set by the constructor, so it can be
        # handed to the % operator as-is.
        return "%s" % self.args
24    
25 ewv 1.32
26    
class DBSInvalidDataTierError(Exception):
    """
    Error raised when DBS reports a problem with the requested data tier.

    Formats its text the same way as the other DBS error of this module:
    a single string combining the error name and the error message.
    """

    def __init__(self, errorName, errorMessage):
        """
        Format the two pieces of information into the exception text.

        errorName    -- short label of the DBS error
        errorMessage -- detail text of the DBS error
        """
        formatted = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        Exception.__init__(self, formatted)

    def getErrorMessage(self):
        """ Return error message """
        # args holds exactly the formatted string stored by __init__.
        return "%s" % self.args
36    
37 ewv 1.32
38    
class DBSInfoError:
    """
    Reports a failure to reach a DBS url.

    Creating an instance prints the error text to standard output; nothing
    is stored on the instance.
    NOTE(review): this class does not derive from Exception -- confirm
    whether any caller tries to raise or catch it.
    """

    def __init__(self, url):
        """
        Print the access error for *url*.

        url -- the DBS url that could not be accessed (string)
        """
        report = '\nERROR accessing DBS url : ' + url + '\n'
        # A single parenthesised argument prints identically whether print
        # is a statement or a function.
        print(report)
43    
44 ewv 1.32
45    
class DataDiscoveryError(Exception):
    """
    Generic failure while discovering data in DBS.

    Wraps a single value (a message string or an underlying exception)
    that getErrorMessage() renders as text.
    """

    def __init__(self, errorMessage):
        """
        Store errorMessage as the only exception argument.

        errorMessage -- message string, or the exception being wrapped
        """
        # Do not assign to self.args directly: the args setter converts its
        # value to a tuple, which splits a string into single characters
        # and corrupts the message returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is the 1-tuple (errorMessage,), consumed whole by %.
        return "%s" % self.args
55 afanfani 1.3
56 ewv 1.32
57    
class NotExistingDatasetError(Exception):
    """
    Raised when no data is found for the requested dataset.

    Carries a single message value that getErrorMessage() renders as text.
    """

    def __init__(self, errorMessage):
        """
        Store errorMessage as the only exception argument.

        errorMessage -- text describing the missing dataset
        """
        # Do not assign to self.args directly: the args setter converts its
        # value to a tuple, which splits a string into single characters
        # and corrupts the message returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is the 1-tuple (errorMessage,), consumed whole by %.
        return "%s" % self.args
67 afanfani 1.1
68 ewv 1.32
69    
class NoDataTierinProvenanceError(Exception):
    """
    Raised when a data tier is not available in the dataset provenance.

    Carries a single message value that getErrorMessage() renders as text.
    """

    def __init__(self, errorMessage):
        """
        Store errorMessage as the only exception argument.

        errorMessage -- text describing the missing data tier
        """
        # Do not assign to self.args directly: the args setter converts its
        # value to a tuple, which splits a string into single characters
        # and corrupts the message returned by getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # args is the 1-tuple (errorMessage,), consumed whole by %.
        return "%s" % self.args
79 afanfani 1.1
80 ewv 1.32
81    
class DataDiscovery:
    """
    Class to find and extract info from published data

    Typical use: build an instance, call fetchDBSInfo() once, then read the
    collected information through the get*() accessors.
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        Store the query parameters and create empty result containers.

        datasetPath  -- dataset path string, '/'-separated
        cfg_params   -- mapping of configuration keys ('CMSSW.*') to values
        skipAnBlocks -- true value to skip blocks listed in the
                        analyzed-blocks file
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition: with the
        # leading '/', such a path splits into more than four pieces.
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file

        # Both are (re)assigned by fetchDBSInfo(); they are created here so
        # that getListFiles() and queryDbs() also work before a fetch.
        self.files = []           # DBS output: raw file records
        self.splitByRun = 0       # value of CMSSW.split_by_run

    def fetchDBSInfo(self):
        """
        Contact DBS

        Queries DBS for the files of the dataset and fills the per-block and
        per-file containers of this object. The names of the blocks found
        are written to the analyzed-blocks file.

        Raises CrabException when block skipping is enabled and no new block
        is found, NotExistingDatasetError when no events are found at all,
        and DataDiscoveryError when the DBS query is rejected.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        dbs_url_map = {
            'glite': global_url,
            'glite_slc5': global_url,
            'glitecoll': global_url,
            'condor': global_url,
            'condor_g': global_url,
            'glidein': global_url,
            'lsf': global_url,
            'caf': caf_url,
            'sge': global_url,
            'arc': global_url,
            'pbs': global_url,
        }

        # A scheduler missing from the map falls back to the global
        # instance instead of aborting with a KeyError, which used to
        # happen even when CMSSW.dbs_url was set explicitly.
        scheduler = common.scheduler.name().lower()
        dbs_url_default = dbs_url_map.get(scheduler, global_url)
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', dbs_url_default)
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        # NOTE(review): 10-1 is presumably a level just below DEBUG --
        # confirm against the logger setup.
        common.logger.log(10-1, "runselection is: %s" % runselection)

        ## service API
        args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(
            self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath,
                                   runselection=runselection,
                                   useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']
            parList = []
            lumiList = []  # List of (run, lumi section) tuples
            # parents are only collected when explicitly requested
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            # lumi information is only collected for analysis datasets
            if self.ads:
                lumiList = [(x['RunNumber'], x['LumiSectionNumber'])
                            for x in fileInfo['LumiList']]
            self.parent[filename] = parList
            self.lumis[filename] = lumiList

            # files with '.dat' in their name are not counted
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = \
                self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(lumiList)

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockLines = []
        for block in self.eventsPerBlock:
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s "
                              % (self.eventsPerBlock[block], block))
        # The block list is written before the emptiness check below, so an
        # empty file is still produced when nothing was found.
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(
                ("\nNo data for %s in DBS\nPlease check"
                 " dataset path variables in crab.cfg") % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the file records of a dataset.

        api          -- DBS API object providing listFiles() and
                        listDatasetFiles()
        path         -- dataset (or analysis dataset) path to query
        runselection -- runs to restrict the query to; None or empty
                        means no run restriction
        useParent    -- 1 to also retrieve parent information

        Returns the list of file records. With a run selection, a run whose
        query fails is logged and skipped.
        Raises DataDiscoveryError when DBS rejects the request.
        """
        import sys  # local import: only needed to fetch the active exception

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # Analysis datasets are addressed through a different keyword.
        if self.ads:
            target = {'analysisDataset': path}
        else:
            target = {'path': path}

        try:
            # Truthiness instead of len(): the default None must behave
            # like an empty selection rather than raise TypeError.
            if not runselection:
                if useParent == 1 or self.splitByRun == 1 or self.ads:
                    files = api.listFiles(retriveList=allowedRetriveValue,
                                          **target)
                else:
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        filesinrun = api.listFiles(
                            retriveList=allowedRetriveValue,
                            runNumber=arun, **target)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: a failing run is reported and
                        # skipped. Catching Exception rather than
                        # everything keeps interpreter-exit requests from
                        # being swallowed.
                        # NOTE(review): assumes DBS API errors derive from
                        # Exception -- confirm.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
                        common.logger.debug("DBS error for run %s: %s"
                                            % (arun, sys.exc_info()[1]))
        except (DbsBadRequest, DBSError):
            # sys.exc_info() avoids binding the exception in the except
            # clause, whose syntax differs between Python versions.
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        Return the total number of events counted by fetchDBSInfo
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        Return the map fileblock -> number of events
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        Return the map file -> number of events
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        Return the map fileblock -> list of files
        """
        return self.blocksinfo

    def getParent(self):
        """
        Return the map file -> list of parent files
        """
        return self.parent

    def getLumis(self):
        """
        Return the map file -> list of (run, lumi section) tuples
        """
        return self.lumis

    def getListFiles(self):
        """
        Return the file records as returned by the DBS query
        (an empty list before fetchDBSInfo has run)
        """
        return self.files