ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
(Generate patch)

Comparing COMP/CRAB/python/parseCrabFjr.py (file contents):
Revision 1.16 by spiga, Wed Mar 23 10:48:39 2011 UTC vs.
Revision 1.20 by spiga, Thu Jan 19 17:48:35 2012 UTC

# Line 9 | Line 9 | class parseFjr:
9      def __init__(self, argv):
10          """
11          parseCrabFjr
12 <
12 >
13          - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14          - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15          - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
# Line 18 | Line 18 | class parseFjr:
18          self.input = ''
19          self.MonitorID = ''
20          self.MonitorJobID = ''
21 <        self.info2dash = False
22 <        self.exitCode = False
23 <        self.lfnList = False  
21 >        self.info2dash = False
22 >        self.exitCode = False
23 >        self.lfnList = False
24          self.debug = 0
25          try:
26              opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "popularity=", "help"])
27          except getopt.GetoptError:
28              print self.usage()
29              sys.exit(2)
30 <        self.check(opts)  
30 >        self.check(opts)
31  
32          return
33  
34 <    def check(self,opts):
34 >    def check(self,opts):
35          # check command line parameter
36          for opt, arg in opts :
37              if opt  == "--help" :
# Line 40 | Line 40 | class parseFjr:
40              elif opt == "--input" :
41                  self.input = arg
42              elif opt == "--exitcode":
43 <                self.exitCode = True
43 >                self.exitCode = True
44              elif opt == "--lfn":
45 <                self.lfnList = True
45 >                self.lfnList = True
46              elif opt == "--popularity":
47 <                self.popularity = True
48 <                try:
47 >                self.popularity = True
48 >                try:
49                     self.MonitorID = arg.split(",")[0]
50 <                   self.MonitorJobID = arg.split(",")[1]
51 <                   self.inputInfos = arg.split(",")[2]
50 >                   self.MonitorJobID = arg.split(",")[1]
51 >                   self.inputInfos = arg.split(",")[2]
52                  except:
53                     self.MonitorID = ''
54                     self.MonitorJobID = ''
55                     self.inputInfos = ''
56              elif opt == "--dashboard":
57 <                self.info2dash = True
58 <                try:
57 >                self.info2dash = True
58 >                try:
59                     self.MonitorID = arg.split(",")[0]
60                     self.MonitorJobID = arg.split(",")[1]
61                  except:
# Line 63 | Line 63 | class parseFjr:
63                     self.MonitorJobID = ''
64              elif opt == "--debug" :
65                  self.debug = 1
66 <                
66 >
67          if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode and not self.popularity)  :
68              print self.usage()
69              sys.exit()
70 <        
71 <        if self.info2dash:
70 >
71 >        if self.info2dash:
72              if self.MonitorID == '' or self.MonitorJobID == '':
73                  print self.usage()
74                  sys.exit()
75          return
76  
77 <    def run(self):
77 >    def run(self):
78  
79          # load FwkJobRep
80          try:
# Line 82 | Line 82 | class parseFjr:
82          except:
83              print '50115'
84              sys.exit()
85 <            
86 <        if self.exitCode :
85 >
86 >        if self.exitCode :
87              self.exitCodes(jobReport)
88 <        if self.lfnList :
88 >        if self.lfnList :
89             self.lfn_List(jobReport)
90 <        if self.info2dash :
90 >        if self.info2dash :
91             self.reportDash(jobReport)
92          if self.popularity:
93             self.popularityInfos(jobReport)
# Line 102 | Line 102 | class parseFjr:
102          if (len_fjr <= 6):
103             ### 50115 - cmsRun did not produce a valid/readable job report at runtime
104             exit_status = str(50115)
105 <        else:
105 >        else:
106              # get ExitStatus of last error
107              if len(jobReport.errors) != 0 :
108                  exit_status = str(jobReport.errors[-1]['ExitStatus'])
109              else :
110                  exit_status = str(0)
111 <        #check exit code
111 >        #check exit code
112          if string.strip(exit_status) == '': exit_status = -999
113 <        print exit_status  
114 <  
113 >        print exit_status
114 >
115          return
116  
117      def lfn_List(self,jobReport):
118 <        '''
119 <        get list of analyzed files
118 >        '''
119 >        get list of analyzed files
120          '''
121          lfnlist = [x['LFN'] for x in jobReport.inputFiles]
122          for file in lfnlist: print file
123 <        return
123 >        return
124  
125      def storageStat(self,jobReport):
126 <        '''
127 <        get i/o statistics
126 >        '''
127 >        get i/o statistics
128          '''
129          storageStatistics = str(jobReport.storageStatistics)
130          storage_report = {}
# Line 157 | Line 157 | class parseFjr:
157                          storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
158                  else :
159                      storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
160 <
160 >
161              if self.debug :
162                  for protocol in storage_report.keys() :
163                      print 'protocol:',protocol
164                      for action in storage_report[protocol].keys() :
165                          print 'action:',action,'measurement:',storage_report[protocol][action]
166  
167 <        #####
167 >        #####
168          # throughput measurements # Fabio
169          throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 }
170          for protocol in storage_report.keys() :
# Line 174 | Line 174 | class parseFjr:
174                      continue
175  
176                  # convert values
177 <                try:
177 >                try:
178                      sizeValue = float(storage_report[protocol][action][2])
179                      timeValue = float(storage_report[protocol][action][3])
180                  except Exception,e:
181                      continue
182 <
182 >
183                  # aggregate data
184 <                if 'read' in action:  
184 >                if 'read' in action:
185                      throughput_report['rSize'] += sizeValue
186                      throughput_report['rTime'] += timeValue
187                  elif 'write' in action:
# Line 200 | Line 200 | class parseFjr:
200  
201          throughput_report['avgNetThr'] = 'NULL'
202          try:
203 <            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
203 >            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
204          except:
205              pass
206  
207          throughput_report['writeThr'] = 'NULL'
208          if throughput_report['wTime'] > 0.0:
209 <            throughput_report['wTime'] /= 1000.0
209 >            throughput_report['wTime'] /= 1000.0
210              throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime'])
211 <        #####
212 <
213 <        if self.debug == 1 :
211 >        #####
212 >
213 >        if self.debug == 1 :
214              print storage_report
215              print throughput_report
216          return storage_report, throughput_report
# Line 218 | Line 218 | class parseFjr:
218      def popularityInfos(self, jobReport):
219          report_dict = {}
220          inputList = []
221 <        inputParentList = []                        
221 >        inputParentList = []
222          report_dict['inputBlocks'] = ''
223          if (os.path.exists(self.inputInfos)):
224              file=open(self.inputInfos,'r')
# Line 231 | Line 231 | class parseFjr:
231                  if line.find("parentFiles")>=0:
232                      inputParentList = line.split("=")[1].strip().split(";")
233              file.close()
234 +        if len(inputList) == 1 and inputList[0] == '':
235 +            inputList=[]
236 +        if len(inputParentList) == 1 and inputParentList[0] == '':
237 +            inputParentList=[]
238          basename = ''
239 <        if len(inputList):
239 >        if len(inputList) > 1:
240              basename = os.path.commonprefix(inputList)
241 <        basenameParent = ''
242 <        if len(inputParentList):
241 >        elif len(inputList) == 1:
242 >            basename =  "%s/"%os.path.dirname(inputList[0])
243 >        basenameParent = ''
244 >        if len(inputParentList) > 1:
245              basenameParent = os.path.commonprefix(inputParentList)
246 +        elif len(inputParentList) == 1:
247 +            basenameParent = "%s/"%os.path.dirname(inputParentList[0])
248  
249 <        readFile = {}  
249 >        readFile = {}
250  
251 <        readFileParent = {}
251 >        readFileParent = {}
252 >        fileAttr = []
253 >        fileParentAttr = []
254          for inputFile in  jobReport.inputFiles:
255 +            fileAccess = 'Local'
256 +            if inputFile.get("PFN").find('xrootd'): fileAccess = 'Remote'
257              if inputFile['LFN'].find(basename) >=0:
258 <                fileAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
258 >                fileAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs"))
259                  readFile[inputFile.get("LFN").split(basename)[1]] = fileAttr
260              else:
261 <                fileParentAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
261 >                fileParentAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs"))
262                  readParentFile[inputFile.get("LFN").split(basenameParent)[1]] = fileParentAttr
263          cleanedinputList = []
264 <        for file in inputList:        
264 >        for file in inputList:
265              cleanedinputList.append(file.split(basename)[1])
266          cleanedParentList = []
267 <        for file in inputParentList:        
267 >        for file in inputParentList:
268              cleanedParentList.append(file.split(basenameParent)[1])
269  
270          inputString = ''
271          LumisString = ''
272 +        countFile = 1
273          for f,t in readFile.items():
274 <            cleanedinputList.remove(f)    
275 <            inputString += '%s::%d::%s::%s;'%(f,1,t[0],t[1])
276 <            LumisString += '%s::%s;'%(t[2].keys()[0],t[2].values()[0])  
274 >            cleanedinputList.remove(f)
275 >            inputString += '%s::%d::%s::%s::%d;'%(f,1,t[0],t[1],countFile)
276 >            LumisString += '%s::%s::%d;'%(t[2].keys()[0],self.makeRanges(t[2].values()[0]),countFile)
277 >            countFile += 1
278  
279          inputParentString = ''
280          LumisParentString  = ''
281 +        countParentFile = 1
282          for fp,tp in readFileParent.items():
283 <            cleanedParentList.remove(fp)    
284 <            inputParentString += '%s::%d::%s::%s;'%(fp,1,tp[0],tp[1])
285 <            LumisParentString += '%s::%s;'%(tp[2].keys()[0],tp[2].values()[0])  
283 >            cleanedParentList.remove(fp)
284 >            inputParentString += '%s::%d::%s::%s::%d;'%(fp,1,tp[0],tp[1],countParentFile)
285 >            LumisParentString += '%s::%s::%d;'%(tp[2].keys()[0],self.makeRanges(tp[2].values()[0]),countParentFile)
286 >            countParentFile += 1
287  
288          if len(cleanedinputList):
289             for file in cleanedinputList :
290                 if len(jobReport.errors):
291                     if jobReport.errors[0]["Description"].find(file) >= 0:
292 <                       inputString += '%s::%d::%s::%s;'%(file,0,'Unknown','Local')
292 >                       fileAccess = 'Local'
293 >                       if jobReport.errors[0]["Description"].find('xrootd') >= 0: fileAccess = 'Remote'
294 >                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown',fileAccess,'Unknown')
295                     else:
296 <                       inputString += '%s::%d::%s::%s;'%(file,2,'Unknown','Unknown')
296 >                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
297                 else:
298 <                   inputString += '%s::%d::%s::%s;'%(file,2,'Unknown','Unknown')
298 >                   inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
299  
300          if len(cleanedParentList):
301             for file in cleanedParentList :
302                 if len(jobReport.errors):
303                     if jobReport.errors[0]["Description"].find(file) >= 0:
304 <                       inputString += '%s::%d::%s::%s;'%(file,0,'Unknown','Local')
304 >                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown','Local','Unknown')
305                     else:
306 <                       inputString += '%s::%d::%s::%s;'%(file,2,'Unknown','Unknown')
306 >                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
307                 else:
308 <                   inputParentString += '%s::%d::%s::%s;'%(file,2,'Unknown','Unknown')
308 >                   inputParentString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
309  
310          report_dict['inputFiles']= inputString
311 <        report_dict['parentFIles']= inputParentString
311 >        report_dict['parentFiles']= inputParentString
312          report_dict['lumisRange']= LumisString
313          report_dict['lumisParentRange']= LumisParentString
314          report_dict['Basename']= basename
# Line 300 | Line 318 | class parseFjr:
318          apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
319          apmonFree()
320  
321 <        if self.debug == 1 :
322 <            for k,v in report_dict.items():
323 <                print "%s : %s"%(k,v)
324 <        return  
321 >       # if self.debug == 1 :
322 >        print "Popularity start"
323 >        for k,v in report_dict.items():
324 >            print "%s : %s"%(k,v)
325 >        print "Popularity stop"
326 >        return
327  
328      def n_of_events(self,jobReport):
329          '''
330 <        #Brian's patch to sent number of events procedded to the Dashboard
330 >        #Brian's patch to sent number of events procedded to the Dashboard
331          # Add NoEventsPerRun to the Dashboard report
332 <        '''  
332 >        '''
333          event_report = {}
334          eventsPerRun = 0
335          for inputFile in jobReport.inputFiles:
# Line 326 | Line 346 | class parseFjr:
346          if self.debug == 1 : print event_report
347  
348          return event_report
349 <      
349 >
350      def reportDash(self,jobReport):
351          '''
352 <        dashboard report dictionary
352 >        dashboard report dictionary
353          '''
354          event_report = self.n_of_events(jobReport)
355          storage_report, throughput_report = self.storageStat(jobReport)
# Line 360 | Line 380 | class parseFjr:
380          dashboard_report['io_read_throughput'] = throughput_report['readThr']
381          dashboard_report['io_write_throughput'] = throughput_report['writeThr']
382          dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']
383 <
383 >
384          # send to DashBoard
385          apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
386          apmonFree()
# Line 369 | Line 389 | class parseFjr:
389  
390          return
391  
392 +    def makeRanges(self,lumilist):
393 +        """ convert list to range """
394 +
395 +        counter = lumilist[0]
396 +        lumilist.remove(counter)
397 +        tempRange=[]
398 +        tempRange.append(counter)
399 +        string = ''
400 +        for i in lumilist:
401 +            if i == counter+1:
402 +                tempRange.append(i)
403 +                counter +=1
404 +            else:
405 +                if len(tempRange)==1:
406 +                    string += "%s,"%tempRange[0]
407 +                else:
408 +                    string += "%s-%s,"%(tempRange[:1][0],tempRange[-1:][0])
409 +                counter = i
410 +                tempRange=[]
411 +                tempRange.append(counter)
412 +            if i == lumilist[-1:][0]   :
413 +                if len(tempRange)==1:
414 +                    string += "%s"%tempRange[0]
415 +                else:
416 +                    string += "%s-%s"%(tempRange[:1][0],tempRange[-1:][0])
417 +        return string
418 +
419      def usage(self):
420 <        
421 <        msg="""
420 >
421 >        msg="""
422          required parameters:
423          --input            :       input FJR xml file
424 <
424 >
425          optional parameters:
426          --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
427              MonitorID        :       DashBoard MonitorID
428              MonitorJobID     :       DashBoard MonitorJobID
429 <        --exitcode         :       print executable exit code
429 >        --exitcode         :       print executable exit code
430          --lfn              :       report list of files really analyzed
431          --help             :       help
432          --debug            :       debug statements
433          """
434 <        return msg
434 >        return msg
435  
436  
437   if __name__ == '__main__' :
438 <    try:
439 <        parseFjr_ = parseFjr(sys.argv[1:])
440 <        parseFjr_.run()  
438 >    try:
439 >        parseFjr_ = parseFjr(sys.argv[1:])
440 >        parseFjr_.run()
441      except:
442          pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines