ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.3
Committed: Wed Mar 8 17:10:49 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_6, CRAB_1_0_5
Changes since 1.2: +102 -45 lines
Log Message:
new apmon version with thread patch

File Contents

# User Rev Content
1 corvo 1.1
2     """
3     * ApMon - Application Monitoring Tool
4 corvo 1.3 * Version: 2.2.1
5 corvo 1.1 *
6 corvo 1.3 * Copyright (C) 2006 California Institute of Technology
7 corvo 1.1 *
8     * Permission is hereby granted, free of charge, to use, copy and modify
9     * this software and its documentation (the "Software") for any
10     * purpose, provided that existing copyright notices are retained in
11     * all copies and that this notice is included verbatim in any distributions
12     * or substantial portions of the Software.
13     * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14     * Users of the Software are asked to feed back problems, benefits,
15     * and/or suggestions about the software to the MonALISA Development Team
16     * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17     * incorporation of new features - is done on a best effort basis. All bug
18     * fixes and enhancements will be made available under the same terms and
19     * conditions as the original software,
20    
21     * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22     * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23     * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24     * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25    
26     * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27     * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28     * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29     * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30     * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31     * MODIFICATIONS.
32     """
33    
34     """
35     apmon.py
36    
37     This is a python implementation for the ApMon API for sending
38     data to the MonALISA service.
39    
40     For further details about ApMon please see the C/C++ or Java documentation
41     You can find a sample usage of this module in apmTest.py.
42    
43     Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44     Sending strings is supported, but they will not be stored in the
45     farm's store nor shown in the farm's window in the MonALISA client.
46     """
47    
48     import re
49     import xdrlib
50     import socket
51     import urllib2
52     import threading
53     import time
54     import Logger
55     import ProcInfo
56 corvo 1.2 import random
57 corvo 1.3 import copy
58 corvo 1.1
59     #__all__ = ["ApMon"]
60    
61 corvo 1.3 #__debug = False # set this to True to be verbose
62 corvo 1.1
63     class ApMon:
64     """
65     Main class for sending monitoring data to a MonaLisa module.
66     One or more destinations can be chosen for the data. See constructor.
67    
68     The data is packed in UDP datagrams, using XDR. The following fields are sent:
69     - version & password (string)
70     - cluster name (string)
71     - node name (string)
72     - number of parameters (int)
73     - for each parameter:
74     - name (string)
75     - value type (int)
76     - value
77     - optionally a (int) with the given timestamp
78    
79     Attributes (public):
80     - destinations - a list containing (ip, port, password) tuples
81     - configAddresses - list with files and urls from where the config is read
82     - configRecheckInterval - period, in seconds, to check for changes
83     in the configAddresses list
84     - configRecheck - boolean - whether to recheck periodically for changes
85     in the configAddresses list
86     """
87    
88     __defaultOptions = {
89     'job_monitoring': True, # perform (or not) job monitoring
90     'job_interval' : 10, # at this interval (in seconds)
91     'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!
92    
93     'job_cpu_time' : True, # elapsed time from the start of this job in seconds
94     'job_run_time' : True, # processor time spent running this job in seconds
95     'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
96     'job_virtualmem': True, # size in JB of the virtual memory occupied by the job, as reported by ps
97     'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
98     'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
99     'job_workdir_size': True, # size in MB of the working directory of the job
100     'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
101     'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
102     'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
103     'job_disk_usage': True, # percent of the used disk partition containing the working directory
104 corvo 1.2 'job_open_files': True, # number of open file descriptors
105 corvo 1.1
106     'sys_monitoring': True, # perform (or not) system monitoring
107     'sys_interval' : 10, # at this interval (in seconds)
108     'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!
109    
110     'sys_cpu_usr' : False, # cpu-usage information
111     'sys_cpu_sys' : False, # all these will produce coresponding paramas without "sys_"
112     'sys_cpu_nice' : False,
113     'sys_cpu_idle' : False,
114     'sys_cpu_usage' : True,
115     'sys_load1' : True, # system load information
116     'sys_load5' : True,
117     'sys_load15' : True,
118     'sys_mem_used' : False, # memory usage information
119     'sys_mem_free' : False,
120     'sys_mem_usage' : True,
121     'sys_pages_in' : False,
122     'sys_pages_out' : False,
123     'sys_swap_used' : True, # swap usage information
124     'sys_swap_free' : False,
125     'sys_swap_usage': True,
126     'sys_swap_in' : False,
127     'sys_swap_out' : False,
128     'sys_net_in' : True, # network transfer in kBps
129     'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
130     'sys_net_errs' : False, # for each eth interface
131 corvo 1.3 'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
132     'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
133 corvo 1.1 'sys_processes' : True,
134     'sys_uptime' : True, # uptime of the machine, in days (float number)
135    
136     'general_info' : True, # send (or not) general host information once every 2 $sys_interval seconds
137     'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!
138    
139     'hostname' : True,
140     'ip' : True, # will produce ethX_ip params for each interface
141     'cpu_MHz' : True,
142     'no_CPUs' : True, # number of CPUs
143     'total_mem' : True,
144 corvo 1.2 'total_swap' : True,
145     'cpu_vendor_id' : True,
146     'cpu_family' : True,
147     'cpu_model' : True,
148     'cpu_model_name': True,
149     'bogomips' : True};
150 corvo 1.1
151 corvo 1.3 def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
152 corvo 1.1 """
153     Class constructor:
154     - if initValue is a string, put it in configAddresses and load destinations
155     from the file named like that. if it starts with "http://", the configuration
156     is loaded from that URL. For background monitoring, given parameters will overwrite defaults
157    
158     - if initValue is a list, put its contents in configAddresses and create
159     the list of destinations from all those sources. For background monitoring,
160     given parameters will overwrite defaults (see __defaultOptions)
161    
162     - if initValue is a tuple (of strings), initialize destinations with that values.
163     Strings in this tuple have this form: "{hostname|ip}[:port][ passwd]", the
164     default port being 8884 and the default password being "". Background monitoring will be
165     enabled sending the parameters active from __defaultOptions (see end of file)
166    
167     - if initValue is a hash (key = string(hostname|ip[:port][ passwd]),
168     val = hash{'param_name': True/False, ...}) the given options for each destination
169     will overwrite the default parameters (see __defaultOptions)
170     """
171 corvo 1.2 self.destinations = {} # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...}
172 corvo 1.3 self.destPrevData = {} # empty, by defaul; key = tuple (host, port, pass) ; val = hash {"param_mame" : value, ...}
173 corvo 1.2 self.configAddresses = [] # empty, by default; list of files/urls from where we read config
174     self.configRecheckInterval = 120 # 2 minutes
175     self.configRecheck = True # enabled by default
176     self.performBgMonitoring = True # by default, perform background monitoring
177     self.monitoredJobs = {} # Monitored jobs; key = pid; value = hash with
178 corvo 1.3 self.maxMsgRate = 20; # Maximum number of messages allowed to be sent per second
179     self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0};
180 corvo 1.2 self.__defaultUserCluster = "ApMon_UserSend";
181     self.__defaultUserNode = socket.getfqdn();
182     self.__defaultSysMonCluster = "ApMon_SysMon";
183     self.__defaultSysMonNode = socket.getfqdn();
184     # don't touch these:
185 corvo 1.3 self.__freed = False
186 corvo 1.2 self.__initializedOK = True
187     self.__udpSocket = None
188     self.__configUpdateLock = threading.Lock()
189     self.__configUpdateEvent = threading.Event()
190     self.__bgMonitorLock = threading.Lock()
191     self.__bgMonitorEvent = threading.Event()
192     # don't allow a user to send more than MAX_MSG messages per second, in average
193     self.__crtTime = 0;
194     self.__prvTime = 0;
195     self.__prvSent = 0;
196     self.__prvDrop = 0;
197     self.__crtSent = 0;
198     self.__crtDrop = 0;
199     self.__hWeight = 0.92;
200 corvo 1.3 self.logger = Logger.Logger(defaultLogLevel)
201 corvo 1.1 if type(initValue) == type("string"):
202     self.configAddresses.append(initValue)
203     self.__reloadAddresses()
204     elif type(initValue) == type([]):
205     self.configAddresses = initValue
206     self.__reloadAddresses()
207     elif type(initValue) == type(()):
208     for dest in initValue:
209     self.__addDestination (dest, self.destinations)
210     elif type(initValue) == type({}):
211     for dest, opts in initValue.items():
212     self.__addDestination (dest, self.destinations, opts)
213    
214     self.__initializedOK = (len (self.destinations) > 0)
215     if not self.__initializedOK:
216     self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.");
217 corvo 1.3 # self.__defaultClusterName = None
218     # self.__defaultNodeName = self.getMyHo
219 corvo 1.1 if self.__initializedOK:
220     self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
221     if len(self.configAddresses) > 0:
222     # if there are addresses that need to be monitored,
223     # start config checking and reloading thread
224     th = threading.Thread(target=self.__configLoader)
225     th.setDaemon(True) # this is a daemon thread
226     th.start()
227     # create the ProcInfo instance
228     self.procInfo = ProcInfo.ProcInfo(self.logger);
229 corvo 1.3 # self.procInfo.update();
230 corvo 1.1 # start the background monitoring thread
231     th = threading.Thread(target=self.__bgMonitor);
232     th.setDaemon(True);
233     th.start();
234    
235    
236     def sendParams (self, params):
237     """
238     Send multiple parameters to MonALISA, with default (last given) cluser and node names.
239     """
240     self.sendTimedParams (-1, params)
241    
242     def sendTimedParams (self, timeStamp, params):
243     """
244     Send multiple parameters, specifying the time for them, with default (last given) cluster and node names.
245     (See sendTimedParameters for more details)
246     """
247     self.sendTimedParameters (None, None, timeStamp, params);
248    
249     def sendParameter (self, clusterName, nodeName, paramName, paramValue):
250     """
251     Send a single parameter to MonALISA.
252     """
253     self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue);
254    
255     def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
256     """
257     Send a single parameter, with a given time.
258     """
259     self.sendTimedParameters (clusterName, nodeName, timeStamp, {paramName:paramValue})
260    
261     def sendParameters (self, clusterName, nodeName, params):
262     """
263     Send multiple parameters specifying cluster and node name for them
264     """
265     self.sendTimedParameters (clusterName, nodeName, -1, params);
266    
267     def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
268     """
269     Send multiple monitored parameters to MonALISA.
270    
271     - clusterName is the name of the cluster being monitored. The first
272     time this function is called, this paramenter must not be None. Then,
273     it can be None; last given clusterName will be used instead.
274     - nodeName is the name of the node for which are the parameters. If this
275     is None, the full hostname of this machine will be sent instead.
276     - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
277     Note that this option should be used only if you are sure about the time for the result.
278     Otherwize, the parameters will be assigned a correct time (obtained from NTP servers)
279     in MonALISA service. This option can be usefull when parsing logs, for example.
280     - params is a dictionary containing pairs with:
281     - key: parameter name
282     - value: parameter value, either int or float.
283     or params is a vector of tuples (key, value). This version can be used
284     in case you want to send the parameters in a given order.
285    
286     NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
287     """
288     if not self.__initializedOK:
289     self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!");
290     return
291 corvo 1.3 if (clusterName == None) or (clusterName == ""):
292 corvo 1.1 clusterName = self.__defaultUserCluster
293     else:
294     self.__defaultUserCluster = clusterName
295     if nodeName == None:
296     nodeName = self.__defaultUserNode
297     else:
298     self.__defaultUserNode = nodeName
299 corvo 1.3 self.__configUpdateLock.acquire();
300 corvo 1.1 for dest in self.destinations.keys():
301 corvo 1.3 self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params);
302     self.__configUpdateLock.release();
303 corvo 1.1
304     def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
305     """
306     Add a new job to monitor.
307     """
308     self.__bgMonitorLock.acquire();
309     self.monitoredJobs[pid] = {};
310     self.monitoredJobs[pid]['CLUSTER_NAME'] = clusterName;
311     self.monitoredJobs[pid]['NODE_NAME'] = nodeName;
312     self.procInfo.addJobToMonitor(pid, workDir);
313     self.__bgMonitorLock.release();
314    
315     def removeJobToMonitor (self, pid):
316     """
317     Remove a job from being monitored.
318     """
319     self.__bgMonitorLock.acquire();
320     self.procInfo.removeJobToMonitor(pid);
321     del self.monitoredJobs[pid];
322     self.__bgMonitorLock.release();
323    
324     def setMonitorClusterNode (self, clusterName, nodeName):
325     """
326     Set the cluster and node names where to send system related information.
327     """
328     self.__bgMonitorLock.acquire();
329 corvo 1.3 if (clusterName != None) and (clusterName != ""):
330     self.__defaultSysMonCluster = clusterName;
331     if (nodeName != None) and (nodeName != ""):
332     self.__defaultSysMonNode = nodeName;
333 corvo 1.1 self.__bgMonitorLock.release();
334    
335     def enableBgMonitoring (self, onOff):
336     """
337     Enable or disable background monitoring. Note that background monitoring information
338     can still be sent if user calls the sendBgMonitoring method.
339     """
340     self.performBgMonitoring = onOff;
341 corvo 1.3
342     def sendBgMonitoring (self, mustSend = False):
343 corvo 1.1 """
344 corvo 1.3 Send background monitoring about system and jobs to all interested destinations.
345     If mustSend == True, the information is sent regardles of the elapsed time since last sent
346     If mustSend == False, the data is sent only if the required interval has passed since last sent
347 corvo 1.1 """
348     self.__bgMonitorLock.acquire();
349     now = int(time.time());
350 corvo 1.3 updatedProcInfo = False;
351     for destination, options in self.destinations.iteritems():
352 corvo 1.1 sysParams = [];
353     jobParams = [];
354 corvo 1.3 prevRawData = self.destPrevData[destination];
355 corvo 1.1 # for each destination and its options, check if we have to report any background monitoring data
356 corvo 1.3 if(options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now)):
357 corvo 1.1 for param, active in options.items():
358     m = re.match("sys_(.+)", param);
359     if(m != None and active):
360     param = m.group(1);
361     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
362     sysParams.append(param)
363     options['sys_data_sent'] = now;
364 corvo 1.3 if(options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now)):
365 corvo 1.1 for param, active in options.items():
366     m = re.match("job_(.+)", param);
367     if(m != None and active):
368     param = m.group(1);
369     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
370     jobParams.append(param);
371     options['job_data_sent'] = now;
372 corvo 1.3 if(options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now)):
373 corvo 1.1 for param, active in options.items():
374     if not (param.startswith("sys_") or param.startswith("job_")) and active:
375     if not (param == 'general_info' or param == 'general_data_sent'):
376     sysParams.append(param);
377 corvo 1.3 options['general_data_sent'] = now;
378    
379     if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ):
380     self.procInfo.update();
381     updatedProcInf = True;
382    
383 corvo 1.1 sysResults = {}
384     if(len(sysParams) > 0):
385 corvo 1.3 sysResults = self.procInfo.getSystemData(sysParams, prevRawData);
386     self.updateLastSentTime(options, self.destinations[destination]);
387     if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0):
388     self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
389     if( (type(sysResults) == type( [] )) and len(sysResults) > 0):
390     self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
391 corvo 1.1 for pid, props in self.monitoredJobs.items():
392     jobResults = {};
393     if(len(jobParams) > 0):
394     jobResults = self.procInfo.getJobData(pid, jobParams);
395     if(len(jobResults) > 0):
396 corvo 1.3 self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults);
397 corvo 1.1 self.__bgMonitorLock.release();
398    
399 corvo 1.3 # copy the time when last data was sent
400     def updateLastSentTime(self, srcOpts, dstOpts):
401     if srcOpts.has_key('general_data_sent'):
402     dstOpts['general_data_sent'] = srcOpts['general_data_sent'];
403     if srcOpts.has_key('sys_data_sent'):
404     dstOpts['sys_data_sent'] = srcOpts['sys_data_sent'];
405     if srcOpts.has_key('job_data_sent'):
406     dstOpts['job_data_sent'] = srcOpts['job_data_sent'];
407    
408 corvo 1.1 def setLogLevel (self, strLevel):
409     """
410     Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
411     'INFO', 'NOTICE', 'DEBUG'.
412     """
413     self.logger.setLogLevel(strLevel);
414    
415 corvo 1.2 def setMaxMsgRate(self, rate):
416     """
417     Set the maximum number of messages that can be sent, per second.
418     """
419     self.maxMsgRate = rate;
420     self.logger.log(Logger.DEBUG, "Setting maxMsgRate to: " + str(rate));
421    
422     def free(self):
423     """
424     Stop background threands, close opened sockets. You have to use this function if you want to
425     free all the resources that ApMon takes, and allow it to be garbage-collected.
426     """
427 corvo 1.3 if self.__configUpdateEvent != None:
428     self.__configUpdateEvent.set();
429     if self.__bgMonitorEvent != None:
430     self.__bgMonitorEvent.set();
431 corvo 1.2 time.sleep(0.01);
432     if self.__udpSocket != None:
433     self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.");
434     self.__udpSocket.close();
435 corvo 1.3 self.__udpSocket = None;
436     self.__freed = True
437 corvo 1.2
438    
439 corvo 1.1 #########################################################################################
440     # Internal functions - Config reloader thread
441     #########################################################################################
442    
443     def __configLoader(self):
444     """
445     Main loop of the thread that checks for changes and reloads the configuration
446     """
447 corvo 1.2 while not self.__configUpdateEvent.isSet():
448     self.__configUpdateEvent.wait(self.configRecheckInterval);
449     if self.__configUpdateEvent.isSet():
450     return;
451 corvo 1.1 if self.configRecheck:
452     self.__reloadAddresses()
453     self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+`self.configRecheckInterval`+" sec.");
454    
455     def __reloadAddresses(self):
456     """
457     Refresh destinations hash, by loading data from all sources in configAddresses
458     """
459     newDestinations = {}
460     for src in self.configAddresses:
461     self.__initializeFromFile(src, newDestinations)
462     # avoid changing config in the middle of sending packets to previous destinations
463     self.__configUpdateLock.acquire()
464     self.destinations = newDestinations
465     self.__configUpdateLock.release()
466    
467     def __addDestination (self, aDestination, tempDestinations, options = __defaultOptions):
468     """
469     Add a destination to the list.
470    
471     aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
472     If the port is not given, it will be used the default port (8884)
473     If the password is missing, it will be considered an empty string
474     """
475     aDestination = aDestination.strip().replace('\t', ' ')
476     while aDestination != aDestination.replace(' ', ' '):
477     aDestination = aDestination.replace(' ', ' ')
478     sepPort = aDestination.find (':')
479     sepPasswd = aDestination.rfind (' ')
480     if sepPort >= 0:
481     host = aDestination[0:sepPort].strip()
482     if sepPasswd > sepPort + 1:
483     port = aDestination[sepPort+1:sepPasswd].strip()
484     passwd = aDestination[sepPasswd:].strip()
485     else:
486     port = aDestination[sepPort+1:].strip()
487     passwd = ""
488     else:
489     port = str(self.__defaultPort)
490     if sepPasswd >= 0:
491     host = aDestination[0:sepPasswd].strip()
492     passwd = aDestination[sepPasswd:].strip()
493     else:
494     host = aDestination.strip()
495     passwd = ""
496     if (not port.isdigit()):
497     self.logger.log(Logger.WARNING, "Bad value for port number "+`port`+" in "+aDestination+" destination");
498     return
499     alreadyAdded = False
500     port = int(port)
501     host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
502     for h, p, w in tempDestinations.keys():
503     if (h == host) and (p == port):
504     alreadyAdded = True
505     break
506 corvo 1.3 destination = (host, port, passwd);
507 corvo 1.1 if not alreadyAdded:
508     self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd);
509 corvo 1.3 tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest.
510     self.destPrevData[destination] = {};
511 corvo 1.1 if options != self.__defaultOptions:
512     # we have to overwrite defaults with given options
513     for key, value in options.items():
514     self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`);
515 corvo 1.3 tempDestinations[destination][key] = value;
516 corvo 1.1 else:
517 corvo 1.2 self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it");
518 corvo 1.1
519     def __initializeFromFile (self, confFileName, tempDestinations):
520     """
521     Load destinations from confFileName file. If it's an URL (starts with "http://")
522     load configuration from there. Put all destinations in tempDestinations hash.
523    
524     Calls addDestination for each line that doesn't start with # and
525     has non-whitespace characters on it
526     """
527     try:
528     if confFileName.find ("http://") == 0:
529     confFile = urllib2.urlopen (confFileName)
530     else:
531     confFile = open (confFileName)
532     except urllib2.HTTPError, e:
533     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
534     if e.code == 401:
535     self.logger.log(Logger.ERROR, 'HTTPError: not authorized.');
536     elif e.code == 404:
537     self.logger.log(Logger.ERROR, 'HTTPError: not found.');
538     elif e.code == 503:
539     self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.');
540     else:
541     self.logger.log(Logger.ERROR, 'HTTPError: unknown error.');
542     return
543     except urllib2.URLError, ex:
544     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
545     self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason[1]));
546     return
547     except IOError, ex:
548     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
549     self.logger.log(Logger.ERROR, "IOError: "+str(ex));
550     return
551     self.logger.log(Logger.INFO, "Adding destinations from "+confFileName);
552     dests = []
553     opts = {}
554     while(True):
555     line = confFile.readline();
556     if line == '':
557     break;
558     line = line.strip()
559     self.logger.log(Logger.DEBUG, "Reading line "+line);
560     if (len(line) == 0) or (line[0] == '#'):
561     continue
562     elif line.startswith("xApMon_"):
563     m = re.match("xApMon_(.*)", line);
564     if m != None:
565     m = re.match("(\S+)\s*=\s*(\S+)", m.group(1));
566     if m != None:
567     param = m.group(1); value = m.group(2);
568     if(value.upper() == "ON"):
569     value = True;
570     elif(value.upper() == "OFF"):
571     value = False;
572     elif(param.endswith("_interval")):
573     value = int(value);
574     if param == "loglevel":
575     self.logger.setLogLevel(value);
576 corvo 1.2 elif param == "maxMsgRate":
577     self.setMaxMsgRate(int(value));
578 corvo 1.1 elif param == "conf_recheck":
579     self.configRecheck = value;
580     elif param == "recheck_interval":
581     self.configRecheckInterval = value;
582     elif param.endswith("_data_sent"):
583     pass; # don't reset time in sys/job/general/_data_sent
584     else:
585     opts[param] = value;
586     else:
587     dests.append(line);
588 corvo 1.3
589 corvo 1.1 confFile.close ()
590     for line in dests:
591     self.__addDestination(line, tempDestinations, opts)
592    
593     ###############################################################################################
594     # Internal functions - Background monitor thread
595     ###############################################################################################
596    
597     def __bgMonitor (self):
598 corvo 1.2 while not self.__bgMonitorEvent.isSet():
599     self.__bgMonitorEvent.wait(10);
600     if self.__bgMonitorEvent.isSet():
601     return;
602 corvo 1.1 if self.performBgMonitoring:
603 corvo 1.3 self.sendBgMonitoring() # send only if the interval has elapsed
604 corvo 1.1
605     ###############################################################################################
606     # Internal helper functions
607     ###############################################################################################
608    
609 corvo 1.3 def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
610    
611     if senderRef=={}:
612     self.logger.log(Logger.WARNING, "Not sending undefined parameters!");
613     return;
614 corvo 1.2
615     if self.__shouldSend() == False:
616     return;
617    
618 corvo 1.3 if destination == None:
619     self.logger.log(Logger.WARNING, "Destination is None");
620     return;
621    
622 corvo 1.1 host, port, passwd = destination
623 corvo 1.3 senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
624    
625     xdrPacker = xdrlib.Packer ();
626 corvo 1.1 self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params)));
627 corvo 1.3
628 corvo 1.1 xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
629 corvo 1.3
630     xdrPacker.pack_int (senderRef['INSTANCE_ID']);
631     xdrPacker.pack_int (senderRef['SEQ_NR']);
632    
633     xdrPacker.pack_string (clusterName);
634     xdrPacker.pack_string (nodeName);
635     xdrPacker.pack_int (len(params));
636    
637 corvo 1.1 if type(params) == type( {} ):
638     for name, value in params.iteritems():
639     self.__packParameter(xdrPacker, name, value)
640     elif type(params) == type( [] ):
641     for name, value in params:
642     self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value));
643     self.__packParameter(xdrPacker, name, value)
644     else:
645     self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
646 corvo 1.3 if (timeStamp != None) and (timeStamp > 0):
647 corvo 1.1 xdrPacker.pack_int(timeStamp);
648 corvo 1.3
649 corvo 1.1 buffer = xdrPacker.get_buffer();
650     # send this buffer to the destination, using udp datagrams
651     try:
652     self.__udpSocket.sendto(buffer, (host, port))
653 corvo 1.3 self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd);
654 corvo 1.1 except socket.error, msg:
655     self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]));
656     xdrPacker.reset()
657    
658     def __packParameter(self, xdrPacker, name, value):
659 corvo 1.3 if (name is None) or (name is ""):
660     self.logger.log(Logger.WARNING, "Undefine parameter name.");
661     return;
662     if (value is None):
663     self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value");
664     return;
665 corvo 1.1 try:
666     typeValue = self.__valueTypes[type(value)]
667     xdrPacker.pack_string (name)
668     xdrPacker.pack_int (typeValue)
669     self.__packFunctions[typeValue] (xdrPacker, value)
670     self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value));
671     except Exception, ex:
672     print "ApMon: error packing %s = %s; got %s" % (name, str(value), ex)
673 corvo 1.3
674 corvo 1.1 # Destructor
675     def __del__(self):
676 corvo 1.3 if not self.__freed:
677     self.free();
678 corvo 1.2
679     # Decide if the current datagram should be sent.
680     # This decision is based on the number of messages previously sent.
681     def __shouldSend(self):
682     now = long(time.time());
683     if now != self.__crtTime :
684     # new time
685     # update previous counters;
686     self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / (now - self.__crtTime);
687     self.__prvTime = self.__crtTime;
688     self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop));
689     # reset current counter
690     self.__crtTime = now;
691     self.__crtSent = 0;
692     self.__crtDrop = 0;
693    
694     # compute the history
695     valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight);
696    
697     doSend = True;
698    
699     # when we should start dropping messages
700     level = self.maxMsgRate - self.maxMsgRate / 10;
701    
702     if valSent > (self.maxMsgRate - level) :
703     if random.randint(0,self.maxMsgRate / 10) >= (self.maxMsgRate - valSent):
704     doSend = False;
705    
706     # counting sent and dropped messages
707     if doSend:
708     self.__crtSent+=1;
709     else:
710     self.__crtDrop+=1;
711    
712     return doSend;
713    
714 corvo 1.1 ################################################################################################
715     # Private variables. Don't touch
716     ################################################################################################
717    
718     __valueTypes = {
719     type("string"): 0, # XDR_STRING (see ApMon.h from C/C++ ApMon version)
720     type(1): 2, # XDR_INT32
721     type(1.0): 5}; # XDR_REAL64
722    
723     __packFunctions = {
724     0: xdrlib.Packer.pack_string,
725     2: xdrlib.Packer.pack_int,
726     5: xdrlib.Packer.pack_double }
727    
728     __defaultPort = 8884
729 corvo 1.3 __version = "2.2.1-py" # apMon version number
730 corvo 1.1