1 |
|
2 |
"""
|
3 |
* ApMon - Application Monitoring Tool
|
4 |
* Version: 2.2.2
|
5 |
*
|
6 |
* Copyright (C) 2006 California Institute of Technology
|
7 |
*
|
8 |
* Permission is hereby granted, free of charge, to use, copy and modify
|
9 |
* this software and its documentation (the "Software") for any
|
10 |
* purpose, provided that existing copyright notices are retained in
|
11 |
* all copies and that this notice is included verbatim in any distributions
|
12 |
* or substantial portions of the Software.
|
13 |
* This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
|
14 |
* Users of the Software are asked to feed back problems, benefits,
|
15 |
* and/or suggestions about the software to the MonALISA Development Team
|
16 |
* (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
|
17 |
* incorporation of new features - is done on a best effort basis. All bug
|
18 |
* fixes and enhancements will be made available under the same terms and
|
19 |
* conditions as the original software,
|
20 |
|
21 |
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
|
22 |
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
|
23 |
* OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
|
24 |
* EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
25 |
|
26 |
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
|
27 |
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
|
28 |
* FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
|
29 |
* PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
|
30 |
* OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
|
31 |
* MODIFICATIONS.
|
32 |
"""
|
33 |
|
34 |
"""
|
35 |
apmon.py
|
36 |
|
37 |
This is a python implementation for the ApMon API for sending
|
38 |
data to the MonALISA service.
|
39 |
|
40 |
For further details about ApMon please see the C/C++ or Java documentation
|
41 |
You can find a sample usage of this module in apmTest.py.
|
42 |
|
43 |
Note that the parameters must be either integers (32 bits) or doubles (64 bits).
|
44 |
Sending strings is supported, but they will not be stored in the
|
45 |
farm's store nor shown in the farm's window in the MonALISA client.
|
46 |
"""
|
47 |
|
48 |
import re
|
49 |
import xdrlib
|
50 |
import socket
|
51 |
import urllib2
|
52 |
import threading
|
53 |
import time
|
54 |
import Logger
|
55 |
import ProcInfo
|
56 |
import random
|
57 |
import copy
|
58 |
|
59 |
#__all__ = ["ApMon"]
|
60 |
|
61 |
#__debug = False # self.destPrevData[destination];set this to True to be verbose
|
62 |
|
63 |
class ApMon:
    """
    Main class for sending monitoring data to a MonALISA module.
    One or more destinations can be chosen for the data. See constructor.

    The data is packed in UDP datagrams, using XDR. The following fields are sent:
    - version & password (string)
    - instance id and sequence number of the sender (2 x int)
    - cluster name (string)
    - node name (string)
    - number of parameters (int)
    - for each parameter:
        - name (string)
        - value type (int)
        - value
    - optionally a (int) with the given timestamp

    Attributes (public):
    - destinations - a hash; key = (ip, port, password) tuple,
                     value = hash with the monitoring options for that destination
    - configAddresses - list with files and urls from where the config is read
    - configRecheckInterval - period, in seconds, to check for changes
                              in the configAddresses list
    - configRecheck - boolean - whether to recheck periodically for changes
                      in the configAddresses list
    """

    __defaultOptions = {
        'job_monitoring': True,    # perform (or not) job monitoring
        'job_interval'  : 10,      # at this interval (in seconds)
        'job_data_sent' : 0,       # time from Epoch when job information was sent; don't touch!

        'job_cpu_time'  : True,    # processor time spent running this job in seconds
        'job_run_time'  : True,    # elapsed time from the start of this job in seconds
        'job_cpu_usage' : True,    # current percent of the processor used for this job, as reported by ps
        'job_virtualmem': True,    # size in KB of the virtual memory occupied by the job, as reported by ps
        'job_rss'       : True,    # size in KB of the resident image size of the job, as reported by ps
        'job_mem_usage' : True,    # percent of the memory occupied by the job, as reported by ps
        'job_workdir_size': True,  # size in MB of the working directory of the job
        'job_disk_total': True,    # size in MB of the total size of the disk partition containing the working directory
        'job_disk_used' : True,    # size in MB of the used disk partition containing the working directory
        'job_disk_free' : True,    # size in MB of the free disk partition containing the working directory
        'job_disk_usage': True,    # percent of the used disk partition containing the working directory
        'job_open_files': True,    # number of open file descriptors

        'sys_monitoring': True,    # perform (or not) system monitoring
        'sys_interval'  : 10,      # at this interval (in seconds)
        'sys_data_sent' : 0,       # time from Epoch when system information was sent; don't touch!

        'sys_cpu_usr'   : False,   # cpu-usage information
        'sys_cpu_sys'   : False,   # all these will produce corresponding params without "sys_"
        'sys_cpu_nice'  : False,
        'sys_cpu_idle'  : False,
        'sys_cpu_usage' : True,
        'sys_load1'     : True,    # system load information
        'sys_load5'     : True,
        'sys_load15'    : True,
        'sys_mem_used'  : False,   # memory usage information
        'sys_mem_free'  : False,
        'sys_mem_usage' : True,
        'sys_pages_in'  : False,
        'sys_pages_out' : False,
        'sys_swap_used' : True,    # swap usage information
        'sys_swap_free' : False,
        'sys_swap_usage': True,
        'sys_swap_in'   : False,
        'sys_swap_out'  : False,
        'sys_net_in'    : True,    # network transfer in kBps
        'sys_net_out'   : True,    # these will produce params called ethX_in, ethX_out, ethX_errs
        'sys_net_errs'  : False,   # for each eth interface
        'sys_net_sockets' : True,  # number of opened sockets for each proto => sockets_tcp/udp/unix ...
        'sys_net_tcp_details' : True,  # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
        'sys_processes' : True,
        'sys_uptime'    : True,    # uptime of the machine, in days (float number)

        'general_info'  : True,    # send (or not) general host information once every 2 $sys_interval seconds
        'general_data_sent': 0,    # time from Epoch when general information was sent; don't touch!

        'hostname'      : True,
        'ip'            : True,    # will produce ethX_ip params for each interface
        'cpu_MHz'       : True,
        'no_CPUs'       : True,    # number of CPUs
        'total_mem'     : True,
        'total_swap'    : True,
        'cpu_vendor_id' : True,
        'cpu_family'    : True,
        'cpu_model'     : True,
        'cpu_model_name': True,
        'bogomips'      : True}

    def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
        """
        Class constructor:
        - if initValue is a string, put it in configAddresses and load destinations
          from the file named like that. If it starts with "http://" or "https://",
          the configuration is loaded from that URL. For background monitoring,
          given parameters will overwrite defaults

        - if initValue is a list, put its contents in configAddresses and create
          the list of destinations from all those sources. For background monitoring,
          given parameters will overwrite defaults (see __defaultOptions)

        - if initValue is a tuple (of strings), initialize destinations with that values.
          Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
          default port being 8884 and the default password being "". Background monitoring
          will be enabled sending the parameters active from __defaultOptions

        - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
          val = hash{'param_name': True/False, ...}) the given options for each destination
          will overwrite the default parameters (see __defaultOptions)

        If no destination can be determined, an error is logged, no socket is opened,
        no background thread is started and every send becomes a logged no-op.
        """
        self.destinations = {}             # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
        self.destPrevData = {}             # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
        self.configAddresses = []          # list of files/urls from where we read config
        self.configRecheckInterval = 120   # 2 minutes
        self.configRecheck = True          # enabled by default
        self.performBgMonitoring = True    # by default, perform background monitoring
        self.monitoredJobs = {}            # key = pid; value = hash with CLUSTER_NAME and NODE_NAME
        self.maxMsgRate = 20               # maximum number of messages allowed to be sent per second
        self.sender = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
        self.procInfo = None               # created only after a successful initialization
        self.__defaultUserCluster = "ApMon_UserSend"
        self.__defaultUserNode = socket.getfqdn()
        self.__defaultSysMonCluster = "ApMon_SysMon"
        self.__defaultSysMonNode = socket.getfqdn()
        # don't touch these:
        self.__freed = False
        self.__initializedOK = True
        self.__udpSocket = None
        # free() must only wait for threads that were really started
        self.__configThreadStarted = False
        self.__bgThreadStarted = False
        self.__configUpdateLock = threading.Lock()
        self.__configUpdateEvent = threading.Event()
        self.__configUpdateFinished = threading.Event()
        self.__bgMonitorLock = threading.Lock()
        self.__bgMonitorEvent = threading.Event()
        self.__bgMonitorFinished = threading.Event()
        # don't allow a user to send more than MAX_MSG messages per second, in average
        self.__crtTime = 0
        self.__prvTime = 0
        self.__prvSent = 0
        self.__prvDrop = 0
        self.__crtSent = 0
        self.__crtDrop = 0
        self.__hWeight = 0.92
        self.logger = Logger.Logger(defaultLogLevel)

        if isinstance(initValue, str):
            self.configAddresses.append(initValue)
            self.__reloadAddresses()
        elif isinstance(initValue, list):
            self.configAddresses = initValue
            self.__reloadAddresses()
        elif isinstance(initValue, tuple):
            for dest in initValue:
                self.__addDestination(dest, self.destinations)
        elif isinstance(initValue, dict):
            for dest, opts in initValue.items():
                self.__addDestination(dest, self.destinations, opts)

        self.__initializedOK = (len(self.destinations) > 0)
        if not self.__initializedOK:
            self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
            return

        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if len(self.configAddresses) > 0:
            # if there are addresses that need to be monitored,
            # start config checking and reloading thread
            th = threading.Thread(target=self.__configLoader)
            th.daemon = True
            self.__configThreadStarted = True
            th.start()
        # create the ProcInfo instance
        self.procInfo = ProcInfo.ProcInfo(self.logger)
        # start the background monitoring thread
        th = threading.Thread(target=self.__bgMonitor)
        th.daemon = True
        self.__bgThreadStarted = True
        th.start()

    def sendParams (self, params):
        """
        Send multiple parameters to MonALISA, with default (last given) cluster and node names.
        """
        self.sendTimedParams(-1, params)

    def sendTimedParams (self, timeStamp, params):
        """
        Send multiple parameters, specifying the time for them, with default (last given)
        cluster and node names. (See sendTimedParameters for more details)
        """
        self.sendTimedParameters(None, None, timeStamp, params)

    def sendParameter (self, clusterName, nodeName, paramName, paramValue):
        """
        Send a single parameter to MonALISA.
        """
        self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue)

    def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
        """
        Send a single parameter, with a given time.
        """
        self.sendTimedParameters(clusterName, nodeName, timeStamp, {paramName: paramValue})

    def sendParameters (self, clusterName, nodeName, params):
        """
        Send multiple parameters specifying cluster and node name for them
        """
        self.sendTimedParameters(clusterName, nodeName, -1, params)

    def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
        """
        Send multiple monitored parameters to MonALISA.

        - clusterName is the name of the cluster being monitored. If it is None or "",
          the last given clusterName (initially "ApMon_UserSend") is used instead.
        - nodeName is the name of the node for which are the parameters. If this
          is None, the last given node name (initially the full hostname of this
          machine) will be sent instead.
        - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
          Note that this option should be used only if you are sure about the time for the result.
          Otherwise, the parameters will be assigned a correct time (obtained from NTP servers)
          in MonALISA service. This option can be useful when parsing logs, for example.
        - params is a dictionary containing pairs with:
            - key: parameter name
            - value: parameter value, either int or float.
          or params is a list (or tuple) of (key, value) tuples. This version can be used
          in case you want to send the parameters in a given order.

        NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
        """
        if not self.__initializedOK:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
            return
        if (clusterName is None) or (clusterName == ""):
            clusterName = self.__defaultUserCluster
        else:
            self.__defaultUserCluster = clusterName
        if nodeName is None:
            nodeName = self.__defaultUserNode
        else:
            self.__defaultUserNode = nodeName
        self.__configUpdateLock.acquire()
        try:
            # always release the lock, even if packing/sending raises
            for dest in list(self.destinations.keys()):
                self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params)
        finally:
            self.__configUpdateLock.release()

    def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
        """
        Add a new job to monitor. Its data will be reported under the given
        cluster and node names.
        """
        self.__bgMonitorLock.acquire()
        try:
            if self.procInfo is None:
                self.logger.log(Logger.WARNING, "Not initialized correctly. Job " + str(pid) + " NOT monitored!")
                return
            self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
            self.procInfo.addJobToMonitor(pid, workDir)
        finally:
            self.__bgMonitorLock.release()

    def removeJobToMonitor (self, pid):
        """
        Remove a job from being monitored. Unknown pids are ignored.
        """
        self.__bgMonitorLock.acquire()
        try:
            if pid in self.monitoredJobs:
                if self.procInfo is not None:
                    self.procInfo.removeJobToMonitor(pid)
                del self.monitoredJobs[pid]
        finally:
            self.__bgMonitorLock.release()

    def setMonitorClusterNode (self, clusterName, nodeName):
        """
        Set the cluster and node names where to send system related information.
        None or empty values leave the corresponding name unchanged.
        """
        self.__bgMonitorLock.acquire()
        try:
            if (clusterName is not None) and (clusterName != ""):
                self.__defaultSysMonCluster = clusterName
            if (nodeName is not None) and (nodeName != ""):
                self.__defaultSysMonNode = nodeName
        finally:
            self.__bgMonitorLock.release()

    def enableBgMonitoring (self, onOff):
        """
        Enable or disable background monitoring. Note that background monitoring information
        can still be sent if user calls the sendBgMonitoring method.
        """
        self.performBgMonitoring = onOff

    def sendBgMonitoring (self, mustSend = False):
        """
        Send background monitoring about system and jobs to all interested destinations.
        If mustSend == True, the information is sent regardless of the elapsed time since last sent
        If mustSend == False, the data is sent only if the required interval has passed since last sent
        """
        self.__bgMonitorLock.acquire()
        try:
            now = int(time.time())
            updatedProcInfo = False
            # iterate over a snapshot: the config thread may replace self.destinations
            for destination, options in list(self.destinations.items()):
                sysParams = []
                jobParams = []
                prevRawData = self.destPrevData.setdefault(destination, {})
                # for each destination and its options, check if we have to report any background monitoring data
                if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                    sysParams.extend(self.__activeParams(options, 'sys_'))
                    options['sys_data_sent'] = now
                if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                    jobParams.extend(self.__activeParams(options, 'job_'))
                    options['job_data_sent'] = now
                if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                    for param, active in options.items():
                        if param.startswith("sys_") or param.startswith("job_") or not active:
                            continue
                        if param not in ('general_info', 'general_data_sent'):
                            sysParams.append(param)
                    options['general_data_sent'] = now

                # refresh the raw system data only once per call, and only if needed
                if (not updatedProcInfo) and (sysParams or jobParams):
                    self.procInfo.update()
                    updatedProcInfo = True

                sysResults = {}
                if sysParams:
                    sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
                    # the destination table may have been reloaded meanwhile; if this
                    # destination still exists there, carry the last-sent times over
                    currentOpts = self.destinations.get(destination)
                    if currentOpts is not None:
                        self.updateLastSentTime(options, currentOpts)
                if isinstance(sysResults, (dict, list)) and len(sysResults) > 0:
                    self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
                if jobParams:
                    for pid, props in list(self.monitoredJobs.items()):
                        jobResults = self.procInfo.getJobData(pid, jobParams)
                        if len(jobResults) > 0:
                            self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
        finally:
            self.__bgMonitorLock.release()

    def __activeParams (self, options, prefix):
        """
        Return the names of the enabled options that start with prefix, with the
        prefix removed. The control entries (<prefix>monitoring, <prefix>interval,
        <prefix>data_sent) are never returned.
        """
        controlEntries = ('monitoring', 'interval', 'data_sent')
        names = []
        for param, active in options.items():
            if not (active and param.startswith(prefix)):
                continue
            shortName = param[len(prefix):]
            if shortName and shortName not in controlEntries:
                names.append(shortName)
        return names

    # copy the time when last data was sent
    def updateLastSentTime (self, srcOpts, dstOpts):
        """
        Copy the general/sys/job '*_data_sent' timestamps present in srcOpts into dstOpts.
        """
        for key in ('general_data_sent', 'sys_data_sent', 'job_data_sent'):
            if key in srcOpts:
                dstOpts[key] = srcOpts[key]

    def setLogLevel (self, strLevel):
        """
        Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
        'INFO', 'NOTICE', 'DEBUG'.
        """
        self.logger.setLogLevel(strLevel)

    def setMaxMsgRate (self, rate):
        """
        Set the maximum number of messages that can be sent, per second.
        """
        self.maxMsgRate = rate
        self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate))

    def free (self):
        """
        Stop background threads, close opened sockets. You have to use this function if you want to
        free all the resources that ApMon takes, and allow it to be garbage-collected.
        """
        # Only wait for threads that were started; waiting for a thread that
        # never ran (failed initialization) would block forever.
        if self.__configThreadStarted:
            self.__configUpdateEvent.set()
            self.__configUpdateFinished.wait()
        if self.__bgThreadStarted:
            self.__bgMonitorEvent.set()
            self.__bgMonitorFinished.wait()

        if self.__udpSocket is not None:
            self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
            self.__udpSocket.close()
            self.__udpSocket = None
        self.__freed = True

    #########################################################################################
    # Internal functions - Config reloader thread
    #########################################################################################

    def __configLoader (self):
        """
        Main loop of the thread that checks for changes and reloads the configuration.
        The "finished" event is always set on exit so that free() cannot hang.
        """
        try:
            while not self.__configUpdateEvent.is_set():
                self.__configUpdateEvent.wait(self.configRecheckInterval)
                if self.__configUpdateEvent.is_set():
                    break
                if not self.configRecheck:
                    continue
                try:
                    self.__reloadAddresses()
                    self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+repr(self.configRecheckInterval)+" sec.")
                except Exception as ex:
                    # a failed reload must not kill the reloader thread
                    self.logger.log(Logger.ERROR, "Error reloading configuration: " + str(ex))
        finally:
            self.__configUpdateFinished.set()

    def __reloadAddresses (self):
        """
        Refresh destinations hash, by loading data from all sources in configAddresses
        """
        newDestinations = {}
        for src in list(self.configAddresses):
            self.__initializeFromFile(src, newDestinations)
        # avoid changing config in the middle of sending packets to previous destinations
        self.__configUpdateLock.acquire()
        try:
            self.destinations = newDestinations
        finally:
            self.__configUpdateLock.release()

    def __addDestination (self, aDestination, tempDestinations, options = None):
        """
        Add a destination to the tempDestinations hash.

        aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
        If the port is not given, it will be used the default port (8884)
        If the password is missing, it will be considered an empty string
        options, if given, is a hash whose entries overwrite the default options
        for this destination.

        Destinations with a bad port or an unresolvable host are logged and skipped.
        """
        # normalize all whitespace (tabs, repeated blanks) to single spaces
        aDestination = ' '.join(aDestination.split())
        sepPort = aDestination.find(':')
        sepPasswd = aDestination.rfind(' ')
        if sepPort >= 0:
            host = aDestination[0:sepPort].strip()
            if sepPasswd > sepPort + 1:
                port = aDestination[sepPort+1:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                port = aDestination[sepPort+1:].strip()
                passwd = ""
        else:
            port = str(self.__defaultPort)
            if sepPasswd >= 0:
                host = aDestination[0:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                host = aDestination.strip()
                passwd = ""
        if not port.isdigit():
            self.logger.log(Logger.WARNING, "Bad value for port number "+repr(port)+" in "+aDestination+" destination")
            return
        port = int(port)
        try:
            # convert hostnames to IP addresses to avoid suffocating DNSs
            host = socket.gethostbyname(host)
        except socket.error as ex:
            self.logger.log(Logger.WARNING, "Cannot resolve host "+repr(host)+" in "+aDestination+" destination: "+str(ex))
            return

        alreadyAdded = False
        for h, p, w in tempDestinations.keys():
            if (h == host) and (p == port):
                alreadyAdded = True
                break
        if alreadyAdded:
            self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it")
            return

        destination = (host, port, passwd)
        self.logger.log(Logger.INFO, "Adding destination "+host+':'+repr(port)+' '+passwd)
        # have a different set of options for each destination
        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)
        self.destPrevData[destination] = {}
        if (options is not None) and (options != self.__defaultOptions):
            # we have to overwrite defaults with given options
            for key, value in options.items():
                self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+repr(value))
                tempDestinations[destination][key] = value

    def __initializeFromFile (self, confFileName, tempDestinations):
        """
        Load destinations from confFileName file. If it's an URL (starts with "http://"
        or "https://") load configuration from there. Put all destinations in
        tempDestinations hash.

        Lines starting with "xApMon_" set options ("xApMon_<name> = <value>");
        addDestination is called for each other line that doesn't start with # and
        has non-whitespace characters on it. Sources that cannot be opened are
        logged and skipped.
        """
        try:
            if confFileName.startswith("http://") or confFileName.startswith("https://"):
                confFile = urllib2.urlopen(confFileName)
            else:
                confFile = open(confFileName)
        except urllib2.HTTPError as e:
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            if e.code == 401:
                self.logger.log(Logger.ERROR, 'HTTPError: not authorized.')
            elif e.code == 404:
                self.logger.log(Logger.ERROR, 'HTTPError: not found.')
            elif e.code == 503:
                self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.')
            else:
                self.logger.log(Logger.ERROR, 'HTTPError: unknown error.')
            return
        except urllib2.URLError as ex:
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            # reason may be a plain string or an exception; don't index into it
            self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason))
            return
        except IOError as ex:
            self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
            self.logger.log(Logger.ERROR, "IOError: "+str(ex))
            return

        self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
        dests = []
        opts = {}
        try:
            for line in iter(confFile.readline, ''):
                line = line.strip()
                self.logger.log(Logger.DEBUG, "Reading line "+line)
                if (len(line) == 0) or (line[0] == '#'):
                    continue
                if not line.startswith("xApMon_"):
                    dests.append(line)
                    continue
                m = re.match(r"xApMon_(\S+)\s*=\s*(\S+)", line)
                if m is None:
                    continue
                try:
                    self.__applyConfigOption(m.group(1), m.group(2), opts)
                except ValueError:
                    # e.g. a non-numeric interval; ignore just this line
                    self.logger.log(Logger.WARNING, "Bad value "+repr(m.group(2))+" for option "+m.group(1)+" in "+confFileName)
        finally:
            confFile.close()

        for line in dests:
            self.__addDestination(line, tempDestinations, opts)

    def __applyConfigOption (self, param, value, opts):
        """
        Interpret one "xApMon_<param> = <value>" setting. Global settings are applied
        to this object; per-destination options are stored in opts.
        Raises ValueError if a numeric value cannot be parsed.
        """
        if value.upper() == "ON":
            value = True
        elif value.upper() == "OFF":
            value = False
        elif param.endswith("_interval"):
            value = int(value)

        if param == "loglevel":
            self.logger.setLogLevel(value)
        elif param == "maxMsgRate":
            self.setMaxMsgRate(int(value))
        elif param == "conf_recheck":
            self.configRecheck = value
        elif param == "recheck_interval":
            self.configRecheckInterval = value
        elif param.endswith("_data_sent"):
            pass    # don't reset time in sys/job/general/_data_sent
        else:
            opts[param] = value

    ###############################################################################################
    # Internal functions - Background monitor thread
    ###############################################################################################

    def __bgMonitor (self):
        """
        Main loop of the background monitoring thread: every 10 seconds, send the
        background monitoring data whose interval has elapsed. The "finished"
        event is always set on exit so that free() cannot hang.
        """
        try:
            while not self.__bgMonitorEvent.is_set():
                self.__bgMonitorEvent.wait(10)
                if self.__bgMonitorEvent.is_set():
                    break
                if not self.performBgMonitoring:
                    continue
                try:
                    self.sendBgMonitoring()    # send only if the interval has elapsed
                except Exception as ex:
                    # a failed round must not kill the monitoring thread
                    self.logger.log(Logger.ERROR, "Error sending background monitoring data: " + str(ex))
        finally:
            self.__bgMonitorFinished.set()

    ###############################################################################################
    # Internal helper functions
    ###############################################################################################

    def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
        """
        Build one XDR datagram with the given parameters and send it to destination,
        a (host, port, passwd) tuple. params is a dict or a list/tuple of (name, value)
        pairs. The packet is silently dropped if the rate limiter says so.
        """
        if not self.__shouldSend():
            self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!")
            return

        if destination is None:
            self.logger.log(Logger.WARNING, "Destination is None")
            return

        if isinstance(params, dict):
            pairs = list(params.items())
        elif isinstance(params, (list, tuple)):
            pairs = list(params)
        else:
            # nothing sensible can be packed; don't send a packet with a bogus count
            self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
            return

        if self.__udpSocket is None:
            self.logger.log(Logger.WARNING, "UDP socket is closed. Message NOT sent!")
            return

        host, port, passwd = destination
        senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000    # wrap around 2 mld

        self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(senderRef['SEQ_NR'])+"/"+str(senderRef['INSTANCE_ID'])+"> len:"+str(len(pairs)))

        # Pack the parameters first, so that the count written in the header is
        # the number of parameters really present in the datagram.
        paramsPacker = xdrlib.Packer()
        packedCount = 0
        for pair in pairs:
            try:
                name, value = pair
            except (TypeError, ValueError):
                self.logger.log(Logger.WARNING, "Ignoring malformed parameter entry: " + str(pair))
                continue
            if self.__packParameter(paramsPacker, name, value):
                packedCount += 1
        paramsData = paramsPacker.get_buffer()

        xdrPacker = xdrlib.Packer()
        xdrPacker.pack_string("v:"+self.__version+"p:"+passwd)
        xdrPacker.pack_int(senderRef['INSTANCE_ID'])
        xdrPacker.pack_int(senderRef['SEQ_NR'])
        xdrPacker.pack_string(clusterName)
        xdrPacker.pack_string(nodeName)
        xdrPacker.pack_int(packedCount)
        # XDR items are 4-byte aligned, so this appends the bytes unchanged
        xdrPacker.pack_fopaque(len(paramsData), paramsData)
        if (timeStamp is not None) and (timeStamp > 0):
            xdrPacker.pack_int(int(timeStamp))

        buffer = xdrPacker.get_buffer()
        # send this buffer to the destination, using udp datagrams
        try:
            self.__udpSocket.sendto(buffer, (host, port))
            self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd)
        except socket.error as ex:
            self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(ex))

    def __packParameter (self, xdrPacker, name, value):
        """
        Append the (name, type, value) triple to xdrPacker.

        The triple is first packed in a scratch buffer, so xdrPacker is left untouched
        if the parameter cannot be packed (no name, None value, unsupported type,
        value out of range). Returns True if the parameter was appended, False otherwise.
        """
        if (name is None) or (name == ""):
            self.logger.log(Logger.WARNING, "Undefine parameter name.")
            return False
        if value is None:
            self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value")
            return False
        try:
            typeValue = self.__valueTypes[type(value)]
            scratch = xdrlib.Packer()
            scratch.pack_string(name)
            scratch.pack_int(typeValue)
            self.__packFunctions[typeValue](scratch, value)
            encoded = scratch.get_buffer()
            xdrPacker.pack_fopaque(len(encoded), encoded)
            self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
            return True
        except Exception as ex:
            self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
            return False

    # Destructor
    def __del__ (self):
        # the attribute is missing if __init__ failed very early
        if not getattr(self, '_ApMon__freed', True):
            self.free()

    # Decide if the current datagram should be sent.
    # This decision is based on the number of messages previously sent.
    def __shouldSend (self):
        """
        Return True if the current datagram may be sent, False if it must be dropped
        to keep the (smoothed) sending rate around maxMsgRate messages per second.
        """
        now = int(time.time())
        if now != self.__crtTime:
            # new time
            # update previous counters;
            self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / (now - self.__crtTime)
            self.__prvTime = self.__crtTime
            self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
            # reset current counter
            self.__crtTime = now
            self.__crtSent = 0
            self.__crtDrop = 0

        # compute the history
        valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

        doSend = True

        # integer tenth of the allowed rate; randint() needs integer bounds
        tenth = int(self.maxMsgRate) // 10
        # when we should start dropping messages
        level = self.maxMsgRate - tenth

        if valSent > (self.maxMsgRate - level):
            # drop with a probability that grows as valSent approaches maxMsgRate
            if random.randint(0, tenth) >= (self.maxMsgRate - valSent):
                doSend = False

        # counting sent and dropped messages
        if doSend:
            self.__crtSent += 1
        else:
            self.__crtDrop += 1

        return doSend

    ################################################################################################
    # Private variables. Don't touch
    ################################################################################################

    __valueTypes = {
        type("string"): 0,    # XDR_STRING (see ApMon.h from C/C++ ApMon version)
        type(1): 2,           # XDR_INT32
        type(1.0): 5}         # XDR_REAL64

    __packFunctions = {
        0: xdrlib.Packer.pack_string,
        2: xdrlib.Packer.pack_int,
        5: xdrlib.Packer.pack_double}

    __defaultPort = 8884
    __version = "2.2.2-py"    # apMon version number
|
732 |
|