246 lines
8.0 KiB
Python
246 lines
8.0 KiB
Python
import logging
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from concurrent.futures.process import BrokenProcessPool
|
|
from typing import Collection, Dict, Iterable, Optional, List
|
|
|
|
from platypush.entities import EntityManager
|
|
from platypush.entities.ping import PingHost
|
|
from platypush.message.event.ping import HostUpEvent, HostDownEvent, PingResponseEvent
|
|
from platypush.plugins import RunnablePlugin, action
|
|
from platypush.schemas.ping import PingResponseSchema
|
|
|
|
# Summary line with four fields, e.g.
# ``rtt min/avg/max/mdev = 0.045/0.050/0.055/0.004 ms``.
# The decimal point is escaped so that only real decimal numbers match.
PING_MATCHER = re.compile(
    r"(?P<min>\d+\.\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+\.\d+)/(?P<mdev>\d+\.\d+)"
)

# Summary line with three fields only (no deviation), e.g.
# ``round-trip min/avg/max = 0.045/0.050/0.055 ms``.
PING_MATCHER_BUSYBOX = re.compile(
    r"(?P<min>\d+\.\d+)/(?P<avg>\d+\.\d+)/(?P<max>\d+\.\d+)"
)

# Windows summary line, e.g.
# ``Minimum = 12ms, Maximum = 345ms, Average = 67ms``.
# The values are separated by ``\D+`` rather than a greedy ``.+``: a greedy
# separator swallows all but the last digit of the following number, and it
# lets the match start from the ``time=NNms`` of a reply line.
# NOTE: the groups are ordered min, max, avg (not min, avg, max).
WIN32_PING_MATCHER = re.compile(
    r"(?P<min>\d+)ms\D+(?P<max>\d+)ms\D+(?P<avg>\d+)ms"
)
|
|
|
|
|
|
def ping(host: str, ping_cmd: List[str], logger: logging.Logger) -> dict:
    """
    Run a ``ping`` command and parse the round-trip statistics it prints.

    :param host: Host being pinged. It is only used to label the response
        and the warning logged on failure; the actual target is whatever
        ``ping_cmd`` contains.
    :param ping_cmd: Full command line to execute (executable + arguments).
    :param logger: Logger used to report failures.
    :return: The dumped ``PingResponseSchema`` payload. On success it is
        built from ``success=True`` plus the parsed ``min``/``max``/``avg``
        (and ``mdev`` when the output reports it). On any failure (command
        not runnable, no statistics in the output, unparsable numbers) it
        is built from ``host`` and ``success=False`` only. No exception
        derived from ``Exception`` is propagated to the caller.
    """
    out = None
    pinger = None
    err_response = dict(PingResponseSchema().dump({"host": host, "success": False}))

    try:
        with subprocess.Popen(
            ping_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as pinger:
            out = pinger.communicate()

        # Search both streams as decoded text. ``errors="replace"`` keeps
        # output in a non-UTF-8 code page from raising while decoding.
        output = "\n".join(stream.decode(errors="replace") for stream in out)

        if sys.platform == "win32":
            matcher = WIN32_PING_MATCHER
        elif "max/" not in output:
            # No fourth (deviation) field in the summary line
            matcher = PING_MATCHER_BUSYBOX
        else:
            matcher = PING_MATCHER

        match = matcher.search(output)
        if match is None:
            # Explicit check instead of ``assert``, which is stripped by -O
            raise ValueError("No round-trip statistics found in the ping output")

        # Read the values by name: the Windows pattern yields its groups in
        # (min, max, avg) order, so positional unpacking would swap them.
        stats = match.groupdict()
        mdev_val = stats.get("mdev")

        return dict(
            PingResponseSchema().dump(
                {
                    "host": host,
                    "success": True,
                    "min": float(stats["min"]),
                    "max": float(stats["max"]),
                    "avg": float(stats["avg"]),
                    "mdev": float(mdev_val) if mdev_val is not None else None,
                }
            )
        )
    except Exception as e:
        # Prefer the command output, if any was collected, over the
        # exception text when reporting the failure.
        if isinstance(out, (tuple, list)):
            err = '\n'.join(
                stream.decode(errors="replace").strip() for stream in out
            )
        else:
            err = str(e)

        logger.warning("Error while pinging host %s: %s", host, err)
        # Make sure that no child process is left behind
        if pinger and pinger.poll() is None:
            pinger.kill()
            pinger.wait()

        return err_response
|
|
|
|
|
|
class PingPlugin(RunnablePlugin, EntityManager):
    """
    This integration allows you to:

        - Programmatically ping a remote host.
        - Monitor the status of a remote host.

    """

    def __init__(
        self,
        executable: str = 'ping',
        count: int = 1,
        timeout: float = 5.0,
        hosts: Optional[List[str]] = None,
        poll_interval: float = 20.0,
        **kwargs,
    ):
        """
        :param executable: Path to the ``ping`` executable. Default: the first ``ping`` executable found in PATH.
        :param count: Default number of packets that should be sent (default: 1).
        :param timeout: Default timeout before failing a ping request (default: 5 seconds).
        :param hosts: List of hosts to monitor. If not specified then no hosts will be monitored.
        :param poll_interval: How often the hosts should be monitored (default: 20 seconds).
        """

        super().__init__(poll_interval=poll_interval, **kwargs)
        self.executable = executable
        self.count = count
        self.timeout = timeout
        # Monitored host -> last ping result (None until the first result)
        self.hosts: Dict[str, Optional[dict]] = {h: None for h in (hosts or [])}

    def _get_ping_cmd(self, host: str, count: int, timeout: float) -> List[str]:
        """
        Build the platform-specific command line used to ping ``host``.

        :param host: Host to ping.
        :param count: Number of packets to send. A falsy value falls back
            to the configured ``count``.
        :param timeout: Timeout in seconds. A falsy value falls back to the
            configured ``timeout``.
        :return: The command as a list of arguments.
        """
        count = count or self.count
        timeout = timeout or self.timeout

        if sys.platform == 'win32':
            return [  # noqa
                self.executable,
                '-n',
                str(count),
                '-w',
                # Windows takes the timeout as an integer number of
                # milliseconds: a float such as "5000.0" is rejected, and
                # so is 0.
                str(max(1, int(timeout * 1000))),
                host,
            ]

        return [
            self.executable,
            '-n',
            '-q',
            '-c',
            str(count),
            '-W',
            str(timeout),
            host,
        ]

    @action
    def ping(
        self, host: str, count: Optional[int] = None, timeout: Optional[float] = None
    ) -> dict:
        """
        Ping a remote host.

        :param host: Remote host IP or name
        :param count: Overrides the configured number of packets that should be
            sent (default: 1).
        :param timeout: Overrides the configured timeout before failing a ping
            request (default: 5 seconds).
        :return: .. schema:: ping.PingResponseSchema
        """
        return self._ping(host=host, count=count, timeout=timeout)

    def _ping(
        self, host: str, count: Optional[int] = None, timeout: Optional[float] = None
    ) -> dict:
        """
        Ping ``host`` through the module-level ``ping`` function, using the
        configured ``count``/``timeout`` when they are not overridden.
        """
        return ping(
            host=host,
            ping_cmd=self._get_ping_cmd(
                host=host, count=count or self.count, timeout=timeout or self.timeout
            ),
            logger=self.logger,
        )

    def _process_ping_result(self, result: dict):
        """
        Post the events for a ping result and record the host state.

        - On success: a ``HostUpEvent`` is posted if the previous result
          for the host was missing or unsuccessful, followed by a
          ``PingResponseEvent``.
        - On failure: a ``HostDownEvent`` is posted if the previous result
          for the host was missing or successful.

        Results without a ``host`` or a ``success`` field are ignored.
        """
        host = result.get("host")
        if host is None:
            return

        success = result.get("success")
        if success is None:
            return

        prev_result = self.hosts.get(host)
        if success:
            if not (prev_result and prev_result.get("success")):
                self._bus.post(HostUpEvent(host=host))

            self._bus.post(PingResponseEvent(**result))
            self.hosts[host] = result
        else:
            if not prev_result or prev_result.get("success"):
                self._bus.post(HostDownEvent(host=host))
                self.hosts[host] = result

    @action
    def status(self) -> Collection[PingHost]:
        """
        Get the status of the monitored hosts.

        :return: The monitored hosts, as returned by
            :meth:`.publish_entities`.
        """
        return self.publish_entities()

    def publish_entities(self, *_, **__) -> Collection[PingHost]:
        """
        Publish the last known results of the monitored hosts as entities.
        Any argument passed by the caller is ignored.
        """
        return super().publish_entities(self.hosts.values())

    def transform_entities(
        self, entities: Collection[Optional[dict]], **_
    ) -> Iterable[PingHost]:
        """
        Convert ping results into ``PingHost`` entities, skipping the hosts
        that have no result yet.
        """
        return super().transform_entities(
            [
                PingHost(
                    id=status.get("host"),
                    name=status.get("host"),
                    reachable=status.get("success"),
                    min=status.get("min"),
                    max=status.get("max"),
                    avg=status.get("avg"),
                )
                for status in entities
                if status
            ]
        )

    def main(self):
        """
        Monitoring loop: ping the configured hosts through a process pool,
        process the results and publish the entities, then wait for
        ``poll_interval`` seconds before the next round.
        """
        # Don't start the thread if no monitored hosts are configured
        if not self.hosts:
            self.wait_stop()
            return

        while not self.should_stop():
            try:
                with ProcessPoolExecutor() as executor:
                    for result in executor.map(
                        ping,
                        self.hosts.keys(),
                        [
                            self._get_ping_cmd(h, self.count, self.timeout)
                            for h in self.hosts.keys()
                        ],
                        [self.logger] * len(self.hosts),
                    ):
                        self._process_ping_result(result)
            except (KeyboardInterrupt, BrokenProcessPool):
                break
            except Exception as e:
                self.logger.warning("Error while pinging hosts: %s", e)
                self.logger.exception(e)
            finally:
                self.publish_entities()
                self.wait_stop(self.poll_interval)
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|