ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
Revision: 1.7
Committed: Fri Mar 28 17:18:46 2008 UTC (17 years, 1 month ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, CRAB_2_7_4_pre3, CRAB_2_7_4_pre2, CRAB_2_7_4_pre1, CRAB_2_7_3, CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta, CRAB_2_7_3_pre1, CRAB_2_7_3_beta3, CRAB_2_7_3_beta2, CRAB_2_7_3_beta1, CRAB_2_7_3_beta, CRAB_2_7_2_p1, CRAB_2_7_1_branch_firstMERGE, CRAB_2_7_2, CRAB_2_7_2_pre4, CRAB_2_7_2_pre3, CRAB_2_7_2_pre2, CRAB_2_7_2_pre1, CRAB_2_7_1, fede_170310, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, 
CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_6_6_pre6, CRAB_2_7_1_pre2, CRAB_2_6_6_pre5, CRAB_2_7_1_pre1, CRAB_2_6_6_pre4, CRAB_2_6_6_pre3, CRAB_2_6_6_pre2, CRAB_2_6_6_check, CRAB_2_6_6, CRAB_2_6_6_pre1, CRAB_2_7_0, CRAB_2_6_5, CRAB_2_7_0_pre8, CRAB_2_6_5_pre1, CRAB_2_7_0_pre7, CRAB_2_6_4, CRAB_2_7_0_pre6, CRAB_2_6_4_pre1, CRAB_2_7_0_pre5, CRAB_2_6_3_patch_2, CRAB_2_6_3_patch_2_pre2, CRAB_2_6_3_patch_2_pre1, CRAB_2_6_3_patch_1, CRAB_2_7_0_pre4, CRAB_2_7_0_pre3, CRAB_2_6_3, CRAB_2_6_3_pre5, CRAB_2_6_3_pre4, CRAB_2_6_3_pre3, CRAB_2_6_3_pre2, CRAB_2_7_0_pre2, CRAB_2_6_3_pre1, test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1_pre4, CRAB_2_6_1_pre3, CRAB_2_6_1_pre2, CRAB_2_6_1_pre1, CRAB_2_6_1, CRAB_2_6_0, CRAB_2_6_0_pre14, CRAB_2_6_0_pre13, CRAB_2_6_0_pre12, CRAB_2_6_0_pre11, CRAB_2_6_0_pre10, CRAB_2_6_0_pre9, CRAB_2_6_0_pre8, CRAB_2_6_0_pre7, CRAB_2_6_0_pre6, CRAB_2_6_0_pre5, CRAB_2_6_0_pre4, CRAB_2_6_0_pre3, CRAB_2_6_0_pre2, CRAB_2_6_0_pre1, CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1, CRAB_2_5_0, CRAB_2_5_0_pre7, CRAB_2_5_0_pre6, CRAB_2_5_0_pre5, CRAB_2_5_0_pre4, CRAB_2_5_0_pre3, CRAB_2_5_0_pre2, CRAB_2_5_0_pre1, CRAB_2_4_4, CRAB_2_4_4_pre6, CRAB_2_4_4_pre5, CRAB_2_4_4_pre4, CRAB_2_4_4_pre3, CRAB_2_4_4_pre2, CRAB_2_4_4_pre1, CRAB_2_4_3, CRAB_2_4_3_pre8, CRAB_2_4_3_pre7, CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2, CRAB_2_4_2_pre3, CRAB_2_4_2_pre2, CRAB_2_4_2_pre1, CRAB_2_4_1, CRAB_2_4_1_pre4, CRAB_2_4_1_pre3, CRAB_2_4_1_pre2, CRAB_2_4_1_pre1, CRAB_2_4_0_Tutorial, CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0, CRAB_2_4_0_pre9, CRAB_2_4_0_pre8, CRAB_2_4_0_pre7, CRAB_2_4_0_pre6, CRAB_2_4_0_pre5, CRAB_2_4_0_pre4, CRAB_2_4_0_pre3, CRAB_2_4_0_pre2, CRAB_2_4_0_pre1, CRAB_DLS_PHED1, CRAB_DLS_PHED, CRAB_2_3_2_Fnal, CRAB_2_3_2, CRAB_2_3_2_pre7, CRAB_2_3_2_pre5, CRAB_2_3_2_pre4, CRAB_2_3_2_pre3, 
CRAB_2_3_2_pre2, CRAB_2_3_2_pre1, CRAB_2_4_0_test, CRAB_2_3_1, CRAB_2_3_1_pre6, CRAB_2_3_1_pre5, CRAB_2_3_1_pre4, CRAB_2_3_1_pre3, CRAB_2_3_1_pre2, CRAB_2_3_1_pre1, CRAB_2_3_0, CRAB_2_3_0_pre6, CRAB_2_3_0_pre1, CRAB_2_2_2_pre5, CRAB_2_2_2_pre4, CRAB_2_2_2_pre3, CRAB_2_2_2_pre2, CRAB_2_2_2_pre1, CRAB_2_2_1, CRAB_2_2_1_pre6, CRAB_2_2_1_pre5, CRAB_2_2_1_pre4, PRODCOMMON_0_10_7_testCS2, CRAB_2_2_1_pre3, CRAB_2_2_1_pre2, CRAB_2_2_1_pre1, CRAB_2_2_0, CRAB_2_2_0_pre21, CRAB_2_2_0_pre19, CRAB_2_2_0_pre18, CRAB_2_2_0_pre17, CRAB_2_2_0_pre16, CRAB_2_2_0_pre15, CRAB_2_2_0_pre13, CRAB_2_2_0_pre12, CRAB_2_2_0_pre11, CRAB_2_2_0_pre10, bp_osg_bdii, CRAB_2_2_0_pre9, CRAB_2_2_0_pre8, CRAB_2_2_0_pre7, CRAB_2_1_2, CRAB_2_2_0_pre5, CRAB_2_1_2_pre2, CRAB_2_1_2_pre1, CRAB_2_2_0_pre4, CRAB_2_2_0_pre2, CRAB_2_1_1, HEAD
Branch point for: CRAB_multiout, CRAB_2_7_1_branch, Lumi2_8, CRAB_2_6_X_br, AnaDataSet, CRAB_2_3_0_br, osg_bdii, CRAB_2_1_2_br
Changes since 1.6: +20 -18 lines
Log Message:
Upgrade of dashboard-related classes to improve submission of monitoring information for large tasks: see https://hypernews.cern.ch/HyperNews/CMS/get/crabDevelopment/657/1/1/1/2/1/1.html

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.2.4
5 *
6 * Copyright (C) 2006 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34 """
35 apmon.py
36
37 This is a python implementation for the ApMon API for sending
38 data to the MonALISA service.
39
40 For further details about ApMon please see the C/C++ or Java documentation
41 You can find a sample usage of this module in apmTest.py.
42
43 Note that the parameters must be either integers(32 bits) or doubles(64 bits).
44 Sending strings is supported, but they will not be stored in the
45 farm's store nor shown in the farm's window in the MonALISA client.
46 """
47
48 import re
49 import xdrlib
50 import socket
51 import struct
52 import StringIO
53 import threading
54 import time
55 import Logger
56 import ProcInfo
57 import random
58 import copy
59
60 #__all__ = ["ApMon"]
61
62 #__debug = False # set this to True to be verbose
63
64 class ApMon:
65 """
66 Main class for sending monitoring data to a MonALISA module.
67 One or more destinations can be chosen for the data. See constructor.
68
69 The data is packed in UDP datagrams, using XDR. The following fields are sent:
70 - version & password (string)
71 - cluster name (string)
72 - node name (string)
73 - number of parameters (int)
74 - for each parameter:
75 - name (string)
76 - value type (int)
77 - value
78 - optionally an (int) with the given timestamp
79
80 Attributes (public):
81 - destinations - a dictionary keyed by (ip, port, password) tuples; each value holds that destination's options
82 - configAddresses - list with files and urls from where the config is read
83 - configRecheckInterval - period, in seconds, to check for changes
84 in the configAddresses list
85 - configRecheck - boolean - whether to recheck periodically for changes
86 in the configAddresses list
87 """
88
89 __defaultOptions = {
90 'job_monitoring': True, # perform (or not) job monitoring
91 'job_interval' : 120, # at this interval (in seconds)
92 'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch!
93
94 'job_cpu_time' : True, # presumably processor time spent running this job, in seconds (confirm in ProcInfo)
95 'job_run_time' : True, # presumably elapsed time from the start of this job, in seconds (confirm in ProcInfo)
96 'job_cpu_usage' : True, # current percent of the processor used for this job, as reported by ps
97 'job_virtualmem': True, # size in KB of the virtual memory occupied by the job, as reported by ps
98 'job_rss' : True, # size in KB of the resident image size of the job, as reported by ps
99 'job_mem_usage' : True, # percent of the memory occupied by the job, as reported by ps
100 'job_workdir_size': True, # size in MB of the working directory of the job
101 'job_disk_total': True, # size in MB of the total size of the disk partition containing the working directory
102 'job_disk_used' : True, # size in MB of the used disk partition containing the working directory
103 'job_disk_free' : True, # size in MB of the free disk partition containing the working directory
104 'job_disk_usage': True, # percent of the used disk partition containing the working directory
105 'job_open_files': True, # number of open file descriptors
106
107 'sys_monitoring': True, # perform (or not) system monitoring
108 'sys_interval' : 120, # at this interval (in seconds)
109 'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch!
110
111 'sys_cpu_usr' : True, # cpu-usage information
112 'sys_cpu_sys' : True, # all these will produce corresponding params without the "sys_" prefix
113 'sys_cpu_nice' : True,
114 'sys_cpu_idle' : True,
115 'sys_cpu_usage' : True,
116 'sys_load1' : True, # system load information
117 'sys_load5' : True,
118 'sys_load15' : True,
119 'sys_mem_used' : True, # memory usage information
120 'sys_mem_free' : True,
121 'sys_mem_usage' : True,
122 'sys_pages_in' : True,
123 'sys_pages_out' : True,
124 'sys_swap_used' : True, # swap usage information
125 'sys_swap_free' : True,
126 'sys_swap_usage': True,
127 'sys_swap_in' : True,
128 'sys_swap_out' : True,
129 'sys_net_in' : True, # network transfer in kBps
130 'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs
131 'sys_net_errs' : True, # for each eth interface
132 'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ...
133 'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
134 'sys_processes' : True,
135 'sys_uptime' : True, # uptime of the machine, in days (float number)
136
137 'general_info' : True, # send (or not) general host information once every 2 x $sys_interval seconds
138 'general_data_sent': 0, # time from Epoch when general information was sent; don't touch!
139
140 'hostname' : True,
141 'ip' : True, # will produce ethX_ip params for each interface
142 'cpu_MHz' : True,
143 'no_CPUs' : True, # number of CPUs
144 'total_mem' : True,
145 'total_swap' : True,
146 'cpu_vendor_id' : True,
147 'cpu_family' : True,
148 'cpu_model' : True,
149 'cpu_model_name': True,
150 'bogomips' : True};
151
def __init__(self, initValue, defaultLogLevel=Logger.INFO):
    """
    Create an ApMon sender and configure its destinations from initValue.

    initValue selects where the destinations come from (see setDestinations):
    - a string: kept in configAddresses and treated as the name of a
      configuration file; if it starts with "http://" the configuration is
      loaded from that URL.
    - a list: its contents become configAddresses and the destinations are
      loaded from those sources.
    - a tuple of strings of the form "{hostname|ip}[:port][ passwd]"
      (default port 8884, default password ""): used directly as
      destinations, with the background-monitoring options taken from
      __defaultOptions.
    - a dict mapping such a string to a dict {'param_name': True/False, ...}:
      the given options overwrite the defaults for that destination.

    defaultLogLevel is passed to Logger.Logger to create self.logger.
    """
    # --- public state -----------------------------------------------------
    # key = (host, port, passwd); val = {"param_name": True/False, ...}
    self.destinations = {}
    # key = (host, port, passwd); val = {"param_name": value, ...}
    self.destPrevData = {}
    # key = (host, port, passwd); val = {'INSTANCE_ID': ..., 'SEQ_NR': ...}
    self.senderRef = {}
    # files/urls the configuration is read from
    self.configAddresses = []
    self.configRecheckInterval = 600    # seconds (10 minutes)
    self.configRecheck = True
    self.performBgMonitoring = True
    # key = pid; val = dict with the job's cluster and node names
    self.monitoredJobs = {}
    # maximum number of messages allowed to be sent per second
    self.maxMsgRate = 100

    # --- defaults used when the caller does not name cluster/node ----------
    self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0, 0x7FFFFFFE), 'SEQ_NR': 0}
    self.__defaultUserCluster = "ApMon_UserSend"
    self.__defaultUserNode = socket.getfqdn()
    self.__defaultSysMonCluster = "ApMon_SysMon"
    self.__defaultSysMonNode = socket.getfqdn()

    # --- internal bookkeeping; not meant to be modified by users -----------
    self.__freed = False
    self.__udpSocket = None
    self.__configUpdateLock = threading.Lock()
    self.__configUpdateEvent = threading.Event()
    self.__configUpdateFinished = threading.Event()
    self.__bgMonitorLock = threading.Lock()
    self.__bgMonitorEvent = threading.Event()
    self.__bgMonitorFinished = threading.Event()

    # counters used to keep the average send rate below maxMsgRate
    self.__crtTime = self.__prvTime = 0
    self.__prvSent = self.__prvDrop = 0
    self.__crtSent = self.__crtDrop = 0
    # in (0,1); increase to wait more time before maxMsgRate kicks in
    self.__hWeight = 0.95

    # the logger must exist before the destinations are parsed
    self.logger = Logger.Logger(defaultLogLevel)
    self.setDestinations(initValue)
    self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # NOTE: neither the config-reloader thread (__configLoader) nor the
    # background-monitoring thread (__bgMonitor) is started here; the
    # start-up code for both is disabled in this version of the file.
    self.procInfo = ProcInfo.ProcInfo(self.logger)
219
def sendParams(self, params):
    """
    Send several parameters to MonALISA using the default (last given)
    cluster and node names, without an explicit timestamp.
    """
    noTimeStamp = -1
    self.sendTimedParams(noTimeStamp, params)
225
def sendTimedParams(self, timeStamp, params):
    """
    Send several parameters with the given timestamp, using the default
    (last given) cluster and node names.
    See sendTimedParameters for the meaning of timeStamp and params.
    """
    clusterName = None    # None selects the default cluster name
    nodeName = None       # None selects the default node name
    self.sendTimedParameters(clusterName, nodeName, timeStamp, params)
232
def sendParameter(self, clusterName, nodeName, paramName, paramValue):
    """
    Send one parameter to MonALISA for the given cluster and node,
    without an explicit timestamp.
    """
    noTimeStamp = -1
    self.sendTimedParameter(clusterName, nodeName, noTimeStamp, paramName, paramValue)
238
def sendTimedParameter(self, clusterName, nodeName, timeStamp, paramName, paramValue):
    """
    Send one parameter with the given timestamp.
    The parameter is wrapped in a one-entry dictionary and handed to
    sendTimedParameters.
    """
    singleParam = {paramName: paramValue}
    self.sendTimedParameters(clusterName, nodeName, timeStamp, singleParam)
244
def sendParameters(self, clusterName, nodeName, params):
    """
    Send several parameters for the given cluster and node names,
    without an explicit timestamp.
    """
    noTimeStamp = -1
    self.sendTimedParameters(clusterName, nodeName, noTimeStamp, params)
250
def sendTimedParameters(self, clusterName, nodeName, timeStamp, params):
    """
    Send multiple monitored parameters to MonALISA.

    - clusterName is the name of the cluster being monitored. If it is None
      or "", the last given cluster name (initially a built-in default) is
      used; otherwise it becomes the new default.
    - nodeName is the name of the node the parameters belong to. If it is
      None, the last given node name (initially the full hostname of this
      machine) is used; otherwise it becomes the new default.
    - timeStamp, if > 0, is the time for the parameters, in seconds from
      Epoch. Use it only if you are sure about the time of the result
      (e.g. when parsing logs); otherwise the MonALISA service assigns
      the time.
    - params is a dictionary {parameter name: value}, the value being an int
      or a float, or a list of (name, value) tuples when the order of the
      parameters matters.

    NOTE that python doesn't know about 32-bit floats (only 64-bit floats!)

    Nothing is sent (a warning is logged) when no destination is defined.
    """
    if clusterName is None or clusterName == "":
        clusterName = self.__defaultUserCluster
    else:
        self.__defaultUserCluster = clusterName
    if nodeName is None:
        nodeName = self.__defaultUserNode
    else:
        self.__defaultUserNode = nodeName
    if len(self.destinations) == 0:
        self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined.")
        return
    # Hold the config lock so the destinations are not replaced while sending.
    # try/finally guarantees the lock is released even if a send raises;
    # otherwise every later send or config reload would block forever.
    self.__configUpdateLock.acquire()
    try:
        for dest in list(self.destinations.keys()):
            self.__directSendParams(dest, clusterName, nodeName, timeStamp, params)
    finally:
        self.__configUpdateLock.release()
287
def addJobToMonitor(self, pid, workDir, clusterName, nodeName):
    """
    Add a new job to monitor.

    Records the cluster and node names under which the job's data is
    reported (keyed by pid) and registers pid and workDir with
    self.procInfo.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.monitoredJobs[pid] = {
            'CLUSTER_NAME': clusterName,
            'NODE_NAME': nodeName,
        }
        self.procInfo.addJobToMonitor(pid, workDir)
    finally:
        # always release, so an error in procInfo cannot block monitoring
        self.__bgMonitorLock.release()
298
def removeJobToMonitor(self, pid):
    """
    Remove a job from being monitored.

    Raises KeyError if pid is not currently in self.monitoredJobs; the
    monitoring lock is released in that case too.
    """
    self.__bgMonitorLock.acquire()
    try:
        self.procInfo.removeJobToMonitor(pid)
        del self.monitoredJobs[pid]
    finally:
        # without this, an unknown pid would leave the lock held forever
        self.__bgMonitorLock.release()
307
def setMonitorClusterNode(self, clusterName, nodeName):
    """
    Choose the cluster and node names under which system monitoring
    information is sent. A value of None or "" leaves the corresponding
    name unchanged.
    """
    self.__bgMonitorLock.acquire()
    if not (clusterName == None or clusterName == ""):
        self.__defaultSysMonCluster = clusterName
    if not (nodeName == None or nodeName == ""):
        self.__defaultSysMonNode = nodeName
    self.__bgMonitorLock.release()
318
def enableBgMonitoring(self, onOff):
    """
    Turn periodic background monitoring on or off by storing onOff in
    self.performBgMonitoring. Background monitoring data can still be
    sent on demand by calling sendBgMonitoring directly.
    """
    self.performBgMonitoring = onOff
325
def sendBgMonitoring(self, mustSend=False):
    """
    Send background monitoring about system and jobs to all interested destinations.

    If mustSend is True, the information is sent regardless of the time
    elapsed since it was last sent. If mustSend is False, each group of
    data (system, job, general) is sent only if its interval has passed
    since the last time it was sent to that destination.

    Nothing is sent (a warning is logged) when no destination is defined.
    """
    if len(self.destinations) == 0:
        self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined.")
        return

    def activeParams(options, prefix):
        # Names (without prefix) of the enabled options of one group,
        # skipping the entries that only control the group itself.
        names = []
        for param, active in options.items():
            m = re.match(prefix + "(.+)", param)
            if m is not None and active:
                name = m.group(1)
                if name not in ('monitoring', 'interval', 'data_sent'):
                    names.append(name)
        return names

    self.__bgMonitorLock.acquire()
    try:
        now = int(time.time())
        updatedProcInfo = False
        for destination, options in list(self.destinations.items()):
            sysParams = []
            jobParams = []
            prevRawData = self.destPrevData[destination]
            # for each destination and its options, check what has to be reported now
            if options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now):
                sysParams.extend(activeParams(options, "sys_"))
                options['sys_data_sent'] = now
            if options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now):
                jobParams.extend(activeParams(options, "job_"))
                options['job_data_sent'] = now
            # general host information goes out every 2 x sys_interval, together with the system data
            if options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now):
                for param, active in options.items():
                    if active and not param.startswith(("sys_", "job_")):
                        if param not in ('general_info', 'general_data_sent'):
                            sysParams.append(param)
                options['general_data_sent'] = now

            # refresh the collected data at most once per call, and only if something is due
            if (not updatedProcInfo) and (sysParams or jobParams):
                self.procInfo.update()
                updatedProcInfo = True

            sysResults = {}
            if len(sysParams) > 0:
                sysResults = self.procInfo.getSystemData(sysParams, prevRawData)
            if len(sysResults) > 0:
                self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults)
            for pid, props in list(self.monitoredJobs.items()):
                jobResults = {}
                if len(jobParams) > 0:
                    jobResults = self.procInfo.getJobData(pid, jobParams)
                if len(jobResults) > 0:
                    self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults)
    finally:
        # release even when collecting or sending fails, so that later calls
        # (and add/removeJobToMonitor) do not block on a lock nobody will free
        self.__bgMonitorLock.release()
382
def setDestinations(self, initValue):
    """
    Set the destinations of the ApMon instance. It accepts the same values as the constructor:

    - a string: a single configuration file name or URL to load destinations from;
    - a list: several configuration file names / URLs;
    - a tuple of destination strings, added with the default options;
    - a dict {destination string: options dict}, the options overriding
      the defaults for that destination.

    Subclasses of these types are accepted as well. For a string or a list
    the periodic configuration recheck is enabled (interval 600 seconds);
    for a tuple or a dict it is disabled. Any other type is ignored.
    """
    if isinstance(initValue, (str, list)):
        # configuration comes from files/urls
        if isinstance(initValue, str):
            self.configAddresses = [initValue]
        else:
            self.configAddresses = initValue
        self.configRecheck = True
        self.configRecheckInterval = 600
        self.__reloadAddresses()
    elif isinstance(initValue, tuple):
        # destinations given directly, default options
        self.configAddresses = []
        for dest in initValue:
            self.__addDestination(dest, self.destinations)
        self.configRecheck = False
    elif isinstance(initValue, dict):
        # destinations given directly, each with its own options
        self.configAddresses = []
        for dest, opts in initValue.items():
            self.__addDestination(dest, self.destinations, opts)
        self.configRecheck = False
407
def initializedOK(self):
    """
    Return True if at least one destination is defined, i.e. there is
    somewhere to send the parameters to; return False otherwise.
    """
    hasDestinations = len(self.destinations) != 0
    return hasDestinations
413
def setLogLevel(self, strLevel):
    """
    Change the log level of self.logger. strLevel is a string, one of
    'FATAL', 'ERROR', 'WARNING', 'INFO', 'NOTICE', 'DEBUG'.
    """
    log = self.logger
    log.setLogLevel(strLevel)
420
def setMaxMsgRate(self, rate):
    """
    Set the maximum number of messages that can be sent per second and
    log the new value at DEBUG level.
    """
    self.maxMsgRate = rate
    message = "Setting maxMsgRate to: " + str(rate)
    self.logger.log(Logger.DEBUG, message)
427
def free(self):
    """
    Release the resources held by this ApMon instance: close the UDP
    socket if one is open and mark the instance as freed. Call this when
    the object is no longer needed, to allow it to be garbage-collected.

    NOTE: the code that would stop the config-reloader and
    background-monitoring threads is disabled in this version.
    """
    sock = self.__udpSocket
    if sock is not None:
        self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.")
        sock.close()
        self.__udpSocket = None
    self.__freed = True
444
445
446 #########################################################################################
447 # Internal functions - Config reloader thread
448 #########################################################################################
449
def __configLoader(self):
    """
    Main loop of the thread that checks for changes and reloads the configuration.

    Until self.__configUpdateEvent is set, wait for the recheck interval
    and, if configRecheck is enabled, reload the destinations. On exit,
    self.__configUpdateFinished is set so that whoever requested the stop
    can wait for it.
    """
    while not self.__configUpdateEvent.is_set():
        # don't recheck more often than every 30 sec: wait for the configured
        # interval, but at least 30 seconds. (The previous min() capped the
        # wait at 30 sec, so a 600 sec interval was never honoured.)
        self.__configUpdateEvent.wait(max(30, self.configRecheckInterval))
        if self.__configUpdateEvent.is_set():
            break
        if self.configRecheck:
            self.__reloadAddresses()
            self.logger.log(Logger.DEBUG, "Config reloaded. Seleeping for "+repr(self.configRecheckInterval)+" sec.")
    self.__configUpdateFinished.set()
462
def __reloadAddresses(self):
    """
    Rebuild the destinations dictionary from the sources in configAddresses.

    The sources are tried one at a time, in random order, until one of
    them yields at least one destination or all have been tried. The
    result (possibly empty) then replaces self.destinations.
    """
    pending = copy.deepcopy(self.configAddresses)
    loaded = {}
    while len(pending) > 0:
        if len(loaded) != 0:
            break    # one source already provided destinations
        source = random.choice(pending)
        pending.remove(source)
        self.__initializeFromFile(source, loaded)
    # swap under the lock so that the configuration does not change in the
    # middle of sending packets to the previous destinations
    self.__configUpdateLock.acquire()
    self.destinations = loaded
    self.__configUpdateLock.release()
477
def __addDestination(self, aDestination, tempDestinations, options=None):
    """
    Add a destination to tempDestinations.

    aDestination is a string of the form "{hostname|ip}[:port] [passwd]" without quotes.
    If the port is not given, the default port is used.
    If the password is missing, it is considered an empty string.

    tempDestinations is the dictionary to add to; its keys are
    (host, port, passwd) tuples, the host being stored as a resolved IP
    address. A destination whose host and port are already present is
    skipped.

    options, if given, is a dict {'param_name': value, ...} whose entries
    overwrite the options of the new destination; None (the default) means
    the default options are used unchanged.

    Problems (bad port, unresolvable host) are logged and the destination
    is not added; no exception is raised for them.
    """
    if options is None:
        options = self.__defaultOptions
    # collapse every run of whitespace (tabs included) into a single space
    aDestination = ' '.join(aDestination.split())
    sepPort = aDestination.find(':')
    sepPasswd = aDestination.rfind(' ')
    if sepPort >= 0:
        host = aDestination[0:sepPort].strip()
        if sepPasswd > sepPort + 1:
            port = aDestination[sepPort+1:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            port = aDestination[sepPort+1:].strip()
            passwd = ""
    else:
        port = str(self.__defaultPort)
        if sepPasswd >= 0:
            host = aDestination[0:sepPasswd].strip()
            passwd = aDestination[sepPasswd:].strip()
        else:
            host = aDestination.strip()
            passwd = ""
    if not port.isdigit():
        self.logger.log(Logger.WARNING, "Bad value for port number "+repr(port)+" in "+aDestination+" destination")
        return
    port = int(port)
    try:
        host = socket.gethostbyname(host)  # convert hostnames to IP addresses to avoid suffocating DNSs
    except socket.error as msg:
        self.logger.log(Logger.ERROR, "Error resolving "+host+": "+str(msg))
        return
    # the password is not part of the identity of a destination
    for h, p, _w in tempDestinations.keys():
        if h == host and p == port:
            self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it")
            return
    destination = (host, port, passwd)
    # NOTE(review): these messages write the destination password to the log
    # in clear text - confirm that this is acceptable.
    self.logger.log(Logger.INFO, "Adding destination "+host+':'+repr(port)+' '+passwd)
    if destination in self.destinations:
        tempDestinations[destination] = self.destinations[destination]  # reuse previous options
    else:
        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions)  # have a different set of options for each dest
    if destination not in self.destPrevData:
        self.destPrevData[destination] = {}  # set it empty only if it's really new
    if destination not in self.senderRef:
        self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef)  # otherwise, don't reset this nr.
    if options != self.__defaultOptions:
        # we have to overwrite defaults with given options
        for key, value in options.items():
            self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+repr(value))
            tempDestinations[destination][key] = value
539
def __initializeFromFile(self, confFileName, tempDestinations):
    """
    Load destinations from the confFileName file. If it is a URL (starts with "http://")
    load the configuration from there. Put all destinations in the tempDestinations dict.

    Lines that are empty or start with '#' are ignored. Lines of the form
    "xApMon_<param> = <value>" set options: "on"/"off" values become
    True/False, "*_interval" values become integers; loglevel, maxMsgRate,
    conf_recheck and recheck_interval configure this object, "*_data_sent"
    entries are ignored, and everything else is collected as a monitoring
    option. Every other line is a destination, added through
    __addDestination together with the collected options.

    If the source cannot be opened, the error is logged and nothing is
    added. An option whose value should be an integer but is not is logged
    and skipped.
    """
    try:
        if confFileName.find("http://") == 0:
            confFile = self.__getURL(confFileName)
            if confFile is None:
                return
        else:
            confFile = open(confFileName)
    except IOError as ex:
        self.logger.log(Logger.ERROR, "Cannot open "+confFileName)
        self.logger.log(Logger.ERROR, "IOError: "+str(ex))
        return
    self.logger.log(Logger.INFO, "Adding destinations from "+confFileName)
    prefix = "xApMon_"
    dests = []
    opts = {}
    try:
        # readline() returns '' only at end of file
        for line in iter(confFile.readline, ''):
            line = line.strip()
            self.logger.log(Logger.DEBUG, "Reading line "+line)
            if len(line) == 0 or line[0] == '#':
                continue
            if not line.startswith(prefix):
                dests.append(line)
                continue
            m = re.match(r"(\S+)\s*=\s*(\S+)", line[len(prefix):])
            if m is None:
                continue
            param, value = m.group(1), m.group(2)
            try:
                if value.upper() == "ON":
                    value = True
                elif value.upper() == "OFF":
                    value = False
                elif param.endswith("_interval"):
                    value = int(value)
                if param == "maxMsgRate":
                    value = int(value)
            except ValueError:
                # a malformed number must not abort loading the whole file
                self.logger.log(Logger.WARNING, "Ignoring bad integer value "+repr(value)+" for "+prefix+param)
                continue
            if param == "loglevel":
                self.logger.setLogLevel(value)
            elif param == "maxMsgRate":
                self.setMaxMsgRate(value)
            elif param == "conf_recheck":
                self.configRecheck = value
            elif param == "recheck_interval":
                self.configRecheckInterval = value
            elif param.endswith("_data_sent"):
                pass  # don't reset time in sys/job/general/_data_sent
            else:
                opts[param] = value
    finally:
        # close the source even if reading or parsing fails
        confFile.close()
    for line in dests:
        self.__addDestination(line, tempDestinations, opts)
600
601 ###############################################################################################
602 # Internal functions - Background monitor thread
603 ###############################################################################################
604
def __bgMonitor(self):
    """
    Main loop of the background monitoring thread.

    Until self.__bgMonitorEvent is set, wake up every 10 seconds and, if
    performBgMonitoring is enabled, call sendBgMonitoring (which sends
    only the data whose interval has elapsed). On exit,
    self.__bgMonitorFinished is set.
    """
    while not self.__bgMonitorEvent.is_set():
        self.__bgMonitorEvent.wait(10)
        if self.__bgMonitorEvent.is_set():
            break    # stop requested while waiting
        if not self.performBgMonitoring:
            continue
        self.sendBgMonitoring()
    self.__bgMonitorFinished.set()
613
614 ###############################################################################################
615 # Internal helper functions
616 ###############################################################################################
617
618 # this is a simplified replacement for urllib2 which doesn't support setting a timeout.
619 # by default, if timeout is not specified, it waits 5 seconds
620 def __getURL (self, url, timeout = 5):
621 r = re.compile("http://([^:/]+)(:(\d+))?(/.*)").match(url)
622 if r is None:
623 self.logger.log(Logger.ERROR, "Cannot open "+url+". Incorrectly formed URL.")
624 return None
625 host = r.group(1)
626 if r.group(3) == None:
627 port = 80 # no port is given, pick the default 80 for HTTP
628 else:
629 port = int(r.group(3))
630 if r.group(4) == None:
631 path = "" # no path is give, let server decide
632 else:
633 path = r.group(4)
634 sock = None
635 err = None
636 try:
637 for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
638 af, socktype, proto, canonname, sa = res
639 try:
640 sock = socket.socket(af, socktype, proto)
641 except socket.error, msg:
642 sock = None
643 err = msg
644 continue
645 try:
646 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, struct.pack("ii", timeout, 0))
647 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("ii", timeout, 0))
648 sock.connect(sa)
649 except socket.error, msg:
650 sock.close()
651 sock = None
652 err = msg
653 continue
654 break
655 except socket.error, msg:
656 sock = None
657 err = msg
658 if sock is None:
659 self.logger.log(Logger.ERROR, "Cannot open "+url)
660 self.logger.log(Logger.ERROR, "SocketError: "+str(err))
661 return None
662 try:
663 sock.send("GET "+path+" HTTP/1.0\n\n");
664 data = sock.recv(8192)
665 file = StringIO.StringIO(data)
666 httpStatus = 0
667 while True:
668 line = file.readline().strip()
669 if line == "":
670 break # exit at the end of file or at the first empty line (finish of http headers)
671 r = re.compile("HTTP/\d.\d (\d+)").match(line)
672 if r != None:
673 httpStatus = int(r.group(1))
674 if httpStatus == 200:
675 return file
676 else:
677 self.logger.log(Logger.ERROR, "Cannot open "+url)
678 if httpStatus == 401:
679 self.logger.log(Logger.ERROR, 'HTTPError: not authorized ['+str(httpStatus)+']')
680 elif httpStatus == 404:
681 self.logger.log(Logger.ERROR, 'HTTPError: not found ['+str(httpStatus)+']')
682 elif httpStatus == 503:
683 self.logger.log(Logger.ERROR, 'HTTPError: service unavailable ['+str(httpStatus)+']')
684 else:
685 self.logger.log(Logger.ERROR, 'HTTPError: unknown error ['+str(httpStatus)+']')
686 return None
687 except socket.error, msg:
688 self.logger.log(Logger.ERROR, "Cannot open "+url)
689 self.logger.log(Logger.ERROR, "SocketError: "+str(msg))
690 sock.close()
691 return None
692
693 def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
694
695 if self.__shouldSend() == False:
696 # self.logger.log(Logger.ERROR, "Dropping packet since rate is too fast!");
697 self.logger.log(Logger.INFO, "Pausing 1sec since rate is too fast!");
698 time.sleep(1.0)
699 # return;
700
701 if destination == None:
702 self.logger.log(Logger.WARNING, "Destination is None");
703 return;
704
705 host, port, passwd = destination
706 crtSenderRef = self.senderRef[destination]
707 crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
708
709 xdrPacker = xdrlib.Packer ()
710
711 xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
712
713 xdrPacker.pack_int (crtSenderRef['INSTANCE_ID'])
714 xdrPacker.pack_int (crtSenderRef['SEQ_NR'])
715
716 xdrPacker.pack_string (clusterName)
717 xdrPacker.pack_string (nodeName)
718
719 sent_params_nr = 0
720 paramsPacker = xdrlib.Packer ()
721
722 if type(params) == type( {} ):
723 for name, value in params.iteritems():
724 if self.__packParameter(paramsPacker, name, value):
725 sent_params_nr += 1
726 elif type(params) == type( [] ):
727 for name, value in params:
728 self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value));
729 if self.__packParameter(paramsPacker, name, value):
730 sent_params_nr += 1
731 else:
732 self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
733
734 xdrPacker.pack_int (sent_params_nr)
735
736 if (timeStamp != None) and (timeStamp > 0):
737 paramsPacker.pack_int(timeStamp);
738
739 buffer = xdrPacker.get_buffer() + paramsPacker.get_buffer()
740 self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(buffer))+" bytes.");
741 # send this buffer to the destination, using udp datagrams
742 try:
743 self.__udpSocket.sendto(buffer, (host, port))
744 self.logger.log(Logger.NOTICE, "Packet sent to "+host+":"+str(port)+" "+passwd)
745 except socket.error, msg:
746 self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]))
747 xdrPacker.reset()
748 paramsPacker.reset()
749
750 def __packParameter(self, xdrPacker, name, value):
751 if (name is None) or (name is ""):
752 self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value "+str(value))
753 return False
754 if (value is None):
755 self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value")
756 return False
757 try:
758 typeValue = self.__valueTypes[type(value)]
759 xdrPacker.pack_string (name)
760 xdrPacker.pack_int (typeValue)
761 self.__packFunctions[typeValue] (xdrPacker, value)
762 self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value))
763 return True
764 except Exception, ex:
765 self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex))
766 return False
767
768 # Destructor
def __del__(self):
    """Call free() on destruction unless __freed says it was already done."""
    if self.__freed:
        return
    self.free()
772
773 # Decide if the current datagram should be sent.
774 # This decision is based on the number of messages previously sent.
def __shouldSend(self):
    """
    Decide whether the current datagram should be sent.

    The decision is based on a weighted history of how many messages were
    sent recently, compared with maxMsgRate. Updates the sent/dropped
    counters for the current second and returns True to send, False to hold
    the message back.
    """
    now = long(time.time())
    if self.__crtTime != now:
        # A new second has started: fold the finished second's count into
        # the running history ...
        elapsed = now - self.__crtTime
        self.__prvSent = self.__hWeight * self.__prvSent + (1.0 - self.__hWeight) * self.__crtSent / elapsed
        self.__prvTime = self.__crtTime
        self.logger.log(Logger.DEBUG, "previously sent: " + str(self.__crtSent) + "; dropped: " + str(self.__crtDrop))
        # ... and start counting afresh.
        self.__crtTime = now
        self.__crtSent = 0
        self.__crtDrop = 0

    # Blend the history with what has been sent so far in this second.
    valSent = self.__prvSent * self.__hWeight + self.__crtSent * (1 - self.__hWeight)

    # Level from which messages may start being dropped.
    level = self.maxMsgRate - self.maxMsgRate / 10

    doSend = True
    if valSent > (self.maxMsgRate - level):
        # The random draw is compared with the remaining headroom.
        headroom = self.maxMsgRate - valSent
        draw = random.randint(0, self.maxMsgRate / 10)
        doSend = not (draw >= headroom)

    # Count sent and dropped messages.
    if not doSend:
        self.__crtDrop += 1
        return False
    self.__crtSent += 1
    return True
807
808 ################################################################################################
809 # Private variables. Don't touch
810 ################################################################################################
811
# Python type of a value -> XDR type code written into the datagram
# (see ApMon.h from the C/C++ ApMon version).
__valueTypes = {
    str: 0,     # XDR_STRING
    int: 2,     # XDR_INT32
    float: 5,   # XDR_REAL64
}

# XDR type code -> xdrlib.Packer method that encodes a value of that type.
__packFunctions = {
    0: xdrlib.Packer.pack_string,
    2: xdrlib.Packer.pack_int,
    5: xdrlib.Packer.pack_double,
}

__defaultPort = 8884
__version = "2.2.4-py" # apMon version number
824