ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.5
Committed: Mon Apr 3 08:57:32 2006 UTC (19 years, 1 month ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +221 -129 lines
Log Message:
new apmon version

File Contents

# User Rev Content
1 corvo 1.1
2     """
3     * ApMon - Application Monitoring Tool
4 corvo 1.5 * Version: 2.2.4
5 corvo 1.1 *
6 corvo 1.3 * Copyright (C) 2006 California Institute of Technology
7 corvo 1.1 *
8     * Permission is hereby granted, free of charge, to use, copy and modify
9     * this software and its documentation (the "Software") for any
10     * purpose, provided that existing copyright notices are retained in
11     * all copies and that this notice is included verbatim in any distributions
12     * or substantial portions of the Software.
13     * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14     * Users of the Software are asked to feed back problems, benefits,
15     * and/or suggestions about the software to the MonALISA Development Team
16     * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17     * incorporation of new features - is done on a best effort basis. All bug
18     * fixes and enhancements will be made available under the same terms and
19     * conditions as the original software,
20    
21     * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22     * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23     * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24     * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25    
26     * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27     * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28     * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29     * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30     * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31     * MODIFICATIONS.
32     """
33    
34     """
35     apmon.py
36    
37     This is a python implementation for the ApMon API for sending
38     data to the MonALISA service.
39    
40     For further details about ApMon please see the C/C++ or Java documentation
41     You can find a sample usage of this module in apmTest.py.
42    
43     Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44     Sending strings is supported, but they will not be stored in the
45     farm's store nor shown in the farm's window in the MonALISA client.
46     """
47    
48     import re
49     import xdrlib
50     import socket
51 corvo 1.5 import struct
52     import StringIO
53 corvo 1.1 import threading
54     import time
55     import Logger
56     import ProcInfo
57 corvo 1.2 import random
58 corvo 1.3 import copy
59 corvo 1.1
60     #__all__ = ["ApMon"]
61    
62 corvo 1.5 #__debug = False # set this to True to be verbose
63 corvo 1.1
64     class ApMon:
65     """
66     Main class for sending monitoring data to a MonaLisa module.
67     One or more destinations can be chosen for the data. See constructor.
68    
69     The data is packed in UDP datagrams, using XDR. The following fields are sent:
70     - version & password (string)
71     - cluster name (string)
72     - node name (string)
73     - number of parameters (int)
74     - for each parameter:
75     - name (string)
76     - value type (int)
77     - value
78     - optionally a (int) with the given timestamp
79    
80     Attributes (public):
81     - destinations - a list containing (ip, port, password) tuples
82     - configAddresses - list with files and urls from where the config is read
83     - configRecheckInterval - period, in seconds, to check for changes
84     in the configAddresses list
85     - configRecheck - boolean - whether to recheck periodically for changes
86     in the configAddresses list
87     """
88    
# Default background-monitoring options. Each new destination gets its own
# deep copy (see __addDestination), and options from config files or from a
# dict passed to the constructor override individual entries.
# In sendBgMonitoring, every active "sys_xxx" / "job_xxx" key is requested as
# parameter "xxx" (except *_monitoring, *_interval, *_data_sent), and every
# active key without a sys_/job_ prefix (except general_info and
# general_data_sent) is requested as general host information.
__defaultOptions = {
    'job_monitoring': True, # perform (or not) job monitoring
    'job_interval' : 120, # at this interval (in seconds)
    'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!

    # NOTE(review): the original descriptions of job_cpu_time and job_run_time were
    # swapped relative to the option names; the meanings below follow the names --
    # confirm against ProcInfo.getJobData.
    'job_cpu_time' : True, # processor time spent running this job, in seconds
    'job_run_time' : True, # elapsed time from the start of this job, in seconds
    'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
    'job_virtualmem': True, # size of the virtual memory occupied by the job, as reported by ps (presumably KB, like job_rss -- confirm)
    'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
    'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
    'job_workdir_size': True, # size in MB of the working directory of the job
    'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
    'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
    'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
    'job_disk_usage': True, # percent of the used disk partition containing the working directory
    'job_open_files': True, # number of open file descriptors

    'sys_monitoring': True, # perform (or not) system monitoring
    'sys_interval' : 120, # at this interval (in seconds)
    'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!

    'sys_cpu_usr' : True, # cpu-usage information
    'sys_cpu_sys' : True, # every sys_* option produces the corresponding param without the "sys_" prefix
    'sys_cpu_nice' : True,
    'sys_cpu_idle' : True,
    'sys_cpu_usage' : True,
    'sys_load1' : True, # system load information
    'sys_load5' : True,
    'sys_load15' : True,
    'sys_mem_used' : True, # memory usage information
    'sys_mem_free' : True,
    'sys_mem_usage' : True,
    'sys_pages_in' : True,
    'sys_pages_out' : True,
    'sys_swap_used' : True, # swap usage information
    'sys_swap_free' : True,
    'sys_swap_usage': True,
    'sys_swap_in' : True,
    'sys_swap_out' : True,
    'sys_net_in' : True, # network transfer in kBps
    'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
    'sys_net_errs' : True, # for each eth interface
    'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
    'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
    'sys_processes' : True,
    'sys_uptime' : True, # uptime of the machine, in days (float number)

    'general_info' : True, # send (or not) general host information once every 2 x sys_interval seconds
    'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!

    'hostname' : True,
    'ip' : True, # will produce ethX_ip params for each interface
    'cpu_MHz' : True,
    'no_CPUs' : True, # number of CPUs
    'total_mem' : True,
    'total_swap' : True,
    'cpu_vendor_id' : True,
    'cpu_family' : True,
    'cpu_model' : True,
    'cpu_model_name': True,
    'bogomips' : True};
151 corvo 1.1
def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
    """
    Build an ApMon instance, load its destinations and start its threads.

    initValue selects where monitoring data is sent:
    - a string: one configuration source, either a file name or a URL
      starting with "http://". It becomes configAddresses and destinations
      are loaded from it. Options found there override the defaults.
    - a list: several such sources, all stored in configAddresses.
    - a tuple of strings "{hostname|ip}[:port][ passwd]" (default port 8884,
      default password ""). Background monitoring uses __defaultOptions.
    - a dict mapping such destination strings to {'param_name': value, ...}
      hashes that override __defaultOptions for that destination.

    defaultLogLevel is handed to the Logger created for this instance.
    A daemon thread that reloads the configuration is started only when
    configAddresses is non-empty after loading. A daemon thread for
    background monitoring is always started.
    """
    # Per-destination state, keyed by (host, port, password) tuples.
    self.destinations = {}    # -> {"param_name": True/False, ...}
    self.destPrevData = {}    # -> previous raw values, per destination
    self.senderRef = {}       # -> {'INSTANCE_ID': ..., 'SEQ_NR': ...}

    # Where the configuration comes from and how often it is re-read.
    self.configAddresses = []
    self.configRecheckInterval = 600   # seconds (10 minutes)
    self.configRecheck = True

    self.performBgMonitoring = True
    self.monitoredJobs = {}   # pid -> hash with the job's cluster/node names
    self.maxMsgRate = 10      # maximum messages per second

    self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = socket.getfqdn()
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = socket.getfqdn()

    # Internal state -- don't touch.
    self.__freed = False
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__configUpdateFinished = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()
    self.__bgMonitorFinished = threading.Event()

    # Message-rate limiting bookkeeping (average of at most maxMsgRate msgs/sec).
    self.__crtTime = self.__prvTime = 0
    self.__prvSent = self.__prvDrop = 0
    self.__crtSent = self.__crtDrop = 0
    self.__hWeight = 0.92

    self.logger = Logger.Logger(defaultLogLevel)
    self.setDestinations(initValue)
    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def startDaemon(target):
        # Run target in a daemon thread, so it does not block interpreter exit.
        worker = threading.Thread(target=target)
        worker.setDaemon(True)
        worker.start()

    if self.configAddresses:
        # Configuration sources exist: keep checking them for changes.
        startDaemon(self.__configLoader)
    self.procInfo = ProcInfo.ProcInfo(self.logger)
    startDaemon(self.__bgMonitor)
219 corvo 1.1
def sendParams (self, params):
    """
    Send several parameters without a timestamp, using the most recently
    given cluster and node names.

    This is shorthand for sendTimedParams(-1, params).
    """
    noTimeStamp = -1
    self.sendTimedParams(noTimeStamp, params)
225    
def sendTimedParams (self, timeStamp, params):
    """
    Send several parameters stamped with timeStamp, using the most recently
    given cluster and node names.

    Passing None for both names makes sendTimedParameters reuse the last
    ones. See sendTimedParameters for the meaning of timeStamp and params.
    """
    lastCluster = lastNode = None
    self.sendTimedParameters(lastCluster, lastNode, timeStamp, params)
232    
def sendParameter (self, clusterName, nodeName, paramName, paramValue):
    """
    Send one parameter, without a timestamp, for the given cluster and node.
    """
    untimed = -1
    self.sendTimedParameter(clusterName, nodeName, untimed, paramName, paramValue)
238    
def sendTimedParameter (self, clusterName, nodeName, timeStamp, paramName, paramValue):
    """
    Send one parameter stamped with timeStamp, for the given cluster and node.
    """
    single = {}
    single[paramName] = paramValue
    self.sendTimedParameters(clusterName, nodeName, timeStamp, single)
244    
def sendParameters (self, clusterName, nodeName, params):
    """
    Send several parameters, without a timestamp, for the given cluster and node.
    """
    untimed = -1
    self.sendTimedParameters(clusterName, nodeName, untimed, params)
250    
def sendTimedParameters (self, clusterName, nodeName, timeStamp, params):
    """
    Send several monitored parameters to every destination.

    - clusterName: name of the monitored cluster. If None or "", the last
      given cluster name is used (initially "ApMon_UserSend"). Otherwise it
      becomes the new default.
    - nodeName: name of the node the parameters belong to. If None, the last
      given node name is used (initially this machine's FQDN). Otherwise it
      becomes the new default.
    - timeStamp: if > 0, the time of the parameters in seconds from Epoch.
      Use it only when you are sure of the time, e.g. when parsing logs.
      Otherwise pass -1, and the MonALISA service assigns its own time.
    - params: a dictionary {name: value}, or a list of (name, value) tuples
      when the parameters must be sent in a given order. Values should be
      int or float. Note that python has only 64-bit floats.

    Nothing is sent, and a warning is logged, when no destination is defined.
    """
    if clusterName is None or clusterName == "":
        clusterName = self.__defaultUserCluster
    else:
        self.__defaultUserCluster = clusterName
    if nodeName is None:
        nodeName = self.__defaultUserNode
    else:
        self.__defaultUserNode = nodeName
    if len(self.destinations) == 0:
        self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined.")
        return
    # Hold the config lock so a concurrent reload cannot swap the destination
    # set mid-send. Release it even if a send fails. Otherwise the config
    # reloader and every later send would block forever.
    self.__configUpdateLock.acquire()
    try:
        for dest in list(self.destinations.keys()):
            self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
    finally:
        self.__configUpdateLock.release()
287 corvo 1.1
def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
    """
    Start monitoring the job with process id `pid`.

    workDir is passed to ProcInfo.addJobToMonitor. The job's background
    monitoring data is sent under clusterName / nodeName. Adding a pid again
    replaces its cluster and node names.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.monitoredJobs[pid] = {'CLUSTER_NAME': clusterName, 'NODE_NAME': nodeName}
        self.procInfo.addJobToMonitor(pid, workDir)
    finally:
        # Always release: a leaked lock would freeze the background monitor.
        self.__bgMonitorLock.release()
298    
def removeJobToMonitor (self, pid):
    """
    Stop monitoring the job with process id `pid`.

    Raises KeyError if pid is not being monitored. The background monitoring
    lock is released in every case.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.removeJobToMonitor(pid)
        del self.monitoredJobs[pid]
    finally:
        # An unknown pid raises KeyError above. Without this, the lock would
        # stay held and the background monitor would block forever.
        self.__bgMonitorLock.release()
307    
def setMonitorClusterNode (self, clusterName, nodeName):
    """
    Choose the cluster and node names under which system (background)
    monitoring data is sent. A None or empty value keeps the current name.
    """
    def isGiven(name):
        return not (name is None or name == "")

    self.__bgMonitorLock.acquire()
    if isGiven(clusterName):
        self.__defaultSysMonCluster = clusterName
    if isGiven(nodeName):
        self.__defaultSysMonNode = nodeName
    self.__bgMonitorLock.release()
318    
def enableBgMonitoring (self, onOff):
    """
    Turn periodic background monitoring on (True) or off (False).

    Only the background thread is affected. sendBgMonitoring() can still be
    called explicitly to send the information.
    """
    self.performBgMonitoring = onOff
325 corvo 1.3
326     def sendBgMonitoring (self, mustSend = False):
327 corvo 1.1 """
328 corvo 1.3 Send background monitoring about system and jobs to all interested destinations.
329     If mustSend == True, the information is sent regardles of the elapsed time since last sent
330     If mustSend == False, the data is sent only if the required interval has passed since last sent
331 corvo 1.1 """
332 corvo 1.5 if len(self.destinations) == 0:
333     self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined.");
334     return
335 corvo 1.1 self.__bgMonitorLock.acquire();
336     now = int(time.time());
337 corvo 1.3 updatedProcInfo = False;
338     for destination, options in self.destinations.iteritems():
339 corvo 1.1 sysParams = [];
340     jobParams = [];
341 corvo 1.3 prevRawData = self.destPrevData[destination];
342 corvo 1.1 # for each destination and its options, check if we have to report any background monitoring data
343 corvo 1.3 if(options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now)):
344 corvo 1.1 for param, active in options.items():
345     m = re.match("sys_(.+)", param);
346     if(m != None and active):
347     param = m.group(1);
348     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
349     sysParams.append(param)
350     options['sys_data_sent'] = now;
351 corvo 1.3 if(options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now)):
352 corvo 1.1 for param, active in options.items():
353     m = re.match("job_(.+)", param);
354     if(m != None and active):
355     param = m.group(1);
356     if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
357     jobParams.append(param);
358     options['job_data_sent'] = now;
359 corvo 1.3 if(options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now)):
360 corvo 1.1 for param, active in options.items():
361     if not (param.startswith("sys_") or param.startswith("job_")) and active:
362     if not (param == 'general_info' or param == 'general_data_sent'):
363     sysParams.append(param);
364 corvo 1.3 options['general_data_sent'] = now;
365    
366 corvo 1.5 if (not updatedProcInfo) and (((len(sysParams) > 0) or (len(jobParams) > 0))):
367 corvo 1.3 self.procInfo.update();
368 corvo 1.5 updatedProcInfo = True;
369 corvo 1.3
370 corvo 1.1 sysResults = {}
371     if(len(sysParams) > 0):
372 corvo 1.5 sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
373     if(len(sysResults) > 0):
374     self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
375 corvo 1.1 for pid, props in self.monitoredJobs.items():
376 corvo 1.5 jobResults = {}
377 corvo 1.1 if(len(jobParams) > 0):
378 corvo 1.5 jobResults = self.procInfo.getJobData(pid, jobParams)
379 corvo 1.1 if(len(jobResults) > 0):
380 corvo 1.5 self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
381 corvo 1.1 self.__bgMonitorLock.release();
382 corvo 1.5
def setDestinations(self, initValue):
    """
    Set the destinations of this ApMon instance. It accepts the same kinds of
    value as the constructor:
    - a string: one config file name, or a URL starting with "http://";
    - a list: several config file names/URLs;
    - a tuple: destination strings "{hostname|ip}[:port][ passwd]";
    - a dict: destination string -> {'param_name': value, ...} overrides.

    For string/list values, configAddresses is set (to a copy of the list),
    configRecheck is enabled with a 600 s interval, and destinations are
    loaded from those sources now.
    For tuple/dict values, the previous destination set is replaced by the
    given one under the config lock. __addDestination reuses the options of
    destinations that were already known, as a config reload does.
    Any other type is logged as an error and ignored.

    NOTE(review): periodic rechecking also needs the config-reloader thread,
    which the constructor starts only when it ends up with config addresses.
    """
    if type(initValue) == type("string") or type(initValue) == type([]):
        if type(initValue) == type("string"):
            self.configAddresses = [initValue]
        else:
            self.configAddresses = list(initValue)   # don't alias the caller's list
        self.configRecheck = True
        self.configRecheckInterval = 600
        self.__reloadAddresses()
    elif type(initValue) == type(()) or type(initValue) == type({}):
        self.configAddresses = []
        newDestinations = {}
        if type(initValue) == type(()):
            for dest in initValue:
                self.__addDestination(dest, newDestinations)
        else:
            for dest, opts in initValue.items():
                self.__addDestination(dest, newDestinations, opts)
        self.configRecheck = False
        # Swap the whole set at once, like __reloadAddresses, so concurrent
        # senders never see a half-built destination hash.
        self.__configUpdateLock.acquire()
        try:
            self.destinations = newDestinations
        finally:
            self.__configUpdateLock.release()
    else:
        self.logger.log(Logger.ERROR, "Unsupported type for destinations: " + str(type(initValue)))
407 corvo 1.1
def initializedOK(self):
    """
    Return True if at least one destination is defined, i.e. parameters have
    somewhere to be sent. Return False otherwise.
    """
    return len(self.destinations) > 0
413    
def setLogLevel(self, strLevel):
    """
    Change the threshold of this instance's logger.

    strLevel is a level name: 'FATAL', 'ERROR', 'WARNING', 'INFO', 'NOTICE'
    or 'DEBUG'.
    """
    logger = self.logger
    logger.setLogLevel(strLevel)
420    
def setMaxMsgRate(self, rate):
    """
    Set maxMsgRate, the maximum number of messages that may be sent per
    second, and log the new value at DEBUG level.
    """
    self.maxMsgRate = rate
    message = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, message)
427    
def free(self):
    """
    Stop the background threads and close the UDP socket. Call this to
    release everything ApMon holds, so the object can be garbage-collected.

    Both stop events are set first. The method then waits for the config
    reloader (only when configAddresses is non-empty) and for the background
    monitor to finish, closes the socket, and marks the instance as freed.
    """
    # Always signal the config reloader. configAddresses may have been
    # emptied by setDestinations() after the reloader thread was started.
    # That thread loops until this event is set, so without it the thread
    # would keep running and keep a reference to this object.
    self.__configUpdateEvent.set()
    self.__bgMonitorEvent.set()
    if len(self.configAddresses) > 0:
        # NOTE(review): the constructor starts the reloader only when it ended
        # up with config addresses. If they were added later via
        # setDestinations(), no reloader exists and this wait never returns.
        self.__configUpdateFinished.wait()
    # Wait for the background monitor before closing the socket it sends on.
    self.__bgMonitorFinished.wait()

    if self.__udpSocket is not None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        self.__udpSocket.close()
        self.__udpSocket = None
    self.__freed = True
444 corvo 1.2
445    
446 corvo 1.1 #########################################################################################
447     # Internal functions - Config reloader thread
448     #########################################################################################
449 corvo 1.5
def __configLoader(self):
    """
    Main loop of the config-reloader thread.

    Every max(30, configRecheckInterval) seconds, while configRecheck is
    True, reload the destinations from configAddresses. The loop stops as
    soon as free() sets __configUpdateEvent. A failed reload is logged and
    does not stop the thread. __configUpdateFinished is always set on exit,
    so free() never waits forever.
    """
    try:
        while not self.__configUpdateEvent.isSet():
            # Recheck every configRecheckInterval seconds, but never more often
            # than every 30 s. The wait returns early when free() sets the event.
            self.__configUpdateEvent.wait(max(30, self.configRecheckInterval))
            if self.__configUpdateEvent.isSet():
                break
            if not self.configRecheck:
                continue
            try:
                self.__reloadAddresses()
            except Exception as ex:
                # Thread boundary: log and keep going, so a bad config or
                # network hiccup doesn't silently stop all future reloads.
                self.logger.log(Logger.ERROR, "Error reloading configuration: " + str(ex))
            else:
                self.logger.log(Logger.DEBUG, "Config reloaded. Sleeping for " + repr(max(30, self.configRecheckInterval)) + " sec.")
    finally:
        self.__configUpdateFinished.set()
462 corvo 1.1
def __reloadAddresses(self):
    """
    Rebuild the destinations hash from the sources in configAddresses.

    Sources are tried in random order, stopping at the first one that yields
    at least one destination. The result replaces self.destinations under
    the config lock. If no source yields anything, destinations becomes empty.
    """
    # Diagnostics go through the logger; a library must not print to stdout.
    self.logger.log(Logger.DEBUG, "reloading addresses")
    newDestinations = {}
    urls = list(self.configAddresses)   # private copy: tried sources are removed from it
    while len(urls) > 0 and len(newDestinations) == 0:
        src = random.choice(urls)
        urls.remove(src)
        self.__initializeFromFile(src, newDestinations)
    # avoid changing config in the middle of sending packets to previous destinations
    self.__configUpdateLock.acquire()
    try:
        self.destinations = newDestinations
    finally:
        self.__configUpdateLock.release()
    self.logger.log(Logger.DEBUG, "finished reloading addresses")
479 corvo 1.1
def __addDestination (self, aDestination, tempDestinations, options = None):
    """
    Parse one destination string and add it to the tempDestinations hash.

    aDestination has the form "{hostname|ip}[:port] [passwd]". The port
    defaults to self.__defaultPort and the password to "". The host name is
    resolved to an IP address here, once, so sending does not query DNS.
    The key stored in tempDestinations is (ip, port, passwd). A destination
    whose (ip, port) is already present is skipped, whatever its password.

    The option hash is reused from self.destinations when the same key was
    already known, which keeps its *_data_sent timestamps. Otherwise it is a
    fresh deep copy of __defaultOptions. Then, unless options is None or
    equal to __defaultOptions, every entry of options overrides the stored
    value. destPrevData and senderRef entries are created only for new keys.
    Invalid ports and unresolvable host names are logged and skipped.
    """
    if options is None:
        options = self.__defaultOptions
    # Normalise tabs and runs of blanks to single spaces before parsing.
    aDestination = " ".join(aDestination.split())
    sepPort = aDestination.find(':')
    sepPasswd = aDestination.rfind(' ')
    if sepPort >= 0:
        host = aDestination[0:sepPort].strip()
        if sepPasswd > sepPort + 1:
            port = aDestination[sepPort+1:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            port = aDestination[sepPort+1:].strip()
            passwd = ""
    else:
        port = str(self.__defaultPort)
        if sepPasswd >= 0:
            host = aDestination[0:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            host = aDestination.strip()
            passwd = ""
    if not port.isdigit() or not (0 < int(port) < 65536):
        self.logger.log(Logger.WARNING, "Bad value for port number "+repr(port)+" in "+aDestination+" destination")
        return
    port = int(port)
    try:
        host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs
    except socket.error as msg:
        self.logger.log(Logger.ERROR, "Error resolving "+host+": "+str(msg))
        return
    alreadyAdded = False
    for h, p, w in tempDestinations.keys():
        if h == host and p == port:
            alreadyAdded = True
            break
    if alreadyAdded:
        self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it")
        return
    destination = (host, port, passwd)
    self.logger.log(Logger.INFO, "Adding destination "+host+':'+repr(port)+' '+passwd)
    if destination in self.destinations:
        tempDestinations[destination] = self.destinations[destination] # reuse previous options
    else:
        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions) # have a different set of options for each dest
    if destination not in self.destPrevData:
        self.destPrevData[destination] = {} # set it empty only if it's really new
    if destination not in self.senderRef:
        self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef) # otherwise, don't reset this nr.
    if options != self.__defaultOptions:
        # we have to overwrite defaults with given options
        for key, value in options.items():
            self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+repr(value))
            tempDestinations[destination][key] = value
541 corvo 1.1
def __initializeFromFile (self, confFileName, tempDestinations):
    """
    Load destinations and xApMon_ options from one configuration source into
    the tempDestinations hash.

    confFileName is a local file name, or a URL when it starts with "http://"
    (fetched with __getURL). Lines that are empty or start with '#' are
    ignored. Lines "xApMon_<param> = <value>" set options:
    - ON/OFF become True/False, and *_interval values become int;
    - loglevel, maxMsgRate, conf_recheck and recheck_interval configure
      this ApMon instance;
    - *_data_sent options are ignored;
    - any other option applies to every destination read from this source.
    Every other line is a destination, passed to __addDestination once the
    whole source has been read.

    A source that cannot be opened is logged and skipped. An option whose
    value cannot be converted (ValueError) is logged and skipped. The source
    is always closed, even if reading it fails.
    """
    try:
        if confFileName.startswith("http://"):
            confFile = self.__getURL(confFileName)
            if confFile is None:
                return
        else:
            confFile = open(confFileName)
    except IOError as ex:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        self.logger.log(Logger.ERROR, "IOError: "+str(ex))
        return
    self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
    dests = []
    opts = {}
    try:
        for line in confFile:
            line = line.strip()
            self.logger.log(Logger.DEBUG, "Reading line "+line)
            if len(line) == 0 or line[0] == '#':
                continue
            if not line.startswith("xApMon_"):
                dests.append(line)
                continue
            m = re.match(r"(\S+)\s*=\s*(\S+)", line[len("xApMon_"):])
            if m is None:
                continue # malformed option line: ignored
            param, value = m.group(1), m.group(2)
            try:
                if value.upper() == "ON":
                    value = True
                elif value.upper() == "OFF":
                    value = False
                elif param.endswith("_interval"):
                    value = int(value)
                if param == "loglevel":
                    self.logger.setLogLevel(value)
                elif param == "maxMsgRate":
                    self.setMaxMsgRate(int(value))
                elif param == "conf_recheck":
                    self.configRecheck = value
                elif param == "recheck_interval":
                    self.configRecheckInterval = value
                elif param.endswith("_data_sent"):
                    pass # don't reset time in sys/job/general/_data_sent
                else:
                    opts[param] = value
            except ValueError:
                # A typo such as "xApMon_sys_interval = 1O" must not abort the
                # whole (re)load -- skip just this option.
                self.logger.log(Logger.WARNING, "Ignoring invalid value "+m.group(2)+" for option xApMon_"+param+" in "+confFileName)
    finally:
        confFile.close()
    for line in dests:
        self.__addDestination(line, tempDestinations, opts)
602    
603     ###############################################################################################
604     # Internal functions - Background monitor thread
605     ###############################################################################################
606    
def __bgMonitor (self):
    """
    Main loop of the background-monitoring thread.

    Every 10 seconds, while performBgMonitoring is True, call
    sendBgMonitoring(). That call sends only the data whose interval has
    elapsed. The loop stops when free() sets __bgMonitorEvent. An error in
    one round is logged and does not stop the thread. __bgMonitorFinished is
    always set on exit, so free() can return.
    """
    try:
        while not self.__bgMonitorEvent.isSet():
            self.__bgMonitorEvent.wait(10)
            if self.__bgMonitorEvent.isSet():
                break
            if self.performBgMonitoring:
                try:
                    self.sendBgMonitoring() # send only if the interval has elapsed
                except Exception as ex:
                    # Thread boundary: keep monitoring after a transient failure.
                    self.logger.log(Logger.ERROR, "Error sending background monitoring data: " + str(ex))
    finally:
        self.__bgMonitorFinished.set()
615 corvo 1.1
616     ###############################################################################################
617     # Internal helper functions
618     ###############################################################################################
619 corvo 1.5
def __getURL (self, url, timeout = 5):
    """
    Fetch url with a minimal HTTP/1.0 GET. This is a simplified replacement
    for urllib2, which doesn't support setting a timeout.

    Only URLs of the form "http://host[:port][/path]" are accepted. The port
    defaults to 80 and the path to "/". timeout (seconds, default 5) applies
    to connecting and to every send/receive.

    Returns a StringIO object positioned at the start of the response body
    when the server answers with status 200. Otherwise returns None after
    logging the reason. The socket is always closed before returning.
    """
    r = re.match(r"http://([^:/]+)(:(\d+))?(/.*)?$", url)
    if r is None:
        self.logger.log(Logger.ERROR, "Cannot open "+url+". Incorrectly formed URL.")
        return None
    host = r.group(1)
    if r.group(3) is None:
        port = 80 # no port is given, pick the default 80 for HTTP
        hostHeader = host
    else:
        port = int(r.group(3))
        hostHeader = host + ":" + r.group(3)
    path = r.group(4)
    if path is None:
        path = "/" # no path is given, ask for the server root

    sock = None
    err = None
    try:
        for af, socktype, proto, canonname, sa in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as msg:
                sock = None
                err = msg
                continue
            try:
                # settimeout covers connect() and all later send/recv calls.
                # Hand-packing SO_SNDTIMEO/SO_RCVTIMEO as struct.pack("ii") was
                # not portable: struct timeval's size is platform-dependent.
                sock.settimeout(timeout)
                sock.connect(sa)
            except socket.error as msg:
                sock.close()
                sock = None
                err = msg
                continue
            break
    except socket.error as msg:
        sock = None
        err = msg
    if sock is None:
        self.logger.log(Logger.ERROR, "Cannot open "+url)
        self.logger.log(Logger.ERROR, "SocketError: "+str(err))
        return None

    chunks = []
    try:
        try:
            # HTTP/1.0 with CRLF line ends and a Host header (needed by virtual hosts).
            sock.sendall("GET "+path+" HTTP/1.0\r\nHost: "+hostHeader+"\r\n\r\n")
            # The server closes the connection after an HTTP/1.0 response, so
            # read until EOF instead of trusting a single recv().
            while True:
                chunk = sock.recv(8192)
                if not chunk:
                    break
                chunks.append(chunk)
        except socket.error as msg:
            self.logger.log(Logger.ERROR, "Cannot open "+url)
            self.logger.log(Logger.ERROR, "SocketError: "+str(msg))
            return None
    finally:
        sock.close()

    response = StringIO.StringIO("".join(chunks))
    httpStatus = 0
    while True:
        line = response.readline().strip()
        if line == "":
            break # exit at the end of data or at the first empty line (end of http headers)
        m = re.match(r"HTTP/\d\.\d (\d+)", line)
        if m is not None:
            httpStatus = int(m.group(1))
    if httpStatus == 200:
        return response
    reasons = {401: 'not authorized', 404: 'not found', 503: 'service unavailable'}
    self.logger.log(Logger.ERROR, "Cannot open "+url)
    self.logger.log(Logger.ERROR, 'HTTPError: '+reasons.get(httpStatus, 'unknown error')+' ['+str(httpStatus)+']')
    return None
694 corvo 1.1
def __directSendParams(self, destination, clusterName, nodeName, timeStamp, params):
    """Encode one set of parameters as an XDR datagram and send it over UDP.

    Packet layout, in order:
      the string "v:<version>p:<passwd>", the sender's INSTANCE_ID and SEQ_NR,
      clusterName, nodeName, the number of parameters packed successfully,
      the packed parameters, and finally timeStamp when it is not None and > 0.

    destination -- (host, port, passwd) tuple; also the key into self.senderRef.
    clusterName, nodeName -- identify where the values belong.
    timeStamp -- optional; appended only when not None and greater than 0.
    params -- a dict {name: value} or a list/tuple of (name, value) pairs.

    Returns nothing. Rate-limited or invalid sends and socket errors are only
    reported through self.logger.
    """
    if not self.__shouldSend():
        self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!")
        return

    if destination is None:
        self.logger.log(Logger.WARNING, "Destination is None")
        return

    host, port, passwd = destination
    crtSenderRef = self.senderRef[destination]
    # wrap the sequence number around at 2 billion (stays below the signed
    # 32-bit limit of pack_int)
    crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000

    xdrPacker = xdrlib.Packer()
    xdrPacker.pack_string("v:" + self.__version + "p:" + passwd)
    xdrPacker.pack_int(crtSenderRef['INSTANCE_ID'])
    xdrPacker.pack_int(crtSenderRef['SEQ_NR'])
    xdrPacker.pack_string(clusterName)
    xdrPacker.pack_string(nodeName)

    # Parameters are packed separately: their count has to precede them in
    # the packet but is only known after every parameter has been tried.
    paramsPacker = xdrlib.Packer()
    if isinstance(params, dict):
        pairs = params.items()
    elif isinstance(params, (list, tuple)):
        pairs = params
    else:
        pairs = ()
        self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)))

    sent_params_nr = 0
    for name, value in pairs:
        # __packParameter validates name/value (None names included) and
        # logs each parameter it adds, so no logging is done here.
        if self.__packParameter(paramsPacker, name, value):
            sent_params_nr += 1

    xdrPacker.pack_int(sent_params_nr)

    if (timeStamp is not None) and (timeStamp > 0):
        paramsPacker.pack_int(timeStamp)

    packet = xdrPacker.get_buffer() + paramsPacker.get_buffer()
    self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(packet))+" bytes.")
    # send this buffer to the destination, using udp datagrams
    # NOTE(review): passwd is written to the log below; confirm that is acceptable.
    try:
        self.__udpSocket.sendto(packet, (host, port))
        self.logger.log(Logger.NOTICE, "Packet sent to "+host+":"+str(port)+" "+passwd)
    except socket.error as msg:
        # str(msg) instead of msg[1]: not every socket.error carries an
        # (errno, text) pair, and indexing would raise inside this handler.
        self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg))
749 corvo 1.1
def __packParameter(self, xdrPacker, name, value):
    """Append one (name, XDR type code, value) entry to xdrPacker.

    The type code comes from self.__valueTypes[type(value)], and the value is
    written by the matching function in self.__packFunctions.

    Returns True if the parameter was packed. Returns False, after logging a
    warning, for a None/empty name, a None value, an unsupported value type,
    or any error raised while packing. On False, xdrPacker is left untouched,
    so the caller's parameter count always matches what is in the buffer.
    """
    if name is None or name == "":
        self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value "+str(value))
        return False
    if value is None:
        self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value")
        return False
    try:
        typeValue = self.__valueTypes[type(value)]
        packValue = self.__packFunctions[typeValue]
        # Dry run into a scratch packer of the same class. If packing fails
        # part way (e.g. an int outside the XDR int32 range), nothing has been
        # written to the real packet yet, so it cannot end up malformed.
        # __class__ (not type()) also works for old-style classes.
        scratch = xdrPacker.__class__()
        scratch.pack_string(name)
        scratch.pack_int(typeValue)
        packValue(scratch, value)
    except Exception as ex:
        self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
        return False
    # The dry run succeeded with the same inputs; now write for real.
    xdrPacker.pack_string(name)
    xdrPacker.pack_int(typeValue)
    packValue(xdrPacker, value)
    self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
    return True
767 corvo 1.3
# Destructor hook
def __del__(self):
    """On garbage collection, delegate cleanup to free().

    free() is skipped when self.__freed is already true.
    """
    if self.__freed:
        return
    self.free()
772 corvo 1.2
def __shouldSend(self):
    """Rate limiter: decide whether the datagram about to be built may be sent.

    Counters of sent and dropped datagrams are kept per whole second
    (long(time.time())). When the second changes, the finished interval's
    rate is folded into an exponentially weighted history (weight
    self.__hWeight). If the weighted rate gets close to self.maxMsgRate, the
    datagram is dropped at random.

    Before returning, the sent or dropped counter for the current second is
    incremented.

    Returns True to send, False to drop.
    """
    currentSecond = long(time.time())
    if currentSecond != self.__crtTime:
        # A new second has started: update the history with the finished
        # interval, then start fresh counters for this second.
        elapsed = currentSecond - self.__crtTime
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        self.__crtTime = currentSecond
        self.__crtSent = 0
        self.__crtDrop = 0

    # Smoothed estimate of the current sending rate.
    weightedRate = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    # Rate at which dropping is meant to start (about 90% of the maximum).
    dropLevel = self.maxMsgRate - self.maxMsgRate / 10

    # The first test only compares against maxMsgRate - dropLevel (about 10%
    # of the maximum). The random draw, which is at most maxMsgRate / 10, can
    # only reach maxMsgRate - weightedRate once weightedRate is near dropLevel.
    # The draw happens only when the first test passes (short-circuit `and`).
    allowed = not (weightedRate > (self.maxMsgRate - dropLevel)
                   and random.randint(0, self.maxMsgRate / 10) >= (self.maxMsgRate - weightedRate))

    if allowed:
        self.__crtSent += 1
    else:
        self.__crtDrop += 1
    return allowed
807    
################################################################################################
# Private variables. Don't touch
################################################################################################

# Python type of a parameter value -> XDR type code written before the value
# (codes as in ApMon.h from the C/C++ ApMon version).
__valueTypes = {
    str: 0,    # XDR_STRING
    int: 2,    # XDR_INT32
    float: 5,  # XDR_REAL64
}

# XDR type code -> xdrlib.Packer method that writes a value of that type
__packFunctions = {
    0: xdrlib.Packer.pack_string,
    2: xdrlib.Packer.pack_int,
    5: xdrlib.Packer.pack_double,
}

__defaultPort = 8884
__version = "2.2.4-py"  # apMon version number
824 corvo 1.1