ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
(Generate patch)

Comparing COMP/CRAB/python/apmon.py (file contents):
Revision 1.1 by corvo, Tue Aug 30 13:15:51 2005 UTC vs.
Revision 1.2 by corvo, Tue Nov 8 16:02:39 2005 UTC

# Line 53 | Line 53 | import threading
53   import time
54   import Logger
55   import ProcInfo
56 + import random
57  
58   #__all__ = ["ApMon"]
59  
# Line 82 | Line 83 | class ApMon:
83          - configRecheck - boolean - whether to recheck periodically for changes
84            in the configAddresses list
85          """
85        destinations = {}              # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...}
86        configAddresses = []           # empty, by default; list of files/urls from where we read config
87        configRecheckInterval = 120    # 2 minutes
88        configRecheck = True           # enabled by default
89        performBgMonitoring = True          # by default, perform background monitoring
90        monitoredJobs = {}             # Monitored jobs; key = pid; value = hash with
86  
87          __defaultOptions = {
88                  'job_monitoring': True,       # perform (or not) job monitoring
# Line 105 | Line 100 | class ApMon:
100                  'job_disk_used' : True,       # size in MB of the used disk partition containing the working directory
101                  'job_disk_free' : True,       # size in MB of the free disk partition containing the working directory
102                  'job_disk_usage': True,       # percent of the used disk partition containing the working directory
103 +                'job_open_files': True,       # number of open file descriptors
104  
105                  'sys_monitoring': True,       # perform (or not) system monitoring
106                  'sys_interval'  : 10,         # at this interval (in seconds)
# Line 142 | Line 138 | class ApMon:
138                  'cpu_MHz'       : True,
139                  'no_CPUs'       : True,       # number of CPUs
140                  'total_mem'     : True,
141 <                'total_swap'    : True};
141 >                'total_swap'    : True,
142 >                'cpu_vendor_id' : True,
143 >                'cpu_family'    : True,
144 >                'cpu_model'     : True,
145 >                'cpu_model_name': True,
146 >                'bogomips'      : True};
147  
148          def __init__ (self, initValue):
149                  """
# Line 164 | Line 165 | class ApMon:
165                    val = hash{'param_name': True/False, ...}) the given options for each destination
166                    will overwrite the default parameters (see __defaultOptions)
167                  """
168 +                self.destinations = {}              # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...}
169 +                self.configAddresses = []           # empty, by default; list of files/urls from where we read config
170 +                self.configRecheckInterval = 120    # 2 minutes
171 +                self.configRecheck = True           # enabled by default
172 +                self.performBgMonitoring = True     # by default, perform background monitoring
173 +                self.monitoredJobs = {}             # Monitored jobs; key = pid; value = hash with
174 +                self.maxMsgRate = 50;               # Maximum number of messages allowed to be sent per second
175 +                self.__defaultUserCluster = "ApMon_UserSend";
176 +                self.__defaultUserNode = socket.getfqdn();
177 +                self.__defaultSysMonCluster = "ApMon_SysMon";
178 +                self.__defaultSysMonNode = socket.getfqdn();
179 +                # don't touch these:
180 +                self.__initializedOK = True
181 +                self.__udpSocket = None
182 +                self.__configUpdateLock = threading.Lock()
183 +                self.__configUpdateEvent = threading.Event()
184 +                self.__bgMonitorLock = threading.Lock()
185 +                self.__bgMonitorEvent = threading.Event()
186 +                # don't allow a user to send more than MAX_MSG messages per second, in average
187 +                self.__crtTime = 0;
188 +                self.__prvTime = 0;
189 +                self.__prvSent = 0;
190 +                self.__prvDrop = 0;
191 +                self.__crtSent = 0;
192 +                self.__crtDrop = 0;
193 +                self.__hWeight = 0.92;
194 +        
195                  self.logger = Logger.Logger(self.__defaultLogLevel)
196                  if type(initValue) == type("string"):
197                          self.configAddresses.append(initValue)
# Line 356 | Line 384 | class ApMon:
384                  """
385                  self.logger.setLogLevel(strLevel);
386          
387 +        def setMaxMsgRate(self, rate):
388 +                """
389 +                Set the maximum number of messages that can be sent, per second.
390 +                """
391 +                self.maxMsgRate = rate;
392 +                self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate));
393 +        
394 +        def free(self):
395 +                """
396 +                Stop background threands, close opened sockets. You have to use this function if you want to
397 +                free all the resources that ApMon takes, and allow it to be garbage-collected.
398 +                """
399 +                self.__configUpdateEvent.set();
400 +                self.__bgMonitorEvent.set();
401 +                time.sleep(0.01);
402 +                if self.__udpSocket != None:
403 +                        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.");
404 +                        self.__udpSocket.close();
405 +
406 +
407          #########################################################################################
408          # Internal functions - Config reloader thread
409          #########################################################################################
# Line 364 | Line 412 | class ApMon:
412                  """
413                  Main loop of the thread that checks for changes and reloads the configuration
414                  """
415 <                while True:
416 <                        time.sleep(self.configRecheckInterval)
415 >                while not self.__configUpdateEvent.isSet():
416 >                        self.__configUpdateEvent.wait(self.configRecheckInterval);
417 >                        if self.__configUpdateEvent.isSet():
418 >                                return;
419                          if self.configRecheck:
420                                  self.__reloadAddresses()
421                                  self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+`self.configRecheckInterval`+" sec.");
# Line 430 | Line 480 | class ApMon:
480                                          self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`);
481                                          tempDestinations[(host, port, passwd)][key] = value;
482                  else:
483 <                        self.logger.log(Logger.NOTICE, "Destination "+host+":"+port+" "+passwd+" already added. Skipping it");
483 >                        self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it");
484  
485          def __initializeFromFile (self, confFileName, tempDestinations):
486                  """
# Line 489 | Line 539 | class ApMon:
539                                                          value = int(value);
540                                                  if param == "loglevel":
541                                                          self.logger.setLogLevel(value);
542 +                                                elif param == "maxMsgRate":
543 +                                                        self.setMaxMsgRate(int(value));
544                                                  elif param == "conf_recheck":
545                                                          self.configRecheck = value;
546                                                  elif param == "recheck_interval":
# Line 508 | Line 560 | class ApMon:
560          ###############################################################################################
561  
562          def __bgMonitor (self):
563 <                while True:
564 <                        time.sleep(10);
563 >                while not self.__bgMonitorEvent.isSet():
564 >                        self.__bgMonitorEvent.wait(10);
565 >                        if self.__bgMonitorEvent.isSet():
566 >                                return;
567                          if self.performBgMonitoring:
568                                  self.sendBgMonitoring();
569  
# Line 518 | Line 572 | class ApMon:
572          ###############################################################################################
573          
574          def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
575 +                
576 +                if self.__shouldSend() == False:
577 +                        return;
578 +                
579                  xdrPacker = xdrlib.Packer ()
580                  host, port, passwd = destination
581                  self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params)));
# Line 557 | Line 615 | class ApMon:
615          
616          # Destructor
617          def __del__(self):
618 <                self.__udpSocket.close();
618 >                self.free();
619 >
620 >        # Decide if the current datagram should be sent.
621 >        # This decision is based on the number of messages previously sent.
622 >        def __shouldSend(self):
623 >                now = long(time.time());
624 >                if now != self.__crtTime :
625 >                        # new time
626 >                        # update previous counters;
627 >                        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / (now - self.__crtTime);
628 >                        self.__prvTime = self.__crtTime;
629 >                        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop));
630 >                        # reset current counter
631 >                        self.__crtTime = now;
632 >                        self.__crtSent = 0;
633 >                        self.__crtDrop = 0;
634 >                
635 >                # compute the history
636 >                valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight);
637 >                
638 >                doSend = True;
639 >                        
640 >                # when we should start dropping messages
641 >                level = self.maxMsgRate - self.maxMsgRate / 10;
642 >        
643 >                if valSent > (self.maxMsgRate - level) :
644 >                        if random.randint(0,self.maxMsgRate / 10) >= (self.maxMsgRate - valSent):
645 >                                doSend = False;
646 >        
647 >                # counting sent and dropped messages
648 >                if doSend:
649 >                        self.__crtSent+=1;
650 >                else:
651 >                        self.__crtDrop+=1;
652 >
653 >                return doSend;
654 >
655          
656          ################################################################################################
657          # Private variables. Don't touch
# Line 573 | Line 667 | class ApMon:
667                  2: xdrlib.Packer.pack_int,
668                  5: xdrlib.Packer.pack_double }
669          
576        __defaultUserCluster = "ApMon_UserSend";
577        __defaultUserNode = socket.getfqdn();
578        __defaultSysMonCluster = "ApMon_SysMon";
579        __defaultSysMonNode = socket.getfqdn();
580        
581        __initializedOK = True
582        __configUpdateLock = threading.Lock()
583        __bgMonitorLock = threading.Lock()
584        
670          __defaultPort = 8884
671          __defaultLogLevel = Logger.INFO
672 <        __version = "2.0.2-py"                  # apMon version number
672 >        __version = "2.0.6-py"                  # apMon version number
673  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines