1 |
|
2 |
"""
|
3 |
* ApMon - Application Monitoring Tool
|
4 |
* Version: 2.2.4
|
5 |
*
|
6 |
* Copyright (C) 2006 California Institute of Technology
|
7 |
*
|
8 |
* Permission is hereby granted, free of charge, to use, copy and modify
|
9 |
* this software and its documentation (the "Software") for any
|
10 |
* purpose, provided that existing copyright notices are retained in
|
11 |
* all copies and that this notice is included verbatim in any distributions
|
12 |
* or substantial portions of the Software.
|
13 |
* This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
|
14 |
* Users of the Software are asked to feed back problems, benefits,
|
15 |
* and/or suggestions about the software to the MonALISA Development Team
|
16 |
* (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
|
17 |
* incorporation of new features - is done on a best effort basis. All bug
|
18 |
* fixes and enhancements will be made available under the same terms and
|
19 |
* conditions as the original software,
|
20 |
|
21 |
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
|
22 |
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
|
23 |
* OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
|
24 |
* EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
25 |
|
26 |
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
|
27 |
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
|
28 |
* FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
|
29 |
* PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
|
30 |
* OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
|
31 |
* MODIFICATIONS.
|
32 |
"""
|
33 |
|
34 |
"""
|
35 |
apmon.py
|
36 |
|
37 |
This is a python implementation for the ApMon API for sending
|
38 |
data to the MonALISA service.
|
39 |
|
40 |
For further details about ApMon please see the C/C++ or Java documentation
|
41 |
You can find a sample usage of this module in apmTest.py.
|
42 |
|
43 |
Note that the parameters must be either integers(32 bits) or doubles(64 bits).
|
44 |
Sending strings is supported, but they will not be stored in the
|
45 |
farm's store nor shown in the farm's window in the MonALISA client.
|
46 |
"""
|
47 |
|
48 |
import re
|
49 |
import xdrlib
|
50 |
import socket
|
51 |
import struct
|
52 |
import StringIO
|
53 |
import threading
|
54 |
import time
|
55 |
import Logger
|
56 |
import ProcInfo
|
57 |
import random
|
58 |
import copy
|
59 |
|
60 |
#__all__ = ["ApMon"]
|
61 |
|
62 |
#__debug = False # set this to True to be verbose
|
63 |
|
64 |
class ApMon:
|
65 |
"""
|
66 |
Main class for sending monitoring data to a MonaLisa module.
|
67 |
One or more destinations can be chosen for the data. See constructor.
|
68 |
|
69 |
The data is packed in UDP datagrams, using XDR. The following fields are sent:
|
70 |
- version & password (string)
|
71 |
- cluster name (string)
|
72 |
- node name (string)
|
73 |
- number of parameters (int)
|
74 |
- for each parameter:
|
75 |
- name (string)
|
76 |
- value type (int)
|
77 |
- value
|
78 |
- optionally a (int) with the given timestamp
|
79 |
|
80 |
Attributes (public):
|
81 |
- destinations - a list containing (ip, port, password) tuples
|
82 |
- configAddresses - list with files and urls from where the config is read
|
83 |
- configRecheckInterval - period, in seconds, to check for changes
|
84 |
in the configAddresses list
|
85 |
- configRecheck - boolean - whether to recheck periodically for changes
|
86 |
in the configAddresses list
|
87 |
"""
|
88 |
|
89 |
# Default background-monitoring options. __addDestination gives every new
# destination its own deep copy of this dictionary and then overwrites the
# entries for which the caller supplied explicit options.
# Keys prefixed with "job_" / "sys_" are selected by sendBgMonitoring and
# reported with the prefix stripped; the *_monitoring, *_interval and
# *_data_sent keys are control entries and are never sent as parameters.
__defaultOptions = {
    'job_monitoring': True, # perform (or not) job monitoring
    'job_interval' : 120, # at this interval (in seconds)
    'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!

    # NOTE(review): the descriptions of job_cpu_time and job_run_time below
    # look swapped with respect to their names - confirm against ProcInfo.
    'job_cpu_time' : True, # elapsed time from the start of this job in seconds
    'job_run_time' : True, # processor time spent running this job in seconds
    'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
    'job_virtualmem': True, # size of the virtual memory occupied by the job, as reported by ps (original comment said "JB", presumably KB - confirm)
    'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
    'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
    'job_workdir_size': True, # size in MB of the working directory of the job
    'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
    'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
    'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
    'job_disk_usage': True, # percent of the used disk partition containing the working directory
    'job_open_files': True, # number of open file descriptors

    'sys_monitoring': True, # perform (or not) system monitoring
    'sys_interval' : 120, # at this interval (in seconds)
    'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!

    'sys_cpu_usr' : True, # cpu-usage information
    'sys_cpu_sys' : True, # all these will produce corresponding params without "sys_"
    'sys_cpu_nice' : True,
    'sys_cpu_idle' : True,
    'sys_cpu_usage' : True,
    'sys_load1' : True, # system load information
    'sys_load5' : True,
    'sys_load15' : True,
    'sys_mem_used' : True, # memory usage information
    'sys_mem_free' : True,
    'sys_mem_usage' : True,
    'sys_pages_in' : True,
    'sys_pages_out' : True,
    'sys_swap_used' : True, # swap usage information
    'sys_swap_free' : True,
    'sys_swap_usage': True,
    'sys_swap_in' : True,
    'sys_swap_out' : True,
    'sys_net_in' : True, # network transfer in kBps
    'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
    'sys_net_errs' : True, # for each eth interface
    'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
    'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
    'sys_processes' : True,
    'sys_uptime' : True, # uptime of the machine, in days (float number)

    'general_info' : True, # send (or not) general host information once every 2 x $sys_interval seconds
    'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!

    # The un-prefixed keys below are the "general info" parameters; they are
    # reported together with the system parameters when general_info is due.
    'hostname' : True,
    'ip' : True, # will produce ethX_ip params for each interface
    'cpu_MHz' : True,
    'no_CPUs' : True, # number of CPUs
    'total_mem' : True,
    'total_swap' : True,
    'cpu_vendor_id' : True,
    'cpu_family' : True,
    'cpu_model' : True,
    'cpu_model_name': True,
    'bogomips' : True};
|
151 |
|
152 |
def __init__(self, initValue, defaultLogLevel = Logger.INFO):
    """
    Create an ApMon instance and configure its destinations from initValue.

    initValue may be:
    - a string: stored in configAddresses; destinations are loaded from the
      file with that name, or from the URL if it starts with "http://".
      Options given there overwrite the background-monitoring defaults.
    - a list: its contents become configAddresses and the destinations are
      built from those sources. Options given there overwrite the defaults
      (see __defaultOptions).
    - a tuple of strings of the form "{hostname|ip}[:port][ passwd]" (default
      port 8884, default password ""): used directly as destinations, with
      the default background-monitoring options.
    - a hash (key = "hostname|ip[:port][ passwd]",
      val = hash {'param_name': True/False, ...}): the given options
      overwrite the defaults for each destination.

    defaultLogLevel is handed to the Logger created for this instance.
    """
    # -- public attributes --------------------------------------------------
    # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
    self.destinations = {}
    # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
    self.destPrevData = {}
    # key = tuple (host, port, pass); val = hash {'INSTANCE_ID', 'SEQ_NR'}
    self.senderRef = {}
    # files/urls from where the configuration is read
    self.configAddresses = []
    self.configRecheck = True           # recheck the configuration by default ...
    self.configRecheckInterval = 600    # ... every 10 minutes
    self.performBgMonitoring = True     # background monitoring is on by default
    # key = pid; value = hash with 'CLUSTER_NAME' and 'NODE_NAME'
    self.monitoredJobs = {}
    self.maxMsgRate = 100               # maximum number of messages sent per second

    # -- defaults for sender identity and cluster/node names ---------------
    self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = socket.getfqdn()
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = socket.getfqdn()

    # -- internal state; do not touch ---------------------------------------
    self.__freed = False
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__configUpdateFinished = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()
    self.__bgMonitorFinished = threading.Event()

    # counters used to keep the average send rate below maxMsgRate
    self.__crtTime = self.__prvTime = 0
    self.__prvSent = self.__prvDrop = 0
    self.__crtSent = self.__crtDrop = 0
    # in (0,1); increase to wait more time before maxMsgRate kicks in
    self.__hWeight = 0.95

    # the logger must exist before the destinations are loaded
    self.logger = Logger.Logger(defaultLogLevel)
    self.setDestinations(initValue)
    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # The configuration-reload thread is currently disabled:
    #if len(self.configAddresses) > 0:
    #    # if there are addresses that need to be monitored,
    #    # start config checking and reloading thread
    #    th = threading.Thread(target=self.__configLoader)
    #    th.setDaemon(True)  # this is a daemon thread
    #    th.start()

    # create the ProcInfo instance
    self.procInfo = ProcInfo.ProcInfo(self.logger)
    # self.procInfo.update();

    # The background monitoring thread is currently disabled:
    #th = threading.Thread(target=self.__bgMonitor);
    #th.setDaemon(True);
    #th.start();
|
219 |
|
220 |
def sendParams(self, params):
    """
    Send several parameters to MonALISA using the default (last given)
    cluster and node names and no explicit timestamp.
    """
    noTimeStamp = -1  # a non-positive timestamp is not put in the packet
    self.sendTimedParams(noTimeStamp, params)
|
225 |
|
226 |
def sendTimedParams(self, timeStamp, params):
    """
    Send several parameters with the given time, using the default
    (last given) cluster and node names.
    See sendTimedParameters for the meaning of timeStamp and params.
    """
    defaultCluster = None  # None makes sendTimedParameters fall back to its defaults
    defaultNode = None
    self.sendTimedParameters(defaultCluster, defaultNode, timeStamp, params)
|
232 |
|
233 |
def sendParameter(self, clusterName, nodeName, paramName, paramValue):
    """
    Send one parameter to MonALISA without an explicit timestamp.
    """
    noTimeStamp = -1  # a non-positive timestamp is not put in the packet
    self.sendTimedParameter(clusterName, nodeName, noTimeStamp, paramName, paramValue)
|
238 |
|
239 |
def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
    """
    Send one parameter with the given time.
    The parameter is wrapped in a one-entry dictionary and handed to
    sendTimedParameters.
    """
    singleParam = {paramName: paramValue}
    self.sendTimedParameters(clusterName, nodeName, timeStamp, singleParam)
|
244 |
|
245 |
def sendParameters(self, clusterName, nodeName, params):
    """
    Send several parameters for the given cluster and node names,
    without an explicit timestamp.
    """
    noTimeStamp = -1  # a non-positive timestamp is not put in the packet
    self.sendTimedParameters(clusterName, nodeName, noTimeStamp, params)
|
250 |
|
251 |
def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
    """
    Send multiple monitored parameters to MonALISA.

    - clusterName is the name of the cluster being monitored. If it is None
      or empty, the last given cluster name is used instead; otherwise it
      becomes the new default.
    - nodeName is the name of the node the parameters belong to. If it is
      None, the last given node name is used instead (initially the full
      hostname of this machine); otherwise it becomes the new default.
    - timeStamp, if > 0, is the time for the parameters, in seconds from
      Epoch. Use it only if you are sure about the time of the result;
      otherwise the parameters are assigned a correct time (obtained from
      NTP servers) in the MonALISA service. This can be useful when parsing
      logs, for example.
    - params is a dictionary of parameter name -> value (int or float), or
      a list of (name, value) tuples when the parameters must be sent in a
      given order.

    Nothing is sent (and a warning is logged) when no destination is defined.

    NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
    """
    if clusterName is None or clusterName == "":
        clusterName = self.__defaultUserCluster
    else:
        self.__defaultUserCluster = clusterName
    if nodeName is None:
        nodeName = self.__defaultUserNode
    else:
        self.__defaultUserNode = nodeName
    if len(self.destinations) == 0:
        self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined.")
        return
    # Hold the lock so the destinations are not swapped while we are sending;
    # release it in 'finally' so that a failing send cannot leave it locked
    # and block every later send or configuration reload.
    self.__configUpdateLock.acquire()
    try:
        for dest in list(self.destinations):
            self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
    finally:
        self.__configUpdateLock.release()
|
287 |
|
288 |
def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
    """
    Add a new job to monitor.

    The job is recorded in monitoredJobs under its pid, together with the
    cluster and node names used when its data is reported, and is
    registered with procInfo (pid and working directory).
    """
    self.__bgMonitorLock.acquire()
    try:
        self.monitoredJobs[pid] = {
            'CLUSTER_NAME': clusterName,
            'NODE_NAME': nodeName,
        }
        self.procInfo.addJobToMonitor(pid, workDir)
    finally:
        # always release, so an error in procInfo cannot block monitoring
        self.__bgMonitorLock.release()
|
298 |
|
299 |
def removeJobToMonitor(self, pid):
    """
    Remove a job from being monitored.

    Raises KeyError if pid is not in monitoredJobs (as before); the
    monitoring lock is released in every case.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.removeJobToMonitor(pid)
        del self.monitoredJobs[pid]
    finally:
        # without this, removing an unknown pid would leave the lock held
        # and deadlock every later monitoring call
        self.__bgMonitorLock.release()
|
307 |
|
308 |
def setMonitorClusterNode(self, clusterName, nodeName):
    """
    Choose the cluster and node names under which system related
    information is sent. A name that is None or empty leaves the
    corresponding current value untouched.
    """
    self.__bgMonitorLock.acquire()
    keepCluster = (clusterName == None) or (clusterName == "")
    if not keepCluster:
        self.__defaultSysMonCluster = clusterName
    keepNode = (nodeName == None) or (nodeName == "")
    if not keepNode:
        self.__defaultSysMonNode = nodeName
    self.__bgMonitorLock.release()
|
318 |
|
319 |
def enableBgMonitoring(self, onOff):
    """
    Switch background monitoring on or off by storing onOff in
    performBgMonitoring. Background monitoring information can still be
    sent explicitly by calling the sendBgMonitoring method.
    """
    self.performBgMonitoring = onOff
|
325 |
|
326 |
def sendBgMonitoring(self, mustSend = False):
    """
    Send background monitoring about system and jobs to all interested destinations.

    If mustSend == True, the information is sent regardless of the elapsed
    time since it was last sent. If mustSend == False, each kind of data
    (system, job, general) is sent only if its interval has passed since
    the last time it was sent to that destination.

    Nothing is sent (and a warning is logged) when no destination is defined.
    """
    if len(self.destinations) == 0:
        self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined.")
        return
    # control entries of the options hash; never reported as parameters
    controlKeys = ('monitoring', 'interval', 'data_sent')
    self.__bgMonitorLock.acquire()
    try:
        now = int(time.time())
        updatedProcInfo = False
        for destination, options in self.destinations.items():
            sysParams = []
            jobParams = []
            prevRawData = self.destPrevData[destination]
            # for each destination and its options, check if we have to
            # report any background monitoring data
            if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                for param, active in options.items():
                    m = re.match("sys_(.+)", param)
                    if m is not None and active and m.group(1) not in controlKeys:
                        sysParams.append(m.group(1))
                options['sys_data_sent'] = now
            if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                for param, active in options.items():
                    m = re.match("job_(.+)", param)
                    if m is not None and active and m.group(1) not in controlKeys:
                        jobParams.append(m.group(1))
                options['job_data_sent'] = now
            # general host information goes out every 2 x sys_interval seconds
            if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                for param, active in options.items():
                    if param.startswith("sys_") or param.startswith("job_") or not active:
                        continue
                    if param not in ('general_info', 'general_data_sent'):
                        sysParams.append(param)
                options['general_data_sent'] = now

            # refresh the raw data at most once per call, and only if needed
            if (not updatedProcInfo) and (sysParams or jobParams):
                self.procInfo.update()
                updatedProcInfo = True

            sysResults = {}
            if len(sysParams) > 0:
                sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
            if len(sysResults) > 0:
                self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
            for pid, props in self.monitoredJobs.items():
                jobResults = {}
                if len(jobParams) > 0:
                    jobResults = self.procInfo.getJobData(pid, jobParams)
                if len(jobResults) > 0:
                    self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
    finally:
        # released even when collecting or sending fails, so that a single
        # error cannot block all later monitoring calls
        self.__bgMonitorLock.release()
|
382 |
|
383 |
def setDestinations(self, initValue):
    """
    Set the destinations of the ApMon instance. It accepts the same
    parameters as the constructor:

    - a string or a list of strings: configuration file names / URLs. They
      are stored in configAddresses, periodic rechecking is enabled with a
      600 second interval, and the destinations are (re)loaded from them.
    - a tuple of destination strings: each one is added to the current
      destinations with the default options; rechecking is disabled.
    - a dictionary destination string -> options hash: each destination is
      added with the given options; rechecking is disabled.

    Subclasses of these types are accepted too. Any other type is ignored
    and a warning is logged.
    """
    try:
        stringTypes = basestring  # Python 2: accept str and unicode alike
    except NameError:
        stringTypes = str
    if isinstance(initValue, stringTypes) or isinstance(initValue, list):
        if isinstance(initValue, list):
            self.configAddresses = initValue
        else:
            self.configAddresses = [initValue]
        self.configRecheck = True
        self.configRecheckInterval = 600
        self.__reloadAddresses()
    elif isinstance(initValue, tuple):
        self.configAddresses = []
        for dest in initValue:
            self.__addDestination(dest, self.destinations)
        self.configRecheck = False
    elif isinstance(initValue, dict):
        self.configAddresses = []
        for dest, opts in initValue.items():
            self.__addDestination(dest, self.destinations, opts)
        self.configRecheck = False
    else:
        # previously such values were dropped without any trace
        self.logger.log(Logger.WARNING, "Unsupported initValue type in setDestinations: " + str(type(initValue)))
|
407 |
|
408 |
def initializedOK(self):
    """
    Tell whether ApMon has somewhere to send data: returns True if at
    least one destination is defined, False if there is none.
    """
    destinationCount = len(self.destinations)
    return destinationCount > 0
|
413 |
|
414 |
def setLogLevel(self, strLevel):
    """
    Change the log level of this instance's logger. strLevel is a string,
    one of 'FATAL', 'ERROR', 'WARNING', 'INFO', 'NOTICE', 'DEBUG'.
    """
    logger = self.logger
    logger.setLogLevel(strLevel)
|
420 |
|
421 |
def setMaxMsgRate(self, rate):
    """
    Set the maximum number of messages that can be sent per second.
    The new value is stored in maxMsgRate and reported at DEBUG level.
    """
    self.maxMsgRate = rate
    message = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, message)
|
427 |
|
428 |
def free(self):
    """
    Release the resources held by ApMon: the UDP socket, if open, is
    closed and forgotten, and the instance is marked as freed. Call this
    if you want ApMon to give up its resources and become
    garbage-collectable.

    Stopping the background threads is currently disabled, like the code
    that would start them.
    """
    #if len(self.configAddresses) > 0:
    #    self.__configUpdateEvent.set()
    #    self.__configUpdateFinished.wait()
    #self.__bgMonitorEvent.set()
    #self.__bgMonitorFinished.wait()

    udpSocket = self.__udpSocket
    if udpSocket != None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        udpSocket.close()
        self.__udpSocket = None
    self.__freed = True
|
444 |
|
445 |
|
446 |
#########################################################################################
|
447 |
# Internal functions - Config reloader thread
|
448 |
#########################################################################################
|
449 |
|
450 |
def __configLoader(self):
    """
    Main loop of the thread that checks for changes and reloads the
    configuration.

    The loop sleeps on the update event for configRecheckInterval seconds
    (never less than 30) and reloads the addresses when configRecheck is
    set. It ends as soon as the update event is set, and then signals the
    'finished' event.
    """
    while not self.__configUpdateEvent.isSet():
        # don't recheck more often than 30 sec. (This used min(), which
        # capped the wait at 30 s and so reloaded far more often than the
        # configured interval and than the log message below claims.)
        self.__configUpdateEvent.wait(max(30, self.configRecheckInterval))
        if self.__configUpdateEvent.isSet():
            break
        if self.configRecheck:
            self.__reloadAddresses()
            self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+repr(self.configRecheckInterval)+" sec.")
    self.__configUpdateFinished.set()
|
462 |
|
463 |
def __reloadAddresses(self):
    """
    Rebuild the destinations hash right now from the sources listed in
    configAddresses. Sources are tried one at a time, in random order,
    until one of them yields at least one destination or none is left.
    """
    freshDestinations = {}
    pending = copy.deepcopy(self.configAddresses)
    while pending and not freshDestinations:
        source = random.choice(pending)
        pending.remove(source)
        self.__initializeFromFile(source, freshDestinations)
    # swap under the lock, so the configuration does not change in the
    # middle of sending packets to the previous destinations
    self.__configUpdateLock.acquire()
    self.destinations = freshDestinations
    self.__configUpdateLock.release()
|
477 |
|
478 |
def __addDestination(self, aDestination, tempDestinations, options = __defaultOptions):
    """
    Add a destination to the tempDestinations hash.

    aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
    If the port is not given, the default port is used.
    If the password is missing, it is considered an empty string.

    The host name is resolved to an IP address. A destination with the same
    address and port as one already in tempDestinations is skipped. A new
    destination reuses the options it already has in self.destinations, or
    else gets its own copy of the default options; entries of 'options'
    then overwrite them when 'options' differs from the defaults.

    Returns None in every case; problems (bad port, unresolvable host) are
    logged and the destination is not added.
    """
    # Collapse every run of blanks/tabs into a single space, so that the
    # separators found below are unambiguous.
    aDestination = " ".join(aDestination.split())
    sepPort = aDestination.find(':')
    sepPasswd = aDestination.rfind(' ')
    if sepPort >= 0:
        host = aDestination[0:sepPort].strip()
        if sepPasswd > sepPort + 1:
            port = aDestination[sepPort+1:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            port = aDestination[sepPort+1:].strip()
            passwd = ""
    else:
        port = str(self.__defaultPort)
        if sepPasswd >= 0:
            host = aDestination[0:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            host = aDestination.strip()
            passwd = ""
    if not port.isdigit():
        self.logger.log(Logger.WARNING, "Bad value for port number "+repr(port)+" in "+aDestination+" destination")
        return
    port = int(port)
    try:
        host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
    except socket.error as msg:
        self.logger.log(Logger.ERROR, "Error resolving "+host+": "+str(msg))
        return
    # the password does not take part in the duplicate check
    alreadyAdded = any((h == host) and (p == port) for (h, p, w) in tempDestinations)
    destination = (host, port, passwd)
    if alreadyAdded:
        self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it")
        return
    self.logger.log(Logger.INFO, "Adding destination "+host+':'+repr(port)+' '+passwd)
    if destination in self.destinations:
        tempDestinations[destination] = self.destinations[destination] # reuse previous options
    else:
        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions) # have a different set of options for each dest
    if destination not in self.destPrevData:
        self.destPrevData[destination] = {} # set it empty only if it's really new
    if destination not in self.senderRef:
        self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef) # otherwise, don't reset this nr.
    if options != self.__defaultOptions:
        # we have to overwrite defaults with given options
        for key, value in options.items():
            self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+repr(value))
            tempDestinations[destination][key] = value
|
539 |
|
540 |
def __initializeFromFile(self, confFileName, tempDestinations):
    """
    Load destinations from the confFileName file. If it is a URL (starts
    with "http://") the configuration is loaded from there. All the
    destinations are put in the tempDestinations hash.

    Empty lines and lines starting with # are ignored. Lines of the form
    "xApMon_<param> = <value>" are settings: ON/OFF become True/False and
    values of *_interval parameters become integers. loglevel, maxMsgRate,
    conf_recheck and recheck_interval are applied to this instance,
    *_data_sent settings are ignored, and everything else is collected as
    options for the destinations. Every other line is a destination and is
    passed, together with the collected options, to __addDestination.

    If the source cannot be opened, the error is logged and nothing is added.
    """
    try:
        if confFileName.find("http://") == 0:
            confFile = self.__getURL(confFileName)
            if confFile is None:
                return
        else:
            confFile = open(confFileName)
    except IOError as ex:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        self.logger.log(Logger.ERROR, "IOError: "+str(ex))
        return
    self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
    dests = []
    opts = {}
    try:
        # readline() returns '' only at the end of the file
        for rawLine in iter(confFile.readline, ''):
            line = rawLine.strip()
            self.logger.log(Logger.DEBUG, "Reading line "+line)
            if (len(line) == 0) or (line[0] == '#'):
                continue
            if not line.startswith("xApMon_"):
                dests.append(line)
                continue
            m = re.match(r"xApMon_(\S+)\s*=\s*(\S+)", line)
            if m is None:
                continue  # malformed setting; ignored
            param = m.group(1)
            value = m.group(2)
            if value.upper() == "ON":
                value = True
            elif value.upper() == "OFF":
                value = False
            elif param.endswith("_interval"):
                value = int(value)
            if param == "loglevel":
                self.logger.setLogLevel(value)
            elif param == "maxMsgRate":
                self.setMaxMsgRate(int(value))
            elif param == "conf_recheck":
                self.configRecheck = value
            elif param == "recheck_interval":
                self.configRecheckInterval = value
            elif param.endswith("_data_sent"):
                pass # don't reset time in sys/job/general/_data_sent
            else:
                opts[param] = value
    finally:
        # close the source even if a setting could not be parsed
        confFile.close()
    for line in dests:
        self.__addDestination(line, tempDestinations, opts)
|
600 |
|
601 |
###############################################################################################
|
602 |
# Internal functions - Background monitor thread
|
603 |
###############################################################################################
|
604 |
|
605 |
def __bgMonitor(self):
    """
    Main loop of the background monitoring thread: every 10 seconds, if
    background monitoring is enabled, call sendBgMonitoring. The loop ends
    once the monitor event is set, and then signals the 'finished' event.
    """
    while True:
        if self.__bgMonitorEvent.isSet():
            break
        self.__bgMonitorEvent.wait(10)
        stopRequested = self.__bgMonitorEvent.isSet()
        if stopRequested:
            break
        if self.performBgMonitoring:
            # sends only if the interval has elapsed
            self.sendBgMonitoring()
    self.__bgMonitorFinished.set()
|
613 |
|
614 |
###############################################################################################
|
615 |
# Internal helper functions
|
616 |
###############################################################################################
|
617 |
|
618 |
# this is a simplified replacement for urllib2 which doesn't support setting a timeout.
|
619 |
# by default, if timeout is not specified, it waits 5 seconds
|
620 |
def __getURL(self, url, timeout = 5):
    """
    Fetch an http:// URL with a plain HTTP/1.0 GET request.

    - url must look like "http://host[:port][/path]"; the port defaults to
      80 and a missing path to "/".
    - timeout is the number of seconds allowed for connecting and for each
      send/receive operation.

    Returns a StringIO holding the response, positioned just after the HTTP
    headers, when the status is 200. Returns None (after logging the
    reason) for a malformed URL, a connection or transfer error, or any
    other HTTP status.
    """
    r = re.match(r"http://([^:/]+)(:(\d+))?(/.*)?$", url)
    if r is None:
        self.logger.log(Logger.ERROR, "Cannot open "+url+". Incorrectly formed URL.")
        return None
    host = r.group(1)
    if r.group(3) is None:
        port = 80 # no port is given, pick the default 80 for HTTP
    else:
        port = int(r.group(3))
    if r.group(4) is None:
        path = "/" # no path is given, ask for the root document
    else:
        path = r.group(4)
    try:
        # tries every address the host resolves to, and applies the timeout
        # portably (no hand-packed, platform-dependent timeval structure)
        sock = socket.create_connection((host, port), timeout)
    except socket.error as msg:
        self.logger.log(Logger.ERROR, "Cannot open "+url)
        self.logger.log(Logger.ERROR, "SocketError: "+str(msg))
        return None
    try:
        try:
            sock.sendall("GET "+path+" HTTP/1.0\r\nHost: "+host+"\r\n\r\n")
            # HTTP/1.0: the server closes the connection at the end of the
            # response, so read until recv() returns nothing
            chunks = []
            while True:
                chunk = sock.recv(8192)
                if not chunk:
                    break
                chunks.append(chunk)
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Cannot open "+url)
            self.logger.log(Logger.ERROR, "SocketError: "+str(msg))
            return None
    finally:
        # the whole response is in memory now; never leak the socket
        sock.close()
    response = StringIO.StringIO("".join(chunks))
    httpStatus = 0
    statusLine = re.compile(r"HTTP/\d\.\d (\d+)")
    while True:
        line = response.readline().strip()
        if line == "":
            break # exit at the end of file or at the first empty line (finish of http headers)
        r = statusLine.match(line)
        if r is not None:
            httpStatus = int(r.group(1))
    if httpStatus == 200:
        return response
    self.logger.log(Logger.ERROR, "Cannot open "+url)
    if httpStatus == 401:
        self.logger.log(Logger.ERROR, 'HTTPError: not authorized ['+str(httpStatus)+']')
    elif httpStatus == 404:
        self.logger.log(Logger.ERROR, 'HTTPError: not found ['+str(httpStatus)+']')
    elif httpStatus == 503:
        self.logger.log(Logger.ERROR, 'HTTPError: service unavailable ['+str(httpStatus)+']')
    else:
        self.logger.log(Logger.ERROR, 'HTTPError: unknown error ['+str(httpStatus)+']')
    return None
|
692 |
|
693 |
def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
    """
    Build one XDR-encoded packet and send it as a UDP datagram to a single destination.

    destination -- (host, port, passwd) tuple, also used as key into self.senderRef.
                   When it is None a warning is logged and nothing is sent.
    clusterName -- string packed into the packet header.
    nodeName    -- string packed into the packet header.
    timeStamp   -- when not None and > 0 it is packed as an int after the parameters.
    params      -- dict {name: value} or list of (name, value) pairs. Any other type
                   only produces a warning and the packet goes out with zero parameters.

    Returns None. Socket errors raised while sending are logged, not propagated.
    """
    if not self.__shouldSend():
        # Over the allowed rate: hold the caller back for a second instead of
        # dropping the packet.
        self.logger.log(Logger.INFO, "Pausing 1sec since rate is too fast!")
        time.sleep(1.0)

    if destination is None:
        self.logger.log(Logger.WARNING, "Destination is None")
        return

    host, port, passwd = destination
    crtSenderRef = self.senderRef[destination]
    crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000 # wrap around at 2 billion

    # Header: version/password string, sender instance id, sequence number,
    # cluster name and node name.
    xdrPacker = xdrlib.Packer()
    xdrPacker.pack_string("v:"+self.__version+"p:"+passwd)
    xdrPacker.pack_int(crtSenderRef['INSTANCE_ID'])
    xdrPacker.pack_int(crtSenderRef['SEQ_NR'])
    xdrPacker.pack_string(clusterName)
    xdrPacker.pack_string(nodeName)

    # Parameters are packed separately because their count, known only after
    # packing, has to be placed in front of them.
    if isinstance(params, dict):
        pairs = params.items()
    elif isinstance(params, list):
        pairs = params
    else:
        self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
        pairs = []

    paramsPacker = xdrlib.Packer()
    sent_params_nr = 0
    for name, value in pairs:
        # __packParameter validates the pair and logs the outcome itself.
        if self.__packParameter(paramsPacker, name, value):
            sent_params_nr += 1

    xdrPacker.pack_int(sent_params_nr)

    if timeStamp is not None and timeStamp > 0:
        paramsPacker.pack_int(timeStamp)

    packet = xdrPacker.get_buffer() + paramsPacker.get_buffer()
    self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(packet))+" bytes.")

    # send this buffer to the destination, using udp datagrams
    try:
        self.__udpSocket.sendto(packet, (host, port))
        self.logger.log(Logger.NOTICE, "Packet sent to "+host+":"+str(port)+" "+passwd)
    except socket.error as msg:
        # str(msg) instead of msg[1]: not every socket error carries two arguments.
        self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg))
    xdrPacker.reset()
    paramsPacker.reset()
|
749 |
|
750 |
def __packParameter(self, xdrPacker, name, value):
    """
    Append one parameter (name, XDR type code, value) to xdrPacker.

    xdrPacker -- xdrlib.Packer that receives the encoded parameter.
    name      -- parameter name; None or the empty string is rejected.
    value     -- parameter value; None is rejected. Its exact type must be a key
                 of __valueTypes.

    Returns True when the parameter was appended. Returns False, after logging a
    WARNING, when the name or value is rejected, the value type is not supported
    or encoding fails; in those cases xdrPacker is left unchanged.
    """
    if name is None or name == "":
        self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value "+str(value))
        return False
    if value is None:
        self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value")
        return False
    try:
        typeValue = self.__valueTypes[type(value)]
        # Encode into a scratch packer first: if the value cannot be packed,
        # no dangling name/type pair ends up in the caller's buffer.
        scratch = xdrlib.Packer()
        scratch.pack_string(name)
        scratch.pack_int(typeValue)
        self.__packFunctions[typeValue](scratch, value)
        encoded = scratch.get_buffer()
        self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
        # Every XDR item is padded to a multiple of 4 bytes, so pack_fopaque
        # copies the already encoded bytes without adding padding. Done last so
        # that returning False always means nothing was appended.
        xdrPacker.pack_fopaque(len(encoded), encoded)
        return True
    except Exception as ex:
        self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
        return False
|
767 |
|
768 |
# Destructor
|
769 |
def __del__(self):
    """Call self.free() on destruction unless the instance is already marked as freed."""
    if self.__freed:
        return
    self.free()
|
772 |
|
773 |
# Decide if the current datagram should be sent.
|
774 |
# This decision is based on the number of messages previously sent.
|
775 |
def __shouldSend(self):
    """
    Tell whether the current datagram may be sent.

    Keeps per-second counters of sent and dropped messages plus a running average
    of the send rate weighted by self.__hWeight. Once the weighted rate exceeds the
    threshold derived from self.maxMsgRate, the message is refused when a random
    integer in [0, maxMsgRate / 10] is >= maxMsgRate - weighted rate.

    Returns True when the message counts as sent, False when it counts as dropped;
    the matching counter is incremented in both cases.
    """
    now = long(time.time())
    if now != self.__crtTime:
        # The whole-second clock moved: fold the finished interval into the
        # running average, then start counting from zero again.
        elapsed = now - self.__crtTime
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # Weighted mix of the history and the messages sent in the current second.
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    maxRate = self.maxMsgRate
    tenth = maxRate / 10
    # Level at which we should start dropping messages.
    level = maxRate - tenth

    # The random draw happens only when the threshold is exceeded.
    doSend = not (valSent > (maxRate - level) and random.randint(0, tenth) >= (maxRate - valSent))

    # Count the outcome.
    if doSend:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1

    return doSend
|
807 |
|
808 |
################################################################################################
|
809 |
# Private variables. Don't touch
|
810 |
################################################################################################
|
811 |
|
812 |
# Exact Python type of a value -> XDR type code written into the packet
# (see ApMon.h from the C/C++ ApMon version).
__valueTypes = {
    str: 0,    # XDR_STRING
    int: 2,    # XDR_INT32
    float: 5,  # XDR_REAL64
}

# XDR type code -> xdrlib.Packer method used to encode the value;
# called as f(packer, value).
__packFunctions = {
    0: xdrlib.Packer.pack_string,
    2: xdrlib.Packer.pack_int,
    5: xdrlib.Packer.pack_double,
}

__defaultPort = 8884
__version = "2.2.4-py" # apMon version number
|
824 |
|