ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/parseCrabFjr.py
(Generate patch)

Comparing COMP/CRAB/python/parseCrabFjr.py (file contents):
Revision 1.9 by afanfani, Tue May 13 10:22:55 2008 UTC vs.
Revision 1.19 by spiga, Fri Apr 22 13:08:00 2011 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2  
3 < import sys, getopt, string
3 > import sys, getopt, string, os
4  
5   from ProdCommon.FwkJobRep.ReportParser import readJobReport
6   from DashboardAPI import apmonSend, apmonFree
7  
8 + class parseFjr:
9 +    def __init__(self, argv):
10 +        """
11 +        parseCrabFjr
12 +
13 +        - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
14 +        - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
15 +        - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 +        """
17 +        # defaults
18 +        self.input = ''
19 +        self.MonitorID = ''
20 +        self.MonitorJobID = ''
21 +        self.info2dash = False
22 +        self.exitCode = False
23 +        self.lfnList = False  
24 +        self.debug = 0
25 +        try:
26 +            opts, args = getopt.getopt(argv, "", ["input=", "dashboard=", "exitcode", "lfn" , "debug", "popularity=", "help"])
27 +        except getopt.GetoptError:
28 +            print self.usage()
29 +            sys.exit(2)
30 +        self.check(opts)  
31 +
32 +        return
33 +
34 +    def check(self,opts):
35 +        # check command line parameter
36 +        for opt, arg in opts :
37 +            if opt  == "--help" :
38 +                print self.usage()
39 +                sys.exit()
40 +            elif opt == "--input" :
41 +                self.input = arg
42 +            elif opt == "--exitcode":
43 +                self.exitCode = True
44 +            elif opt == "--lfn":
45 +                self.lfnList = True
46 +            elif opt == "--popularity":
47 +                self.popularity = True
48 +                try:
49 +                   self.MonitorID = arg.split(",")[0]
50 +                   self.MonitorJobID = arg.split(",")[1]
51 +                   self.inputInfos = arg.split(",")[2]
52 +                except:
53 +                   self.MonitorID = ''
54 +                   self.MonitorJobID = ''
55 +                   self.inputInfos = ''
56 +            elif opt == "--dashboard":
57 +                self.info2dash = True
58 +                try:
59 +                   self.MonitorID = arg.split(",")[0]
60 +                   self.MonitorJobID = arg.split(",")[1]
61 +                except:
62 +                   self.MonitorID = ''
63 +                   self.MonitorJobID = ''
64 +            elif opt == "--debug" :
65 +                self.debug = 1
66 +                
67 +        if self.input == '' or (not self.info2dash and not self.lfnList and not self.exitCode and not self.popularity)  :
68 +            print self.usage()
69 +            sys.exit()
70 +        
71 +        if self.info2dash:
72 +            if self.MonitorID == '' or self.MonitorJobID == '':
73 +                print self.usage()
74 +                sys.exit()
75 +        return
76  
77 < def main(argv) :
78 <    """
79 <    parseCrabFjr
80 <
81 <    - parse CRAB FrameworkJobReport on WN: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
82 <    - report parameters to DashBoard using DashBoardApi.py: for all 'read' actions of all protocols, report MBPS
83 <    - return ExitStatus and dump of DashBoard report separated by semi-colon to WN wrapper script
16 <
17 <    required parameters:
18 <    --input            :       input FJR xml file
19 <    --MonitorID        :       DashBoard MonitorID
20 <    --MonitorJobID     :       DashBoard MonitorJobID
21 <
22 <    optional parameters:
23 <    --help             :       help
24 <    --debug            :       debug statements
25 <    
26 <    """
27 <
28 <    # defaults
29 <    input = ''
30 <    MonitorID = ''
31 <    MonitorJobID = ''
32 <    debug = 0
33 <
34 <    try:
35 <        opts, args = getopt.getopt(argv, "", ["input=", "MonitorID=", "MonitorJobID=", "debug", "help"])
36 <    except getopt.GetoptError:
37 <        print main.__doc__
38 <        sys.exit(2)
39 <
40 <    # check command line parameter
41 <    for opt, arg in opts :
42 <        if opt  == "--help" :
43 <            print main.__doc__
77 >    def run(self):
78 >
79 >        # load FwkJobRep
80 >        try:
81 >            jobReport = readJobReport(self.input)[0]
82 >        except:
83 >            print '50115'
84              sys.exit()
45        elif opt == "--input" :
46            input = arg
47        elif opt == "--MonitorID" :
48            MonitorID = arg
49        elif opt == "--MonitorJobID" :
50            MonitorJobID = arg
51        elif opt == "--debug" :
52            debug = 1
85              
86 <    if input == '' or MonitorID == '' or MonitorJobID == '':
87 <        print main.__doc__
88 <        sys.exit()
89 <
90 <    # load FwkJobRep
91 <    jobReport = readJobReport(input)[0]
92 <
93 <    exit_status = ''
94 <    
95 <    ##### temporary fix for FJR incomplete ####
96 <    fjr = open (input)
97 <    len_fjr = len(fjr.readlines())
98 <    if (len_fjr <= 6):
99 <       ### 50115 - cmsRun did not produce a valid/readable job report at runtime
100 <       exit_status = str(50115)
101 <    else:
102 <        # get ExitStatus of last error
103 <        if len(jobReport.errors) != 0 :
104 <            exit_status = str(jobReport.errors[-1]['ExitStatus'])
105 <        else :
106 <            exit_status = str(0)
107 <
108 <    # get i/o statistics
109 <    storageStatistics = str(jobReport.storageStatistics)
110 <
111 <    # dashboard report dictionary
112 <    dashboard_report = {}
113 <
114 < ##Brian's patch to sent number of events procedded to the Dashboard
115 <    # Add NoEventsPerRun to the Dashboard report
116 <    eventsPerRun = 0
117 <    for inputFile in jobReport.inputFiles:
86 >        if self.exitCode :
87 >            self.exitCodes(jobReport)
88 >        if self.lfnList :
89 >           self.lfn_List(jobReport)
90 >        if self.info2dash :
91 >           self.reportDash(jobReport)
92 >        if self.popularity:
93 >           self.popularityInfos(jobReport)
94 >        return
95 >
96 >    def exitCodes(self, jobReport):
97 >
98 >        exit_status = ''
99 >        ##### temporary fix for FJR incomplete ####
100 >        fjr = open (self.input)
101 >        len_fjr = len(fjr.readlines())
102 >        if (len_fjr <= 6):
103 >           ### 50115 - cmsRun did not produce a valid/readable job report at runtime
104 >           exit_status = str(50115)
105 >        else:
106 >            # get ExitStatus of last error
107 >            if len(jobReport.errors) != 0 :
108 >                exit_status = str(jobReport.errors[-1]['ExitStatus'])
109 >            else :
110 >                exit_status = str(0)
111 >        #check exit code
112 >        if string.strip(exit_status) == '': exit_status = -999
113 >        print exit_status  
114 >  
115 >        return
116 >
117 >    def lfn_List(self,jobReport):
118 >        '''
119 >        get list of analyzed files
120 >        '''
121 >        lfnlist = [x['LFN'] for x in jobReport.inputFiles]
122 >        for file in lfnlist: print file
123 >        return
124 >
125 >    def storageStat(self,jobReport):
126 >        '''
127 >        get i/o statistics
128 >        '''
129 >        storageStatistics = str(jobReport.storageStatistics)
130 >        storage_report = {}
131 >
132 >        # check if storageStatistics is valid
133 >        if storageStatistics.find('Storage statistics:') != -1 :
134 >            # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
135 >            data = storageStatistics.split('Storage statistics:')[1]
136 >            data_fields = data.split(';')
137 >            for data_field in data_fields:
138 >                # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
139 >                if data_field == ' ' or not data_field or data_field == '':
140 >                   continue
141 >                key = data_field.split('=')[0].strip()
142 >                item = data_field.split('=')[1].strip()
143 >                protocol = str(key.split('/')[0].strip())
144 >                action = str(key.split('/')[1].strip())
145 >                item_array = item.split('/')
146 >                attempted = str(item_array[0].strip())
147 >                succeeded = str(item_array[1].strip())
148 >                total_size = str(item_array[2].strip().split('MB')[0])
149 >                total_time = str(item_array[3].strip().split('ms')[0])
150 >                min_time = str(item_array[4].strip().split('ms')[0])
151 >                max_time = str(item_array[5].strip().split('ms')[0])
152 >                # add to report
153 >                if protocol in storage_report.keys() :
154 >                    if action in storage_report[protocol].keys() :
155 >                        print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
156 >                    else :
157 >                        storage_report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
158 >                else :
159 >                    storage_report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
160 >
161 >            if self.debug :
162 >                for protocol in storage_report.keys() :
163 >                    print 'protocol:',protocol
164 >                    for action in storage_report[protocol].keys() :
165 >                        print 'action:',action,'measurement:',storage_report[protocol][action]
166 >
167 >        #####
168 >        # throughput measurements # Fabio
169 >        throughput_report = { 'rSize':0.0, 'rTime':0.0, 'wSize':0.0, 'wTime':0.0 }
170 >        for protocol in storage_report.keys() :
171 >            for action in storage_report[protocol].keys():
172 >                # not interesting
173 >                if 'read' not in action and 'write' not in action and 'seek' not in action:
174 >                    continue
175 >
176 >                # convert values
177 >                try:
178 >                    sizeValue = float(storage_report[protocol][action][2])
179 >                    timeValue = float(storage_report[protocol][action][3])
180 >                except Exception,e:
181 >                    continue
182 >
183 >                # aggregate data
184 >                if 'read' in action:  
185 >                    throughput_report['rSize'] += sizeValue
186 >                    throughput_report['rTime'] += timeValue
187 >                elif 'write' in action:
188 >                    throughput_report['wSize'] += sizeValue
189 >                    throughput_report['wTime'] += timeValue
190 >                elif 'seek' in action:
191 >                    throughput_report['rTime'] += timeValue
192 >                else:
193 >                   continue
194 >
195 >        # calculate global throughput
196 >        throughput_report['readThr'] = 'NULL'
197 >        if throughput_report['rTime'] > 0.0:
198 >            throughput_report['rTime'] /= 1000.0 # scale ms to s
199 >            throughput_report['readThr'] = float(throughput_report['rSize']/throughput_report['rTime'])
200 >
201 >        throughput_report['avgNetThr'] = 'NULL'
202          try:
203 <            eventsRead = str(inputFile.get('EventsRead', 0))
88 <            eventsRead = int(eventsRead.strip())
203 >            throughput_report['avgNetThr'] = throughput_report['rSize'] / float(jobReport.performance.summaries['ExeTime'])
204          except:
205 <            continue
91 <        eventsPerRun += eventsRead
92 <    dashboard_report['NoEventsPerRun'] = eventsPerRun
93 <    dashboard_report['NbEvPerRun'] = eventsPerRun
94 <    dashboard_report['NEventsProcessed'] = eventsPerRun
95 <        #print "Total number of events:", eventsPerRun
96 < ##
97 <
98 <    # check if storageStatistics is valid
99 <    if storageStatistics.find('Storage statistics:') != -1 :
100 <        # report form: { 'protocol' : { 'action' : [attempted,succedeed,total-size,total-time,min-time,max-time] , ... } , ... }
101 <        report = {}
102 <        data = storageStatistics.split('Storage statistics:')[1]
103 <        data_fields = data.split(';')
104 <        for data_field in data_fields:
105 <            # parse: format protocol/action = attepted/succedeed/total-size/total-time/min-time/max-time
106 <            if data_field == ' ' or not data_field or data_field == '':
107 <               continue
108 <            key = data_field.split('=')[0].strip()
109 <            item = data_field.split('=')[1].strip()
110 <            protocol = str(key.split('/')[0].strip())
111 <            action = str(key.split('/')[1].strip())
112 <            item_array = item.split('/')
113 <            attempted = str(item_array[0].strip())
114 <            succeeded = str(item_array[1].strip())
115 <            total_size = str(item_array[2].strip().split('MB')[0])
116 <            total_time = str(item_array[3].strip().split('ms')[0])
117 <            min_time = str(item_array[4].strip().split('ms')[0])
118 <            max_time = str(item_array[5].strip().split('ms')[0])
119 <            # add to report
120 <            if protocol in report.keys() :
121 <                if action in report[protocol].keys() :
122 <                    print 'protocol/action:',protocol,'/',action,'listed twice in report, taking the first'
123 <                else :
124 <                    report[protocol][action] = [attempted,succeeded,total_size,total_time,min_time,max_time]
125 <            else :
126 <                report[protocol] = {action : [attempted,succeeded,total_size,total_time,min_time,max_time] }
205 >            pass
206  
207 <        if debug :
208 <            for protocol in report.keys() :
209 <                print 'protocol:',protocol
210 <                for action in report[protocol].keys() :
211 <                    print 'action:',action,'measurement:',report[protocol][action]
207 >        throughput_report['writeThr'] = 'NULL'
208 >        if throughput_report['wTime'] > 0.0:
209 >            throughput_report['wTime'] /= 1000.0
210 >            throughput_report['writeThr'] = float(throughput_report['wSize']/throughput_report['wTime'])
211 >        #####
212 >
213 >        if self.debug == 1 :
214 >            print storage_report
215 >            print throughput_report
216 >        return storage_report, throughput_report
217 >
218 >    def popularityInfos(self, jobReport):
219 >        report_dict = {}
220 >        inputList = []
221 >        inputParentList = []                        
222 >        report_dict['inputBlocks'] = ''
223 >        if (os.path.exists(self.inputInfos)):
224 >            file=open(self.inputInfos,'r')
225 >            lines = file.readlines()
226 >            for line in lines:
227 >                if line.find("inputBlocks")>=0:
228 >                    report_dict['inputBlocks']= line.split("=")[1].strip()
229 >                if line.find("inputFiles")>=0:
230 >                    inputList = line.split("=")[1].strip().split(";")
231 >                if line.find("parentFiles")>=0:
232 >                    inputParentList = line.split("=")[1].strip().split(";")
233 >            file.close()
234 >        if len(inputList) == 1 and inputList[0] == '':
235 >            inputList=[]
236 >        if len(inputParentList) == 1 and inputParentList[0] == '':
237 >            inputParentList=[]
238 >        basename = ''
239 >        if len(inputList) > 1:
240 >            basename = os.path.commonprefix(inputList)
241 >        elif len(inputList) == 1:
242 >            basename =  "%s/"%os.path.dirname(inputList[0])
243 >        basenameParent = ''
244 >        if len(inputParentList) > 1:
245 >            basenameParent = os.path.commonprefix(inputParentList)
246 >        elif len(inputParentList) == 1:
247 >            basenameParent = "%s/"%os.path.dirname(inputParentList[0])
248 >
249 >        readFile = {}  
250 >
251 >        readFileParent = {}
252 >        fileAttr = []
253 >        fileParentAttr = []
254 >        for inputFile in  jobReport.inputFiles:
255 >            if inputFile['LFN'].find(basename) >=0:
256 >                fileAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
257 >                readFile[inputFile.get("LFN").split(basename)[1]] = fileAttr
258 >            else:
259 >                fileParentAttr = (inputFile.get("FileType"), "Local", inputFile.get("Runs"))
260 >                readParentFile[inputFile.get("LFN").split(basenameParent)[1]] = fileParentAttr
261 >        cleanedinputList = []
262 >        for file in inputList:        
263 >            cleanedinputList.append(file.split(basename)[1])
264 >        cleanedParentList = []
265 >        for file in inputParentList:        
266 >            cleanedParentList.append(file.split(basenameParent)[1])
267 >
268 >        inputString = ''
269 >        LumisString = ''
270 >        countFile = 1
271 >        for f,t in readFile.items():
272 >            cleanedinputList.remove(f)    
273 >            inputString += '%s::%d::%s::%s::%d;'%(f,1,t[0],t[1],countFile)
274 >            LumisString += '%s::%s::%d;'%(t[2].keys()[0],self.makeRanges(t[2].values()[0]),countFile)  
275 >            countFile += 1
276 >
277 >        inputParentString = ''
278 >        LumisParentString  = ''
279 >        countParentFile = 1
280 >        for fp,tp in readFileParent.items():
281 >            cleanedParentList.remove(fp)    
282 >            inputParentString += '%s::%d::%s::%s::%d;'%(fp,1,tp[0],tp[1],countParentFile)
283 >            LumisParentString += '%s::%s::%d;'%(tp[2].keys()[0],self.makeRanges(tp[2].values()[0]),countParentFile)  
284 >            countParentFile += 1
285 >
286 >        if len(cleanedinputList):
287 >           for file in cleanedinputList :
288 >               if len(jobReport.errors):
289 >                   if jobReport.errors[0]["Description"].find(file) >= 0:
290 >                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown','Local','Unknown')
291 >                   else:
292 >                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
293 >               else:
294 >                   inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
295 >
296 >        if len(cleanedParentList):
297 >           for file in cleanedParentList :
298 >               if len(jobReport.errors):
299 >                   if jobReport.errors[0]["Description"].find(file) >= 0:
300 >                       inputString += '%s::%d::%s::%s::%s;'%(file,0,'Unknown','Local','Unknown')
301 >                   else:
302 >                       inputString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
303 >               else:
304 >                   inputParentString += '%s::%d::%s::%s::%s;'%(file,2,'Unknown','Unknown','Unknown')
305 >
306 >        report_dict['inputFiles']= inputString
307 >        report_dict['parentFiles']= inputParentString
308 >        report_dict['lumisRange']= LumisString
309 >        report_dict['lumisParentRange']= LumisParentString
310 >        report_dict['Basename']= basename
311 >        report_dict['BasenameParent']= basenameParent
312 >
313 >         # send to DashBoard
314 >        apmonSend(self.MonitorID, self.MonitorJobID, report_dict)
315 >        apmonFree()
316 >
317 >       # if self.debug == 1 :
318 >        print "Popularity start"
319 >        for k,v in report_dict.items():
320 >            print "%s : %s"%(k,v)
321 >        print "Popularity stop"
322 >        return  
323 >
324 >    def n_of_events(self,jobReport):
325 >        '''
326 >        #Brian's patch to sent number of events procedded to the Dashboard
327 >        # Add NoEventsPerRun to the Dashboard report
328 >        '''  
329 >        event_report = {}
330 >        eventsPerRun = 0
331 >        for inputFile in jobReport.inputFiles:
332 >            try:
333 >                eventsRead = str(inputFile.get('EventsRead', 0))
334 >                eventsRead = int(eventsRead.strip())
335 >            except:
336 >                continue
337 >            eventsPerRun += eventsRead
338 >        event_report['NoEventsPerRun'] = eventsPerRun
339 >        event_report['NbEvPerRun'] = eventsPerRun
340 >        event_report['NEventsProcessed'] = eventsPerRun
341 >
342 >        if self.debug == 1 : print event_report
343 >
344 >        return event_report
345 >      
346 >    def reportDash(self,jobReport):
347 >        '''
348 >        dashboard report dictionary
349 >        '''
350 >        event_report = self.n_of_events(jobReport)
351 >        storage_report, throughput_report = self.storageStat(jobReport)
352 >        dashboard_report = {}
353 >        #
354 >        for k,v in event_report.iteritems() :
355 >            dashboard_report[k]=v
356  
357          # extract information to be sent to DashBoard
358          # per protocol and for action=read, calculate MBPS
359          # dashboard key is io_action
360 <        dashboard_report['MonitorID'] = MonitorID
361 <        dashboard_report['MonitorJobID'] = MonitorJobID
362 <        for protocol in report.keys() :
363 <            for action in report[protocol].keys() :
364 <                try: size = float(report[protocol][action][2])
360 >        dashboard_report['MonitorID'] = self.MonitorID
361 >        dashboard_report['MonitorJobID'] = self.MonitorJobID
362 >        for protocol in storage_report.keys() :
363 >            for action in storage_report[protocol].keys() :
364 >                try: size = float(storage_report[protocol][action][2])
365                  except: size = 'NULL'
366 <                try: time = float(report[protocol][action][3])/1000
366 >                try: time = float(storage_report[protocol][action][3])/1000
367                  except: time = 'NULL'
368                  dashboard_report['io_'+protocol+'_'+action] = str(size)+'_'+str(time)
369 <
147 <        if debug :
369 >        if self.debug :
370              ordered = dashboard_report.keys()
371              ordered.sort()
372              for key in ordered:
373                  print key,'=',dashboard_report[key]
374  
375 +        # IO throughput information
376 +        dashboard_report['io_read_throughput'] = throughput_report['readThr']
377 +        dashboard_report['io_write_throughput'] = throughput_report['writeThr']
378 +        dashboard_report['io_netAvg_throughput'] = throughput_report['avgNetThr']
379 +
380          # send to DashBoard
381 <        apmonSend(MonitorID, MonitorJobID, dashboard_report)
381 >        apmonSend(self.MonitorID, self.MonitorJobID, dashboard_report)
382          apmonFree()
383  
384 <    # prepare exit string
158 <    exit_string = str(exit_status)
159 <    for key in dashboard_report.keys() :
160 <        exit_string += ';' + str(key) + '=' + str(dashboard_report[key])
384 >        if self.debug == 1 : print dashboard_report
385  
386 <    
386 >        return
387  
388 <    return exit_string
388 >    def makeRanges(self,lumilist):
389 >        """ convert list to range """
390 >        
391 >        counter = lumilist[0]
392 >        lumilist.remove(counter)
393 >        tempRange=[]
394 >        tempRange.append(counter)
395 >        string = ''
396 >        for i in lumilist:
397 >            if i == counter+1:
398 >                tempRange.append(i)
399 >                counter +=1
400 >            else:
401 >                if len(tempRange)==1:
402 >                    string += "%s,"%tempRange[0]
403 >                else:
404 >                    string += "%s-%s,"%(tempRange[:1][0],tempRange[-1:][0])
405 >                counter = i
406 >                tempRange=[]
407 >                tempRange.append(counter)
408 >            if i == lumilist[-1:][0]   :
409 >                if len(tempRange)==1:
410 >                    string += "%s"%tempRange[0]
411 >                else:
412 >                    string += "%s-%s"%(tempRange[:1][0],tempRange[-1:][0])
413 >        return string
414 >        
415 >    def usage(self):
416 >        
417 >        msg="""
418 >        required parameters:
419 >        --input            :       input FJR xml file
420 >
421 >        optional parameters:
422 >        --dashboard        :       send info to the dashboard. require following args: "MonitorID,MonitorJobID"
423 >            MonitorID        :       DashBoard MonitorID
424 >            MonitorJobID     :       DashBoard MonitorJobID
425 >        --exitcode         :       print executable exit code
426 >        --lfn              :       report list of files really analyzed
427 >        --help             :       help
428 >        --debug            :       debug statements
429 >        """
430 >        return msg
431  
432  
433   if __name__ == '__main__' :
434 <    exit_status = main(sys.argv[1:])
435 <    # output for wrapper script
436 <    print exit_status
437 <
438 <    
434 >    try:
435 >        parseFjr_ = parseFjr(sys.argv[1:])
436 >        parseFjr_.run()  
437 >    except:
438 >        pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines