ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DataDiscovery.py
(Generate patch)

Comparing COMP/CRAB/python/DataDiscovery.py (file contents):
Revision 1.29 by edelmann, Tue Mar 17 14:36:44 2009 UTC vs.
Revision 1.40.4.2 by spiga, Thu Apr 22 14:51:47 2010 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2 +
3 + __revision__ = "$Id$"
4 + __version__ = "$Revision$"
5 +
6   import exceptions
7   import DBSAPI.dbsApi
8 < from DBSAPI.dbsApiException import *
8 > from DBSAPI.dbsApiException import *
9   import common
10   from crab_util import *
11 < import os
11 > from LumiList import LumiList
12 > import os
13 >
14  
15  
10 # #######################################
16   class DBSError(exceptions.Exception):
17      def __init__(self, errorName, errorMessage):
18          args='\nERROR DBS %s : %s \n'%(errorName,errorMessage)
19          exceptions.Exception.__init__(self, args)
20          pass
21 <    
21 >
22      def getErrorMessage(self):
23          """ Return error message """
24          return "%s" % (self.args)
25  
26 < # #######################################
26 >
27 >
28   class DBSInvalidDataTierError(exceptions.Exception):
29      def __init__(self, errorName, errorMessage):
30          args='\nERROR DBS %s : %s \n'%(errorName,errorMessage)
31          exceptions.Exception.__init__(self, args)
32          pass
33 <    
33 >
34      def getErrorMessage(self):
35          """ Return error message """
36          return "%s" % (self.args)
37  
38 < # #######################################
38 >
39 >
40   class DBSInfoError:
41      def __init__(self, url):
42          print '\nERROR accessing DBS url : '+url+'\n'
43          pass
44  
45 < # ####################################
45 >
46 >
47   class DataDiscoveryError(exceptions.Exception):
48      def __init__(self, errorMessage):
49          self.args=errorMessage
# Line 46 | Line 54 | class DataDiscoveryError(exceptions.Exce
54          """ Return exception error """
55          return "%s" % (self.args)
56  
57 < # ####################################
57 >
58 >
59   class NotExistingDatasetError(exceptions.Exception):
60      def __init__(self, errorMessage):
61          self.args=errorMessage
# Line 57 | Line 66 | class NotExistingDatasetError(exceptions
66          """ Return exception error """
67          return "%s" % (self.args)
68  
69 < # ####################################
69 >
70 >
71   class NoDataTierinProvenanceError(exceptions.Exception):
72      def __init__(self, errorMessage):
73          self.args=errorMessage
# Line 68 | Line 78 | class NoDataTierinProvenanceError(except
78          """ Return exception error """
79          return "%s" % (self.args)
80  
81 < # ####################################
82 < # class to find and extact info from published data
81 >
82 >
83   class DataDiscovery:
84 +    """
85 +    Class to find and extact info from published data
86 +    """
87      def __init__(self, datasetPath, cfg_params, skipAnBlocks):
88  
89          #       Attributes
90          self.datasetPath = datasetPath
91 +        # Analysis dataset is primary/processed/tier/definition
92 +        self.ads = len(self.datasetPath.split("/")) > 4
93          self.cfg_params = cfg_params
94          self.skipBlocks = skipAnBlocks
95  
96          self.eventsPerBlock = {}  # DBS output: map fileblocks-events for collection
97          self.eventsPerFile = {}   # DBS output: map files-events
98 <        self.blocksinfo = {}      # DBS output: map fileblocks-files
98 > #         self.lumisPerBlock = {}   # DBS output: number of lumis in each block
99 > #         self.lumisPerFile = {}    # DBS output: number of lumis in each file
100 >        self.blocksinfo = {}      # DBS output: map fileblocks-files
101          self.maxEvents = 0        # DBS output: max events
102 <        self.parent = {}       # DBS output: max events
102 >        self.maxLumis = 0         # DBS output: total number of lumis
103 >        self.parent = {}          # DBS output: parents of each file
104 >        self.lumis = {}           # DBS output: lumis in each file
105 >        self.lumiMask = None
106  
87 # ####################################
107      def fetchDBSInfo(self):
108          """
109          Contact DBS
110          """
111          ## get DBS URL
112          global_url="http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet"
113 <        caf_url = "http://cmsdbsprod.cern.ch/cms_dbs_caf_analysis_01/servlet/DBSServlet"
114 <        dbs_url_map  =   {'glite':    global_url,
96 <                          'glitecoll':global_url,\
97 <                          'condor':   global_url,\
98 <                          'condor_g': global_url,\
99 <                          'glidein':  global_url,\
100 <                          'lsf':      global_url,\
101 <                          'caf':      caf_url,\
102 <                          'sge':      global_url,
103 <                          'arc':      global_url
104 <                          }
105 <
106 <        dbs_url_default = dbs_url_map[(common.scheduler.name()).lower()]
107 <        dbs_url=  self.cfg_params.get('CMSSW.dbs_url', dbs_url_default)
108 <        common.logger.debug(3,"Accessing DBS at: "+dbs_url)
113 >        dbs_url=  self.cfg_params.get('CMSSW.dbs_url', global_url)
114 >        common.logger.info("Accessing DBS at: "+dbs_url)
115  
116          ## check if runs are selected
117          runselection = []
118          if (self.cfg_params.has_key('CMSSW.runselection')):
119              runselection = parseRange2(self.cfg_params['CMSSW.runselection'])
120  
121 +        ## check if lumiMask is set
122 +        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask',None)
123 +        lumiList = None
124 +        if self.lumiMask:
125 +            lumiList = LumiList(filename=self.lumiMask)
126  
127          self.splitByRun = int(self.cfg_params.get('CMSSW.split_by_run', 0))
117          
118        self.ads = int(self.cfg_params.get('CMSSW.ads', 0))
128  
129 <        common.logger.debug(6,"runselection is: %s"%runselection)
129 >        common.logger.log(10-1,"runselection is: %s"%runselection)
130          ## service API
131          args = {}
132          args['url']     = dbs_url
# Line 126 | Line 135 | class DataDiscovery:
135          ## check if has been requested to use the parent info
136          useparent = int(self.cfg_params.get('CMSSW.use_parent',0))
137  
138 <        ## check if has been asked for a non default file to store/read analyzed fileBlocks  
139 <        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'  
138 >        ## check if has been asked for a non default file to store/read analyzed fileBlocks
139 >        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
140          fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
132
133        api = DBSAPI.dbsApi.DbsApi(args)
141  
142 +        api = DBSAPI.dbsApi.DbsApi(args)
143          self.files = self.queryDbs(api,path=self.datasetPath,runselection=runselection,useParent=useparent)
144  
145          anFileBlocks = []
146 <        if self.skipBlocks: anFileBlocks = readTXTfile(self, fileBlocks_FileName)
146 >        if self.skipBlocks: anFileBlocks = readTXTfile(self, fileBlocks_FileName)
147  
148          # parse files and fill arrays
149          for file in self.files :
150 <            parList = []
150 >            parList  = []
151 >            fileLumis = [] # List of tuples
152              # skip already analyzed blocks
153              fileblock = file['Block']['Name']
154              if fileblock not in anFileBlocks :
155                  filename = file['LogicalFileName']
156 <                # asked retry the list of parent for the given child
157 <                if useparent==1: parList = [x['LogicalFileName'] for x in file['ParentList']]
158 <                self.parent[filename] = parList
156 >                # asked retry the list of parent for the given child
157 >                if useparent==1:
158 >                    parList = [x['LogicalFileName'] for x in file['ParentList']]
159 >                if self.ads or self.lumiMask:
160 >                    fileLumis = [ (x['RunNumber'], x['LumiSectionNumber'])
161 >                                 for x in file['LumiList'] ]
162 >                self.parent[filename] = parList
163 >                # For LumiMask, intersection of two lists.
164 >                if self.lumiMask:
165 >                    self.lumis[filename] = lumiList.filterLumis(fileLumis)
166 >                else:
167 >                    self.lumis[filename] = fileLumis
168                  if filename.find('.dat') < 0 :
169                      events    = file['NumberOfEvents']
170 <                    # number of events per block
170 >                    # Count number of events and lumis per block
171                      if fileblock in self.eventsPerBlock.keys() :
172                          self.eventsPerBlock[fileblock] += events
173                      else :
174                          self.eventsPerBlock[fileblock] = events
175 <                    # number of events per file
175 >                    # Number of events per file
176                      self.eventsPerFile[filename] = events
177 <            
178 <                    # number of events per block
177 >
178 >                    # List of files per block
179                      if fileblock in self.blocksinfo.keys() :
180                          self.blocksinfo[fileblock].append(filename)
181                      else :
182                          self.blocksinfo[fileblock] = [filename]
183 <            
183 >
184                      # total number of events
185                      self.maxEvents += events
186 +                    self.maxLumis  += len(self.lumis[filename])
187 +
188          if  self.skipBlocks and len(self.eventsPerBlock.keys()) == 0:
189              msg = "No new fileblocks available for dataset: "+str(self.datasetPath)
190 <            raise  CrabException(msg)    
190 >            raise  CrabException(msg)
191  
192 <        saveFblocks=''
192 >        saveFblocks=''
193          for block in self.eventsPerBlock.keys() :
194 <            saveFblocks += str(block)+'\n'
195 <            common.logger.debug(6,"DBSInfo: total nevts %i in block %s "%(self.eventsPerBlock[block],block))
196 <        writeTXTfile(self, fileBlocks_FileName , saveFblocks)
197 <                      
194 >            saveFblocks += str(block)+'\n'
195 >            common.logger.log(10-1,"DBSInfo: total nevts %i in block %s "%(self.eventsPerBlock[block],block))
196 >        writeTXTfile(self, fileBlocks_FileName , saveFblocks)
197 >
198          if len(self.eventsPerBlock) <= 0:
199              raise NotExistingDatasetError(("\nNo data for %s in DBS\nPlease check"
200                                              + " dataset path variables in crab.cfg")
201                                              % self.datasetPath)
202  
203  
184 ###########################
185
204      def queryDbs(self,api,path=None,runselection=None,useParent=None):
205 <
206 <        allowedRetriveValue = [#'retrive_parent',
207 <                               'retrive_block',
208 <                               #'retrive_lumi',
209 <                               'retrive_run'
210 <                               ]
205 >
206 >        allowedRetriveValue = ['retrive_block', 'retrive_run']
207 >        if self.ads or self.lumiMask:
208 >            allowedRetriveValue.append('retrive_lumi')
209 >        if useParent == 1: allowedRetriveValue.append('retrive_parent')
210 >        common.logger.debug("Set of input parameters used for DBS query: %s" % allowedRetriveValue)
211          try:
212              if len(runselection) <=0 :
213 <                if useParent==1 or self.splitByRun==1 :
214 <                    if self.ads==1 :          
215 <                        files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
213 >                if useParent==1 or self.splitByRun==1 or self.ads or self.lumiMask:
214 >                    if self.ads:
215 >                        files = api.listFiles(analysisDataset=path, retriveList=allowedRetriveValue)
216                      else :
217 <                        files = api.listFiles(path=path, retriveList=allowedRetriveValue)
200 <                    common.logger.debug(5,"Set of input parameters used for DBS query : \n"+str(allowedRetriveValue))
201 <                    common.logger.write("Set of input parameters used for DBS query : \n"+str(allowedRetriveValue))
217 >                        files = api.listFiles(path=path, retriveList=allowedRetriveValue)
218                  else:
219                      files = api.listDatasetFiles(self.datasetPath)
220              else :
221                  files=[]
222                  for arun in runselection:
223                      try:
224 <                        if self.ads==1 : filesinrun = api.listFiles(analysisDataset=path,retriveList=allowedRetriveValue,runNumber=arun)
225 <                        else: filesinrun = api.listFiles(path=path,retriveList=allowedRetriveValue,runNumber=arun)
224 >                        if self.ads:
225 >                            filesinrun = api.listFiles(analysisDataset=path,retriveList=allowedRetriveValue,runNumber=arun)
226 >                        else:
227 >                            filesinrun = api.listFiles(path=path,retriveList=allowedRetriveValue,runNumber=arun)
228                          files.extend(filesinrun)
229                      except:
230                          msg="WARNING: problem extracting info from DBS for run %s "%arun
231 <                        common.logger.message(msg)
231 >                        common.logger.info(msg)
232                          pass
233  
234          except DbsBadRequest, msg:
# Line 220 | Line 238 | class DataDiscovery:
238  
239          return files
240  
241 < # #################################################
241 >
242      def getMaxEvents(self):
243          """
244 <        max events
244 >        max events
245          """
246          return self.maxEvents
247  
248 < # #################################################
248 >
249 >    def getMaxLumis(self):
250 >        """
251 >        Return the number of lumis in the dataset
252 >        """
253 >        return self.maxLumis
254 >
255 >
256      def getEventsPerBlock(self):
257          """
258 <        list the event collections structure by fileblock
258 >        list the event collections structure by fileblock
259          """
260          return self.eventsPerBlock
261  
262 < # #################################################
262 >
263      def getEventsPerFile(self):
264          """
265 <        list the event collections structure by file
265 >        list the event collections structure by file
266          """
267          return self.eventsPerFile
268  
269 < # #################################################
269 >
270      def getFiles(self):
271          """
272 <        return files grouped by fileblock
272 >        return files grouped by fileblock
273          """
274 <        return self.blocksinfo        
274 >        return self.blocksinfo
275 >
276  
251 # #################################################
277      def getParent(self):
278          """
279 <        return parent grouped by file
279 >        return parent grouped by file
280          """
281 <        return self.parent        
281 >        return self.parent
282  
283 < # #################################################
284 <    def getListFiles(self):
283 >
284 >    def getLumis(self):
285          """
286 <        return parent grouped by file
286 >        return lumi sections grouped by file
287          """
288 <        return self.files        
288 >        return self.lumis
289 >
290  
291 < ########################################################################
291 >    def getListFiles(self):
292 >        """
293 >        return parent grouped by file
294 >        """
295 >        return self.files

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines