ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.2
Committed: Tue Nov 8 16:02:39 2005 UTC (19 years, 5 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_4, CRAB_1_0_3, CRAB_1_0_2, CRAB_0_2_2
Changes since 1.1: +108 -23 lines
Log Message:
New apmon

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.0.4
5 *
6 * Copyright (C) 2005 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34 """
35 apmon.py
36
37 This is a python implementation for the ApMon API for sending
38 data to the MonALISA service.
39
40 For further details about ApMon, please see the C/C++ or Java documentation.
41 You can find a sample usage of this module in apmTest.py.
42
43 Note that the parameters must be either integers (32 bits) or doubles (64 bits).
44 Sending strings is supported, but they will not be stored in the
45 farm's store nor shown in the farm's window in the MonALISA client.
46 """
47
48 import re
49 import xdrlib
50 import socket
51 import urllib2
52 import threading
53 import time
54 import Logger
55 import ProcInfo
56 import random
57
58 #__all__ = ["ApMon"]
59
60 #__debug = False # set this to True to be verbose
61
class ApMon:
    """
    Main class for sending monitoring data to a MonaLisa module.
    One or more destinations can be chosen for the data. See constructor.

    The data is packed in UDP datagrams, using XDR. The following fields are sent:
    - version & password (string)
    - cluster name (string)
    - node name (string)
    - number of parameters (int)
    - for each parameter:
        - name (string)
        - value type (int)
        - value
    - optionally an int with the given timestamp

    Attributes (public):
    - destinations - a dict; key = (ip, port, password) tuple, value = dict with the
        background monitoring options of that destination (see __defaultOptions)
    - configAddresses - list with files and urls from where the config is read
    - configRecheckInterval - period, in seconds, to check for changes
        in the configAddresses list
    - configRecheck - boolean - whether to recheck periodically for changes
        in the configAddresses list
    - performBgMonitoring - boolean - whether the background thread sends
        system/job monitoring data
    - monitoredJobs - dict; key = pid, value = dict with 'CLUSTER_NAME' and 'NODE_NAME'
    - maxMsgRate - maximum (average) number of datagrams sent per second
    """

    # Default background monitoring options. Every destination gets its own
    # copy of this dict (see __addDestination); never mutate it directly.
    __defaultOptions = {
        'job_monitoring': True,    # perform (or not) job monitoring
        'job_interval'  : 10,      # at this interval (in seconds)
        'job_data_sent' : 0,       # time from Epoch when job information was sent; don't touch!

        # NOTE(review): the descriptions of job_cpu_time and job_run_time look
        # swapped relative to their names - confirm against ProcInfo.
        'job_cpu_time'  : True,    # elapsed time from the start of this job in seconds
        'job_run_time'  : True,    # processor time spent running this job in seconds
        'job_cpu_usage' : True,    # current percent of the processor used for this job, as reported by ps
        'job_virtualmem': True,    # size in KB of the virtual memory occupied by the job, as reported by ps
        'job_rss'       : True,    # size in KB of the resident image size of the job, as reported by ps
        'job_mem_usage' : True,    # percent of the memory occupied by the job, as reported by ps
        'job_workdir_size': True,  # size in MB of the working directory of the job
        'job_disk_total': True,    # size in MB of the total size of the disk partition containing the working directory
        'job_disk_used' : True,    # size in MB of the used disk partition containing the working directory
        'job_disk_free' : True,    # size in MB of the free disk partition containing the working directory
        'job_disk_usage': True,    # percent of the used disk partition containing the working directory
        'job_open_files': True,    # number of open file descriptors

        'sys_monitoring': True,    # perform (or not) system monitoring
        'sys_interval'  : 10,      # at this interval (in seconds)
        'sys_data_sent' : 0,       # time from Epoch when system information was sent; don't touch!

        'sys_cpu_usr'   : False,   # cpu-usage information
        'sys_cpu_sys'   : False,   # all these will produce corresponding params without "sys_"
        'sys_cpu_nice'  : False,
        'sys_cpu_idle'  : False,
        'sys_cpu_usage' : True,
        'sys_load1'     : True,    # system load information
        'sys_load5'     : True,
        'sys_load15'    : True,
        'sys_mem_used'  : False,   # memory usage information
        'sys_mem_free'  : False,
        'sys_mem_usage' : True,
        'sys_pages_in'  : False,
        'sys_pages_out' : False,
        'sys_swap_used' : True,    # swap usage information
        'sys_swap_free' : False,
        'sys_swap_usage': True,
        'sys_swap_in'   : False,
        'sys_swap_out'  : False,
        'sys_net_in'    : True,    # network transfer in kBps
        'sys_net_out'   : True,    # these will produce params called ethX_in, ethX_out, ethX_errs
        'sys_net_errs'  : False,   # for each eth interface
        'sys_processes' : True,
        'sys_uptime'    : True,    # uptime of the machine, in days (float number)

        'general_info'  : True,    # send (or not) general host information once every 2 * sys_interval seconds
        'general_data_sent': 0,    # time from Epoch when general information was sent; don't touch!

        'hostname'      : True,
        'ip'            : True,    # will produce ethX_ip params for each interface
        'cpu_MHz'       : True,
        'no_CPUs'       : True,    # number of CPUs
        'total_mem'     : True,
        'total_swap'    : True,
        'cpu_vendor_id' : True,
        'cpu_family'    : True,
        'cpu_model'     : True,
        'cpu_model_name': True,
        'bogomips'      : True}

    def __init__ (self, initValue):
        """
        Class constructor:
        - if initValue is a string, put it in configAddresses and load destinations
          from the file named like that. If it starts with "http://" (or "https://"),
          the configuration is loaded from that URL. For background monitoring, given
          parameters will overwrite defaults.

        - if initValue is a list, put its contents in configAddresses and create
          the list of destinations from all those sources. For background monitoring,
          given parameters will overwrite defaults (see __defaultOptions).

        - if initValue is a tuple (of strings), initialize destinations with these values.
          Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
          default port being 8884 and the default password being "". Background monitoring
          will be enabled, sending the parameters active in __defaultOptions.

        - if initValue is a dict (key = string "{hostname|ip}[:port][ passwd]",
          val = dict {'param_name': True/False, ...}), the given options for each
          destination will overwrite the default parameters (see __defaultOptions).

        If no destination could be defined, an error is logged and every send is
        refused. Otherwise a UDP socket is created and the background monitoring
        thread is started, plus the config reloading thread when the configuration
        is read from files/URLs.
        """
        self.destinations = {}            # key = (host, port, passwd) tuple; val = dict {"param_name": True/False, ...}
        self.configAddresses = []         # list of files/urls from where we read config
        self.configRecheckInterval = 120  # 2 minutes
        self.configRecheck = True         # enabled by default
        self.performBgMonitoring = True   # by default, perform background monitoring
        self.monitoredJobs = {}           # key = pid; value = dict with 'CLUSTER_NAME' and 'NODE_NAME'
        self.maxMsgRate = 50              # Maximum number of messages allowed to be sent per second
        self.__defaultUserCluster = "ApMon_UserSend"
        self.__defaultUserNode = socket.getfqdn()
        self.__defaultSysMonCluster = "ApMon_SysMon"
        self.__defaultSysMonNode = socket.getfqdn()
        # don't touch these:
        self.__initializedOK = True
        self.__udpSocket = None
        self.__configUpdateLock = threading.Lock()
        self.__configUpdateEvent = threading.Event()
        self.__bgMonitorLock = threading.Lock()
        self.__bgMonitorEvent = threading.Event()
        # rate limiter state: don't allow a user to send more than maxMsgRate
        # messages per second, on average (see __shouldSend)
        self.__crtTime = 0
        self.__prvTime = 0
        self.__prvSent = 0
        self.__prvDrop = 0
        self.__crtSent = 0
        self.__crtDrop = 0
        self.__hWeight = 0.92

        self.logger = Logger.Logger(self.__defaultLogLevel)
        if isinstance(initValue, str):
            self.configAddresses.append(initValue)
            self.__reloadAddresses()
        elif isinstance(initValue, list):
            self.configAddresses = initValue
            self.__reloadAddresses()
        elif isinstance(initValue, tuple):
            for dest in initValue:
                self.__addDestination(dest, self.destinations)
        elif isinstance(initValue, dict):
            for dest, opts in initValue.items():
                self.__addDestination(dest, self.destinations, opts)

        self.__initializedOK = (len(self.destinations) > 0)
        if not self.__initializedOK:
            self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
            return
        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if len(self.configAddresses) > 0:
            # if there are addresses that need to be monitored,
            # start config checking and reloading thread
            th = threading.Thread(target=self.__configLoader)
            th.setDaemon(True)  # this is a daemon thread
            th.start()
        # create the ProcInfo instance
        self.procInfo = ProcInfo.ProcInfo(self.logger)
        self.procInfo.update()
        # start the background monitoring thread
        th = threading.Thread(target=self.__bgMonitor)
        th.setDaemon(True)
        th.start()

    def sendParams (self, params):
        """
        Send multiple parameters to MonALISA, with default (last given) cluster and node names.
        """
        self.sendTimedParams(-1, params)

    def sendTimedParams (self, timeStamp, params):
        """
        Send multiple parameters, specifying the time for them, with default (last given) cluster and node names.
        (See sendTimedParameters for more details)
        """
        self.sendTimedParameters(None, None, timeStamp, params)

    def sendParameter (self, clusterName, nodeName, paramName, paramValue):
        """
        Send a single parameter to MonALISA.
        """
        self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue)

    def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
        """
        Send a single parameter, with a given time.
        """
        self.sendTimedParameters(clusterName, nodeName, timeStamp, {paramName: paramValue})

    def sendParameters (self, clusterName, nodeName, params):
        """
        Send multiple parameters specifying cluster and node name for them
        """
        self.sendTimedParameters(clusterName, nodeName, -1, params)

    def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
        """
        Send multiple monitored parameters to MonALISA, to every destination.

        - clusterName is the name of the cluster being monitored. If None, the
          last given clusterName is used (initially "ApMon_UserSend").
        - nodeName is the name of the node for which are the parameters. If None,
          the last given nodeName is used (initially the full hostname of this machine).
        - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
          Note that this option should be used only if you are sure about the time for the result.
          Otherwise, the parameters will be assigned a correct time (obtained from NTP servers)
          in MonALISA service. This option can be useful when parsing logs, for example.
        - params is a dictionary containing pairs with:
            - key: parameter name
            - value: parameter value, either int, float or string.
          or params is a list (or tuple) of (key, value) tuples. This version can be
          used in case you want to send the parameters in a given order.

        Parameters whose value cannot be packed are skipped (an error is logged).
        NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
        """
        if not self.__initializedOK:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
            return
        if clusterName is None:
            clusterName = self.__defaultUserCluster
        else:
            self.__defaultUserCluster = clusterName
        if nodeName is None:
            nodeName = self.__defaultUserNode
        else:
            self.__defaultUserNode = nodeName
        # hold the config lock so a concurrent reload can't swap destinations mid-send;
        # try/finally so an unexpected error can't leave the lock held forever
        self.__configUpdateLock.acquire()
        try:
            for dest in self.destinations.keys():
                self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
        finally:
            self.__configUpdateLock.release()

    def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
        """
        Add a new job to monitor; its data is reported under clusterName / nodeName.
        """
        self.__bgMonitorLock.acquire()
        try:
            self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
            self.procInfo.addJobToMonitor(pid, workDir)
        finally:
            self.__bgMonitorLock.release()

    def removeJobToMonitor (self, pid):
        """
        Remove a job from being monitored.

        Raises KeyError if pid was not being monitored (the internal lock is
        released in any case).
        """
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.removeJobToMonitor(pid)
            del self.monitoredJobs[pid]
        finally:
            self.__bgMonitorLock.release()

    def setMonitorClusterNode (self, clusterName, nodeName):
        """
        Set the cluster and node names where to send system related information.
        """
        self.__bgMonitorLock.acquire()
        try:
            self.__defaultSysMonCluster = clusterName
            self.__defaultSysMonNode = nodeName
        finally:
            self.__bgMonitorLock.release()

    def enableBgMonitoring (self, onOff):
        """
        Enable or disable background monitoring. Note that background monitoring information
        can still be sent if user calls the sendBgMonitoring method.
        """
        self.performBgMonitoring = onOff

    def sendBgMonitoring (self):
        """
        Send now background monitoring about system and jobs to all interested destinations.

        For each destination, system, job and general host parameters are sent
        only when the corresponding 'sys_monitoring' / 'job_monitoring' /
        'general_info' option is on and the corresponding interval has elapsed
        since they were last sent to that destination (general information
        uses 2 * sys_interval).
        """
        if not self.__initializedOK:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Background monitoring data NOT sent!")
            return
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.update()
            now = int(time.time())
            for destination, options in self.destinations.items():
                sysParams = []
                jobParams = []
                # for each destination and its options, check if we have to report any background monitoring data
                if options['sys_monitoring'] and options['sys_data_sent'] + options['sys_interval'] < now:
                    sysParams.extend(self.__activeParams(options, "sys_"))
                    options['sys_data_sent'] = now
                if options['job_monitoring'] and options['job_data_sent'] + options['job_interval'] < now:
                    jobParams.extend(self.__activeParams(options, "job_"))
                    options['job_data_sent'] = now
                if options['general_info'] and options['general_data_sent'] + 2 * int(options['sys_interval']) < now:
                    for param, active in options.items():
                        if not active:
                            continue
                        if param.startswith("sys_") or param.startswith("job_"):
                            continue
                        if param not in ('general_info', 'general_data_sent'):
                            sysParams.append(param)
                    # remember when general info was sent, so it goes out only every 2 * sys_interval
                    options['general_data_sent'] = now
                if sysParams:
                    sysResults = self.procInfo.getSystemData(sysParams)
                    if sysResults:
                        self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
                if jobParams:
                    for pid, props in self.monitoredJobs.items():
                        jobResults = self.procInfo.getJobData(pid, jobParams)
                        if jobResults:
                            self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
        finally:
            self.__bgMonitorLock.release()

    def setLogLevel (self, strLevel):
        """
        Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
        'INFO', 'NOTICE', 'DEBUG'.
        """
        self.logger.setLogLevel(strLevel)

    def setMaxMsgRate(self, rate):
        """
        Set the maximum number of messages that can be sent, per second.
        """
        self.maxMsgRate = rate
        self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate))

    def free(self):
        """
        Stop background threads, close opened sockets. You have to use this function if you want to
        free all the resources that ApMon takes, and allow it to be garbage-collected
        (while the background threads run they hold references to this object).
        """
        self.__configUpdateEvent.set()
        self.__bgMonitorEvent.set()
        # give the background threads a moment to notice the events
        time.sleep(0.01)
        if self.__udpSocket is not None:
            self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
            self.__udpSocket.close()

    #########################################################################################
    # Internal functions - Config reloader thread
    #########################################################################################

    def __configLoader(self):
        """
        Main loop of the thread that reloads the configuration from configAddresses
        every configRecheckInterval seconds (when configRecheck is set), until free()
        is called. Errors during a reload are logged and do not stop the thread.
        """
        while not self.__configUpdateEvent.isSet():
            self.__configUpdateEvent.wait(self.configRecheckInterval)
            if self.__configUpdateEvent.isSet():
                return
            if self.configRecheck:
                try:
                    self.__reloadAddresses()
                except Exception:
                    err = self.__currentException()
                    self.logger.log(Logger.ERROR, "Error reloading configuration: " + str(err))
            self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+repr(self.configRecheckInterval)+" sec.")

    def __reloadAddresses(self):
        """
        Refresh destinations dict, by loading data from all sources in configAddresses.

        The *_data_sent timestamps of destinations that are still present are
        carried over, so that a reload does not trigger an early background send.
        """
        newDestinations = {}
        for src in self.configAddresses:
            self.__initializeFromFile(src, newDestinations)
        # avoid changing config in the middle of sending packets to previous destinations
        self.__configUpdateLock.acquire()
        try:
            for dest, options in newDestinations.items():
                oldOptions = self.destinations.get(dest)
                if oldOptions is None:
                    continue
                for key in ('sys_data_sent', 'job_data_sent', 'general_data_sent'):
                    if key in oldOptions:
                        options[key] = oldOptions[key]
            self.destinations = newDestinations
        finally:
            self.__configUpdateLock.release()

    def __addDestination (self, aDestination, tempDestinations, options = None):
        """
        Add a destination to the tempDestinations dict.

        aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
        If the port is not given, the default port (8884) is used.
        If the password is missing, it is considered an empty string.
        options, if given, is a dict whose entries override the default background
        monitoring options (__defaultOptions) for this destination.

        The host is resolved to an IP address. Destinations with a non-numeric port
        or an unresolvable host are skipped with a warning; a destination with an
        (ip, port) pair that is already present is skipped as well.
        """
        # collapse every run of whitespace (spaces, tabs) into a single space
        aDestination = ' '.join(aDestination.split())
        sepPort = aDestination.find(':')
        sepPasswd = aDestination.rfind(' ')
        if sepPort >= 0:
            host = aDestination[0:sepPort].strip()
            if sepPasswd > sepPort + 1:
                port = aDestination[sepPort+1:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                port = aDestination[sepPort+1:].strip()
                passwd = ""
        else:
            port = str(self.__defaultPort)
            if sepPasswd >= 0:
                host = aDestination[0:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                host = aDestination.strip()
                passwd = ""
        if not port.isdigit():
            self.logger.log(Logger.WARNING, "Bad value for port number "+repr(port)+" in "+aDestination+" destination")
            return
        port = int(port)
        try:
            # convert hostnames to IP addresses to avoid suffocating DNSs
            host = socket.gethostbyname(host)
        except socket.error:
            err = self.__currentException()
            self.logger.log(Logger.WARNING, "Cannot resolve host "+repr(host)+" in "+aDestination+" destination: "+self.__errorText(err))
            return
        for h, p, _w in tempDestinations.keys():
            if h == host and p == port:
                self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it")
                return
        self.logger.log(Logger.INFO, "Adding destination "+host+':'+repr(port)+' '+passwd)
        # every destination needs its own options dict: the *_data_sent entries
        # are updated per destination, and overrides must not leak into the
        # class-wide defaults or into other destinations
        destOptions = self.__defaultOptions.copy()
        tempDestinations[(host, port, passwd)] = destOptions
        if options and options != self.__defaultOptions:
            # we have to overwrite defaults with given options
            for key, value in options.items():
                self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+repr(value))
                destOptions[key] = value

    def __initializeFromFile (self, confFileName, tempDestinations):
        """
        Load destinations from confFileName file. If it's an URL (starts with "http://"
        or "https://") load configuration from there. Put all destinations in
        tempDestinations dict.

        Calls __addDestination for each line that doesn't start with # and
        has non-whitespace characters on it; "xApMon_<param> = <value>" lines
        are handled by __parseOptionLine. Errors opening the source are logged
        and leave tempDestinations unchanged.
        """
        try:
            if confFileName.startswith("http://") or confFileName.startswith("https://"):
                confFile = urllib2.urlopen(confFileName)
            else:
                confFile = open(confFileName)
        except urllib2.HTTPError:
            e = self.__currentException()
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            httpReasons = {
                401: 'HTTPError: not authorized.',
                404: 'HTTPError: not found.',
                503: 'HTTPError: service unavailable.'}
            self.logger.log(Logger.ERROR, httpReasons.get(e.code, 'HTTPError: unknown error.'))
            return
        except urllib2.URLError:
            ex = self.__currentException()
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            self.logger.log(Logger.ERROR, "URL Error: "+self.__errorText(ex.reason))
            return
        except IOError:
            ex = self.__currentException()
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            self.logger.log(Logger.ERROR, "IOError: "+str(ex))
            return
        self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
        dests = []
        opts = {}
        try:
            while True:
                line = confFile.readline()
                if not line:
                    break
                line = line.strip()
                self.logger.log(Logger.DEBUG, "Reading line "+line)
                if len(line) == 0 or line[0] == '#':
                    continue
                if line.startswith("xApMon_"):
                    self.__parseOptionLine(line, opts)
                else:
                    dests.append(line)
        finally:
            confFile.close()
        # options apply to all destinations in the file, wherever they appear
        for line in dests:
            self.__addDestination(line, tempDestinations, opts)

    def __parseOptionLine(self, line, opts):
        """
        Handle one "xApMon_<param> = <value>" configuration line.

        ON/OFF values become True/False; values of *_interval params and of
        maxMsgRate are converted to int (a bad number is logged and the line
        ignored). loglevel, maxMsgRate, conf_recheck and recheck_interval are
        applied to this object directly; *_data_sent params are ignored; any
        other param is stored in opts.
        """
        m = re.match(r"xApMon_(\S+)\s*=\s*(\S+)", line)
        if m is None:
            self.logger.log(Logger.WARNING, "Ignoring malformed option line: "+line)
            return
        param = m.group(1)
        value = m.group(2)
        try:
            if value.upper() == "ON":
                value = True
            elif value.upper() == "OFF":
                value = False
            elif param.endswith("_interval"):
                value = int(value)
            if param == "maxMsgRate":
                value = int(value)
        except ValueError:
            self.logger.log(Logger.WARNING, "Bad numeric value for option "+param+": "+repr(value))
            return
        if param == "loglevel":
            self.logger.setLogLevel(value)
        elif param == "maxMsgRate":
            self.setMaxMsgRate(value)
        elif param == "conf_recheck":
            self.configRecheck = value
        elif param == "recheck_interval":
            self.configRecheckInterval = value
        elif param.endswith("_data_sent"):
            pass  # don't reset time in sys/job/general_data_sent
        else:
            opts[param] = value

    ###############################################################################################
    # Internal functions - Background monitor thread
    ###############################################################################################

    def __bgMonitor (self):
        """
        Main loop of the background monitoring thread: every 10 seconds, call
        sendBgMonitoring if performBgMonitoring is set, until free() is called.
        Errors are logged and do not stop the thread.
        """
        while not self.__bgMonitorEvent.isSet():
            self.__bgMonitorEvent.wait(10)
            if self.__bgMonitorEvent.isSet():
                return
            if self.performBgMonitoring:
                try:
                    self.sendBgMonitoring()
                except Exception:
                    err = self.__currentException()
                    self.logger.log(Logger.ERROR, "Error sending background monitoring data: " + str(err))

    ###############################################################################################
    # Internal helper functions
    ###############################################################################################

    def __activeParams(self, options, prefix):
        """
        Return the names, with prefix removed, of the enabled options that start
        with prefix, skipping the control options <prefix>monitoring,
        <prefix>interval and <prefix>data_sent.
        """
        names = []
        for param, active in options.items():
            if active and param.startswith(prefix) and len(param) > len(prefix):
                name = param[len(prefix):]
                if name not in ('monitoring', 'interval', 'data_sent'):
                    names.append(name)
        return names

    def __currentException(self):
        """
        Return the exception instance currently being handled. This is a portable
        replacement for binding it with 'except X, e' / 'except X as e'.
        """
        import sys
        return sys.exc_info()[1]

    def __errorText(self, err):
        """
        Return a readable description of err. For errors built as (errno, message),
        such as socket errors, return the message part; otherwise return str(err).
        """
        errArgs = getattr(err, 'args', ())
        if len(errArgs) > 1:
            return str(errArgs[1])
        return str(err)

    def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
        """
        Pack params into one XDR datagram and send it over UDP to destination.

        destination is a (host, port, passwd) tuple; params is a dict or a
        list/tuple of (name, value) pairs (other types are rejected with a
        warning). The datagram may be dropped by the rate limiter (see
        __shouldSend). Parameters that cannot be packed are skipped and not
        counted, so the parameter count in the header always matches the
        entries that follow it.
        """
        if isinstance(params, dict):
            paramItems = params.items()
        elif isinstance(params, (list, tuple)):
            paramItems = params
        else:
            self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
            return

        if not self.__shouldSend():
            return

        host, port, passwd = destination
        self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params)))
        # pack the parameters first, so the header can carry the real count
        paramsPacker = xdrlib.Packer()
        packedCount = 0
        for name, value in paramItems:
            if self.__packParameter(paramsPacker, name, value):
                packedCount += 1

        xdrPacker = xdrlib.Packer()
        xdrPacker.pack_string("v:"+self.__version+"p:"+passwd)
        xdrPacker.pack_string(clusterName)
        xdrPacker.pack_string(nodeName)
        xdrPacker.pack_int(packedCount)
        paramsData = paramsPacker.get_buffer()
        # XDR items are 4-byte aligned, so the packed parameters are appended
        # verbatim as fixed-length opaque data (no extra padding is added)
        xdrPacker.pack_fopaque(len(paramsData), paramsData)
        if timeStamp > 0:
            xdrPacker.pack_int(int(timeStamp))
        datagram = xdrPacker.get_buffer()
        # send this buffer to the destination, using udp datagrams
        try:
            self.__udpSocket.sendto(datagram, (host, port))
            self.logger.log(Logger.DEBUG, "Packet sent")
        except socket.error:
            err = self.__currentException()
            self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+self.__errorText(err))

    def __packParameter(self, xdrPacker, name, value):
        """
        Append one parameter (name, value type code, value) to xdrPacker.

        The type code is looked up in __valueTypes by the exact type of value.
        Returns True if the parameter was appended. On failure (unsupported type,
        value out of range for its XDR type, ...) an error is logged, nothing is
        appended and False is returned.
        """
        try:
            typeValue = self.__valueTypes[type(value)]
            # pack into a scratch packer so that a failure half-way through
            # cannot leave a partial entry in xdrPacker
            scratch = xdrlib.Packer()
            scratch.pack_string(name)
            scratch.pack_int(typeValue)
            self.__packFunctions[typeValue](scratch, value)
        except Exception:
            ex = self.__currentException()
            self.logger.log(Logger.ERROR, "ApMon: error packing %s = %s; got %s" % (name, str(value), ex))
            return False
        data = scratch.get_buffer()
        xdrPacker.pack_fopaque(len(data), data)
        self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
        return True

    def __del__(self):
        """Destructor: release threads and socket (see free())."""
        self.free()

    def __shouldSend(self):
        """
        Decide if the current datagram should be sent.

        Keeps an exponentially weighted history (weight __hWeight) of the number
        of datagrams sent per second; when that estimate gets close to maxMsgRate,
        datagrams are dropped at random. Returns True to send, False to drop, and
        updates the sent/dropped counters accordingly.
        """
        now = int(time.time())
        if now != self.__crtTime:
            # a new second started: fold the previous second's count into the history
            self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / (now - self.__crtTime)
            self.__prvTime = self.__crtTime
            self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
            # reset current counter
            self.__crtTime = now
            self.__crtSent = 0
            self.__crtDrop = 0

        # compute the history
        valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

        doSend = True

        # when we should start dropping messages
        level = self.maxMsgRate - self.maxMsgRate // 10

        # NOTE(review): the guard compares against maxMsgRate - level (== maxMsgRate // 10),
        # not against level; the random test below can only drop once valSent >= level.
        if valSent > (self.maxMsgRate - level):
            if random.randint(0, self.maxMsgRate // 10) >= (self.maxMsgRate - valSent):
                doSend = False

        # counting sent and dropped messages
        if doSend:
            self.__crtSent += 1
        else:
            self.__crtDrop += 1

        return doSend

    ################################################################################################
    # Private variables. Don't touch
    ################################################################################################

    __valueTypes = {
        type("string"): 0,  # XDR_STRING (see ApMon.h from C/C++ ApMon version)
        type(1): 2,         # XDR_INT32
        type(1.0): 5}       # XDR_REAL64

    __packFunctions = {
        0: xdrlib.Packer.pack_string,
        2: xdrlib.Packer.pack_int,
        5: xdrlib.Packer.pack_double}

    __defaultPort = 8884
    __defaultLogLevel = Logger.INFO
    __version = "2.0.6-py"  # apMon version number
673