1 |
|
|
2 |
|
""" |
3 |
|
* ApMon - Application Monitoring Tool |
4 |
< |
* Version: 2.2.2 |
4 |
> |
* Version: 2.2.4 |
5 |
|
* |
6 |
|
* Copyright (C) 2006 California Institute of Technology |
7 |
|
* |
48 |
|
import re |
49 |
|
import xdrlib |
50 |
|
import socket |
51 |
< |
import urllib2 |
51 |
> |
import struct |
52 |
> |
import StringIO |
53 |
|
import threading |
54 |
|
import time |
55 |
|
import Logger |
59 |
|
|
60 |
|
#__all__ = ["ApMon"] |
61 |
|
|
62 |
< |
#__debug = False # self.destPrevData[destination];set this to True to be verbose |
62 |
> |
#__debug = False # set this to True to be verbose |
63 |
|
|
64 |
|
class ApMon: |
65 |
|
""" |
88 |
|
|
89 |
|
__defaultOptions = { |
90 |
|
'job_monitoring': True, # perform (or not) job monitoring |
91 |
< |
'job_interval' : 10, # at this interval (in seconds) |
91 |
> |
'job_interval' : 120, # at this interval (in seconds) |
92 |
|
'job_data_sent' : 0, # time from Epoch when job information was sent; don't touch! |
93 |
|
|
94 |
|
'job_cpu_time' : True, # elapsed time from the start of this job in seconds |
105 |
|
'job_open_files': True, # number of open file descriptors |
106 |
|
|
107 |
|
'sys_monitoring': True, # perform (or not) system monitoring |
108 |
< |
'sys_interval' : 10, # at this interval (in seconds) |
108 |
> |
'sys_interval' : 120, # at this interval (in seconds) |
109 |
|
'sys_data_sent' : 0, # time from Epoch when system information was sent; don't touch! |
110 |
|
|
111 |
< |
'sys_cpu_usr' : False, # cpu-usage information |
112 |
< |
'sys_cpu_sys' : False, # all these will produce coresponding paramas without "sys_" |
113 |
< |
'sys_cpu_nice' : False, |
114 |
< |
'sys_cpu_idle' : False, |
111 |
> |
'sys_cpu_usr' : True, # cpu-usage information |
112 |
> |
'sys_cpu_sys' : True, # all these will produce coresponding paramas without "sys_" |
113 |
> |
'sys_cpu_nice' : True, |
114 |
> |
'sys_cpu_idle' : True, |
115 |
|
'sys_cpu_usage' : True, |
116 |
|
'sys_load1' : True, # system load information |
117 |
|
'sys_load5' : True, |
118 |
|
'sys_load15' : True, |
119 |
< |
'sys_mem_used' : False, # memory usage information |
120 |
< |
'sys_mem_free' : False, |
119 |
> |
'sys_mem_used' : True, # memory usage information |
120 |
> |
'sys_mem_free' : True, |
121 |
|
'sys_mem_usage' : True, |
122 |
< |
'sys_pages_in' : False, |
123 |
< |
'sys_pages_out' : False, |
122 |
> |
'sys_pages_in' : True, |
123 |
> |
'sys_pages_out' : True, |
124 |
|
'sys_swap_used' : True, # swap usage information |
125 |
< |
'sys_swap_free' : False, |
125 |
> |
'sys_swap_free' : True, |
126 |
|
'sys_swap_usage': True, |
127 |
< |
'sys_swap_in' : False, |
128 |
< |
'sys_swap_out' : False, |
127 |
> |
'sys_swap_in' : True, |
128 |
> |
'sys_swap_out' : True, |
129 |
|
'sys_net_in' : True, # network transfer in kBps |
130 |
|
'sys_net_out' : True, # these will produce params called ethX_in, ethX_out, ethX_errs |
131 |
< |
'sys_net_errs' : False, # for each eth interface |
131 |
> |
'sys_net_errs' : True, # for each eth interface |
132 |
|
'sys_net_sockets' : True, # number of opened sockets for each proto => sockets_tcp/udp/unix ... |
133 |
|
'sys_net_tcp_details' : True, # number of tcp sockets in each state => sockets_tcp_LISTEN, ... |
134 |
|
'sys_processes' : True, |
135 |
|
'sys_uptime' : True, # uptime of the machine, in days (float number) |
136 |
|
|
137 |
< |
'general_info' : True, # send (or not) general host information once every 2 $sys_interval seconds |
137 |
> |
'general_info' : True, # send (or not) general host information once every 2 x $sys_interval seconds |
138 |
|
'general_data_sent': 0, # time from Epoch when general information was sent; don't touch! |
139 |
|
|
140 |
|
'hostname' : True, |
171 |
|
""" |
172 |
|
self.destinations = {} # empty, by default; key = tuple (host, port, pass) ; val = hash {"param_mame" : True/False, ...} |
173 |
|
self.destPrevData = {} # empty, by defaul; key = tuple (host, port, pass) ; val = hash {"param_mame" : value, ...} |
174 |
+ |
self.senderRef = {} # key = tuple (host, port, pass); val = hash {'INSTANCE_ID', 'SEQ_NR' } |
175 |
|
self.configAddresses = [] # empty, by default; list of files/urls from where we read config |
176 |
< |
self.configRecheckInterval = 120 # 2 minutes |
176 |
> |
self.configRecheckInterval = 600 # 10 minutes |
177 |
|
self.configRecheck = True # enabled by default |
178 |
|
self.performBgMonitoring = True # by default, perform background monitoring |
179 |
|
self.monitoredJobs = {} # Monitored jobs; key = pid; value = hash with |
180 |
< |
self.maxMsgRate = 20; # Maximum number of messages allowed to be sent per second |
181 |
< |
self.sender = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0}; |
180 |
> |
self.maxMsgRate = 10 # Maximum number of messages allowed to be sent per second |
181 |
> |
self.__defaultSenderRef = {'INSTANCE_ID': random.randint(0,0x7FFFFFFE), 'SEQ_NR': 0}; |
182 |
|
self.__defaultUserCluster = "ApMon_UserSend"; |
183 |
|
self.__defaultUserNode = socket.getfqdn(); |
184 |
|
self.__defaultSysMonCluster = "ApMon_SysMon"; |
185 |
|
self.__defaultSysMonNode = socket.getfqdn(); |
186 |
|
# don't touch these: |
187 |
|
self.__freed = False |
186 |
– |
self.__initializedOK = True |
188 |
|
self.__udpSocket = None |
189 |
|
self.__configUpdateLock = threading.Lock() |
190 |
|
self.__configUpdateEvent = threading.Event() |
201 |
|
self.__crtDrop = 0; |
202 |
|
self.__hWeight = 0.92; |
203 |
|
self.logger = Logger.Logger(defaultLogLevel) |
204 |
< |
if type(initValue) == type("string"): |
205 |
< |
self.configAddresses.append(initValue) |
206 |
< |
self.__reloadAddresses() |
207 |
< |
elif type(initValue) == type([]): |
208 |
< |
self.configAddresses = initValue |
209 |
< |
self.__reloadAddresses() |
210 |
< |
elif type(initValue) == type(()): |
211 |
< |
for dest in initValue: |
212 |
< |
self.__addDestination (dest, self.destinations) |
213 |
< |
elif type(initValue) == type({}): |
214 |
< |
for dest, opts in initValue.items(): |
215 |
< |
self.__addDestination (dest, self.destinations, opts) |
216 |
< |
|
217 |
< |
self.__initializedOK = (len (self.destinations) > 0) |
218 |
< |
if not self.__initializedOK: |
218 |
< |
self.logger.log(Logger.ERROR, "Failed to initialize. No destination defined."); |
219 |
< |
# self.__defaultClusterName = None |
220 |
< |
# self.__defaultNodeName = self.getMyHo |
221 |
< |
if self.__initializedOK: |
222 |
< |
self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
223 |
< |
if len(self.configAddresses) > 0: |
224 |
< |
# if there are addresses that need to be monitored, |
225 |
< |
# start config checking and reloading thread |
226 |
< |
th = threading.Thread(target=self.__configLoader) |
227 |
< |
th.setDaemon(True) # this is a daemon thread |
228 |
< |
th.start() |
229 |
< |
# create the ProcInfo instance |
230 |
< |
self.procInfo = ProcInfo.ProcInfo(self.logger); |
231 |
< |
# self.procInfo.update(); |
232 |
< |
# start the background monitoring thread |
233 |
< |
th = threading.Thread(target=self.__bgMonitor); |
234 |
< |
th.setDaemon(True); |
235 |
< |
th.start(); |
236 |
< |
|
204 |
> |
self.setDestinations(initValue) |
205 |
> |
self.__udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
206 |
> |
if len(self.configAddresses) > 0: |
207 |
> |
# if there are addresses that need to be monitored, |
208 |
> |
# start config checking and reloading thread |
209 |
> |
th = threading.Thread(target=self.__configLoader) |
210 |
> |
th.setDaemon(True) # this is a daemon thread |
211 |
> |
th.start() |
212 |
> |
# create the ProcInfo instance |
213 |
> |
self.procInfo = ProcInfo.ProcInfo(self.logger); |
214 |
> |
# self.procInfo.update(); |
215 |
> |
# start the background monitoring thread |
216 |
> |
th = threading.Thread(target=self.__bgMonitor); |
217 |
> |
th.setDaemon(True); |
218 |
> |
th.start(); |
219 |
|
|
220 |
|
def sendParams (self, params): |
221 |
|
""" |
269 |
|
|
270 |
|
NOTE that python doesn't know about 32-bit floats (only 64-bit floats!) |
271 |
|
""" |
290 |
– |
if not self.__initializedOK: |
291 |
– |
self.logger.log(Logger.WARNING, "Not initialized correctly. Message NOT sent!"); |
292 |
– |
return |
272 |
|
if (clusterName == None) or (clusterName == ""): |
273 |
|
clusterName = self.__defaultUserCluster |
274 |
|
else: |
277 |
|
nodeName = self.__defaultUserNode |
278 |
|
else: |
279 |
|
self.__defaultUserNode = nodeName |
280 |
+ |
if len(self.destinations) == 0: |
281 |
+ |
self.logger.log(Logger.WARNING, "Not sending parameters since no destination is defined."); |
282 |
+ |
return |
283 |
|
self.__configUpdateLock.acquire(); |
284 |
|
for dest in self.destinations.keys(): |
285 |
< |
self.__directSendParams(self.sender, dest, clusterName, nodeName, timeStamp, params); |
285 |
> |
self.__directSendParams(dest, clusterName, nodeName, timeStamp, params); |
286 |
|
self.__configUpdateLock.release(); |
287 |
|
|
288 |
|
def addJobToMonitor (self, pid, workDir, clusterName, nodeName): |
329 |
|
If mustSend == True, the information is sent regardles of the elapsed time since last sent |
330 |
|
If mustSend == False, the data is sent only if the required interval has passed since last sent |
331 |
|
""" |
332 |
+ |
if len(self.destinations) == 0: |
333 |
+ |
self.logger.log(Logger.WARNING, "Not sending bg monitoring info since no destination is defined."); |
334 |
+ |
return |
335 |
|
self.__bgMonitorLock.acquire(); |
336 |
|
now = int(time.time()); |
337 |
|
updatedProcInfo = False; |
363 |
|
sysParams.append(param); |
364 |
|
options['general_data_sent'] = now; |
365 |
|
|
366 |
< |
if (not updatedProcInfo) and ((len(sysParams) > 0) or (len(jobParams) > 0) ): |
366 |
> |
if (not updatedProcInfo) and (((len(sysParams) > 0) or (len(jobParams) > 0))): |
367 |
|
self.procInfo.update(); |
368 |
< |
updatedProcInf = True; |
368 |
> |
updatedProcInfo = True; |
369 |
|
|
370 |
|
sysResults = {} |
371 |
|
if(len(sysParams) > 0): |
372 |
< |
sysResults = self.procInfo.getSystemData(sysParams, prevRawData); |
373 |
< |
self.updateLastSentTime(options, self.destinations[destination]); |
374 |
< |
if( (type(sysResults) == type( {} )) and len(sysResults.keys()) > 0): |
390 |
< |
self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults); |
391 |
< |
if( (type(sysResults) == type( [] )) and len(sysResults) > 0): |
392 |
< |
self.__directSendParams(self.sender, destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults); |
372 |
> |
sysResults = self.procInfo.getSystemData(sysParams, prevRawData) |
373 |
> |
if(len(sysResults) > 0): |
374 |
> |
self.__directSendParams(destination, self.__defaultSysMonCluster, self.__defaultSysMonNode, -1, sysResults) |
375 |
|
for pid, props in self.monitoredJobs.items(): |
376 |
< |
jobResults = {}; |
376 |
> |
jobResults = {} |
377 |
|
if(len(jobParams) > 0): |
378 |
< |
jobResults = self.procInfo.getJobData(pid, jobParams); |
378 |
> |
jobResults = self.procInfo.getJobData(pid, jobParams) |
379 |
|
if(len(jobResults) > 0): |
380 |
< |
self.__directSendParams(self.sender, destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults); |
380 |
> |
self.__directSendParams(destination, props['CLUSTER_NAME'], props['NODE_NAME'], -1, jobResults) |
381 |
|
self.__bgMonitorLock.release(); |
382 |
+ |
|
383 |
+ |
def setDestinations(self, initValue): |
384 |
+ |
""" |
385 |
+ |
Set the destinations of the ApMon instance. It accepts the same parameters as the constructor. |
386 |
+ |
""" |
387 |
+ |
if type(initValue) == type("string"): |
388 |
+ |
self.configAddresses = [initValue] |
389 |
+ |
self.configRecheck = True |
390 |
+ |
self.configRecheckInterval = 600 |
391 |
+ |
self.__reloadAddresses() |
392 |
+ |
elif type(initValue) == type([]): |
393 |
+ |
self.configAddresses = initValue |
394 |
+ |
self.configRecheck = True |
395 |
+ |
self.configRecheckInterval = 600 |
396 |
+ |
self.__reloadAddresses() |
397 |
+ |
elif type(initValue) == type(()): |
398 |
+ |
self.configAddresses = [] |
399 |
+ |
for dest in initValue: |
400 |
+ |
self.__addDestination (dest, self.destinations) |
401 |
+ |
self.configRecheck = False |
402 |
+ |
elif type(initValue) == type({}): |
403 |
+ |
self.configAddresses = [] |
404 |
+ |
for dest, opts in initValue.items(): |
405 |
+ |
self.__addDestination (dest, self.destinations, opts) |
406 |
+ |
self.configRecheck = False |
407 |
|
|
408 |
< |
# copy the time when last data was sent |
409 |
< |
def updateLastSentTime(self, srcOpts, dstOpts): |
410 |
< |
if srcOpts.has_key('general_data_sent'): |
411 |
< |
dstOpts['general_data_sent'] = srcOpts['general_data_sent']; |
412 |
< |
if srcOpts.has_key('sys_data_sent'): |
413 |
< |
dstOpts['sys_data_sent'] = srcOpts['sys_data_sent']; |
414 |
< |
if srcOpts.has_key('job_data_sent'): |
408 |
< |
dstOpts['job_data_sent'] = srcOpts['job_data_sent']; |
409 |
< |
|
410 |
< |
def setLogLevel (self, strLevel): |
408 |
> |
def initializedOK(self): |
409 |
> |
""" |
410 |
> |
Retruns true if there is no destination where the parameters to be sent. |
411 |
> |
""" |
412 |
> |
return len(self.destinations) > 0 |
413 |
> |
|
414 |
> |
def setLogLevel(self, strLevel): |
415 |
|
""" |
416 |
|
Change the log level. Given level is a string, one of 'FATAL', 'ERROR', 'WARNING', |
417 |
|
'INFO', 'NOTICE', 'DEBUG'. |
446 |
|
######################################################################################### |
447 |
|
# Internal functions - Config reloader thread |
448 |
|
######################################################################################### |
449 |
< |
|
449 |
> |
|
450 |
|
def __configLoader(self): |
451 |
|
""" |
452 |
|
Main loop of the thread that checks for changes and reloads the configuration |
453 |
|
""" |
454 |
|
while not self.__configUpdateEvent.isSet(): |
455 |
< |
self.__configUpdateEvent.wait(self.configRecheckInterval); |
455 |
> |
self.__configUpdateEvent.wait(min(30, self.configRecheckInterval)) # don't recheck more often than 30 sec |
456 |
|
if self.__configUpdateEvent.isSet(): |
457 |
|
break |
458 |
|
if self.configRecheck: |
462 |
|
|
463 |
|
def __reloadAddresses(self): |
464 |
|
""" |
465 |
< |
Refresh destinations hash, by loading data from all sources in configAddresses |
465 |
> |
Refresh now the destinations hash, by loading data from all sources in configAddresses |
466 |
|
""" |
467 |
+ |
print "reloading addresses"; |
468 |
|
newDestinations = {} |
469 |
< |
for src in self.configAddresses: |
469 |
> |
urls = copy.deepcopy(self.configAddresses) |
470 |
> |
while(len(urls) > 0 and len(newDestinations) == 0): |
471 |
> |
src = random.choice(urls) |
472 |
> |
urls.remove(src) |
473 |
|
self.__initializeFromFile(src, newDestinations) |
474 |
|
# avoid changing config in the middle of sending packets to previous destinations |
475 |
|
self.__configUpdateLock.acquire() |
476 |
|
self.destinations = newDestinations |
477 |
|
self.__configUpdateLock.release() |
478 |
+ |
print "finished reloading addresses"; |
479 |
|
|
480 |
|
def __addDestination (self, aDestination, tempDestinations, options = __defaultOptions): |
481 |
|
""" |
511 |
|
return |
512 |
|
alreadyAdded = False |
513 |
|
port = int(port) |
514 |
< |
host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs |
514 |
> |
try: |
515 |
> |
host = socket.gethostbyname(host) # convert hostnames to IP addresses to avoid suffocating DNSs |
516 |
> |
except socket.error, msg: |
517 |
> |
self.logger.log(Logger.ERROR, "Error resolving "+host+": "+str(msg)) |
518 |
> |
return |
519 |
|
for h, p, w in tempDestinations.keys(): |
520 |
|
if (h == host) and (p == port): |
521 |
|
alreadyAdded = True |
522 |
|
break |
523 |
< |
destination = (host, port, passwd); |
523 |
> |
destination = (host, port, passwd) |
524 |
|
if not alreadyAdded: |
525 |
< |
self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd); |
526 |
< |
tempDestinations[destination] = copy.deepcopy(self.__defaultOptions); # have a different set of options for each dest. |
527 |
< |
self.destPrevData[destination] = {}; |
525 |
> |
self.logger.log(Logger.INFO, "Adding destination "+host+':'+`port`+' '+passwd) |
526 |
> |
if(self.destinations.has_key(destination)): |
527 |
> |
tempDestinations[destination] = self.destinations[destination] # reuse previous options |
528 |
> |
else: |
529 |
> |
tempDestinations[destination] = copy.deepcopy(self.__defaultOptions) # have a different set of options for each dest |
530 |
> |
if not self.destPrevData.has_key(destination): |
531 |
> |
self.destPrevData[destination] = {} # set it empty only if it's really new |
532 |
> |
if not self.senderRef.has_key(destination): |
533 |
> |
self.senderRef[destination] = copy.deepcopy(self.__defaultSenderRef) # otherwise, don't reset this nr. |
534 |
|
if options != self.__defaultOptions: |
535 |
|
# we have to overwrite defaults with given options |
536 |
|
for key, value in options.items(): |
537 |
< |
self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`); |
538 |
< |
tempDestinations[destination][key] = value; |
537 |
> |
self.logger.log(Logger.NOTICE, "Overwritting option: "+key+" = "+`value`) |
538 |
> |
tempDestinations[destination][key] = value |
539 |
|
else: |
540 |
|
self.logger.log(Logger.NOTICE, "Destination "+host+":"+str(port)+" "+passwd+" already added. Skipping it"); |
541 |
|
|
549 |
|
""" |
550 |
|
try: |
551 |
|
if confFileName.find ("http://") == 0: |
552 |
< |
confFile = urllib2.urlopen (confFileName) |
552 |
> |
confFile = self.__getURL(confFileName) |
553 |
> |
if confFile is None: |
554 |
> |
return |
555 |
|
else: |
556 |
|
confFile = open (confFileName) |
536 |
– |
except urllib2.HTTPError, e: |
537 |
– |
self.logger.log(Logger.ERROR, "Cannot open "+confFileName); |
538 |
– |
if e.code == 401: |
539 |
– |
self.logger.log(Logger.ERROR, 'HTTPError: not authorized.'); |
540 |
– |
elif e.code == 404: |
541 |
– |
self.logger.log(Logger.ERROR, 'HTTPError: not found.'); |
542 |
– |
elif e.code == 503: |
543 |
– |
self.logger.log(Logger.ERROR, 'HTTPError: service unavailable.'); |
544 |
– |
else: |
545 |
– |
self.logger.log(Logger.ERROR, 'HTTPError: unknown error.'); |
546 |
– |
return |
547 |
– |
except urllib2.URLError, ex: |
548 |
– |
self.logger.log(Logger.ERROR, "Cannot open "+confFileName); |
549 |
– |
self.logger.log(Logger.ERROR, "URL Error: "+str(ex.reason[1])); |
550 |
– |
return |
557 |
|
except IOError, ex: |
558 |
|
self.logger.log(Logger.ERROR, "Cannot open "+confFileName); |
559 |
|
self.logger.log(Logger.ERROR, "IOError: "+str(ex)); |
616 |
|
############################################################################################### |
617 |
|
# Internal helper functions |
618 |
|
############################################################################################### |
619 |
+ |
|
620 |
+ |
# this is a simplified replacement for urllib2 which doesn't support setting a timeout. |
621 |
+ |
# by default, if timeout is not specified, it waits 5 seconds |
622 |
+ |
def __getURL (self, url, timeout = 5): |
623 |
+ |
r = re.compile("http://([^:/]+)(:(\d+))?(/.*)").match(url) |
624 |
+ |
if r is None: |
625 |
+ |
self.logger.log(Logger.ERROR, "Cannot open "+url+". Incorrectly formed URL.") |
626 |
+ |
return None |
627 |
+ |
host = r.group(1) |
628 |
+ |
if r.group(3) == None: |
629 |
+ |
port = 80 # no port is given, pick the default 80 for HTTP |
630 |
+ |
else: |
631 |
+ |
port = int(r.group(3)) |
632 |
+ |
if r.group(4) == None: |
633 |
+ |
path = "" # no path is give, let server decide |
634 |
+ |
else: |
635 |
+ |
path = r.group(4) |
636 |
+ |
sock = None |
637 |
+ |
err = None |
638 |
+ |
try: |
639 |
+ |
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM): |
640 |
+ |
af, socktype, proto, canonname, sa = res |
641 |
+ |
try: |
642 |
+ |
sock = socket.socket(af, socktype, proto) |
643 |
+ |
except socket.error, msg: |
644 |
+ |
sock = None |
645 |
+ |
err = msg |
646 |
+ |
continue |
647 |
+ |
try: |
648 |
+ |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, struct.pack("ii", timeout, 0)) |
649 |
+ |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, struct.pack("ii", timeout, 0)) |
650 |
+ |
sock.connect(sa) |
651 |
+ |
except socket.error, msg: |
652 |
+ |
sock.close() |
653 |
+ |
sock = None |
654 |
+ |
err = msg |
655 |
+ |
continue |
656 |
+ |
break |
657 |
+ |
except socket.error, msg: |
658 |
+ |
sock = None |
659 |
+ |
err = msg |
660 |
+ |
if sock is None: |
661 |
+ |
self.logger.log(Logger.ERROR, "Cannot open "+url) |
662 |
+ |
self.logger.log(Logger.ERROR, "SocketError: "+str(err)) |
663 |
+ |
return None |
664 |
+ |
try: |
665 |
+ |
sock.send("GET "+path+" HTTP/1.0\n\n"); |
666 |
+ |
data = sock.recv(8192) |
667 |
+ |
file = StringIO.StringIO(data) |
668 |
+ |
httpStatus = 0 |
669 |
+ |
while True: |
670 |
+ |
line = file.readline().strip() |
671 |
+ |
if line == "": |
672 |
+ |
break # exit at the end of file or at the first empty line (finish of http headers) |
673 |
+ |
r = re.compile("HTTP/\d.\d (\d+)").match(line) |
674 |
+ |
if r != None: |
675 |
+ |
httpStatus = int(r.group(1)) |
676 |
+ |
if httpStatus == 200: |
677 |
+ |
return file |
678 |
+ |
else: |
679 |
+ |
self.logger.log(Logger.ERROR, "Cannot open "+url) |
680 |
+ |
if httpStatus == 401: |
681 |
+ |
self.logger.log(Logger.ERROR, 'HTTPError: not authorized ['+str(httpStatus)+']') |
682 |
+ |
elif httpStatus == 404: |
683 |
+ |
self.logger.log(Logger.ERROR, 'HTTPError: not found ['+str(httpStatus)+']') |
684 |
+ |
elif httpStatus == 503: |
685 |
+ |
self.logger.log(Logger.ERROR, 'HTTPError: service unavailable ['+str(httpStatus)+']') |
686 |
+ |
else: |
687 |
+ |
self.logger.log(Logger.ERROR, 'HTTPError: unknown error ['+str(httpStatus)+']') |
688 |
+ |
return None |
689 |
+ |
except socket.error, msg: |
690 |
+ |
self.logger.log(Logger.ERROR, "Cannot open "+url) |
691 |
+ |
self.logger.log(Logger.ERROR, "SocketError: "+str(msg)) |
692 |
+ |
sock.close() |
693 |
+ |
return None |
694 |
|
|
695 |
< |
def __directSendParams (self, senderRef, destination, clusterName, nodeName, timeStamp, params): |
695 |
> |
def __directSendParams (self, destination, clusterName, nodeName, timeStamp, params): |
696 |
|
|
697 |
|
if self.__shouldSend() == False: |
698 |
|
self.logger.log(Logger.DEBUG, "Dropping packet since rate is too fast!"); |
703 |
|
return; |
704 |
|
|
705 |
|
host, port, passwd = destination |
706 |
< |
senderRef['SEQ_NR'] = (senderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld |
706 |
> |
crtSenderRef = self.senderRef[destination] |
707 |
> |
crtSenderRef['SEQ_NR'] = (crtSenderRef['SEQ_NR'] + 1) % 2000000000; # wrap around 2 mld |
708 |
|
|
709 |
< |
xdrPacker = xdrlib.Packer (); |
628 |
< |
self.logger.log(Logger.DEBUG, "Building XDR packet for ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(senderRef['SEQ_NR'])+"/"+str(senderRef['INSTANCE_ID'])+"> len:"+str(len(params))); |
709 |
> |
xdrPacker = xdrlib.Packer () |
710 |
|
|
711 |
|
xdrPacker.pack_string ("v:"+self.__version+"p:"+passwd) |
712 |
|
|
713 |
< |
xdrPacker.pack_int (senderRef['INSTANCE_ID']); |
714 |
< |
xdrPacker.pack_int (senderRef['SEQ_NR']); |
713 |
> |
xdrPacker.pack_int (crtSenderRef['INSTANCE_ID']) |
714 |
> |
xdrPacker.pack_int (crtSenderRef['SEQ_NR']) |
715 |
|
|
716 |
< |
xdrPacker.pack_string (clusterName); |
717 |
< |
xdrPacker.pack_string (nodeName); |
718 |
< |
xdrPacker.pack_int (len(params)); |
716 |
> |
xdrPacker.pack_string (clusterName) |
717 |
> |
xdrPacker.pack_string (nodeName) |
718 |
> |
|
719 |
> |
sent_params_nr = 0 |
720 |
> |
paramsPacker = xdrlib.Packer () |
721 |
|
|
722 |
|
if type(params) == type( {} ): |
723 |
|
for name, value in params.iteritems(): |
724 |
< |
self.__packParameter(xdrPacker, name, value) |
724 |
> |
if self.__packParameter(paramsPacker, name, value): |
725 |
> |
sent_params_nr += 1 |
726 |
|
elif type(params) == type( [] ): |
727 |
|
for name, value in params: |
728 |
|
self.logger.log(Logger.DEBUG, "Adding parameter "+name+" = "+str(value)); |
729 |
< |
self.__packParameter(xdrPacker, name, value) |
729 |
> |
if self.__packParameter(paramsPacker, name, value): |
730 |
> |
sent_params_nr += 1 |
731 |
|
else: |
732 |
|
self.logger.log(Logger.WARNING, "Unsupported params type in sendParameters: " + str(type(params))); |
733 |
+ |
|
734 |
+ |
xdrPacker.pack_int (sent_params_nr) |
735 |
+ |
|
736 |
|
if (timeStamp != None) and (timeStamp > 0): |
737 |
< |
xdrPacker.pack_int(timeStamp); |
737 |
> |
paramsPacker.pack_int(timeStamp); |
738 |
|
|
739 |
< |
buffer = xdrPacker.get_buffer(); |
739 |
> |
buffer = xdrPacker.get_buffer() + paramsPacker.get_buffer() |
740 |
> |
self.logger.log(Logger.NOTICE, "Building XDR packet ["+str(clusterName)+"/"+str(nodeName)+"] <"+str(crtSenderRef['SEQ_NR'])+"/"+str(crtSenderRef['INSTANCE_ID'])+"> "+str(sent_params_nr)+" params, "+str(len(buffer))+" bytes."); |
741 |
|
# send this buffer to the destination, using udp datagrams |
742 |
|
try: |
743 |
|
self.__udpSocket.sendto(buffer, (host, port)) |
744 |
< |
self.logger.log(Logger.DEBUG, "Packet sent to "+host+":"+str(port)+" "+passwd); |
744 |
> |
self.logger.log(Logger.NOTICE, "Packet sent to "+host+":"+str(port)+" "+passwd) |
745 |
|
except socket.error, msg: |
746 |
< |
self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1])); |
746 |
> |
self.logger.log(Logger.ERROR, "Cannot send packet to "+host+":"+str(port)+" "+passwd+": "+str(msg[1])) |
747 |
|
xdrPacker.reset() |
748 |
+ |
paramsPacker.reset() |
749 |
|
|
750 |
|
def __packParameter(self, xdrPacker, name, value): |
751 |
|
if (name is None) or (name is ""): |
752 |
< |
self.logger.log(Logger.WARNING, "Undefine parameter name."); |
753 |
< |
return; |
752 |
> |
self.logger.log(Logger.WARNING, "Undefined parameter name. Ignoring value "+str(value)) |
753 |
> |
return False |
754 |
|
if (value is None): |
755 |
< |
self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value"); |
756 |
< |
return; |
755 |
> |
self.logger.log(Logger.WARNING, "Ignore " + str(name)+ " parameter because of None value") |
756 |
> |
return False |
757 |
|
try: |
758 |
|
typeValue = self.__valueTypes[type(value)] |
759 |
|
xdrPacker.pack_string (name) |
760 |
|
xdrPacker.pack_int (typeValue) |
761 |
|
self.__packFunctions[typeValue] (xdrPacker, value) |
762 |
< |
self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value)); |
762 |
> |
self.logger.log(Logger.DEBUG, "Adding parameter "+str(name)+" = "+str(value)) |
763 |
> |
return True |
764 |
|
except Exception, ex: |
765 |
|
self.logger.log(Logger.WARNING, "Error packing %s = %s; got %s" % (name, str(value), ex)) |
766 |
+ |
return False |
767 |
|
|
768 |
|
# Destructor |
769 |
|
def __del__(self): |
820 |
|
5: xdrlib.Packer.pack_double } |
821 |
|
|
822 |
|
__defaultPort = 8884 |
823 |
< |
__version = "2.2.2-py" # apMon version number |
823 |
> |
__version = "2.2.4-py" # apMon version number |
824 |
|
|