ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
(Generate patch)

Comparing COMP/CRAB/python/apmon.py (file contents):
Revision 1.4 by corvo, Tue Mar 21 16:05:47 2006 UTC vs.
Revision 1.5 by corvo, Mon Apr 3 08:57:32 2006 UTC

# Line 1 | Line 1
1  
2   """
3   * ApMon - Application Monitoring Tool
4 < * Version: 2.2.2
4 > * Version: 2.2.4
5   *
6   * Copyright (C) 2006 California Institute of Technology
7   *
# Line 48 | Line 48 | farm's store nor shown in the farm's win
48   import re
49   import xdrlib
50   import socket
51 < import urllib2
51 > import struct
52 > import StringIO
53   import threading
54   import time
55   import Logger
# Line 58 | Line 59 | import copy
59  
60   #__all__ = ["ApMon"]
61  
62 < #__debug = False   # self.destPrevData[destination];set this to True to be verbose
62 > #__debug = False # set this to True to be verbose
63  
64   class ApMon:
65          """
# Line 87 | Line 88 | class ApMon:
88  
89          __defaultOptions = {
90                  'job_monitoring': True,       # perform (or not) job monitoring
91 <                'job_interval'  : 10,         # at this interval (in seconds)
91 >                'job_interval'  : 120,         # at this interval (in seconds)
92                  'job_data_sent' : 0,          # time from Epoch when job information was sent; don't touch!
93  
94                  'job_cpu_time'  : True,       # elapsed time from the start of this job in seconds
# Line 104 | Line 105 | class ApMon:
105                  'job_open_files': True,       # number of open file descriptors
106  
107                  'sys_monitoring': True,       # perform (or not) system monitoring
108 <                'sys_interval'  : 10,         # at this interval (in seconds)
108 >                'sys_interval'  : 120,         # at this interval (in seconds)
109                  'sys_data_sent' : 0,          # time from Epoch when system information was sent; don't touch!
110  
111 <                'sys_cpu_usr'   : False,      # cpu-usage information
112 <                'sys_cpu_sys'   : False,      # all these will produce coresponding paramas without "sys_"
113 <                'sys_cpu_nice'  : False,
114 <                'sys_cpu_idle'  : False,
111 >                'sys_cpu_usr'   : True,      # cpu-usage information
112 >                'sys_cpu_sys'   : True,      # all these will produce coresponding paramas without "sys_"
113 >                'sys_cpu_nice'  : True,
114 >                'sys_cpu_idle'  : True,
115                  'sys_cpu_usage' : True,
116                  'sys_load1'     : True,       # system load information
117                  'sys_load5'     : True,
118                  'sys_load15'    : True,
119 <                'sys_mem_used'  : False,      # memory usage information
120 <                'sys_mem_free'  : False,
119 >                'sys_mem_used'  : True,      # memory usage information
120 >                'sys_mem_free'  : True,
121                  'sys_mem_usage' : True,
122 <                'sys_pages_in'  : False,
123 <                'sys_pages_out' : False,
122 >                'sys_pages_in'  : True,
123 >                'sys_pages_out' : True,
124                  'sys_swap_used' : True,       # swap usage information
125 <                'sys_swap_free' : False,
125 >                'sys_swap_free' : True,
126                  'sys_swap_usage': True,
127 <                'sys_swap_in'   : False,
128 <                'sys_swap_out'  : False,
127 >                'sys_swap_in'   : True,
128 >                'sys_swap_out'  : True,
129                  'sys_net_in'    : True,       # network transfer in kBps
130                  'sys_net_out'   : True,       # these will produce params called ethX_in, ethX_out, ethX_errs
131 <                'sys_net_errs'  : False,      # for each eth interface
131 >                'sys_net_errs'  : True,      # for each eth interface
132                  'sys_net_sockets' : True,     # number of opened sockets for each proto => sockets_tcp/udp/unix ...
133                  'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
134                  'sys_processes' : True,
135                  'sys_uptime'    : True,       # uptime of the machine, in days (float number)
136                  
137 <                'general_info'  : True,       # send (or not) general host information once every 2 $sys_interval seconds
137 >                'general_info'  : True,       # send (or not) general host information once every 2 x $sys_interval seconds
138                  'general_data_sent': 0,       # time from Epoch when general information was sent; don't touch!
139  
140                  'hostname'      : True,
# Line 170 | Line 171 | class ApMon:
171                  """
172                  self.destinations = {}              # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...}
173                  self.destPrevData = {}              # empty, by defaul; key = tuple (host, port, pass) ; val = hash {"param_mame" : value, ...}
174 +                self.senderRef = {}                 # key = tuple (host, port, pass); val = hash {'INSTANCE_ID', 'SEQ_NR' }
175                  self.configAddresses = []           # empty, by default; list of files/urls from where we read config
176 <                self.configRecheckInterval = 120    # 2 minutes
176 >                self.configRecheckInterval = 600    # 10 minutes
177                  self.configRecheck = True           # enabled by default
178                  self.performBgMonitoring = True     # by default, perform background monitoring
179                  self.monitoredJobs = {}             # Monitored jobs; key = pid; value = hash with
180 <                self.maxMsgRate = 20;                   # Maximum number of messages allowed to be sent per second
181 <                self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0};
180 >                self.maxMsgRate = 10                # Maximum number of messages allowed to be sent per second
181 >                self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0};
182                  self.__defaultUserCluster = "ApMon_UserSend";
183                  self.__defaultUserNode = socket.getfqdn();
184                  self.__defaultSysMonCluster = "ApMon_SysMon";
185                  self.__defaultSysMonNode = socket.getfqdn();
186                  # don't touch these:
187                  self.__freed = False
186                self.__initializedOK = True
188                  self.__udpSocket = None
189                  self.__configUpdateLock = threading.Lock()
190                  self.__configUpdateEvent = threading.Event()
# Line 200 | Line 201 | class ApMon:
201                  self.__crtDrop = 0;
202                  self.__hWeight = 0.92;
203                  self.logger = Logger.Logger(defaultLogLevel)
204 <                if type(initValue) == type("string"):
205 <                        self.configAddresses.append(initValue)
206 <                        self.__reloadAddresses()
207 <                elif type(initValue) == type([]):
208 <                        self.configAddresses = initValue
209 <                        self.__reloadAddresses()
210 <                elif type(initValue) == type(()):
211 <                        for dest in initValue:
212 <                                self.__addDestination (dest, self.destinations)
213 <                elif type(initValue) == type({}):
214 <                        for dest, opts in initValue.items():
215 <                                self.__addDestination (dest, self.destinations, opts)          
216 <                
217 <                self.__initializedOK = (len (self.destinations) > 0)
218 <                if not self.__initializedOK:
218 <                        self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.");
219 <                # self.__defaultClusterName = None
220 <                # self.__defaultNodeName = self.getMyHo
221 <                if self.__initializedOK:
222 <                        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
223 <                        if len(self.configAddresses) > 0:
224 <                                # if there are addresses that need to be monitored,
225 <                                # start config checking and reloading thread
226 <                                th = threading.Thread(target=self.__configLoader)
227 <                                th.setDaemon(True)  # this is a daemon thread
228 <                                th.start()
229 <                        # create the ProcInfo instance
230 <                        self.procInfo = ProcInfo.ProcInfo(self.logger);
231 <                        # self.procInfo.update();
232 <                        # start the background monitoring thread
233 <                        th = threading.Thread(target=self.__bgMonitor);
234 <                        th.setDaemon(True);
235 <                        th.start();
236 <
204 >                self.setDestinations(initValue)
205 >                self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
206 >                if len(self.configAddresses) > 0:
207 >                        # if there are addresses that need to be monitored,
208 >                        # start config checking and reloading thread
209 >                        th = threading.Thread(target=self.__configLoader)
210 >                        th.setDaemon(True)  # this is a daemon thread
211 >                        th.start()
212 >                # create the ProcInfo instance
213 >                self.procInfo = ProcInfo.ProcInfo(self.logger);
214 >                # self.procInfo.update();
215 >                # start the background monitoring thread
216 >                th = threading.Thread(target=self.__bgMonitor);
217 >                th.setDaemon(True);
218 >                th.start();
219  
220          def sendParams (self, params):
221                  """
# Line 287 | Line 269 | class ApMon:
269                  
270                  NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
271                  """
290                if not self.__initializedOK:
291                        self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!");
292                        return
272                  if (clusterName == None) or (clusterName == ""):
273                          clusterName = self.__defaultUserCluster
274                  else:
# Line 298 | Line 277 | class ApMon:
277                          nodeName = self.__defaultUserNode
278                  else:
279                          self.__defaultUserNode = nodeName
280 +                if len(self.destinations) == 0:
281 +                        self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined.");
282 +                        return
283                  self.__configUpdateLock.acquire();
284                  for dest in self.destinations.keys():
285 <                        self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params);
285 >                        self.__directSendParams(dest, clusterName, nodeName, timeStamp, params);
286                  self.__configUpdateLock.release();
287          
288          def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
# Line 347 | Line 329 | class ApMon:
329                  If mustSend == True, the information is sent regardles of the elapsed time since last sent
330                  If mustSend == False, the data is sent only if the required interval has passed since last sent
331                  """
332 +                if len(self.destinations) == 0:
333 +                        self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined.");
334 +                        return
335                  self.__bgMonitorLock.acquire();
336                  now = int(time.time());
337                  updatedProcInfo = False;
# Line 378 | Line 363 | class ApMon:
363                                                          sysParams.append(param);
364                                  options['general_data_sent'] = now;
365                                  
366 <                        if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ):
366 >                        if (not updatedProcInfo) and (((len(sysParams) > 0) or (len(jobParams) > 0))):
367                                  self.procInfo.update();
368 <                                updatedProcInf = True;
368 >                                updatedProcInfo = True;
369  
370                          sysResults = {}
371                          if(len(sysParams) > 0):
372 <                                sysResults = self.procInfo.getSystemData(sysParams, prevRawData);
373 <                                self.updateLastSentTime(options, self.destinations[destination]);
374 <                        if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0):
390 <                                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
391 <                        if( (type(sysResults) == type( [] )) and len(sysResults) > 0):
392 <                                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
372 >                                sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
373 >                        if(len(sysResults) > 0):
374 >                                self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
375                          for pid, props in self.monitoredJobs.items():
376 <                                jobResults = {};
376 >                                jobResults = {}
377                                  if(len(jobParams) > 0):
378 <                                        jobResults = self.procInfo.getJobData(pid, jobParams);
378 >                                        jobResults = self.procInfo.getJobData(pid, jobParams)
379                                  if(len(jobResults) > 0):
380 <                                        self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults);
380 >                                        self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
381                  self.__bgMonitorLock.release();
382 +
383 +        def setDestinations(self, initValue):
384 +                """
385 +                Set the destinations of the ApMon instance. It accepts the same parameters as the constructor.
386 +                """
387 +                if type(initValue) == type("string"):
388 +                        self.configAddresses = [initValue]
389 +                        self.configRecheck = True
390 +                        self.configRecheckInterval = 600
391 +                        self.__reloadAddresses()
392 +                elif type(initValue) == type([]):
393 +                        self.configAddresses = initValue
394 +                        self.configRecheck = True
395 +                        self.configRecheckInterval = 600
396 +                        self.__reloadAddresses()
397 +                elif type(initValue) == type(()):
398 +                        self.configAddresses = []
399 +                        for dest in initValue:
400 +                                self.__addDestination (dest, self.destinations)
401 +                        self.configRecheck = False
402 +                elif type(initValue) == type({}):
403 +                        self.configAddresses = []
404 +                        for dest, opts in initValue.items():
405 +                                self.__addDestination (dest, self.destinations, opts)
406 +                        self.configRecheck = False
407          
408 <        # copy the time when last data was sent
409 <        def updateLastSentTime(self, srcOpts, dstOpts):
410 <                if srcOpts.has_key('general_data_sent'):
411 <                        dstOpts['general_data_sent'] = srcOpts['general_data_sent'];
412 <                if  srcOpts.has_key('sys_data_sent'):
413 <                        dstOpts['sys_data_sent'] = srcOpts['sys_data_sent'];
414 <                if srcOpts.has_key('job_data_sent'):
408 <                        dstOpts['job_data_sent'] = srcOpts['job_data_sent'];
409 <        
410 <        def setLogLevel (self, strLevel):
408 >        def initializedOK(self):
409 >                """
410 >                Retruns true if there is no destination where the parameters to be sent.
411 >                """
412 >                return len(self.destinations) > 0
413 >
414 >        def setLogLevel(self, strLevel):
415                  """
416                  Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
417                  'INFO', 'NOTICE', 'DEBUG'.
# Line 442 | Line 446 | class ApMon:
446          #########################################################################################
447          # Internal functions - Config reloader thread
448          #########################################################################################
449 <        
449 >
450          def __configLoader(self):
451                  """
452                  Main loop of the thread that checks for changes and reloads the configuration
453                  """
454                  while not self.__configUpdateEvent.isSet():
455 <                        self.__configUpdateEvent.wait(self.configRecheckInterval);
455 >                        self.__configUpdateEvent.wait(min(30, self.configRecheckInterval)) # don't recheck more often than 30 sec
456                          if self.__configUpdateEvent.isSet():
457                                  break
458                          if self.configRecheck:
# Line 458 | Line 462 | class ApMon:
462  
463          def __reloadAddresses(self):
464                  """
465 <                Refresh destinations hash, by loading data from all sources in configAddresses
465 >                Refresh now the destinations hash, by loading data from all sources in configAddresses
466                  """
467 +                print "reloading addresses";
468                  newDestinations = {}
469 <                for src in self.configAddresses:
469 >                urls = copy.deepcopy(self.configAddresses)
470 >                while(len(urls) > 0 and len(newDestinations) == 0):
471 >                        src = random.choice(urls)
472 >                        urls.remove(src)
473                          self.__initializeFromFile(src, newDestinations)
474                  # avoid changing config in the middle of sending packets to previous destinations
475                  self.__configUpdateLock.acquire()
476                  self.destinations = newDestinations
477                  self.__configUpdateLock.release()
478 +                print "finished reloading addresses";
479  
480          def __addDestination (self, aDestination, tempDestinations, options = __defaultOptions):
481                  """
# Line 502 | Line 511 | class ApMon:
511                          return
512                  alreadyAdded = False
513                  port = int(port)
514 <                host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
514 >                try:
515 >                        host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
516 >                except socket.error, msg:
517 >                        self.logger.log(Logger.ERROR, "Error resolving "+host+": "+str(msg))
518 >                        return
519                  for h, p, w in tempDestinations.keys():
520                          if (h == host) and (p == port):
521                                  alreadyAdded = True
522                                  break
523 <                destination = (host, port, passwd);
523 >                destination = (host, port, passwd)
524                  if not alreadyAdded:
525 <                        self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd);
526 <                        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest.
527 <                        self.destPrevData[destination] = {};
525 >                        self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd)
526 >                        if(self.destinations.has_key(destination)):
527 >                                tempDestinations[destination] = self.destinations[destination]  # reuse previous options
528 >                        else:
529 >                                tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)  # have a different set of options for each dest
530 >                        if not self.destPrevData.has_key(destination):
531 >                                self.destPrevData[destination] = {}     # set it empty only if it's really new
532 >                        if not self.senderRef.has_key(destination):
533 >                                self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef) # otherwise, don't reset this nr.
534                          if options != self.__defaultOptions:
535                                  # we have to overwrite defaults with given options
536                                  for key, value in options.items():
537 <                                        self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`);
538 <                                        tempDestinations[destination][key] = value;
537 >                                        self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`)
538 >                                        tempDestinations[destination][key] = value
539                  else:
540                          self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it");
541  
# Line 530 | Line 549 | class ApMon:
549                  """
550                  try:
551                          if confFileName.find ("http://") == 0:
552 <                                confFile = urllib2.urlopen (confFileName)
552 >                                confFile = self.__getURL(confFileName)
553 >                                if confFile is None:
554 >                                        return
555                          else:
556                                  confFile = open (confFileName)
536                except urllib2.HTTPError, e:
537                        self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
538                        if e.code == 401:
539                                self.logger.log(Logger.ERROR, 'HTTPError: not authorized.');
540                        elif e.code == 404:
541                                self.logger.log(Logger.ERROR, 'HTTPError: not found.');
542                        elif e.code == 503:
543                                self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.');
544                        else:
545                                self.logger.log(Logger.ERROR, 'HTTPError: unknown error.');
546                        return
547                except urllib2.URLError, ex:
548                        self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
549                        self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason[1]));
550                        return
557                  except IOError, ex:
558                          self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
559                          self.logger.log(Logger.ERROR, "IOError: "+str(ex));
# Line 610 | Line 616 | class ApMon:
616          ###############################################################################################
617          # Internal helper functions
618          ###############################################################################################
619 +
620 +        # this is a simplified replacement for urllib2 which doesn't support setting a timeout.
621 +        # by default, if timeout is not specified, it waits 5 seconds
622 +        def __getURL (self, url, timeout = 5):
623 +                r = re.compile("http://([^:/]+)(:(\d+))?(/.*)").match(url)
624 +                if r is None:
625 +                        self.logger.log(Logger.ERROR, "Cannot open "+url+". Incorrectly formed URL.")
626 +                        return None
627 +                host = r.group(1)
628 +                if r.group(3) == None:
629 +                        port = 80       # no port is given, pick the default 80 for HTTP
630 +                else:
631 +                        port = int(r.group(3))
632 +                if r.group(4) == None:
633 +                        path = ""       # no path is give, let server decide
634 +                else:
635 +                        path = r.group(4)
636 +                sock = None
637 +                err = None
638 +                try:
639 +                        for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
640 +                                af, socktype, proto, canonname, sa = res
641 +                                try:
642 +                                        sock = socket.socket(af, socktype, proto)
643 +                                except socket.error, msg:
644 +                                        sock = None
645 +                                        err = msg
646 +                                        continue
647 +                                try:
648 +                                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, struct.pack("ii", timeout, 0))
649 +                                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("ii", timeout, 0))
650 +                                        sock.connect(sa)
651 +                                except socket.error, msg:
652 +                                        sock.close()
653 +                                        sock = None
654 +                                        err = msg
655 +                                        continue
656 +                                break
657 +                except socket.error, msg:
658 +                        sock = None
659 +                        err = msg
660 +                if sock is None:
661 +                        self.logger.log(Logger.ERROR, "Cannot open "+url)
662 +                        self.logger.log(Logger.ERROR, "SocketError: "+str(err))
663 +                        return None
664 +                try:
665 +                        sock.send("GET "+path+" HTTP/1.0\n\n");
666 +                        data = sock.recv(8192)
667 +                        file = StringIO.StringIO(data)
668 +                        httpStatus = 0
669 +                        while True:
670 +                                line = file.readline().strip()
671 +                                if line == "":
672 +                                        break  # exit at the end of file or at the first empty line (finish of http headers)
673 +                                r = re.compile("HTTP/\d.\d (\d+)").match(line)
674 +                                if r != None:
675 +                                        httpStatus = int(r.group(1))
676 +                        if httpStatus == 200:
677 +                                return file
678 +                        else:
679 +                                self.logger.log(Logger.ERROR, "Cannot open "+url)
680 +                                if httpStatus == 401:
681 +                                        self.logger.log(Logger.ERROR, 'HTTPError: not authorized ['+str(httpStatus)+']')
682 +                                elif httpStatus == 404:
683 +                                        self.logger.log(Logger.ERROR, 'HTTPError: not found ['+str(httpStatus)+']')
684 +                                elif httpStatus == 503:
685 +                                        self.logger.log(Logger.ERROR, 'HTTPError: service unavailable ['+str(httpStatus)+']')
686 +                                else:
687 +                                        self.logger.log(Logger.ERROR, 'HTTPError: unknown error ['+str(httpStatus)+']')
688 +                                return None
689 +                except socket.error, msg:
690 +                        self.logger.log(Logger.ERROR, "Cannot open "+url)
691 +                        self.logger.log(Logger.ERROR, "SocketError: "+str(msg))
692 +                        sock.close()
693 +                        return None
694          
695 <        def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
695 >        def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
696                  
697                  if self.__shouldSend() == False:
698                          self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!");
# Line 622 | Line 703 | class ApMon:
703                          return;
704                  
705                  host, port, passwd = destination
706 <                senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
706 >                crtSenderRef = self.senderRef[destination]
707 >                crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
708                  
709 <                xdrPacker = xdrlib.Packer ();
628 <                self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(senderRef['SEQ_NR'])+"/"+str(senderRef['INSTANCE_ID'])+"> len:"+str(len(params)));
709 >                xdrPacker = xdrlib.Packer ()
710                  
711                  xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
712                  
713 <                xdrPacker.pack_int (senderRef['INSTANCE_ID']);
714 <                xdrPacker.pack_int (senderRef['SEQ_NR']);
713 >                xdrPacker.pack_int (crtSenderRef['INSTANCE_ID'])
714 >                xdrPacker.pack_int (crtSenderRef['SEQ_NR'])
715                  
716 <                xdrPacker.pack_string (clusterName);
717 <                xdrPacker.pack_string (nodeName);
718 <                xdrPacker.pack_int (len(params));
716 >                xdrPacker.pack_string (clusterName)
717 >                xdrPacker.pack_string (nodeName)
718 >
719 >                sent_params_nr = 0
720 >                paramsPacker = xdrlib.Packer ()
721                  
722                  if type(params) == type( {} ):
723                          for name, value in params.iteritems():
724 <                                self.__packParameter(xdrPacker, name, value)
724 >                                if self.__packParameter(paramsPacker, name, value):
725 >                                        sent_params_nr += 1
726                  elif type(params) == type( [] ):
727                          for name, value in params:
728                                  self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value));
729 <                                self.__packParameter(xdrPacker, name, value)
729 >                                if self.__packParameter(paramsPacker, name, value):
730 >                                        sent_params_nr += 1
731                  else:
732                          self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
733 +                
734 +                xdrPacker.pack_int (sent_params_nr)
735 +                
736                  if (timeStamp != None) and (timeStamp > 0):
737 <                        xdrPacker.pack_int(timeStamp);
737 >                        paramsPacker.pack_int(timeStamp);
738                  
739 <                buffer = xdrPacker.get_buffer();
739 >                buffer = xdrPacker.get_buffer() + paramsPacker.get_buffer()
740 >                self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(buffer))+" bytes.");
741                  # send this buffer to the destination, using udp datagrams
742                  try:
743                          self.__udpSocket.sendto(buffer, (host, port))
744 <                        self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd);
744 >                        self.logger.log(Logger.NOTICE, "Packet sent to "+host+":"+str(port)+" "+passwd)
745                  except socket.error, msg:
746 <                        self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]));
746 >                        self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]))
747                  xdrPacker.reset()
748 +                paramsPacker.reset()
749          
750          def __packParameter(self, xdrPacker, name, value):
751                  if (name is None) or (name is ""):
752 <                        self.logger.log(Logger.WARNING, "Undefine parameter name.");
753 <                        return;
752 >                        self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value "+str(value))
753 >                        return False
754                  if (value is None):
755 <                        self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value");
756 <                        return;
755 >                        self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value")
756 >                        return False
757                  try:
758                          typeValue = self.__valueTypes[type(value)]
759                          xdrPacker.pack_string (name)
760                          xdrPacker.pack_int (typeValue)
761                          self.__packFunctions[typeValue] (xdrPacker, value)
762 <                        self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value));
762 >                        self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
763 >                        return True
764                  except Exception, ex:
765                          self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
766 +                        return False
767  
768          # Destructor
769          def __del__(self):
# Line 728 | Line 820 | class ApMon:
820                  5: xdrlib.Packer.pack_double }
821          
822          __defaultPort = 8884
823 <        __version = "2.2.2-py"                  # apMon version number
823 >        __version = "2.2.4-py"                  # apMon version number
824  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines