ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.3
Committed: Wed Mar 8 17:10:49 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_6, CRAB_1_0_5
Changes since 1.2: +102 -45 lines
Log Message:
new apmon verision with thread patch

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.2.1
5 *
6 * Copyright (C) 2006 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34 """
35 apmon.py
36
37 This is a python implementation for the ApMon API for sending
38 data to the MonALISA service.
39
40 For further details about ApMon please see the C/C++ or Java documentation
41 You can find a sample usage of this module in apmTest.py.
42
43 Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44 Sending strings is supported, but they will not be stored in the
45 farm's store nor shown in the farm's window in the MonALISA client.
46 """
47
48 import re
49 import xdrlib
50 import socket
51 import urllib2
52 import threading
53 import time
54 import Logger
55 import ProcInfo
56 import random
57 import copy
58
59 #__all__ = ["ApMon"]
60
61 #__debug = False # self.destPrevData[destination];set this to True to be verbose
62
63 class ApMon:
64 """
65 Main class for sending monitoring data to a MonaLisa module.
66 One or more destinations can be chosen for the data. See constructor.
67
68 The data is packed in UDP datagrams, using XDR. The following fields are sent:
69 - version & password (string), followed by the sender instance id (int) and sequence number (int)
70 - cluster name (string)
71 - node name (string)
72 - number of parameters (int)
73 - for each parameter:
74 - name (string)
75 - value type (int)
76 - value
77 - optionally an int holding the given timestamp (only when the timestamp is > 0)
78
79 Attributes (public):
80 - destinations - a list containing (ip, port, password) tuples
81 - configAddresses - list with files and urls from where the config is read
82 - configRecheckInterval - period, in seconds, to check for changes
83 in the configAddresses list
84 - configRecheck - boolean - whether to recheck periodically for changes
85 in the configAddresses list
86 """
87
# Default background-monitoring options.
# __addDestination deep-copies this table for every destination and then
# applies the per-destination overrides on top of the copy.
# Keys ending in "_data_sent" hold the time of the last send and are
# rewritten by sendBgMonitoring; "sys_*"/"job_*" flags select which values
# are requested from ProcInfo (the prefix is stripped before the request).
__defaultOptions = {
    'job_monitoring': True, # perform (or not) job monitoring
    'job_interval' : 10, # at this interval (in seconds)
    'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!

    # NOTE(review): the descriptions of job_cpu_time and job_run_time look
    # swapped with respect to their names - confirm against ProcInfo.
    'job_cpu_time' : True, # elapsed time from the start of this job in seconds
    'job_run_time' : True, # processor time spent running this job in seconds
    'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
    'job_virtualmem': True, # size of the virtual memory occupied by the job, as reported by ps (original comment said "JB"; presumably KB - confirm)
    'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
    'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
    'job_workdir_size': True, # size in MB of the working directory of the job
    'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
    'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
    'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
    'job_disk_usage': True, # percent of the used disk partition containing the working directory
    'job_open_files': True, # number of open file descriptors

    'sys_monitoring': True, # perform (or not) system monitoring
    'sys_interval' : 10, # at this interval (in seconds)
    'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!

    'sys_cpu_usr' : False, # cpu-usage information
    'sys_cpu_sys' : False, # all these will produce corresponding params without "sys_"
    'sys_cpu_nice' : False,
    'sys_cpu_idle' : False,
    'sys_cpu_usage' : True,
    'sys_load1' : True, # system load information
    'sys_load5' : True,
    'sys_load15' : True,
    'sys_mem_used' : False, # memory usage information
    'sys_mem_free' : False,
    'sys_mem_usage' : True,
    'sys_pages_in' : False,
    'sys_pages_out' : False,
    'sys_swap_used' : True, # swap usage information
    'sys_swap_free' : False,
    'sys_swap_usage': True,
    'sys_swap_in' : False,
    'sys_swap_out' : False,
    'sys_net_in' : True, # network transfer in kBps
    'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
    'sys_net_errs' : False, # for each eth interface
    'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
    'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
    'sys_processes' : True,
    'sys_uptime' : True, # uptime of the machine, in days (float number)

    'general_info' : True, # send (or not) general host information once every 2 * sys_interval seconds
    'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!

    'hostname' : True,
    'ip' : True, # will produce ethX_ip params for each interface
    'cpu_MHz' : True,
    'no_CPUs' : True, # number of CPUs
    'total_mem' : True,
    'total_swap' : True,
    'cpu_vendor_id' : True,
    'cpu_family' : True,
    'cpu_model' : True,
    'cpu_model_name': True,
    'bogomips' : True};
150
def __init__(self, initValue, defaultLogLevel=Logger.INFO):
    """
    Class constructor:
    - if initValue is a string, put it in configAddresses and load destinations
      from the file named like that. If it starts with "http://", the configuration
      is loaded from that URL. For background monitoring, given parameters will
      overwrite defaults.

    - if initValue is a list, put its contents in configAddresses and create
      the list of destinations from all those sources. For background monitoring,
      given parameters will overwrite defaults (see __defaultOptions).

    - if initValue is a tuple (of strings), initialize destinations with those values.
      Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
      default port being 8884 and the default password being "". Background
      monitoring will be enabled, sending the parameters active in __defaultOptions.

    - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
      val = hash{'param_name': True/False, ...}) the given options for each
      destination will overwrite the default parameters (see __defaultOptions).

    Any other type is reported in the log and leaves the object without
    destinations. When no destination could be defined, an error is logged
    and neither the UDP socket nor the background threads are created.
    """
    # Public state.
    self.destinations = {}            # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
    self.destPrevData = {}            # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
    self.configAddresses = []         # list of files/urls from where we read config
    self.configRecheckInterval = 120  # 2 minutes
    self.configRecheck = True         # enabled by default
    self.performBgMonitoring = True   # by default, perform background monitoring
    self.monitoredJobs = {}           # key = pid; val = hash with CLUSTER_NAME / NODE_NAME
    self.maxMsgRate = 20              # maximum number of messages allowed to be sent per second
    self.sender = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}

    # The host name is used for both default node names; resolve it once.
    localName = socket.getfqdn()
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = localName
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = localName

    # Internal state - don't touch these.
    self.__freed = False
    self.__initializedOK = True
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()

    # Counters used by __shouldSend to keep the average rate under maxMsgRate.
    self.__crtTime = 0
    self.__prvTime = 0
    self.__prvSent = 0
    self.__prvDrop = 0
    self.__crtSent = 0
    self.__crtDrop = 0
    self.__hWeight = 0.92

    self.logger = Logger.Logger(defaultLogLevel)

    # isinstance() instead of exact type comparison so that subclasses of
    # the supported containers are accepted as well.
    if isinstance(initValue, str):
        self.configAddresses.append(initValue)
        self.__reloadAddresses()
    elif isinstance(initValue, list):
        self.configAddresses = initValue
        self.__reloadAddresses()
    elif isinstance(initValue, tuple):
        for dest in initValue:
            self.__addDestination(dest, self.destinations)
    elif isinstance(initValue, dict):
        for dest, opts in initValue.items():
            self.__addDestination(dest, self.destinations, opts)
    else:
        self.logger.log(Logger.ERROR, "Unsupported initValue type: " + str(type(initValue)))

    self.__initializedOK = (len(self.destinations) > 0)
    if not self.__initializedOK:
        self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
        return

    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if len(self.configAddresses) > 0:
        # there are addresses that need to be monitored:
        # start the config checking and reloading thread
        th = threading.Thread(target=self.__configLoader)
        th.setDaemon(True)  # daemon thread: must not keep the process alive
        th.start()
    # create the ProcInfo instance used by background monitoring
    self.procInfo = ProcInfo.ProcInfo(self.logger)
    # start the background monitoring thread
    th = threading.Thread(target=self.__bgMonitor)
    th.setDaemon(True)
    th.start()
