ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
(Generate patch)

Comparing COMP/CRAB/python/parseCrabFjr.py (file contents):
Revision 1.13 by farinafa, Wed Jun 18 08:32:02 2008 UTC vs.
Revision 1.20 by spiga, Thu Jan 19 17:48:35 2012 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2  
3 < import sys, getopt, string
3 > import sys, getopt, string, os
4  
5   from ProdCommon.FwkJobRep.ReportParser import readJobReport
6   from DashboardAPI import apmonSend, apmonFree
# Line 9 | Line 9 | class parseFjr:
9      def __init__(self, argv):
10          """
11          parseCrabFjr
12 <
12 >
13          - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14          - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15          - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
# Line 18 | Line 18 | class parseFjr:
18          self.input = ''
19          self.MonitorID = ''
20          self.MonitorJobID = ''
21 <        self.info2dash = False
22 <        self.exitCode = False
23 <        self.lfnList = False  
21 >        self.info2dash = False
22 >        self.exitCode = False
23 >        self.lfnList = False
24          self.debug = 0
25          try:
26 <            opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "help"])
26 >            opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "popularity=", "help"])
27          except getopt.GetoptError:
28              print self.usage()
29              sys.exit(2)
30 <        self.check(opts)  
30 >        self.check(opts)
31  
32          return
33  
34 <    def check(self,opts):
34 >    def check(self,opts):
35          # check command line parameter
36          for opt, arg in opts :
37              if opt  == "--help" :
# Line 40 | Line 40 | class parseFjr:
40              elif opt == "--input" :
41                  self.input = arg
42              elif opt == "--exitcode":
43 <                self.exitCode = True
43 >                self.exitCode = True
44              elif opt == "--lfn":
45 <                self.lfnList = True
45 >                self.lfnList = True
46 >            elif opt == "--popularity":
47 >                self.popularity = True
48 >                try:
49 >                   self.MonitorID = arg.split(",")[0]
50 >                   self.MonitorJobID = arg.split(",")[1]
51 >                   self.inputInfos = arg.split(",")[2]
52 >                except:
53 >                   self.MonitorID = ''
54 >                   self.MonitorJobID = ''
55 >                   self.inputInfos = ''
56              elif opt == "--dashboard":
57 <                self.info2dash = True
58 <                try:
57 >                self.info2dash = True
58 >                try:
59                     self.MonitorID = arg.split(",")[0]
60                     self.MonitorJobID = arg.split(",")[1]
61                  except:
# Line 53 | Line 63 | class parseFjr:
63                     self.MonitorJobID = ''
64              elif opt == "--debug" :
65                  self.debug = 1
66 <                
67 <        if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode)  :
66 >
67 >        if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode and not self.popularity)  :
68              print self.usage()
69              sys.exit()
70 <        
71 <        if self.info2dash:
70 >
71 >        if self.info2dash:
72              if self.MonitorID == '' or self.MonitorJobID == '':
73                  print self.usage()
74                  sys.exit()
75          return
76  
77 <    def run(self):
77 >    def run(self):
78  
79          # load FwkJobRep
80 <        jobReport = readJobReport(self.input)[0]
81 <        if self.exitCode :
80 >        try:
81 >            jobReport = readJobReport(self.input)[0]
82 >        except:
83 >            print '50115'
84 >            sys.exit()
85 >
86 >        if self.exitCode :
87              self.exitCodes(jobReport)
88 <        if self.lfnList :
88 >        if self.lfnList :
89             self.lfn_List(jobReport)
90 <        if self.info2dash :
90 >        if self.info2dash :
91             self.reportDash(jobReport)
92 +        if self.popularity:
93 +           self.popularityInfos(jobReport)
94          return
95  
96      def exitCodes(self, jobReport):
# Line 85 | Line 102 | class parseFjr:
102          if (len_fjr <= 6):
103             ### 50115 - cmsRun did not produce a valid/readable job report at runtime
104             exit_status = str(50115)
105 <        else:
105 >        else:
106              # get ExitStatus of last error
107              if len(jobReport.errors) != 0 :
108                  exit_status = str(jobReport.errors[-1]['ExitStatus'])
109              else :
110                  exit_status = str(0)
111 <        #check exit code
111 >        #check exit code
112          if string.strip(exit_status) == '': exit_status = -999
113 <        print exit_status  
114 <  
113 >        print exit_status
114 >
115          return
116  
117      def lfn_List(self,jobReport):
118 <        '''
119 <        get list of analyzed files
118 >        '''
119 >        get list of analyzed files
120          '''
121          lfnlist = [x['LFN'] for x in jobReport.inputFiles]
122          for file in lfnlist: print file
123 <        return
123 >        return
124  
125      def storageStat(self,jobReport):
126 <        '''
127 <        get i/o statistics
126 >        '''
127 >        get i/o statistics
128          '''
129          storageStatistics = str(jobReport.storageStatistics)
130          storage_report = {}
# Line 140 | Line 157 | class parseFjr:
157                          storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
158                  else :
159                      storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
160 <
160 >
161              if self.debug :
162                  for protocol in storage_report.keys() :
163                      print 'protocol:',protocol
164                      for action in storage_report[protocol].keys() :
165                          print 'action:',action,'measurement:',storage_report[protocol][action]
166  
167 <        #####
167 >        #####
168          # throughput measurements # Fabio
169          throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 }
170          for protocol in storage_report.keys() :
# Line 157 | Line 174 | class parseFjr:
174                      continue
175  
176                  # convert values
177 <                try:
177 >                try:
178                      sizeValue = float(storage_report[protocol][action][2])
179                      timeValue = float(storage_report[protocol][action][3])
180                  except Exception,e:
181                      continue
182 <
182 >
183                  # aggregate data
184 <                if 'read' in action:  
184 >                if 'read' in action:
185                      throughput_report['rSize'] += sizeValue
186                      throughput_report['rTime'] += timeValue
187                  elif 'write' in action:
# Line 183 | Line 200 | class parseFjr:
200  
201          throughput_report['avgNetThr'] = 'NULL'
202          try:
203 <            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
203 >            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
204          except:
205              pass
206  
207          throughput_report['writeThr'] = 'NULL'
208          if throughput_report['wTime'] > 0.0:
209 <            throughput_report['wTime'] /= 1000.0
209 >            throughput_report['wTime'] /= 1000.0
210              throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime'])
211 <        #####
212 <
213 <        if self.debug == 1 :
211 >        #####
212 >
213 >        if self.debug == 1 :
214              print storage_report
215              print throughput_report
216          return storage_report, throughput_report
217  
218 +    def popularityInfos(self, jobReport):
219 +        report_dict = {}
220 +        inputList = []
221 +        inputParentList = []
222 +        report_dict['inputBlocks'] = ''
223 +        if (os.path.exists(self.inputInfos)):
224 +            file=open(self.inputInfos,'r')
225 +            lines = file.readlines()
226 +            for line in lines:
227 +                if line.find("inputBlocks")>=0:
228 +                    report_dict['inputBlocks']= line.split("=")[1].strip()
229 +                if line.find("inputFiles")>=0:
230 +                    inputList = line.split("=")[1].strip().split(";")
231 +                if line.find("parentFiles")>=0:
232 +                    inputParentList = line.split("=")[1].strip().split(";")
233 +            file.close()
234 +        if len(inputList) == 1 and inputList[0] == '':
235 +            inputList=[]
236 +        if len(inputParentList) == 1 and inputParentList[0] == '':
237 +            inputParentList=[]
238 +        basename = ''
239 +        if len(inputList) > 1:
240 +            basename = os.path.commonprefix(inputList)
241 +        elif len(inputList) == 1:
242 +            basename =  "%s/"%os.path.dirname(inputList[0])
243 +        basenameParent = ''
244 +        if len(inputParentList) > 1:
245 +            basenameParent = os.path.commonprefix(inputParentList)
246 +        elif len(inputParentList) == 1:
247 +            basenameParent = "%s/"%os.path.dirname(inputParentList[0])
248 +
249 +        readFile = {}
250 +
251 +        readFileParent = {}
252 +        fileAttr = []
253 +        fileParentAttr = []
254 +        for inputFile in  jobReport.inputFiles:
255 +            fileAccess = 'Local'
256 +            if inputFile.get("PFN").find('xrootd'): fileAccess = 'Remote'
257 +            if inputFile['LFN'].find(basename) >=0:
258 +                fileAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs"))
259 +                readFile[inputFile.get("LFN").split(basename)[1]] = fileAttr
260 +            else:
261 +                fileParentAttr = (inputFile.get("FileType"), fileAccess, inputFile.get("Runs"))
262 +                readParentFile[inputFile.get("LFN").split(basenameParent)[1]] = fileParentAttr
263 +        cleanedinputList = []
264 +        for file in inputList:
265 +            cleanedinputList.append(file.split(basename)[1])
266 +        cleanedParentList = []
267 +        for file in inputParentList:
268 +            cleanedParentList.append(file.split(basenameParent)[1])
269 +
270 +        inputString = ''
271 +        LumisString = ''
272 +        countFile = 1
273 +        for f,t in readFile.items():
274 +            cleanedinputList.remove(f)
275 +            inputString += '%s::%d::%s::%s::%d;'%(f,1,t[0],t[1],countFile)
276 +            LumisString += '%s::%s::%d;'%(t[2].keys()[0],self.makeRanges(t[2].values()[0]),countFile)
277 +            countFile += 1
278 +
279 +        inputParentString = ''
280 +        LumisParentString  = ''
281 +        countParentFile = 1
282 +        for fp,tp in readFileParent.items():
283 +            cleanedParentList.remove(fp)
284 +            inputParentString += '%s::%d::%s::%s::%d;'%(fp,1,tp[0],tp[1],countParentFile)
285 +            LumisParentString += '%s::%s::%d;'%(tp[2].keys()[0],self.makeRanges(tp[2].values()[0]),countParentFile)
286 +            countParentFile += 1
287 +
288 +        if len(cleanedinputList):
289 +           for file in cleanedinputList :
290 +               if len(jobReport.errors):
291 +                   if jobReport.errors[0]["Description"].find(file) >= 0:
292 +                       fileAccess = 'Local'
293 +                       if jobReport.errors[0]["Description"].find('xrootd') >= 0: fileAccess = 'Remote'
294 +                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown',fileAccess,'Unknown')
295 +                   else:
296 +                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
297 +               else:
298 +                   inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
299 +
300 +        if len(cleanedParentList):
301 +           for file in cleanedParentList :
302 +               if len(jobReport.errors):
303 +                   if jobReport.errors[0]["Description"].find(file) >= 0:
304 +                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown','Local','Unknown')
305 +                   else:
306 +                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
307 +               else:
308 +                   inputParentString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
309 +
310 +        report_dict['inputFiles']= inputString
311 +        report_dict['parentFiles']= inputParentString
312 +        report_dict['lumisRange']= LumisString
313 +        report_dict['lumisParentRange']= LumisParentString
314 +        report_dict['Basename']= basename
315 +        report_dict['BasenameParent']= basenameParent
316 +
317 +         # send to DashBoard
318 +        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
319 +        apmonFree()
320 +
321 +       # if self.debug == 1 :
322 +        print "Popularity start"
323 +        for k,v in report_dict.items():
324 +            print "%s : %s"%(k,v)
325 +        print "Popularity stop"
326 +        return
327 +
328      def n_of_events(self,jobReport):
329          '''
330 <        #Brian's patch to sent number of events procedded to the Dashboard
330 >        #Brian's patch to sent number of events procedded to the Dashboard
331          # Add NoEventsPerRun to the Dashboard report
332 <        '''  
332 >        '''
333          event_report = {}
334          eventsPerRun = 0
335          for inputFile in jobReport.inputFiles:
# Line 219 | Line 346 | class parseFjr:
346          if self.debug == 1 : print event_report
347  
348          return event_report
349 <      
349 >
350      def reportDash(self,jobReport):
351          '''
352 <        dashboard report dictionary
352 >        dashboard report dictionary
353          '''
354          event_report = self.n_of_events(jobReport)
355          storage_report, throughput_report = self.storageStat(jobReport)
# Line 253 | Line 380 | class parseFjr:
380          dashboard_report['io_read_throughput'] = throughput_report['readThr']
381          dashboard_report['io_write_throughput'] = throughput_report['writeThr']
382          dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']
383 <
383 >
384          # send to DashBoard
385          apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
386          apmonFree()
# Line 262 | Line 389 | class parseFjr:
389  
390          return
391  
392 +    def makeRanges(self,lumilist):
393 +        """ convert list to range """
394 +
395 +        counter = lumilist[0]
396 +        lumilist.remove(counter)
397 +        tempRange=[]
398 +        tempRange.append(counter)
399 +        string = ''
400 +        for i in lumilist:
401 +            if i == counter+1:
402 +                tempRange.append(i)
403 +                counter +=1
404 +            else:
405 +                if len(tempRange)==1:
406 +                    string += "%s,"%tempRange[0]
407 +                else:
408 +                    string += "%s-%s,"%(tempRange[:1][0],tempRange[-1:][0])
409 +                counter = i
410 +                tempRange=[]
411 +                tempRange.append(counter)
412 +            if i == lumilist[-1:][0]   :
413 +                if len(tempRange)==1:
414 +                    string += "%s"%tempRange[0]
415 +                else:
416 +                    string += "%s-%s"%(tempRange[:1][0],tempRange[-1:][0])
417 +        return string
418 +
419      def usage(self):
420 <        
421 <        msg="""
420 >
421 >        msg="""
422          required parameters:
423          --input            :       input FJR xml file
424 <
424 >
425          optional parameters:
426          --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
427              MonitorID        :       DashBoard MonitorID
428              MonitorJobID     :       DashBoard MonitorJobID
429 <        --exitcode         :       print executable exit code
429 >        --exitcode         :       print executable exit code
430          --lfn              :       report list of files really analyzed
431          --help             :       help
432          --debug            :       debug statements
433          """
434 <        return msg
434 >        return msg
435  
436  
437   if __name__ == '__main__' :
438 <    try:
439 <        parseFjr_ = parseFjr(sys.argv[1:])
440 <        parseFjr_.run()  
438 >    try:
439 >        parseFjr_ = parseFjr(sys.argv[1:])
440 >        parseFjr_.run()
441      except:
442          pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines