Added ping plugin and backend
This commit is contained in:
parent
ce2b3ae849
commit
663be43f06
6 changed files with 220 additions and 17 deletions
68
platypush/backend/ping.py
Normal file
68
platypush/backend/ping.py
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
from platypush.backend import Backend
|
||||||
|
from platypush.context import get_plugin
|
||||||
|
from platypush.message.event.ping import HostUpEvent, HostDownEvent
|
||||||
|
from platypush.utils.workers import Worker, Workers
|
||||||
|
|
||||||
|
|
||||||
|
class PingBackend(Backend):
|
||||||
|
"""
|
||||||
|
This backend allows you to ping multiple remote hosts at regular intervals.
|
||||||
|
|
||||||
|
Triggers:
|
||||||
|
|
||||||
|
- :class:`platypush.message.ping.HostDownEvent` if a host stops responding ping requests
|
||||||
|
- :class:`platypush.message.ping.HostUpEvent` if a host starts responding ping requests
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Pinger(Worker):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.timeout = kwargs.pop('timeout')
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def process(self, host: str) -> Tuple[str, bool]:
|
||||||
|
pinger = get_plugin('ping')
|
||||||
|
response = pinger.ping(host, timeout=self.timeout, count=1).output
|
||||||
|
return host, response['success'] is True
|
||||||
|
|
||||||
|
def __init__(self, hosts: List[str], timeout: float = 5.0, interval: float = 60.0, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
:param hosts: List of IP addresses or host names to monitor.
|
||||||
|
:param timeout: Ping timeout.
|
||||||
|
:param interval: Interval between two scans.
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.hosts = {h: None for h in hosts}
|
||||||
|
self.timeout = timeout
|
||||||
|
self.interval = interval
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
super().run()
|
||||||
|
self.logger.info('Starting ping backend with {} hosts to monitor'.format(len(self.hosts)))
|
||||||
|
|
||||||
|
while not self.should_stop():
|
||||||
|
workers = Workers(min(len(self.hosts), 10), self.Pinger, timeout=self.timeout)
|
||||||
|
|
||||||
|
with workers:
|
||||||
|
for host in self.hosts.keys():
|
||||||
|
workers.put(host)
|
||||||
|
|
||||||
|
for response in workers.responses:
|
||||||
|
host, is_up = response
|
||||||
|
if is_up != self.hosts[host]:
|
||||||
|
if is_up:
|
||||||
|
self.bus.post(HostUpEvent(host))
|
||||||
|
else:
|
||||||
|
self.bus.post(HostDownEvent(host))
|
||||||
|
|
||||||
|
self.hosts[host] = is_up
|
||||||
|
|
||||||
|
time.sleep(self.interval)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
|
@ -10,6 +10,15 @@ logger = logging.getLogger(__name__)
|
||||||
class Message(object):
|
class Message(object):
|
||||||
""" Message generic class """
|
""" Message generic class """
|
||||||
|
|
||||||
|
class Encoder(json.JSONEncoder):
|
||||||
|
def default(self, obj):
|
||||||
|
if isinstance(obj, datetime.datetime) or \
|
||||||
|
isinstance(obj, datetime.date) or \
|
||||||
|
isinstance(obj, datetime.time):
|
||||||
|
return obj.isoformat()
|
||||||
|
|
||||||
|
return super().default(obj)
|
||||||
|
|
||||||
def __init__(self, timestamp=None, *args, **kwargs):
|
def __init__(self, timestamp=None, *args, **kwargs):
|
||||||
self.timestamp = timestamp or time.time()
|
self.timestamp = timestamp or time.time()
|
||||||
|
|
||||||
|
@ -24,7 +33,7 @@ class Message(object):
|
||||||
for attr in self.__dir__()
|
for attr in self.__dir__()
|
||||||
if (attr != '_timestamp' or not attr.startswith('_'))
|
if (attr != '_timestamp' or not attr.startswith('_'))
|
||||||
and not inspect.ismethod(getattr(self, attr))
|
and not inspect.ismethod(getattr(self, attr))
|
||||||
}, cls=MessageEncoder).replace('\n', ' ')
|
}, cls=self.Encoder).replace('\n', ' ')
|
||||||
|
|
||||||
def __bytes__(self):
|
def __bytes__(self):
|
||||||
"""
|
"""
|
||||||
|
@ -133,14 +142,4 @@ class Mapping(dict):
|
||||||
return str(self.__dict__)
|
return str(self.__dict__)
|
||||||
|
|
||||||
|
|
||||||
class MessageEncoder(json.JSONEncoder):
|
|
||||||
def default(self, obj):
|
|
||||||
if isinstance(obj, datetime.datetime) or \
|
|
||||||
isinstance(obj, datetime.date) or \
|
|
||||||
isinstance(obj, datetime.time):
|
|
||||||
return obj.isoformat()
|
|
||||||
|
|
||||||
return super().default(obj)
|
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from platypush.message.event import Event, EventMatchResult
|
from platypush.message.event import Event
|
||||||
|
|
||||||
|
|
||||||
class PingEvent(Event):
|
class PingEvent(Event):
|
||||||
|
@ -13,5 +13,20 @@ class PingEvent(Event):
|
||||||
super().__init__(message=message, *args, **kwargs)
|
super().__init__(message=message, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
class HostDownEvent(Event):
|
||||||
|
"""
|
||||||
|
Event triggered when a remote host stops responding ping requests.
|
||||||
|
"""
|
||||||
|
def __init__(self, host: str, *args, **kwargs):
|
||||||
|
super().__init__(host=host, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class HostUpEvent(Event):
|
||||||
|
"""
|
||||||
|
Event triggered when a remote host starts responding ping requests.
|
||||||
|
"""
|
||||||
|
def __init__(self, host: str, *args, **kwargs):
|
||||||
|
super().__init__(host=host, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from platypush.message import Message, MessageEncoder
|
from platypush.message import Message
|
||||||
|
|
||||||
|
|
||||||
class Response(Message):
|
class Response(Message):
|
||||||
|
@ -66,9 +66,8 @@ class Response(Message):
|
||||||
Overrides the str() operator and converts
|
Overrides the str() operator and converts
|
||||||
the message into a UTF-8 JSON string
|
the message into a UTF-8 JSON string
|
||||||
"""
|
"""
|
||||||
|
|
||||||
output = self.output if self.output is not None and self.output != {} else {
|
output = self.output if self.output is not None and self.output != {} else {
|
||||||
'status': 'ok' if not self.errors else 'error'
|
'success': True if not self.errors else False
|
||||||
}
|
}
|
||||||
|
|
||||||
response_dict = {
|
response_dict = {
|
||||||
|
@ -86,7 +85,7 @@ class Response(Message):
|
||||||
if self.disable_logging:
|
if self.disable_logging:
|
||||||
response_dict['_disable_logging'] = self.disable_logging
|
response_dict['_disable_logging'] = self.disable_logging
|
||||||
|
|
||||||
return json.dumps(response_dict, cls=MessageEncoder)
|
return json.dumps(response_dict, cls=self.Encoder)
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
27
platypush/message/response/ping.py
Normal file
27
platypush/message/response/ping.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from platypush.message.response import Response
|
||||||
|
|
||||||
|
|
||||||
|
class PingResponse(Response):
|
||||||
|
# noinspection PyShadowingBuiltins
|
||||||
|
def __init__(self,
|
||||||
|
host: str,
|
||||||
|
success: bool,
|
||||||
|
*args,
|
||||||
|
min: Optional[float] = None,
|
||||||
|
max: Optional[float] = None,
|
||||||
|
avg: Optional[float] = None,
|
||||||
|
mdev: Optional[float] = None,
|
||||||
|
**kwargs):
|
||||||
|
super().__init__(*args, output={
|
||||||
|
'host': host,
|
||||||
|
'success': success,
|
||||||
|
'min': min,
|
||||||
|
'max': max,
|
||||||
|
'avg': avg,
|
||||||
|
'mdev': mdev,
|
||||||
|
}, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
95
platypush/plugins/ping.py
Normal file
95
platypush/plugins/ping.py
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from platypush.message.response.ping import PingResponse
|
||||||
|
from platypush.plugins import Plugin, action
|
||||||
|
|
||||||
|
PING_MATCHER = re.compile(
|
||||||
|
r"(?P<min>\d+.\d+)/(?P<avg>\d+.\d+)/(?P<max>\d+.\d+)/(?P<mdev>\d+.\d+)"
|
||||||
|
)
|
||||||
|
|
||||||
|
PING_MATCHER_BUSYBOX = re.compile(
|
||||||
|
r"(?P<min>\d+.\d+)/(?P<avg>\d+.\d+)/(?P<max>\d+.\d+)"
|
||||||
|
)
|
||||||
|
|
||||||
|
WIN32_PING_MATCHER = re.compile(r"(?P<min>\d+)ms.+(?P<max>\d+)ms.+(?P<avg>\d+)ms")
|
||||||
|
|
||||||
|
|
||||||
|
class PingPlugin(Plugin):
|
||||||
|
"""
|
||||||
|
Perform ICMP network ping on remote hosts.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, executable: str = 'ping', count: int = 1, timeout: float = 5.0, **kwargs):
|
||||||
|
"""
|
||||||
|
:param executable: Path to the ``ping`` executable. Default: the first ``ping`` executable found in PATH.
|
||||||
|
:param count: Default number of packets that should be sent (default: 1).
|
||||||
|
:param timeout: Default timeout before failing a ping request (default: 5 seconds).
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.executable = executable
|
||||||
|
self.count = count
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
def _get_ping_cmd(self, host: str, count: int, timeout: float) -> List[str]:
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
return [
|
||||||
|
self.executable,
|
||||||
|
'-n',
|
||||||
|
str(count or self.count),
|
||||||
|
'-w',
|
||||||
|
str((timeout or self.timeout) * 1000),
|
||||||
|
host,
|
||||||
|
]
|
||||||
|
|
||||||
|
return [
|
||||||
|
self.executable,
|
||||||
|
'-n',
|
||||||
|
'-q',
|
||||||
|
'-c',
|
||||||
|
str(count or self.count),
|
||||||
|
'-W',
|
||||||
|
str(timeout or self.timeout),
|
||||||
|
host,
|
||||||
|
]
|
||||||
|
|
||||||
|
@action
|
||||||
|
def ping(self, host: str, count: Optional[int] = None, timeout: Optional[float] = None) -> PingResponse:
|
||||||
|
"""
|
||||||
|
Ping a remote host.
|
||||||
|
:param host: Remote host IP or name
|
||||||
|
:param count: Number of packets that should be sent (default: 1).
|
||||||
|
:param timeout: Timeout before failing a ping request (default: 5 seconds).
|
||||||
|
"""
|
||||||
|
|
||||||
|
count = count or self.count
|
||||||
|
timeout = timeout or self.timeout
|
||||||
|
|
||||||
|
pinger = subprocess.Popen(
|
||||||
|
self._get_ping_cmd(host, count=count, timeout=timeout),
|
||||||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
|
||||||
|
try:
|
||||||
|
out = pinger.communicate()
|
||||||
|
if sys.platform == "win32":
|
||||||
|
match = WIN32_PING_MATCHER.search(str(out).split("\n")[-1])
|
||||||
|
min_val, avg_val, max_val = match.groups()
|
||||||
|
mdev_val = None
|
||||||
|
elif "max/" not in str(out):
|
||||||
|
match = PING_MATCHER_BUSYBOX.search(str(out).split("\n")[-1])
|
||||||
|
min_val, avg_val, max_val = match.groups()
|
||||||
|
mdev_val = None
|
||||||
|
else:
|
||||||
|
match = PING_MATCHER.search(str(out).split("\n")[-1])
|
||||||
|
min_val, avg_val, max_val, mdev_val = match.groups()
|
||||||
|
|
||||||
|
return PingResponse(host=host, success=True, min=min_val, max=max_val, avg=avg_val, mdev=mdev_val)
|
||||||
|
except (subprocess.CalledProcessError, AttributeError):
|
||||||
|
return PingResponse(host=host, success=False)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
Loading…
Reference in a new issue