ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.2
Committed: Tue Nov 8 16:02:39 2005 UTC (19 years, 5 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_4, CRAB_1_0_3, CRAB_1_0_2, CRAB_0_2_2
Changes since 1.1: +108 -23 lines
Log Message:
New apmon

File Contents

# User Rev Content
1 corvo 1.1
2     """
3     * ApMon - Application Monitoring Tool
4     * Version: 2.0.4
5     *
6     * Copyright (C) 2005 California Institute of Technology
7     *
8     * Permission is hereby granted, free of charge, to use, copy and modify
9     * this software and its documentation (the "Software") for any
10     * purpose, provided that existing copyright notices are retained in
11     * all copies and that this notice is included verbatim in any distributions
12     * or substantial portions of the Software.
13     * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14     * Users of the Software are asked to feed back problems, benefits,
15     * and/or suggestions about the software to the MonALISA Development Team
16     * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17     * incorporation of new features - is done on a best effort basis. All bug
18     * fixes and enhancements will be made available under the same terms and
19     * conditions as the original software,
20    
21     * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22     * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23     * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24     * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25    
26     * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27     * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28     * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29     * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30     * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31     * MODIFICATIONS.
32     """
33    
34     """
35     apmon.py
36    
37     This is a python implementation for the ApMon API for sending
38     data to the MonALISA service.
39    
40     For further details about ApMon please see the C/C++ or Java documentation
41     You can find a sample usage of this module in apmTest.py.
42    
43     Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44     Sending strings is supported, but they will not be stored in the
45     farm's store nor shown in the farm's window in the MonALISA client.
46     """
47    
48     import re
49     import xdrlib
50     import socket
51     import urllib2
52     import threading
53     import time
54     import Logger
55     import ProcInfo
56 corvo 1.2 import random
57 corvo 1.1
58     #__all__ = ["ApMon"]
59    
60     #__debug = False # set this to True to be verbose
61    
class ApMon:
    """
    Main class for sending monitoring data to a MonALISA module.
    One or more destinations can be chosen for the data. See constructor.

    The data is packed in UDP datagrams, using XDR. The following fields are sent:
    - version & password (string)
    - cluster name (string)
    - node name (string)
    - number of parameters (int)
    - for each parameter:
        - name (string)
        - value type (int)
        - value
    - optionally a (int) with the given timestamp

    Attributes (public):
    - destinations - a dictionary; key = (ip, port, password) tuple,
      value = dictionary of background monitoring options for that destination
    - configAddresses - list with files and urls from where the config is read
    - configRecheckInterval - period, in seconds, to check for changes
      in the configAddresses list
    - configRecheck - boolean - whether to recheck periodically for changes
      in the configAddresses list
    - performBgMonitoring - boolean - whether the background thread sends
      system/job monitoring data
    - monitoredJobs - dictionary; key = pid, value = dictionary holding the
      'CLUSTER_NAME' and 'NODE_NAME' used when reporting that job
    - maxMsgRate - maximum number of messages allowed to be sent per second
    - procInfo - ProcInfo instance used for background monitoring, or None
      when the object could not be initialized
    """

    # Defaults for background monitoring. Every destination gets its OWN copy
    # of this dictionary (see __addDestination); never hand out this object.
    # NOTE(review): the descriptions of 'job_cpu_time' and 'job_run_time' look
    # swapped with respect to their names - confirm against ProcInfo.
    __defaultOptions = {
        'job_monitoring': True,     # perform (or not) job monitoring
        'job_interval': 10,         # at this interval (in seconds)
        'job_data_sent': 0,         # time from Epoch when job information was sent; don't touch!

        'job_cpu_time': True,       # elapsed time from the start of this job in seconds
        'job_run_time': True,       # processor time spent running this job in seconds
        'job_cpu_usage': True,      # current percent of the processor used for this job, as reported by ps
        'job_virtualmem': True,     # size in KB of the virtual memory occupied by the job, as reported by ps
        'job_rss': True,            # size in KB of the resident image size of the job, as reported by ps
        'job_mem_usage': True,      # percent of the memory occupied by the job, as reported by ps
        'job_workdir_size': True,   # size in MB of the working directory of the job
        'job_disk_total': True,     # size in MB of the total size of the disk partition containing the working directory
        'job_disk_used': True,      # size in MB of the used disk partition containing the working directory
        'job_disk_free': True,      # size in MB of the free disk partition containing the working directory
        'job_disk_usage': True,     # percent of the used disk partition containing the working directory
        'job_open_files': True,     # number of open file descriptors

        'sys_monitoring': True,     # perform (or not) system monitoring
        'sys_interval': 10,         # at this interval (in seconds)
        'sys_data_sent': 0,         # time from Epoch when system information was sent; don't touch!

        'sys_cpu_usr': False,       # cpu-usage information
        'sys_cpu_sys': False,       # all these will produce corresponding params without "sys_"
        'sys_cpu_nice': False,
        'sys_cpu_idle': False,
        'sys_cpu_usage': True,
        'sys_load1': True,          # system load information
        'sys_load5': True,
        'sys_load15': True,
        'sys_mem_used': False,      # memory usage information
        'sys_mem_free': False,
        'sys_mem_usage': True,
        'sys_pages_in': False,
        'sys_pages_out': False,
        'sys_swap_used': True,      # swap usage information
        'sys_swap_free': False,
        'sys_swap_usage': True,
        'sys_swap_in': False,
        'sys_swap_out': False,
        'sys_net_in': True,         # network transfer in kBps
        'sys_net_out': True,        # these will produce params called ethX_in, ethX_out, ethX_errs
        'sys_net_errs': False,      # for each eth interface
        'sys_processes': True,
        'sys_uptime': True,         # uptime of the machine, in days (float number)

        'general_info': True,       # send (or not) general host information once every 2 $sys_interval seconds
        'general_data_sent': 0,     # time from Epoch when general information was sent; don't touch!

        'hostname': True,
        'ip': True,                 # will produce ethX_ip params for each interface
        'cpu_MHz': True,
        'no_CPUs': True,            # number of CPUs
        'total_mem': True,
        'total_swap': True,
        'cpu_vendor_id': True,
        'cpu_family': True,
        'cpu_model': True,
        'cpu_model_name': True,
        'bogomips': True,
    }

    def __init__(self, initValue):
        """
        Class constructor:
        - if initValue is a string, put it in configAddresses and load destinations
          from the file named like that. If it starts with "http://", the configuration
          is loaded from that URL. For background monitoring, given parameters will
          overwrite defaults.

        - if initValue is a list, put its contents in configAddresses and create
          the list of destinations from all those sources. For background monitoring,
          given parameters will overwrite defaults (see __defaultOptions).

        - if initValue is a tuple (of strings), initialize destinations with those values.
          Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
          default port being 8884 and the default password being "". Background
          monitoring will be enabled, sending the parameters active in __defaultOptions.

        - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
          val = hash{'param_name': True/False, ...}) the given options for each destination
          will overwrite the default parameters (see __defaultOptions).

        If no destination can be set up, an error is logged and every later
        send request is refused with a warning.
        """
        self.destinations = {}              # key = (host, port, pass); val = {"param_name": True/False, ...}
        self.configAddresses = []           # list of files/urls from where we read config
        self.configRecheckInterval = 120    # 2 minutes
        self.configRecheck = True           # enabled by default
        self.performBgMonitoring = True     # by default, perform background monitoring
        self.monitoredJobs = {}             # key = pid; value = {'CLUSTER_NAME': ..., 'NODE_NAME': ...}
        self.maxMsgRate = 50                # maximum number of messages allowed to be sent per second
        self.procInfo = None                # created below, only if initialization succeeds
        fqdn = socket.getfqdn()             # resolved once; used for both default node names
        self.__defaultUserCluster = "ApMon_UserSend"
        self.__defaultUserNode = fqdn
        self.__defaultSysMonCluster = "ApMon_SysMon"
        self.__defaultSysMonNode = fqdn
        # don't touch these:
        self.__initializedOK = True
        self.__udpSocket = None
        self.__configUpdateLock = threading.Lock()
        self.__configUpdateEvent = threading.Event()
        self.__bgMonitorLock = threading.Lock()
        self.__bgMonitorEvent = threading.Event()
        # state of the rate limiter (see __shouldSend)
        self.__crtTime = 0
        self.__prvTime = 0
        self.__prvSent = 0
        self.__prvDrop = 0
        self.__crtSent = 0
        self.__crtDrop = 0
        self.__hWeight = 0.92

        self.logger = Logger.Logger(self.__defaultLogLevel)
        if isinstance(initValue, str):
            self.configAddresses.append(initValue)
            self.__reloadAddresses()
        elif isinstance(initValue, list):
            self.configAddresses = initValue
            self.__reloadAddresses()
        elif isinstance(initValue, tuple):
            for dest in initValue:
                self.__addDestination(dest, self.destinations)
        elif isinstance(initValue, dict):
            for dest, opts in initValue.items():
                self.__addDestination(dest, self.destinations, opts)

        self.__initializedOK = (len(self.destinations) > 0)
        if not self.__initializedOK:
            self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
            return

        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if len(self.configAddresses) > 0:
            # if there are addresses that need to be monitored,
            # start config checking and reloading thread
            th = threading.Thread(target=self.__configLoader)
            th.setDaemon(True)  # this is a daemon thread
            th.start()
        # create the ProcInfo instance
        self.procInfo = ProcInfo.ProcInfo(self.logger)
        self.procInfo.update()
        # start the background monitoring thread
        th = threading.Thread(target=self.__bgMonitor)
        th.setDaemon(True)
        th.start()

    def sendParams(self, params):
        """
        Send multiple parameters to MonALISA, with default (last given) cluster and node names.
        """
        self.sendTimedParams(-1, params)

    def sendTimedParams(self, timeStamp, params):
        """
        Send multiple parameters, specifying the time for them, with default (last given)
        cluster and node names. (See sendTimedParameters for more details)
        """
        self.sendTimedParameters(None, None, timeStamp, params)

    def sendParameter(self, clusterName, nodeName, paramName, paramValue):
        """
        Send a single parameter to MonALISA.
        """
        self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue)

    def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
        """
        Send a single parameter, with a given time.
        """
        self.sendTimedParameters(clusterName, nodeName, timeStamp, {paramName: paramValue})

    def sendParameters(self, clusterName, nodeName, params):
        """
        Send multiple parameters specifying cluster and node name for them.
        """
        self.sendTimedParameters(clusterName, nodeName, -1, params)

    def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
        """
        Send multiple monitored parameters to MonALISA.

        - clusterName is the name of the cluster being monitored. If it is None,
          the last given clusterName (initially "ApMon_UserSend") is used.
        - nodeName is the name of the node for which are the parameters. If it is
          None, the last given nodeName (initially the full hostname of this
          machine) is used.
        - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
          Note that this option should be used only if you are sure about the time for the
          result. Otherwise, the parameters will be assigned a time in the MonALISA service.
          This option can be useful when parsing logs, for example.
        - params is a dictionary containing pairs with:
            - key: parameter name
            - value: parameter value, either int or float.
          or params is a list (or tuple) of (key, value) pairs. This version can be
          used in case you want to send the parameters in a given order.

        NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
        """
        if not self.__initializedOK:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
            return
        if clusterName is None:
            clusterName = self.__defaultUserCluster
        else:
            self.__defaultUserCluster = clusterName
        if nodeName is None:
            nodeName = self.__defaultUserNode
        else:
            self.__defaultUserNode = nodeName
        self.__configUpdateLock.acquire()
        try:
            # the lock must be released even if sending raises, otherwise the
            # config reloader and every later send would block forever
            for dest in self.destinations.keys():
                self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
        finally:
            self.__configUpdateLock.release()

    def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
        """
        Add a new job to monitor. Its data will be reported under the given
        cluster and node names.
        """
        if self.procInfo is None:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Job NOT added!")
            return
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.addJobToMonitor(pid, workDir)
            self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
        finally:
            self.__bgMonitorLock.release()

    def removeJobToMonitor(self, pid):
        """
        Remove a job from being monitored. Raises KeyError if pid is not monitored.
        """
        if self.procInfo is None:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Job NOT removed!")
            return
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.removeJobToMonitor(pid)
            del self.monitoredJobs[pid]
        finally:
            # released even on KeyError, so the background thread cannot deadlock
            self.__bgMonitorLock.release()

    def setMonitorClusterNode(self, clusterName, nodeName):
        """
        Set the cluster and node names where to send system related information.
        """
        self.__bgMonitorLock.acquire()
        try:
            self.__defaultSysMonCluster = clusterName
            self.__defaultSysMonNode = nodeName
        finally:
            self.__bgMonitorLock.release()

    def enableBgMonitoring(self, onOff):
        """
        Enable or disable background monitoring. Note that background monitoring information
        can still be sent if user calls the sendBgMonitoring method.
        """
        self.performBgMonitoring = onOff

    def sendBgMonitoring(self):
        """
        Send now background monitoring about system and jobs to all interested destinations.

        For each destination, system/job data is sent only if more than the
        configured interval passed since it was last sent to that destination;
        general host information is sent at most once every 2 * sys_interval.
        """
        if (not self.__initializedOK) or self.procInfo is None:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Background monitoring NOT sent!")
            return
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.update()
            now = int(time.time())
            for destination, options in self.destinations.items():
                sysParams = []
                jobParams = []
                # for each destination and its options, check if we have to
                # report any background monitoring data
                if options['sys_monitoring'] and options['sys_data_sent'] + options['sys_interval'] < now:
                    sysParams.extend(self.__enabledParams(options, "sys_"))
                    options['sys_data_sent'] = now
                if options['job_monitoring'] and options['job_data_sent'] + options['job_interval'] < now:
                    jobParams.extend(self.__enabledParams(options, "job_"))
                    options['job_data_sent'] = now
                if options['general_info'] and options['general_data_sent'] + 2 * int(options['sys_interval']) < now:
                    for param, active in options.items():
                        if not active or param.startswith("sys_") or param.startswith("job_"):
                            continue
                        if param not in ('general_info', 'general_data_sent'):
                            sysParams.append(param)
                    # remember when general info went out; without this it
                    # would be re-sent on every single pass
                    options['general_data_sent'] = now
                if sysParams:
                    sysResults = self.procInfo.getSystemData(sysParams)
                    if sysResults:
                        self.__directSendParams(destination, self.__defaultSysMonCluster,
                                                self.__defaultSysMonNode, -1, sysResults)
                if jobParams:
                    for pid, props in self.monitoredJobs.items():
                        jobResults = self.procInfo.getJobData(pid, jobParams)
                        if jobResults:
                            self.__directSendParams(destination, props['CLUSTER_NAME'],
                                                    props['NODE_NAME'], -1, jobResults)
        finally:
            self.__bgMonitorLock.release()

    def setLogLevel(self, strLevel):
        """
        Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
        'INFO', 'NOTICE', 'DEBUG'.
        """
        self.logger.setLogLevel(strLevel)

    def setMaxMsgRate(self, rate):
        """
        Set the maximum number of messages that can be sent, per second.
        """
        self.maxMsgRate = rate
        self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate))

    def free(self):
        """
        Stop background threads, close opened sockets. You have to use this function if you
        want to free all the resources that ApMon takes, and allow it to be garbage-collected.

        Calling it more than once is harmless. After it, send requests are
        refused with a warning.
        """
        self.__configUpdateEvent.set()
        self.__bgMonitorEvent.set()
        time.sleep(0.01)    # give the background threads a moment to notice the events
        self.__initializedOK = False
        sock = self.__udpSocket
        self.__udpSocket = None
        if sock is not None:
            self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
            sock.close()

    #########################################################################################
    # Internal functions - Config reloader thread
    #########################################################################################

    def __configLoader(self):
        """
        Main loop of the thread that checks for changes and reloads the configuration.
        Runs until the config-update event is set (see free()).
        """
        while not self.__configUpdateEvent.isSet():
            self.__configUpdateEvent.wait(self.configRecheckInterval)
            if self.__configUpdateEvent.isSet():
                return
            if not self.configRecheck:
                continue
            try:
                self.__reloadAddresses()
                self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for " +
                                repr(self.configRecheckInterval) + " sec.")
            except Exception as ex:
                # thread boundary: a failed reload must not silently end all future reloads
                self.logger.log(Logger.ERROR, "Error while reloading the configuration: " + str(ex))

    def __reloadAddresses(self):
        """
        Refresh destinations hash, by loading data from all sources in configAddresses.
        """
        newDestinations = {}
        for src in self.configAddresses:
            self.__initializeFromFile(src, newDestinations)
        # avoid changing config in the middle of sending packets to previous destinations
        self.__configUpdateLock.acquire()
        try:
            self.destinations = newDestinations
        finally:
            self.__configUpdateLock.release()

    def __addDestination(self, aDestination, tempDestinations, options=None):
        """
        Add a destination to the tempDestinations hash.

        aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
        If the port is not given, the default port (8884) is used.
        If the password is missing, it is considered an empty string.
        options, if given, is a hash whose entries overwrite the default
        background monitoring options for this destination.

        Malformed or unresolvable destinations are logged and skipped.
        """
        # collapse every run of whitespace (tabs included) into a single space
        aDestination = ' '.join(aDestination.split())
        sepPort = aDestination.find(':')
        sepPasswd = aDestination.rfind(' ')
        if sepPort >= 0:
            host = aDestination[0:sepPort].strip()
            if sepPasswd > sepPort + 1:
                port = aDestination[sepPort + 1:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                port = aDestination[sepPort + 1:].strip()
                passwd = ""
        else:
            port = str(self.__defaultPort)
            if sepPasswd >= 0:
                host = aDestination[0:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                host = aDestination.strip()
                passwd = ""
        # a port outside 1..65535 would make sendto() fail on every packet
        if (not port.isdigit()) or not (0 < int(port) < 65536):
            self.logger.log(Logger.WARNING, "Bad value for port number " + repr(port) +
                            " in " + aDestination + " destination")
            return
        port = int(port)
        try:
            # convert hostnames to IP addresses to avoid suffocating DNSs
            host = socket.gethostbyname(host)
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Error resolving " + host + ": " + str(msg))
            return
        for h, p, w in tempDestinations.keys():
            if (h == host) and (p == port):
                self.logger.log(Logger.NOTICE, "Destination " + host + ":" + str(port) +
                                " " + passwd + " already added. Skipping it")
                return

        # NOTE(review): the password is written to the log in clear text.
        self.logger.log(Logger.INFO, "Adding destination " + host + ':' + repr(port) + ' ' + passwd)
        destKey = (host, port, passwd)
        # every destination needs its own options: they hold per-destination
        # "last sent" stamps and per-destination overrides
        destOptions = dict(self.__defaultOptions)
        previous = self.destinations.get(destKey)
        if previous is not None:
            # keep the "last sent" stamps across config reloads
            for stamp in ('sys_data_sent', 'job_data_sent', 'general_data_sent'):
                if stamp in previous:
                    destOptions[stamp] = previous[stamp]
        if options is not None and options != self.__defaultOptions:
            # we have to overwrite defaults with given options
            for key, value in options.items():
                self.logger.log(Logger.NOTICE, "Overwritting option: " + key + " = " + repr(value))
                destOptions[key] = value
        tempDestinations[destKey] = destOptions

    def __initializeFromFile(self, confFileName, tempDestinations):
        """
        Load destinations from confFileName file. If it's an URL (starts with "http://")
        load configuration from there. Put all destinations in tempDestinations hash.

        Lines starting with "xApMon_" are options of the form
        "xApMon_<name> = <value>"; empty lines and lines starting with # are
        ignored; every other line is passed to __addDestination together with
        the options collected from the whole file.
        """
        try:
            if confFileName.find("http://") == 0:
                confFile = urllib2.urlopen(confFileName)
            else:
                confFile = open(confFileName)
        except urllib2.HTTPError as e:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            if e.code == 401:
                self.logger.log(Logger.ERROR, 'HTTPError: not authorized.')
            elif e.code == 404:
                self.logger.log(Logger.ERROR, 'HTTPError: not found.')
            elif e.code == 503:
                self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.')
            else:
                self.logger.log(Logger.ERROR, 'HTTPError: unknown error.')
            return
        except urllib2.URLError as ex:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            # reason may be a plain string or an exception object; never index it
            self.logger.log(Logger.ERROR, "URL Error: " + str(ex.reason))
            return
        except IOError as ex:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            self.logger.log(Logger.ERROR, "IOError: " + str(ex))
            return
        self.logger.log(Logger.INFO, "Adding destinations from " + confFileName)
        dests = []
        opts = {}
        try:
            while True:
                line = confFile.readline()
                if line == '':
                    break
                line = line.strip()
                self.logger.log(Logger.DEBUG, "Reading line " + line)
                if (len(line) == 0) or (line[0] == '#'):
                    continue
                if line.startswith("xApMon_"):
                    m = re.match(r"(\S+)\s*=\s*(\S+)", line[len("xApMon_"):])
                    if m is not None:
                        self.__applyConfigOption(m.group(1), m.group(2), opts)
                else:
                    dests.append(line)
        finally:
            # close the source even if reading it fails half-way
            confFile.close()
        for line in dests:
            self.__addDestination(line, tempDestinations, opts)

    def __applyConfigOption(self, param, value, opts):
        """
        Interpret one "xApMon_<param> = <value>" setting read from a config source.

        "on"/"off" (any case) become True/False and "*_interval" values become
        integers. Settings of the ApMon object itself are applied directly;
        anything else is stored in opts as a per-destination option.
        Values that cannot be converted are logged and ignored.
        """
        rawValue = value
        try:
            if value.upper() == "ON":
                value = True
            elif value.upper() == "OFF":
                value = False
            elif param.endswith("_interval"):
                value = int(value)
            if param == "maxMsgRate":
                value = int(value)
        except ValueError:
            self.logger.log(Logger.WARNING, "Bad value " + repr(rawValue) +
                            " for option " + param + ". Ignoring it")
            return
        if param == "loglevel":
            self.logger.setLogLevel(value)
        elif param == "maxMsgRate":
            self.setMaxMsgRate(value)
        elif param == "conf_recheck":
            self.configRecheck = value
        elif param == "recheck_interval":
            if isinstance(value, bool) or value <= 0:
                # a non-positive interval would turn the reloader into a busy loop
                self.logger.log(Logger.WARNING, "Bad value " + repr(rawValue) +
                                " for option " + param + ". Ignoring it")
            else:
                self.configRecheckInterval = value
        elif param.endswith("_data_sent"):
            pass    # don't reset time in sys/job/general/_data_sent
        else:
            opts[param] = value

    ###############################################################################################
    # Internal functions - Background monitor thread
    ###############################################################################################

    def __bgMonitor(self):
        """
        Main loop of the background monitoring thread: every 10 seconds, send the
        background monitoring data if performBgMonitoring is set. Runs until the
        bg-monitor event is set (see free()).
        """
        while not self.__bgMonitorEvent.isSet():
            self.__bgMonitorEvent.wait(10)
            if self.__bgMonitorEvent.isSet():
                return
            if not self.performBgMonitoring:
                continue
            try:
                self.sendBgMonitoring()
            except Exception as ex:
                # thread boundary: one failed pass must not end background monitoring
                self.logger.log(Logger.ERROR, "Error while sending background monitoring: " + str(ex))

    ###############################################################################################
    # Internal helper functions
    ###############################################################################################

    def __enabledParams(self, options, prefix):
        """
        Return the names, with prefix stripped, of the active options starting with
        prefix, leaving out the control entries (monitoring, interval, data_sent).
        """
        selected = []
        for key, active in options.items():
            if not (active and key.startswith(prefix)):
                continue
            shortName = key[len(prefix):]
            if shortName and shortName not in ('monitoring', 'interval', 'data_sent'):
                selected.append(shortName)
        return selected

    def __directSendParams(self, destination, clusterName, nodeName, timeStamp, params):
        """
        Build one XDR datagram with the given parameters and send it to destination,
        a (host, port, passwd) tuple.

        params is a dictionary or a list/tuple of (name, value) pairs. Parameters
        that cannot be packed are left out; the parameter count written in the
        datagram is the number actually packed. Nothing is sent if the rate
        limiter refuses the message or if no parameter could be packed.
        """
        sock = self.__udpSocket
        if sock is None:
            # free() was called, possibly while a background pass was running
            return
        if self.__shouldSend() == False:
            return

        if isinstance(params, dict):
            pairs = params.items()
        elif isinstance(params, (list, tuple)):
            pairs = params
        else:
            self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
            return

        host, port, passwd = destination
        self.logger.log(Logger.DEBUG, "Building XDR packet for [" + str(clusterName) + "] <" +
                        str(nodeName) + "> len:" + str(len(params)))
        header = xdrlib.Packer()
        header.pack_string("v:" + self.__version + "p:" + passwd)
        header.pack_string(clusterName)
        header.pack_string(nodeName)

        # Each parameter is packed in a scratch packer and appended only when it
        # packed completely, so a bad value can neither leave a half-written
        # entry nor make the announced count disagree with the payload.
        scratch = xdrlib.Packer()
        payload = scratch.get_buffer()      # empty buffer of the packer's own string type
        packed = 0
        for name, value in pairs:
            scratch.reset()
            if self.__packParameter(scratch, name, value):
                payload += scratch.get_buffer()
                packed += 1
        if packed == 0:
            self.logger.log(Logger.DEBUG, "No parameter could be packed. Packet NOT sent")
            return
        header.pack_int(packed)
        if timeStamp > 0:
            scratch.reset()
            scratch.pack_int(int(timeStamp))
            payload += scratch.get_buffer()
        datagram = header.get_buffer() + payload

        # send this buffer to the destination, using udp datagrams
        try:
            sock.sendto(datagram, (host, port))
            self.logger.log(Logger.DEBUG, "Packet sent")
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Cannot send packet to " + host + ":" + str(port) +
                            " " + passwd + ": " + str(msg))

    def __packParameter(self, xdrPacker, name, value):
        """
        Pack one parameter (name, value type, value) in xdrPacker.

        Only the exact types str, int and float are supported (see __valueTypes).
        Returns True on success. On failure the error is logged, False is
        returned and xdrPacker may hold a partial entry that must be discarded.
        """
        try:
            typeValue = self.__valueTypes[type(value)]
            xdrPacker.pack_string(name)
            xdrPacker.pack_int(typeValue)
            self.__packFunctions[typeValue](xdrPacker, value)
            self.logger.log(Logger.DEBUG, "Adding parameter " + str(name) + " = " + str(value))
            return True
        except Exception as ex:
            self.logger.log(Logger.ERROR, "ApMon: error packing %s = %s; got %s" % (name, str(value), ex))
            return False

    # Destructor
    def __del__(self):
        self.free()

    def __shouldSend(self):
        """
        Decide if the current datagram should be sent.

        The decision is based on a weighted history of the number of messages
        sent per second: once that value gets within maxMsgRate/10 of
        maxMsgRate, messages are dropped at random, the more likely the
        closer the value is to maxMsgRate. Returns True if the message may
        be sent.
        """
        now = int(time.time())
        if now != self.__crtTime:
            # new second: fold the finished interval in the history
            elapsed = now - self.__crtTime
            if elapsed < 1:
                elapsed = 1     # the clock went backwards; don't produce a negative rate
            self.__prvSent = (self.__hWeight * self.__prvSent +
                              (1.0 - self.__hWeight) * self.__crtSent / elapsed)
            self.__prvTime = self.__crtTime
            self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) +
                            "; dropped: " + str(self.__crtDrop))
            # reset current counters
            self.__crtTime = now
            self.__crtSent = 0
            self.__crtDrop = 0

        # compute the history
        valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

        # integer division on purpose: randint() needs integer bounds
        margin = int(self.maxMsgRate) // 10
        # when we should start dropping messages
        level = self.maxMsgRate - margin

        doSend = True
        if valSent >= level:
            if random.randint(0, margin) >= (self.maxMsgRate - valSent):
                doSend = False

        # counting sent and dropped messages
        if doSend:
            self.__crtSent += 1
        else:
            self.__crtDrop += 1
        return doSend

    ################################################################################################
    # Private variables. Don't touch
    ################################################################################################

    # python type -> XDR type code (see ApMon.h from C/C++ ApMon version)
    __valueTypes = {
        str: 0,     # XDR_STRING
        int: 2,     # XDR_INT32
        float: 5,   # XDR_REAL64
    }

    # XDR type code -> xdrlib packing function
    __packFunctions = {
        0: xdrlib.Packer.pack_string,
        2: xdrlib.Packer.pack_int,
        5: xdrlib.Packer.pack_double,
    }

    __defaultPort = 8884
    __defaultLogLevel = Logger.INFO
    __version = "2.0.6-py"  # apMon version number
673 corvo 1.1