ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.5
Committed: Mon Apr 3 08:57:32 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +221 -129 lines
Log Message:
new apmon version

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.2.4
5 *
6 * Copyright (C) 2006 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34 """
35 apmon.py
36
37 This is a python implementation for the ApMon API for sending
38 data to the MonALISA service.
39
40 For further details about ApMon please see the C/C++ or Java documentation
41 You can find a sample usage of this module in apmTest.py.
42
43 Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44 Sending strings is supported, but they will not be stored in the
45 farm's store nor shown in the farm's window in the MonALISA client.
46 """
47
48 import re
49 import xdrlib
50 import socket
51 import struct
52 import StringIO
53 import threading
54 import time
55 import Logger
56 import ProcInfo
57 import random
58 import copy
59
60 #__all__ = ["ApMon"]
61
62 #__debug = False # set this to True to be verbose
63
class ApMon:
    """
    Main class for sending monitoring data to a MonALISA module.
    One or more destinations can be chosen for the data. See constructor.

    The data is packed in UDP datagrams, using XDR. The following fields are sent:
    - version & password (string)
    - sender instance id (int) and packet sequence number (int)
    - cluster name (string)
    - node name (string)
    - number of parameters (int)
    - for each parameter:
        - name (string)
        - value type (int)
        - value
    - optionally a (int) with the given timestamp

    Attributes (public):
    - destinations - a hash; key = (ip, port, password) tuple,
      val = hash with the background monitoring options for that destination
    - configAddresses - list with files and urls from where the config is read
    - configRecheckInterval - period, in seconds, to check for changes
      in the configAddresses list
    - configRecheck - boolean - whether to recheck periodically for changes
      in the configAddresses list

    NOTE: exceptions are bound with "except ... as ...", so Python >= 2.6 is needed.
    """

    __defaultOptions = {
        'job_monitoring': True,   # perform (or not) job monitoring
        'job_interval': 120,      # at this interval (in seconds)
        'job_data_sent': 0,       # time from Epoch when job information was sent; don't touch!

        # NOTE(review): the descriptions of job_cpu_time and job_run_time look swapped
        # with respect to their names - confirm against ProcInfo before relying on them.
        'job_cpu_time': True,     # elapsed time from the start of this job in seconds
        'job_run_time': True,     # processor time spent running this job in seconds
        'job_cpu_usage': True,    # current percent of the processor used for this job, as reported by ps
        'job_virtualmem': True,   # size of the virtual memory occupied by the job, as reported by ps
                                  # (unit given as "JB" originally; presumably KB - TODO confirm)
        'job_rss': True,          # size in KB of the resident image size of the job, as reported by ps
        'job_mem_usage': True,    # percent of the memory occupied by the job, as reported by ps
        'job_workdir_size': True, # size in MB of the working directory of the job
        'job_disk_total': True,   # size in MB of the total size of the disk partition containing the working directory
        'job_disk_used': True,    # size in MB of the used disk partition containing the working directory
        'job_disk_free': True,    # size in MB of the free disk partition containing the working directory
        'job_disk_usage': True,   # percent of the used disk partition containing the working directory
        'job_open_files': True,   # number of open file descriptors

        'sys_monitoring': True,   # perform (or not) system monitoring
        'sys_interval': 120,      # at this interval (in seconds)
        'sys_data_sent': 0,       # time from Epoch when system information was sent; don't touch!

        'sys_cpu_usr': True,      # cpu-usage information
        'sys_cpu_sys': True,      # all these will produce corresponding params without "sys_"
        'sys_cpu_nice': True,
        'sys_cpu_idle': True,
        'sys_cpu_usage': True,
        'sys_load1': True,        # system load information
        'sys_load5': True,
        'sys_load15': True,
        'sys_mem_used': True,     # memory usage information
        'sys_mem_free': True,
        'sys_mem_usage': True,
        'sys_pages_in': True,
        'sys_pages_out': True,
        'sys_swap_used': True,    # swap usage information
        'sys_swap_free': True,
        'sys_swap_usage': True,
        'sys_swap_in': True,
        'sys_swap_out': True,
        'sys_net_in': True,       # network transfer in kBps
        'sys_net_out': True,      # these will produce params called ethX_in, ethX_out, ethX_errs
        'sys_net_errs': True,     # for each eth interface
        'sys_net_sockets': True,  # number of opened sockets for each proto => sockets_tcp/udp/unix ...
        'sys_net_tcp_details': True,  # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
        'sys_processes': True,
        'sys_uptime': True,       # uptime of the machine, in days (float number)

        'general_info': True,     # send (or not) general host information once every 2 x $sys_interval seconds
        'general_data_sent': 0,   # time from Epoch when general information was sent; don't touch!

        'hostname': True,
        'ip': True,               # will produce ethX_ip params for each interface
        'cpu_MHz': True,
        'no_CPUs': True,          # number of CPUs
        'total_mem': True,
        'total_swap': True,
        'cpu_vendor_id': True,
        'cpu_family': True,
        'cpu_model': True,
        'cpu_model_name': True,
        'bogomips': True,
    }

    def __init__(self, initValue, defaultLogLevel = Logger.INFO):
        """
        Class constructor:
        - if initValue is a string, put it in configAddresses and load destinations
          from the file named like that. if it starts with "http://", the configuration
          is loaded from that URL. For background monitoring, given parameters will overwrite defaults

        - if initValue is a list, put its contents in configAddresses and create
          the list of destinations from all those sources. For background monitoring,
          given parameters will overwrite defaults (see __defaultOptions)

        - if initValue is a tuple (of strings), initialize destinations with that values.
          Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
          default port being 8884 and the default password being "". Background monitoring will be
          enabled sending the parameters active from __defaultOptions

        - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
          val = hash{'param_name': True/False, ...}) the given options for each destination
          will overwrite the default parameters (see __defaultOptions)

        The constructor also opens the UDP socket and starts the daemon threads
        (config reloader, only if there are config addresses, and background monitor).
        """
        # Life-cycle flags first, so that free()/__del__ are safe even if the
        # rest of the constructor raises.
        self.__freed = False
        self.__udpSocket = None
        self.__configThreadStarted = False
        self.__bgThreadStarted = False

        self.destinations = {}            # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
        self.destPrevData = {}            # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
        self.senderRef = {}               # key = tuple (host, port, pass); val = hash {'INSTANCE_ID', 'SEQ_NR'}
        self.configAddresses = []         # list of files/urls from where we read config
        self.configRecheckInterval = 600  # 10 minutes
        self.configRecheck = True         # enabled by default
        self.performBgMonitoring = True   # by default, perform background monitoring
        self.monitoredJobs = {}           # key = pid; val = hash {'CLUSTER_NAME', 'NODE_NAME'}
        self.maxMsgRate = 10              # maximum number of messages allowed to be sent per second
        self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
        self.__defaultUserCluster = "ApMon_UserSend"
        self.__defaultUserNode = socket.getfqdn()
        self.__defaultSysMonCluster = "ApMon_SysMon"
        self.__defaultSysMonNode = socket.getfqdn()

        # don't touch these:
        self.__configUpdateLock = threading.Lock()
        self.__configUpdateEvent = threading.Event()
        self.__configUpdateFinished = threading.Event()
        self.__bgMonitorLock = threading.Lock()
        self.__bgMonitorEvent = threading.Event()
        self.__bgMonitorFinished = threading.Event()

        # state used by __shouldSend to limit the average message rate
        self.__crtTime = 0
        self.__prvTime = 0
        self.__prvSent = 0
        self.__prvDrop = 0
        self.__crtSent = 0
        self.__crtDrop = 0
        self.__hWeight = 0.92

        self.logger = Logger.Logger(defaultLogLevel)
        self.setDestinations(initValue)
        self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if len(self.configAddresses) > 0:
            # there are addresses that need to be monitored:
            # start the config checking and reloading thread
            th = threading.Thread(target=self.__configLoader)
            th.setDaemon(True)
            th.start()
            self.__configThreadStarted = True
        # create the ProcInfo instance
        self.procInfo = ProcInfo.ProcInfo(self.logger)
        # start the background monitoring thread
        th = threading.Thread(target=self.__bgMonitor)
        th.setDaemon(True)
        th.start()
        self.__bgThreadStarted = True

    def sendParams(self, params):
        """
        Send multiple parameters to MonALISA, with default (last given) cluster and node names.
        """
        self.sendTimedParams(-1, params)

    def sendTimedParams(self, timeStamp, params):
        """
        Send multiple parameters, specifying the time for them, with default (last given)
        cluster and node names. (See sendTimedParameters for more details)
        """
        self.sendTimedParameters(None, None, timeStamp, params)

    def sendParameter(self, clusterName, nodeName, paramName, paramValue):
        """
        Send a single parameter to MonALISA.
        """
        self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue)

    def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
        """
        Send a single parameter, with a given time.
        """
        self.sendTimedParameters(clusterName, nodeName, timeStamp, {paramName: paramValue})

    def sendParameters(self, clusterName, nodeName, params):
        """
        Send multiple parameters specifying cluster and node name for them
        """
        self.sendTimedParameters(clusterName, nodeName, -1, params)

    def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
        """
        Send multiple monitored parameters to MonALISA.

        - clusterName is the name of the cluster being monitored. If it is None or "",
          the last given clusterName (initially "ApMon_UserSend") is used instead.
        - nodeName is the name of the node for which are the parameters. If this
          is None, the last given nodeName (initially the full hostname of this
          machine) is used instead.
        - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
          Note that this option should be used only if you are sure about the time for the result.
          Otherwise, the parameters will be assigned a correct time (obtained from NTP servers)
          in MonALISA service. This option can be useful when parsing logs, for example.
        - params is a dictionary containing pairs with:
            - key: parameter name
            - value: parameter value, either int or float.
          or params is a list (or tuple) of (key, value) pairs. This version can be used
          in case you want to send the parameters in a given order.

        NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
        """
        if clusterName is None or clusterName == "":
            clusterName = self.__defaultUserCluster
        else:
            self.__defaultUserCluster = clusterName
        if nodeName is None:
            nodeName = self.__defaultUserNode
        else:
            self.__defaultUserNode = nodeName
        if len(self.destinations) == 0:
            self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined.")
            return
        self.__configUpdateLock.acquire()
        try:
            for dest in self.destinations.keys():
                self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
        finally:
            # never leave the lock held, or the config reloader would block forever
            self.__configUpdateLock.release()

    def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
        """
        Add a new job to monitor.
        """
        self.__bgMonitorLock.acquire()
        try:
            self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
            self.procInfo.addJobToMonitor(pid, workDir)
        finally:
            self.__bgMonitorLock.release()

    def removeJobToMonitor(self, pid):
        """
        Remove a job from being monitored.
        Raises KeyError if the pid was not registered with addJobToMonitor.
        """
        self.__bgMonitorLock.acquire()
        try:
            self.procInfo.removeJobToMonitor(pid)
            del self.monitoredJobs[pid]
        finally:
            # released even when pid is unknown and the del above raises
            self.__bgMonitorLock.release()

    def setMonitorClusterNode(self, clusterName, nodeName):
        """
        Set the cluster and node names where to send system related information.
        None or empty values leave the current name unchanged.
        """
        self.__bgMonitorLock.acquire()
        try:
            if clusterName is not None and clusterName != "":
                self.__defaultSysMonCluster = clusterName
            if nodeName is not None and nodeName != "":
                self.__defaultSysMonNode = nodeName
        finally:
            self.__bgMonitorLock.release()

    def enableBgMonitoring(self, onOff):
        """
        Enable or disable background monitoring. Note that background monitoring information
        can still be sent if user calls the sendBgMonitoring method.
        """
        self.performBgMonitoring = onOff

    def sendBgMonitoring(self, mustSend = False):
        """
        Send background monitoring about system and jobs to all interested destinations.
        If mustSend == True, the information is sent regardless of the elapsed time since last sent
        If mustSend == False, the data is sent only if the required interval has passed since last sent
        """
        if len(self.destinations) == 0:
            self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined.")
            return
        self.__bgMonitorLock.acquire()
        try:
            now = int(time.time())
            updatedProcInfo = False
            for destination, options in self.destinations.items():
                sysParams = []
                jobParams = []
                prevRawData = self.destPrevData[destination]
                # for each destination and its options, check if we have to report any background monitoring data
                if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                    sysParams.extend(self.__activeParams(options, "sys_"))
                    options['sys_data_sent'] = now
                if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                    jobParams.extend(self.__activeParams(options, "job_"))
                    options['job_data_sent'] = now
                if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                    # general host information travels together with the system parameters
                    for param, active in options.items():
                        if param.startswith("sys_") or param.startswith("job_") or not active:
                            continue
                        if param != 'general_info' and param != 'general_data_sent':
                            sysParams.append(param)
                    options['general_data_sent'] = now

                # refresh the raw data at most once per call, and only if something is due
                if (not updatedProcInfo) and (len(sysParams) > 0 or len(jobParams) > 0):
                    self.procInfo.update()
                    updatedProcInfo = True

                sysResults = {}
                if len(sysParams) > 0:
                    sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
                if len(sysResults) > 0:
                    self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
                for pid, props in self.monitoredJobs.items():
                    jobResults = {}
                    if len(jobParams) > 0:
                        jobResults = self.procInfo.getJobData(pid, jobParams)
                    if len(jobResults) > 0:
                        self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
        finally:
            # an exception in ProcInfo or while sending must not leave the lock held
            self.__bgMonitorLock.release()

    def __activeParams(self, options, prefix):
        """
        Return the names (with the prefix removed) of the active options starting with
        the given prefix, skipping the control options <prefix>monitoring,
        <prefix>interval and <prefix>data_sent.
        """
        names = []
        for param, active in options.items():
            m = re.match(prefix + "(.+)", param)
            if m is not None and active:
                shortName = m.group(1)
                if shortName not in ('monitoring', 'interval', 'data_sent'):
                    names.append(shortName)
        return names

    def setDestinations(self, initValue):
        """
        Set the destinations of the ApMon instance. It accepts the same parameters as the constructor.
        A value of any other type is ignored, with a warning.
        """
        if isinstance(initValue, str):
            self.configAddresses = [initValue]
            self.configRecheck = True
            self.configRecheckInterval = 600
            self.__reloadAddresses()
        elif isinstance(initValue, list):
            self.configAddresses = initValue
            self.configRecheck = True
            self.configRecheckInterval = 600
            self.__reloadAddresses()
        elif isinstance(initValue, tuple):
            self.configAddresses = []
            for dest in initValue:
                self.__addDestination(dest, self.destinations)
            self.configRecheck = False
        elif isinstance(initValue, dict):
            self.configAddresses = []
            for dest, opts in initValue.items():
                self.__addDestination(dest, self.destinations, opts)
            self.configRecheck = False
        else:
            self.logger.log(Logger.WARNING, "Unsupported destinations type: " + str(type(initValue)))

    def initializedOK(self):
        """
        Returns True if there is at least one destination where the parameters can be sent.
        """
        return len(self.destinations) > 0

    def setLogLevel(self, strLevel):
        """
        Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
        'INFO', 'NOTICE', 'DEBUG'.
        """
        self.logger.setLogLevel(strLevel)

    def setMaxMsgRate(self, rate):
        """
        Set the maximum number of messages that can be sent, per second.
        """
        self.maxMsgRate = rate
        self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate))

    def free(self):
        """
        Stop background threads, close opened sockets. You have to use this function if you want to
        free all the resources that ApMon takes, and allow it to be garbage-collected.
        """
        # Only wait for threads that were really started; waiting on the
        # "finished" event of a thread that never ran would block forever.
        if self.__configThreadStarted:
            self.__configUpdateEvent.set()
            self.__configUpdateFinished.wait()
        if self.__bgThreadStarted:
            self.__bgMonitorEvent.set()
            self.__bgMonitorFinished.wait()

        if self.__udpSocket is not None:
            self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
            self.__udpSocket.close()
            self.__udpSocket = None
        self.__freed = True

    #########################################################################################
    # Internal functions - Config reloader thread
    #########################################################################################

    def __configLoader(self):
        """
        Main loop of the thread that checks for changes and reloads the configuration.
        Runs until free() sets the config update event.
        """
        try:
            while not self.__configUpdateEvent.isSet():
                # don't recheck more often than 30 sec
                self.__configUpdateEvent.wait(max(30, self.configRecheckInterval))
                if self.__configUpdateEvent.isSet():
                    break
                if self.configRecheck:
                    self.__reloadAddresses()
                    self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for " + repr(self.configRecheckInterval) + " sec.")
        finally:
            # always signal termination, so that free() cannot hang if the loop dies
            self.__configUpdateFinished.set()

    def __reloadAddresses(self):
        """
        Refresh now the destinations hash, by loading data from the sources in configAddresses.
        The sources are tried in random order until one of them yields at least one destination.
        """
        self.logger.log(Logger.DEBUG, "reloading addresses")
        newDestinations = {}
        urls = copy.deepcopy(self.configAddresses)
        while len(urls) > 0 and len(newDestinations) == 0:
            src = random.choice(urls)
            urls.remove(src)
            self.__initializeFromFile(src, newDestinations)
        # avoid changing config in the middle of sending packets to previous destinations
        # NOTE(review): if no source could be read, the previous destinations are dropped.
        self.__configUpdateLock.acquire()
        try:
            self.destinations = newDestinations
        finally:
            self.__configUpdateLock.release()
        self.logger.log(Logger.DEBUG, "finished reloading addresses")

    def __addDestination(self, aDestination, tempDestinations, options = __defaultOptions):
        """
        Add a destination to the tempDestinations hash.

        aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
        If the port is not given, it will be used the default port (8884)
        If the password is missing, it will be considered an empty string
        options, if different from the defaults, overwrite the defaults for this destination.
        Destinations with a bad port or an unresolvable host are skipped, with a log message.
        """
        aDestination = aDestination.strip().replace('\t', ' ')
        # collapse runs of spaces into a single space
        while aDestination != aDestination.replace('  ', ' '):
            aDestination = aDestination.replace('  ', ' ')
        sepPort = aDestination.find(':')
        sepPasswd = aDestination.rfind(' ')
        if sepPort >= 0:
            host = aDestination[0:sepPort].strip()
            if sepPasswd > sepPort + 1:
                port = aDestination[sepPort+1:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                port = aDestination[sepPort+1:].strip()
                passwd = ""
        else:
            port = str(self.__defaultPort)
            if sepPasswd >= 0:
                host = aDestination[0:sepPasswd].strip()
                passwd = aDestination[sepPasswd:].strip()
            else:
                host = aDestination.strip()
                passwd = ""
        if not port.isdigit():
            self.logger.log(Logger.WARNING, "Bad value for port number " + repr(port) + " in " + aDestination + " destination")
            return
        port = int(port)
        try:
            host = socket.gethostbyname(host)  # convert hostnames to IP addresses to avoid suffocating DNSs
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Error resolving " + host + ": " + str(msg))
            return
        # a destination is a duplicate if host and port match, whatever the password
        alreadyAdded = False
        for h, p, w in tempDestinations.keys():
            if (h == host) and (p == port):
                alreadyAdded = True
                break
        destination = (host, port, passwd)
        if alreadyAdded:
            self.logger.log(Logger.NOTICE, "Destination " + host + ":" + str(port) + " " + passwd + " already added. Skipping it")
            return
        self.logger.log(Logger.INFO, "Adding destination " + host + ':' + repr(port) + ' ' + passwd)
        if destination in self.destinations:
            tempDestinations[destination] = self.destinations[destination]  # reuse previous options
        else:
            tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)  # have a different set of options for each dest
        if destination not in self.destPrevData:
            self.destPrevData[destination] = {}  # set it empty only if it's really new
        if destination not in self.senderRef:
            self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef)  # otherwise, don't reset this nr.
        if options != self.__defaultOptions:
            # we have to overwrite defaults with given options
            for key, value in options.items():
                self.logger.log(Logger.NOTICE, "Overwritting option: " + key + " = " + repr(value))
                tempDestinations[destination][key] = value

    def __initializeFromFile(self, confFileName, tempDestinations):
        """
        Load destinations from confFileName file. If it's an URL (starts with "http://")
        load configuration from there. Put all destinations in tempDestinations hash.

        Lines starting with "xApMon_" set options ("xApMon_name = value"); empty lines and
        lines starting with # are skipped; __addDestination is called for each other line.
        """
        try:
            if confFileName.find("http://") == 0:
                confFile = self.__getURL(confFileName)
                if confFile is None:
                    return
            else:
                confFile = open(confFileName)
        except IOError as ex:
            self.logger.log(Logger.ERROR, "Cannot open " + confFileName)
            self.logger.log(Logger.ERROR, "IOError: " + str(ex))
            return
        self.logger.log(Logger.INFO, "Adding destinations from " + confFileName)
        dests = []
        opts = {}
        try:
            while True:
                line = confFile.readline()
                if line == '':
                    break
                line = line.strip()
                self.logger.log(Logger.DEBUG, "Reading line " + line)
                if (len(line) == 0) or (line[0] == '#'):
                    continue
                if not line.startswith("xApMon_"):
                    dests.append(line)
                    continue
                m = re.match(r"xApMon_(\S+)\s*=\s*(\S+)", line)
                if m is None:
                    continue
                try:
                    self.__applyConfigOption(m.group(1), m.group(2), opts)
                except ValueError as ex:
                    # a malformed number must not abort the whole config (re)load
                    self.logger.log(Logger.WARNING, "Ignoring option " + m.group(1) + " from " + confFileName + ": " + str(ex))
        finally:
            confFile.close()
        for line in dests:
            self.__addDestination(line, tempDestinations, opts)

    def __applyConfigOption(self, param, value, opts):
        """
        Apply one "xApMon_<param> = <value>" configuration option. Global options change
        this instance; any other option is stored in opts, to be applied to the destinations.
        Raises ValueError if a numeric option has a non-numeric value.
        """
        if value.upper() == "ON":
            value = True
        elif value.upper() == "OFF":
            value = False
        elif param.endswith("_interval"):
            value = int(value)
        if param == "loglevel":
            self.logger.setLogLevel(value)
        elif param == "maxMsgRate":
            self.setMaxMsgRate(int(value))
        elif param == "conf_recheck":
            self.configRecheck = value
        elif param == "recheck_interval":
            self.configRecheckInterval = value
        elif param.endswith("_data_sent"):
            pass  # don't reset time in sys/job/general/_data_sent
        else:
            opts[param] = value

    ###############################################################################################
    # Internal functions - Background monitor thread
    ###############################################################################################

    def __bgMonitor(self):
        """
        Main loop of the background monitoring thread: every 10 seconds, if background
        monitoring is enabled, send the monitoring data that is due.
        Runs until free() sets the background monitor event.
        """
        try:
            while not self.__bgMonitorEvent.isSet():
                self.__bgMonitorEvent.wait(10)
                if self.__bgMonitorEvent.isSet():
                    break
                if self.performBgMonitoring:
                    self.sendBgMonitoring()  # send only if the interval has elapsed
        finally:
            # always signal termination, so that free() cannot hang if the loop dies
            self.__bgMonitorFinished.set()

    ###############################################################################################
    # Internal helper functions
    ###############################################################################################

    def __getURL(self, url, timeout = 5):
        """
        Simplified replacement for urllib2, which doesn't support setting a timeout.

        Fetch the given http:// url with a HTTP/1.0 GET request, waiting at most
        timeout seconds for each socket operation. Returns a file-like object positioned
        at the start of the reply body if the status is 200; otherwise logs the error
        and returns None.
        """
        r = re.match(r"http://([^:/]+)(:(\d+))?(/.*)?$", url)
        if r is None:
            self.logger.log(Logger.ERROR, "Cannot open " + url + ". Incorrectly formed URL.")
            return None
        host = r.group(1)
        if r.group(3) is None:
            port = 80  # no port is given, pick the default 80 for HTTP
        else:
            port = int(r.group(3))
        if r.group(4) is None:
            path = "/"  # no path is given, ask for the root document
        else:
            path = r.group(4)
        if port == 80:
            hostHeader = host
        else:
            hostHeader = host + ":" + str(port)

        # try each address of the host, until one connects
        sock = None
        err = None
        try:
            for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error as msg:
                    sock = None
                    err = msg
                    continue
                try:
                    # portable timeout for connect/send/recv (no hand-packed struct timeval)
                    sock.settimeout(timeout)
                    sock.connect(sa)
                except socket.error as msg:
                    sock.close()
                    sock = None
                    err = msg
                    continue
                break
        except socket.error as msg:
            sock = None
            err = msg
        if sock is None:
            self.logger.log(Logger.ERROR, "Cannot open " + url)
            self.logger.log(Logger.ERROR, "SocketError: " + str(err))
            return None

        try:
            try:
                sock.sendall("GET " + path + " HTTP/1.0\r\nHost: " + hostHeader + "\r\n\r\n")
                # a HTTP/1.0 server closes the connection at the end of the reply
                chunks = []
                while True:
                    chunk = sock.recv(8192)
                    if not chunk:
                        break
                    chunks.append(chunk)
            except socket.error as msg:
                self.logger.log(Logger.ERROR, "Cannot open " + url)
                self.logger.log(Logger.ERROR, "SocketError: " + str(msg))
                return None
        finally:
            sock.close()

        reply = StringIO.StringIO("".join(chunks))
        httpStatus = 0
        while True:
            line = reply.readline().strip()
            if line == "":
                break  # exit at the end of file or at the first empty line (finish of http headers)
            r = re.match(r"HTTP/\d\.\d (\d+)", line)
            if r is not None:
                httpStatus = int(r.group(1))
        if httpStatus == 200:
            return reply
        self.logger.log(Logger.ERROR, "Cannot open " + url)
        if httpStatus == 401:
            self.logger.log(Logger.ERROR, 'HTTPError: not authorized [' + str(httpStatus) + ']')
        elif httpStatus == 404:
            self.logger.log(Logger.ERROR, 'HTTPError: not found [' + str(httpStatus) + ']')
        elif httpStatus == 503:
            self.logger.log(Logger.ERROR, 'HTTPError: service unavailable [' + str(httpStatus) + ']')
        else:
            self.logger.log(Logger.ERROR, 'HTTPError: unknown error [' + str(httpStatus) + ']')
        return None

    def __directSendParams(self, destination, clusterName, nodeName, timeStamp, params):
        """
        Build the XDR packet with the given parameters and send it, as one UDP datagram,
        to the given destination - a (host, port, passwd) tuple. The packet is silently
        dropped if the message rate is too high (see __shouldSend).
        """
        if self.__shouldSend() == False:
            self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!")
            return

        if destination is None:
            self.logger.log(Logger.WARNING, "Destination is None")
            return

        host, port, passwd = destination
        crtSenderRef = self.senderRef[destination]
        crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000  # wrap around 2 mld

        xdrPacker = xdrlib.Packer()
        xdrPacker.pack_string("v:" + self.__version + "p:" + passwd)
        xdrPacker.pack_int(crtSenderRef['INSTANCE_ID'])
        xdrPacker.pack_int(crtSenderRef['SEQ_NR'])
        xdrPacker.pack_string(clusterName)
        xdrPacker.pack_string(nodeName)

        # the parameters go in a separate packer, since their count has to be written first
        if isinstance(params, dict):
            pairs = params.items()
        elif isinstance(params, (list, tuple)):
            pairs = params
        else:
            self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))
            pairs = []
        sent_params_nr = 0
        paramsPacker = xdrlib.Packer()
        for name, value in pairs:
            if self.__packParameter(paramsPacker, name, value):
                sent_params_nr += 1

        xdrPacker.pack_int(sent_params_nr)

        if (timeStamp is not None) and (timeStamp > 0):
            paramsPacker.pack_int(timeStamp)

        buffer = xdrPacker.get_buffer() + paramsPacker.get_buffer()
        self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(buffer))+" bytes.")
        # send this buffer to the destination, using udp datagrams
        try:
            self.__udpSocket.sendto(buffer, (host, port))
            self.logger.log(Logger.NOTICE, "Packet sent to " + host + ":" + str(port) + " " + passwd)
        except socket.error as msg:
            # str(msg), not msg[1]: not every socket error carries an (errno, text) pair
            self.logger.log(Logger.ERROR, "Cannot send packet to " + host + ":" + str(port) + " " + passwd + ": " + str(msg))

    def __packParameter(self, xdrPacker, name, value):
        """
        Pack one parameter (name, value type, value) in the given XDR packer.
        Returns True if the parameter was packed; False (with a warning logged) if the
        name is undefined, the value is None, or the value could not be packed.
        """
        if (name is None) or (name == ""):
            self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value " + str(value))
            return False
        if value is None:
            self.logger.log(Logger.WARNING, "Ignore " + str(name) + " parameter because of None value")
            return False
        try:
            typeValue = self.__valueTypes[type(value)]
            xdrPacker.pack_string(name)
            xdrPacker.pack_int(typeValue)
            self.__packFunctions[typeValue](xdrPacker, value)
            self.logger.log(Logger.DEBUG, "Adding parameter " + str(name) + " = " + str(value))
            return True
        except Exception as ex:
            # unsupported value type (KeyError) or any packing error: skip this parameter
            self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
            return False

    # Destructor
    def __del__(self):
        if not self.__freed:
            self.free()

    def __shouldSend(self):
        """
        Decide if the current datagram should be sent.
        This decision is based on the number of messages previously sent: don't allow a
        user to send more than maxMsgRate messages per second, in average.
        """
        now = int(time.time())
        if now != self.__crtTime:
            # new time: update previous counters
            self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / (now - self.__crtTime)
            self.__prvTime = self.__crtTime
            self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
            # reset current counter
            self.__crtTime = now
            self.__crtSent = 0
            self.__crtDrop = 0

        # compute the history
        valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

        doSend = True

        # when we should start dropping messages
        level = self.maxMsgRate - self.maxMsgRate // 10

        if valSent > (self.maxMsgRate - level):
            if random.randint(0, self.maxMsgRate // 10) >= (self.maxMsgRate - valSent):
                doSend = False

        # counting sent and dropped messages
        if doSend:
            self.__crtSent += 1
        else:
            self.__crtDrop += 1

        return doSend

    ################################################################################################
    # Private variables. Don't touch
    ################################################################################################

    __valueTypes = {
        type("string"): 0,  # XDR_STRING (see ApMon.h from C/C++ ApMon version)
        type(1): 2,         # XDR_INT32
        type(1.0): 5,       # XDR_REAL64
    }

    __packFunctions = {
        0: xdrlib.Packer.pack_string,
        2: xdrlib.Packer.pack_int,
        5: xdrlib.Packer.pack_double,
    }

    __defaultPort = 8884
    __version = "2.2.4-py"  # apMon version number
824