import queue import socket import time from typing import List, Dict, Any, Optional, Union from zeroconf import ( Zeroconf, ServiceInfo, ServiceBrowser, ServiceListener, ZeroconfServiceTypes, ) from platypush.context import get_bus from platypush.message.event.zeroconf import ( ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, ZeroconfServiceUpdatedEvent, ) from platypush.plugins import Plugin, action class ZeroconfListener(ServiceListener): def __init__(self, evt_queue: queue.Queue): super().__init__() self.evt_queue = evt_queue @classmethod def get_service_info(cls, zc: Zeroconf, type_: str, name: str) -> dict: info = zc.get_service_info(type_, name) if not info: return {} return cls.parse_service_info(info) @staticmethod def parse_service_info(info: ServiceInfo) -> dict: return { 'addresses': [ socket.inet_ntoa(addr) for addr in info.addresses if info.addresses ], 'port': info.port, 'host_ttl': info.host_ttl, 'other_ttl': info.other_ttl, 'priority': info.priority, 'properties': { k.decode() if isinstance(k, bytes) else k: (v.decode() if isinstance(v, bytes) else v) for k, v in info.properties.items() }, 'server': info.server, 'weight': info.weight, } def add_service(self, zc: Zeroconf, type_: str, name: str): info = self.get_service_info(zc, type_, name) self.evt_queue.put( ZeroconfServiceAddedEvent( service_type=type_, service_name=name, service_info=info ) ) def remove_service(self, zc: Zeroconf, type_: str, name: str): info = self.get_service_info(zc, type_, name) self.evt_queue.put( ZeroconfServiceRemovedEvent( service_type=type_, service_name=name, service_info=info ) ) def update_service(self, zc: Zeroconf, type_: str, name: str): info = self.get_service_info(zc, type_, name) self.evt_queue.put( ZeroconfServiceUpdatedEvent( service_type=type_, service_name=name, service_info=info ) ) class ZeroconfPlugin(Plugin): """ Plugin for Zeroconf services discovery. """ def __init__(self, **kwargs): super().__init__(**kwargs) self._discovery_in_progress = False @action def get_services(self, timeout: int = 5) -> List[str]: """ Get the full list of services found on the network. :param timeout: Discovery timeout in seconds (default: 5). :return: List of the services as strings. """ return list(ZeroconfServiceTypes.find(timeout=timeout)) @action def discover_service( self, service: Union[str, list], timeout: Optional[int] = 5 ) -> Dict[str, Any]: """ Find all the services matching the specified type. :param service: Service type (e.g. ``_http._tcp.local.``) or list of service types. :param timeout: Browser timeout in seconds (default: 5). Specify None for no timeout - in such case the discovery will loop forever and generate events upon service changes. :return: A ``service_type -> [service_names]`` mapping. Example: .. code-block:: json { "host1._platypush-http._tcp.local.": { "type": "_platypush-http._tcp.local.", "name": "host1._platypush-http._tcp.local.", "info": { "addresses": ["192.168.1.11"], "port": 8008, "host_ttl": 120, "other_ttl": 4500, "priority": 0, "properties": { "name": "Platypush", "vendor": "Platypush", "version": "0.13.2" }, "server": "host1._platypush-http._tcp.local.", "weight": 0 } } } """ assert not self._discovery_in_progress, 'A discovery process is already running' self._discovery_in_progress = True evt_queue = queue.Queue() zc = Zeroconf() listener = ZeroconfListener(evt_queue=evt_queue) discovery_start = time.time() services = {} browser = None try: browser = ServiceBrowser(zc, service, listener) while timeout and time.time() - discovery_start < timeout: to = discovery_start + timeout - time.time() if timeout else None try: evt = evt_queue.get(block=True, timeout=to) if isinstance( evt, (ZeroconfServiceAddedEvent, ZeroconfServiceUpdatedEvent) ): services[evt.service_name] = { 'type': evt.service_type, 'name': evt.service_name, 'info': evt.service_info, } elif isinstance(evt, ZeroconfServiceRemovedEvent): services.pop(evt.service_name, None) get_bus().post(evt) except queue.Empty: if not services: self.logger.warning('No such service discovered: %s', service) finally: if browser: browser.cancel() zc.close() self._discovery_in_progress = False return services # vim:sw=4:ts=4:et: