ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ProcInfo.py
Revision: 1.1
Committed: Tue Aug 30 17:05:59 2005 UTC (19 years, 8 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_1, CRAB_1_0_0_rc1, CRAB_1_0_0_beta4, CRAB_1_0_0_pre1_boss_2, CRAB_1_0_0_pre1_boss
Log Message:
Integration with Monalisa monitoring

File Contents

# Content
1
2 """
3 * ApMon - Application Monitoring Tool
4 * Version: 2.0.4
5 *
6 * Copyright (C) 2005 California Institute of Technology
7 *
8 * Permission is hereby granted, free of charge, to use, copy and modify
9 * this software and its documentation (the "Software") for any
10 * purpose, provided that existing copyright notices are retained in
11 * all copies and that this notice is included verbatim in any distributions
12 * or substantial portions of the Software.
13 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14 * Users of the Software are asked to feed back problems, benefits,
15 * and/or suggestions about the software to the MonALISA Development Team
16 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17 * incorporation of new features - is done on a best effort basis. All bug
18 * fixes and enhancements will be made available under the same terms and
19 * conditions as the original software,
20
21 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
26 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31 * MODIFICATIONS.
32 """
33
34
35 import os
36 import re
37 import time
38 import socket
39 import Logger
40
class ProcInfo:
    """Extract information from the /proc filesystem for system and job monitoring.

    System-wide values are collected in DATA; per-job values are collected in
    JOBS[pid]['DATA'].  Rate-based values (CPU percentages, paging/swapping
    rates, network kB/s) are derived from the difference between the raw
    counters of two consecutive update() calls, so the first update() only
    primes those counters.

    All messages go through the logger given to the constructor, which is
    called as logger.log(level, message) with the Logger module's level
    constants.
    """

    # ProcInfo constructor
    def __init__(this, logger):
        this.DATA = {}             # monitored data that is going to be reported
        this.OLD_RAW = {}          # raw counters from the previous sample; together with
        this.NEW_RAW = {}          # NEW_RAW they yield the rate-based values in DATA
        this.LAST_UPDATE_TIME = 0  # when the last measurement was done (whole seconds)
        this.JOBS = {}             # monitored jobs: pid -> {'WORKDIR': dir, 'DATA': {...}}
        this.logger = logger       # use the given logger

    def update(this):
        """Refresh all system and job measurements.

        Should not be called more than once a second: times are truncated to
        whole seconds and rates are divided by the elapsed interval.  A call
        in the same second as the previous update is logged and ignored.
        """
        if this.LAST_UPDATE_TIME == int(time.time()):
            this.logger.log(Logger.NOTICE, "ProcInfo: update() called too often!")
            return
        this.readStat()
        this.readMemInfo()
        this.readLoadAvg()
        this.countProcesses()
        this.readGenericInfo()
        this.readNetworkInfo()
        # iterate over a snapshot: readJobInfo() removes jobs that have ended
        for pid in list(this.JOBS.keys()):
            this.readJobInfo(pid)
            this.readJobDiskUsage(pid)
        this.LAST_UPDATE_TIME = int(time.time())

    def addJobToMonitor(this, pid, workDir):
        """Start monitoring process pid together with all of its descendants.

        workDir is the directory whose disk usage is reported for the job;
        pass '' to skip the disk measurements.
        """
        this.JOBS[pid] = {'WORKDIR': workDir, 'DATA': {}}

    def removeJobToMonitor(this, pid):
        """Stop monitoring pid; pids that are not monitored are ignored."""
        this.JOBS.pop(pid, None)

    def getSystemData(this, params):
        """Return a dict with those requested system parameters that have a value."""
        return this.getFilteredData(this.DATA, params)

    def getJobData(this, pid, params):
        """Return a dict with those requested parameters of job pid that have a value.

        An empty dict is returned when pid is not monitored.
        """
        if pid not in this.JOBS:
            return {}
        return this.getFilteredData(this.JOBS[pid]['DATA'], params)

    ############################################################################################
    # internal functions for system monitoring
    ############################################################################################

    def readStat(this):
        """Read CPU (and, where present, paging/swapping) counters from /proc/stat.

        Percentages and rates need two samples, so this has to run twice (with
        LAST_UPDATE_TIME updated in between) before it reports anything.  The
        page/swap lines are not available on 2.6 kernels; pages_* and swap_*
        are then simply not reported.
        """
        this.OLD_RAW = this.NEW_RAW.copy()
        try:
            fstat = open('/proc/stat')
            try:
                for line in fstat:
                    elem = line.split()
                    if line.startswith("cpu "):
                        this.NEW_RAW['cpu_usr'] = float(elem[1])
                        this.NEW_RAW['cpu_nice'] = float(elem[2])
                        this.NEW_RAW['cpu_sys'] = float(elem[3])
                        this.NEW_RAW['cpu_idle'] = float(elem[4])
                    elif line.startswith("page"):
                        this.NEW_RAW['pages_in'] = float(elem[1])
                        this.NEW_RAW['pages_out'] = float(elem[2])
                    elif line.startswith("swap"):
                        this.NEW_RAW['swap_in'] = float(elem[1])
                        this.NEW_RAW['swap_out'] = float(elem[2])
            finally:
                fstat.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/stat")
            return
        if not this.OLD_RAW:
            return
        diff = {}
        cpu_sum = 0.0
        for key, value in this.NEW_RAW.items():
            # a counter seen for the first time has no previous value to diff against
            if key not in this.OLD_RAW:
                continue
            if key.startswith('cpu_') or key.startswith('pages_') or key.startswith('swap_'):
                diff[key] = value - this.OLD_RAW[key]
                if key.startswith('cpu_'):
                    cpu_sum += diff[key]
        # no ticks elapsed between the two samples: there is no percentage to report
        if cpu_sum > 0:
            for key in ('cpu_usr', 'cpu_nice', 'cpu_sys', 'cpu_idle'):
                this.DATA[key] = 100.0 * diff[key] / cpu_sum
            this.DATA['cpu_usage'] = 100.0 * (diff['cpu_usr'] + diff['cpu_sys'] + diff['cpu_nice']) / cpu_sum
        interval = int(time.time()) - this.LAST_UPDATE_TIME
        if interval > 0:
            for key in ('pages_in', 'pages_out', 'swap_in', 'swap_out'):
                if key in diff:
                    this.DATA[key] = diff[key] / interval

    def readMemInfo(this):
        """Read memory and swap figures from /proc/meminfo.

        Sizes are reported in MB (the kB values divided by 1024); mem_usage
        and swap_usage are percentages.  swap_usage is 0 when there is no swap.
        """
        fields = {'MemFree:': 'mem_free', 'MemTotal:': 'total_mem',
                  'SwapFree:': 'swap_free', 'SwapTotal:': 'total_swap'}
        try:
            fmem = open('/proc/meminfo')
            try:
                for line in fmem:
                    elem = line.split()
                    if len(elem) > 1 and elem[0] in fields:
                        this.DATA[fields[elem[0]]] = float(elem[1]) / 1024.0
            finally:
                fmem.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/meminfo")
            return
        data = this.DATA
        if 'total_mem' in data and 'mem_free' in data:
            data['mem_used'] = data['total_mem'] - data['mem_free']
            if data['total_mem'] > 0:
                data['mem_usage'] = 100.0 * data['mem_used'] / data['total_mem']
        if 'total_swap' in data and 'swap_free' in data:
            data['swap_used'] = data['total_swap'] - data['swap_free']
            # hosts without swap report SwapTotal 0; avoid dividing by zero
            if data['total_swap'] > 0:
                data['swap_usage'] = 100.0 * data['swap_used'] / data['total_swap']
            else:
                data['swap_usage'] = 0.0

    def readLoadAvg(this):
        """Read the 1, 5 and 15 minute load averages from /proc/loadavg."""
        try:
            favg = open('/proc/loadavg')
            try:
                line = favg.readline()
            finally:
                favg.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/loadavg")
            return
        elem = line.split()
        this.DATA['load1'] = float(elem[0])
        this.DATA['load5'] = float(elem[1])
        this.DATA['load15'] = float(elem[2])

    def countProcesses(this):
        """Count the processes on the system (the numeric entries of /proc)."""
        try:
            entries = os.listdir("/proc")
        except (IOError, OSError):
            # os.listdir reports failures as OSError, not IOError
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc to count processes")
            return
        this.DATA['processes'] = len([name for name in entries if name.isdigit()])

    def readGenericInfo(this):
        """Read the hostname, the IP of each ethN interface, cpu_MHz, no_CPUs and uptime.

        Each source is read independently, so a (logged) failure of one of
        them does not prevent the others from being updated.
        """
        this.DATA['hostname'] = socket.getfqdn()
        this._readInterfaceIPs()
        this._readCpuInfo()
        this._readUptime()

    def _readInterfaceIPs(this):
        """Set ethN_ip for each ethN interface listed by '/sbin/ifconfig -a'."""
        try:
            output = os.popen('/sbin/ifconfig -a')
            try:
                eth = ''
                for line in output:
                    line = line.strip()
                    if line.startswith("eth"):
                        eth = line.split()[0]
                    elif eth and line.startswith("inet addr:"):
                        m = re.match(r"inet addr:(\d+\.\d+\.\d+\.\d+)", line)
                        if m is not None:
                            this.DATA[eth + '_ip'] = m.group(1)
                        eth = ''
            finally:
                output.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot get output from /sbin/ifconfig -a")

    def _readCpuInfo(this):
        """Set cpu_MHz and no_CPUs (the number of 'cpu MHz' lines) from /proc/cpuinfo."""
        no_cpus = 0
        try:
            fcpu = open('/proc/cpuinfo')
            try:
                for line in fcpu:
                    if line.startswith("cpu MHz"):
                        m = re.match(r"cpu MHz\s+:\s+(\d+\.?\d*)", line)
                        if m is not None:
                            this.DATA['cpu_MHz'] = float(m.group(1))
                        no_cpus += 1
            finally:
                fcpu.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/cpuinfo")
            return
        this.DATA['no_CPUs'] = no_cpus

    def _readUptime(this):
        """Set uptime, in days, from /proc/uptime."""
        try:
            fupt = open('/proc/uptime')
            try:
                line = fupt.readline()
            finally:
                fupt.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/uptime")
            return
        this.DATA['uptime'] = float(line.split()[0]) / (24.0 * 3600)

    def readNetworkInfo(this):
        """Read per-interface traffic and error counters from /proc/net/dev.

        For every ethN interface this sets ethN_errs (receive + transmit
        errors) and, once a previous sample exists, ethN_in / ethN_out in kB/s.
        """
        # take a snapshot, not an alias: with "OLD_RAW = NEW_RAW" both names
        # refer to the same dict and every difference below would be zero
        this.OLD_RAW = this.NEW_RAW.copy()
        interval = int(time.time()) - this.LAST_UPDATE_TIME
        try:
            fnet = open('/proc/net/dev')
            try:
                lines = fnet.readlines()
            finally:
                fnet.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/net/dev")
            return
        for line in lines:
            # after "ethN:" come the receive bytes, packets, errs and five more
            # receive columns, then the transmit bytes, packets, errs, ...
            m = re.match(r"\s*(eth\d+):\s*(\d+)\s+\d+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+\d+\s+(\d+)", line)
            if m is None:
                continue
            eth = m.group(1)
            this.NEW_RAW[eth + '_in'] = float(m.group(2))
            this.NEW_RAW[eth + '_out'] = float(m.group(4))
            this.DATA[eth + '_errs'] = int(m.group(3)) + int(m.group(5))
            if interval > 0 and (eth + '_in') in this.OLD_RAW:
                for direction in ('_in', '_out'):
                    bps = (this.NEW_RAW[eth + direction] - this.OLD_RAW[eth + direction]) / interval
                    this.DATA[eth + direction] = bps / 1024.0

    ##############################################################################################
    # job monitoring related functions
    ##############################################################################################

    def getChildren(this, parent):
        """Return [parent] followed by the pids of all its descendants, as strings.

        The process table is taken from ps.  If parent is not running, it is
        removed from the monitored jobs and [] is returned.
        """
        pidmap = {}
        try:
            output = os.popen('ps --no-headers -eo "pid ppid"')
            try:
                for line in output:
                    elem = line.split()
                    if len(elem) >= 2:
                        pidmap[elem[0]] = elem[1]
            finally:
                output.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot execute ps --no-headers -eo \"pid ppid\"")
        # ps reports pids as strings, while callers may pass ints (e.g. os.getpid())
        root = str(parent)
        if root not in pidmap:
            this.logger.log(Logger.INFO, 'ProcInfo: No job with pid=' + str(parent))
            this.removeJobToMonitor(parent)
            return []
        return this._descendants(root, pidmap)

    def _descendants(this, root, pidmap):
        """Return [root] plus all transitive children of root, breadth-first.

        pidmap maps each pid to its parent pid (both strings).
        """
        children_of = {}
        for pid, ppid in pidmap.items():
            children_of.setdefault(ppid, []).append(pid)
        result = [root]
        seen = set(result)  # guards against a malformed table containing a cycle
        i = 0
        while i < len(result):
            for child in children_of.get(result[i], []):
                if child not in seen:
                    seen.add(child)
                    result.append(child)
            i += 1
        return result

    def parsePSTime(this, my_time):
        """Convert a ps time field to a number of seconds.

        Accepts "days-hours:min:sec", "hours:min:sec" or "min:sec" (surrounding
        whitespace is ignored); anything else yields 0.
        """
        my_time = my_time.strip()
        m = re.match(r"(\d+)-(\d+):(\d+):(\d+)", my_time)
        if m is not None:
            return int(m.group(1)) * 24 * 3600 + int(m.group(2)) * 3600 + int(m.group(3)) * 60 + int(m.group(4))
        m = re.match(r"(\d+):(\d+):(\d+)", my_time)
        if m is not None:
            return int(m.group(1)) * 3600 + int(m.group(2)) * 60 + int(m.group(3))
        m = re.match(r"(\d+):(\d+)", my_time)
        if m is not None:
            return int(m.group(1)) * 60 + int(m.group(2))
        return 0

    def readJobInfo(this, pid):
        """Collect run time, CPU and memory usage for job pid and all of its descendants.

        Sets run_time (largest elapsed time, s), cpu_time (sum, s), cpu_usage
        (sum of %cpu), mem_usage (%mem), rss and virtualmem (KB, from ps
        rsz/vsz).  Processes/threads with an identical memory footprint and
        command are counted once, presumably because they share memory.
        Jobs that are no longer running are removed from JOBS.
        """
        if pid == '' or pid not in this.JOBS:
            return
        children = this.getChildren(pid)
        if not children:
            this.logger.log(Logger.INFO, "ProcInfo: Job with pid="+str(pid)+" terminated; removing it from monitored jobs.")
            this.removeJobToMonitor(pid)
            return
        cmd = ("ps --no-headers --pid " + ",".join([str(child) for child in children])
               + " -o etime,time,%cpu,%mem,rsz,vsz,comm")
        try:
            jstatus = os.popen(cmd)
            try:
                lines = jstatus.readlines()
            finally:
                jstatus.close()
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot execute " + cmd)
            return
        seen_footprints = set()
        etime, cputime, pcpu, pmem, rsz, vsz = 0, 0, 0, 0, 0, 0
        for line in lines:
            m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)", line.strip())
            if m is None:
                continue
            etime1, cputime1, pcpu1, pmem1, rsz1, vsz1, comm1 = m.groups()
            # the job's elapsed time is the largest one among its processes
            etime = max(etime, this.parsePSTime(etime1))
            # CPU time and %cpu add up over all processes
            cputime += this.parsePSTime(cputime1)
            pcpu += float(pcpu1)
            footprint = (pmem1, rsz1, vsz1, comm1)
            if footprint not in seen_footprints:
                # first thread/process with this memory footprint; count it once
                seen_footprints.add(footprint)
                pmem += float(pmem1)
                rsz += int(rsz1)
                vsz += int(vsz1)
        data = this.JOBS[pid]['DATA']
        data['run_time'] = etime
        data['cpu_time'] = cputime
        data['cpu_usage'] = pcpu
        data['mem_usage'] = pmem
        data['rss'] = rsz
        data['virtualmem'] = vsz

    def readJobDiskUsage(this, pid):
        """Measure the job's working directory size and the space on its partition.

        Sets workdir_size (du) and disk_total, disk_used, disk_free (df), all
        in MB, plus disk_usage in percent.  Does nothing for pids that are not
        monitored or for jobs without a working directory.
        """
        if pid == '' or pid not in this.JOBS:
            return
        workDir = this.JOBS[pid]['WORKDIR']
        if workDir == '':
            return
        data = this.JOBS[pid]['DATA']
        # single-quote the directory for the shell so that spaces or shell
        # metacharacters in the path cannot break (or inject into) the command
        quotedDir = "'" + workDir.replace("'", "'\\''") + "'"
        try:
            du = os.popen("du -Lscm " + quotedDir + " | tail -1 | cut -f 1")
            try:
                line = du.readline()
            finally:
                du.close()
            data['workdir_size'] = int(line)
        except (IOError, ValueError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot run du to get job's disk usage for job " + str(pid))
        try:
            df = os.popen("df -m " + quotedDir + " | tail -1")
            try:
                line = df.readline().strip()
            finally:
                df.close()
            m = re.match(r"\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)%", line)
            if m is not None:
                data['disk_total'] = float(m.group(1))
                data['disk_used'] = float(m.group(2))
                data['disk_free'] = float(m.group(3))
                data['disk_usage'] = float(m.group(4))
        except (IOError, ValueError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot run df to get job's disk usage for job " + str(pid))

    def getFilteredData(this, dataHash, paramsList):
        """Return a dict with the (param, value) pairs of dataHash requested in paramsList.

        "net_X" selects every ethN_X entry and "ip" selects every ethN_ip
        entry; any other name is copied when present in dataHash.
        """
        result = {}
        for param in paramsList:
            m = re.match(r"^net_(.*)$", param)
            if m is None:
                m = re.match(r"^(ip)$", param)
            if m is not None:
                pattern = re.compile(r"eth\d+_" + re.escape(m.group(1)) + r"$")
                for key, value in dataHash.items():
                    if pattern.match(key):
                        result[key] = value
            elif param in dataHash:
                result[param] = dataHash[param]
        return result
408
######################################################################################
# self test

if __name__ == '__main__':
    # The original called this.logger(...), but `this` does not exist at module
    # level (NameError).  NOTE(review): assumes the Logger module provides a
    # Logger class whose constructor takes the default log level -- TODO confirm.
    logger = Logger.Logger(Logger.DEBUG)
    pi = ProcInfo(logger)
    print("first update")
    pi.update()
    print("Sleeping to accumulate")
    # rates need a second sample taken in a later second (see ProcInfo.update)
    time.sleep(1)
    pi.update()
    print("System Monitoring:")

    sys_cpu_params = ['cpu_usr', 'cpu_sys', 'cpu_idle', 'cpu_nice', 'cpu_usage']
    sys_2_4_params = ['pages_in', 'pages_out', 'swap_in', 'swap_out']
    sys_mem_params = ['mem_used', 'mem_free', 'total_mem', 'mem_usage']
    sys_swap_params = ['swap_used', 'swap_free', 'total_swap', 'swap_usage']
    sys_load_params = ['load1', 'load5', 'load15', 'processes', 'uptime']
    sys_gen_params = ['hostname', 'cpu_MHz', 'no_CPUs']
    sys_net_params = ['net_in', 'net_out', 'net_errs', 'ip']

    # single-argument print() behaves the same under Python 2 and 3
    print("sys_cpu_params " + str(pi.getSystemData(sys_cpu_params)))
    print("sys_2_4_params " + str(pi.getSystemData(sys_2_4_params)))
    print("sys_mem_params " + str(pi.getSystemData(sys_mem_params)))
    print("sys_swap_params " + str(pi.getSystemData(sys_swap_params)))
    print("sys_load_params " + str(pi.getSystemData(sys_load_params)))
    print("sys_gen_params " + str(pi.getSystemData(sys_gen_params)))
    print("sys_net_params " + str(pi.getSystemData(sys_net_params)))

    print("Job (mysefl) monitoring:")
    pi.addJobToMonitor(os.getpid(), os.getcwd())
    print("Sleep another second")
    time.sleep(1)
    pi.update()

    job_cpu_params = ['run_time', 'cpu_time', 'cpu_usage']
    job_mem_params = ['mem_usage', 'rss', 'virtualmem']
    job_disk_params = ['workdir_size', 'disk_used', 'disk_free', 'disk_total', 'disk_usage']

    print("job_cpu_params " + str(pi.getJobData(os.getpid(), job_cpu_params)))
    print("job_mem_params " + str(pi.getJobData(os.getpid(), job_mem_params)))
    print("job_disk_params " + str(pi.getJobData(os.getpid(), job_disk_params)))

    pi.removeJobToMonitor(os.getpid())
453