ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
(Generate patch)

Comparing COMP/CRAB/python/parseCrabFjr.py (file contents):
Revision 1.9 by afanfani, Tue May 13 10:22:55 2008 UTC vs.
Revision 1.14 by fanzago, Wed Jul 2 16:07:30 2008 UTC

# Line 5 | Line 5 | import sys, getopt, string
5   from ProdCommon.FwkJobRep.ReportParser import readJobReport
6   from DashboardAPI import apmonSend, apmonFree
7  
8 + class parseFjr:
9 +    def __init__(self, argv):
10 +        """
11 +        parseCrabFjr
12 +
13 +        - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14 +        - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15 +        - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 +        """
17 +        # defaults
18 +        self.input = ''
19 +        self.MonitorID = ''
20 +        self.MonitorJobID = ''
21 +        self.info2dash = False
22 +        self.exitCode = False
23 +        self.lfnList = False  
24 +        self.debug = 0
25 +        try:
26 +            opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "help"])
27 +        except getopt.GetoptError:
28 +            print self.usage()
29 +            sys.exit(2)
30 +        self.check(opts)  
31 +
32 +        return
33 +
34 +    def check(self,opts):
35 +        # check command line parameter
36 +        for opt, arg in opts :
37 +            if opt  == "--help" :
38 +                print self.usage()
39 +                sys.exit()
40 +            elif opt == "--input" :
41 +                self.input = arg
42 +            elif opt == "--exitcode":
43 +                self.exitCode = True
44 +            elif opt == "--lfn":
45 +                self.lfnList = True
46 +            elif opt == "--dashboard":
47 +                self.info2dash = True
48 +                try:
49 +                   self.MonitorID = arg.split(",")[0]
50 +                   self.MonitorJobID = arg.split(",")[1]
51 +                except:
52 +                   self.MonitorID = ''
53 +                   self.MonitorJobID = ''
54 +            elif opt == "--debug" :
55 +                self.debug = 1
56 +                
57 +        if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode)  :
58 +            print self.usage()
59 +            sys.exit()
60 +        
61 +        if self.info2dash:
62 +            if self.MonitorID == '' or self.MonitorJobID == '':
63 +                print self.usage()
64 +                sys.exit()
65 +        return
66 +
67 +    def run(self):
68  
69 < def main(argv) :
70 <    """
71 <    parseCrabFjr
72 <
73 <    - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14 <    - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15 <    - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 <
17 <    required parameters:
18 <    --input            :       input FJR xml file
19 <    --MonitorID        :       DashBoard MonitorID
20 <    --MonitorJobID     :       DashBoard MonitorJobID
21 <
22 <    optional parameters:
23 <    --help             :       help
24 <    --debug            :       debug statements
25 <    
26 <    """
27 <
28 <    # defaults
29 <    input = ''
30 <    MonitorID = ''
31 <    MonitorJobID = ''
32 <    debug = 0
33 <
34 <    try:
35 <        opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"])
36 <    except getopt.GetoptError:
37 <        print main.__doc__
38 <        sys.exit(2)
39 <
40 <    # check command line parameter
41 <    for opt, arg in opts :
42 <        if opt  == "--help" :
43 <            print main.__doc__
69 >        # load FwkJobRep
70 >        try:
71 >            jobReport = readJobReport(self.input)[0]
72 >        except:
73 >            print '50115'
74              sys.exit()
45        elif opt == "--input" :
46            input = arg
47        elif opt == "--MonitorID" :
48            MonitorID = arg
49        elif opt == "--MonitorJobID" :
50            MonitorJobID = arg
51        elif opt == "--debug" :
52            debug = 1
75              
76 <    if input == '' or MonitorID == '' or MonitorJobID == '':
77 <        print main.__doc__
78 <        sys.exit()
79 <
80 <    # load FwkJobRep
81 <    jobReport = readJobReport(input)[0]
82 <
83 <    exit_status = ''
84 <    
85 <    ##### temporary fix for FJR incomplete ####
86 <    fjr = open (input)
87 <    len_fjr = len(fjr.readlines())
88 <    if (len_fjr <= 6):
89 <       ### 50115 - cmsRun did not produce a valid/readable job report at runtime
90 <       exit_status = str(50115)
91 <    else:
92 <        # get ExitStatus of last error
93 <        if len(jobReport.errors) != 0 :
94 <            exit_status = str(jobReport.errors[-1]['ExitStatus'])
95 <        else :
96 <            exit_status = str(0)
97 <
98 <    # get i/o statistics
99 <    storageStatistics = str(jobReport.storageStatistics)
100 <
101 <    # dashboard report dictionary
102 <    dashboard_report = {}
103 <
104 < ##Brian's patch to sent number of events procedded to the Dashboard
105 <    # Add NoEventsPerRun to the Dashboard report
106 <    eventsPerRun = 0
107 <    for inputFile in jobReport.inputFiles:
76 >        if self.exitCode :
77 >            self.exitCodes(jobReport)
78 >        if self.lfnList :
79 >           self.lfn_List(jobReport)
80 >        if self.info2dash :
81 >           self.reportDash(jobReport)
82 >        return
83 >
84 >    def exitCodes(self, jobReport):
85 >
86 >        exit_status = ''
87 >        ##### temporary fix for FJR incomplete ####
88 >        fjr = open (self.input)
89 >        len_fjr = len(fjr.readlines())
90 >        if (len_fjr <= 6):
91 >           ### 50115 - cmsRun did not produce a valid/readable job report at runtime
92 >           exit_status = str(50115)
93 >        else:
94 >            # get ExitStatus of last error
95 >            if len(jobReport.errors) != 0 :
96 >                exit_status = str(jobReport.errors[-1]['ExitStatus'])
97 >            else :
98 >                exit_status = str(0)
99 >        #check exit code
100 >        if string.strip(exit_status) == '': exit_status = -999
101 >        print exit_status  
102 >  
103 >        return
104 >
105 >    def lfn_List(self,jobReport):
106 >        '''
107 >        get list of analyzed files
108 >        '''
109 >        lfnlist = [x['LFN'] for x in jobReport.inputFiles]
110 >        for file in lfnlist: print file
111 >        return
112 >
113 >    def storageStat(self,jobReport):
114 >        '''
115 >        get i/o statistics
116 >        '''
117 >        storageStatistics = str(jobReport.storageStatistics)
118 >        storage_report = {}
119 >
120 >        # check if storageStatistics is valid
121 >        if storageStatistics.find('Storage statistics:') != -1 :
122 >            # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
123 >            data = storageStatistics.split('Storage statistics:')[1]
124 >            data_fields = data.split(';')
125 >            for data_field in data_fields:
126 >                # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
127 >                if data_field == ' ' or not data_field or data_field == '':
128 >                   continue
129 >                key = data_field.split('=')[0].strip()
130 >                item = data_field.split('=')[1].strip()
131 >                protocol = str(key.split('/')[0].strip())
132 >                action = str(key.split('/')[1].strip())
133 >                item_array = item.split('/')
134 >                attempted = str(item_array[0].strip())
135 >                succeeded = str(item_array[1].strip())
136 >                total_size = str(item_array[2].strip().split('MB')[0])
137 >                total_time = str(item_array[3].strip().split('ms')[0])
138 >                min_time = str(item_array[4].strip().split('ms')[0])
139 >                max_time = str(item_array[5].strip().split('ms')[0])
140 >                # add to report
141 >                if protocol in storage_report.keys() :
142 >                    if action in storage_report[protocol].keys() :
143 >                        print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
144 >                    else :
145 >                        storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
146 >                else :
147 >                    storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
148 >
149 >            if self.debug :
150 >                for protocol in storage_report.keys() :
151 >                    print 'protocol:',protocol
152 >                    for action in storage_report[protocol].keys() :
153 >                        print 'action:',action,'measurement:',storage_report[protocol][action]
154 >
155 >        #####
156 >        # throughput measurements # Fabio
157 >        throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 }
158 >        for protocol in storage_report.keys() :
159 >            for action in storage_report[protocol].keys():
160 >                # not interesting
161 >                if 'read' not in action and 'write' not in action and 'seek' not in action:
162 >                    continue
163 >
164 >                # convert values
165 >                try:
166 >                    sizeValue = float(storage_report[protocol][action][2])
167 >                    timeValue = float(storage_report[protocol][action][3])
168 >                except Exception,e:
169 >                    continue
170 >
171 >                # aggregate data
172 >                if 'read' in action:  
173 >                    throughput_report['rSize'] += sizeValue
174 >                    throughput_report['rTime'] += timeValue
175 >                elif 'write' in action:
176 >                    throughput_report['wSize'] += sizeValue
177 >                    throughput_report['wTime'] += timeValue
178 >                elif 'seek' in action:
179 >                    throughput_report['rTime'] += timeValue
180 >                else:
181 >                   continue
182 >
183 >        # calculate global throughput
184 >        throughput_report['readThr'] = 'NULL'
185 >        if throughput_report['rTime'] > 0.0:
186 >            throughput_report['rTime'] /= 1000.0 # scale ms to s
187 >            throughput_report['readThr'] = float(throughput_report['rSize']/throughput_report['rTime'])
188 >
189 >        throughput_report['avgNetThr'] = 'NULL'
190          try:
191 <            eventsRead = str(inputFile.get('EventsRead', 0))
88 <            eventsRead = int(eventsRead.strip())
191 >            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
192          except:
193 <            continue
91 <        eventsPerRun += eventsRead
92 <    dashboard_report['NoEventsPerRun'] = eventsPerRun
93 <    dashboard_report['NbEvPerRun'] = eventsPerRun
94 <    dashboard_report['NEventsProcessed'] = eventsPerRun
95 <        #print "Total number of events:", eventsPerRun
96 < ##
97 <
98 <    # check if storageStatistics is valid
99 <    if storageStatistics.find('Storage statistics:') != -1 :
100 <        # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
101 <        report = {}
102 <        data = storageStatistics.split('Storage statistics:')[1]
103 <        data_fields = data.split(';')
104 <        for data_field in data_fields:
105 <            # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
106 <            if data_field == ' ' or not data_field or data_field == '':
107 <               continue
108 <            key = data_field.split('=')[0].strip()
109 <            item = data_field.split('=')[1].strip()
110 <            protocol = str(key.split('/')[0].strip())
111 <            action = str(key.split('/')[1].strip())
112 <            item_array = item.split('/')
113 <            attempted = str(item_array[0].strip())
114 <            succeeded = str(item_array[1].strip())
115 <            total_size = str(item_array[2].strip().split('MB')[0])
116 <            total_time = str(item_array[3].strip().split('ms')[0])
117 <            min_time = str(item_array[4].strip().split('ms')[0])
118 <            max_time = str(item_array[5].strip().split('ms')[0])
119 <            # add to report
120 <            if protocol in report.keys() :
121 <                if action in report[protocol].keys() :
122 <                    print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
123 <                else :
124 <                    report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
125 <            else :
126 <                report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
193 >            pass
194  
195 <        if debug :
196 <            for protocol in report.keys() :
197 <                print 'protocol:',protocol
198 <                for action in report[protocol].keys() :
199 <                    print 'action:',action,'measurement:',report[protocol][action]
195 >        throughput_report['writeThr'] = 'NULL'
196 >        if throughput_report['wTime'] > 0.0:
197 >            throughput_report['wTime'] /= 1000.0
198 >            throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime'])
199 >        #####
200 >
201 >        if self.debug == 1 :
202 >            print storage_report
203 >            print throughput_report
204 >        return storage_report, throughput_report
205 >
206 >    def n_of_events(self,jobReport):
207 >        '''
208 >        #Brian's patch to sent number of events procedded to the Dashboard
209 >        # Add NoEventsPerRun to the Dashboard report
210 >        '''  
211 >        event_report = {}
212 >        eventsPerRun = 0
213 >        for inputFile in jobReport.inputFiles:
214 >            try:
215 >                eventsRead = str(inputFile.get('EventsRead', 0))
216 >                eventsRead = int(eventsRead.strip())
217 >            except:
218 >                continue
219 >            eventsPerRun += eventsRead
220 >        event_report['NoEventsPerRun'] = eventsPerRun
221 >        event_report['NbEvPerRun'] = eventsPerRun
222 >        event_report['NEventsProcessed'] = eventsPerRun
223 >
224 >        if self.debug == 1 : print event_report
225 >
226 >        return event_report
227 >      
228 >    def reportDash(self,jobReport):
229 >        '''
230 >        dashboard report dictionary
231 >        '''
232 >        event_report = self.n_of_events(jobReport)
233 >        storage_report, throughput_report = self.storageStat(jobReport)
234 >        dashboard_report = {}
235 >        #
236 >        for k,v in event_report.iteritems() :
237 >            dashboard_report[k]=v
238  
239          # extract information to be sent to DashBoard
240          # per protocol and for action=read, calculate MBPS
241          # dashboard key is io_action
242 <        dashboard_report['MonitorID'] = MonitorID
243 <        dashboard_report['MonitorJobID'] = MonitorJobID
244 <        for protocol in report.keys() :
245 <            for action in report[protocol].keys() :
246 <                try: size = float(report[protocol][action][2])
242 >        dashboard_report['MonitorID'] = self.MonitorID
243 >        dashboard_report['MonitorJobID'] = self.MonitorJobID
244 >        for protocol in storage_report.keys() :
245 >            for action in storage_report[protocol].keys() :
246 >                try: size = float(storage_report[protocol][action][2])
247                  except: size = 'NULL'
248 <                try: time = float(report[protocol][action][3])/1000
248 >                try: time = float(storage_report[protocol][action][3])/1000
249                  except: time = 'NULL'
250                  dashboard_report['io_'+protocol+'_'+action] = str(size)+'_'+str(time)
251 <
147 <        if debug :
251 >        if self.debug :
252              ordered = dashboard_report.keys()
253              ordered.sort()
254              for key in ordered:
255                  print key,'=',dashboard_report[key]
256  
257 +        # IO throughput information
258 +        dashboard_report['io_read_throughput'] = throughput_report['readThr']
259 +        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
260 +        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']
261 +
262          # send to DashBoard
263 <        apmonSend(MonitorID, MonitorJobID, dashboard_report)
263 >        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
264          apmonFree()
265  
266 <    # prepare exit string
158 <    exit_string = str(exit_status)
159 <    for key in dashboard_report.keys() :
160 <        exit_string += ';' + str(key) + '=' + str(dashboard_report[key])
266 >        if self.debug == 1 : print dashboard_report
267  
268 <    
268 >        return
269  
270 <    return exit_string
270 >    def usage(self):
271 >        
272 >        msg="""
273 >        required parameters:
274 >        --input            :       input FJR xml file
275 >
276 >        optional parameters:
277 >        --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
278 >            MonitorID        :       DashBoard MonitorID
279 >            MonitorJobID     :       DashBoard MonitorJobID
280 >        --exitcode         :       print executable exit code
281 >        --lfn              :       report list of files really analyzed
282 >        --help             :       help
283 >        --debug            :       debug statements
284 >        """
285 >        return msg
286  
287  
288   if __name__ == '__main__' :
289 <    exit_status = main(sys.argv[1:])
290 <    # output for wrapper script
291 <    print exit_status
292 <
293 <    
289 >    try:
290 >        parseFjr_ = parseFjr(sys.argv[1:])
291 >        parseFjr_.run()  
292 >    except:
293 >        pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines