ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.4
Committed: Tue Mar 21 16:05:47 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_7, CRAB_1_0_7_pre1
Changes since 1.3: +18 -16 lines
Log Message:
Fixed thread problems

File Contents

# User Rev Content
1 corvo 1.1
2     """
3     * ApMon - Application Monitoring Tool
4 corvo 1.4 * Version: 2.2.2
5 corvo 1.1 *
6 corvo 1.3 * Copyright (C) 2006 California Institute of Technology
7 corvo 1.1 *
8     * Permission is hereby granted, free of charge, to use, copy and modify
9     * this software and its documentation (the "Software") for any
10     * purpose, provided that existing copyright notices are retained in
11     * all copies and that this notice is included verbatim in any distributions
12     * or substantial portions of the Software.
13     * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14     * Users of the Software are asked to feed back problems, benefits,
15     * and/or suggestions about the software to the MonALISA Development Team
16     * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17     * incorporation of new features - is done on a best effort basis. All bug
18     * fixes and enhancements will be made available under the same terms and
19     * conditions as the original software,
20    
21     * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22     * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23     * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24     * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25    
26     * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27     * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28     * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29     * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30     * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31     * MODIFICATIONS.
32     """
33    
34     """
35     apmon.py
36    
37     This is a python implementation for the ApMon API for sending
38     data to the MonALISA service.
39    
40     For further details about ApMon please see the C/C++ or Java documentation
41     You can find a sample usage of this module in apmTest.py.
42    
43     Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44     Sending strings is supported, but they will not be stored in the
45     farm's store nor shown in the farm's window in the MonALISA client.
46     """
47    
48     import re
49     import xdrlib
50     import socket
51     import urllib2
52     import threading
53     import time
54     import Logger
55     import ProcInfo
56 corvo 1.2 import random
57 corvo 1.3 import copy
58 corvo 1.1
59     #__all__ = ["ApMon"]
60    
61 corvo 1.3 #__debug = False # self.destPrevData[destination];set this to True to be verbose
62 corvo 1.1
63     class ApMon:
64     """
65     Main class for sending monitoring data to a MonaLisa module.
66     One or more destinations can be chosen for the data. See constructor.
67    
68     The data is packed in UDP datagrams, using XDR. The following fields are sent:
69     - version & password (string)
70     - cluster name (string)
71     - node name (string)
72     - number of parameters (int)
73     - for each parameter:
74     - name (string)
75     - value type (int)
76     - value
77     - optionally a (int) with the given timestamp
78    
79     Attributes (public):
80     - destinations - a hash whose keys are (ip, port, password) tuples and whose values are the options hash of that destination
81     - configAddresses - list with files and urls from where the config is read
82     - configRecheckInterval - period, in seconds, to check for changes
83     in the configAddresses list
84     - configRecheck - boolean - whether to recheck periodically for changes
85     in the configAddresses list
86     """
87    
# Default background-monitoring options. __addDestination stores a deep copy of
# this hash for every destination, so the *_data_sent timestamps are tracked
# per destination. sendBgMonitoring scans the keys by prefix: "sys_*" and
# "job_*" entries select system/job parameters, every other key (except
# general_info/general_data_sent) selects a general host-information parameter.
__defaultOptions = {
    'job_monitoring': True, # perform (or not) job monitoring
    'job_interval' : 10, # at this interval (in seconds)
    'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!

    # NOTE(review): the descriptions of job_cpu_time and job_run_time look swapped
    # with respect to their names - confirm against ProcInfo before relying on them.
    'job_cpu_time' : True, # elapsed time from the start of this job in seconds
    'job_run_time' : True, # processor time spent running this job in seconds
    'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
    'job_virtualmem': True, # size in KB of the virtual memory occupied by the job, as reported by ps
    'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
    'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
    'job_workdir_size': True, # size in MB of the working directory of the job
    'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
    'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
    'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
    'job_disk_usage': True, # percent of the used disk partition containing the working directory
    'job_open_files': True, # number of open file descriptors

    'sys_monitoring': True, # perform (or not) system monitoring
    'sys_interval' : 10, # at this interval (in seconds)
    'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!

    'sys_cpu_usr' : False, # cpu-usage information
    'sys_cpu_sys' : False, # all these will produce corresponding params without "sys_"
    'sys_cpu_nice' : False,
    'sys_cpu_idle' : False,
    'sys_cpu_usage' : True,
    'sys_load1' : True, # system load information
    'sys_load5' : True,
    'sys_load15' : True,
    'sys_mem_used' : False, # memory usage information
    'sys_mem_free' : False,
    'sys_mem_usage' : True,
    'sys_pages_in' : False,
    'sys_pages_out' : False,
    'sys_swap_used' : True, # swap usage information
    'sys_swap_free' : False,
    'sys_swap_usage': True,
    'sys_swap_in' : False,
    'sys_swap_out' : False,
    'sys_net_in' : True, # network transfer in kBps
    'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
    'sys_net_errs' : False, # for each eth interface
    'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
    'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
    'sys_processes' : True,
    'sys_uptime' : True, # uptime of the machine, in days (float number)

    'general_info' : True, # send (or not) general host information once every 2 * sys_interval seconds
    'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!

    'hostname' : True,
    'ip' : True, # will produce ethX_ip params for each interface
    'cpu_MHz' : True,
    'no_CPUs' : True, # number of CPUs
    'total_mem' : True,
    'total_swap' : True,
    'cpu_vendor_id' : True,
    'cpu_family' : True,
    'cpu_model' : True,
    'cpu_model_name': True,
    'bogomips' : True};
150 corvo 1.1
def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
    """
    Create an ApMon object and work out where the monitoring data must be sent.

    The meaning of initValue depends on its type:
    - string: one configuration source (a file name, or an URL when it starts
      with "http://"); it is stored in configAddresses and the destinations
      are loaded from it.
    - list: several such configuration sources; the destinations are gathered
      from all of them.
    - tuple (of strings): the destinations themselves, each written as
      "{hostname|ip}[:port][ passwd]" (default port 8884, default password "").
      Background monitoring uses the defaults from __defaultOptions.
    - hash: key = destination string as above, val = hash
      {'param_name': True/False, ...} whose entries overwrite the defaults
      from __defaultOptions for that destination.

    defaultLogLevel is passed to the Logger created for this instance.

    If at least one destination ends up defined, the UDP socket is opened and
    the daemon threads are started: the config reloader (only when
    configAddresses is not empty) and the background monitor. Otherwise an
    error is logged and nothing else is set up.
    """
    # ---- public attributes ----
    self.destinations = {}            # key = tuple (host, port, pass); val = hash {"param_name": True/False, ...}
    self.destPrevData = {}            # key = tuple (host, port, pass); val = hash {"param_name": value, ...}
    self.configAddresses = []         # files/urls from where the config is read
    self.configRecheckInterval = 120  # seconds, i.e. 2 minutes
    self.configRecheck = True         # periodic re-reading of the config is on by default
    self.performBgMonitoring = True   # background monitoring is on by default
    self.monitoredJobs = {}           # key = pid; val = hash with CLUSTER_NAME and NODE_NAME
    self.maxMsgRate = 20              # maximum number of messages allowed to be sent per second
    self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0}
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = socket.getfqdn()
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = socket.getfqdn()

    # ---- internal state, not meant to be touched from outside ----
    self.__freed = False
    self.__initializedOK = True
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__configUpdateFinished = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()
    self.__bgMonitorFinished = threading.Event()

    # ---- counters used by __shouldSend to limit the message rate ----
    self.__crtTime = 0
    self.__prvTime = 0
    self.__prvSent = 0
    self.__prvDrop = 0
    self.__crtSent = 0
    self.__crtDrop = 0
    self.__hWeight = 0.92

    self.logger = Logger.Logger(defaultLogLevel)

    # ---- interpret initValue according to its type ----
    initType = type(initValue)
    if initType == type("string"):
        self.configAddresses.append(initValue)
        self.__reloadAddresses()
    elif initType == type([]):
        self.configAddresses = initValue
        self.__reloadAddresses()
    elif initType == type(()):
        for destString in initValue:
            self.__addDestination (destString, self.destinations)
    elif initType == type({}):
        for destString, destOptions in initValue.items():
            self.__addDestination (destString, self.destinations, destOptions)

    self.__initializedOK = (len (self.destinations) > 0)
    if not self.__initializedOK:
        self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.")
        return

    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if len(self.configAddresses) > 0:
        # there are config sources to watch: start the thread that
        # periodically checks and reloads them
        configThread = threading.Thread(target=self.__configLoader)
        configThread.setDaemon(True)
        configThread.start()
    # the ProcInfo instance supplies the system and job data
    self.procInfo = ProcInfo.ProcInfo(self.logger)
    # background monitoring runs in its own daemon thread
    monitorThread = threading.Thread(target=self.__bgMonitor)
    monitorThread.setDaemon(True)
    monitorThread.start()
236    
237    
238     def sendParams (self, params):
239     """
240     Send multiple parameters to MonALISA, with default (last given) cluser and node names.
241     """
242     self.sendTimedParams (-1, params)
243    
244     def sendTimedParams (self, timeStamp, params):
245     """
246     Send multiple parameters, specifying the time for them, with default (last given) cluster and node names.
247     (See sendTimedParameters for more details)
248     """
249     self.sendTimedParameters (None, None, timeStamp, params);
250    
251     def sendParameter (self, clusterName, nodeName, paramName, paramValue):
252     """
253     Send a single parameter to MonALISA.
254     """
255     self.sendTimedParameter(clusterName, nodeName, -1, paramName, paramValue);
256    
257     def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
258     """
259     Send a single parameter, with a given time.
260     """
261     self.sendTimedParameters (clusterName, nodeName, timeStamp, {paramName:paramValue})
262    
263     def sendParameters (self, clusterName, nodeName, params):
264     """
265     Send multiple parameters specifying cluster and node name for them
266     """
267     self.sendTimedParameters (clusterName, nodeName, -1, params);
268    
269     def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
270     """
271     Send multiple monitored parameters to MonALISA.
272    
273     - clusterName is the name of the cluster being monitored. The first
274     time this function is called, this paramenter must not be None. Then,
275     it can be None; last given clusterName will be used instead.
276     - nodeName is the name of the node for which are the parameters. If this
277     is None, the full hostname of this machine will be sent instead.
278     - timeStamp, if > 0, is given time for the parameters. This is in seconds from Epoch.
279     Note that this option should be used only if you are sure about the time for the result.
280     Otherwize, the parameters will be assigned a correct time (obtained from NTP servers)
281     in MonALISA service. This option can be usefull when parsing logs, for example.
282     - params is a dictionary containing pairs with:
283     - key: parameter name
284     - value: parameter value, either int or float.
285     or params is a vector of tuples (key, value). This version can be used
286     in case you want to send the parameters in a given order.
287    
288     NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)
289     """
290     if not self.__initializedOK:
291     self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!");
292     return
293 corvo 1.3 if (clusterName == None) or (clusterName == ""):
294 corvo 1.1 clusterName = self.__defaultUserCluster
295     else:
296     self.__defaultUserCluster = clusterName
297     if nodeName == None:
298     nodeName = self.__defaultUserNode
299     else:
300     self.__defaultUserNode = nodeName
301 corvo 1.3 self.__configUpdateLock.acquire();
302 corvo 1.1 for dest in self.destinations.keys():
303 corvo 1.3 self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params);
304     self.__configUpdateLock.release();
305 corvo 1.1
306     def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
307     """
308     Add a new job to monitor.
309     """
310     self.__bgMonitorLock.acquire();
311     self.monitoredJobs[pid] = {};
312     self.monitoredJobs[pid]['CLUSTER_NAME'] = clusterName;
313     self.monitoredJobs[pid]['NODE_NAME'] = nodeName;
314     self.procInfo.addJobToMonitor(pid, workDir);
315     self.__bgMonitorLock.release();
316    
317     def removeJobToMonitor (self, pid):
318     """
319     Remove a job from being monitored.
320     """
321     self.__bgMonitorLock.acquire();
322     self.procInfo.removeJobToMonitor(pid);
323     del self.monitoredJobs[pid];
324     self.__bgMonitorLock.release();
325    
326     def setMonitorClusterNode (self, clusterName, nodeName):
327     """
328     Set the cluster and node names where to send system related information.
329     """
330     self.__bgMonitorLock.acquire();
331 corvo 1.3 if (clusterName != None) and (clusterName != ""):
332     self.__defaultSysMonCluster = clusterName;
333     if (nodeName != None) and (nodeName != ""):
334     self.__defaultSysMonNode = nodeName;
335 corvo 1.1 self.__bgMonitorLock.release();
336    
337     def enableBgMonitoring (self, onOff):
338     """
339     Enable or disable background monitoring. Note that background monitoring information
340     can still be sent if user calls the sendBgMonitoring method.
341     """
342     self.performBgMonitoring = onOff;
343 corvo 1.3
344     def sendBgMonitoring (self, mustSend = False):
345 corvo 1.1 """
346 corvo 1.3 Send background monitoring about system and jobs to all interested destinations.
347     If mustSend == True, the information is sent regardles of the elapsed time since last sent
348     If mustSend == False, the data is sent only if the required interval has passed since last sent
349 corvo 1.1 """
350     self.__bgMonitorLock.acquire();
351     now = int(time.time());
352 corvo 1.3 updatedProcInfo = False;
353     for destination, options in self.destinations.iteritems():
354 corvo 1.1 sysParams = [];
355     jobParams = [];
356 corvo 1.3 prevRawData = self.destPrevData[destination];
357 corvo 1.1 # for each destination and its options, check if we have to report any background monitoring data
358 corvo 1.3 if(options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now)):
359 corvo 1.1 for param, active in options.items():
360     m = re.match("sys_(.+)", param);
361     if(m != None and active):
362     param = m.group(1);
363     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
364     sysParams.append(param)
365     options['sys_data_sent'] = now;
366 corvo 1.3 if(options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now)):
367 corvo 1.1 for param, active in options.items():
368     m = re.match("job_(.+)", param);
369     if(m != None and active):
370     param = m.group(1);
371     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
372     jobParams.append(param);
373     options['job_data_sent'] = now;
374 corvo 1.3 if(options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now)):
375 corvo 1.1 for param, active in options.items():
376     if not (param.startswith("sys_") or param.startswith("job_")) and active:
377     if not (param == 'general_info' or param == 'general_data_sent'):
378     sysParams.append(param);
379 corvo 1.3 options['general_data_sent'] = now;
380    
381     if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ):
382     self.procInfo.update();
383     updatedProcInf = True;
384    
385 corvo 1.1 sysResults = {}
386     if(len(sysParams) > 0):
387 corvo 1.3 sysResults = self.procInfo.getSystemData(sysParams, prevRawData);
388     self.updateLastSentTime(options, self.destinations[destination]);
389     if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0):
390     self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
391     if( (type(sysResults) == type( [] )) and len(sysResults) > 0):
392     self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
393 corvo 1.1 for pid, props in self.monitoredJobs.items():
394     jobResults = {};
395     if(len(jobParams) > 0):
396     jobResults = self.procInfo.getJobData(pid, jobParams);
397     if(len(jobResults) > 0):
398 corvo 1.3 self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults);
399 corvo 1.1 self.__bgMonitorLock.release();
400    
401 corvo 1.3 # copy the time when last data was sent
def updateLastSentTime(self, srcOpts, dstOpts):
    """
    Copy the times when data was last sent from srcOpts to dstOpts.

    Only the 'general_data_sent', 'sys_data_sent' and 'job_data_sent'
    entries are copied, and only those that are present in srcOpts.
    """
    for key in ('general_data_sent', 'sys_data_sent', 'job_data_sent'):
        if key in srcOpts:
            dstOpts[key] = srcOpts[key]
409    
410 corvo 1.1 def setLogLevel (self, strLevel):
411     """
412     Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
413     'INFO', 'NOTICE', 'DEBUG'.
414     """
415     self.logger.setLogLevel(strLevel);
416    
def setMaxMsgRate(self, rate):
    """
    Change the maximum number of messages that may be sent per second;
    the new value is also reported in the debug log.
    """
    self.maxMsgRate = rate
    debugText = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, debugText)
423    
424     def free(self):
425     """
426     Stop background threands, close opened sockets. You have to use this function if you want to
427     free all the resources that ApMon takes, and allow it to be garbage-collected.
428     """
429 corvo 1.4 if len(self.configAddresses) > 0:
430     self.__configUpdateEvent.set()
431     self.__configUpdateFinished.wait()
432     self.__bgMonitorEvent.set()
433     self.__bgMonitorFinished.wait()
434    
435 corvo 1.2 if self.__udpSocket != None:
436     self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.");
437     self.__udpSocket.close();
438 corvo 1.3 self.__udpSocket = None;
439     self.__freed = True
440 corvo 1.2
441    
442 corvo 1.1 #########################################################################################
443     # Internal functions - Config reloader thread
444     #########################################################################################
445    
446     def __configLoader(self):
447     """
448     Main loop of the thread that checks for changes and reloads the configuration
449     """
450 corvo 1.2 while not self.__configUpdateEvent.isSet():
451     self.__configUpdateEvent.wait(self.configRecheckInterval);
452     if self.__configUpdateEvent.isSet():
453 corvo 1.4 break
454 corvo 1.1 if self.configRecheck:
455     self.__reloadAddresses()
456     self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+`self.configRecheckInterval`+" sec.");
457 corvo 1.4 self.__configUpdateFinished.set();
458 corvo 1.1
459     def __reloadAddresses(self):
460     """
461     Refresh destinations hash, by loading data from all sources in configAddresses
462     """
463     newDestinations = {}
464     for src in self.configAddresses:
465     self.__initializeFromFile(src, newDestinations)
466     # avoid changing config in the middle of sending packets to previous destinations
467     self.__configUpdateLock.acquire()
468     self.destinations = newDestinations
469     self.__configUpdateLock.release()
470    
471     def __addDestination (self, aDestination, tempDestinations, options = __defaultOptions):
472     """
473     Add a destination to the list.
474    
475     aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
476     If the port is not given, it will be used the default port (8884)
477     If the password is missing, it will be considered an empty string
478     """
479     aDestination = aDestination.strip().replace('\t', ' ')
480     while aDestination != aDestination.replace(' ', ' '):
481     aDestination = aDestination.replace(' ', ' ')
482     sepPort = aDestination.find (':')
483     sepPasswd = aDestination.rfind (' ')
484     if sepPort >= 0:
485     host = aDestination[0:sepPort].strip()
486     if sepPasswd > sepPort + 1:
487     port = aDestination[sepPort+1:sepPasswd].strip()
488     passwd = aDestination[sepPasswd:].strip()
489     else:
490     port = aDestination[sepPort+1:].strip()
491     passwd = ""
492     else:
493     port = str(self.__defaultPort)
494     if sepPasswd >= 0:
495     host = aDestination[0:sepPasswd].strip()
496     passwd = aDestination[sepPasswd:].strip()
497     else:
498     host = aDestination.strip()
499     passwd = ""
500     if (not port.isdigit()):
501     self.logger.log(Logger.WARNING, "Bad value for port number "+`port`+" in "+aDestination+" destination");
502     return
503     alreadyAdded = False
504     port = int(port)
505     host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
506     for h, p, w in tempDestinations.keys():
507     if (h == host) and (p == port):
508     alreadyAdded = True
509     break
510 corvo 1.3 destination = (host, port, passwd);
511 corvo 1.1 if not alreadyAdded:
512     self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd);
513 corvo 1.3 tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest.
514     self.destPrevData[destination] = {};
515 corvo 1.1 if options != self.__defaultOptions:
516     # we have to overwrite defaults with given options
517     for key, value in options.items():
518     self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`);
519 corvo 1.3 tempDestinations[destination][key] = value;
520 corvo 1.1 else:
521 corvo 1.2 self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it");
522 corvo 1.1
523     def __initializeFromFile (self, confFileName, tempDestinations):
524     """
525     Load destinations from confFileName file. If it's an URL (starts with "http://")
526     load configuration from there. Put all destinations in tempDestinations hash.
527    
528     Calls addDestination for each line that doesn't start with # and
529     has non-whitespace characters on it
530     """
531     try:
532     if confFileName.find ("http://") == 0:
533     confFile = urllib2.urlopen (confFileName)
534     else:
535     confFile = open (confFileName)
536     except urllib2.HTTPError, e:
537     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
538     if e.code == 401:
539     self.logger.log(Logger.ERROR, 'HTTPError: not authorized.');
540     elif e.code == 404:
541     self.logger.log(Logger.ERROR, 'HTTPError: not found.');
542     elif e.code == 503:
543     self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.');
544     else:
545     self.logger.log(Logger.ERROR, 'HTTPError: unknown error.');
546     return
547     except urllib2.URLError, ex:
548     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
549     self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason[1]));
550     return
551     except IOError, ex:
552     self.logger.log(Logger.ERROR, "Cannot open "+confFileName);
553     self.logger.log(Logger.ERROR, "IOError: "+str(ex));
554     return
555     self.logger.log(Logger.INFO, "Adding destinations from "+confFileName);
556     dests = []
557     opts = {}
558     while(True):
559     line = confFile.readline();
560     if line == '':
561     break;
562     line = line.strip()
563     self.logger.log(Logger.DEBUG, "Reading line "+line);
564     if (len(line) == 0) or (line[0] == '#'):
565     continue
566     elif line.startswith("xApMon_"):
567     m = re.match("xApMon_(.*)", line);
568     if m != None:
569     m = re.match("(\S+)\s*=\s*(\S+)", m.group(1));
570     if m != None:
571     param = m.group(1); value = m.group(2);
572     if(value.upper() == "ON"):
573     value = True;
574     elif(value.upper() == "OFF"):
575     value = False;
576     elif(param.endswith("_interval")):
577     value = int(value);
578     if param == "loglevel":
579     self.logger.setLogLevel(value);
580 corvo 1.2 elif param == "maxMsgRate":
581     self.setMaxMsgRate(int(value));
582 corvo 1.1 elif param == "conf_recheck":
583     self.configRecheck = value;
584     elif param == "recheck_interval":
585     self.configRecheckInterval = value;
586     elif param.endswith("_data_sent"):
587     pass; # don't reset time in sys/job/general/_data_sent
588     else:
589     opts[param] = value;
590     else:
591     dests.append(line);
592 corvo 1.3
593 corvo 1.1 confFile.close ()
594     for line in dests:
595     self.__addDestination(line, tempDestinations, opts)
596    
597     ###############################################################################################
598     # Internal functions - Background monitor thread
599     ###############################################################################################
600    
601     def __bgMonitor (self):
602 corvo 1.2 while not self.__bgMonitorEvent.isSet():
603 corvo 1.4 self.__bgMonitorEvent.wait(10)
604 corvo 1.2 if self.__bgMonitorEvent.isSet():
605 corvo 1.4 break
606 corvo 1.1 if self.performBgMonitoring:
607 corvo 1.3 self.sendBgMonitoring() # send only if the interval has elapsed
608 corvo 1.4 self.__bgMonitorFinished.set()
609 corvo 1.1
610     ###############################################################################################
611     # Internal helper functions
612     ###############################################################################################
613    
614 corvo 1.3 def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
615    
616 corvo 1.2 if self.__shouldSend() == False:
617 corvo 1.4 self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!");
618 corvo 1.2 return;
619    
620 corvo 1.3 if destination == None:
621     self.logger.log(Logger.WARNING, "Destination is None");
622     return;
623    
624 corvo 1.1 host, port, passwd = destination
625 corvo 1.3 senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
626    
627     xdrPacker = xdrlib.Packer ();
628 corvo 1.4 self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(senderRef['SEQ_NR'])+"/"+str(senderRef['INSTANCE_ID'])+"> len:"+str(len(params)));
629 corvo 1.3
630 corvo 1.1 xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
631 corvo 1.3
632     xdrPacker.pack_int (senderRef['INSTANCE_ID']);
633     xdrPacker.pack_int (senderRef['SEQ_NR']);
634    
635     xdrPacker.pack_string (clusterName);
636     xdrPacker.pack_string (nodeName);
637     xdrPacker.pack_int (len(params));
638    
639 corvo 1.1 if type(params) == type( {} ):
640     for name, value in params.iteritems():
641     self.__packParameter(xdrPacker, name, value)
642     elif type(params) == type( [] ):
643     for name, value in params:
644     self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value));
645     self.__packParameter(xdrPacker, name, value)
646     else:
647     self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
648 corvo 1.3 if (timeStamp != None) and (timeStamp > 0):
649 corvo 1.1 xdrPacker.pack_int(timeStamp);
650 corvo 1.3
651 corvo 1.1 buffer = xdrPacker.get_buffer();
652     # send this buffer to the destination, using udp datagrams
653     try:
654     self.__udpSocket.sendto(buffer, (host, port))
655 corvo 1.3 self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd);
656 corvo 1.1 except socket.error, msg:
657     self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]));
658     xdrPacker.reset()
659    
660     def __packParameter(self, xdrPacker, name, value):
661 corvo 1.3 if (name is None) or (name is ""):
662     self.logger.log(Logger.WARNING, "Undefine parameter name.");
663     return;
664     if (value is None):
665     self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value");
666     return;
667 corvo 1.1 try:
668     typeValue = self.__valueTypes[type(value)]
669     xdrPacker.pack_string (name)
670     xdrPacker.pack_int (typeValue)
671     self.__packFunctions[typeValue] (xdrPacker, value)
672     self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value));
673     except Exception, ex:
674 corvo 1.4 self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
675 corvo 1.3
676 corvo 1.1 # Destructor
677     def __del__(self):
678 corvo 1.3 if not self.__freed:
679     self.free();
680 corvo 1.2
681     # Decide if the current datagram should be sent.
682     # This decision is based on the number of messages previously sent.
def __shouldSend(self):
    """
    Tell whether the current datagram may be sent (True) or must be
    dropped (False), and count it accordingly.

    The decision uses a weighted history of the messages sent in the
    previous seconds: once that value exceeds maxMsgRate - level, a random
    draw decides whether the message is dropped.
    """
    now = long(time.time())
    if self.__crtTime != now:
        # a new second has started: fold the finished period into the history
        elapsed = now - self.__crtTime
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        # and start counting again for the current second
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # weighted history of the sent messages
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    # level from which messages may start being dropped
    level = self.maxMsgRate - self.maxMsgRate / 10

    # the random draw happens only when the first condition holds
    doSend = not (valSent > (self.maxMsgRate - level)
                  and random.randint(0,self.maxMsgRate / 10) >= (self.maxMsgRate - valSent))

    # keep track of sent and dropped messages
    if doSend:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1

    return doSend
715    
716 corvo 1.1 ################################################################################################
717     # Private variables. Don't touch
718     ################################################################################################
719    
# Maps the Python type of a parameter value to the type code that is written
# in the packet. __packParameter looks the type up here; a value of any other
# type makes the lookup fail and the parameter is reported with a warning.
__valueTypes = {
    type("string"): 0, # XDR_STRING (see ApMon.h from C/C++ ApMon version)
    type(1): 2, # XDR_INT32
    type(1.0): 5}; # XDR_REAL64

# Maps each type code above to the xdrlib.Packer method that encodes a value
# of that type; the methods are called as function(packer, value).
__packFunctions = {
    0: xdrlib.Packer.pack_string,
    2: xdrlib.Packer.pack_int,
    5: xdrlib.Packer.pack_double }

__defaultPort = 8884 # port used by __addDestination when the destination has no ":port" part
__version = "2.2.2-py" # apMon version number
732 corvo 1.1