1 |
|
|
2 |
|
""" |
3 |
|
* ApMon - Application Monitoring Tool |
4 |
< |
* Version: 2.0.4 |
4 |
> |
* Version: 2.2.1 |
5 |
|
* |
6 |
< |
* Copyright (C) 2005 California Institute of Technology |
6 |
> |
* Copyright (C) 2006 California Institute of Technology |
7 |
|
* |
8 |
|
* Permission is hereby granted, free of charge, to use, copy and modify |
9 |
|
* this software and its documentation (the "Software") for any |
54 |
|
import Logger |
55 |
|
import ProcInfo |
56 |
|
import random |
57 |
+ |
import copy |
58 |
|
|
59 |
|
#__all__ = ["ApMon"] |
60 |
|
|
61 |
< |
#__debug = False # set this to True to be verbose |
61 |
> |
#__debug = False # self.destPrevData[destination];set this to True to be verbose |
62 |
|
|
63 |
|
class ApMon: |
64 |
|
""" |
128 |
|
'sys_net_in' : True, # network transfer in kBps |
129 |
|
'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs |
130 |
|
'sys_net_errs' : False, # for each eth interface |
131 |
+ |
'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ... |
132 |
+ |
'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ... |
133 |
|
'sys_processes' : True, |
134 |
|
'sys_uptime' : True, # uptime of the machine, in days (float number) |
135 |
|
|
148 |
|
'cpu_model_name': True, |
149 |
|
'bogomips' : True}; |
150 |
|
|
151 |
< |
def __init__ (self, initValue): |
151 |
> |
def __init__ (self, initValue, defaultLogLevel = Logger.INFO): |
152 |
|
""" |
153 |
|
Class constructor: |
154 |
|
- if initValue is a string, put it in configAddresses and load destinations |
169 |
|
will overwrite the default parameters (see __defaultOptions) |
170 |
|
""" |
171 |
|
self.destinations = {} # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...} |
172 |
+ |
self.destPrevData = {} # empty, by defaul; key = tuple (host, port, pass) ; val = hash {"param_mame" : value, ...} |
173 |
|
self.configAddresses = [] # empty, by default; list of files/urls from where we read config |
174 |
|
self.configRecheckInterval = 120 # 2 minutes |
175 |
|
self.configRecheck = True # enabled by default |
176 |
|
self.performBgMonitoring = True # by default, perform background monitoring |
177 |
|
self.monitoredJobs = {} # Monitored jobs; key = pid; value = hash with |
178 |
< |
self.maxMsgRate = 50; # Maximum number of messages allowed to be sent per second |
178 |
> |
self.maxMsgRate = 20; # Maximum number of messages allowed to be sent per second |
179 |
> |
self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0}; |
180 |
|
self.__defaultUserCluster = "ApMon_UserSend"; |
181 |
|
self.__defaultUserNode = socket.getfqdn(); |
182 |
|
self.__defaultSysMonCluster = "ApMon_SysMon"; |
183 |
|
self.__defaultSysMonNode = socket.getfqdn(); |
184 |
|
# don't touch these: |
185 |
+ |
self.__freed = False |
186 |
|
self.__initializedOK = True |
187 |
|
self.__udpSocket = None |
188 |
|
self.__configUpdateLock = threading.Lock() |
197 |
|
self.__crtSent = 0; |
198 |
|
self.__crtDrop = 0; |
199 |
|
self.__hWeight = 0.92; |
200 |
< |
|
195 |
< |
self.logger = Logger.Logger(self.__defaultLogLevel) |
200 |
> |
self.logger = Logger.Logger(defaultLogLevel) |
201 |
|
if type(initValue) == type("string"): |
202 |
|
self.configAddresses.append(initValue) |
203 |
|
self.__reloadAddresses() |
214 |
|
self.__initializedOK = (len (self.destinations) > 0) |
215 |
|
if not self.__initializedOK: |
216 |
|
self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined."); |
217 |
< |
#self.__defaultClusterName = None |
218 |
< |
#self.__defaultNodeName = self.getMyHo |
217 |
> |
# self.__defaultClusterName = None |
218 |
> |
# self.__defaultNodeName = self.getMyHo |
219 |
|
if self.__initializedOK: |
220 |
|
self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
221 |
|
if len(self.configAddresses) > 0: |
226 |
|
th.start() |
227 |
|
# create the ProcInfo instance |
228 |
|
self.procInfo = ProcInfo.ProcInfo(self.logger); |
229 |
< |
self.procInfo.update(); |
229 |
> |
# self.procInfo.update(); |
230 |
|
# start the background monitoring thread |
231 |
|
th = threading.Thread(target=self.__bgMonitor); |
232 |
|
th.setDaemon(True); |
288 |
|
if not self.__initializedOK: |
289 |
|
self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!"); |
290 |
|
return |
291 |
< |
if clusterName == None: |
291 |
> |
if (clusterName == None) or (clusterName == ""): |
292 |
|
clusterName = self.__defaultUserCluster |
293 |
|
else: |
294 |
|
self.__defaultUserCluster = clusterName |
296 |
|
nodeName = self.__defaultUserNode |
297 |
|
else: |
298 |
|
self.__defaultUserNode = nodeName |
299 |
< |
self.__configUpdateLock.acquire() |
299 |
> |
self.__configUpdateLock.acquire(); |
300 |
|
for dest in self.destinations.keys(): |
301 |
< |
self.__directSendParams(dest, clusterName, nodeName, timeStamp, params); |
302 |
< |
self.__configUpdateLock.release() |
301 |
> |
self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params); |
302 |
> |
self.__configUpdateLock.release(); |
303 |
|
|
304 |
|
def addJobToMonitor (self, pid, workDir, clusterName, nodeName): |
305 |
|
""" |
326 |
|
Set the cluster and node names where to send system related information. |
327 |
|
""" |
328 |
|
self.__bgMonitorLock.acquire(); |
329 |
< |
self.__defaultSysMonCluster = clusterName; |
330 |
< |
self.__defaultSysMonNode = nodeName; |
329 |
> |
if (clusterName != None) and (clusterName != ""): |
330 |
> |
self.__defaultSysMonCluster = clusterName; |
331 |
> |
if (nodeName != None) and (nodeName != ""): |
332 |
> |
self.__defaultSysMonNode = nodeName; |
333 |
|
self.__bgMonitorLock.release(); |
334 |
|
|
335 |
|
def enableBgMonitoring (self, onOff): |
338 |
|
can still be sent if user calls the sendBgMonitoring method. |
339 |
|
""" |
340 |
|
self.performBgMonitoring = onOff; |
341 |
< |
|
342 |
< |
def sendBgMonitoring (self): |
341 |
> |
|
342 |
> |
def sendBgMonitoring (self, mustSend = False): |
343 |
|
""" |
344 |
< |
Send now background monitoring about system and jobs to all interested destinations. |
344 |
> |
Send background monitoring about system and jobs to all interested destinations. |
345 |
> |
If mustSend == True, the information is sent regardles of the elapsed time since last sent |
346 |
> |
If mustSend == False, the data is sent only if the required interval has passed since last sent |
347 |
|
""" |
348 |
|
self.__bgMonitorLock.acquire(); |
340 |
– |
self.procInfo.update(); |
349 |
|
now = int(time.time()); |
350 |
< |
for destination, options in self.destinations.items(): |
350 |
> |
updatedProcInfo = False; |
351 |
> |
for destination, options in self.destinations.iteritems(): |
352 |
|
sysParams = []; |
353 |
|
jobParams = []; |
354 |
+ |
prevRawData = self.destPrevData[destination]; |
355 |
|
# for each destination and its options, check if we have to report any background monitoring data |
356 |
< |
if(options['sys_monitoring'] and options['sys_data_sent'] + options['sys_interval'] < now): |
356 |
> |
if(options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now)): |
357 |
|
for param, active in options.items(): |
358 |
|
m = re.match("sys_(.+)", param); |
359 |
|
if(m != None and active): |
361 |
|
if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'): |
362 |
|
sysParams.append(param) |
363 |
|
options['sys_data_sent'] = now; |
364 |
< |
if(options['job_monitoring'] and options['job_data_sent'] + options['job_interval'] < now): |
364 |
> |
if(options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now)): |
365 |
|
for param, active in options.items(): |
366 |
|
m = re.match("job_(.+)", param); |
367 |
|
if(m != None and active): |
369 |
|
if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'): |
370 |
|
jobParams.append(param); |
371 |
|
options['job_data_sent'] = now; |
372 |
< |
if(options['general_info'] and options['general_data_sent'] + 2 * int(options['sys_interval']) < now): |
372 |
> |
if(options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now)): |
373 |
|
for param, active in options.items(): |
374 |
|
if not (param.startswith("sys_") or param.startswith("job_")) and active: |
375 |
|
if not (param == 'general_info' or param == 'general_data_sent'): |
376 |
|
sysParams.append(param); |
377 |
+ |
options['general_data_sent'] = now; |
378 |
+ |
|
379 |
+ |
if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ): |
380 |
+ |
self.procInfo.update(); |
381 |
+ |
updatedProcInf = True; |
382 |
+ |
|
383 |
|
sysResults = {} |
384 |
|
if(len(sysParams) > 0): |
385 |
< |
sysResults = self.procInfo.getSystemData(sysParams); |
386 |
< |
if(len(sysResults.keys()) > 0): |
387 |
< |
self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults); |
385 |
> |
sysResults = self.procInfo.getSystemData(sysParams, prevRawData); |
386 |
> |
self.updateLastSentTime(options, self.destinations[destination]); |
387 |
> |
if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0): |
388 |
> |
self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults); |
389 |
> |
if( (type(sysResults) == type( [] )) and len(sysResults) > 0): |
390 |
> |
self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults); |
391 |
|
for pid, props in self.monitoredJobs.items(): |
392 |
|
jobResults = {}; |
393 |
|
if(len(jobParams) > 0): |
394 |
|
jobResults = self.procInfo.getJobData(pid, jobParams); |
395 |
|
if(len(jobResults) > 0): |
396 |
< |
self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults); |
396 |
> |
self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults); |
397 |
|
self.__bgMonitorLock.release(); |
398 |
|
|
399 |
+ |
# copy the time when last data was sent |
400 |
+ |
def updateLastSentTime(self, srcOpts, dstOpts): |
401 |
+ |
if srcOpts.has_key('general_data_sent'): |
402 |
+ |
dstOpts['general_data_sent'] = srcOpts['general_data_sent']; |
403 |
+ |
if srcOpts.has_key('sys_data_sent'): |
404 |
+ |
dstOpts['sys_data_sent'] = srcOpts['sys_data_sent']; |
405 |
+ |
if srcOpts.has_key('job_data_sent'): |
406 |
+ |
dstOpts['job_data_sent'] = srcOpts['job_data_sent']; |
407 |
+ |
|
408 |
|
def setLogLevel (self, strLevel): |
409 |
|
""" |
410 |
|
Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING', |
424 |
|
Stop background threands, close opened sockets. You have to use this function if you want to |
425 |
|
free all the resources that ApMon takes, and allow it to be garbage-collected. |
426 |
|
""" |
427 |
< |
self.__configUpdateEvent.set(); |
428 |
< |
self.__bgMonitorEvent.set(); |
427 |
> |
if self.__configUpdateEvent != None: |
428 |
> |
self.__configUpdateEvent.set(); |
429 |
> |
if self.__bgMonitorEvent != None: |
430 |
> |
self.__bgMonitorEvent.set(); |
431 |
|
time.sleep(0.01); |
432 |
|
if self.__udpSocket != None: |
433 |
|
self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy."); |
434 |
|
self.__udpSocket.close(); |
435 |
+ |
self.__udpSocket = None; |
436 |
+ |
self.__freed = True |
437 |
|
|
438 |
|
|
439 |
|
######################################################################################### |
503 |
|
if (h == host) and (p == port): |
504 |
|
alreadyAdded = True |
505 |
|
break |
506 |
+ |
destination = (host, port, passwd); |
507 |
|
if not alreadyAdded: |
508 |
|
self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd); |
509 |
< |
tempDestinations[(host, port, passwd)] = self.__defaultOptions; |
509 |
> |
tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest. |
510 |
> |
self.destPrevData[destination] = {}; |
511 |
|
if options != self.__defaultOptions: |
512 |
|
# we have to overwrite defaults with given options |
513 |
|
for key, value in options.items(): |
514 |
|
self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`); |
515 |
< |
tempDestinations[(host, port, passwd)][key] = value; |
515 |
> |
tempDestinations[destination][key] = value; |
516 |
|
else: |
517 |
|
self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it"); |
518 |
|
|
585 |
|
opts[param] = value; |
586 |
|
else: |
587 |
|
dests.append(line); |
588 |
+ |
|
589 |
|
confFile.close () |
590 |
|
for line in dests: |
591 |
|
self.__addDestination(line, tempDestinations, opts) |
600 |
|
if self.__bgMonitorEvent.isSet(): |
601 |
|
return; |
602 |
|
if self.performBgMonitoring: |
603 |
< |
self.sendBgMonitoring(); |
603 |
> |
self.sendBgMonitoring() # send only if the interval has elapsed |
604 |
|
|
605 |
|
############################################################################################### |
606 |
|
# Internal helper functions |
607 |
|
############################################################################################### |
608 |
|
|
609 |
< |
def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params): |
609 |
> |
def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params): |
610 |
> |
|
611 |
> |
if senderRef=={}: |
612 |
> |
self.logger.log(Logger.WARNING, "Not sending undefined parameters!"); |
613 |
> |
return; |
614 |
|
|
615 |
|
if self.__shouldSend() == False: |
616 |
|
return; |
617 |
|
|
618 |
< |
xdrPacker = xdrlib.Packer () |
618 |
> |
if destination == None: |
619 |
> |
self.logger.log(Logger.WARNING, "Destination is None"); |
620 |
> |
return; |
621 |
> |
|
622 |
|
host, port, passwd = destination |
623 |
+ |
senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld |
624 |
+ |
|
625 |
+ |
xdrPacker = xdrlib.Packer (); |
626 |
|
self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params))); |
627 |
+ |
|
628 |
|
xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd) |
629 |
< |
xdrPacker.pack_string (clusterName) |
630 |
< |
xdrPacker.pack_string (nodeName) |
631 |
< |
xdrPacker.pack_int (len(params)) |
629 |
> |
|
630 |
> |
xdrPacker.pack_int (senderRef['INSTANCE_ID']); |
631 |
> |
xdrPacker.pack_int (senderRef['SEQ_NR']); |
632 |
> |
|
633 |
> |
xdrPacker.pack_string (clusterName); |
634 |
> |
xdrPacker.pack_string (nodeName); |
635 |
> |
xdrPacker.pack_int (len(params)); |
636 |
> |
|
637 |
|
if type(params) == type( {} ): |
638 |
|
for name, value in params.iteritems(): |
639 |
|
self.__packParameter(xdrPacker, name, value) |
643 |
|
self.__packParameter(xdrPacker, name, value) |
644 |
|
else: |
645 |
|
self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params))); |
646 |
< |
if(timeStamp > 0): |
646 |
> |
if (timeStamp != None) and (timeStamp > 0): |
647 |
|
xdrPacker.pack_int(timeStamp); |
648 |
+ |
|
649 |
|
buffer = xdrPacker.get_buffer(); |
650 |
|
# send this buffer to the destination, using udp datagrams |
651 |
|
try: |
652 |
|
self.__udpSocket.sendto(buffer, (host, port)) |
653 |
< |
self.logger.log(Logger.DEBUG, "Packet sent"); |
653 |
> |
self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd); |
654 |
|
except socket.error, msg: |
655 |
|
self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1])); |
656 |
|
xdrPacker.reset() |
657 |
|
|
658 |
|
def __packParameter(self, xdrPacker, name, value): |
659 |
+ |
if (name is None) or (name is ""): |
660 |
+ |
self.logger.log(Logger.WARNING, "Undefine parameter name."); |
661 |
+ |
return; |
662 |
+ |
if (value is None): |
663 |
+ |
self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value"); |
664 |
+ |
return; |
665 |
|
try: |
666 |
|
typeValue = self.__valueTypes[type(value)] |
667 |
|
xdrPacker.pack_string (name) |
670 |
|
self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value)); |
671 |
|
except Exception, ex: |
672 |
|
print "ApMon: error packing %s = %s; got %s" % (name, str(value), ex) |
673 |
< |
|
673 |
> |
|
674 |
|
# Destructor |
675 |
|
def __del__(self): |
676 |
< |
self.free(); |
676 |
> |
if not self.__freed: |
677 |
> |
self.free(); |
678 |
|
|
679 |
|
# Decide if the current datagram should be sent. |
680 |
|
# This decision is based on the number of messages previously sent. |
711 |
|
|
712 |
|
return doSend; |
713 |
|
|
655 |
– |
|
714 |
|
################################################################################################ |
715 |
|
# Private variables. Don't touch |
716 |
|
################################################################################################ |
726 |
|
5: xdrlib.Packer.pack_double } |
727 |
|
|
728 |
|
__defaultPort = 8884 |
729 |
< |
__defaultLogLevel = Logger.INFO |
672 |
< |
__version = "2.0.6-py" # apMon version number |
729 |
> |
__version = "2.2.1-py" # apMon version number |
730 |
|
|