Support for extended information in zeroconf.discover_service

This commit is contained in:
Fabio Manganiello 2020-08-14 15:52:24 +02:00
parent 72bb159263
commit 20b095232d
3 changed files with 80 additions and 32 deletions

View file

@ -16,6 +16,7 @@ from platypush.bus import Bus
from platypush.config import Config from platypush.config import Config
from platypush.context import get_backend from platypush.context import get_backend
from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent
from platypush.plugins.zeroconf import ZeroconfListener
from platypush.utils import set_timeout, clear_timeout, \ from platypush.utils import set_timeout, clear_timeout, \
get_redis_queue_name_by_message, set_thread_name get_redis_queue_name_by_message, set_thread_name
@ -355,7 +356,8 @@ class Backend(Thread, EventGenerator):
properties=srv_desc) properties=srv_desc)
self.zeroconf.register_service(self.zeroconf_info) self.zeroconf.register_service(self.zeroconf_info)
self.bus.post(ZeroconfServiceAddedEvent(service_type=srv_type, service_name=srv_name)) self.bus.post(ZeroconfServiceAddedEvent(service_type=srv_type, service_name=srv_name,
service_info=ZeroconfListener.parse_service_info(self.zeroconf_info)))
def unregister_service(self): def unregister_service(self):
""" """

View file

@ -10,12 +10,14 @@ class ZeroconfEventType(enum.Enum):
class ZeroconfEvent(Event): class ZeroconfEvent(Event):
def __init__(self, service_event: ZeroconfEventType, service_type: str, service_name: str, *args, **kwargs): def __init__(self, service_event: ZeroconfEventType, service_type: str, service_name: str,
service_info: dict, *args, **kwargs):
super().__init__(*args, service_event=service_event.value, service_type=service_type, super().__init__(*args, service_event=service_event.value, service_type=service_type,
service_name=service_name, **kwargs) service_name=service_name, service_info=service_info, **kwargs)
self.service_type = service_type self.service_type = service_type
self.service_name = service_name self.service_name = service_name
self.service_info = service_info
class ZeroconfServiceAddedEvent(ZeroconfEvent): class ZeroconfServiceAddedEvent(ZeroconfEvent):

View file

@ -1,9 +1,10 @@
import queue import queue
import socket
import time import time
from typing import List, Dict from typing import List, Dict, Any, Optional
import zeroconf import zeroconf
from zeroconf import Zeroconf, ServiceBrowser from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
from platypush.context import get_bus from platypush.context import get_bus
from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, \ from platypush.message.event.zeroconf import ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, \
@ -16,28 +17,57 @@ class ZeroconfListener(zeroconf.ServiceListener):
super().__init__() super().__init__()
self.evt_queue = evt_queue self.evt_queue = evt_queue
# noinspection PyUnusedLocal @classmethod
def get_service_info(cls, zc: Zeroconf, type_: str, name: str) -> dict:
info = zc.get_service_info(type_, name)
return cls.parse_service_info(info)
@staticmethod
def parse_service_info(info: ServiceInfo) -> dict:
return {
'addresses': [socket.inet_ntoa(addr) for addr in info.addresses],
'port': info.port,
'host_ttl': info.host_ttl,
'other_ttl': info.other_ttl,
'priority': info.priority,
'properties': {k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v
for k, v in info.properties.items()},
'server': info.server,
'weight': info.weight,
}
def add_service(self, zc: Zeroconf, type_: str, name: str): def add_service(self, zc: Zeroconf, type_: str, name: str):
self.evt_queue.put(ZeroconfServiceAddedEvent(service_type=type_, service_name=name)) info = self.get_service_info(zc, type_, name)
self.evt_queue.put(ZeroconfServiceAddedEvent(service_type=type_, service_name=name, service_info=info))
# noinspection PyUnusedLocal
def remove_service(self, zc: Zeroconf, type_: str, name: str): def remove_service(self, zc: Zeroconf, type_: str, name: str):
self.evt_queue.put(ZeroconfServiceRemovedEvent(service_type=type_, service_name=name)) info = self.get_service_info(zc, type_, name)
self.evt_queue.put(ZeroconfServiceRemovedEvent(service_type=type_, service_name=name, service_info=info))
# noinspection PyUnusedLocal
def update_service(self, zc: Zeroconf, type_: str, name: str): def update_service(self, zc: Zeroconf, type_: str, name: str):
self.evt_queue.put(ZeroconfServiceUpdatedEvent(service_type=type_, service_name=name)) info = self.get_service_info(zc, type_, name)
self.evt_queue.put(ZeroconfServiceUpdatedEvent(service_type=type_, service_name=name, service_info=info))
class ZeroconfPlugin(Plugin): class ZeroconfPlugin(Plugin):
""" """
Plugin for Zeroconf services discovery. Plugin for Zeroconf services discovery.
Triggers:
* :class:`platypush.message.event.zeroconf.ZeroconfServiceAddedEvent` when a new service is discovered.
* :class:`platypush.message.event.zeroconf.ZeroconfServiceUpdatedEvent` when a service is updated.
* :class:`platypush.message.event.zeroconf.ZeroconfServiceRemovedEvent` when a service is removed.
Requires: Requires:
* **zeroconf** (``pip install zeroconf``) * **zeroconf** (``pip install zeroconf``)
""" """
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._discovery_in_progress = False
@action @action
def get_services(self, timeout: int = 5) -> List[str]: def get_services(self, timeout: int = 5) -> List[str]:
""" """
@ -49,23 +79,41 @@ class ZeroconfPlugin(Plugin):
return list(zeroconf.ZeroconfServiceTypes.find(timeout=timeout)) return list(zeroconf.ZeroconfServiceTypes.find(timeout=timeout))
@action @action
def discover_service(self, service: str, timeout: int = 5) -> Dict[str, List[str]]: def discover_service(self, service: str, timeout: Optional[int] = 5) -> Dict[str, Any]:
""" """
Find all the services matching the specified type. Find all the services matching the specified type.
:param service: Service type (e.g. ``_http._tcp.local.``). :param service: Service type (e.g. ``_http._tcp.local.``).
:param timeout: Browser timeout in seconds (default: 5). :param timeout: Browser timeout in seconds (default: 5). Specify None for no timeout - in such case the
discovery will loop forever and generate events upon service changes.
:return: A ``service_type -> [service_names]`` mapping. Example:: :return: A ``service_type -> [service_names]`` mapping. Example::
{ {
"_platypush-http._tcp.local.": [ "host1._platypush-http._tcp.local.": {
"host1._platypush-http._tcp.local.", "type": "_platypush-http._tcp.local.",
"host2._platypush-http._tcp.local." "name": "host1._platypush-http._tcp.local.",
], "info": {
"addresses": ["192.168.1.11"],
"port": 8008,
"host_ttl": 120,
"other_ttl": 4500,
"priority": 0,
"properties": {
"name": "Platypush",
"vendor": "Platypush",
"version": "0.13.2"
},
"server": "host1._platypush-http._tcp.local.",
"weight": 0
}
},
... ...
} }
""" """
assert not self._discovery_in_progress, 'A discovery process is already running'
self._discovery_in_progress = True
evt_queue = queue.Queue() evt_queue = queue.Queue()
zc = Zeroconf() zc = Zeroconf()
listener = ZeroconfListener(evt_queue=evt_queue) listener = ZeroconfListener(evt_queue=evt_queue)
@ -74,21 +122,19 @@ class ZeroconfPlugin(Plugin):
services = {} services = {}
try: try:
while time.time() - discovery_start < timeout: while timeout and time.time() - discovery_start < timeout:
to = discovery_start + timeout - time.time() to = discovery_start + timeout - time.time() if timeout else None
try: try:
evt: ZeroconfEvent = evt_queue.get(block=True, timeout=to) evt: ZeroconfEvent = evt_queue.get(block=True, timeout=to)
if isinstance(evt, ZeroconfServiceAddedEvent) or isinstance(evt, ZeroconfServiceUpdatedEvent): if isinstance(evt, ZeroconfServiceAddedEvent) or isinstance(evt, ZeroconfServiceUpdatedEvent):
if evt.service_type not in services: services[evt.service_name] = {
services[evt.service_type] = set() 'type': evt.service_type,
'name': evt.service_name,
services[evt.service_type].add(evt.service_name) 'info': evt.service_info,
}
elif isinstance(evt, ZeroconfServiceRemovedEvent): elif isinstance(evt, ZeroconfServiceRemovedEvent):
if evt.service_type in services: if evt.service_name in services:
if evt.service_name in services[evt.service_type]: del services[evt.service_name]
services[evt.service_type].remove(evt.service_name)
if not services[evt.service_type]:
del services[evt.service_type]
get_bus().post(evt) get_bus().post(evt)
except queue.Empty: except queue.Empty:
@ -97,11 +143,9 @@ class ZeroconfPlugin(Plugin):
finally: finally:
browser.cancel() browser.cancel()
zc.close() zc.close()
self._discovery_in_progress = False
return { return services
type_: list(names)
for type_, names in services.items()
}
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: