ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/apmon.py
(Generate patch)

Comparing COMP/CRAB/python/apmon.py (file contents):
Revision 1.2 by corvo, Tue Nov 8 16:02:39 2005 UTC vs.
Revision 1.3 by corvo, Wed Mar 8 17:10:49 2006 UTC

# Line 1 | Line 1
1  
2   """
3   * ApMon - Application Monitoring Tool
4 < * Version: 2.0.4
4 > * Version: 2.2.1
5   *
6 < * Copyright (C) 2005 California Institute of Technology
6 > * Copyright (C) 2006 California Institute of Technology
7   *
8   * Permission is hereby granted, free of charge, to use, copy and modify
9   * this software and its documentation (the "Software") for any
# Line 54 | Line 54 | import time
54   import Logger
55   import ProcInfo
56   import random
57 + import copy
58  
59   #__all__ = ["ApMon"]
60  
61 < #__debug = False   # set this to True to be verbose
61 > #__debug = False   # self.destPrevData[destination];set this to True to be verbose
62  
63   class ApMon:
64          """
# Line 127 | Line 128 | class ApMon:
128                  'sys_net_in'    : True,       # network transfer in kBps
129                  'sys_net_out'   : True,       # these will produce params called ethX_in, ethX_out, ethX_errs
130                  'sys_net_errs'  : False,      # for each eth interface
131 +                'sys_net_sockets' : True,     # number of opened sockets for each proto => sockets_tcp/udp/unix ...
132 +                'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ...
133                  'sys_processes' : True,
134                  'sys_uptime'    : True,       # uptime of the machine, in days (float number)
135                  
# Line 145 | Line 148 | class ApMon:
148                  'cpu_model_name': True,
149                  'bogomips'      : True};
150  
151 <        def __init__ (self, initValue):
151 >        def __init__ (self, initValue, defaultLogLevel = Logger.INFO):
152                  """
153                  Class constructor:
154                  - if initValue is a string, put it in configAddresses and load destinations
# Line 166 | Line 169 | class ApMon:
169                    will overwrite the default parameters (see __defaultOptions)
170                  """
171                  self.destinations = {}              # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...}
172 +                self.destPrevData = {}              # empty, by defaul; key = tuple (host, port, pass) ; val = hash {"param_mame" : value, ...}
173                  self.configAddresses = []           # empty, by default; list of files/urls from where we read config
174                  self.configRecheckInterval = 120    # 2 minutes
175                  self.configRecheck = True           # enabled by default
176                  self.performBgMonitoring = True     # by default, perform background monitoring
177                  self.monitoredJobs = {}             # Monitored jobs; key = pid; value = hash with
178 <                self.maxMsgRate = 50;               # Maximum number of messages allowed to be sent per second
178 >                self.maxMsgRate = 20;                   # Maximum number of messages allowed to be sent per second
179 >                self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0};
180                  self.__defaultUserCluster = "ApMon_UserSend";
181                  self.__defaultUserNode = socket.getfqdn();
182                  self.__defaultSysMonCluster = "ApMon_SysMon";
183                  self.__defaultSysMonNode = socket.getfqdn();
184                  # don't touch these:
185 +                self.__freed = False
186                  self.__initializedOK = True
187                  self.__udpSocket = None
188                  self.__configUpdateLock = threading.Lock()
# Line 191 | Line 197 | class ApMon:
197                  self.__crtSent = 0;
198                  self.__crtDrop = 0;
199                  self.__hWeight = 0.92;
200 <        
195 <                self.logger = Logger.Logger(self.__defaultLogLevel)
200 >                self.logger = Logger.Logger(defaultLogLevel)
201                  if type(initValue) == type("string"):
202                          self.configAddresses.append(initValue)
203                          self.__reloadAddresses()
# Line 209 | Line 214 | class ApMon:
214                  self.__initializedOK = (len (self.destinations) > 0)
215                  if not self.__initializedOK:
216                          self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined.");
217 <                #self.__defaultClusterName = None
218 <                #self.__defaultNodeName = self.getMyHo
217 >                # self.__defaultClusterName = None
218 >                # self.__defaultNodeName = self.getMyHo
219                  if self.__initializedOK:
220                          self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
221                          if len(self.configAddresses) > 0:
# Line 221 | Line 226 | class ApMon:
226                                  th.start()
227                          # create the ProcInfo instance
228                          self.procInfo = ProcInfo.ProcInfo(self.logger);
229 <                        self.procInfo.update();
229 >                        # self.procInfo.update();
230                          # start the background monitoring thread
231                          th = threading.Thread(target=self.__bgMonitor);
232                          th.setDaemon(True);
# Line 283 | Line 288 | class ApMon:
288                  if not self.__initializedOK:
289                          self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!");
290                          return
291 <                if clusterName == None:
291 >                if (clusterName == None) or (clusterName == ""):
292                          clusterName = self.__defaultUserCluster
293                  else:
294                          self.__defaultUserCluster = clusterName
# Line 291 | Line 296 | class ApMon:
296                          nodeName = self.__defaultUserNode
297                  else:
298                          self.__defaultUserNode = nodeName
299 <                self.__configUpdateLock.acquire()
299 >                self.__configUpdateLock.acquire();
300                  for dest in self.destinations.keys():
301 <                        self.__directSendParams(dest, clusterName, nodeName, timeStamp, params);
302 <                self.__configUpdateLock.release()
301 >                        self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params);
302 >                self.__configUpdateLock.release();
303          
304          def addJobToMonitor (self, pid, workDir, clusterName, nodeName):
305                  """
# Line 321 | Line 326 | class ApMon:
326                  Set the cluster and node names where to send system related information.
327                  """
328                  self.__bgMonitorLock.acquire();
329 <                self.__defaultSysMonCluster = clusterName;
330 <                self.__defaultSysMonNode = nodeName;
329 >                if (clusterName != None) and (clusterName != ""):
330 >                    self.__defaultSysMonCluster = clusterName;
331 >                if (nodeName != None) and (nodeName != ""):
332 >                    self.__defaultSysMonNode = nodeName;
333                  self.__bgMonitorLock.release();
334          
335          def enableBgMonitoring (self, onOff):
# Line 331 | Line 338 | class ApMon:
338                  can still be sent if user calls the sendBgMonitoring method.
339                  """
340                  self.performBgMonitoring = onOff;
341 <        
342 <        def sendBgMonitoring (self):
341 >
342 >        def sendBgMonitoring (self, mustSend = False):
343                  """
344 <                Send now background monitoring about system and jobs to all interested destinations.
344 >                Send background monitoring about system and jobs to all interested destinations.
345 >                If mustSend == True, the information is sent regardles of the elapsed time since last sent
346 >                If mustSend == False, the data is sent only if the required interval has passed since last sent
347                  """
348                  self.__bgMonitorLock.acquire();
340                self.procInfo.update();
349                  now = int(time.time());
350 <                for destination, options in self.destinations.items():
350 >                updatedProcInfo = False;
351 >                for destination, options in self.destinations.iteritems():
352                          sysParams = [];
353                          jobParams = [];
354 +                        prevRawData = self.destPrevData[destination];
355                          # for each destination and its options, check if we have to report any background monitoring data
356 <                        if(options['sys_monitoring'] and options['sys_data_sent'] + options['sys_interval'] < now):
356 >                        if(options['sys_monitoring'] and (mustSend or options['sys_data_sent'] + options['sys_interval'] <= now)):
357                                  for param, active in options.items():
358                                          m = re.match("sys_(.+)", param);
359                                          if(m != None and active):
# Line 351 | Line 361 | class ApMon:
361                                                  if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
362                                                          sysParams.append(param)
363                                  options['sys_data_sent'] = now;
364 <                        if(options['job_monitoring'] and options['job_data_sent'] + options['job_interval'] < now):
364 >                        if(options['job_monitoring'] and (mustSend or options['job_data_sent'] + options['job_interval'] <= now)):
365                                  for param, active in options.items():
366                                          m = re.match("job_(.+)", param);
367                                          if(m != None and active):
# Line 359 | Line 369 | class ApMon:
369                                                  if not (param == 'monitoring' or param == 'interval' or param == 'data_sent'):
370                                                          jobParams.append(param);
371                                  options['job_data_sent'] = now;
372 <                        if(options['general_info'] and options['general_data_sent'] + 2 * int(options['sys_interval']) < now):
372 >                        if(options['general_info'] and (mustSend or options['general_data_sent'] + 2 * int(options['sys_interval']) <= now)):
373                                  for param, active in options.items():
374                                          if not (param.startswith("sys_") or param.startswith("job_")) and active:
375                                                  if not (param == 'general_info' or param == 'general_data_sent'):
376                                                          sysParams.append(param);
377 +                                options['general_data_sent'] = now;
378 +                                
379 +                        if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ):
380 +                                self.procInfo.update();
381 +                                updatedProcInf = True;
382 +
383                          sysResults = {}
384                          if(len(sysParams) > 0):
385 <                                sysResults = self.procInfo.getSystemData(sysParams);
386 <                        if(len(sysResults.keys()) > 0):
387 <                                self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
385 >                                sysResults = self.procInfo.getSystemData(sysParams, prevRawData);
386 >                                self.updateLastSentTime(options, self.destinations[destination]);
387 >                        if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0):
388 >                                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
389 >                        if( (type(sysResults) == type( [] )) and len(sysResults) > 0):
390 >                                self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults);
391                          for pid, props in self.monitoredJobs.items():
392                                  jobResults = {};
393                                  if(len(jobParams) > 0):
394                                          jobResults = self.procInfo.getJobData(pid, jobParams);
395                                  if(len(jobResults) > 0):
396 <                                        self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults);
396 >                                        self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults);
397                  self.__bgMonitorLock.release();
398          
399 +        # copy the time when last data was sent
400 +        def updateLastSentTime(self, srcOpts, dstOpts):
401 +                if srcOpts.has_key('general_data_sent'):
402 +                        dstOpts['general_data_sent'] = srcOpts['general_data_sent'];
403 +                if  srcOpts.has_key('sys_data_sent'):
404 +                        dstOpts['sys_data_sent'] = srcOpts['sys_data_sent'];
405 +                if srcOpts.has_key('job_data_sent'):
406 +                        dstOpts['job_data_sent'] = srcOpts['job_data_sent'];
407 +        
408          def setLogLevel (self, strLevel):
409                  """
410                  Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING',
# Line 396 | Line 424 | class ApMon:
424                  Stop background threands, close opened sockets. You have to use this function if you want to
425                  free all the resources that ApMon takes, and allow it to be garbage-collected.
426                  """
427 <                self.__configUpdateEvent.set();
428 <                self.__bgMonitorEvent.set();
427 >                if self.__configUpdateEvent != None:
428 >                        self.__configUpdateEvent.set();
429 >                if self.__bgMonitorEvent != None:
430 >                        self.__bgMonitorEvent.set();
431                  time.sleep(0.01);
432                  if self.__udpSocket != None:
433                          self.logger.log(Logger.DEBUG, "Closing UDP socket on ApMon object destroy.");
434                          self.__udpSocket.close();
435 +                        self.__udpSocket = None;
436 +                self.__freed = True
437  
438  
439          #########################################################################################
# Line 471 | Line 503 | class ApMon:
503                          if (h == host) and (p == port):
504                                  alreadyAdded = True
505                                  break
506 +                destination = (host, port, passwd);
507                  if not alreadyAdded:
508                          self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd);
509 <                        tempDestinations[(host, port, passwd)] = self.__defaultOptions;
509 >                        tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest.
510 >                        self.destPrevData[destination] = {};
511                          if options != self.__defaultOptions:
512                                  # we have to overwrite defaults with given options
513                                  for key, value in options.items():
514                                          self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`);
515 <                                        tempDestinations[(host, port, passwd)][key] = value;
515 >                                        tempDestinations[destination][key] = value;
516                  else:
517                          self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it");
518  
# Line 551 | Line 585 | class ApMon:
585                                                          opts[param] = value;
586                          else:
587                                  dests.append(line);
588 +                                
589                  confFile.close ()
590                  for line in dests:
591                          self.__addDestination(line, tempDestinations, opts)
# Line 565 | Line 600 | class ApMon:
600                          if self.__bgMonitorEvent.isSet():
601                                  return;
602                          if self.performBgMonitoring:
603 <                                self.sendBgMonitoring();
603 >                                self.sendBgMonitoring() # send only if the interval has elapsed
604  
605          ###############################################################################################
606          # Internal helper functions
607          ###############################################################################################
608          
609 <        def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params):
609 >        def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params):
610 >                
611 >                if senderRef=={}:
612 >                        self.logger.log(Logger.WARNING, "Not sending undefined parameters!");
613 >                        return;
614                  
615                  if self.__shouldSend() == False:
616                          return;
617                  
618 <                xdrPacker = xdrlib.Packer ()
618 >                if destination == None:
619 >                        self.logger.log(Logger.WARNING, "Destination is None");
620 >                        return;
621 >                
622                  host, port, passwd = destination
623 +                senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld
624 +                
625 +                xdrPacker = xdrlib.Packer ();
626                  self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"] <"+str(nodeName)+"> len:"+str(len(params)));
627 +                
628                  xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd)
629 <                xdrPacker.pack_string (clusterName)
630 <                xdrPacker.pack_string (nodeName)
631 <                xdrPacker.pack_int (len(params))
629 >                
630 >                xdrPacker.pack_int (senderRef['INSTANCE_ID']);
631 >                xdrPacker.pack_int (senderRef['SEQ_NR']);
632 >                
633 >                xdrPacker.pack_string (clusterName);
634 >                xdrPacker.pack_string (nodeName);
635 >                xdrPacker.pack_int (len(params));
636 >                
637                  if type(params) == type( {} ):
638                          for name, value in params.iteritems():
639                                  self.__packParameter(xdrPacker, name, value)
# Line 592 | Line 643 | class ApMon:
643                                  self.__packParameter(xdrPacker, name, value)
644                  else:
645                          self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params)));
646 <                if(timeStamp > 0):
646 >                if (timeStamp != None) and (timeStamp > 0):
647                          xdrPacker.pack_int(timeStamp);
648 +                
649                  buffer = xdrPacker.get_buffer();
650                  # send this buffer to the destination, using udp datagrams
651                  try:
652                          self.__udpSocket.sendto(buffer, (host, port))
653 <                        self.logger.log(Logger.DEBUG, "Packet sent");
653 >                        self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd);
654                  except socket.error, msg:
655                          self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1]));
656                  xdrPacker.reset()
657          
658          def __packParameter(self, xdrPacker, name, value):
659 +                if (name is None) or (name is ""):
660 +                        self.logger.log(Logger.WARNING, "Undefine parameter name.");
661 +                        return;
662 +                if (value is None):
663 +                        self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value");
664 +                        return;
665                  try:
666                          typeValue = self.__valueTypes[type(value)]
667                          xdrPacker.pack_string (name)
# Line 612 | Line 670 | class ApMon:
670                          self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value));
671                  except Exception, ex:
672                          print "ApMon: error packing %s = %s; got %s" % (name, str(value), ex)
673 <        
673 >
674          # Destructor
675          def __del__(self):
676 <                self.free();
676 >                if not self.__freed:
677 >                        self.free();
678  
679          # Decide if the current datagram should be sent.
680          # This decision is based on the number of messages previously sent.
# Line 652 | Line 711 | class ApMon:
711  
712                  return doSend;
713  
655        
714          ################################################################################################
715          # Private variables. Don't touch
716          ################################################################################################
# Line 668 | Line 726 | class ApMon:
726                  5: xdrlib.Packer.pack_double }
727          
728          __defaultPort = 8884
729 <        __defaultLogLevel = Logger.INFO
672 <        __version = "2.0.6-py"                  # apMon version number
729 >        __version = "2.2.1-py"                  # apMon version number
730  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines