diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 545ef3dd..7ccc0d88 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -16,6 +16,7 @@ from platypush.bus import Bus from platypush.config import Config from platypush.context import get_backend from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent +from platypush.plugins.zeroconf import ZeroconfListener from platypush.utils import set_timeout, clear_timeout, \ get_redis_queue_name_by_message, set_thread_name @@ -355,7 +356,8 @@ class Backend(Thread, EventGenerator): properties=srv_desc) self.zeroconf.register_service(self.zeroconf_info) - self.bus.post(ZeroconfServiceAddedEvent(service_type=srv_type, service_name=srv_name)) + self.bus.post(ZeroconfServiceAddedEvent(service_type=srv_type, service_name=srv_name, + service_info=ZeroconfListener.parse_service_info(self.zeroconf_info))) def unregister_service(self): """ diff --git a/platypush/message/event/zeroconf.py b/platypush/message/event/zeroconf.py index 1bc72b17..b8119663 100644 --- a/platypush/message/event/zeroconf.py +++ b/platypush/message/event/zeroconf.py @@ -10,12 +10,14 @@ class ZeroconfEventType(enum.Enum): class ZeroconfEvent(Event): - def __init__(self, service_event: ZeroconfEventType, service_type: str, service_name: str, *args, **kwargs): + def __init__(self, service_event: ZeroconfEventType, service_type: str, service_name: str, + service_info: dict, *args, **kwargs): super().__init__(*args, service_event=service_event.value, service_type=service_type, - service_name=service_name, **kwargs) + service_name=service_name, service_info=service_info, **kwargs) self.service_type = service_type self.service_name = service_name + self.service_info = service_info class ZeroconfServiceAddedEvent(ZeroconfEvent): diff --git a/platypush/plugins/zeroconf.py b/platypush/plugins/zeroconf.py index 67caade6..2c39b3f0 100644 --- a/platypush/plugins/zeroconf.py +++ b/platypush/plugins/zeroconf.py @@ -1,9 +1,10 @@ import queue +import socket import time -from typing import List, Dict +from typing import List, Dict, Any, Optional import zeroconf -from zeroconf import Zeroconf, ServiceBrowser +from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser from platypush.context import get_bus from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, \ @@ -16,28 +17,57 @@ class ZeroconfListener(zeroconf.ServiceListener): super().__init__() self.evt_queue = evt_queue - # noinspection PyUnusedLocal + @classmethod + def get_service_info(cls, zc: Zeroconf, type_: str, name: str) -> dict: + info = zc.get_service_info(type_, name) + return cls.parse_service_info(info) + + @staticmethod + def parse_service_info(info: ServiceInfo) -> dict: + return { + 'addresses': [socket.inet_ntoa(addr) for addr in info.addresses], + 'port': info.port, + 'host_ttl': info.host_ttl, + 'other_ttl': info.other_ttl, + 'priority': info.priority, + 'properties': {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v + for k, v in info.properties.items()}, + 'server': info.server, + 'weight': info.weight, + } + def add_service(self, zc: Zeroconf, type_: str, name: str): - self.evt_queue.put(ZeroconfServiceAddedEvent(service_type=type_, service_name=name)) + info = self.get_service_info(zc, type_, name) + self.evt_queue.put(ZeroconfServiceAddedEvent(service_type=type_, service_name=name, service_info=info)) - # noinspection PyUnusedLocal def remove_service(self, zc: Zeroconf, type_: str, name: str): - self.evt_queue.put(ZeroconfServiceRemovedEvent(service_type=type_, service_name=name)) + info = self.get_service_info(zc, type_, name) + self.evt_queue.put(ZeroconfServiceRemovedEvent(service_type=type_, service_name=name, service_info=info)) - # noinspection PyUnusedLocal def update_service(self, zc: Zeroconf, type_: str, name: str): - self.evt_queue.put(ZeroconfServiceUpdatedEvent(service_type=type_, service_name=name)) + info = self.get_service_info(zc, type_, name) + self.evt_queue.put(ZeroconfServiceUpdatedEvent(service_type=type_, service_name=name, service_info=info)) class ZeroconfPlugin(Plugin): """ Plugin for Zeroconf services discovery. + Triggers: + + * :class:`platypush.message.event.zeroconf.ZeroconfServiceAddedEvent` when a new service is discovered. + * :class:`platypush.message.event.zeroconf.ZeroconfServiceUpdatedEvent` when a service is updated. + * :class:`platypush.message.event.zeroconf.ZeroconfServiceRemovedEvent` when a service is removed. + Requires: * **zeroconf** (``pip install zeroconf``) """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._discovery_in_progress = False + @action def get_services(self, timeout: int = 5) -> List[str]: """ @@ -49,23 +79,41 @@ class ZeroconfPlugin(Plugin): return list(zeroconf.ZeroconfServiceTypes.find(timeout=timeout)) @action - def discover_service(self, service: str, timeout: int = 5) -> Dict[str, List[str]]: + def discover_service(self, service: str, timeout: Optional[int] = 5) -> Dict[str, Any]: """ Find all the services matching the specified type. :param service: Service type (e.g. ``_http._tcp.local.``). - :param timeout: Browser timeout in seconds (default: 5). + :param timeout: Browser timeout in seconds (default: 5). Specify None for no timeout - in such case the + discovery will loop forever and generate events upon service changes. :return: A ``service_type -> [service_names]`` mapping. Example:: { - "_platypush-http._tcp.local.": [ - "host1._platypush-http._tcp.local.", - "host2._platypush-http._tcp.local." - ], + "host1._platypush-http._tcp.local.": { + "type": "_platypush-http._tcp.local.", + "name": "host1._platypush-http._tcp.local.", + "info": { + "addresses": ["192.168.1.11"], + "port": 8008, + "host_ttl": 120, + "other_ttl": 4500, + "priority": 0, + "properties": { + "name": "Platypush", + "vendor": "Platypush", + "version": "0.13.2" + }, + "server": "host1._platypush-http._tcp.local.", + "weight": 0 + } + }, ... } """ + assert not self._discovery_in_progress, 'A discovery process is already running' + self._discovery_in_progress = True + evt_queue = queue.Queue() zc = Zeroconf() listener = ZeroconfListener(evt_queue=evt_queue) @@ -74,21 +122,19 @@ class ZeroconfPlugin(Plugin): services = {} try: - while time.time() - discovery_start < timeout: - to = discovery_start + timeout - time.time() + while timeout and time.time() - discovery_start < timeout: + to = discovery_start + timeout - time.time() if timeout else None try: evt: ZeroconfEvent = evt_queue.get(block=True, timeout=to) if isinstance(evt, ZeroconfServiceAddedEvent) or isinstance(evt, ZeroconfServiceUpdatedEvent): - if evt.service_type not in services: - services[evt.service_type] = set() - - services[evt.service_type].add(evt.service_name) + services[evt.service_name] = { + 'type': evt.service_type, + 'name': evt.service_name, + 'info': evt.service_info, + } elif isinstance(evt, ZeroconfServiceRemovedEvent): - if evt.service_type in services: - if evt.service_name in services[evt.service_type]: - services[evt.service_type].remove(evt.service_name) - if not services[evt.service_type]: - del services[evt.service_type] + if evt.service_name in services: + del services[evt.service_name] get_bus().post(evt) except queue.Empty: @@ -97,11 +143,9 @@ class ZeroconfPlugin(Plugin): finally: browser.cancel() zc.close() + self._discovery_in_progress = False - return { - type_: list(names) - for type_, names in services.items() - } + return services # vim:sw=4:ts=4:et: