ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.4
Committed: Tue Mar 21 16:05:47 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_7, CRAB_1_0_7_pre1
Changes since 1.3: +18 -16 lines
Log Message:
Fixed thread problems

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.2.2
5 *
6 * Copyright (C) 2006 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34 """
35 apmon.py
36
37 This is a python implementation for the ApMon API for sending
38 data to the MonALISA service.
39
40 For further details about ApMon please see the C/C++ or Java documentation.
41 You can find a sample usage of this module in apmTest.py.
42
43 Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44 Sending strings is supported, but they will not be stored in the
45 farm's store nor shown in the farm's window in the MonALISA client.
46 """
47
48 import re
49 import xdrlib
50 import socket
51 import urllib2
52 import threading
53 import time
54 import Logger
55 import ProcInfo
56 import random
57 import copy
58
59 #__all__ = ["ApMon"]
60
61 #__debug = False # self.destPrevData[destination];set this to True to be verbose
62
class ApMon:
    """
    Main class for sending monitoring data to a MonALISA module.
    One or more destinations can be chosen for the data. See constructor.

    The data is packed in UDP datagrams, using XDR. The following fields are sent:
    - version & password (string)
    - sender instance id (int) and sequence number (int)
    - cluster name (string)
    - node name (string)
    - number of parameters (int)
    - for each parameter:
        - name (string)
        - value type (int)
        - value
    - optionally a (int) with the given timestamp

    Attributes (public):
    - destinations - a dict; key = (ip, port, password) tuple,
        value = dict of background-monitoring options for that destination
    - configAddresses - list with files and urls from where the config is read
    - configRecheckInterval - period, in seconds, to check for changes
        in the configAddresses list
    - configRecheck - boolean - whether to recheck periodically for changes
        in the configAddresses list
    """

    __defaultOptions = {
        'job_monitoring': True,     # perform (or not) job monitoring
        'job_interval': 10,         # at this interval (in seconds)
        'job_data_sent': 0,         # time from Epoch when job information was sent; don't touch!

        'job_cpu_time': True,       # processor time spent running this job in seconds
        'job_run_time': True,       # elapsed time from the start of this job in seconds
        'job_cpu_usage': True,      # current percent of the processor used for this job, as reported by ps
        'job_virtualmem': True,     # size in KB of the virtual memory occupied by the job, as reported by ps
        'job_rss': True,            # size in KB of the resident image size of the job, as reported by ps
        'job_mem_usage': True,      # percent of the memory occupied by the job, as reported by ps
        'job_workdir_size': True,   # size in MB of the working directory of the job
        'job_disk_total': True,     # size in MB of the total size of the disk partition containing the working directory
        'job_disk_used': True,      # size in MB of the used disk partition containing the working directory
        'job_disk_free': True,      # size in MB of the free disk partition containing the working directory
        'job_disk_usage': True,     # percent of the used disk partition containing the working directory
        'job_open_files': True,     # number of open file descriptors

        'sys_monitoring': True,     # perform (or not) system monitoring
        'sys_interval': 10,         # at this interval (in seconds)
        'sys_data_sent': 0,         # time from Epoch when system information was sent; don't touch!

        'sys_cpu_usr': False,       # cpu-usage information
        'sys_cpu_sys': False,       # all these will produce corresponding params without "sys_"
        'sys_cpu_nice': False,
        'sys_cpu_idle': False,
        'sys_cpu_usage': True,
        'sys_load1': True,          # system load information
        'sys_load5': True,
        'sys_load15': True,
        'sys_mem_used': False,      # memory usage information
        'sys_mem_free': False,
        'sys_mem_usage': True,
        'sys_pages_in': False,
        'sys_pages_out': False,
        'sys_swap_used': True,      # swap usage information
        'sys_swap_free': False,
        'sys_swap_usage': True,
        'sys_swap_in': False,
        'sys_swap_out': False,
        'sys_net_in': True,         # network transfer in kBps
        'sys_net_out': True,        # these will produce params called ethX_in, ethX_out, ethX_errs
        'sys_net_errs': False,      # for each eth interface
        'sys_net_sockets': True,    # number of opened sockets for each proto => sockets_tcp/udp/unix ...
        'sys_net_tcp_details': True,  # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
        'sys_processes': True,
        'sys_uptime': True,         # uptime of the machine, in days (float number)

        'general_info': True,       # send (or not) general host information once every 2 * sys_interval seconds
        'general_data_sent': 0,     # time from Epoch when general information was sent; don't touch!

        'hostname': True,
        'ip': True,                 # will produce ethX_ip params for each interface
        'cpu_MHz': True,
        'no_CPUs': True,            # number of CPUs
        'total_mem': True,
        'total_swap': True,
        'cpu_vendor_id': True,
        'cpu_family': True,
        'cpu_model': True,
        'cpu_model_name': True,
        'bogomips': True,
    }

    def __init__(self, initValue, defaultLogLevel=Logger.INFO):
        """
        Class constructor:
        - if initValue is a string, put it in configAddresses and load destinations
          from the file named like that. If it starts with "http://" (or "https://"),
          the configuration is loaded from that URL. For background monitoring, given
          parameters will overwrite defaults.

        - if initValue is a list, put its contents in configAddresses and create
          the list of destinations from all those sources. For background monitoring,
          given parameters will overwrite defaults (see __defaultOptions).

        - if initValue is a tuple (of strings), initialize destinations with those values.
          Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
          default port being 8884 and the default password being "". Background monitoring
          will be enabled, sending the parameters active in __defaultOptions.

        - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
          val = hash{'param_name': True/False, ...}) the given options for each destination
          will overwrite the default parameters (see __defaultOptions).

        If no destination could be set up, an error is logged, no socket is opened and
        no background thread is started; later send calls only log a warning.
        """
        # Internal state first, so that free()/__del__ are safe even if a
        # later step of the constructor raises.
        self.__freed = False
        self.__initializedOK = True
        self.__udpSocket = None
        self.__configThreadStarted = False
        self.__bgThreadStarted = False
        self.__configUpdateLock = threading.Lock()
        self.__configUpdateEvent = threading.Event()
        self.__configUpdateFinished = threading.Event()
        self.__bgMonitorLock = threading.Lock()
        self.__bgMonitorEvent = threading.Event()
        self.__bgMonitorFinished = threading.Event()

        self.destinations = {}            # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
        self.destPrevData = {}            # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
        self.configAddresses = []         # list of files/urls from where we read config
        self.configRecheckInterval = 120  # 2 minutes
        self.configRecheck = True         # enabled by default
        self.performBgMonitoring = True   # by default, perform background monitoring
        self.monitoredJobs = {}           # key = pid; value = hash with CLUSTER_NAME and NODE_NAME
        self.maxMsgRate = 20              # maximum number of messages allowed to be sent per second
        self.sender = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
        self.__defaultUserCluster = "ApMon_UserSend"
        self.__defaultUserNode = socket.getfqdn()
        self.__defaultSysMonCluster = "ApMon_SysMon"
        self.__defaultSysMonNode = socket.getfqdn()

        # Rate limiting: don't allow a user to send more than maxMsgRate
        # messages per second, on average (see __shouldSend).
        self.__crtTime = 0
        self.__prvTime = 0
        self.__prvSent = 0
        self.__prvDrop = 0
        self.__crtSent = 0
        self.__crtDrop = 0
        self.__hWeight = 0.92

        self.logger = Logger.Logger(defaultLogLevel)

        if isinstance(initValue, str):
            self.configAddresses.append(initValue)
            self.__reloadAddresses()
        elif isinstance(initValue, list):
            self.configAddresses = initValue
            self.__reloadAddresses()
        elif isinstance(initValue, tuple):
            for dest in initValue:
                self.__addDestination(dest, self.destinations)
        elif isinstance(initValue, dict):
            for dest, opts in initValue.items():
                self.__addDestination(dest, self.destinations, opts)

        self.__initializedOK = len(self.destinations) > 0
        if not self.__initializedOK:
            self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
            return

        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if len(self.configAddresses) > 0:
            # there are addresses that need to be monitored:
            # start the config checking and reloading thread
            th = threading.Thread(target=self.__configLoader)
            th.daemon = True
            th.start()
            self.__configThreadStarted = True
        # create the ProcInfo instance
        self.procInfo = ProcInfo.ProcInfo(self.logger)
        # start the background monitoring thread
        th = threading.Thread(target=self.__bgMonitor)
        th.daemon = True
        th.start()
        self.__bgThreadStarted = True

    def sendParams(self, params):
        """
        Send multiple parameters to MonALISA, with default (last given) cluster and node names.
        """
        self.sendTimedParams(-1, params)

    def sendTimedParams(self, timeStamp, params):
        """
        Send multiple parameters, specifying the time for them, with default (last given)
        cluster and node names. (See sendTimedParameters for more details)
        """
        self.sendTimedParameters(None, None, timeStamp, params)

    def sendParameter(self, clusterName, nodeName, paramName, paramValue):
        """
        Send a single parameter to MonALISA.
        """
        self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue)

    def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
        """
        Send a single parameter, with a given time.
        """
        self.sendTimedParameters(clusterName, nodeName, timeStamp, {paramName: paramValue})

    def sendParameters(self, clusterName, nodeName, params):
        """
        Send multiple parameters specifying cluster and node name for them.
        """
        self.sendTimedParameters(clusterName, nodeName, -1, params)

    def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
        """
        Send multiple monitored parameters to MonALISA.

        - clusterName is the name of the cluster being monitored. If it is None
          or empty, the last given clusterName is used (initially "ApMon_UserSend").
        - nodeName is the name of the node for which are the parameters. If this
          is None, the last given nodeName is used (initially the full hostname
          of this machine).
        - timeStamp, if > 0, is the given time for the parameters, in seconds from Epoch.
          Note that this option should be used only if you are sure about the time for
          the result. Otherwise, the parameters will be assigned a correct time
          (obtained from NTP servers) in the MonALISA service. This option can be useful
          when parsing logs, for example.
        - params is a dictionary containing pairs with:
            - key: parameter name
            - value: parameter value, either int or float.
          or params is a list of tuples (key, value). This version can be used
          in case you want to send the parameters in a given order.

        NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
        """
        if not self.__initializedOK:
            self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
            return
        if clusterName is None or clusterName == "":
            clusterName = self.__defaultUserCluster
        else:
            self.__defaultUserCluster = clusterName
        if nodeName is None:
            nodeName = self.__defaultUserNode
        else:
            self.__defaultUserNode = nodeName
        # 'with' guarantees the lock is released even if sending raises
        with self.__configUpdateLock:
            for dest in list(self.destinations.keys()):
                self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params)

    def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
        """
        Add a new job to monitor. Its data will be reported under the given
        cluster and node names.
        """
        with self.__bgMonitorLock:
            self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
            self.procInfo.addJobToMonitor(pid, workDir)

    def removeJobToMonitor(self, pid):
        """
        Remove a job from being monitored. Raises KeyError if the pid was not
        registered with addJobToMonitor; the internal lock is released either way.
        """
        with self.__bgMonitorLock:
            self.procInfo.removeJobToMonitor(pid)
            del self.monitoredJobs[pid]

    def setMonitorClusterNode(self, clusterName, nodeName):
        """
        Set the cluster and node names where to send system related information.
        None or empty values leave the corresponding name unchanged.
        """
        with self.__bgMonitorLock:
            if clusterName is not None and clusterName != "":
                self.__defaultSysMonCluster = clusterName
            if nodeName is not None and nodeName != "":
                self.__defaultSysMonNode = nodeName

    def enableBgMonitoring(self, onOff):
        """
        Enable or disable background monitoring. Note that background monitoring information
        can still be sent if user calls the sendBgMonitoring method.
        """
        self.performBgMonitoring = onOff

    def sendBgMonitoring(self, mustSend=False):
        """
        Send background monitoring about system and jobs to all interested destinations.
        If mustSend == True, the information is sent regardless of the elapsed time since last sent.
        If mustSend == False, the data is sent only if the required interval has passed since last sent.
        """
        with self.__bgMonitorLock:
            now = int(time.time())
            updatedProcInfo = False
            # Work on the table that is current right now; the config reloader
            # may replace self.destinations with a new dict at any moment.
            destinations = self.destinations
            for destination, options in list(destinations.items()):
                prevRawData = self.destPrevData.setdefault(destination, {})
                sysParams = []
                jobParams = []
                # for each destination and its options, check if we have to
                # report any background monitoring data
                if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                    sysParams.extend(self.__activeParams(options, "sys_"))
                    options['sys_data_sent'] = now
                if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                    jobParams.extend(self.__activeParams(options, "job_"))
                    options['job_data_sent'] = now
                if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                    for param, active in options.items():
                        if not active or param.startswith(("sys_", "job_")):
                            continue
                        if param not in ('general_info', 'general_data_sent'):
                            sysParams.append(param)
                    options['general_data_sent'] = now

                # refresh the raw system data at most once per call
                if not updatedProcInfo and (sysParams or jobParams):
                    self.procInfo.update()
                    updatedProcInfo = True

                if sysParams:
                    sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
                    # If the configuration was reloaded meanwhile, carry the
                    # "last sent" times over to the new options; the destination
                    # may also have disappeared, in which case there is nothing to copy.
                    currentOptions = self.destinations.get(destination)
                    if currentOptions is not None:
                        self.updateLastSentTime(options, currentOptions)
                    if isinstance(sysResults, (dict, list)) and len(sysResults) > 0:
                        self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster,
                                                self.__defaultSysMonNode, -1, sysResults)
                if jobParams:
                    for pid, props in list(self.monitoredJobs.items()):
                        jobResults = self.procInfo.getJobData(pid, jobParams)
                        if jobResults:
                            self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'],
                                                    props['NODE_NAME'], -1, jobResults)

    def updateLastSentTime(self, srcOpts, dstOpts):
        """
        Copy the times when data was last sent ('general_data_sent', 'sys_data_sent',
        'job_data_sent') from srcOpts to dstOpts, for the keys present in srcOpts.
        """
        for key in ('general_data_sent', 'sys_data_sent', 'job_data_sent'):
            if key in srcOpts:
                dstOpts[key] = srcOpts[key]

    def setLogLevel(self, strLevel):
        """
        Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
        'INFO', 'NOTICE', 'DEBUG'.
        """
        self.logger.setLogLevel(strLevel)

    def setMaxMsgRate(self, rate):
        """
        Set the maximum number of messages that can be sent, per second.
        """
        self.maxMsgRate = rate
        self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate))

    def free(self):
        """
        Stop background threads, close opened sockets. You have to use this function if you
        want to free all the resources that ApMon takes, and allow it to be garbage-collected.
        """
        # Only wait for threads that were really started; otherwise the
        # "finished" events are never set and the wait would block forever
        # (e.g. when initialization failed).
        if self.__configThreadStarted:
            self.__configUpdateEvent.set()
            self.__configUpdateFinished.wait()
        if self.__bgThreadStarted:
            self.__bgMonitorEvent.set()
            self.__bgMonitorFinished.wait()

        if self.__udpSocket is not None:
            self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
            self.__udpSocket.close()
            self.__udpSocket = None
        self.__freed = True

    #########################################################################################
    # Internal functions - Config reloader thread
    #########################################################################################

    def __configLoader(self):
        """
        Main loop of the thread that checks for changes and reloads the configuration.
        Runs until free() sets the config-update event.
        """
        while not self.__configUpdateEvent.isSet():
            self.__configUpdateEvent.wait(self.configRecheckInterval)
            if self.__configUpdateEvent.isSet():
                break
            if self.configRecheck:
                self.__reloadAddresses()
                self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for " + repr(self.configRecheckInterval) + " sec.")
        self.__configUpdateFinished.set()

    def __reloadAddresses(self):
        """
        Refresh destinations hash, by loading data from all sources in configAddresses.
        """
        newDestinations = {}
        for src in self.configAddresses:
            self.__initializeFromFile(src, newDestinations)
        # avoid changing config in the middle of sending packets to previous destinations
        with self.__configUpdateLock:
            self.destinations = newDestinations

    def __addDestination(self, aDestination, tempDestinations, options=__defaultOptions):
        """
        Add a destination to tempDestinations.

        aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
        If the port is not given, the default port (8884) is used.
        If the password is missing, it is considered an empty string.
        Entries with a bad port or an unresolvable host are skipped with a warning.
        options, when different from the defaults, overwrite the default
        background-monitoring options for this destination.
        """
        # normalize whitespace: trim, and collapse runs of blanks/tabs to one space
        aDestination = ' '.join(aDestination.split())
        sepPort = aDestination.find(':')
        sepPasswd = aDestination.rfind(' ')
        if sepPort >= 0:
            host = aDestination[0:sepPort].strip()
            if sepPasswd > sepPort + 1:
                port = aDestination[sepPort + 1:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                port = aDestination[sepPort + 1:].strip()
                passwd = ""
        else:
            port = str(self.__defaultPort)
            if sepPasswd >= 0:
                host = aDestination[0:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                host = aDestination.strip()
                passwd = ""
        if not port.isdigit():
            self.logger.log(Logger.WARNING, "Bad value for port number " + repr(port) + " in " + aDestination + " destination")
            return
        if not host:
            self.logger.log(Logger.WARNING, "Missing host name in " + repr(aDestination) + " destination")
            return
        port = int(port)
        try:
            # convert hostnames to IP addresses to avoid suffocating DNSs
            host = socket.gethostbyname(host)
        except socket.error as ex:
            # a single bad entry must not abort the constructor or the reloader thread
            self.logger.log(Logger.WARNING, "Cannot resolve host " + host + " in " + aDestination + " destination: " + str(ex))
            return
        alreadyAdded = False
        for h, p, w in tempDestinations.keys():
            if h == host and p == port:
                alreadyAdded = True
                break
        destination = (host, port, passwd)
        if alreadyAdded:
            self.logger.log(Logger.NOTICE, "Destination " + host + ":" + str(port) + " " + passwd + " already added. Skipping it")
            return
        self.logger.log(Logger.INFO, "Adding destination " + host + ':' + repr(port) + ' ' + passwd)
        # have a different set of options for each destination
        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)
        self.destPrevData[destination] = {}
        if options != self.__defaultOptions:
            # we have to overwrite defaults with given options
            for key, value in options.items():
                self.logger.log(Logger.NOTICE, "Overwritting option: " + key + " = " + repr(value))
                tempDestinations[destination][key] = value

    def __initializeFromFile(self, confFileName, tempDestinations):
        """
        Load destinations from confFileName file. If it's an URL (starts with "http://"
        or "https://") load configuration from there. Put all destinations in the
        tempDestinations hash.

        Lines starting with "xApMon_" are treated as options; every other line that
        doesn't start with # and has non-whitespace characters on it is passed to
        __addDestination together with the options found in the same source.
        """
        try:
            if confFileName.startswith(("http://", "https://")):
                confFile = urllib2.urlopen(confFileName)
            else:
                confFile = open(confFileName)
        except urllib2.HTTPError as e:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            if e.code == 401:
                self.logger.log(Logger.ERROR, 'HTTPError: not authorized.')
            elif e.code == 404:
                self.logger.log(Logger.ERROR, 'HTTPError: not found.')
            elif e.code == 503:
                self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.')
            else:
                self.logger.log(Logger.ERROR, 'HTTPError: unknown error.')
            return
        except urllib2.URLError as ex:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            # reason may be a plain string or an exception; don't index into it
            self.logger.log(Logger.ERROR, "URL Error: " + str(ex.reason))
            return
        except IOError as ex:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            self.logger.log(Logger.ERROR, "IOError: " + str(ex))
            return
        self.logger.log(Logger.INFO, "Adding destinations from " + confFileName)
        dests = []
        opts = {}
        try:
            while True:
                line = confFile.readline()
                if line == '':
                    break
                line = line.strip()
                self.logger.log(Logger.DEBUG, "Reading line " + line)
                if len(line) == 0 or line[0] == '#':
                    continue
                if line.startswith("xApMon_"):
                    self.__parseConfigOption(line[len("xApMon_"):], opts)
                else:
                    dests.append(line)
        finally:
            # release the file/connection even if reading fails
            confFile.close()
        for line in dests:
            self.__addDestination(line, tempDestinations, opts)

    def __parseConfigOption(self, text, opts):
        """
        Handle one "name = value" option (the text following "xApMon_" on a config line).
        Global settings are applied to this object; anything else is stored in opts.
        """
        m = re.match(r"(\S+)\s*=\s*(\S+)", text)
        if m is None:
            return
        param = m.group(1)
        value = m.group(2)
        try:
            if value.upper() == "ON":
                value = True
            elif value.upper() == "OFF":
                value = False
            elif param.endswith("_interval"):
                value = int(value)
            if param == "maxMsgRate":
                value = int(value)
        except ValueError:
            # a malformed number must not kill the config reloader thread
            self.logger.log(Logger.WARNING, "Bad value for option " + param + ": " + repr(value))
            return
        if param == "loglevel":
            self.logger.setLogLevel(value)
        elif param == "maxMsgRate":
            self.setMaxMsgRate(value)
        elif param == "conf_recheck":
            self.configRecheck = value
        elif param == "recheck_interval":
            self.configRecheckInterval = value
        elif param.endswith("_data_sent"):
            pass  # don't reset time in sys/job/general/_data_sent
        else:
            opts[param] = value

    ###############################################################################################
    # Internal functions - Background monitor thread
    ###############################################################################################

    def __bgMonitor(self):
        """
        Main loop of the background monitoring thread. Wakes up every 10 seconds
        until free() sets the bg-monitor event.
        """
        while not self.__bgMonitorEvent.isSet():
            self.__bgMonitorEvent.wait(10)
            if self.__bgMonitorEvent.isSet():
                break
            if self.performBgMonitoring:
                self.sendBgMonitoring()  # send only if the interval has elapsed
        self.__bgMonitorFinished.set()

    def __activeParams(self, options, prefix):
        """
        Return the names (without prefix) of the enabled options that start with
        prefix, leaving out the bookkeeping entries monitoring/interval/data_sent.
        """
        names = []
        for param, active in options.items():
            if not (active and param.startswith(prefix)):
                continue
            name = param[len(prefix):]
            if name and name not in ('monitoring', 'interval', 'data_sent'):
                names.append(name)
        return names

    ###############################################################################################
    # Internal helper functions
    ###############################################################################################

    def __directSendParams(self, senderRef, destination, clusterName, nodeName, timeStamp, params):
        """
        Build one XDR datagram with the given parameters and send it to destination,
        unless the rate limiter decides to drop it.

        params is a dict {name: value} or a list of (name, value) pairs. Parameters
        that cannot be packed are skipped, and the parameter count written in the
        datagram is the number of parameters actually packed.
        """
        if not self.__shouldSend():
            self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!")
            return

        if destination is None:
            self.logger.log(Logger.WARNING, "Destination is None")
            return

        if self.__udpSocket is None:
            self.logger.log(Logger.WARNING, "UDP socket is closed. Message NOT sent!")
            return

        if isinstance(params, dict):
            pairs = list(params.items())
        elif isinstance(params, list):
            pairs = params
        else:
            # nothing sensible can be packed; don't send a malformed datagram
            self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
            return

        host, port, passwd = destination
        senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000  # wrap around 2 mld

        self.logger.log(Logger.DEBUG, "Building XDR packet for [" + str(clusterName) + "/" + str(nodeName) + "] <"
                        + str(senderRef['SEQ_NR']) + "/" + str(senderRef['INSTANCE_ID']) + "> len:" + str(len(params)))

        # The parameters go in their own packer first, so that the count in the
        # header matches what was really packed.
        bodyPacker = xdrlib.Packer()
        packedCount = 0
        for name, value in pairs:
            if self.__packParameter(bodyPacker, name, value):
                packedCount += 1

        headPacker = xdrlib.Packer()
        headPacker.pack_string("v:" + self.__version + "p:" + passwd)
        headPacker.pack_int(senderRef['INSTANCE_ID'])
        headPacker.pack_int(senderRef['SEQ_NR'])
        headPacker.pack_string(clusterName)
        headPacker.pack_string(nodeName)
        headPacker.pack_int(packedCount)

        tailPacker = xdrlib.Packer()
        if timeStamp is not None and timeStamp > 0:
            tailPacker.pack_int(timeStamp)

        buffer = headPacker.get_buffer() + bodyPacker.get_buffer() + tailPacker.get_buffer()
        # send this buffer to the destination, using udp datagrams
        try:
            self.__udpSocket.sendto(buffer, (host, port))
            self.logger.log(Logger.DEBUG, "Packet sent to " + host + ":" + str(port) + " " + passwd)
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Cannot send packet to " + host + ":" + str(port) + " " + passwd + ": " + str(msg))

    def __packParameter(self, xdrPacker, name, value):
        """
        Append one parameter (name, value type, value) to xdrPacker.

        Returns True if the parameter was packed. On any problem a warning is
        logged, xdrPacker is left untouched and False is returned.
        """
        if name is None or name == "":
            self.logger.log(Logger.WARNING, "Undefine parameter name.")
            return False
        if value is None:
            self.logger.log(Logger.WARNING, "Ignore " + str(name) + " parameter because of None value")
            return False
        try:
            typeValue = self.__valueTypes[type(value)]
            # pack into a scratch buffer so a failure half-way through
            # cannot leave a partial parameter in the datagram
            scratch = xdrlib.Packer()
            scratch.pack_string(name)
            scratch.pack_int(typeValue)
            self.__packFunctions[typeValue](scratch, value)
            self.logger.log(Logger.DEBUG, "Adding parameter " + str(name) + " = " + str(value))
        except Exception as ex:
            self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
            return False
        encoded = scratch.get_buffer()
        # XDR data is always a multiple of 4 bytes, so this appends it verbatim
        xdrPacker.pack_fopaque(len(encoded), encoded)
        return True

    # Destructor
    def __del__(self):
        if not self.__freed:
            self.free()

    def __shouldSend(self):
        """
        Decide if the current datagram should be sent.
        This decision is based on the number of messages previously sent:
        as the smoothed rate approaches maxMsgRate, messages are randomly dropped.
        """
        now = int(time.time())
        if now != self.__crtTime:
            # new time; update previous counters
            # (clamped so a backwards clock step can't produce a negative rate)
            elapsed = max(now - self.__crtTime, 1)
            self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
            self.__prvTime = self.__crtTime
            self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
            # reset current counters
            self.__crtTime = now
            self.__crtSent = 0
            self.__crtDrop = 0

        # compute the history
        valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

        doSend = True

        # when we should start dropping messages
        margin = self.maxMsgRate // 10
        level = self.maxMsgRate - margin

        if valSent > (self.maxMsgRate - level):
            if random.randint(0, margin) >= (self.maxMsgRate - valSent):
                doSend = False

        # counting sent and dropped messages
        if doSend:
            self.__crtSent += 1
        else:
            self.__crtDrop += 1

        return doSend

    ################################################################################################
    # Private variables. Don't touch
    ################################################################################################

    __valueTypes = {
        type("string"): 0,  # XDR_STRING (see ApMon.h from C/C++ ApMon version)
        type(1): 2,         # XDR_INT32
        type(1.0): 5,       # XDR_REAL64
    }

    __packFunctions = {
        0: xdrlib.Packer.pack_string,
        2: xdrlib.Packer.pack_int,
        5: xdrlib.Packer.pack_double,
    }

    __defaultPort = 8884
    __version = "2.2.2-py"  # apMon version number
732