From 663be43f06be777ef7655dc87d7d447c82b14441 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 27 Dec 2019 23:26:39 +0100 Subject: [PATCH] Added ping plugin and backend --- platypush/backend/ping.py | 68 ++++++++++++++++++ platypush/message/__init__.py | 21 +++--- platypush/message/event/ping.py | 19 +++++- platypush/message/response/__init__.py | 7 +- platypush/message/response/ping.py | 27 ++++++++ platypush/plugins/ping.py | 95 ++++++++++++++++++++++++++ 6 files changed, 220 insertions(+), 17 deletions(-) create mode 100644 platypush/backend/ping.py create mode 100644 platypush/message/response/ping.py create mode 100644 platypush/plugins/ping.py diff --git a/platypush/backend/ping.py b/platypush/backend/ping.py new file mode 100644 index 000000000..331500720 --- /dev/null +++ b/platypush/backend/ping.py @@ -0,0 +1,68 @@ +import time + +from typing import List, Tuple + +from platypush.backend import Backend +from platypush.context import get_plugin +from platypush.message.event.ping import HostUpEvent, HostDownEvent +from platypush.utils.workers import Worker, Workers + + +class PingBackend(Backend): + """ + This backend allows you to ping multiple remote hosts at regular intervals. + + Triggers: + + - :class:`platypush.message.ping.HostDownEvent` if a host stops responding ping requests + - :class:`platypush.message.ping.HostUpEvent` if a host starts responding ping requests + + """ + + class Pinger(Worker): + def __init__(self, *args, **kwargs): + self.timeout = kwargs.pop('timeout') + super().__init__(*args, **kwargs) + + def process(self, host: str) -> Tuple[str, bool]: + pinger = get_plugin('ping') + response = pinger.ping(host, timeout=self.timeout, count=1).output + return host, response['success'] is True + + def __init__(self, hosts: List[str], timeout: float = 5.0, interval: float = 60.0, *args, **kwargs): + """ + :param hosts: List of IP addresses or host names to monitor. + :param timeout: Ping timeout. + :param interval: Interval between two scans. + """ + + super().__init__(*args, **kwargs) + self.hosts = {h: None for h in hosts} + self.timeout = timeout + self.interval = interval + + def run(self): + super().run() + self.logger.info('Starting ping backend with {} hosts to monitor'.format(len(self.hosts))) + + while not self.should_stop(): + workers = Workers(min(len(self.hosts), 10), self.Pinger, timeout=self.timeout) + + with workers: + for host in self.hosts.keys(): + workers.put(host) + + for response in workers.responses: + host, is_up = response + if is_up != self.hosts[host]: + if is_up: + self.bus.post(HostUpEvent(host)) + else: + self.bus.post(HostDownEvent(host)) + + self.hosts[host] = is_up + + time.sleep(self.interval) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 36a915332..c63d1225e 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -10,6 +10,15 @@ logger = logging.getLogger(__name__) class Message(object): """ Message generic class """ + class Encoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime.datetime) or \ + isinstance(obj, datetime.date) or \ + isinstance(obj, datetime.time): + return obj.isoformat() + + return super().default(obj) + def __init__(self, timestamp=None, *args, **kwargs): self.timestamp = timestamp or time.time() @@ -24,7 +33,7 @@ class Message(object): for attr in self.__dir__() if (attr != '_timestamp' or not attr.startswith('_')) and not inspect.ismethod(getattr(self, attr)) - }, cls=MessageEncoder).replace('\n', ' ') + }, cls=self.Encoder).replace('\n', ' ') def __bytes__(self): """ @@ -133,14 +142,4 @@ class Mapping(dict): return str(self.__dict__) -class MessageEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, datetime.datetime) or \ - isinstance(obj, datetime.date) or \ - isinstance(obj, datetime.time): - return obj.isoformat() - - return super().default(obj) - - # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/ping.py b/platypush/message/event/ping.py index 132a3b939..095a71cf1 100644 --- a/platypush/message/event/ping.py +++ b/platypush/message/event/ping.py @@ -1,4 +1,4 @@ -from platypush.message.event import Event, EventMatchResult +from platypush.message.event import Event class PingEvent(Event): @@ -13,5 +13,20 @@ class PingEvent(Event): super().__init__(message=message, *args, **kwargs) -# vim:sw=4:ts=4:et: +class HostDownEvent(Event): + """ + Event triggered when a remote host stops responding ping requests. + """ + def __init__(self, host: str, *args, **kwargs): + super().__init__(host=host, *args, **kwargs) + +class HostUpEvent(Event): + """ + Event triggered when a remote host starts responding ping requests. + """ + def __init__(self, host: str, *args, **kwargs): + super().__init__(host=host, *args, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index c539115b1..67629e0d8 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -1,7 +1,7 @@ import json import time -from platypush.message import Message, MessageEncoder +from platypush.message import Message class Response(Message): @@ -66,9 +66,8 @@ class Response(Message): Overrides the str() operator and converts the message into a UTF-8 JSON string """ - output = self.output if self.output is not None and self.output != {} else { - 'status': 'ok' if not self.errors else 'error' + 'success': True if not self.errors else False } response_dict = { @@ -86,7 +85,7 @@ class Response(Message): if self.disable_logging: response_dict['_disable_logging'] = self.disable_logging - return json.dumps(response_dict, cls=MessageEncoder) + return json.dumps(response_dict, cls=self.Encoder) # vim:sw=4:ts=4:et: diff --git a/platypush/message/response/ping.py b/platypush/message/response/ping.py new file mode 100644 index 000000000..f13c1f3e5 --- /dev/null +++ b/platypush/message/response/ping.py @@ -0,0 +1,27 @@ +from typing import Optional + +from platypush.message.response import Response + + +class PingResponse(Response): + # noinspection PyShadowingBuiltins + def __init__(self, + host: str, + success: bool, + *args, + min: Optional[float] = None, + max: Optional[float] = None, + avg: Optional[float] = None, + mdev: Optional[float] = None, + **kwargs): + super().__init__(*args, output={ + 'host': host, + 'success': success, + 'min': min, + 'max': max, + 'avg': avg, + 'mdev': mdev, + }, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/ping.py b/platypush/plugins/ping.py new file mode 100644 index 000000000..b4e572c5e --- /dev/null +++ b/platypush/plugins/ping.py @@ -0,0 +1,95 @@ +import re +import subprocess +import sys + +from typing import Optional, List + +from platypush.message.response.ping import PingResponse +from platypush.plugins import Plugin, action + +PING_MATCHER = re.compile( + r"(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)" +) + +PING_MATCHER_BUSYBOX = re.compile( + r"(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)" +) + +WIN32_PING_MATCHER = re.compile(r"(?P\d+)ms.+(?P\d+)ms.+(?P\d+)ms") + + +class PingPlugin(Plugin): + """ + Perform ICMP network ping on remote hosts. + """ + + def __init__(self, executable: str = 'ping', count: int = 1, timeout: float = 5.0, **kwargs): + """ + :param executable: Path to the ``ping`` executable. Default: the first ``ping`` executable found in PATH. + :param count: Default number of packets that should be sent (default: 1). + :param timeout: Default timeout before failing a ping request (default: 5 seconds). + """ + + super().__init__(**kwargs) + self.executable = executable + self.count = count + self.timeout = timeout + + def _get_ping_cmd(self, host: str, count: int, timeout: float) -> List[str]: + if sys.platform == 'win32': + return [ + self.executable, + '-n', + str(count or self.count), + '-w', + str((timeout or self.timeout) * 1000), + host, + ] + + return [ + self.executable, + '-n', + '-q', + '-c', + str(count or self.count), + '-W', + str(timeout or self.timeout), + host, + ] + + @action + def ping(self, host: str, count: Optional[int] = None, timeout: Optional[float] = None) -> PingResponse: + """ + Ping a remote host. + :param host: Remote host IP or name + :param count: Number of packets that should be sent (default: 1). + :param timeout: Timeout before failing a ping request (default: 5 seconds). + """ + + count = count or self.count + timeout = timeout or self.timeout + + pinger = subprocess.Popen( + self._get_ping_cmd(host, count=count, timeout=timeout), + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + try: + out = pinger.communicate() + if sys.platform == "win32": + match = WIN32_PING_MATCHER.search(str(out).split("\n")[-1]) + min_val, avg_val, max_val = match.groups() + mdev_val = None + elif "max/" not in str(out): + match = PING_MATCHER_BUSYBOX.search(str(out).split("\n")[-1]) + min_val, avg_val, max_val = match.groups() + mdev_val = None + else: + match = PING_MATCHER.search(str(out).split("\n")[-1]) + min_val, avg_val, max_val, mdev_val = match.groups() + + return PingResponse(host=host, success=True, min=min_val, max=max_val, avg=avg_val, mdev=mdev_val) + except (subprocess.CalledProcessError, AttributeError): + return PingResponse(host=host, success=False) + + +# vim:sw=4:ts=4:et: