1 |
corvo |
1.1 |
|
2 |
|
|
"""
|
3 |
|
|
* ApMon - Application Monitoring Tool
|
4 |
corvo |
1.3 |
* Version: 2.2.1
|
5 |
corvo |
1.1 |
*
|
6 |
corvo |
1.3 |
* Copyright (C) 2006 California Institute of Technology
|
7 |
corvo |
1.1 |
*
|
8 |
|
|
* Permission is hereby granted, free of charge, to use, copy and modify
|
9 |
|
|
* this software and its documentation (the "Software") for any
|
10 |
|
|
* purpose, provided that existing copyright notices are retained in
|
11 |
|
|
* all copies and that this notice is included verbatim in any distributions
|
12 |
|
|
* or substantial portions of the Software.
|
13 |
|
|
* This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
|
14 |
|
|
* Users of the Software are asked to feed back problems, benefits,
|
15 |
|
|
* and/or suggestions about the software to the MonALISA Development Team
|
16 |
|
|
* (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
|
17 |
|
|
* incorporation of new features - is done on a best effort basis. All bug
|
18 |
|
|
* fixes and enhancements will be made available under the same terms and
|
19 |
|
|
* conditions as the original software,
|
20 |
|
|
|
21 |
|
|
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
|
22 |
|
|
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
|
23 |
|
|
* OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
|
24 |
|
|
* EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
25 |
|
|
|
26 |
|
|
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
|
27 |
|
|
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
|
28 |
|
|
* FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
|
29 |
|
|
* PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
|
30 |
|
|
* OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
|
31 |
|
|
* MODIFICATIONS.
|
32 |
|
|
"""
|
33 |
|
|
|
34 |
|
|
"""
|
35 |
|
|
apmon.py
|
36 |
|
|
|
37 |
|
|
This is a python implementation for the ApMon API for sending
|
38 |
|
|
data to the MonALISA service.
|
39 |
|
|
|
40 |
|
|
For further details about ApMon, please see the C/C++ or Java documentation.
|
41 |
|
|
You can find a sample usage of this module in apmTest.py.
|
42 |
|
|
|
43 |
|
|
Note that the parameters must be either integers (32 bits) or doubles (64 bits).
|
44 |
|
|
Sending strings is supported, but they will not be stored in the
|
45 |
|
|
farm's store nor shown in the farm's window in the MonALISA client.
|
46 |
|
|
"""
|
47 |
|
|
|
48 |
|
|
import re
|
49 |
|
|
import xdrlib
|
50 |
|
|
import socket
|
51 |
|
|
import urllib2
|
52 |
|
|
import threading
|
53 |
|
|
import time
|
54 |
|
|
import Logger
|
55 |
|
|
import ProcInfo
|
56 |
corvo |
1.2 |
import random
|
57 |
corvo |
1.3 |
import copy
|
58 |
corvo |
1.1 |
|
59 |
|
|
#__all__ = ["ApMon"]
|
60 |
|
|
|
61 |
corvo |
1.3 |
#__debug = False # set this to True to be verbose
|
62 |
corvo |
1.1 |
|
63 |
|
|
class ApMon:
|
64 |
|
|
"""
|
65 |
|
|
Main class for sending monitoring data to a MonaLisa module.
|
66 |
|
|
One or more destinations can be chosen for the data. See constructor.
|
67 |
|
|
|
68 |
|
|
The data is packed in UDP datagrams, using XDR. The following fields are sent:
|
69 |
|
|
- version & password (string)
- sender instance id (int) and message sequence number (int)
|
70 |
|
|
- cluster name (string)
|
71 |
|
|
- node name (string)
|
72 |
|
|
- number of parameters (int)
|
73 |
|
|
- for each parameter:
|
74 |
|
|
- name (string)
|
75 |
|
|
- value type (int)
|
76 |
|
|
- value
|
77 |
|
|
- optionally, an int with the given timestamp
|
78 |
|
|
|
79 |
|
|
Attributes (public):
|
80 |
|
|
- destinations - a dict whose keys are (ip, port, password) tuples and whose values are the options used for that destination
|
81 |
|
|
- configAddresses - list with files and urls from where the config is read
|
82 |
|
|
- configRecheckInterval - period, in seconds, to check for changes
|
83 |
|
|
in the configAddresses list
|
84 |
|
|
- configRecheck - boolean - whether to recheck periodically for changes
|
85 |
|
|
in the configAddresses list
|
86 |
|
|
"""
|
87 |
|
|
|
88 |
|
|
__defaultOptions = {
    'job_monitoring': True,    # perform (or not) job monitoring
    'job_interval'  : 10,      # at this interval (in seconds)
    'job_data_sent' : 0,       # time from Epoch when job information was sent; don't touch!

    'job_cpu_time'  : True,    # processor time spent running this job, in seconds
    'job_run_time'  : True,    # elapsed time from the start of this job, in seconds
    'job_cpu_usage' : True,    # current percent of the processor used for this job, as reported by ps
    'job_virtualmem': True,    # size in KB of the virtual memory occupied by the job, as reported by ps
    'job_rss'       : True,    # size in KB of the resident image size of the job, as reported by ps
    'job_mem_usage' : True,    # percent of the memory occupied by the job, as reported by ps
    'job_workdir_size': True,  # size in MB of the working directory of the job
    'job_disk_total': True,    # size in MB of the disk partition containing the working directory
    'job_disk_used' : True,    # size in MB of the used space on that partition
    'job_disk_free' : True,    # size in MB of the free space on that partition
    'job_disk_usage': True,    # percent of that partition that is used
    'job_open_files': True,    # number of open file descriptors

    'sys_monitoring': True,    # perform (or not) system monitoring
    'sys_interval'  : 10,      # at this interval (in seconds)
    'sys_data_sent' : 0,       # time from Epoch when system information was sent; don't touch!

    # each enabled sys_* option below is sent as a parameter named without the "sys_" prefix
    'sys_cpu_usr'   : False,   # cpu-usage information
    'sys_cpu_sys'   : False,
    'sys_cpu_nice'  : False,
    'sys_cpu_idle'  : False,
    'sys_cpu_usage' : True,
    'sys_load1'     : True,    # system load information
    'sys_load5'     : True,
    'sys_load15'    : True,
    'sys_mem_used'  : False,   # memory usage information
    'sys_mem_free'  : False,
    'sys_mem_usage' : True,
    'sys_pages_in'  : False,
    'sys_pages_out' : False,
    'sys_swap_used' : True,    # swap usage information
    'sys_swap_free' : False,
    'sys_swap_usage': True,
    'sys_swap_in'   : False,
    'sys_swap_out'  : False,
    'sys_net_in'    : True,    # network transfer in kBps
    'sys_net_out'   : True,    # these will produce params called ethX_in, ethX_out, ethX_errs
    'sys_net_errs'  : False,   # for each eth interface
    'sys_net_sockets' : True,  # number of opened sockets for each proto => sockets_tcp/udp/unix ...
    'sys_net_tcp_details' : True,  # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
    'sys_processes' : True,
    'sys_uptime'    : True,    # uptime of the machine, in days (float number)

    'general_info'  : True,    # send (or not) general host information once every 2 * sys_interval seconds
    'general_data_sent': 0,    # time from Epoch when general information was sent; don't touch!

    # the enabled options below (no sys_/job_ prefix) are sent as general host information
    'hostname'      : True,
    'ip'            : True,    # will produce ethX_ip params for each interface
    'cpu_MHz'       : True,
    'no_CPUs'       : True,    # number of CPUs
    'total_mem'     : True,
    'total_swap'    : True,
    'cpu_vendor_id' : True,
    'cpu_family'    : True,
    'cpu_model'     : True,
    'cpu_model_name': True,
    'bogomips'      : True}
|
150 |
corvo |
1.1 |
|
151 |
corvo |
1.3 |
def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
    """
    Class constructor.

    initValue tells where the destinations come from:
    - a string: one configuration source (a file name, or a URL if it starts
      with "http://"). It is appended to configAddresses and loaded; options
      found there overwrite the background-monitoring defaults.
    - a list: several configuration sources. The list becomes configAddresses
      and every source in it is loaded.
    - a tuple of strings: destinations of the form
      "{hostname|ip}[:port][ passwd]" (default port 8884, default password "").
      Background monitoring uses the default options (see __defaultOptions).
    - a dict: key = destination string as above, value = dict
      {'param_name': value, ...} overwriting __defaultOptions for that
      destination.
    Any other type defines no destination.

    defaultLogLevel is the initial level of this instance's Logger.

    If no destination could be defined, an error is logged and later sends
    are refused. Otherwise:
    - a UDP socket is created;
    - a daemon config-reloader thread is started if configAddresses is non-empty;
    - a ProcInfo instance is created;
    - a daemon background-monitoring thread is started.
    """
    # Per-destination state; the keys of both dicts are (host, port, passwd) tuples.
    self.destinations = {}       # value: options dict {"param_name": True/False, ...}
    self.destPrevData = {}       # value: previous raw data {"param_name": value, ...}
    self.configAddresses = []    # files/urls the configuration is read from
    self.configRecheckInterval = 120   # seconds (2 minutes)
    self.configRecheck = True          # periodic config recheck enabled by default
    self.performBgMonitoring = True    # background monitoring enabled by default
    self.monitoredJobs = {}      # key: pid; value: dict with CLUSTER_NAME and NODE_NAME
    self.maxMsgRate = 20         # maximum number of messages allowed to be sent per second
    self.sender = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}

    # default names used until the user (or setMonitorClusterNode) gives others
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = socket.getfqdn()
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = socket.getfqdn()

    # internal state - don't touch
    self.__freed = False
    self.__initializedOK = True
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()

    # message-rate limiting state (at most maxMsgRate messages per second, on average)
    self.__crtTime = 0
    self.__prvTime = 0
    self.__prvSent = 0
    self.__prvDrop = 0
    self.__crtSent = 0
    self.__crtDrop = 0
    self.__hWeight = 0.92

    self.logger = Logger.Logger(defaultLogLevel)

    initType = type(initValue)
    if initType is str:
        self.configAddresses.append(initValue)
        self.__reloadAddresses()
    elif initType is list:
        self.configAddresses = initValue
        self.__reloadAddresses()
    elif initType is tuple:
        for dest in initValue:
            self.__addDestination(dest, self.destinations)
    elif initType is dict:
        for dest, opts in initValue.items():
            self.__addDestination(dest, self.destinations, opts)

    self.__initializedOK = len(self.destinations) > 0
    if not self.__initializedOK:
        self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
        return

    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if self.configAddresses:
        # there are config sources to watch: start the reloader thread
        reloader = threading.Thread(target=self.__configLoader)
        reloader.setDaemon(True)
        reloader.start()
    self.procInfo = ProcInfo.ProcInfo(self.logger)
    monitor = threading.Thread(target=self.__bgMonitor)
    monitor.setDaemon(True)
    monitor.start()
|
234 |
|
|
|
235 |
|
|
|
236 |
|
|
def sendParams (self, params):
    """
    Send several parameters with no explicit timestamp, using the last
    cluster and node names given (or the defaults if none was given yet).
    """
    noTimeStamp = -1
    self.sendTimedParams(noTimeStamp, params)
|
241 |
|
|
|
242 |
|
|
def sendTimedParams (self, timeStamp, params):
    """
    Send several parameters with the given timestamp, using the last cluster
    and node names given. See sendTimedParameters for the details.
    """
    # None for both names means "reuse the last ones given"
    sameCluster = None
    sameNode = None
    self.sendTimedParameters(sameCluster, sameNode, timeStamp, params)
|
248 |
|
|
|
249 |
|
|
def sendParameter (self, clusterName, nodeName, paramName, paramValue):
    """
    Send one parameter, with no explicit timestamp, for the given cluster
    and node.
    """
    untimed = -1
    self.sendTimedParameter(clusterName, nodeName, untimed, paramName, paramValue)
|
254 |
|
|
|
255 |
|
|
def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
    """
    Send one parameter with the given timestamp (see sendTimedParameters).
    """
    single = {}
    single[paramName] = paramValue
    self.sendTimedParameters(clusterName, nodeName, timeStamp, single)
|
260 |
|
|
|
261 |
|
|
def sendParameters (self, clusterName, nodeName, params):
    """
    Send several parameters, with no explicit timestamp, for the given
    cluster and node.
    """
    untimed = -1
    self.sendTimedParameters(clusterName, nodeName, untimed, params)
|
266 |
|
|
|
267 |
|
|
def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
    """
    Send multiple monitored parameters to MonALISA, to every destination.

    - clusterName is the name of the cluster being monitored. If it is None
      or "", the last cluster name given is used (initially "ApMon_UserSend");
      otherwise it becomes the new default.
    - nodeName is the name of the node the parameters refer to. If it is None,
      the last node name given is used (initially the full hostname of this
      machine); otherwise it becomes the new default.
    - timeStamp, if > 0, is the time of the parameters, in seconds from Epoch.
      Use it only if you are sure about the time of the result (e.g. when
      parsing logs); otherwise the MonALISA service assigns the time.
    - params is either a dict {name: value} or a list of (name, value) tuples.
      Use the list form to send the parameters in a given order. Values should
      be int or float.

    Nothing is sent, and a warning is logged, if this instance was not
    initialized correctly.

    NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
    """
    if not self.__initializedOK:
        self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
        return
    if clusterName is None or clusterName == "":
        clusterName = self.__defaultUserCluster
    else:
        self.__defaultUserCluster = clusterName
    if nodeName is None:
        nodeName = self.__defaultUserNode
    else:
        self.__defaultUserNode = nodeName
    # Hold the lock so the config reloader cannot swap the destinations in
    # the middle of a send. Release it even if a send raises; otherwise every
    # later send and config reload would block forever.
    self.__configUpdateLock.acquire()
    try:
        for dest in self.destinations.keys():
            self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params)
    finally:
        self.__configUpdateLock.release()
|
303 |
corvo |
1.1 |
|
304 |
|
|
def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
    """
    Add a new job to monitor.

    - pid and workDir are handed to ProcInfo, which collects the job data.
    - clusterName / nodeName are the names under which this job's background
      monitoring data is sent.

    The job is registered with ProcInfo first, so a failure there does not
    leave a half-registered job in monitoredJobs. The background-monitor lock
    is always released, even if ProcInfo raises.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.addJobToMonitor(pid, workDir)
        self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
    finally:
        self.__bgMonitorLock.release()
|
314 |
|
|
|
315 |
|
|
def removeJobToMonitor (self, pid):
    """
    Remove a job from being monitored.

    The pid is removed from ProcInfo and from monitoredJobs. If the pid is
    not in monitoredJobs, monitoredJobs is left unchanged. Previously this
    raised KeyError while still holding the background-monitor lock, which
    then blocked the background thread forever. The lock is now always
    released.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.removeJobToMonitor(pid)
        self.monitoredJobs.pop(pid, None)
    finally:
        self.__bgMonitorLock.release()
|
323 |
|
|
|
324 |
|
|
def setMonitorClusterNode (self, clusterName, nodeName):
    """
    Set the cluster and node names used when sending system monitoring data.
    A None or empty-string argument leaves the corresponding name unchanged.
    """
    unset = (None, "")
    self.__bgMonitorLock.acquire()
    if clusterName not in unset:
        self.__defaultSysMonCluster = clusterName
    if nodeName not in unset:
        self.__defaultSysMonNode = nodeName
    self.__bgMonitorLock.release()
|
334 |
|
|
|
335 |
|
|
def enableBgMonitoring (self, onOff):
    """
    Turn periodic background monitoring on (True) or off (False).

    While it is off, background monitoring data can still be sent by calling
    sendBgMonitoring explicitly.
    """
    enabled = onOff
    self.performBgMonitoring = enabled
|
341 |
corvo |
1.3 |
|
342 |
|
|
def sendBgMonitoring (self, mustSend = False):
    """
    Send background monitoring data about the system and jobs to all
    interested destinations.

    For every destination there are three groups of data:
    - system (the sys_* options);
    - jobs (the job_* options);
    - general host information (the enabled options without those prefixes).

    A group is sent if it is enabled in that destination's options and either
    mustSend is True or its interval has passed since it was last sent. The
    intervals are sys_interval, job_interval, and 2 * sys_interval for general
    information.

    ProcInfo is refreshed at most once per call, regardless of the number of
    destinations. The background-monitor lock is always released, even if
    collecting or sending the data raises.
    """
    self.__bgMonitorLock.acquire()
    try:
        now = int(time.time())
        updatedProcInfo = False
        controlOptions = ('monitoring', 'interval', 'data_sent')

        def activeParams(options, prefix):
            # names (prefix stripped) of the enabled "<prefix><name>" options,
            # leaving out the control options (monitoring/interval/data_sent)
            names = []
            for param, active in options.items():
                if active and param.startswith(prefix) and len(param) > len(prefix):
                    name = param[len(prefix):]
                    if name not in controlOptions:
                        names.append(name)
            return names

        # self.destinations is evaluated once here; the config reloader may
        # rebind it to a new dict while we iterate.
        for destination, options in self.destinations.items():
            sysParams = []
            jobParams = []
            prevRawData = self.destPrevData[destination]
            if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                sysParams = activeParams(options, "sys_")
                options['sys_data_sent'] = now
            if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                jobParams = activeParams(options, "job_")
                options['job_data_sent'] = now
            if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                for param, active in options.items():
                    if active and not (param.startswith("sys_") or param.startswith("job_")) \
                            and param not in ('general_info', 'general_data_sent'):
                        sysParams.append(param)
                options['general_data_sent'] = now

            # refresh the collected data only once, whatever the number of destinations
            if not updatedProcInfo and (sysParams or jobParams):
                self.procInfo.update()
                updatedProcInfo = True

            sysResults = {}
            if sysParams:
                sysResults = self.procInfo.getSystemData(sysParams, prevRawData)

            # If the config reloader replaced self.destinations meanwhile, copy
            # the last-sent times into this destination's fresh options. A
            # destination that is no longer configured is left alone (the old
            # direct lookup raised KeyError with the lock held).
            currentOptions = self.destinations.get(destination)
            if currentOptions is not None:
                self.updateLastSentTime(options, currentOptions)

            if type(sysResults) in (type({}), type([])) and len(sysResults) > 0:
                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)

            if jobParams:
                for pid, props in self.monitoredJobs.items():
                    jobResults = self.procInfo.getJobData(pid, jobParams)
                    if len(jobResults) > 0:
                        self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
    finally:
        self.__bgMonitorLock.release()
|
398 |
|
|
|
399 |
corvo |
1.3 |
def updateLastSentTime(self, srcOpts, dstOpts):
    """
    Copy the last-sent timestamps ('general_data_sent', 'sys_data_sent',
    'job_data_sent') that exist in srcOpts into dstOpts. Other keys are
    not touched.
    """
    for stampKey in ('general_data_sent', 'sys_data_sent', 'job_data_sent'):
        if stampKey in srcOpts:
            dstOpts[stampKey] = srcOpts[stampKey]
|
407 |
|
|
|
408 |
corvo |
1.1 |
def setLogLevel (self, strLevel):
    """
    Change the logging level.

    strLevel is one of the strings 'FATAL', 'ERROR', 'WARNING', 'INFO',
    'NOTICE' or 'DEBUG'. It is passed unchanged to this instance's logger.
    """
    logger = self.logger
    logger.setLogLevel(strLevel)
|
414 |
|
|
|
415 |
corvo |
1.2 |
def setMaxMsgRate(self, rate):
    """
    Set the maximum number of messages that may be sent per second, and log
    the new value at DEBUG level.
    """
    self.maxMsgRate = rate
    message = "Setting maxMsgRate to: %s" % (rate,)
    self.logger.log(Logger.DEBUG, message)
|
421 |
|
|
|
422 |
|
|
def free(self):
    """
    Stop the background threads and close the opened socket.

    Both thread stop events (config reloader and background monitor) are
    set. After a 10 ms pause, the UDP socket is closed if one was opened.
    Call this to release all the resources ApMon holds and allow it to be
    garbage-collected.
    """
    for stopEvent in (self.__configUpdateEvent, self.__bgMonitorEvent):
        if stopEvent is not None:
            stopEvent.set()
    # presumably gives the threads a moment to see the events before the socket goes away
    time.sleep(0.01)
    sock = self.__udpSocket
    if sock is not None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        sock.close()
        self.__udpSocket = None
    self.__freed = True
|
437 |
corvo |
1.2 |
|
438 |
|
|
|
439 |
corvo |
1.1 |
#########################################################################################
|
440 |
|
|
# Internal functions - Config reloader thread
|
441 |
|
|
#########################################################################################
|
442 |
|
|
|
443 |
|
|
def __configLoader(self):
    """
    Main loop of the thread that checks for changes and reloads the
    configuration.

    Every configRecheckInterval seconds, if configRecheck is enabled, the
    destinations are reloaded from configAddresses. The loop returns as soon
    as __configUpdateEvent is set (see free()).

    A failed reload (e.g. a host name that no longer resolves) is logged and
    retried at the next interval. Previously the exception escaped and
    silently killed this thread, so the configuration was never rechecked
    again.
    """
    while not self.__configUpdateEvent.isSet():
        self.__configUpdateEvent.wait(self.configRecheckInterval)
        if self.__configUpdateEvent.isSet():
            return
        if not self.configRecheck:
            continue
        try:
            self.__reloadAddresses()
        except Exception as ex:
            # top-level boundary of a daemon thread: log and keep the thread alive
            self.logger.log(Logger.ERROR, "Error reloading configuration: " + str(ex))
            continue
        self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for " + repr(self.configRecheckInterval) + " sec.")
|
454 |
|
|
|
455 |
|
|
def __reloadAddresses(self):
    """
    Rebuild the destinations dict from every source in configAddresses, then
    install it in a single assignment.
    """
    freshDestinations = {}
    for source in self.configAddresses:
        self.__initializeFromFile(source, freshDestinations)
    # swap under the lock so a concurrent send never sees a changing config
    self.__configUpdateLock.acquire()
    try:
        self.destinations = freshDestinations
    finally:
        self.__configUpdateLock.release()
|
466 |
|
|
|
467 |
|
|
def __addDestination (self, aDestination, tempDestinations, options = __defaultOptions):
    """
    Add a destination to tempDestinations.

    aDestination is a string of the form "{hostname|ip}[:port] [passwd]"
    without quotes. If the port is not given, the default port
    (self.__defaultPort) is used. If the password is missing, it is the empty
    string. The host name is converted to an IP address here, once.

    The new entry's options start as a deep copy of __defaultOptions. They
    are then overwritten by `options` when it differs from the defaults.

    The destination is skipped, and a message logged, when:
    - its (host, port) pair is already in tempDestinations;
    - the port is not numeric;
    - the host name cannot be resolved. Previously this raised out of the
      constructor or killed the config-reloader thread.
    """
    # turn tabs and runs of whitespace into single spaces
    aDestination = ' '.join(aDestination.split())
    sepPort = aDestination.find(':')
    sepPasswd = aDestination.rfind(' ')
    if sepPort >= 0:
        host = aDestination[0:sepPort].strip()
        if sepPasswd > sepPort + 1:
            port = aDestination[sepPort+1:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            port = aDestination[sepPort+1:].strip()
            passwd = ""
    else:
        port = str(self.__defaultPort)
        if sepPasswd >= 0:
            host = aDestination[0:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            host = aDestination.strip()
            passwd = ""
    if not port.isdigit():
        self.logger.log(Logger.WARNING, "Bad value for port number " + repr(port) + " in " + aDestination + " destination")
        return
    port = int(port)
    try:
        # convert hostnames to IP addresses to avoid suffocating DNSs
        host = socket.gethostbyname(host)
    except socket.error as ex:
        self.logger.log(Logger.WARNING, "Cannot resolve host " + host + " in " + aDestination + " destination: " + str(ex))
        return
    for knownHost, knownPort, knownPasswd in tempDestinations.keys():
        if knownHost == host and knownPort == port:
            self.logger.log(Logger.NOTICE, "Destination " + host + ":" + str(port) + " " + passwd + " already added. Skipping it")
            return
    destination = (host, port, passwd)
    self.logger.log(Logger.INFO, "Adding destination " + host + ':' + str(port) + ' ' + passwd)
    # have a different set of options for each destination
    tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)
    self.destPrevData[destination] = {}
    if options != self.__defaultOptions:
        # we have to overwrite defaults with given options
        for key, value in options.items():
            self.logger.log(Logger.NOTICE, "Overwritting option: " + key + " = " + repr(value))
            tempDestinations[destination][key] = value
|
518 |
corvo |
1.1 |
|
519 |
|
|
def __initializeFromFile (self, confFileName, tempDestinations):
    """
    Load destinations from confFileName into tempDestinations.

    confFileName is a local file, or a URL when it starts with "http://".
    Each line is stripped. Empty lines and lines starting with '#' are
    ignored. Lines of the form "xApMon_<param> = <value>" set options:
    - "ON"/"OFF" (any case) become True/False. Values of params ending in
      "_interval" are converted to int.
    - loglevel, maxMsgRate, conf_recheck and recheck_interval configure this
      ApMon instance directly.
    - *_data_sent params are ignored.
    - Any other param becomes an option for the destinations of this source.
    Every other line is a destination, passed to __addDestination together
    with the options collected from the whole source.

    If the source cannot be opened, the error is logged and the source is
    skipped. An option with a non-numeric value where a number is required
    is logged and ignored. The source is always closed after reading.
    """
    try:
        if confFileName.find("http://") == 0:
            confFile = urllib2.urlopen(confFileName)
        else:
            confFile = open(confFileName)
    except urllib2.HTTPError as e:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        httpMessages = {
            401: 'HTTPError: not authorized.',
            404: 'HTTPError: not found.',
            503: 'HTTPError: service unavailable.'}
        self.logger.log(Logger.ERROR, httpMessages.get(e.code, 'HTTPError: unknown error.'))
        return
    except urllib2.URLError as ex:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        # ex.reason may be a plain string or an exception object. Indexing it
        # (the old ex.reason[1]) could raise, or log a single character.
        self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason))
        return
    except IOError as ex:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        self.logger.log(Logger.ERROR, "IOError: "+str(ex))
        return

    self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
    dests = []
    opts = {}
    try:
        while True:
            line = confFile.readline()
            if line == '':
                break
            line = line.strip()
            self.logger.log(Logger.DEBUG, "Reading line "+line)
            if len(line) == 0 or line.startswith('#'):
                continue
            if not line.startswith("xApMon_"):
                dests.append(line)
                continue
            m = re.match(r"(\S+)\s*=\s*(\S+)", line[len("xApMon_"):])
            if m is None:
                continue
            param, value = m.group(1), m.group(2)
            try:
                if value.upper() == "ON":
                    value = True
                elif value.upper() == "OFF":
                    value = False
                elif param.endswith("_interval"):
                    value = int(value)
                if param == "loglevel":
                    self.logger.setLogLevel(value)
                elif param == "maxMsgRate":
                    self.setMaxMsgRate(int(value))
                elif param == "conf_recheck":
                    self.configRecheck = value
                elif param == "recheck_interval":
                    self.configRecheckInterval = value
                elif param.endswith("_data_sent"):
                    pass  # don't reset time in sys/job/general/_data_sent
                else:
                    opts[param] = value
            except ValueError:
                # non-numeric value where a number is required: skip this option
                self.logger.log(Logger.WARNING, "Bad value for "+param+" in "+confFileName+": "+m.group(2)+". Ignoring it")
    finally:
        confFile.close()

    for line in dests:
        self.__addDestination(line, tempDestinations, opts)
|
592 |
|
|
|
593 |
|
|
###############################################################################################
|
594 |
|
|
# Internal functions - Background monitor thread
|
595 |
|
|
###############################################################################################
|
596 |
|
|
|
597 |
|
|
def __bgMonitor (self):
    """
    Main loop of the background-monitoring thread.

    Wakes up every 10 seconds and, while performBgMonitoring is enabled,
    calls sendBgMonitoring(), which sends only the data whose interval has
    elapsed. The loop returns as soon as __bgMonitorEvent is set (see free()).

    An error in one round is logged and the thread keeps running.
    Previously such an error silently killed the thread and stopped
    background monitoring for good.
    """
    while not self.__bgMonitorEvent.isSet():
        self.__bgMonitorEvent.wait(10)
        if self.__bgMonitorEvent.isSet():
            return
        if not self.performBgMonitoring:
            continue
        try:
            self.sendBgMonitoring()  # send only if the interval has elapsed
        except Exception as ex:
            # top-level boundary of a daemon thread: log and keep the thread alive
            self.logger.log(Logger.ERROR, "Error sending background monitoring data: " + str(ex))
|
604 |
corvo |
1.1 |
|
605 |
|
|
###############################################################################################
|
606 |
|
|
# Internal helper functions
|
607 |
|
|
###############################################################################################
|
608 |
|
|
|
609 |
corvo |
1.3 |
def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
|
610 |
|
|
|
611 |
|
|
if senderRef=={}:
|
612 |
|
|
self.logger.log(Logger.WARNING, "Not sending undefined parameters!");
|
613 |
|
|
return;
|
614 |
corvo |
1.2 |
|
615 |
|
|
if self.__shouldSend() == False:
|
616 |
|
|
return;
|
617 |
|
|
|
618 |
corvo |
1.3 |
if destination == None:
|
619 |
|
|
self.logger.log(Logger.WARNING, "Destination is None");
|
620 |
|
|
return;
|
621 |
|
|
|
622 |
corvo |
1.1 |
host, port, passwd = destination
|
623 |
corvo |
1.3 |
senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
|
624 |
|
|
|
625 |
|
|
xdrPacker = xdrlib.Packer ();
|
626 |
corvo |
1.1 |
self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params)));
|
627 |
corvo |
1.3 |
|
628 |
corvo |
1.1 |
xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
|
629 |
corvo |
1.3 |
|
630 |
|
|
xdrPacker.pack_int (senderRef['INSTANCE_ID']);
|
631 |
|
|
xdrPacker.pack_int (senderRef['SEQ_NR']);
|
632 |
|
|
|
633 |
|
|
xdrPacker.pack_string (clusterName);
|
634 |
|
|
xdrPacker.pack_string (nodeName);
|
635 |
|
|
xdrPacker.pack_int (len(params));
|
636 |
|
|
|
637 |
corvo |
1.1 |
if type(params) == type( {} ):
|
638 |
|
|
for name, value in params.iteritems():
|
639 |
|
|
self.__packParameter(xdrPacker, name, value)
|
640 |
|
|
elif type(params) == type( [] ):
|
641 |
|
|
for name, value in params:
|
642 |
|
|
self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value));
|
643 |
|
|
self.__packParameter(xdrPacker, name, value)
|
644 |
|
|
else:
|
645 |
|
|
self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
|
646 |
corvo |
1.3 |
if (timeStamp != None) and (timeStamp > 0):
|
647 |
corvo |
1.1 |
xdrPacker.pack_int(timeStamp);
|
648 |
corvo |
1.3 |
|
649 |
corvo |
1.1 |
buffer = xdrPacker.get_buffer();
|
650 |
|
|
# send this buffer to the destination, using udp datagrams
|
651 |
|
|
try:
|
652 |
|
|
self.__udpSocket.sendto(buffer, (host, port))
|
653 |
corvo |
1.3 |
self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd);
|
654 |
corvo |
1.1 |
except socket.error, msg:
|
655 |
|
|
self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]));
|
656 |
|
|
xdrPacker.reset()
|
657 |
|
|
|
658 |
|
|
def __packParameter(self, xdrPacker, name, value):
|
659 |
corvo |
1.3 |
if (name is None) or (name is ""):
|
660 |
|
|
self.logger.log(Logger.WARNING, "Undefine parameter name.");
|
661 |
|
|
return;
|
662 |
|
|
if (value is None):
|
663 |
|
|
self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value");
|
664 |
|
|
return;
|
665 |
corvo |
1.1 |
try:
|
666 |
|
|
typeValue = self.__valueTypes[type(value)]
|
667 |
|
|
xdrPacker.pack_string (name)
|
668 |
|
|
xdrPacker.pack_int (typeValue)
|
669 |
|
|
self.__packFunctions[typeValue] (xdrPacker, value)
|
670 |
|
|
self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value));
|
671 |
|
|
except Exception, ex:
|
672 |
|
|
print "ApMon: error packing %s = %s; got %s" % (name, str(value), ex)
|
673 |
corvo |
1.3 |
|
674 |
corvo |
1.1 |
# Destructor: calls free() unless self.__freed indicates it already ran
# (presumably set by free() itself; confirm in free()).
def __del__(self):
    if self.__freed:
        return
    self.free()
