ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
Revision: 1.35
Committed: Thu Oct 8 15:15:17 2009 UTC (15 years, 6 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_0, CRAB_2_7_0_pre8, CRAB_2_7_0_pre7, CRAB_2_7_0_pre6, CRAB_2_7_0_pre5
Branch point for: Lumi2_8
Changes since 1.34: +5 -4 lines
Log Message:
adding SchedulerPbs

File Contents

# User Rev Content
1 gutsche 1.6 #!/usr/bin/env python
2 ewv 1.33
3 mcinquil 1.35 __revision__ = "$Id: DataDiscovery.py,v 1.34 2009/08/20 09:25:58 spiga Exp $"
4     __version__ = "$Revision: 1.34 $"
5 ewv 1.33
6 slacapra 1.18 import exceptions
7     import DBSAPI.dbsApi
8 ewv 1.32 from DBSAPI.dbsApiException import *
9 slacapra 1.18 import common
10     from crab_util import *
11 ewv 1.32 import os
12    
13 afanfani 1.1
14 afanfani 1.3
class DBSError(Exception):
    """
    Generic error reported while talking to DBS.

    The exception message is built from the error name and the error
    message as: newline, 'ERROR DBS <errorName> : <errorMessage> ', newline.
    """

    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short identifier of the failure
        errorMessage -- human readable description of the failure
        """
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same object as exceptions.Exception on
        # Python 2, and it keeps the class importable where the 'exceptions'
        # module does not exist.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # args is the 1-tuple set by Exception.__init__; index it explicitly
        # instead of relying on %-formatting to unpack the tuple.
        return "%s" % (self.args[0],)
24    
25 ewv 1.32
26    
class DBSInvalidDataTierError(Exception):
    """
    Error raised for an invalid data tier reported by DBS.

    The exception message is built from the error name and the error
    message as: newline, 'ERROR DBS <errorName> : <errorMessage> ', newline.
    """

    def __init__(self, errorName, errorMessage):
        """
        errorName    -- short identifier of the failure
        errorMessage -- human readable description of the failure
        """
        message = '\nERROR DBS %s : %s \n' % (errorName, errorMessage)
        # The builtin Exception is the same object as exceptions.Exception on
        # Python 2, and it keeps the class importable where the 'exceptions'
        # module does not exist.
        Exception.__init__(self, message)

    def getErrorMessage(self):
        """ Return error message """
        # args is the 1-tuple set by Exception.__init__; index it explicitly
        # instead of relying on %-formatting to unpack the tuple.
        return "%s" % (self.args[0],)
36    
37 ewv 1.32
38    
class DBSInfoError:
    """
    Report a failure to access a DBS url.

    Creating an instance prints the error on standard output; nothing is
    stored on the instance.
    NOTE(review): unlike the other *Error classes in this module this one
    does not derive from Exception -- confirm whether that is intended.
    """

    def __init__(self, url):
        """
        url -- the DBS url that could not be accessed
        """
        # A single parenthesised argument prints identically with the
        # Python 2 print statement and the Python 3 print function, and
        # %-formatting avoids a TypeError when url is not a string.
        print('\nERROR accessing DBS url : %s\n' % (url,))
43    
44 ewv 1.32
45    
class DataDiscoveryError(Exception):
    """
    Error raised when the data discovery query fails.
    """

    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure (a string or any object
                        with a meaningful string representation)
        """
        # Do NOT assign self.args directly: the args setter converts its
        # value with tuple(), which splits a string into single characters
        # and garbles both str(exc) and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
55 afanfani 1.3
56 ewv 1.32
57    
class NotExistingDatasetError(Exception):
    """
    Error raised when no data is found for the requested dataset.
    """

    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure (a string or any object
                        with a meaningful string representation)
        """
        # Do NOT assign self.args directly: the args setter converts its
        # value with tuple(), which splits a string into single characters
        # and garbles both str(exc) and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
67 afanfani 1.1
68 ewv 1.32
69    
class NoDataTierinProvenanceError(Exception):
    """
    Error raised when a data tier is missing from the dataset provenance.
    """

    def __init__(self, errorMessage):
        """
        errorMessage -- description of the failure (a string or any object
                        with a meaningful string representation)
        """
        # Do NOT assign self.args directly: the args setter converts its
        # value with tuple(), which splits a string into single characters
        # and garbles both str(exc) and getErrorMessage().
        Exception.__init__(self, errorMessage)

    def getErrorMessage(self):
        """ Return exception error """
        return "%s" % (self.args[0],)
79 afanfani 1.1
80 ewv 1.32
81    
class DataDiscovery:
    """
    Class to find and extract info from published data
    """

    def __init__(self, datasetPath, cfg_params, skipAnBlocks):
        """
        Store the query parameters and create the (still empty) result
        containers. DBS is not contacted here; see fetchDBSInfo().

        datasetPath  -- dataset path string ("/"-separated)
        cfg_params   -- configuration mapping, read through 'CMSSW.*' keys
        skipAnBlocks -- when true, file blocks listed in the analyzed-blocks
                        file are skipped by fetchDBSInfo()
        """
        #       Attributes
        self.datasetPath = datasetPath
        # Analysis dataset is primary/processed/tier/definition
        self.ads = len(self.datasetPath.split("/")) > 4
        self.cfg_params = cfg_params
        self.skipBlocks = skipAnBlocks

        self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
        self.eventsPerFile = {}   # DBS output: map files-events
        self.blocksinfo = {}      # DBS output: map fileblocks-files
        self.maxEvents = 0        # DBS output: max events
        self.maxLumis = 0         # DBS output: total number of lumis
        self.parent = {}          # DBS output: parents of each file
        self.lumis = {}           # DBS output: lumis in each file

        # Defined up front so that getListFiles() and queryDbs() are usable
        # before fetchDBSInfo() has run; fetchDBSInfo() overwrites both.
        self.files = []           # DBS output: raw file records
        self.splitByRun = 0       # value of CMSSW.split_by_run

    def fetchDBSInfo(self):
        """
        Contact DBS and fill the per-file and per-block maps.

        Reads the DBS url, run selection, split_by_run, use_parent and
        fileblocks_file settings from cfg_params, queries DBS through
        queryDbs(), accumulates the returned file records and finally
        writes the names of the blocks found to the file-blocks file.

        Raises CrabException when block skipping is enabled and no new
        block is left, and NotExistingDatasetError when no block with
        events was found at all.
        """
        ## get DBS URL
        global_url = "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
        # Only the CAF scheduler has a dedicated DBS instance; every other
        # scheduler (including ones added later) uses the global one. Using
        # .get() avoids a KeyError for a scheduler missing from the table,
        # which previously failed even when CMSSW.dbs_url was set explicitly.
        dbs_url_map = {'caf': caf_url}
        scheduler = (common.scheduler.name()).lower()
        dbs_url_default = dbs_url_map.get(scheduler, global_url)
        dbs_url = self.cfg_params.get('CMSSW.dbs_url', dbs_url_default)
        common.logger.debug("Accessing DBS at: "+dbs_url)

        ## check if runs are selected
        runselection = []
        if 'CMSSW.runselection' in self.cfg_params:
            runselection = parseRange2(self.cfg_params['CMSSW.runselection'])

        self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))

        # Wrapped in a tuple so the formatting is correct whatever sequence
        # type parseRange2 returns.
        common.logger.log(10-1, "runselection is: %s" % (runselection,))

        ## service API
        args = {'url': dbs_url, 'level': 'CRITICAL'}

        ## check if has been requested to use the parent info
        useparent = int(self.cfg_params.get('CMSSW.use_parent', 0))

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file', defaultName))

        api = DBSAPI.dbsApi.DbsApi(args)
        self.files = self.queryDbs(api, path=self.datasetPath, runselection=runselection, useParent=useparent)

        anFileBlocks = []
        if self.skipBlocks:
            anFileBlocks = readTXTfile(self, fileBlocks_FileName)

        # parse files and fill arrays, skipping already analyzed blocks
        for fileInfo in self.files:
            if fileInfo['Block']['Name'] not in anFileBlocks:
                self._addFile(fileInfo, useparent)

        if self.skipBlocks and not self.eventsPerBlock:
            msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
            raise CrabException(msg)

        blockLines = []
        for block in self.eventsPerBlock:
            blockLines.append(str(block)+'\n')
            common.logger.log(10-1, "DBSInfo: total nevts %i in block %s " % (self.eventsPerBlock[block], block))
        # The file is written before the emptiness check below, as before.
        writeTXTfile(self, fileBlocks_FileName, ''.join(blockLines))

        if not self.eventsPerBlock:
            raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
                                           + " dataset path variables in crab.cfg")
                                          % self.datasetPath)

    def _addFile(self, fileInfo, useparent):
        """
        Fold one DBS file record into the per-file and per-block maps.

        fileInfo  -- file record as returned by queryDbs(); read through the
                     'Block', 'LogicalFileName', 'NumberOfEvents' keys and,
                     when requested, 'ParentList' and 'LumiList'
        useparent -- 1 when the parent files have to be recorded
        """
        fileblock = fileInfo['Block']['Name']
        filename = fileInfo['LogicalFileName']

        parList = []
        lumiList = []  # List of (run, lumi section) tuples
        # asked retry the list of parent for the given child
        if useparent == 1:
            parList = [x['LogicalFileName'] for x in fileInfo['ParentList']]
        if self.ads:
            lumiList = [(x['RunNumber'], x['LumiSectionNumber'])
                        for x in fileInfo['LumiList']]
        self.parent[filename] = parList
        self.lumis[filename] = lumiList

        # Files with '.dat' in their name do not contribute to the event
        # and block accounting.
        if filename.find('.dat') >= 0:
            return

        events = fileInfo['NumberOfEvents']
        # Count number of events per block
        self.eventsPerBlock[fileblock] = self.eventsPerBlock.get(fileblock, 0) + events
        # Number of events per file
        self.eventsPerFile[filename] = events
        # List of files per block
        self.blocksinfo.setdefault(fileblock, []).append(filename)
        # total number of events and lumis
        self.maxEvents += events
        self.maxLumis += len(lumiList)

    def queryDbs(self, api, path=None, runselection=None, useParent=None):
        """
        Query DBS for the files of a dataset and return the file records.

        api          -- DBS API object providing listFiles()/listDatasetFiles()
        path         -- dataset path to query; defaults to self.datasetPath
        runselection -- sequence of run numbers; empty or None means no
                        run selection
        useParent    -- 1 to also retrieve the parents of each file

        Raises DataDiscoveryError when DBS reports a DbsBadRequest or a
        DBSError. A failure on a single selected run is logged and that
        run is skipped.
        """
        import sys

        if path is None:
            path = self.datasetPath

        allowedRetriveValue = ['retrive_block', 'retrive_run']
        if self.ads:
            allowedRetriveValue.append('retrive_lumi')
        if useParent == 1:
            allowedRetriveValue.append('retrive_parent')
        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)

        try:
            if not runselection:
                if useParent == 1 or self.splitByRun == 1:
                    if self.ads:
                        files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
                    else:
                        files = api.listFiles(path=path, retriveList=allowedRetriveValue)
                else:
                    files = api.listDatasetFiles(self.datasetPath)
            else:
                files = []
                for arun in runselection:
                    try:
                        if self.ads:
                            filesinrun = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue, runNumber=arun)
                        else:
                            filesinrun = api.listFiles(path=path, retriveList=allowedRetriveValue, runNumber=arun)
                        files.extend(filesinrun)
                    except Exception:
                        # Best effort: a run that cannot be queried is
                        # skipped, but the cause is kept in the debug log.
                        msg = "WARNING: problem extracting info from DBS for run %s " % arun
                        common.logger.info(msg)
                        common.logger.debug(str(sys.exc_info()[1]))
        except (DbsBadRequest, DBSError):
            # sys.exc_info() keeps this handler valid on every Python
            # version (neither 'except X, e' nor 'except X as e' is).
            raise DataDiscoveryError(sys.exc_info()[1])

        return files

    def getMaxEvents(self):
        """
        Return the total number of events found
        """
        return self.maxEvents

    def getMaxLumis(self):
        """
        Return the number of lumis in the dataset
        """
        return self.maxLumis

    def getEventsPerBlock(self):
        """
        Return the map fileblock -> number of events
        """
        return self.eventsPerBlock

    def getEventsPerFile(self):
        """
        Return the map file -> number of events
        """
        return self.eventsPerFile

    def getFiles(self):
        """
        Return the files grouped by fileblock
        """
        return self.blocksinfo

    def getParent(self):
        """
        Return the parents grouped by file
        """
        return self.parent

    def getLumis(self):
        """
        Return the lumi sections grouped by file
        """
        return self.lumis

    def getListFiles(self):
        """
        Return the file records as returned by the DBS query
        (an empty list until fetchDBSInfo() has been called)
        """
        return self.files