234
235
def sendParams(self, params):
    """
    Send several parameters to MonALISA using the default (last given)
    cluster and node names and no explicit timestamp.

    This simply forwards to sendTimedParams with a timestamp of -1.
    """
    noTimeStamp = -1
    self.sendTimedParams(noTimeStamp, params)
241
def sendTimedParams(self, timeStamp, params):
    """
    Send several parameters with the given time, using the default
    (last given) cluster and node names.

    Forwards to sendTimedParameters with None for both names; see that
    method for the meaning of timeStamp and params.
    """
    clusterName = None
    nodeName = None
    self.sendTimedParameters(clusterName, nodeName, timeStamp, params)
248
def sendParameter(self, clusterName, nodeName, paramName, paramValue):
    """
    Send one parameter to MonALISA without an explicit timestamp.

    Forwards to sendTimedParameter with a timestamp of -1.
    """
    noTimeStamp = -1
    self.sendTimedParameter(clusterName, nodeName, noTimeStamp, paramName, paramValue)
254
def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
    """
    Send one parameter with a given time.

    The name/value pair is wrapped in a one-entry dictionary and handed
    to sendTimedParameters.
    """
    singleParam = {paramName: paramValue}
    self.sendTimedParameters(clusterName, nodeName, timeStamp, singleParam)
260
def sendParameters(self, clusterName, nodeName, params):
    """
    Send several parameters for the given cluster and node names,
    without an explicit timestamp.

    Forwards to sendTimedParameters with a timestamp of -1.
    """
    noTimeStamp = -1
    self.sendTimedParameters(clusterName, nodeName, noTimeStamp, params)
266
def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
    """
    Send multiple monitored parameters to MonALISA.

    - clusterName is the name of the cluster being monitored. If it is None
      or empty, the last given cluster name is used (initially
      "ApMon_UserSend"); otherwise it becomes the new default.
    - nodeName is the name of the node for which are the parameters. If this
      is None, the last given node name is used (initially the full hostname
      of this machine); otherwise it becomes the new default.
    - timeStamp, if > 0, is the given time for the parameters, in seconds from Epoch.
      Note that this option should be used only if you are sure about the time for
      the result. Otherwise, the parameters will be assigned a correct time
      (obtained from NTP servers) in the MonALISA service. This option can be
      useful when parsing logs, for example.
    - params is a dictionary containing pairs with:
        - key: parameter name
        - value: parameter value, either int or float.
      or params is a vector of tuples (key, value). This version can be used
      in case you want to send the parameters in a given order.

    NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)

    If the object was not initialized correctly a warning is logged and
    nothing is sent.
    """
    if not self.__initializedOK:
        self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!")
        return
    if clusterName is None or clusterName == "":
        clusterName = self.__defaultUserCluster
    else:
        self.__defaultUserCluster = clusterName
    if nodeName is None:
        nodeName = self.__defaultUserNode
    else:
        self.__defaultUserNode = nodeName
    # Hold the config lock so the destination table is not swapped while
    # sending; release it even if a send raises, otherwise the next send
    # and the config reloader would block forever.
    self.__configUpdateLock.acquire()
    try:
        for dest in self.destinations:
            self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params)
    finally:
        self.__configUpdateLock.release()
303
def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
    """
    Add a new job to monitor.

    - pid is the key under which the job is tracked
    - workDir is passed to ProcInfo together with the pid
    - clusterName / nodeName are the names under which the job's data
      is reported by sendBgMonitoring

    The job is registered with ProcInfo first and recorded in
    monitoredJobs only if that succeeded. The monitor lock is always
    released, even if ProcInfo raises.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.addJobToMonitor(pid, workDir)
        self.monitoredJobs[pid] = {
            'CLUSTER_NAME': clusterName,
            'NODE_NAME': nodeName,
        }
    finally:
        self.__bgMonitorLock.release()
314
def removeJobToMonitor(self, pid):
    """
    Remove a job from being monitored.

    The pid is removed from ProcInfo and then from monitoredJobs.
    Raises KeyError if the pid is not in monitoredJobs. The monitor lock
    is released in every case, so a failed removal cannot block the
    background monitoring thread.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.removeJobToMonitor(pid)
        del self.monitoredJobs[pid]
    finally:
        self.__bgMonitorLock.release()
323
def setMonitorClusterNode(self, clusterName, nodeName):
    """
    Choose the cluster and node names under which system related
    information is reported.

    A name that is None or the empty string leaves the current value
    untouched. The update is done while holding the monitor lock.
    """
    ignored = (None, "")
    monitorLock = self.__bgMonitorLock
    monitorLock.acquire()
    if clusterName not in ignored:
        self.__defaultSysMonCluster = clusterName
    if nodeName not in ignored:
        self.__defaultSysMonNode = nodeName
    monitorLock.release()
334
def enableBgMonitoring(self, onOff):
    """
    Switch the periodic background monitoring on or off.

    The flag is only consulted by the background thread; a direct call to
    sendBgMonitoring still sends the information.
    """
    self.performBgMonitoring = onOff
341
def sendBgMonitoring(self, mustSend=False):
    """
    Send background monitoring about system and jobs to all interested destinations.
    If mustSend == True, the information is sent regardless of the elapsed time since last sent.
    If mustSend == False, the data is sent only if the required interval has passed since last sent.

    For every destination the active "sys_*", "job_*" and general options
    are turned into parameter names, ProcInfo is refreshed (once per call,
    and only if something is due) and the collected values are sent.
    The monitor lock is held for the whole pass and always released.
    """
    # option suffixes that steer the monitoring itself and are not parameters
    controlSuffixes = ('monitoring', 'interval', 'data_sent')
    generalControl = ('general_info', 'general_data_sent')

    self.__bgMonitorLock.acquire()
    try:
        now = int(time.time())
        updatedProcInfo = False
        # iterate over a snapshot: the config thread may replace
        # self.destinations while this pass is running
        for destination, options in list(self.destinations.items()):
            sysParams = []
            jobParams = []
            prevRawData = self.destPrevData.setdefault(destination, {})

            # for each destination and its options, check if we have to
            # report any background monitoring data
            if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                for param, active in options.items():
                    m = re.match("sys_(.+)", param)
                    if m is not None and active and m.group(1) not in controlSuffixes:
                        sysParams.append(m.group(1))
                options['sys_data_sent'] = now
            if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                for param, active in options.items():
                    m = re.match("job_(.+)", param)
                    if m is not None and active and m.group(1) not in controlSuffixes:
                        jobParams.append(m.group(1))
                options['job_data_sent'] = now
            if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                for param, active in options.items():
                    if not active or param.startswith("sys_") or param.startswith("job_"):
                        continue
                    if param not in generalControl:
                        # general host information travels with the system data
                        sysParams.append(param)
                options['general_data_sent'] = now

            # refresh the raw data only once per call, however many
            # destinations need it
            if not updatedProcInfo and (sysParams or jobParams):
                self.procInfo.update()
                updatedProcInfo = True

            sysResults = {}
            if sysParams:
                sysResults = self.procInfo.getSystemData(sysParams, prevRawData)

            # If the configuration was reloaded meanwhile, copy the send
            # times into the new options; skip destinations that are gone.
            currentOptions = self.destinations.get(destination)
            if currentOptions is not None:
                self.updateLastSentTime(options, currentOptions)

            if isinstance(sysResults, (dict, list)) and len(sysResults) > 0:
                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)

            for pid, props in self.monitoredJobs.items():
                jobResults = {}
                if jobParams:
                    jobResults = self.procInfo.getJobData(pid, jobParams)
                if jobResults:
                    self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
    finally:
        self.__bgMonitorLock.release()
398
399 # copy the time when last data was sent
def updateLastSentTime(self, srcOpts, dstOpts):
    """
    Copy the "last sent" times from srcOpts into dstOpts.

    Only the keys 'general_data_sent', 'sys_data_sent' and 'job_data_sent'
    are considered, and only those present in srcOpts are copied.
    dstOpts is modified in place; nothing is returned.
    """
    for key in ('general_data_sent', 'sys_data_sent', 'job_data_sent'):
        if key in srcOpts:
            dstOpts[key] = srcOpts[key]
407
def setLogLevel(self, strLevel):
    """
    Change the log level of this object's logger. The level is given as a
    string, one of 'FATAL', 'ERROR', 'WARNING', 'INFO', 'NOTICE', 'DEBUG'.
    """
    log = self.logger
    log.setLogLevel(strLevel)
414
def setMaxMsgRate(self, rate):
    """
    Set the maximum number of messages that may be sent per second
    and record the new value in the debug log.
    """
    self.maxMsgRate = rate
    message = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, message)
421
def free(self):
    """
    Stop the background threads and close the UDP socket. Call this when
    you want to release every resource ApMon holds and allow the object
    to be garbage-collected.

    Both stop events are set, the call pauses for 10 ms, then the socket
    (if any) is closed and the object is marked as freed.
    """
    configStop = self.__configUpdateEvent
    if configStop is not None:
        configStop.set()
    monitorStop = self.__bgMonitorEvent
    if monitorStop is not None:
        monitorStop.set()
    time.sleep(0.01)
    udp = self.__udpSocket
    if udp is not None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        udp.close()
        self.__udpSocket = None
    self.__freed = True
437
438
439 #########################################################################################
440 # Internal functions - Config reloader thread
441 #########################################################################################
442
def __configLoader(self):
    """
    Body of the thread that periodically reloads the configuration.

    Waits configRecheckInterval seconds on the stop event; once the event
    is set the thread ends. After each wait the addresses are reloaded,
    provided configRecheck is enabled.
    """
    stopEvent = self.__configUpdateEvent
    while True:
        if stopEvent.isSet():
            break
        stopEvent.wait(self.configRecheckInterval)
        if stopEvent.isSet():
            break
        if not self.configRecheck:
            continue
        self.__reloadAddresses()
        self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for " + repr(self.configRecheckInterval) + " sec.")
454
def __reloadAddresses(self):
    """
    Rebuild the destinations hash from every source in configAddresses.

    The new table is filled completely first and only then swapped in
    under the config lock, so that the configuration never changes in the
    middle of sending packets to the previous destinations.
    """
    freshDestinations = {}
    for source in self.configAddresses:
        self.__initializeFromFile(source, freshDestinations)
    configLock = self.__configUpdateLock
    configLock.acquire()
    self.destinations = freshDestinations
    configLock.release()
466
def __addDestination(self, aDestination, tempDestinations, options=None):
    """
    Add a destination to tempDestinations.

    aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
    If the port is not given, the default port (8884) is used.
    If the password is missing, it is considered an empty string.

    options, if given, is a hash {'param_name': value, ...} whose entries
    overwrite the defaults for this destination; None (or the default
    table itself) means "use the defaults".

    The destination is skipped, with a log message, when the port is not a
    number, when the host name cannot be resolved, or when the same
    host/port pair is already present in tempDestinations.
    """
    defaults = self.__defaultOptions

    # split() without arguments trims both ends and treats every run of
    # blanks or tabs as one separator, so the fields end up separated by
    # exactly one space.
    aDestination = " ".join(aDestination.split())
    sepPort = aDestination.find(':')
    sepPasswd = aDestination.rfind(' ')
    if sepPort >= 0:
        host = aDestination[:sepPort].strip()
        if sepPasswd > sepPort + 1:
            port = aDestination[sepPort + 1:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            port = aDestination[sepPort + 1:].strip()
            passwd = ""
    else:
        port = str(self.__defaultPort)
        if sepPasswd >= 0:
            host = aDestination[:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            host = aDestination.strip()
            passwd = ""
    if not port.isdigit():
        self.logger.log(Logger.WARNING, "Bad value for port number " + repr(port) + " in " + aDestination + " destination")
        return
    port = int(port)

    # convert hostnames to IP addresses to avoid suffocating DNSs; a name
    # that does not resolve must not abort the constructor or kill the
    # config reloading thread, so the destination is just skipped
    try:
        host = socket.gethostbyname(host)
    except socket.error as ex:
        self.logger.log(Logger.ERROR, "Cannot resolve host " + repr(host) + " in " + aDestination + " destination: " + str(ex))
        return

    alreadyAdded = False
    for h, p, w in tempDestinations.keys():
        if h == host and p == port:
            alreadyAdded = True
            break
    if alreadyAdded:
        self.logger.log(Logger.NOTICE, "Destination " + host + ":" + str(port) + " " + passwd + " already added. Skipping it")
        return

    destination = (host, port, passwd)
    # NOTE(review): the password is written to the log in clear text here.
    self.logger.log(Logger.INFO, "Adding destination " + host + ':' + repr(port) + ' ' + passwd)
    # have a different set of options for each destination
    tempDestinations[destination] = copy.deepcopy(defaults)
    self.destPrevData[destination] = {}
    if options is not None and options != defaults:
        # we have to overwrite defaults with given options
        for key, value in options.items():
            self.logger.log(Logger.NOTICE, "Overwritting option: " + key + " = " + repr(value))
            tempDestinations[destination][key] = value
518
def __initializeFromFile(self, confFileName, tempDestinations):
    """
    Load destinations from the confFileName file. If it is a URL (starts
    with "http://") the configuration is loaded from there. All
    destinations are put in the tempDestinations hash.

    Lines that are empty or start with '#' are ignored. Lines of the form
    "xApMon_<param> = <value>" set options: "on"/"off" become True/False
    and "*_interval" values become integers. loglevel, maxMsgRate,
    conf_recheck and recheck_interval configure this object, "*_data_sent"
    is ignored, and everything else is passed as a per-destination option.
    Every other line is handed to __addDestination together with the
    collected options.

    Problems opening the source are logged and end the call without
    adding anything. An option with a malformed integer value is logged
    and skipped.
    """
    try:
        if confFileName.startswith("http://"):
            confFile = urllib2.urlopen(confFileName)
        else:
            confFile = open(confFileName)
    except urllib2.HTTPError as e:
        # must come before URLError: HTTPError is a subclass of it
        self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
        if e.code == 401:
            self.logger.log(Logger.ERROR, 'HTTPError: not authorized.')
        elif e.code == 404:
            self.logger.log(Logger.ERROR, 'HTTPError: not found.')
        elif e.code == 503:
            self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.')
        else:
            self.logger.log(Logger.ERROR, 'HTTPError: unknown error.')
        return
    except urllib2.URLError as ex:
        self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
        # reason may be a plain string or an exception, so don't index it
        self.logger.log(Logger.ERROR, "URL Error: " + str(ex.reason))
        return
    except IOError as ex:
        self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
        self.logger.log(Logger.ERROR, "IOError: " + str(ex))
        return

    self.logger.log(Logger.INFO, "Adding destinations from " + confFileName)
    dests = []
    opts = {}
    try:
        while True:
            line = confFile.readline()
            if line == '':
                break
            line = line.strip()
            self.logger.log(Logger.DEBUG, "Reading line " + line)
            if len(line) == 0 or line[0] == '#':
                continue
            if not line.startswith("xApMon_"):
                dests.append(line)
                continue
            m = re.match(r"xApMon_(\S+)\s*=\s*(\S+)", line)
            if m is None:
                continue
            param = m.group(1)
            value = m.group(2)
            try:
                if value.upper() == "ON":
                    value = True
                elif value.upper() == "OFF":
                    value = False
                elif param.endswith("_interval"):
                    value = int(value)
                if param == "maxMsgRate":
                    value = int(value)
            except ValueError:
                self.logger.log(Logger.WARNING, "Bad value for option " + param + ": " + repr(value))
                continue
            if param == "loglevel":
                self.logger.setLogLevel(value)
            elif param == "maxMsgRate":
                self.setMaxMsgRate(value)
            elif param == "conf_recheck":
                self.configRecheck = value
            elif param == "recheck_interval":
                self.configRecheckInterval = value
            elif param.endswith("_data_sent"):
                pass  # don't reset time in sys/job/general/_data_sent
            else:
                opts[param] = value
    finally:
        # close the source even if reading or parsing failed
        confFile.close()

    for line in dests:
        self.__addDestination(line, tempDestinations, opts)
592
593 ###############################################################################################
594 # Internal functions - Background monitor thread
595 ###############################################################################################
596
def __bgMonitor(self):
    """
    Body of the background monitoring thread.

    Waits 10 seconds on the stop event; once the event is set the thread
    ends. After each wait sendBgMonitoring is called, provided
    performBgMonitoring is enabled.
    """
    stopEvent = self.__bgMonitorEvent
    while True:
        if stopEvent.isSet():
            break
        stopEvent.wait(10)
        if stopEvent.isSet():
            break
        if self.performBgMonitoring:
            # sends only what is due according to the configured intervals
            self.sendBgMonitoring()
604
605 ###############################################################################################
606 # Internal helper functions
607 ###############################################################################################
608
def __directSendParams(self, senderRef, destination, clusterName, nodeName, timeStamp, params):
    """
    Build one XDR datagram with the given parameters and send it over UDP
    to a single destination.

    - senderRef is the hash holding 'INSTANCE_ID' and 'SEQ_NR'; the
      sequence number is advanced (wrapping at 2000000000) for every
      datagram that gets built
    - destination is a (host, port, passwd) tuple
    - params is a dictionary, or a list/tuple of (name, value) pairs
    - timeStamp is appended to the datagram only if it is > 0

    Datagram layout: "v:<version>p:<passwd>" string, instance id, sequence
    number, cluster name, node name, number of parameters, the parameters,
    optional timestamp.

    Nothing is sent when the rate limiter says so, when the destination is
    None, when the socket was already closed by free(), or when params has
    an unsupported type. Socket errors are logged, not raised.
    """
    if senderRef == {}:
        self.logger.log(Logger.WARNING, "Not sending undefined parameters!")
        return

    if not self.__shouldSend():
        return

    if destination is None:
        self.logger.log(Logger.WARNING, "Destination is None")
        return

    if self.__udpSocket is None:
        self.logger.log(Logger.WARNING, "UDP socket is not open. Message NOT sent!")
        return

    # Normalize params to a list of (name, value) pairs before anything is
    # packed, so that an unsupported type never produces a datagram that
    # announces parameters it does not contain.
    if isinstance(params, dict):
        pairs = list(params.items())
    elif isinstance(params, (list, tuple)):
        pairs = list(params)
    else:
        self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
        return

    host, port, passwd = destination
    senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000  # wrap around 2 mld

    xdrPacker = xdrlib.Packer()
    self.logger.log(Logger.DEBUG, "Building XDR packet for [" + str(clusterName) + "] <" + str(nodeName) + "> len:" + str(len(pairs)))

    xdrPacker.pack_string("v:" + self.__version + "p:" + passwd)

    xdrPacker.pack_int(senderRef['INSTANCE_ID'])
    xdrPacker.pack_int(senderRef['SEQ_NR'])

    xdrPacker.pack_string(clusterName)
    xdrPacker.pack_string(nodeName)
    # NOTE(review): the announced count is the number of given pairs; if
    # __packParameter skips one (None value, unsupported type) the
    # datagram carries fewer parameters than announced.
    xdrPacker.pack_int(len(pairs))

    for name, value in pairs:
        self.__packParameter(xdrPacker, name, value)

    if timeStamp is not None and timeStamp > 0:
        xdrPacker.pack_int(timeStamp)

    buffer = xdrPacker.get_buffer()
    # send this buffer to the destination, using udp datagrams
    try:
        self.__udpSocket.sendto(buffer, (host, port))
        self.logger.log(Logger.DEBUG, "Packet sent to " + host + ":" + str(port) + " " + passwd)
    except socket.error as msg:
        # str(msg) works for every socket error; indexing the exception
        # fails for errors that carry a single argument
        self.logger.log(Logger.ERROR, "Cannot send packet to " + host + ":" + str(port) + " " + passwd + ": " + str(msg))
657
def __packParameter(self, xdrPacker, name, value):
    """
    Append one parameter to xdrPacker as: name (string), value type (int),
    value.

    The value type comes from __valueTypes and the packing function from
    __packFunctions. Nothing is packed, and a message is logged, when the
    name is None or empty, when the value is None, or when the type of
    the value is not supported. A failure while packing is logged as an
    error instead of being raised.
    """
    # '==' on purpose: comparing with 'is' tests object identity, which is
    # not guaranteed to hold for equal strings
    if name is None or name == "":
        self.logger.log(Logger.WARNING, "Undefine parameter name.")
        return
    if value is None:
        self.logger.log(Logger.WARNING, "Ignore " + str(name) + " parameter because of None value")
        return
    typeValue = self.__valueTypes.get(type(value))
    if typeValue is None:
        # checked before anything is written, so the packet stays intact
        self.logger.log(Logger.WARNING, "Ignore " + str(name) + " parameter because of unsupported type " + str(type(value)))
        return
    try:
        xdrPacker.pack_string(name)
        xdrPacker.pack_int(typeValue)
        self.__packFunctions[typeValue](xdrPacker, value)
        self.logger.log(Logger.DEBUG, "Adding parameter " + str(name) + " = " + str(value))
    except Exception as ex:
        self.logger.log(Logger.ERROR, "ApMon: error packing %s = %s; got %s" % (name, str(value), ex))
673
674 # Destructor
def __del__(self):
    """
    Destructor: release the resources through free(), unless free()
    was already called.
    """
    if self.__freed:
        return
    self.free()
678
679 # Decide if the current datagram should be sent.
680 # This decision is based on the number of messages previously sent.
def __shouldSend(self):
    """
    Decide if the current datagram should be sent.

    The decision is based on the number of messages previously sent: a
    weighted history of the per-second send count is kept, and when it
    gets close to maxMsgRate the message is dropped at random. The sent
    and dropped counters of the current second are updated.

    Returns True if the datagram should be sent, False if it should be
    dropped.
    """
    now = int(time.time())
    if now != self.__crtTime:
        # new time slot: fold the finished one into the history
        elapsed = now - self.__crtTime
        if elapsed <= 0:
            # the clock stepped backwards; don't divide by a negative span
            elapsed = 1
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        # reset current counters
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # compute the history
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    doSend = True

    # explicit floor division: randint() needs integer bounds
    margin = int(self.maxMsgRate // 10)
    # when we should start dropping messages
    level = self.maxMsgRate - margin

    # (maxMsgRate - level) is the same as margin; the random test below is
    # what actually limits the rate
    if valSent > (self.maxMsgRate - level):
        if random.randint(0, margin) >= (self.maxMsgRate - valSent):
            doSend = False

    # counting sent and dropped messages
    if doSend:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1

    return doSend
713
714 ################################################################################################
715 # Private variables. Don't touch
716 ################################################################################################
717
# Python type of a value -> XDR type code put on the wire
# (see ApMon.h from the C/C++ ApMon version).
__valueTypes = {
    str: 0,    # XDR_STRING
    int: 2,    # XDR_INT32
    float: 5,  # XDR_REAL64
}

# XDR type code -> xdrlib.Packer function that packs a value of that type.
__packFunctions = {
    0: xdrlib.Packer.pack_string,
    2: xdrlib.Packer.pack_int,
    5: xdrlib.Packer.pack_double,
}

__defaultPort = 8884        # used when a destination gives no port
__version = "2.2.1-py"      # ApMon version number sent in every datagram
730