From 7e8bef40cd608c1c4b6a014d757ed9cefb0f40a5 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 14 Aug 2020 00:34:13 +0200 Subject: [PATCH] Added Zeroconf integration [closes #105] --- docs/source/backends.rst | 6 + docs/source/conf.py | 1 + docs/source/events.rst | 1 + docs/source/platypush/events/zeroconf.rst | 5 + docs/source/platypush/plugins/zeroconf.rst | 5 + docs/source/plugins.rst | 4 + platypush/backend/__init__.py | 64 ++++++++++- .../backend/assistant/snowboy/__init__.py | 1 + platypush/backend/http/__init__.py | 2 + platypush/backend/kafka/__init__.py | 1 + platypush/backend/nodered/__init__.py | 2 + platypush/backend/pushbullet/__init__.py | 1 + platypush/backend/sensor/__init__.py | 1 + platypush/backend/tcp.py | 1 + platypush/backend/websocket.py | 20 ++-- platypush/message/event/zeroconf.py | 48 ++++++++ platypush/plugins/zeroconf.py | 107 ++++++++++++++++++ requirements.txt | 3 + setup.py | 1 + 19 files changed, 261 insertions(+), 13 deletions(-) create mode 100644 docs/source/platypush/events/zeroconf.rst create mode 100644 docs/source/platypush/plugins/zeroconf.rst create mode 100644 platypush/message/event/zeroconf.py create mode 100644 platypush/plugins/zeroconf.py diff --git a/docs/source/backends.rst b/docs/source/backends.rst index e1e6160621..49cdeac9da 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -11,6 +11,11 @@ Backends platypush/backend/assistant.rst platypush/backend/assistant.google.rst platypush/backend/assistant.snowboy.rst + platypush/backend/bluetooth.rst + platypush/backend/bluetooth.fileserver.rst + platypush/backend/bluetooth.pushserver.rst + platypush/backend/bluetooth.scanner.rst + platypush/backend/bluetooth.scanner.ble.rst platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst @@ -48,6 +53,7 @@ Backends platypush/backend/sensor.distance.vl53l1x.rst platypush/backend/sensor.envirophat.rst platypush/backend/sensor.ir.zeroborg.rst + platypush/backend/sensor.leap.rst platypush/backend/sensor.ltr559.rst platypush/backend/sensor.mcp3008.rst platypush/backend/sensor.motion.pwm3901.rst diff --git a/docs/source/conf.py b/docs/source/conf.py index f788f2301d..5a87d49897 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -253,6 +253,7 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers', 'samsungtvws', 'paramiko', 'luma', + 'zeroconf', ] sys.path.insert(0, os.path.abspath('../..')) diff --git a/docs/source/events.rst b/docs/source/events.rst index 9196bacec5..9b03f9d61f 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -61,5 +61,6 @@ Events platypush/events/web.widget.rst platypush/events/wiimote.rst platypush/events/zeroborg.rst + platypush/events/zeroconf.rst platypush/events/zigbee.mqtt.rst platypush/events/zwave.rst diff --git a/docs/source/platypush/events/zeroconf.rst b/docs/source/platypush/events/zeroconf.rst new file mode 100644 index 0000000000..6ed26bf88f --- /dev/null +++ b/docs/source/platypush/events/zeroconf.rst @@ -0,0 +1,5 @@ +``platypush.message.event.zeroconf`` +==================================== + +.. automodule:: platypush.message.event.zeroconf + :members: diff --git a/docs/source/platypush/plugins/zeroconf.rst b/docs/source/platypush/plugins/zeroconf.rst new file mode 100644 index 0000000000..e8f98a4127 --- /dev/null +++ b/docs/source/platypush/plugins/zeroconf.rst @@ -0,0 +1,5 @@ +``platypush.plugins.zeroconf`` +============================== + +.. automodule:: platypush.plugins.zeroconf + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 93074dc34e..af868a1188 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -8,6 +8,7 @@ Plugins platypush/plugins/adafruit.io.rst platypush/plugins/alarm.rst + platypush/plugins/arduino.rst platypush/plugins/assistant.rst platypush/plugins/assistant.echo.rst platypush/plugins/assistant.google.rst @@ -85,6 +86,7 @@ Plugins platypush/plugins/music.mpd.rst platypush/plugins/music.snapcast.rst platypush/plugins/nmap.rst + platypush/plugins/otp.rst platypush/plugins/pihole.rst platypush/plugins/ping.rst platypush/plugins/printer.cups.rst @@ -97,6 +99,7 @@ Plugins platypush/plugins/sound.rst platypush/plugins/ssh.rst platypush/plugins/stt.rst + platypush/plugins/stt.deepspeech.rst platypush/plugins/stt.picovoice.hotword.rst platypush/plugins/stt.picovoice.speech.rst platypush/plugins/switch.rst @@ -122,5 +125,6 @@ Plugins platypush/plugins/weather.darksky.rst platypush/plugins/websocket.rst platypush/plugins/wiimote.rst + platypush/plugins/zeroconf.rst platypush/plugins/zigbee.mqtt.rst platypush/plugins/zwave.rst diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index f2cbc4d01e..68f00ffa25 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -4,6 +4,8 @@ """ import logging +import re +import socket import threading import time @@ -13,9 +15,11 @@ from typing import Optional from platypush.bus import Bus from platypush.config import Config from platypush.context import get_backend +from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent from platypush.utils import set_timeout, clear_timeout, \ get_redis_queue_name_by_message, set_thread_name +from platypush import __version__ from platypush.event import EventGenerator from platypush.message import Message from platypush.message.event import Event, StopEvent @@ -62,6 +66,8 @@ class Backend(Thread, EventGenerator): self._stop_event = threading.Event() self._kwargs = kwargs self.logger = logging.getLogger(self.__class__.__name__) + self.zeroconf = None + self.zeroconf_info = None # Internal-only, we set the request context on a backend if that # backend is intended to react for a response to a specific request @@ -257,7 +263,7 @@ class Backend(Thread, EventGenerator): def on_stop(self): """ Callback invoked when the process stops """ - pass + self.unregister_service() def stop(self): """ Stops the backend thread by sending a STOP event on its bus """ @@ -304,5 +310,61 @@ class Backend(Thread, EventGenerator): except Exception as e: self.logger.error('Error while processing response to {}: {}'.format(msg, str(e))) + @staticmethod + def _get_ip() -> str: + """ + Get the IP address of the machine. + """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('8.8.8.8', 80)) + addr = s.getsockname()[0] + s.close() + return addr + + def register_service(self, port: Optional[int] = None, name: Optional[str] = None, udp: bool = False): + """ + Initialize the Zeroconf service configuration for this backend. + """ + try: + from zeroconf import ServiceInfo, Zeroconf + except ImportError: + self.logger.warning('zeroconf package not available, service discovery will be disabled.') + return + + self.zeroconf = Zeroconf() + srv_desc = { + 'name': 'Platypush', + 'vendor': 'Platypush', + 'version': __version__, + } + + name = name or re.sub(r'Backend$', '', self.__class__.__name__).lower() + srv_type = '_platypush-{name}._{proto}.local.'.format(name=name, proto='udp' if udp else 'tcp') + srv_name = '{host}.{type}'.format(host=self.device_id, type=srv_type) + + if port: + srv_port = port + else: + srv_port = self.port if hasattr(self, 'port') else None + + self.zeroconf_info = ServiceInfo(srv_type, srv_name, socket.inet_aton(self._get_ip()), + srv_port, 0, 0, srv_desc) + + self.zeroconf.register_service(self.zeroconf_info) + self.bus.post(ZeroconfServiceAddedEvent(service_type=srv_type, service_name=srv_name)) + + def unregister_service(self): + """ + Unregister the Zeroconf service configuration if available. + """ + if self.zeroconf and self.zeroconf_info: + self.zeroconf.unregister_service(self.zeroconf_info) + self.zeroconf.close() + self.bus.post(ZeroconfServiceRemovedEvent(service_type=self.zeroconf_info.type, + service_name=self.zeroconf_info.name)) + + self.zeroconf_info = None + self.zeroconf = None + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/assistant/snowboy/__init__.py b/platypush/backend/assistant/snowboy/__init__.py index 6a1800ae18..f4b1cb5daf 100644 --- a/platypush/backend/assistant/snowboy/__init__.py +++ b/platypush/backend/assistant/snowboy/__init__.py @@ -165,6 +165,7 @@ class AssistantSnowboyBackend(AssistantBackend): return callback def on_stop(self): + super().on_stop() if self.detector: self.detector.terminate() self.detector = None diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index c54b9d9664..1658180118 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -198,6 +198,7 @@ class HttpBackend(Backend): def on_stop(self): """ On backend stop """ + super().on_stop() self.logger.info('Received STOP event on HttpBackend') if self.server_proc: @@ -312,6 +313,7 @@ class HttpBackend(Backend): def run(self): super().run() + self.register_service(port=self.port) if not self.disable_websocket: self.logger.info('Initializing websocket interface') diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index c74b66e1da..75a892a0cd 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -68,6 +68,7 @@ class KafkaBackend(Backend): server=self.server) def on_stop(self): + super().on_stop() try: if self.producer: self.producer.flush() diff --git a/platypush/backend/nodered/__init__.py b/platypush/backend/nodered/__init__.py index a193dae601..9c225965f5 100644 --- a/platypush/backend/nodered/__init__.py +++ b/platypush/backend/nodered/__init__.py @@ -30,12 +30,14 @@ class NoderedBackend(Backend): self._server = None def on_stop(self): + super().on_stop() if self._server: self._server.terminate() self._server = None def run(self): super().run() + self.register_service(port=self.port, name='node') self._server = subprocess.Popen([sys.executable, '-m', 'pynodered.server', '--port', str(self.port), self._runner_path]) diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 7d511e6e2d..c3e905b25c 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -126,6 +126,7 @@ class PushbulletBackend(Backend): self.listener.close() def on_stop(self): + super().on_stop() return self.close() def run(self): diff --git a/platypush/backend/sensor/__init__.py b/platypush/backend/sensor/__init__.py index 2cb3757ade..8055470ca2 100644 --- a/platypush/backend/sensor/__init__.py +++ b/platypush/backend/sensor/__init__.py @@ -166,6 +166,7 @@ class SensorBackend(Backend): return ret def on_stop(self): + super().on_stop() if not self.plugin: return diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index f586c8c356..da5bcd954b 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -94,6 +94,7 @@ class TcpBackend(Backend): def run(self): super().run() + self.register_service(port=self.port) serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock.bind((self.bind_address, self.port)) diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py index 28701a3760..456cd0cf2b 100644 --- a/platypush/backend/websocket.py +++ b/platypush/backend/websocket.py @@ -1,7 +1,4 @@ import asyncio -import os -import ssl -import threading import websockets from platypush.backend import Backend @@ -66,7 +63,7 @@ class WebsocketBackend(Backend): ssl_capath=ssl_capath) \ if ssl_cert else None - def send_message(self, msg): + def send_message(self, msg, **kwargs): websocket = get_plugin('websocket') websocket_args = {} @@ -78,7 +75,6 @@ class WebsocketBackend(Backend): websocket.send(url=url, msg=msg, **websocket_args) - def notify_web_clients(self, event): """ Notify all the connected web clients (over websocket) of a new event """ async def send_event(websocket): @@ -90,21 +86,22 @@ class WebsocketBackend(Backend): loop = get_or_create_event_loop() active_websockets = self.active_websockets.copy() - for websocket in active_websockets: + for ws in active_websockets: try: - loop.run_until_complete(send_event(websocket)) + loop.run_until_complete(send_event(ws)) except websockets.exceptions.ConnectionClosed: self.logger.info('Client connection lost') - self.active_websockets.remove(websocket) - + self.active_websockets.remove(ws) def run(self): super().run() + self.register_service(port=self.port, name='ws') + # noinspection PyUnusedLocal async def serve_client(websocket, path): self.active_websockets.add(websocket) self.logger.debug('New websocket connection from {}'. - format(websocket.remote_address[0])) + format(websocket.remote_address[0])) try: while True: @@ -116,7 +113,7 @@ class WebsocketBackend(Backend): msg = Message.build(msg) self.logger.info('Received message from {}: {}'. - format(websocket.remote_address[0], msg)) + format(websocket.remote_address[0], msg)) self.on_message(msg) @@ -154,4 +151,3 @@ class WebsocketBackend(Backend): # vim:sw=4:ts=4:et: - diff --git a/platypush/message/event/zeroconf.py b/platypush/message/event/zeroconf.py new file mode 100644 index 0000000000..1bc72b1739 --- /dev/null +++ b/platypush/message/event/zeroconf.py @@ -0,0 +1,48 @@ +import enum + +from platypush.message.event import Event + + +class ZeroconfEventType(enum.Enum): + ADD = 'add' + UPDATE = 'update' + REMOVE = 'remove' + + +class ZeroconfEvent(Event): + def __init__(self, service_event: ZeroconfEventType, service_type: str, service_name: str, *args, **kwargs): + super().__init__(*args, service_event=service_event.value, service_type=service_type, + service_name=service_name, **kwargs) + + self.service_type = service_type + self.service_name = service_name + + +class ZeroconfServiceAddedEvent(ZeroconfEvent): + """ + Event triggered when a service is added or discovered. + """ + def __init__(self, *args, **kwargs): + kwargs['service_event'] = ZeroconfEventType.ADD + super().__init__(*args, **kwargs) + + +class ZeroconfServiceUpdatedEvent(ZeroconfEvent): + """ + Event triggered when a service is updated. + """ + def __init__(self, *args, **kwargs): + kwargs['service_event'] = ZeroconfEventType.UPDATE + super().__init__(*args, **kwargs) + + +class ZeroconfServiceRemovedEvent(ZeroconfEvent): + """ + Event triggered when a service is removed. + """ + def __init__(self, *args, **kwargs): + kwargs['service_event'] = ZeroconfEventType.REMOVE + super().__init__(*args, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zeroconf.py b/platypush/plugins/zeroconf.py new file mode 100644 index 0000000000..67caade61e --- /dev/null +++ b/platypush/plugins/zeroconf.py @@ -0,0 +1,107 @@ +import queue +import time +from typing import List, Dict + +import zeroconf +from zeroconf import Zeroconf, ServiceBrowser + +from platypush.context import get_bus +from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, \ + ZeroconfServiceUpdatedEvent, ZeroconfEvent +from platypush.plugins import Plugin, action + + +class ZeroconfListener(zeroconf.ServiceListener): + def __init__(self, evt_queue: queue.Queue): + super().__init__() + self.evt_queue = evt_queue + + # noinspection PyUnusedLocal + def add_service(self, zc: Zeroconf, type_: str, name: str): + self.evt_queue.put(ZeroconfServiceAddedEvent(service_type=type_, service_name=name)) + + # noinspection PyUnusedLocal + def remove_service(self, zc: Zeroconf, type_: str, name: str): + self.evt_queue.put(ZeroconfServiceRemovedEvent(service_type=type_, service_name=name)) + + # noinspection PyUnusedLocal + def update_service(self, zc: Zeroconf, type_: str, name: str): + self.evt_queue.put(ZeroconfServiceUpdatedEvent(service_type=type_, service_name=name)) + + +class ZeroconfPlugin(Plugin): + """ + Plugin for Zeroconf services discovery. + + Requires: + + * **zeroconf** (``pip install zeroconf``) + """ + + @action + def get_services(self, timeout: int = 5) -> List[str]: + """ + Get the full list of services found on the network. + + :param timeout: Discovery timeout in seconds (default: 5). + :return: List of the services as strings. + """ + return list(zeroconf.ZeroconfServiceTypes.find(timeout=timeout)) + + @action + def discover_service(self, service: str, timeout: int = 5) -> Dict[str, List[str]]: + """ + Find all the services matching the specified type. + + :param service: Service type (e.g. ``_http._tcp.local.``). + :param timeout: Browser timeout in seconds (default: 5). + :return: A ``service_type -> [service_names]`` mapping. Example:: + + { + "_platypush-http._tcp.local.": [ + "host1._platypush-http._tcp.local.", + "host2._platypush-http._tcp.local." + ], + ... + } + + """ + evt_queue = queue.Queue() + zc = Zeroconf() + listener = ZeroconfListener(evt_queue=evt_queue) + browser = ServiceBrowser(zc, service, listener) + discovery_start = time.time() + services = {} + + try: + while time.time() - discovery_start < timeout: + to = discovery_start + timeout - time.time() + try: + evt: ZeroconfEvent = evt_queue.get(block=True, timeout=to) + if isinstance(evt, ZeroconfServiceAddedEvent) or isinstance(evt, ZeroconfServiceUpdatedEvent): + if evt.service_type not in services: + services[evt.service_type] = set() + + services[evt.service_type].add(evt.service_name) + elif isinstance(evt, ZeroconfServiceRemovedEvent): + if evt.service_type in services: + if evt.service_name in services[evt.service_type]: + services[evt.service_type].remove(evt.service_name) + if not services[evt.service_type]: + del services[evt.service_type] + + get_bus().post(evt) + except queue.Empty: + if not services: + self.logger.warning('No such service discovered: {}'.format(service)) + finally: + browser.cancel() + zc.close() + + return { + type_: list(names) + for type_, names in services.items() + } + + +# vim:sw=4:ts=4:et: diff --git a/requirements.txt b/requirements.txt index aa0db5799e..24aa82d8e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,9 @@ sqlalchemy # Support for multi-users and password authentication bcrypt +# Support for Zeroconf/Bonjour +zeroconf + # RSS feeds support # feedparser diff --git a/setup.py b/setup.py index e07bb1d833..c75430ff08 100755 --- a/setup.py +++ b/setup.py @@ -161,6 +161,7 @@ setup( 'websockets', 'websocket-client', 'wheel', + 'zeroconf', ], extras_require={