ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
(Generate patch)

Comparing COMP/CRAB/python/parseCrabFjr.py (file contents):
Revision 1.3 by gutsche, Thu Oct 12 19:04:51 2006 UTC vs.
Revision 1.10 by spiga, Thu May 29 21:01:14 2008 UTC

# Line 2 | Line 2
2  
3   import sys, getopt, string
4  
5 < from FwkJobRep.ReportParser import readJobReport
5 > from ProdCommon.FwkJobRep.ReportParser import readJobReport
6   from DashboardAPI import apmonSend, apmonFree
7  
8 <
9 < def main(argv) :
10 <    """
11 <    parseCrabFjr
12 <
13 <    - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14 <    - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15 <    - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 <
17 <    required parameters:
18 <    --input            :       input FJR xml file
19 <    --MonitorID        :       DashBoard MonitorID
20 <    --MonitorJobID     :       DashBoard MonitorJobID
21 <
22 <    optional parameters:
23 <    --help             :       help
24 <    --debug            :       debug statements
25 <    
26 <    """
27 <
28 <    # defaults
29 <    input = ''
30 <    MonitorID = ''
31 <    MonitorJobID = ''
32 <    debug = 0
33 <
34 <    try:
35 <        opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"])
36 <    except getopt.GetoptError:
37 <        print main.__doc__
38 <        sys.exit(2)
39 <
40 <    # check command line parameter
41 <    for opt, arg in opts :
42 <        if opt  == "--help" :
43 <            print main.__doc__
8 > class parseFjr:
9 >    def __init__(self, argv):
10 >        """
11 >        parseCrabFjr
12 >
13 >        - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14 >        - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15 >        - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 >        """
17 >        # defaults
18 >        self.input = ''
19 >        self.MonitorID = ''
20 >        self.MonitorJobID = ''
21 >        self.info2dash = False
22 >        self.exitCode = False
23 >        self.lfnList = False  
24 >        self.debug = 0
25 >        try:
26 >            opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "help"])
27 >        except getopt.GetoptError:
28 >            print self.usage()
29 >            sys.exit(2)
30 >        self.check(opts)  
31 >
32 >        return
33 >
34 >    def check(self,opts):
35 >        # check command line parameter
36 >        for opt, arg in opts :
37 >            if opt  == "--help" :
38 >                print self.usage()
39 >                sys.exit()
40 >            elif opt == "--input" :
41 >                self.input = arg
42 >            elif opt == "--exitcode":
43 >                self.exitCode = True
44 >            elif opt == "--lfn":
45 >                self.lfnList = True
46 >            elif opt == "--dashboard":
47 >                self.info2dash = True
48 >                try:
49 >                   self.MonitorID = arg.split(",")[0]
50 >                   self.MonitorJobID = arg.split(",")[1]
51 >                except:
52 >                   self.MonitorID = ''
53 >                   self.MonitorJobID = ''
54 >            elif opt == "--debug" :
55 >                self.debug = 1
56 >                
57 >        if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode)  :
58 >            print self.usage()
59              sys.exit()
60 <        elif opt == "--input" :
61 <            input = arg
62 <        elif opt == "--MonitorID" :
63 <            MonitorID = arg
64 <        elif opt == "--MonitorJobID" :
65 <            MonitorJobID = arg
66 <        elif opt == "--debug" :
67 <            debug = 1
68 <            
69 <    if input == '' or MonitorID == '' or MonitorJobID == '':
70 <        print main.__doc__
71 <        sys.exit()
72 <
73 <    # load FwkJobRep
74 <    jobReport = readJobReport(input)[0]
75 <
76 <    exit_satus = ''
77 <    
78 <    # get ExitStatus of last error
79 <    if len(jobReport.errors) != 0 :
80 <        exit_status = str(jobReport.errors[-1]['ExitStatus'])
81 <    else :
82 <        exit_status = str(0)
83 <
84 <    # get i/o statistics
85 <    storageStatistics = str(jobReport.storageStatistics)
86 <
87 <    # dashboard report dictionary
88 <    dashboard_report = {}
89 <
90 <    # check if storageStatistics is valid
91 <    if storageStatistics.find('Storage statistics:') != -1 :
77 <        # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
78 <        report = {}
79 <        data = storageStatistics.split('Storage statistics:')[1]
80 <        data_fields = data.split(';')
81 <        for data_field in data_fields:
82 <            # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
83 <            key = data_field.split('=')[0].strip()
84 <            item = data_field.split('=')[1].strip()
85 <            protocol = str(key.split('/')[0].strip())
86 <            action = str(key.split('/')[1].strip())
87 <            item_array = item.split('/')
88 <            attempted = str(item_array[0].strip())
89 <            succeeded = str(item_array[1].strip())
90 <            total_size = str(item_array[2].strip().split('MB')[0])
91 <            total_time = str(item_array[3].strip().split('ms')[0])
92 <            min_time = str(item_array[4].strip().split('ms')[0])
93 <            max_time = str(item_array[5].strip().split('ms')[0])
94 <            # add to report
95 <            if protocol in report.keys() :
96 <                if action in report[protocol].keys() :
97 <                    print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
98 <                else :
99 <                    report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
60 >        
61 >        if self.info2dash:
62 >            if self.MonitorID == '' or self.MonitorJobID == '':
63 >                print self.usage()
64 >                sys.exit()
65 >        return
66 >
67 >    def run(self):
68 >
69 >        # load FwkJobRep
70 >        jobReport = readJobReport(self.input)[0]
71 >        if self.exitCode :
72 >            self.exitCodes(jobReport)
73 >        if self.lfnList :
74 >           self.lfn_List(jobReport)
75 >        if self.info2dash :
76 >           self.reportDash(jobReport)
77 >        return
78 >
79 >    def exitCodes(self, jobReport):
80 >
81 >        exit_status = ''
82 >        ##### temporary fix for FJR incomplete ####
83 >        fjr = open (self.input)
84 >        len_fjr = len(fjr.readlines())
85 >        if (len_fjr <= 6):
86 >           ### 50115 - cmsRun did not produce a valid/readable job report at runtime
87 >           exit_status = str(50115)
88 >        else:
89 >            # get ExitStatus of last error
90 >            if len(jobReport.errors) != 0 :
91 >                exit_status = str(jobReport.errors[-1]['ExitStatus'])
92              else :
93 <                report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
94 <
95 <        if debug :
96 <            for protocol in report.keys() :
97 <                print 'protocol:',protocol
98 <                for action in report[protocol].keys() :
99 <                    print 'action:',action,'measurement:',report[protocol][action]
93 >                exit_status = str(0)
94 >        #check exit code
95 >        if string.strip(exit_status) == '': exit_status = -999
96 >        print exit_status  
97 >  
98 >        return
99 >
100 >    def lfn_List(self,jobReport):
101 >        '''
102 >        get list of analyzed files
103 >        '''
104 >        lfnlist = [x['LFN'] for x in jobReport.inputFiles]
105 >        for file in lfnlist: print file
106 >        return
107 >
108 >    def storageStat(self,jobReport):
109 >        '''
110 >        get i/o statistics
111 >        '''
112 >        storageStatistics = str(jobReport.storageStatistics)
113 >        storage_report = {}
114 >        # check if storageStatistics is valid
115 >        if storageStatistics.find('Storage statistics:') != -1 :
116 >            # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
117 >            data = storageStatistics.split('Storage statistics:')[1]
118 >            data_fields = data.split(';')
119 >            for data_field in data_fields:
120 >                # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
121 >                if data_field == ' ' or not data_field or data_field == '':
122 >                   continue
123 >                key = data_field.split('=')[0].strip()
124 >                item = data_field.split('=')[1].strip()
125 >                protocol = str(key.split('/')[0].strip())
126 >                action = str(key.split('/')[1].strip())
127 >                item_array = item.split('/')
128 >                attempted = str(item_array[0].strip())
129 >                succeeded = str(item_array[1].strip())
130 >                total_size = str(item_array[2].strip().split('MB')[0])
131 >                total_time = str(item_array[3].strip().split('ms')[0])
132 >                min_time = str(item_array[4].strip().split('ms')[0])
133 >                max_time = str(item_array[5].strip().split('ms')[0])
134 >                # add to report
135 >                if protocol in storage_report.keys() :
136 >                    if action in storage_report[protocol].keys() :
137 >                        print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
138 >                    else :
139 >                        storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
140 >                else :
141 >                    storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
142 >
143 >            if self.debug :
144 >                for protocol in storage_report.keys() :
145 >                    print 'protocol:',protocol
146 >                    for action in storage_report[protocol].keys() :
147 >                        print 'action:',action,'measurement:',storage_report[protocol][action]
148 >
149 >        if self.debug == 1 : print storage_report
150 >        return storage_report
151 >
152 >    def n_of_events(self,jobReport):
153 >        '''
154 >        #Brian's patch to sent number of events procedded to the Dashboard
155 >        # Add NoEventsPerRun to the Dashboard report
156 >        '''  
157 >        event_report = {}
158 >        eventsPerRun = 0
159 >        for inputFile in jobReport.inputFiles:
160 >            try:
161 >                eventsRead = str(inputFile.get('EventsRead', 0))
162 >                eventsRead = int(eventsRead.strip())
163 >            except:
164 >                continue
165 >            eventsPerRun += eventsRead
166 >        event_report['NoEventsPerRun'] = eventsPerRun
167 >        event_report['NbEvPerRun'] = eventsPerRun
168 >        event_report['NEventsProcessed'] = eventsPerRun
169 >
170 >        if self.debug == 1 : print event_report
171 >
172 >        return event_report
173 >      
174 >    def reportDash(self,jobReport):
175 >        '''
176 >        dashboard report dictionary
177 >        '''
178 >        event_report = self.n_of_events(jobReport)
179 >        storage_report = self.storageStat(jobReport)
180 >        dashboard_report = {}
181 >        #
182 >        for k,v in event_report.iteritems() :
183 >            dashboard_report[k]=v
184  
185          # extract information to be sent to DashBoard
186          # per protocol and for action=read, calculate MBPS
187          # dashboard key is io_action
188 <        dashboard_report['MonitorID'] = MonitorID
189 <        dashboard_report['MonitorJobID'] = MonitorJobID
190 <        for protocol in report.keys() :
191 <            for action in report[protocol].keys() :
192 <                try: size = float(report[protocol][action][2])
188 >        dashboard_report['MonitorID'] = self.MonitorID
189 >        dashboard_report['MonitorJobID'] = self.MonitorJobID
190 >        for protocol in storage_report.keys() :
191 >            for action in storage_report[protocol].keys() :
192 >                try: size = float(storage_report[protocol][action][2])
193                  except: size = 'NULL'
194 <                try: time = float(report[protocol][action][3])*1000
194 >                try: time = float(storage_report[protocol][action][3])/1000
195                  except: time = 'NULL'
196                  dashboard_report['io_'+protocol+'_'+action] = str(size)+'_'+str(time)
197 <
122 <        if debug :
197 >        if self.debug :
198              ordered = dashboard_report.keys()
199              ordered.sort()
200              for key in ordered:
201                  print key,'=',dashboard_report[key]
202 <
202 >
203          # send to DashBoard
204 <        apmonSend(MonitorID, MonitorJobID, dashboard_report)
204 >        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
205          apmonFree()
206  
207 <    # prepare exit string
133 <    exit_string = str(exit_status)
134 <    for key in dashboard_report.keys() :
135 <        exit_string += ';' + str(key) + '=' + str(dashboard_report[key])
207 >        if self.debug == 1 : print dashboard_report
208  
209 <    return exit_string
209 >        return
210  
211 +    def usage(self):
212 +        
213 +        msg="""
214 +        required parameters:
215 +        --input            :       input FJR xml file
216 +
217 +        optional parameters:
218 +        --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
219 +            MonitorID        :       DashBoard MonitorID
220 +            MonitorJobID     :       DashBoard MonitorJobID
221 +        --exitcode         :       print executable exit code
222 +        --lfn              :       report list of files really analyzed
223 +        --help             :       help
224 +        --debug            :       debug statements
225 +        """
226 +        return msg
227  
140 if __name__ == '__main__' :
141    exit_status = main(sys.argv[1:])
142    # output for wrapper script
143    print exit_status
228  
229 <    
229 > if __name__ == '__main__' :
230 >    try:
231 >        parseFjr_ = parseFjr(sys.argv[1:])
232 >        parseFjr_.run()  
233 >    except:
234 >        pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines