ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ProcInfo.py
Revision: 1.1
Committed: Tue Aug 30 17:05:59 2005 UTC (19 years, 8 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_1, CRAB_1_0_0_rc1, CRAB_1_0_0_beta4, CRAB_1_0_0_pre1_boss_2, CRAB_1_0_0_pre1_boss
Log Message:
Integration with Monalisa monitoring

File Contents

# User Rev Content
1 corvo 1.1
2     """
3     * ApMon - Application Monitoring Tool
4     * Version: 2.0.4
5     *
6     * Copyright (C) 2005 California Institute of Technology
7     *
8     * Permission is hereby granted, free of charge, to use, copy and modify
9     * this software and its documentation (the "Software") for any
10     * purpose, provided that existing copyright notices are retained in
11     * all copies and that this notice is included verbatim in any distributions
12     * or substantial portions of the Software.
13     * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
14     * Users of the Software are asked to feed back problems, benefits,
15     * and/or suggestions about the software to the MonALISA Development Team
16     * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
17     * incorporation of new features - is done on a best effort basis. All bug
18     * fixes and enhancements will be made available under the same terms and
19     * conditions as the original software,
20    
21     * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
22     * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
23     * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
24     * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25    
26     * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27     * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
28     * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
29     * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
30     * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
31     * MODIFICATIONS.
32     """
33    
34    
35     import os
36     import re
37     import time
38     import socket
39     import Logger
40    
41     """
42     Class ProcInfo
43     extracts information from the proc/ filesystem for system and job monitoring
44     """
class ProcInfo:
    """Extract information from the /proc filesystem for system and job monitoring.

    System-wide values are stored in ``DATA``; per-job values are stored in
    ``JOBS[pid]['DATA']``.  Call update() periodically (at most once a second)
    and read the results with getSystemData() / getJobData().

    The ``logger`` passed to the constructor must provide a
    ``log(level, message)`` method; levels are taken from the ``Logger`` module.
    """

    # One "ethN" line of /proc/net/dev.  Groups: interface index, received
    # bytes, receive errors, transmitted bytes, transmit errors.  Whitespace
    # after the colon is optional because the kernel pads short counters.
    _NET_DEV_RE = re.compile(
        r"\s*eth(\d+):\s*(\d+)\s+\d+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+"
        r"\s+(\d+)\s+\d+\s+(\d+)")
    # IPv4 address line of ifconfig, both "inet addr:1.2.3.4" and "inet 1.2.3.4".
    _INET_RE = re.compile(r"inet (?:addr:)?\s*(\d+\.\d+\.\d+\.\d+)")
    _CPU_MHZ_RE = re.compile(r"cpu MHz\s+:\s+(\d+\.?\d*)")
    # ps output: etime, time, %cpu, %mem, rsz, vsz, comm
    _PS_JOB_RE = re.compile(
        r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)")
    # df output: filesystem, total, used, free, usage%
    _DF_RE = re.compile(r"\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)%")

    def __init__(this, logger):
        """Create an empty monitor that reports problems through ``logger``."""
        this.DATA = {}             # monitored data that is going to be reported
        this.OLD_RAW = {}          # raw counters from the previous measurement
        this.NEW_RAW = {}          # raw counters from the current measurement
        this.LAST_UPDATE_TIME = 0  # when the last measurement was done
        this.JOBS = {}             # jobs that will be monitored
        this.logger = logger       # use the given logger

    def update(this):
        """Refresh all system and job measurements.

        Must not be called more often than once a second because rates are
        computed from whole-second timestamps; such calls are logged and ignored.
        """
        if this.LAST_UPDATE_TIME == int(time.time()):
            this.logger.log(Logger.NOTICE, "ProcInfo: update() called too often!")
            return
        this.readStat()
        this.readMemInfo()
        this.readLoadAvg()
        this.countProcesses()
        this.readGenericInfo()
        this.readNetworkInfo()
        # Iterate over a snapshot: readJobInfo() may remove terminated jobs.
        for pid in list(this.JOBS.keys()):
            this.readJobInfo(pid)
            this.readJobDiskUsage(pid)
        this.LAST_UPDATE_TIME = int(time.time())

    def addJobToMonitor(this, pid, workDir):
        """Start monitoring process ``pid``; ``workDir`` ('' for none) is its work directory."""
        this.JOBS[pid] = {'WORKDIR': workDir, 'DATA': {}}

    def removeJobToMonitor(this, pid):
        """Stop monitoring ``pid``; unknown pids are ignored."""
        if pid in this.JOBS:
            del this.JOBS[pid]

    def getSystemData(this, params):
        """Return a dict with the system-related ``params`` that have a value."""
        return this.getFilteredData(this.DATA, params)

    def getJobData(this, pid, params):
        """Return a dict with the job-related ``params`` that have a value.

        An empty dict is returned when ``pid`` is not monitored.
        """
        if pid not in this.JOBS:
            return {}
        return this.getFilteredData(this.JOBS[pid]['DATA'], params)

    ########################################################################
    # internal helpers
    ########################################################################

    def _readLines(this, path):
        """Return all lines of the file ``path``, always closing the file."""
        handle = open(path)
        try:
            return handle.readlines()
        finally:
            handle.close()

    def _runCommand(this, command):
        """Run shell ``command`` and return its output lines, always closing the pipe."""
        pipe = os.popen(command)
        try:
            return pipe.readlines()
        finally:
            pipe.close()

    ########################################################################
    # internal functions for system monitoring
    ########################################################################

    def readStat(this):
        """Compute CPU percentages and paging/swapping rates from /proc/stat.

        Needs two calls (with LAST_UPDATE_TIME updated in between) to produce
        results, because values are derived from counter differences.  The
        pages_in/out and swap_in/out lines are not present on 2.6 kernels.
        """
        this.OLD_RAW = this.NEW_RAW.copy()
        try:
            lines = this._readLines('/proc/stat')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/stat")
            return
        for line in lines:
            elem = line.split()
            if line.startswith("cpu ") and len(elem) >= 5:
                this.NEW_RAW['cpu_usr'] = float(elem[1])
                this.NEW_RAW['cpu_nice'] = float(elem[2])
                this.NEW_RAW['cpu_sys'] = float(elem[3])
                this.NEW_RAW['cpu_idle'] = float(elem[4])
            if line.startswith("page") and len(elem) >= 3:
                this.NEW_RAW['pages_in'] = float(elem[1])
                this.NEW_RAW['pages_out'] = float(elem[2])
            if line.startswith("swap") and len(elem) >= 3:
                this.NEW_RAW['swap_in'] = float(elem[1])
                this.NEW_RAW['swap_out'] = float(elem[2])
        if not this.OLD_RAW:
            return  # first measurement: nothing to compare against yet

        diff = {}
        cpu_sum = 0
        for key in this.NEW_RAW.keys():
            is_cpu = key.startswith('cpu_')
            if not (is_cpu or key.startswith('pages_') or key.startswith('swap_')):
                continue
            if key not in this.OLD_RAW:
                continue  # counter seen for the first time
            diff[key] = this.NEW_RAW[key] - this.OLD_RAW[key]
            if is_cpu:
                cpu_sum += diff[key]

        cpu_keys = ('cpu_usr', 'cpu_nice', 'cpu_sys', 'cpu_idle')
        missing = [key for key in cpu_keys if key not in diff]
        # cpu_sum is zero when no jiffies elapsed; skip rather than divide by it.
        if not missing and cpu_sum > 0:
            for key in cpu_keys:
                this.DATA[key] = 100.0 * diff[key] / cpu_sum
            busy = diff['cpu_usr'] + diff['cpu_sys'] + diff['cpu_nice']
            this.DATA['cpu_usage'] = 100.0 * busy / cpu_sum

        interval = int(time.time()) - this.LAST_UPDATE_TIME
        if interval > 0:
            for key in ('pages_in', 'pages_out', 'swap_in', 'swap_out'):
                if key in diff:
                    this.DATA[key] = diff[key] / interval

    def readMemInfo(this):
        """Read memory and swap figures from /proc/meminfo.

        Sizes are reported in MB, except the ``*_usage`` values which are
        percentages.  A usage value is omitted when its total is zero
        (e.g. a host without swap).
        """
        try:
            lines = this._readLines('/proc/meminfo')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/meminfo")
            return
        fields = {
            'MemFree:': 'mem_free',
            'MemTotal:': 'total_mem',
            'SwapFree:': 'swap_free',
            'SwapTotal:': 'total_swap',
        }
        for line in lines:
            elem = line.split()
            if len(elem) >= 2 and elem[0] in fields:
                this.DATA[fields[elem[0]]] = float(elem[1]) / 1024.0
        for prefix, total_key in (('mem', 'total_mem'), ('swap', 'total_swap')):
            free_key = prefix + '_free'
            if total_key not in this.DATA or free_key not in this.DATA:
                continue
            total = this.DATA[total_key]
            used = total - this.DATA[free_key]
            this.DATA[prefix + '_used'] = used
            if total > 0:
                this.DATA[prefix + '_usage'] = 100.0 * used / total
            else:
                # Drop any stale percentage instead of dividing by zero.
                this.DATA.pop(prefix + '_usage', None)

    def readLoadAvg(this):
        """Read the 1, 5 and 15 minute load averages from /proc/loadavg."""
        try:
            lines = this._readLines('/proc/loadavg')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/loadavg")
            return
        if not lines:
            return
        elem = lines[0].split()
        if len(elem) < 3:
            return
        this.DATA['load1'] = float(elem[0])
        this.DATA['load5'] = float(elem[1])
        this.DATA['load15'] = float(elem[2])

    def countProcesses(this):
        """Count the processes on the system (numeric entries of /proc)."""
        try:
            entries = os.listdir("/proc")
        except (IOError, OSError):
            # os.listdir reports failures as OSError.
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc to count processes")
            return
        this.DATA['processes'] = len([name for name in entries if name.isdigit()])

    def readGenericInfo(this):
        """Read the hostname, interface IPs, CPU count/MHz and uptime.

        Each source is read independently, so a failure of one does not
        prevent the others from being reported.
        """
        this.DATA['hostname'] = socket.getfqdn()
        this._readInterfaceAddresses()
        this._readCpuInfo()
        this._readUptime()

    def _readInterfaceAddresses(this):
        """Store the IPv4 address of every ethN interface as DATA['ethN_ip']."""
        try:
            lines = this._runCommand('/sbin/ifconfig -a')
        except (IOError, OSError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot get output from /sbin/ifconfig -a")
            return
        eth = ''
        for line in lines:
            line = line.strip()
            if not line:
                # A blank line ends the interface stanza; forget the interface
                # so that the next stanza's address is not attributed to it.
                eth = ''
                continue
            if line.startswith("eth"):
                # Newer ifconfig prints "eth0:" with a trailing colon.
                eth = line.split()[0].rstrip(':')
                continue
            if not eth:
                continue
            m = this._INET_RE.match(line)
            if m is not None:
                this.DATA[eth + '_ip'] = m.group(1)
                eth = ''

    def _readCpuInfo(this):
        """Store the CPU clock (MHz) and the number of CPUs from /proc/cpuinfo."""
        try:
            lines = this._readLines('/proc/cpuinfo')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/cpuinfo")
            return
        no_cpus = 0
        for line in lines:
            if not line.startswith("cpu MHz"):
                continue
            no_cpus += 1
            m = this._CPU_MHZ_RE.match(line)
            if m is not None:
                this.DATA['cpu_MHz'] = float(m.group(1))
        this.DATA['no_CPUs'] = no_cpus

    def _readUptime(this):
        """Store the system uptime, in days, from /proc/uptime."""
        try:
            lines = this._readLines('/proc/uptime')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/uptime")
            return
        elem = []
        if lines:
            elem = lines[0].split()
        if elem:
            this.DATA['uptime'] = float(elem[0]) / (24.0 * 3600)

    def readNetworkInfo(this):
        """Read per-interface traffic (kB/s) and error counts from /proc/net/dev.

        Rates need two calls, because they are computed from the difference
        between the current and the previous byte counters.
        """
        # Take a real copy: aliasing NEW_RAW would make every difference zero.
        this.OLD_RAW = this.NEW_RAW.copy()
        interval = int(time.time()) - this.LAST_UPDATE_TIME
        try:
            lines = this._readLines('/proc/net/dev')
        except IOError:
            this.logger.log(Logger.ERROR, "ProcInfo: cannot open /proc/net/dev")
            return
        for line in lines:
            m = this._NET_DEV_RE.match(line)
            if m is None:
                continue
            name = 'eth' + m.group(1)
            this.NEW_RAW[name + '_in'] = float(m.group(2))
            this.NEW_RAW[name + '_out'] = float(m.group(4))
            this.DATA[name + '_errs'] = int(m.group(3)) + int(m.group(5))
            if interval <= 0:
                continue
            for direction in ('_in', '_out'):
                key = name + direction
                if key in this.OLD_RAW:
                    Bps = (this.NEW_RAW[key] - this.OLD_RAW[key]) / interval
                    this.DATA[key] = Bps / 1024.0

    ########################################################################
    # job monitoring related functions
    ########################################################################

    def getChildren(this, parent):
        """Return ``parent`` and all its descendants as a list of integer pids.

        Internal function.  If ``parent`` is not a running process the job is
        removed from the monitored jobs and an empty list is returned.
        """
        ppid_of = {}      # pid -> parent pid
        children_of = {}  # parent pid -> list of direct children
        try:
            lines = this._runCommand('ps --no-headers -eo "pid ppid"')
        except (IOError, OSError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot execute ps --no-headers -eo \"pid ppid\"")
            lines = []
        for line in lines:
            elem = line.split()
            if len(elem) < 2:
                continue
            try:
                pid, ppid = int(elem[0]), int(elem[1])
            except ValueError:
                continue
            ppid_of[pid] = ppid
            children_of.setdefault(ppid, []).append(pid)

        # ps prints text while callers may pass an int (e.g. os.getpid());
        # compare everything as integers.
        try:
            root = int(parent)
        except (TypeError, ValueError):
            root = None
        if root is None or root not in ppid_of:
            this.logger.log(Logger.INFO, 'ProcInfo: No job with pid=' + str(parent))
            this.removeJobToMonitor(parent)
            return []

        children = [root]
        i = 0
        while i < len(children):
            children.extend(children_of.get(children[i], []))
            i += 1
        return children

    def parsePSTime(this, my_time):
        """Convert a ps time ("days-hours:min:sec", "hours:min:sec" or "min:sec") to seconds.

        Internal function.  Returns 0 when the text matches none of the formats.
        """
        my_time = my_time.strip()
        m = re.match(r"(\d+)-(\d+):(\d+):(\d+)", my_time)
        if m is not None:
            return (int(m.group(1)) * 24 * 3600 + int(m.group(2)) * 3600
                    + int(m.group(3)) * 60 + int(m.group(4)))
        m = re.match(r"(\d+):(\d+):(\d+)", my_time)
        if m is not None:
            return int(m.group(1)) * 3600 + int(m.group(2)) * 60 + int(m.group(3))
        m = re.match(r"(\d+):(\d+)", my_time)
        if m is not None:
            return int(m.group(1)) * 60 + int(m.group(2))
        return 0

    def readJobInfo(this, pid):
        """Read run time, CPU and memory usage of job ``pid`` and its children.

        Memory sizes are given in KB.  A job whose process no longer exists
        is removed from the monitored jobs.
        """
        if pid == '' or pid not in this.JOBS:
            return
        children = this.getChildren(pid)
        if not children:
            this.logger.log(Logger.INFO, "ProcInfo: Job with pid=" + str(pid) + " terminated; removing it from monitored jobs.")
            this.removeJobToMonitor(pid)
            return
        command = ("ps --no-headers --pid " + ",".join([str(child) for child in children])
                   + " -o etime,time,%cpu,%mem,rsz,vsz,comm")
        try:
            lines = this._runCommand(command)
        except (IOError, OSError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot execute ps to read job information for pid=" + str(pid))
            return

        seen_footprints = {}
        etime, cputime, pcpu, pmem, rsz, vsz = 0, 0, 0, 0, 0, 0
        for line in lines:
            m = this._PS_JOB_RE.match(line.strip())
            if m is None:
                continue
            etime1, cputime1, pcpu1, pmem1, rsz1, vsz1, comm1 = m.groups()
            try:
                cpu_percent = float(pcpu1)
                mem_percent = float(pmem1)
                resident = int(rsz1)
                virtual = int(vsz1)
            except ValueError:
                continue  # not a data line
            # the elapsed time is the maximum of all children's elapsed times
            etime = max(etime, this.parsePSTime(etime1))
            # total cputime / %cpu are the sums over all processes
            cputime += this.parsePSTime(cputime1)
            pcpu += cpu_percent
            # Threads share their memory: count each distinct footprint once.
            footprint = (pmem1, rsz1, vsz1, comm1)
            if footprint not in seen_footprints:
                seen_footprints[footprint] = 1
                pmem += mem_percent
                rsz += resident
                vsz += virtual

        data = this.JOBS[pid]['DATA']
        data['run_time'] = etime
        data['cpu_time'] = cputime
        data['cpu_usage'] = pcpu
        data['mem_usage'] = pmem
        data['rss'] = rsz
        data['virtualmem'] = vsz

    def readJobDiskUsage(this, pid):
        """Read the size of the job's work directory and the state of its partition.

        Nothing is done when no work directory is defined.  Sizes are given
        in MB; disk_usage is a percentage.
        """
        if pid == '' or pid not in this.JOBS:
            return
        workDir = this.JOBS[pid]['WORKDIR']
        if workDir == '':
            return
        data = this.JOBS[pid]['DATA']
        # NOTE(review): workDir is passed to the shell unquoted, so paths
        # containing spaces or shell metacharacters are not handled.
        try:
            lines = this._runCommand("du -Lscm " + workDir + " | tail -1 | cut -f 1")
            data['workdir_size'] = int(lines[0])
        except (IOError, OSError, IndexError, ValueError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot run du to get job's disk usage for job " + repr(pid))
        try:
            lines = this._runCommand("df -m " + workDir + " | tail -1")
        except (IOError, OSError):
            this.logger.log(Logger.ERROR, "ProcInfo: cannot run df to get job's disk usage for job " + repr(pid))
            return
        if not lines:
            return
        m = this._DF_RE.match(lines[0].strip())
        if m is None:
            return
        try:
            # Store numbers, like every other measurement.
            values = [float(text) for text in m.groups()]
        except ValueError:
            return
        data['disk_total'] = values[0]
        data['disk_used'] = values[1]
        data['disk_free'] = values[2]
        data['disk_usage'] = values[3]

    def getFilteredData(this, dataHash, paramsList):
        """Return the (param, value) pairs of ``dataHash`` requested in ``paramsList``.

        Only parameters that have a value are returned.  ``net_<x>`` and
        ``ip`` are expanded to the matching ``ethN_<x>`` / ``ethN_ip`` keys
        of every interface.
        """
        result = {}
        for param in paramsList:
            m = re.match(r"^net_(.*)$", param)
            if m is None:
                m = re.match(r"^(ip)$", param)
            if m is None:
                if param in dataHash:
                    result[param] = dataHash[param]
                continue
            key_re = re.compile(r"eth\d+_" + re.escape(m.group(1)))
            for key, value in dataHash.items():
                if key_re.match(key) is not None:
                    result[key] = value
        return result
408    
409     ######################################################################################
410     # self test
411    
if __name__ == '__main__':
    # Self test: take two system measurements one second apart (rates need
    # two samples), print them, then monitor this very process as a job.
    # NOTE(review): assumes the Logger module provides a Logger class whose
    # constructor takes the default log level -- confirm against Logger.py.
    logger = Logger.Logger(Logger.DEBUG)
    pi = ProcInfo(logger)
    print("first update")
    pi.update()
    print("Sleeping to accumulate")
    time.sleep(1)
    pi.update()
    print("System Monitoring:")

    system_groups = [
        ('sys_cpu_params', ['cpu_usr', 'cpu_sys', 'cpu_idle', 'cpu_nice', 'cpu_usage']),
        ('sys_2_4_params', ['pages_in', 'pages_out', 'swap_in', 'swap_out']),
        ('sys_mem_params', ['mem_used', 'mem_free', 'total_mem', 'mem_usage']),
        ('sys_swap_params', ['swap_used', 'swap_free', 'total_swap', 'swap_usage']),
        ('sys_load_params', ['load1', 'load5', 'load15', 'processes', 'uptime']),
        ('sys_gen_params', ['hostname', 'cpu_MHz', 'no_CPUs']),
        ('sys_net_params', ['net_in', 'net_out', 'net_errs', 'ip']),
    ]
    for label, params in system_groups:
        # Single formatted argument: prints identically on Python 2 and 3.
        print("%s %s" % (label, pi.getSystemData(params)))

    print("Job (mysefl) monitoring:")
    own_pid = os.getpid()
    pi.addJobToMonitor(own_pid, os.getcwd())
    print("Sleep another second")
    time.sleep(1)  # update() refuses to run twice within the same second
    pi.update()

    job_groups = [
        ('job_cpu_params', ['run_time', 'cpu_time', 'cpu_usage']),
        ('job_mem_params', ['mem_usage', 'rss', 'virtualmem']),
        ('job_disk_params', ['workdir_size', 'disk_used', 'disk_free', 'disk_total', 'disk_usage']),
    ]
    for label, params in job_groups:
        print("%s %s" % (label, pi.getJobData(own_pid, params)))

    pi.removeJobToMonitor(own_pid)
453