diff --git a/platypush/backend/ping/__init__.py b/platypush/backend/ping/__init__.py deleted file mode 100644 index 4577bbad1..000000000 --- a/platypush/backend/ping/__init__.py +++ /dev/null @@ -1,76 +0,0 @@ -import time - -from typing import List, Tuple - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.ping import HostUpEvent, HostDownEvent -from platypush.utils.workers import Worker, Workers - - -class PingBackend(Backend): - """ - This backend allows you to ping multiple remote hosts at regular intervals. - """ - - class Pinger(Worker): - def __init__(self, *args, **kwargs): - self.timeout = kwargs.pop('timeout') - self.count = kwargs.pop('count') - super().__init__(*args, **kwargs) - - def process(self, host: str) -> Tuple[str, bool]: - pinger = get_plugin('ping') - response = pinger.ping(host, timeout=self.timeout, count=self.count).output - return host, response['success'] is True - - def __init__( - self, - hosts: List[str], - timeout: float = 5.0, - interval: float = 60.0, - count: int = 1, - *args, - **kwargs - ): - """ - :param hosts: List of IP addresses or host names to monitor. - :param timeout: Ping timeout. - :param interval: Interval between two scans. - :param count: Number of pings per host. A host will be considered down - if all the ping requests fail. - """ - - super().__init__(*args, **kwargs) - self.hosts = {h: None for h in hosts} - self.timeout = timeout - self.interval = interval - self.count = count - - def run(self): - super().run() - self.logger.info( - 'Starting ping backend with {} hosts to monitor'.format(len(self.hosts)) - ) - - while not self.should_stop(): - workers = Workers(10, self.Pinger, timeout=self.timeout, count=self.count) - - with workers: - for host in self.hosts.keys(): - workers.put(host) - - for response in workers.responses: - host, is_up = response - if is_up != self.hosts[host]: - if is_up: - self.bus.post(HostUpEvent(host)) - else: - self.bus.post(HostDownEvent(host)) - - self.hosts[host] = is_up - - time.sleep(self.interval) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/ping/manifest.yaml b/platypush/backend/ping/manifest.yaml deleted file mode 100644 index d9900d385..000000000 --- a/platypush/backend/ping/manifest.yaml +++ /dev/null @@ -1,14 +0,0 @@ -manifest: - events: - platypush.message.event.ping.HostDownEvent: if a host stops responding ping requests - platypush.message.event.ping.HostUpEvent: if a host starts responding ping requests - install: - apt: - - iputils-ping - dnf: - - iputils - pacman: - - iputils - - package: platypush.backend.ping - type: backend diff --git a/platypush/message/event/ping.py b/platypush/message/event/ping.py index 095a71cf1..9428a150b 100644 --- a/platypush/message/event/ping.py +++ b/platypush/message/event/ping.py @@ -2,21 +2,22 @@ from platypush.message.event import Event class PingEvent(Event): - """ Ping event, used for testing purposes """ + """Ping event, used for testing purposes""" - def __init__(self, message=None, *args, **kwargs): + def __init__(self, *args, message=None, **kwargs): """ :param message: Ping message :type message: object """ - super().__init__(message=message, *args, **kwargs) + super().__init__(*args, message=message, **kwargs) class HostDownEvent(Event): """ Event triggered when a remote host stops responding ping requests. """ + def __init__(self, host: str, *args, **kwargs): super().__init__(host=host, *args, **kwargs) @@ -25,8 +26,36 @@ class HostUpEvent(Event): """ Event triggered when a remote host starts responding ping requests. """ + def __init__(self, host: str, *args, **kwargs): super().__init__(host=host, *args, **kwargs) +class PingResponseEvent(Event): + """ + Event triggered when a ping response is received. + """ + + def __init__( + self, + host: str, + min: float, + max: float, + avg: float, + mdev: float, + *args, + **kwargs + ): + """ + :param host: Remote host IP or name. + :param min: Minimum round-trip time (in ms). + :param max: Maximum round-trip time (in ms). + :param avg: Average round-trip time (in ms). + :param mdev: Standard deviation of the round-trip time (in ms). + """ + super().__init__( + host=host, min=min, max=max, avg=avg, mdev=mdev, *args, **kwargs + ) + + # vim:sw=4:ts=4:et: diff --git a/platypush/message/response/ping.py b/platypush/message/response/ping.py deleted file mode 100644 index f13c1f3e5..000000000 --- a/platypush/message/response/ping.py +++ /dev/null @@ -1,27 +0,0 @@ -from typing import Optional - -from platypush.message.response import Response - - -class PingResponse(Response): - # noinspection PyShadowingBuiltins - def __init__(self, - host: str, - success: bool, - *args, - min: Optional[float] = None, - max: Optional[float] = None, - avg: Optional[float] = None, - mdev: Optional[float] = None, - **kwargs): - super().__init__(*args, output={ - 'host': host, - 'success': success, - 'min': min, - 'max': max, - 'avg': avg, - 'mdev': mdev, - }, **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/ping/__init__.py b/platypush/plugins/ping/__init__.py index b4e572c5e..8663f0847 100644 --- a/platypush/plugins/ping/__init__.py +++ b/platypush/plugins/ping/__init__.py @@ -1,43 +1,113 @@ +import logging import re import subprocess import sys +from concurrent.futures import ProcessPoolExecutor +from typing import Dict, Optional, List -from typing import Optional, List - -from platypush.message.response.ping import PingResponse -from platypush.plugins import Plugin, action +from platypush.message.event.ping import HostUpEvent, HostDownEvent, PingResponseEvent +from platypush.plugins import RunnablePlugin, action +from platypush.schemas.ping import PingResponseSchema PING_MATCHER = re.compile( r"(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)" ) -PING_MATCHER_BUSYBOX = re.compile( - r"(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)" -) +PING_MATCHER_BUSYBOX = re.compile(r"(?P\d+.\d+)/(?P\d+.\d+)/(?P\d+.\d+)") WIN32_PING_MATCHER = re.compile(r"(?P\d+)ms.+(?P\d+)ms.+(?P\d+)ms") -class PingPlugin(Plugin): +def ping(host: str, ping_cmd: List[str], logger: logging.Logger) -> dict: + err_response = dict( + PingResponseSchema().dump( + { + "host": host, + "success": False, + } + ) + ) + + with subprocess.Popen( + ping_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as pinger: + try: + out = pinger.communicate() + if sys.platform == "win32": + match = WIN32_PING_MATCHER.search(str(out).rsplit("\n", maxsplit=1)[-1]) + min_val, avg_val, max_val = match.groups() + mdev_val = None + elif "max/" not in str(out): + match = PING_MATCHER_BUSYBOX.search( + str(out).rsplit("\n", maxsplit=1)[-1] + ) + assert match is not None, f"No match found in {out}" + min_val, avg_val, max_val = match.groups() + mdev_val = None + else: + match = PING_MATCHER.search(str(out).rsplit("\n", maxsplit=1)[-1]) + assert match is not None, f"No match found in {out}" + min_val, avg_val, max_val, mdev_val = match.groups() + + return dict( + PingResponseSchema().dump( + { + "host": host, + "success": True, + "min": float(min_val), + "max": float(max_val), + "avg": float(avg_val), + "mdev": float(mdev_val) if mdev_val is not None else None, + } + ) + ) + except Exception as e: + if not isinstance(e, (subprocess.CalledProcessError, KeyboardInterrupt)): + logger.warning("Error while pinging host %s: %s", host, e) + logger.exception(e) + + pinger.kill() + pinger.wait() + return err_response + + +class PingPlugin(RunnablePlugin): """ - Perform ICMP network ping on remote hosts. + This integration allows you to: + + 1. Programmatic ping a remote host. + 2. Monitor the status of a remote host. + """ - def __init__(self, executable: str = 'ping', count: int = 1, timeout: float = 5.0, **kwargs): + def __init__( + self, + executable: str = 'ping', + count: int = 1, + timeout: float = 5.0, + hosts: Optional[List[str]] = None, + poll_interval: float = 10.0, + **kwargs, + ): """ :param executable: Path to the ``ping`` executable. Default: the first ``ping`` executable found in PATH. :param count: Default number of packets that should be sent (default: 1). :param timeout: Default timeout before failing a ping request (default: 5 seconds). + :param hosts: List of hosts to monitor. If not specified then no hosts will be monitored. + :param poll_interval: How often the hosts should be monitored (default: 10 seconds). """ - super().__init__(**kwargs) + super().__init__(poll_interval=poll_interval, **kwargs) self.executable = executable self.count = count self.timeout = timeout + self.hosts: Dict[str, Optional[bool]] = {h: None for h in (hosts or [])} def _get_ping_cmd(self, host: str, count: int, timeout: float) -> List[str]: if sys.platform == 'win32': - return [ + return [ # noqa self.executable, '-n', str(count or self.count), @@ -58,38 +128,78 @@ class PingPlugin(Plugin): ] @action - def ping(self, host: str, count: Optional[int] = None, timeout: Optional[float] = None) -> PingResponse: + def ping( + self, host: str, count: Optional[int] = None, timeout: Optional[float] = None + ) -> dict: """ Ping a remote host. :param host: Remote host IP or name - :param count: Number of packets that should be sent (default: 1). - :param timeout: Timeout before failing a ping request (default: 5 seconds). + :param count: Overrides the configured number of packets that should be + sent (default: 1). + :param timeout: Overrides the configured timeout before failing a ping + request (default: 5 seconds). + :return: .. schema:: ping.PingResponseSchema """ + return self._ping(host=host, count=count, timeout=timeout) - count = count or self.count - timeout = timeout or self.timeout + def _ping( + self, host: str, count: Optional[int] = None, timeout: Optional[float] = None + ) -> dict: + return ping( + host=host, + ping_cmd=self._get_ping_cmd( + host=host, count=count or self.count, timeout=timeout or self.timeout + ), + logger=self.logger, + ) - pinger = subprocess.Popen( - self._get_ping_cmd(host, count=count, timeout=timeout), - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + def _process_ping_result(self, result: dict): + host = result.get("host") + if host is None: + return - try: - out = pinger.communicate() - if sys.platform == "win32": - match = WIN32_PING_MATCHER.search(str(out).split("\n")[-1]) - min_val, avg_val, max_val = match.groups() - mdev_val = None - elif "max/" not in str(out): - match = PING_MATCHER_BUSYBOX.search(str(out).split("\n")[-1]) - min_val, avg_val, max_val = match.groups() - mdev_val = None - else: - match = PING_MATCHER.search(str(out).split("\n")[-1]) - min_val, avg_val, max_val, mdev_val = match.groups() + success = result.get("success") + if success is None: + return - return PingResponse(host=host, success=True, min=min_val, max=max_val, avg=avg_val, mdev=mdev_val) - except (subprocess.CalledProcessError, AttributeError): - return PingResponse(host=host, success=False) + if success: + if self.hosts.get(host) in (False, None): + self._bus.post(HostUpEvent(host=host)) + + result.pop("success", None) + self._bus.post(PingResponseEvent(**result)) + self.hosts[host] = True + else: + if self.hosts.get(host) in (True, None): + self._bus.post(HostDownEvent(host=host)) + self.hosts[host] = False + + def main(self): + # Don't start the thread if no monitored hosts are configured + if not self.hosts: + self.wait_stop() + return + + while not self.should_stop(): + try: + with ProcessPoolExecutor() as executor: + for result in executor.map( + ping, + self.hosts.keys(), + [ + self._get_ping_cmd(h, self.count, self.timeout) + for h in self.hosts.keys() + ], + [self.logger] * len(self.hosts), + ): + self._process_ping_result(result) + except KeyboardInterrupt: + break + except Exception as e: + self.logger.warning("Error while pinging hosts: %s", e) + self.logger.exception(e) + finally: + self.wait_stop(self.poll_interval) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/ping/manifest.yaml b/platypush/plugins/ping/manifest.yaml index 57800cd41..d0018e1dd 100644 --- a/platypush/plugins/ping/manifest.yaml +++ b/platypush/plugins/ping/manifest.yaml @@ -1,5 +1,8 @@ manifest: - events: {} + events: + - platypush.message.event.ping.HostDownEvent + - platypush.message.event.ping.HostUpEvent + - platypush.message.event.ping.PingResponseEvent install: apt: - iputils-ping @@ -7,6 +10,5 @@ manifest: - iputils pacman: - iputils - package: platypush.plugins.ping type: plugin diff --git a/platypush/schemas/ping.py b/platypush/schemas/ping.py new file mode 100644 index 000000000..b84e23be0 --- /dev/null +++ b/platypush/schemas/ping.py @@ -0,0 +1,56 @@ +from marshmallow import fields +from marshmallow.schema import Schema + + +class PingResponseSchema(Schema): + """ + Ping response schema. + """ + + host = fields.String( + required=True, + metadata={ + "description": "Remote host IP or name", + "example": "platypush.tech", + }, + ) + + success = fields.Boolean( + required=True, + metadata={ + "description": "True if the ping was successful, False otherwise", + "example": True, + }, + ) + + min = fields.Float( + required=False, + metadata={ + "description": "Minimum round-trip time (in ms)", + "example": 0.1, + }, + ) + + max = fields.Float( + required=False, + metadata={ + "description": "Maximum round-trip time (in ms)", + "example": 0.2, + }, + ) + + avg = fields.Float( + required=False, + metadata={ + "description": "Average round-trip time (in ms)", + "example": 0.15, + }, + ) + + mdev = fields.Float( + required=False, + metadata={ + "description": "Standard deviation of the round-trip time (in ms)", + "example": 0.05, + }, + )