|
678 |
corvo |
1.2 |
|
679 |
|
|
# Decide if the current datagram should be sent.
# This decision is based on the number of messages previously sent.
def __shouldSend(self):
    """Rate limiter for outgoing datagrams.

    Returns True if the current datagram should be sent and False if it
    should be dropped. The decision is recorded in the current-second
    counters (self.__crtSent / self.__crtDrop).

    On the first call in a new wall-clock second, the messages sent since
    the previous such call are folded into an exponentially weighted history
    (self.__prvSent, weight self.__hWeight). The rate estimate compared
    against self.maxMsgRate mixes that history with the current second's
    count.
    """
    # int() rather than long(): on Python 2 it promotes to long by itself,
    # and it also exists on Python 3.
    now = int(time.time())
    if now != self.__crtTime:
        # New second: update the history from the previous period. The wall
        # clock can step backwards (e.g. an NTP correction), which would make
        # the elapsed time, and hence the averaged rate, negative. Such a
        # step is treated as one elapsed second.
        elapsed = max(now - self.__crtTime, 1)
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        # reset current counters
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # compute the history: weighted estimate of the current send rate
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    doSend = True

    # Floor division keeps Python 2's integer "/" semantics for ints and
    # gives random.randint an integer bound on Python 3 as well.
    tenth = self.maxMsgRate // 10
    # when we should start dropping messages
    level = self.maxMsgRate - tenth
    if valSent > (self.maxMsgRate - level):
        # randint() never exceeds `tenth`, so a drop is only possible once
        # maxMsgRate - valSent <= tenth, i.e. valSent >= level. From there
        # the drop probability grows as valSent approaches maxMsgRate.
        if random.randint(0, tenth) >= (self.maxMsgRate - valSent):
            doSend = False

    # counting sent and dropped messages
    if doSend:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1

    return doSend
|
713 |
|
|
|
714 |
corvo |
1.1 |
################################################################################################
|
715 |
|
|
# Private variables. Don't touch
|
716 |
|
|
################################################################################################
|
717 |
|
|
|
718 |
|
|
# Maps the exact Python type of a parameter value to the XDR type code sent
# on the wire (codes as in ApMon.h from the C/C++ ApMon version).
__valueTypes = dict([
    (str, 0),    # XDR_STRING
    (int, 2),    # XDR_INT32
    (float, 5),  # XDR_REAL64
])

# Encoder for each XDR type code above, called as encoder(packer, value).
__packFunctions = dict([
    (0, xdrlib.Packer.pack_string),  # XDR_STRING
    (2, xdrlib.Packer.pack_int),     # XDR_INT32
    (5, xdrlib.Packer.pack_double),  # XDR_REAL64
])

__defaultPort = 8884  # presumably the default MonALISA UDP port; confirm where it is used
__version = "2.2.1-py"  # ApMon version number, packed into every packet header
|
730 |
corvo |
1.1 |
|