ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.39
Committed: Thu Feb 4 16:32:38 2010 UTC (15 years, 2 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.38: +21 -10 lines
Log Message:
Note on CAF's in Submitter, clarify

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 ewv 1.39 __revision__ = "$Id: DataDiscovery.py,v 1.38 2010/01/21 16:17:13 ewv Exp $"
4     __version__ = "$Revision: 1.38 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.39 from LumiList import LumiList
12 ewv 1.32 import os
13    
14 afanfani 1.1
15 afanfani 1.3
class DBSError(Exception):
    """
    Error reported while talking to DBS.

    The exception carries a single pre-formatted message built from the
    error name and the error text.
    """
    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short name of the DBS failure
        errorMessage -- description of the DBS failure
        """
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same class the Python 2 'exceptions'
        # module exposes, and it also exists on Python 3.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple stored by __init__, so it
        # fills the single %s with the formatted message.
        return "%s" % self.args
25    
26 ewv 1.32
27    
class DBSInvalidDataTierError(Exception):
    """
    Error reported by DBS for an invalid data tier.

    The exception carries a single pre-formatted message built from the
    error name and the error text.
    """
    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short name of the DBS failure
        errorMessage -- description of the DBS failure
        """
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same class the Python 2 'exceptions'
        # module exposes, and it also exists on Python 3.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple stored by __init__, so it
        # fills the single %s with the formatted message.
        return "%s" % self.args
37    
38 ewv 1.32
39    
class DBSInfoError(Exception):
    """
    Error accessing a DBS url.

    Creating the exception prints the error text to standard output, so
    code that only instantiates it still reports the problem.
    """
    def __init__(self, url):
        """
        url -- the DBS url (a string) that could not be accessed
        """
        message = '\nERROR accessing DBS url : '+url+'\n'
        Exception.__init__(self, message)
        # Parenthesised single-argument print emits the same text whether
        # print is a statement (Python 2) or a function (Python 3).
        print(message)

    def getErrorMessage(self):
        """ Return error message """
        # self.args is the one-element tuple stored by __init__.
        return "%s" % self.args
44    
45 ewv 1.32
46    
class DataDiscoveryError(Exception):
    """
    Generic failure while discovering data.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure; a string or another
                        exception instance
        """
        # Hand the message to the base class instead of assigning it to
        # self.args: the args attribute converts whatever is assigned to
        # a tuple, which splits a plain string into single characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
56 afanfani 1.3
57 ewv 1.32
58    
class NotExistingDatasetError(Exception):
    """
    Raised when no data is found for the requested dataset.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure
        """
        # Hand the message to the base class instead of assigning it to
        # self.args: the args attribute converts whatever is assigned to
        # a tuple, which splits a plain string into single characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
68 afanfani 1.1
69 ewv 1.32
70    
class NoDataTierinProvenanceError(Exception):
    """
    Raised when a data tier is missing from the provenance information.
    """
    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure
        """
        # Hand the message to the base class instead of assigning it to
        # self.args: the args attribute converts whatever is assigned to
        # a tuple, which splits a plain string into single characters.
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        # self.args is the one-element tuple (errorMessage,).
        return "%s" % self.args
80 afanfani 1.1
81 ewv 1.32
82    
class DataDiscovery:
    """
    Class to find and extract info from published data
    """
    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        datasetPath  -- dataset path, a '/'-separated string
        cfg_params   -- configuration mapping, read through .get()
        skipAnBlocks -- when true, fetchDBSInfo() ignores the file blocks
                        listed in the analyzed-blocks file
        """
        # Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file
        self.lumiMask = None
        # Both are refreshed by fetchDBSInfo(); they are created here so
        # that queryDbs() and getListFiles() are usable before it has run.
        self.splitByRun = 0
        self.files = []

    def _dbsUrl(self):
        """
        Return the DBS url to contact: the value of CMSSW.dbs_url when it
        is configured, otherwise the default url of the current scheduler.

        Raises KeyError when no url is configured and the scheduler has
        no default.
        """
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', None)
        if dbs_url is not None:
            # An explicit url wins; the scheduler is not consulted, so an
            # unlisted scheduler is not a problem in this case.
            return dbs_url

        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        dbs_url_map = {'glite':      global_url,
                       'glite_slc5': global_url,
                       'glitecoll':  global_url,
                       'condor':     global_url,
                       'condor_g':   global_url,
                       'glidein':    global_url,
                       'lsf':        global_url,
                       'caf':        caf_url,
                       'sge':        global_url,
                       'arc':        global_url,
                       'pbs':        global_url,
                       }
        return dbs_url_map[(common.scheduler.name()).lower()]

    def fetchDBSInfo(self):
        """
        Contact DBS and fill the per-file and per-block maps.

        Configuration keys read: CMSSW.dbs_url, CMSSW.runselection,
        CMSSW.lumi_mask, CMSSW.split_by_run, CMSSW.use_parent and
        CMSSW.fileblocks_file.

        The names of the blocks found are written to the file blocks
        file (AnalyzedBlocks.txt in the share directory unless
        CMSSW.fileblocks_file says otherwise).

        Raises CrabException when already analyzed blocks are skipped and
        nothing new is left, NotExistingDatasetError when no events were
        found at all, and whatever queryDbs() raises.
        """
        ## get DBS URL
        dbs_url = self._dbsUrl()
        common.logger.info("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        runSpec = self.cfg_params.get('CMSSW.runselection', None)
        if runSpec is not None:
            runselection = parseRange2(runSpec)

        ## check if lumiMask is set
        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask', None)
        lumiList = None
        if self.lumiMask:
            lumiList = LumiList(filename=self.lumiMask)

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        # level 10-1: presumably one step below logging.DEBUG (10)
        common.logger.log(10-1, "runselection is: %s"%runselection)

        ## service API
        args = {}
        args['url'] = dbs_url
        args['level'] = 'CRITICAL'

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath, runselection=runselection, useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays
        for fileInfo in self.files:
            fileblock = fileInfo['Block']['Name']
            # skip already analyzed blocks
            if fileblock in anFileBlocks:
                continue

            filename = fileInfo['LogicalFileName']
            parList = []
            fileLumis = []  # List of (run, lumi section) tuples
            # asked to retrieve the list of parents for the given child
            if useparent == 1:
                parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
            if self.ads or self.lumiMask:
                fileLumis = [(x['RunNumber'], x['LumiSectionNumber'])
                             for x in fileInfo['LumiList']]
            self.parent[filename] = parList
            # For LumiMask, intersection of two lists.
            if self.lumiMask:
                self.lumis[filename] = lumiList.filterLumis(fileLumis)
            else:
                self.lumis[filename] = fileLumis

            # '.dat' files get parent and lumi entries but are left out
            # of every event count below
            if filename.find('.dat') >= 0:
                continue

            events = fileInfo['NumberOfEvents']
            # Count number of events per block
            self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
            # Number of events per file
            self.eventsPerFile[filename] = events
            # List of files per block
            self.blocksinfo.setdefault(fileblock, []).append(filename)
            # total number of events and lumis
            self.maxEvents += events
            self.maxLumis += len(self.lumis[filename])

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockNames = list(self.eventsPerBlock.keys())
        for block in blockNames:
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s "%(self.eventsPerBlock[block], block))
        # one block name per line
        saveFblocks = ''.join([str(block)+'\n' for block in blockNames])
        writeTXTfile(self, fileBlocks_FileName, saveFblocks)

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Ask DBS for the files of a dataset and return what it answers.

        api          -- DBS API object; listFiles() and listDatasetFiles()
                        are called on it
        path         -- dataset (or analysis dataset) path to query
        runselection -- run numbers to restrict the query to; None or an
                        empty sequence means no restriction
        useParent    -- 1 to ask DBS for the parents of each file as well

        With a run selection the query is made run by run, and a run
        whose query fails is reported and skipped.

        Raises DataDiscoveryError when DBS rejects the request.
        """
        # function-scope import: nothing else in this class needs sys
        import sys

        if runselection is None:
            runselection = []

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads or self.lumiMask:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        # an analysis dataset is passed under a different keyword than a
        # plain dataset path
        if self.ads:
            selector = {'analysisDataset': path}
        else:
            selector = {'path': path}

        try:
            if len(runselection) <= 0:
                if useParent == 1 or self.splitByRun == 1 or self.ads or self.lumiMask:
                    files = api.listFiles(retriveList=allowedRetriveValue, **selector)
                else:
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        filesinrun = api.listFiles(retriveList=allowedRetriveValue,
                                                   runNumber=arun, **selector)
                        files.extend(filesinrun)
                    except (KeyboardInterrupt, SystemExit):
                        # never turn an interrupt or exit request into a
                        # per-run warning
                        raise
                    except Exception:
                        # best effort: report the run and go on with the
                        # remaining ones
                        msg = "WARNING: problem extracting info from DBS for run %s "%arun
                        common.logger.info(msg)
        except (DbsBadRequest, DBSError):
            # sys.exc_info() gives the active exception with a syntax
            # every Python version accepts
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        max events
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        list the event collections structure by fileblock
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        list the event collections structure by file
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        return files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        return parent grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        return lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        return the list of files as returned by the DBS query
        (empty until fetchDBSInfo() has run)
        """
        return self.files