53 |
|
import time |
54 |
|
import Logger |
55 |
|
import ProcInfo |
56 |
+ |
import random |
57 |
|
|
58 |
|
#__all__ = ["ApMon"] |
59 |
|
|
83 |
|
- configRecheck - boolean - whether to recheck periodically for changes |
84 |
|
in the configAddresses list |
85 |
|
""" |
85 |
– |
destinations = {} # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...} |
86 |
– |
configAddresses = [] # empty, by default; list of files/urls from where we read config |
87 |
– |
configRecheckInterval = 120 # 2 minutes |
88 |
– |
configRecheck = True # enabled by default |
89 |
– |
performBgMonitoring = True # by default, perform background monitoring |
90 |
– |
monitoredJobs = {} # Monitored jobs; key = pid; value = hash with |
86 |
|
|
87 |
|
__defaultOptions = { |
88 |
|
'job_monitoring': True, # perform (or not) job monitoring |
100 |
|
'job_disk_used' : True, # size in MB of the used disk partition containing the working directory |
101 |
|
'job_disk_free' : True, # size in MB of the free disk partition containing the working directory |
102 |
|
'job_disk_usage': True, # percent of the used disk partition containing the working directory |
103 |
+ |
'job_open_files': True, # number of open file descriptors |
104 |
|
|
105 |
|
'sys_monitoring': True, # perform (or not) system monitoring |
106 |
|
'sys_interval' : 10, # at this interval (in seconds) |
138 |
|
'cpu_MHz' : True, |
139 |
|
'no_CPUs' : True, # number of CPUs |
140 |
|
'total_mem' : True, |
141 |
< |
'total_swap' : True}; |
141 |
> |
'total_swap' : True, |
142 |
> |
'cpu_vendor_id' : True, |
143 |
> |
'cpu_family' : True, |
144 |
> |
'cpu_model' : True, |
145 |
> |
'cpu_model_name': True, |
146 |
> |
'bogomips' : True}; |
147 |
|
|
148 |
|
def __init__ (self, initValue): |
149 |
|
""" |
165 |
|
val = hash{'param_name': True/False, ...}) the given options for each destination |
166 |
|
will overwrite the default parameters (see __defaultOptions) |
167 |
|
""" |
168 |
+ |
self.destinations = {} # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...} |
169 |
+ |
self.configAddresses = [] # empty, by default; list of files/urls from where we read config |
170 |
+ |
self.configRecheckInterval = 120 # 2 minutes |
171 |
+ |
self.configRecheck = True # enabled by default |
172 |
+ |
self.performBgMonitoring = True # by default, perform background monitoring |
173 |
+ |
self.monitoredJobs = {} # Monitored jobs; key = pid; value = hash with |
174 |
+ |
self.maxMsgRate = 50; # Maximum number of messages allowed to be sent per second |
175 |
+ |
self.__defaultUserCluster = "ApMon_UserSend"; |
176 |
+ |
self.__defaultUserNode = socket.getfqdn(); |
177 |
+ |
self.__defaultSysMonCluster = "ApMon_SysMon"; |
178 |
+ |
self.__defaultSysMonNode = socket.getfqdn(); |
179 |
+ |
# don't touch these: |
180 |
+ |
self.__initializedOK = True |
181 |
+ |
self.__udpSocket = None |
182 |
+ |
self.__configUpdateLock = threading.Lock() |
183 |
+ |
self.__configUpdateEvent = threading.Event() |
184 |
+ |
self.__bgMonitorLock = threading.Lock() |
185 |
+ |
self.__bgMonitorEvent = threading.Event() |
186 |
+ |
# don't allow a user to send more than MAX_MSG messages per second, in average |
187 |
+ |
self.__crtTime = 0; |
188 |
+ |
self.__prvTime = 0; |
189 |
+ |
self.__prvSent = 0; |
190 |
+ |
self.__prvDrop = 0; |
191 |
+ |
self.__crtSent = 0; |
192 |
+ |
self.__crtDrop = 0; |
193 |
+ |
self.__hWeight = 0.92; |
194 |
+ |
|
195 |
|
self.logger = Logger.Logger(self.__defaultLogLevel) |
196 |
|
if type(initValue) == type("string"): |
197 |
|
self.configAddresses.append(initValue) |
384 |
|
""" |
385 |
|
self.logger.setLogLevel(strLevel); |
386 |
|
|
387 |
+ |
def setMaxMsgRate(self, rate):
    """
    Change the cap on how many messages may be sent each second.

    rate - the new per-second limit; it is stored unchanged in
           self.maxMsgRate and reported at DEBUG level.
    """
    self.maxMsgRate = rate
    debugText = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, debugText)
393 |
+ |
|
394 |
+ |
def free(self):
    """
    Stop background threads and close the opened socket.

    You have to call this function if you want to free all the resources
    that ApMon takes and allow it to be garbage-collected. It sets the
    config-update and background-monitor events (the loops that wait on
    them return once the event is set) and then closes the UDP socket,
    if one exists.

    Calling it more than once is safe: the socket reference is dropped
    after the first close, so later calls (for instance the one made by
    __del__) do not log or close again.
    """
    # Wake up the loops waiting on these events and tell them to stop.
    self.__configUpdateEvent.set()
    self.__bgMonitorEvent.set()
    # Brief pause; presumably gives the woken threads a chance to notice
    # the events before the socket is closed.
    time.sleep(0.01)
    if self.__udpSocket is not None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        self.__udpSocket.close()
        # Forget the closed socket so that a repeated free() is a no-op.
        self.__udpSocket = None
405 |
+ |
|
406 |
+ |
|
407 |
|
######################################################################################### |
408 |
|
# Internal functions - Config reloader thread |
409 |
|
######################################################################################### |
412 |
|
""" |
413 |
|
Main loop of the thread that checks for changes and reloads the configuration |
414 |
|
""" |
415 |
< |
while True: |
416 |
< |
time.sleep(self.configRecheckInterval) |
415 |
> |
while not self.__configUpdateEvent.isSet(): |
416 |
> |
self.__configUpdateEvent.wait(self.configRecheckInterval); |
417 |
> |
if self.__configUpdateEvent.isSet(): |
418 |
> |
return; |
419 |
|
if self.configRecheck: |
420 |
|
self.__reloadAddresses() |
421 |
|
self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+`self.configRecheckInterval`+" sec."); |
480 |
|
self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`); |
481 |
|
tempDestinations[(host, port, passwd)][key] = value; |
482 |
|
else: |
483 |
< |
self.logger.log(Logger.NOTICE, "Destination "+host+":"+port+" "+passwd+" already added. Skipping it"); |
483 |
> |
self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it"); |
484 |
|
|
485 |
|
def __initializeFromFile (self, confFileName, tempDestinations): |
486 |
|
""" |
539 |
|
value = int(value); |
540 |
|
if param == "loglevel": |
541 |
|
self.logger.setLogLevel(value); |
542 |
+ |
elif param == "maxMsgRate": |
543 |
+ |
self.setMaxMsgRate(int(value)); |
544 |
|
elif param == "conf_recheck": |
545 |
|
self.configRecheck = value; |
546 |
|
elif param == "recheck_interval": |
560 |
|
############################################################################################### |
561 |
|
|
562 |
|
def __bgMonitor (self):
    """
    Periodic background-monitoring loop (presumably run in its own thread).

    Waits on the background-monitor event in steps of at most 10 seconds.
    After each wait, monitoring data is sent through sendBgMonitoring()
    if self.performBgMonitoring is true. The loop ends as soon as the
    event is found set.
    """
    stopEvent = self.__bgMonitorEvent
    while not stopEvent.isSet():
        # Sleeps up to 10 seconds, but wakes immediately when the event is set.
        stopEvent.wait(10)
        if stopEvent.isSet():
            break
        if not self.performBgMonitoring:
            continue
        self.sendBgMonitoring()
569 |
|
|
572 |
|
############################################################################################### |
573 |
|
|
574 |
|
def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params): |
575 |
+ |
|
576 |
+ |
if self.__shouldSend() == False: |
577 |
+ |
return; |
578 |
+ |
|
579 |
|
xdrPacker = xdrlib.Packer () |
580 |
|
host, port, passwd = destination |
581 |
|
self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params))); |
615 |
|
|
616 |
|
# Destructor |
617 |
|
def __del__(self):
    """
    Destructor: hand the cleanup over to free().
    """
    self.free()
619 |
> |
|
620 |
> |
# Decide if the current datagram should be sent. |
621 |
> |
# This decision is based on the number of messages previously sent. |
622 |
> |
def __shouldSend(self):
    """
    Decide whether the current datagram should be sent.

    The decision is based on the number of messages previously sent: a
    weighted average of the per-second send counts is compared with
    self.maxMsgRate, and when the average gets close to that limit the
    message is dropped at random.

    Side effects: updates the private sent/dropped counters and, when the
    wall-clock second changes, the stored history values; logs the
    counters of the finished interval at DEBUG level.

    Returns True if the message should be sent, False if it is dropped.
    """
    # int() yields the same whole-second value as long() did and also
    # exists on Python 3.
    now = int(time.time())
    if now != self.__crtTime:
        # A new second has started: fold the finished interval into the history.
        elapsed = now - self.__crtTime
        if elapsed < 1:
            # The clock stepped backwards; use a one-second interval so the
            # average is not pushed negative.
            elapsed = 1
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        # reset the counters of the current interval
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # Blend the history with what was sent so far in the current second.
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    doSend = True

    # One tenth of the allowed rate, as a whole number. The int() and //
    # keep random.randint() working when maxMsgRate is not an int or when
    # "/" means true division.
    margin = int(self.maxMsgRate) // 10

    # randint() never returns more than margin, so a drop can only happen
    # once valSent is within margin of maxMsgRate; the closer it gets, the
    # more likely the drop.
    if valSent > margin:
        if random.randint(0, margin) >= (self.maxMsgRate - valSent):
            doSend = False

    # count sent and dropped messages
    if doSend:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1

    return doSend
654 |
> |
|
655 |
|
|
656 |
|
################################################################################################ |
657 |
|
# Private variables. Don't touch |
667 |
|
2: xdrlib.Packer.pack_int, |
668 |
|
5: xdrlib.Packer.pack_double } |
669 |
|
|
576 |
– |
__defaultUserCluster = "ApMon_UserSend"; |
577 |
– |
__defaultUserNode = socket.getfqdn(); |
578 |
– |
__defaultSysMonCluster = "ApMon_SysMon"; |
579 |
– |
__defaultSysMonNode = socket.getfqdn(); |
580 |
– |
|
581 |
– |
__initializedOK = True |
582 |
– |
__configUpdateLock = threading.Lock() |
583 |
– |
__bgMonitorLock = threading.Lock() |
584 |
– |
|
670 |
|
__defaultPort = 8884 |
671 |
|
__defaultLogLevel = Logger.INFO |
672 |
< |
__version = "2.0.2-py" # apMon version number |
672 |
> |
__version = "2.0.6-py" # apMon version number |
673 |
|
|