From 3de510da68cbc8d9f63d65768d54fe51a017274f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 6 Sep 2023 02:44:56 +0200 Subject: [PATCH] Migrated `zigbee.mqtt` integration. The plugin has been migrated to the new `mqtt` API and the legacy listener that extended `MqttBackend` has been removed and merged into the plugin. --- platypush/plugins/zigbee/mqtt/__init__.py | 1205 ++++++++++++++------ platypush/plugins/zigbee/mqtt/_listener.py | 269 ----- 2 files changed, 831 insertions(+), 643 deletions(-) delete mode 100644 platypush/plugins/zigbee/mqtt/_listener.py diff --git a/platypush/plugins/zigbee/mqtt/__init__.py b/platypush/plugins/zigbee/mqtt/__init__.py index 8a5fc6cbff..f1a1e7107b 100644 --- a/platypush/plugins/zigbee/mqtt/__init__.py +++ b/platypush/plugins/zigbee/mqtt/__init__.py @@ -1,3 +1,6 @@ +import contextlib +from dataclasses import dataclass, field +from enum import Enum import json import re import threading @@ -8,6 +11,7 @@ from typing import ( Collection, Dict, List, + Mapping, Optional, Tuple, Type, @@ -15,6 +19,10 @@ from typing import ( ) from typing_extensions import override +import paho.mqtt.client as mqtt + +from platypush.bus import Bus +from platypush.context import get_bus from platypush.entities import ( DimmerEntityManager, Entity, @@ -44,14 +52,109 @@ from platypush.entities.sensors import ( ) from platypush.entities.switches import Switch, EnumSwitch from platypush.entities.temperature import TemperatureSensor +from platypush.message.event.zigbee.mqtt import ( + ZigbeeMqttOnlineEvent, + ZigbeeMqttOfflineEvent, + ZigbeeMqttDevicePairingEvent, + ZigbeeMqttDeviceConnectedEvent, + ZigbeeMqttDeviceBannedEvent, + ZigbeeMqttDeviceRemovedEvent, + ZigbeeMqttDeviceRemovedFailedEvent, + ZigbeeMqttDeviceWhitelistedEvent, + ZigbeeMqttDeviceRenamedEvent, + ZigbeeMqttDeviceBindEvent, + ZigbeeMqttDevicePropertySetEvent, + ZigbeeMqttDeviceUnbindEvent, + ZigbeeMqttGroupAddedEvent, + ZigbeeMqttGroupAddedFailedEvent, + ZigbeeMqttGroupRemovedEvent, + ZigbeeMqttGroupRemovedFailedEvent, + ZigbeeMqttGroupRemoveAllEvent, + ZigbeeMqttGroupRemoveAllFailedEvent, + ZigbeeMqttErrorEvent, +) from platypush.message.response import Response -from platypush.plugins import RunnablePlugin -from platypush.plugins.mqtt import MqttPlugin, action +from platypush.plugins.mqtt import DEFAULT_TIMEOUT, MqttClient, MqttPlugin, action + + +class BridgeState(Enum): + """ + Known bridge states. + """ + + ONLINE = 'online' + OFFLINE = 'offline' + + +@dataclass +class ZigbeeDevicesInfo: + """ + Cached information about the devices on the network. + """ + + by_address: Dict[str, dict] = field(default_factory=dict) + by_name: Dict[str, dict] = field(default_factory=dict) + + def __contains__(self, name: str) -> bool: + """ + :return: True if the device with the given name exists in the cache. + """ + return name in self.by_name or name in self.by_address + + def get(self, name: str) -> Optional[dict]: + """ + Retrieves a cached device record either by name or by address. + """ + return self.by_address.get(name, self.by_name.get(name)) + + def add(self, device: dict): + """ + Adds a device record to the cache. + """ + if device.get('ieee_address'): + self.by_address[device['ieee_address']] = device + if device.get('friendly_name'): + self.by_name[device['friendly_name']] = device + + def remove(self, device: Union[str, dict]): + """ + Removes a device record from the cache. + """ + if isinstance(device, str): + dev = self.get(device) + if not dev: + return # No such device + else: + dev = device + + if dev.get('ieee_address'): + self.by_address.pop(dev['ieee_address'], None) + + if dev.get('friendly_name'): + self.by_name.pop(dev['friendly_name'], None) + + def reset(self, *keys: str): + """ + Reset the state for the devices with the given keys. + """ + + for k in keys: + self.by_address[k] = {} + self.by_name[k] = {} + + +@dataclass +class ZigbeeInfo: + """ + Cached information about the devices and groups on the network. + """ + + devices: ZigbeeDevicesInfo = field(default_factory=ZigbeeDevicesInfo) + groups: Dict[str, dict] = field(default_factory=dict) # pylint: disable=too-many-ancestors class ZigbeeMqttPlugin( - RunnablePlugin, MqttPlugin, DimmerEntityManager, EnumSwitchEntityManager, @@ -60,7 +163,7 @@ class ZigbeeMqttPlugin( SwitchEntityManager, ): """ - This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and + Support for Zigbee devices using any Zigbee adapter compatible with `zigbee2mqtt `_. In order to get started you'll need: @@ -71,15 +174,18 @@ class ZigbeeMqttPlugin( Instructions: - - Install `cc-tool `_ either from sources or from a package manager. + - Install `cc-tool `_ either from + sources or from a package manager. - Connect the Zigbee to your PC/RaspberryPi in this way: :: USB -> CC debugger -> downloader cable -> CC2531 -> USB - - The debugger and the adapter should be connected *at the same time*. If the later ``cc-tool`` command throws - up an error, put the device in sync while connected by pressing the _Reset_ button on the debugger. + - The debugger and the adapter should be connected *at the same time*. + If the later ``cc-tool`` command throws up an error, put the device in + sync while connected by pressing the _Reset_ button on the debugger. - Check where the device is mapped. On Linux it will usually be ``/dev/ttyACM0``. - - Download the latest `Z-Stack firmware `_ + - Download the latest `Z-Stack firmware + `_ to your device. Instructions for a CC2531 device: .. code-block:: shell @@ -91,10 +197,12 @@ class ZigbeeMqttPlugin( - You can disconnect your debugger and downloader cable once the firmware is flashed. - - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or - through your package manager. **NOTE**: many API breaking changes have occurred on Zigbee2MQTT 1.17.0, - therefore this integration will only be compatible with the version 1.17.0 of the service or higher versions. - Manual instructions: + - Install ``zigbee2mqtt``. First install a node/npm environment, then + either install ``zigbee2mqtt`` manually or through your package + manager. **NOTE**: many API breaking changes have occurred on + Zigbee2MQTT 1.17.0, therefore this integration will only be compatible + with the version 1.17.0 of the service or higher versions. Manual + instructions: .. code-block:: shell @@ -106,10 +214,12 @@ class ZigbeeMqttPlugin( cd /opt/zigbee2mqtt npm install - - You need to have an MQTT broker running somewhere. If not, you can install - `Mosquitto `_ through your package manager on any device in your network. + - You need to have an MQTT broker running somewhere. If not, you can + install `Mosquitto `_ through your package + manager on any device in your network. - - Edit the ``/opt/zigbee2mqtt/data/configuration.yaml`` file to match the configuration of your MQTT broker: + - Edit the ``/opt/zigbee2mqtt/data/configuration.yaml`` file to match + the configuration of your MQTT broker: .. code-block:: yaml @@ -123,27 +233,32 @@ class ZigbeeMqttPlugin( # user: my_user # password: my_password - - Also make sure that ``permit_join`` is set to ``True``, in order to allow Zigbee devices to join the network - while you're configuring it. It's equally important to set ``permit_join`` to ``False`` once you have - configured your network, to prevent accidental/malignant joins from outer Zigbee devices. + - Also make sure that ``permit_join`` is set to ``True``, in order to + allow Zigbee devices to join the network while you're configuring it. + It's equally important to set ``permit_join`` to ``False`` once you + have configured your network, to prevent accidental/malignant joins + from outer Zigbee devices. - - Start the ``zigbee2mqtt`` daemon on your device (the - `official documentation `_ - also contains instructions on how to configure it as a ``systemd`` service: + - Start the ``zigbee2mqtt`` daemon on your device (the `official + documentation + `_ + also contains instructions on how to configure it as a ``systemd`` + service: .. code-block:: shell cd /opt/zigbee2mqtt npm start - - If you have Zigbee devices that are paired to other bridges, unlink them or do a factory reset to pair them - to your new bridge. + - If you have Zigbee devices that are paired to other bridges, unlink + them or do a factory reset to pair them to your new bridge. - - If it all goes fine, once the daemon is running and a new device is found you should see traces like this in - the output of ``zigbee2mqtt``:: + - If it all goes fine, once the daemon is running and a new device is + found you should see traces like this in the output of + ``zigbee2mqtt``:: - zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', device has - successfully been paired + zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', + device has successfully been paired - You are now ready to use this integration. @@ -153,45 +268,31 @@ class ZigbeeMqttPlugin( Triggers: - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a - connected device change. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects - to the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned - from the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed - from the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to - remove a device from the network fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is - whitelisted on the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is - renamed on the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event - occurs. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event - occurs. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to - add a new group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to - remove a group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices - are removed from a group. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to - remove all the devices from a group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs - on the zigbee2mqtt service. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` """ # noqa: E501 def __init__( self, - host: str = 'localhost', + host: str, port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 10, @@ -204,25 +305,40 @@ class ZigbeeMqttPlugin( **kwargs, ): """ - :param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages (default: ``localhost``). + :param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages. :param port: Broker listen port (default: 1883). - :param base_topic: Topic prefix, as specified in ``/opt/zigbee2mqtt/data/configuration.yaml`` - (default: '``base_topic``'). - :param timeout: If the command expects from a response, then this timeout value will be used - (default: 60 seconds). - :param tls_cafile: If the connection requires TLS/SSL, specify the certificate authority file - (default: None) - :param tls_certfile: If the connection requires TLS/SSL, specify the certificate file (default: None) - :param tls_keyfile: If the connection requires TLS/SSL, specify the key file (default: None) - :param tls_version: If the connection requires TLS/SSL, specify the minimum TLS supported version - (default: None) - :param tls_ciphers: If the connection requires TLS/SSL, specify the supported ciphers (default: None) - :param username: If the connection requires user authentication, specify the username (default: None) - :param password: If the connection requires user authentication, specify the password (default: None) + :param base_topic: Topic prefix, as specified in + ``/opt/zigbee2mqtt/data/configuration.yaml`` (default: '``zigbee2mqtt``'). + :param timeout: If the command expects from a response, then this + timeout value will be used (default: 60 seconds). + :param tls_cafile: If the connection requires TLS/SSL, specify the + certificate authority file (default: None) + :param tls_certfile: If the connection requires TLS/SSL, specify the + certificate file (default: None) + :param tls_keyfile: If the connection requires TLS/SSL, specify the key + file (default: None) + :param tls_version: If the connection requires TLS/SSL, specify the + minimum TLS supported version (default: None) + :param tls_ciphers: If the connection requires TLS/SSL, specify the + supported ciphers (default: None) + :param username: If the connection requires user authentication, specify + the username (default: None) + :param password: If the connection requires user authentication, specify + the password (default: None) """ super().__init__( host=host, port=port, + topics=[ + f'{base_topic}/{topic}' + for topic in [ + 'bridge/state', + 'bridge/log', + 'bridge/logging', + 'bridge/devices', + 'bridge/groups', + ] + ], tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, tls_version=tls_version, @@ -232,16 +348,21 @@ class ZigbeeMqttPlugin( **kwargs, ) + # Append a unique suffix to the client ID to avoid client name clashes + # with other MQTT plugins. + self.client_id += '-zigbee-mqtt' self.base_topic = base_topic self.timeout = timeout - self._info: Dict[str, dict] = { - 'devices_by_addr': {}, - 'devices_by_name': {}, - 'groups': {}, - } + self._info = ZigbeeInfo() + self._devices_meta: Dict[str, dict] = {} + self._bridge_state = BridgeState.OFFLINE @staticmethod def _get_properties(device: dict) -> dict: + """ + Static method that parses the properties of a device from its received + definition. + """ exposes = (device.get('definition') or {}).get('exposes', []).copy() properties = {} @@ -255,12 +376,17 @@ class ZigbeeMqttPlugin( @staticmethod def _get_options(device: dict) -> dict: + """ + Static method that parses the options of a device from its received + definition. + """ return { option['property']: option for option in (device.get('definition') or {}).get('options', []) if option.get('property') } + @override def transform_entities(self, entities: Collection[dict]) -> List[Entity]: compatible_entities = [] for dev in entities: @@ -368,6 +494,10 @@ class ZigbeeMqttPlugin( @staticmethod def _get_device_url(device_info: dict) -> Optional[str]: + """ + Static method that returns the zigbee2mqtt URL with the information + about a certain device, if the model is available in its definition. + """ model = device_info.get('definition', {}).get('model') if not model: return None @@ -376,6 +506,10 @@ class ZigbeeMqttPlugin( @staticmethod def _get_image_url(device_info: dict) -> Optional[str]: + """ + Static method that returns the zigbee2mqtt URL of the image of a + certain device, if the model is available in its definition. + """ model = device_info.get('definition', {}).get('model') if not model: return None @@ -383,13 +517,13 @@ class ZigbeeMqttPlugin( return f'https://www.zigbee2mqtt.io/images/devices/{model}.jpg' def _get_network_info(self, **kwargs) -> dict: + """ + Refreshes the network information. + """ self.logger.info('Fetching Zigbee network information') client = None mqtt_args = self._mqtt_args(**kwargs) - timeout = 30 - if 'timeout' in mqtt_args: - timeout = mqtt_args.pop('timeout') - + timeout = mqtt_args.pop('timeout', DEFAULT_TIMEOUT) info: Dict[str, Any] = { 'state': None, 'info': {}, @@ -400,27 +534,22 @@ class ZigbeeMqttPlugin( info_ready_events = {topic: threading.Event() for topic in info} - def _on_message(): - def callback(_, __, msg): - topic = msg.topic.split('/')[-1] - if topic in info: - info[topic] = ( - msg.payload.decode() - if topic == 'state' - else json.loads(msg.payload.decode()) - ) - info_ready_events[topic].set() - - return callback + def msg_callback(_, __, msg): + topic = msg.topic.split('/')[-1] + if topic in info: + info[topic] = ( + msg.payload.decode() + if topic == 'state' + else json.loads(msg.payload.decode()) + ) + info_ready_events[topic].set() try: - host = mqtt_args.pop('host') - port = mqtt_args.pop('port') client = self._get_client( # pylint: disable=unexpected-keyword-arg **mqtt_args ) - client.on_message = _on_message() - client.connect(host, port, keepalive=timeout) + client.on_message = msg_callback + client.connect() client.subscribe(self.base_topic + '/bridge/#') client.loop_start() @@ -432,16 +561,16 @@ class ZigbeeMqttPlugin( ) # Cache the new results - self._info['devices_by_name'] = { + self._info.devices.by_name = { self._preferred_name(device): device for device in info.get('devices', []) } - self._info['devices_by_addr'] = { + self._info.devices.by_address = { device['ieee_address']: device for device in info.get('devices', []) } - self._info['groups'] = { + self._info.groups = { group.get('name'): group for group in info.get('groups', []) } @@ -456,27 +585,59 @@ class ZigbeeMqttPlugin( return info - def _topic(self, topic): - return self.base_topic + '/' + topic + def _topic(self, topic: str) -> str: + """ + Utility method that construct a topic prefixed by the configured base + topic. + """ + return f'{self.base_topic}/{topic}' @staticmethod def _parse_response(response: Union[dict, Response]) -> dict: + """ + Utility method that flattens a response received on a zigbee2mqtt topic + into a dictionary. + """ if isinstance(response, Response): rs: dict = response.output # type: ignore response = rs - assert response.get('status') != 'error', response.get( - 'error', 'zigbee2mqtt error' + if isinstance(response, dict): + assert response.get('status') != 'error', response.get( + 'error', 'zigbee2mqtt error' + ) + + return response or {} + + def _run_request( + self, + topic: str, + msg: Union[dict, str], + reply_topic: Optional[str] = None, + **kwargs, + ) -> dict: + """ + Sends a request/message to the Zigbeebee2MQTT bridge and waits for a + response. + """ + return self._parse_response( + self.publish( # type: ignore + topic=topic, + msg=msg, + reply_topic=reply_topic, + **self._mqtt_args(**kwargs), + ) + or {} ) - return response @action def devices(self, **kwargs) -> List[Dict[str, Any]]: """ Get the list of devices registered to the service. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). :return: List of paired devices. Example output: @@ -648,23 +809,22 @@ class ZigbeeMqttPlugin( self, permit: bool = True, timeout: Optional[float] = None, **kwargs ): """ - Enable/disable devices from joining the network. This is not persistent (will not be saved to - ``configuration.yaml``). + Enable/disable devices from joining the network. + + This is not persistent (it will not be saved to ``configuration.yaml``). :param permit: Set to True to allow joins, False otherwise. :param timeout: Allow/disallow joins only for this amount of time. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ if timeout: - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/permit_join'), - msg={'value': permit, 'time': timeout}, - reply_topic=self._topic('bridge/response/permit_join'), - **self._mqtt_args(**kwargs), - ) - or {} + return self._run_request( + topic=self._topic('bridge/request/permit_join'), + msg={'value': permit, 'time': timeout}, + reply_topic=self._topic('bridge/response/permit_join'), + **self._mqtt_args(**kwargs), ) return self.publish( @@ -676,12 +836,14 @@ class ZigbeeMqttPlugin( @action def factory_reset(self, **kwargs): """ - Perform a factory reset of a device connected to the network, following the procedure required by the particular - device (for instance, Hue bulbs require the Zigbee adapter to be close to the device while a button on the back - of the bulb is pressed). + Perform a factory reset of a device connected to the network, following + the procedure required by the particular device (for instance, Hue bulbs + require the Zigbee adapter to be close to the device while a button on + the back of the bulb is pressed). - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ self.publish( topic=self._topic('bridge/request/touchlink/factory_reset'), @@ -692,44 +854,46 @@ class ZigbeeMqttPlugin( @action def log_level(self, level: str, **kwargs): """ - Change the log level at runtime. This change will not be persistent. + Change the log level at runtime. + + This change will not be persistent. :param level: Possible values: 'debug', 'info', 'warn', 'error'. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/config/log_level'), - msg={'value': level}, - reply_topic=self._topic('bridge/response/config/log_level'), - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/config/log_level'), + msg={'value': level}, + reply_topic=self._topic('bridge/response/config/log_level'), + **self._mqtt_args(**kwargs), ) @action def device_set_option(self, device: str, option: str, value: Any, **kwargs): """ - Change the options of a device. Options can only be changed, not added or deleted. + Change the options of a device. + + Options can only be changed, not added or deleted. :param device: Display name or IEEE address of the device. :param option: Option name. :param value: New value. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/options'), - reply_topic=self._topic('bridge/response/device/options'), - msg={ - 'id': device, - 'options': { - option: value, - }, + return self._run_request( + topic=self._topic('bridge/request/device/options'), + reply_topic=self._topic('bridge/response/device/options'), + msg={ + 'id': device, + 'options': { + option: value, }, - **self._mqtt_args(**kwargs), - ) + }, + **self._mqtt_args(**kwargs), ) @action @@ -738,19 +902,19 @@ class ZigbeeMqttPlugin( Remove a device from the network. :param device: Display name of the device. - :param force: Force the remove also if the removal wasn't acknowledged by the device. Note: a forced remove - only removes the entry from the internal database, but the device is likely to connect again when + :param force: Force the remove also if the removal wasn't acknowledged + by the device. Note: a forced remove only removes the entry from the + internal database, but the device is likely to connect again when restarted unless it's factory reset (default: False). - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/remove'), - msg={'id': device, 'force': force}, - reply_topic=self._topic('bridge/response/device/remove'), - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/remove'), + msg={'id': device, 'force': force}, + reply_topic=self._topic('bridge/response/device/remove'), + **self._mqtt_args(**kwargs), ) @action @@ -759,35 +923,35 @@ class ZigbeeMqttPlugin( Ban a device from the network. :param device: Display name of the device. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/ban'), - reply_topic=self._topic('bridge/response/device/ban'), - msg={'id': device}, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/ban'), + reply_topic=self._topic('bridge/response/device/ban'), + msg={'id': device}, + **self._mqtt_args(**kwargs), ) @action def device_whitelist(self, device: str, **kwargs): """ - Whitelist a device on the network. Note: once at least a device is whitelisted, all the other non-whitelisted - devices will be removed from the network. + Whitelist a device on the network. + + Note: once at least a device is whitelisted, all the other + non-whitelisted devices will be removed from the network. :param device: Display name of the device. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/whitelist'), - reply_topic=self._topic('bridge/response/device/whitelist'), - msg={'id': device}, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/whitelist'), + reply_topic=self._topic('bridge/response/device/whitelist'), + msg={'id': device}, + **self._mqtt_args(**kwargs), ) @action @@ -796,16 +960,18 @@ class ZigbeeMqttPlugin( Rename a device on the network. :param name: New name. - :param device: Current name of the device to rename. If no name is specified then the rename will - affect the last device that joined the network. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param device: Current name of the device to rename. If no name is + specified then the rename will affect the last device that joined + the network. + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ if name == device: self.logger.info('Old and new name are the same: nothing to do') - return + return None - devices = self.devices().output # type: ignore[reportGeneralTypeIssues] + devices: dict = self.devices().output # type: ignore assert not [ dev for dev in devices if dev.get('friendly_name') == name ], f'A device named {name} already exists on the network' @@ -821,17 +987,23 @@ class ZigbeeMqttPlugin( 'to': name, } - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/rename'), - msg=req, - reply_topic=self._topic('bridge/response/device/rename'), - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/rename'), + msg=req, + reply_topic=self._topic('bridge/response/device/rename'), + **self._mqtt_args(**kwargs), ) @staticmethod def _build_device_get_request(values: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Prepares a ``device_get`` request, as a dictionary to be sent down to + the bridge. + + This makes sure that the properties and options are properly mapped and + converted. + """ + def extract_value(value: dict, root: dict, depth: int = 0): for feature in value.get('features', []): new_root = root @@ -860,11 +1032,14 @@ class ZigbeeMqttPlugin( return ret def _get_device_info(self, device: str, **kwargs) -> dict: - device_info = self._info['devices_by_name'].get( + """ + Get or retrieve the information about a device. + """ + device_info = self._info.devices.by_name.get( # First: check by friendly name device, # Second: check by address - self._info['devices_by_addr'].get(device, {}), + self._info.devices.by_address.get(device, {}), ) if not device_info: @@ -883,10 +1058,18 @@ class ZigbeeMqttPlugin( @staticmethod def _preferred_name(device: dict) -> str: + """ + Utility method that returns the preferred name of a device, on the basis + of which attributes are exposed (friendly name or IEEE address). + """ return device.get('friendly_name') or device.get('ieee_address') or '' @classmethod def _device_name_matches(cls, name: str, device: dict) -> bool: + """ + Utility method that checks if either the friendly name or IEEE address + of a device match a certain string. + """ name = str(cls._ieee_address(name)) return name == device.get('friendly_name') or name == device.get('ieee_address') @@ -895,14 +1078,19 @@ class ZigbeeMqttPlugin( self, device: str, property: Optional[str] = None, **kwargs ) -> Dict[str, Any]: """ - Get the properties of a device. The returned keys vary depending on the device. For example, a light bulb - may have the "``state``" and "``brightness``" properties, while an environment sensor may have the - "``temperature``" and "``humidity``" properties, and so on. + Get the properties of a device. + + The returned keys vary depending on the device. For example, a light + bulb may have the "``state``" and "``brightness``" properties, while an + environment sensor may have the "``temperature``" and "``humidity``" + properties, and so on. :param device: Display name of the device. - :param property: Name of the property that should be retrieved (default: all). - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param property: Name of the property that should be retrieved (default: + all). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). :return: Key->value map of the device properties. """ kwargs = self._mqtt_args(**kwargs) @@ -911,12 +1099,12 @@ class ZigbeeMqttPlugin( device = self._preferred_name(device_info) if property: - properties = self.publish( + properties = self._run_request( topic=self._topic(device) + f'/get/{property}', reply_topic=self._topic(device), msg={property: ''}, **kwargs, - ).output # type: ignore[reportGeneralTypeIssues] + ) assert property in properties, f'No such property: {property}' return {property: properties[property]} @@ -932,12 +1120,12 @@ class ZigbeeMqttPlugin( if not req: reply_topic = None - return self.publish( + return self._run_request( topic=self._topic(device) + '/get', reply_topic=reply_topic, msg=req, **kwargs, - ).output # type: ignore[reportGeneralTypeIssues] + ) @action def devices_get( @@ -946,10 +1134,11 @@ class ZigbeeMqttPlugin( """ Get the properties of the devices connected to the network. - :param devices: If set, then only the status of these devices (by friendly name) will be retrieved (default: - retrieve all). - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param devices: If set, then only the status of these devices (by + friendly name) will be retrieved (default: retrieve all). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). :return: Key->value map of the device properties: .. code-block:: json @@ -970,13 +1159,13 @@ class ZigbeeMqttPlugin( devices = list( { self._preferred_name(device) - for device in self.devices(**kwargs).output # type: ignore[reportGeneralTypeIssues] + for device in list(self.devices(**kwargs).output) # type: ignore if self._preferred_name(device) } ) def worker(device: str, q: Queue): - q.put(self.device_get(device, **kwargs).output) # type: ignore[reportGeneralTypeIssues] + q.put(self.device_get(device, **kwargs).output) # type: ignore queues: Dict[str, Queue] = {} workers = {} @@ -1005,7 +1194,8 @@ class ZigbeeMqttPlugin( @action def status(self, *args, device: Optional[str] = None, **kwargs): """ - Get the status of a device (by friendly name) or of all the connected devices (it wraps :meth:`.devices_get`). + Get the status of a device (by friendly name) or of all the connected + devices (it wraps :meth:`.devices_get`). :param device: Device friendly name (default: get all devices). """ @@ -1021,16 +1211,21 @@ class ZigbeeMqttPlugin( **kwargs, ): """ - Set a properties on a device. The compatible properties vary depending on the device. For example, a light bulb - may have the "``state``" and "``brightness``" properties, while an environment sensor may have the - "``temperature``" and "``humidity``" properties, and so on. + Set a properties on a device. + + The compatible properties vary depending on the device. For example, a + light bulb may have the "``state``" and "``brightness``" properties, + while an environment sensor may have the "``temperature``" and + "``humidity``" properties, and so on. :param device: Display name of the device. :param property: Name of the property that should be set. :param value: New value of the property. - :param values: If you want to set multiple values, then pass this mapping instead of ``property``+``value``. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param values: If you want to set multiple values, then pass this + mapping instead of ``property``+``value``. + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ msg = (values or {}).copy() reply_topic = None @@ -1056,12 +1251,12 @@ class ZigbeeMqttPlugin( if self._is_write_only(stored_property): reply_topic = None - properties = self.publish( + properties = self._run_request( topic=self._topic(device + '/set'), reply_topic=reply_topic, msg=msg, **self._mqtt_args(**kwargs), - ).output # type: ignore[reportGeneralTypeIssues] + ) if property and reply_topic: assert ( @@ -1105,8 +1300,9 @@ class ZigbeeMqttPlugin( Check if the specified device has any OTA updates available to install. :param device: Address or friendly name of the device. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). :return: @@ -1119,13 +1315,11 @@ class ZigbeeMqttPlugin( } """ - ret = self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/ota_update/check'), - reply_topic=self._topic('bridge/response/device/ota_update/check'), - msg={'id': device}, - **self._mqtt_args(**kwargs), - ) + ret = self._run_request( + topic=self._topic('bridge/request/device/ota_update/check'), + reply_topic=self._topic('bridge/response/device/ota_update/check'), + msg={'id': device}, + **self._mqtt_args(**kwargs), ) return { @@ -1140,16 +1334,15 @@ class ZigbeeMqttPlugin( Install OTA updates for a device if available. :param device: Address or friendly name of the device. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/ota_update/update'), - reply_topic=self._topic('bridge/response/device/ota_update/update'), - msg={'id': device}, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/ota_update/update'), + reply_topic=self._topic('bridge/response/device/ota_update/update'), + msg={'id': device}, + **self._mqtt_args(**kwargs), ) @action @@ -1157,8 +1350,9 @@ class ZigbeeMqttPlugin( """ Get the groups registered on the device. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ return self._get_network_info(**kwargs).get('groups', []) @@ -1167,8 +1361,9 @@ class ZigbeeMqttPlugin( """ Get the information, configuration and state of the network. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). :return: Example: @@ -1305,8 +1500,9 @@ class ZigbeeMqttPlugin( :param name: Display name of the group. :param id: Optional numeric ID (default: auto-generated). - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ payload = ( name @@ -1317,13 +1513,11 @@ class ZigbeeMqttPlugin( } ) - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/group/add'), - reply_topic=self._topic('bridge/response/group/add'), - msg=payload, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/group/add'), + reply_topic=self._topic('bridge/response/group/add'), + msg=payload, + **self._mqtt_args(**kwargs), ) @action @@ -1331,28 +1525,34 @@ class ZigbeeMqttPlugin( self, group: str, property: Optional[str] = None, **kwargs ) -> dict: """ - Get one or more properties of a group. The compatible properties vary depending on the devices on the group. - For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``" - properties, while an environment sensor may have the "``temperature``" and "``humidity``" properties, and so on. + Get one or more properties of a group. + + The compatible properties vary depending on the devices on the group. + For example, a light bulb may have the "``state``" (with values ``"ON"`` + and ``"OFF"``) and "``brightness``" properties, while an environment + sensor may have the "``temperature``" and "``humidity``" properties, and + so on. :param group: Display name of the group. - :param property: Name of the property to retrieve (default: all available properties) - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param property: Name of the property to retrieve (default: all + available properties) + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ msg = {} if property: msg = {property: ''} - properties = self.publish( + properties = self._run_request( topic=self._topic(group + '/get'), reply_topic=self._topic(group), msg=msg, **self._mqtt_args(**kwargs), - ).output # type: ignore[reportGeneralTypeIssues] + ) if property: - assert property in properties, 'No such property: ' + property + assert property in properties, f'No such property: {property}' return {property: properties[property]} return properties @@ -1362,25 +1562,31 @@ class ZigbeeMqttPlugin( self, group: str, property: str, value: Any, **kwargs ): """ - Set a properties on a group. The compatible properties vary depending on the devices on the group. - For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``" - properties, while an environment sensor may have the "``temperature``" and "``humidity``" properties, and so on. + Set a properties on a group. + + The compatible properties vary depending on the devices on the group. + + For example, a light bulb may have the "``state``" (with values ``"ON"`` + and ``"OFF"``) and "``brightness``" properties, while an environment + sensor may have the "``temperature``" and "``humidity``" properties, and + so on. :param group: Display name of the group. :param property: Name of the property that should be set. :param value: New value of the property. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - properties = self.publish( + properties = self._run_request( topic=self._topic(group + '/set'), reply_topic=self._topic(group), msg={property: value}, **self._mqtt_args(**kwargs), - ).output # type: ignore[reportGeneralTypeIssues] + ) if property: - assert property in properties, 'No such property: ' + property + assert property in properties, f'No such property: {property}' return {property: properties[property]} return properties @@ -1392,27 +1598,26 @@ class ZigbeeMqttPlugin( :param name: New name. :param group: Current name of the group to rename. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ if name == group: self.logger.info('Old and new name are the same: nothing to do') - return + return None groups = { - group.get('friendly_name'): group - for group in self.groups().output # type: ignore[reportGeneralTypeIssues] + g.get('friendly_name'): g + for g in dict(self.groups().output) # type: ignore } assert name not in groups, f'A group named {name} already exists on the network' - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/group/rename'), - reply_topic=self._topic('bridge/response/group/rename'), - msg={'from': group, 'to': name} if group else name, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/group/rename'), + reply_topic=self._topic('bridge/response/group/rename'), + msg={'from': group, 'to': name} if group else name, + **self._mqtt_args(**kwargs), ) @action @@ -1421,16 +1626,15 @@ class ZigbeeMqttPlugin( Remove a group. :param name: Display name of the group. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/group/remove'), - reply_topic=self._topic('bridge/response/group/remove'), - msg=name, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/group/remove'), + reply_topic=self._topic('bridge/response/group/remove'), + msg=name, + **self._mqtt_args(**kwargs), ) @action @@ -1440,19 +1644,18 @@ class ZigbeeMqttPlugin( :param group: Display name of the group. :param device: Display name of the device to be added. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/group/members/add'), - reply_topic=self._topic('bridge/response/group/members/add'), - msg={ - 'group': group, - 'device': device, - }, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/group/members/add'), + reply_topic=self._topic('bridge/response/group/members/add'), + msg={ + 'group': group, + 'device': device, + }, + **self._mqtt_args(**kwargs), ) @action @@ -1461,78 +1664,78 @@ class ZigbeeMqttPlugin( Remove a device from a group. :param group: Display name of the group. - :param device: Display name of the device to be removed. If none is specified then all the devices registered - to the specified group will be removed. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param device: Display name of the device to be removed. If none is + specified then all the devices registered to the specified group + will be removed. + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ remove_suffix = '_all' if device is None else '' - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic( - f'bridge/request/group/members/remove{remove_suffix}' - ), - reply_topic=self._topic( - f'bridge/response/group/members/remove{remove_suffix}' - ), - msg={ - 'group': group, - 'device': device, - }, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic(f'bridge/request/group/members/remove{remove_suffix}'), + reply_topic=self._topic( + f'bridge/response/group/members/remove{remove_suffix}' + ), + msg={ + 'group': group, + 'device': device, + }, + **self._mqtt_args(**kwargs), ) @action def bind_devices(self, source: str, target: str, **kwargs): """ - Bind two devices. Binding makes it possible that devices can directly control each other without the - intervention of zigbee2mqtt or any home automation software. You may want to use this feature to bind - for example an IKEA/Philips Hue dimmer switch to a light bulb, or a Zigbee remote to a thermostat. - Read more on the `zigbee2mqtt binding page `_. + Bind two devices. - :param source: Name of the source device. It can also be a group name, although the support is - `still experimental `_. - You can also bind a specific device endpoint - for example ``MySensor/temperature``. - :param target: Name of the target device. - You can also bind a specific device endpoint - for example ``MyLight/state``. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + Binding makes it possible that devices can directly control each other + without the intervention of zigbee2mqtt or any home automation software. + You may want to use this feature to bind for example an IKEA/Philips Hue + dimmer switch to a light bulb, or a Zigbee remote to a thermostat. Read + more on the `zigbee2mqtt binding page + `_. + + :param source: Name of the source device. It can also be a group name, + although the support is `still experimental + `_. + You can also bind a specific device endpoint - for example + ``MySensor/temperature``. + :param target: Name of the target device. You can also bind a specific + device endpoint - for example ``MyLight/state``. + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/bind'), - reply_topic=self._topic('bridge/response/device/bind'), - msg={'from': source, 'to': target}, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/bind'), + reply_topic=self._topic('bridge/response/device/bind'), + msg={'from': source, 'to': target}, + **self._mqtt_args(**kwargs), ) @action def unbind_devices(self, source: str, target: str, **kwargs): """ - Un-bind two devices. + Remove a binding between two devices. - :param source: Name of the source device. - You can also bind a specific device endpoint - for example ``MySensor/temperature``. - :param target: Name of the target device. - You can also bind a specific device endpoint - for example ``MyLight/state``. - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). + :param source: Name of the source device. You can also bind a specific + device endpoint - for example ``MySensor/temperature``. + :param target: Name of the target device. You can also bind a specific + device endpoint - for example ``MyLight/state``. + :param kwargs: Extra arguments to be passed to + :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query + the default configured device). """ - return self._parse_response( - self.publish( # type: ignore[reportGeneralTypeIssues] - topic=self._topic('bridge/request/device/unbind'), - reply_topic=self._topic('bridge/response/device/unbind'), - msg={'from': source, 'to': target}, - **self._mqtt_args(**kwargs), - ) + return self._run_request( + topic=self._topic('bridge/request/device/unbind'), + reply_topic=self._topic('bridge/response/device/unbind'), + msg={'from': source, 'to': target}, + **self._mqtt_args(**kwargs), ) @action - def on( # pylint: disable=redefined-builtin,arguments-differ - self, device, *_, **__ - ): + def on(self, device, *_, **__): # pylint: disable=arguments-differ """ Turn on/set to true a switch, a binary property or an option. """ @@ -1542,9 +1745,7 @@ class ZigbeeMqttPlugin( ) @action - def off( # pylint: disable=redefined-builtin,arguments-differ - self, device, *_, **__ - ): + def off(self, device, *_, **__): # pylint: disable=arguments-differ """ Turn off/set to false a switch, a binary property or an option. """ @@ -1554,15 +1755,13 @@ class ZigbeeMqttPlugin( ) @action - def toggle( # pylint: disable=redefined-builtin,arguments-differ - self, device, *_, **__ - ): + def toggle(self, device, *_, **__): # pylint: disable=arguments-differ """ Toggles the state of a switch, a binary property or an option. """ device, prop_info = self._get_switch_info(device) prop = prop_info['property'] - device_state = self.device_get(device).output # type: ignore + device_state: dict = self.device_get(device).output # type: ignore return self.device_set( device, prop, @@ -1575,6 +1774,10 @@ class ZigbeeMqttPlugin( ) def _get_switch_info(self, name: str) -> Tuple[str, dict]: + """ + Get the information about a switch or switch-like device by name or + address. + """ name, prop = self._ieee_address_and_property(name) if not prop or prop == 'light': prop = 'state' @@ -1593,6 +1796,10 @@ class ZigbeeMqttPlugin( @staticmethod def _is_read_only(feature: dict) -> bool: + """ + Utility method that checks if a feature is read-only on the basis of its + access flags. + """ return bool(feature.get('access', 0) & 2) == 0 and ( bool(feature.get('access', 0) & 1) == 1 or bool(feature.get('access', 0) & 4) == 1 @@ -1600,6 +1807,10 @@ class ZigbeeMqttPlugin( @staticmethod def _is_write_only(feature: dict) -> bool: + """ + Utility method that checks if a feature is write-only on the basis of + its access flags. + """ return bool(feature.get('access', 0) & 2) == 1 and ( bool(feature.get('access', 0) & 1) == 0 or bool(feature.get('access', 0) & 4) == 0 @@ -1607,12 +1818,22 @@ class ZigbeeMqttPlugin( @staticmethod def _is_query_disabled(feature: dict) -> bool: + """ + Utility method that checks if a feature doesn't support programmating + querying (i.e. it will only broadcast its state when available) on the + basis of its access flags. + """ return bool(feature.get('access', 0) & 4) == 0 @staticmethod def _ieee_address_and_property( device: Union[dict, str] ) -> Tuple[str, Optional[str]]: + """ + Given a device property, as a dictionary containing the full device + definition or a string containing the device address and property, + return a tuple in the format ``(device_address, property_name)``. + """ # Entity value IDs are stored in the `
:` # format. Therefore, we need to split by `:` if we want to # retrieve the original address. @@ -1630,12 +1851,20 @@ class ZigbeeMqttPlugin( @classmethod def _ieee_address(cls, device: Union[dict, str]) -> str: + """ + :return: The IEEE address of a device, given its full definition or + common name. + """ return cls._ieee_address_and_property(device)[0] @classmethod def _get_switches( cls, device_info: dict, props: dict, options: dict ) -> List[Switch]: + """ + A utility method that parses the properties of a device that can be + mapped to switches (or switch-like entities). + """ return [ cls._to_entity( Switch, @@ -1660,9 +1889,13 @@ class ZigbeeMqttPlugin( ] @classmethod - def _get_sensors( + def _get_sensors( # pylint: disable=too-many-branches cls, device_info: dict, props: dict, options: dict ) -> List[Sensor]: + """ + A utility method that parses the properties of a device that can be + mapped to sensors (or sensor-like entities). + """ sensors = [] properties = [ prop @@ -1727,6 +1960,10 @@ class ZigbeeMqttPlugin( def _get_dimmers( cls, device_info: dict, props: dict, options: dict ) -> List[Dimmer]: + """ + A utility method that parses the properties of a device that can be + mapped to dimmers (or dimmer-like entities). + """ return [ cls._to_entity( Dimmer, @@ -1750,6 +1987,10 @@ class ZigbeeMqttPlugin( def _get_enum_switches( cls, device_info: dict, props: dict, options: dict ) -> List[EnumSwitch]: + """ + A utility method that parses the properties of a device that can be + mapped to switches with enum values. + """ return [ cls._to_entity( EnumSwitch, @@ -1776,6 +2017,10 @@ class ZigbeeMqttPlugin( options: dict, **kwargs, ) -> Entity: + """ + Give the information about a device and its properties and options, it + builds an entity of the right type. + """ return entity_type( id=f'{device_info["ieee_address"]}:{property["property"]}', name=property.get('description', ''), @@ -1788,8 +2033,12 @@ class ZigbeeMqttPlugin( @classmethod def _get_light_meta(cls, device_info: dict) -> dict: - exposes = (device_info.get('definition', {}) or {}).get('exposes', []) - for exposed in exposes: + """ + Parse the attributes of a device that can be mapped to lights (or + light-like entities). + """ + # pylint: disable=too-many-nested-blocks + for exposed in (device_info.get('definition', {}) or {}).get('exposes', []): if exposed.get('type') == 'light': features = exposed.get('features', []) switch = {} @@ -1814,7 +2063,7 @@ class ZigbeeMqttPlugin( 'value_on': feature['value_on'], 'value_off': feature['value_off'], 'state_name': feature['name'], - 'value_toggle': feature.get('value_toggle', None), + 'value_toggle': feature.get('value_toggle'), **data, } elif ( @@ -1954,15 +2203,223 @@ class ZigbeeMqttPlugin( self.device_set(self._preferred_name(dev), values=data) - def main(self): - from ._listener import ZigbeeMqttListener + @override + def on_mqtt_message(self): + """ + Overrides :meth:`platypush.plugins.mqtt.MqttPlugin.on_mqtt_message` to + handle messages from the zigbee2mqtt integration. + """ - listener = ZigbeeMqttListener() - listener.start() - self.wait_stop() + def handler(client: MqttClient, _, msg: mqtt.MQTTMessage): + topic = msg.topic[len(self.base_topic) + 1 :] + data = msg.payload.decode() + if not data: + return - listener.stop() - listener.join() + with contextlib.suppress(ValueError, TypeError): + data = json.loads(data) + + if topic == 'bridge/state': + self._process_state_message(client, data) + elif topic in ['bridge/log', 'bridge/logging']: + self._process_log_message(client, data) + elif topic == 'bridge/devices': + self._process_devices(client, data) + elif topic == 'bridge/groups': + self._process_groups(client, data) + elif isinstance(data, dict): + name = topic.split('/')[-1] + if name not in self._info.devices: + self.logger.debug('Skipping unknown topic: %s', topic) + return + + dev = self._info.devices.get(name) + assert dev is not None, f'No such device: {name}' + changed_props = {k: v for k, v in data.items() if v != dev.get(k)} + + if changed_props: + self._process_property_update(name, data) + self._bus.post( + ZigbeeMqttDevicePropertySetEvent( + host=client.host, + port=client.port, + device=name, + properties=changed_props, + ) + ) + + device_meta = self._devices_meta.get(name) + if device_meta: + data['friendly_name'] = device_meta.get('friendly_name') + data['ieee_address'] = device_meta.get('ieee_address') + self._info.devices.add(data) + + return handler + + @property + def _bus(self) -> Bus: + """ + Utility property for the bus. + """ + return get_bus() + + def _process_state_message(self, client: MqttClient, msg: str): + """ + Process a state message. + """ + if msg == self._bridge_state: + return + + if msg == 'online': + evt = ZigbeeMqttOnlineEvent + self._bridge_state = BridgeState.ONLINE + elif msg == 'offline': + evt = ZigbeeMqttOfflineEvent + self._bridge_state = BridgeState.OFFLINE + self.logger.warning('The zigbee2mqtt service is offline') + else: + return + + self._bus.post(evt(host=client.host, port=client.port)) + + # pylint: disable=too-many-branches + def _process_log_message(self, client, msg): + """ + Process a logevent. + """ + + msg_type = msg.get('type') + text = msg.get('message') + args = {'host': client._host, 'port': client._port} + + if msg_type == 'devices': + devices = {} + for dev in text or []: + devices[dev['friendly_name']] = dev + client.subscribe(self.base_topic + '/' + dev['friendly_name']) + elif msg_type == 'pairing': + self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) + elif msg_type in ['device_ban', 'device_banned']: + self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) + elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: + force = msg_type == 'device_force_removed_failed' + self._bus.post( + ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args) + ) + elif msg_type == 'device_whitelisted': + self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) + elif msg_type == 'device_renamed': + self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) + elif msg_type == 'device_bind': + self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) + elif msg_type == 'device_unbind': + self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) + elif msg_type == 'device_group_add': + self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) + elif msg_type == 'device_group_add_failed': + self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) + elif msg_type == 'device_group_remove': + self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) + elif msg_type == 'device_group_remove_failed': + self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) + elif msg_type == 'device_group_remove_all': + self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) + elif msg_type == 'device_group_remove_all_failed': + self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) + elif msg_type == 'zigbee_publish_error': + self.logger.error('zigbee2mqtt error: {}'.format(text)) + self._bus.post(ZigbeeMqttErrorEvent(error=text, **args)) + elif msg.get('level') in ['warning', 'error']: + log = getattr(self.logger, msg['level']) + log( + 'zigbee2mqtt {}: {}'.format( + msg['level'], text or msg.get('error', msg.get('warning')) + ) + ) + + def _process_devices(self, client: MqttClient, msg): + """ + Process a list of devices received on the zigbee2mqtt bridge. + """ + devices_info = { + device.get('friendly_name', device.get('ieee_address')): device + for device in msg + } + + # Subscribe to updates from all the known devices + event_args = {'host': client.host, 'port': client.port} + client.subscribe( + *[self.base_topic + '/' + device for device in devices_info.keys()] + ) + + for name, device in devices_info.items(): + # If we haven't cached this device yet, then notify about the + # connection of a new device. + if not self._info.devices.get(name): + self._bus.post( + ZigbeeMqttDeviceConnectedEvent(device=name, **event_args) + ) + + # Send a request to fetch all the known properties of this device + exposes = (device.get('definition', {}) or {}).get('exposes', []) + payload = self._build_device_get_request(exposes) + if payload: + client.publish( + self.base_topic + '/' + name + '/get', + json.dumps(payload), + ) + + # Send a request to fetch all the known properties of this device + for name in self._info.devices.by_name.copy(): + if name not in devices_info: + self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) + self._info.devices.remove(name) + + self._info.devices.reset(*devices_info) + self._devices_meta = devices_info + + def _process_groups(self, client: MqttClient, msg): + """ + Process an MQTT message containing an updated list of groups. + """ + event_args = {'host': client.host, 'port': client.port} + groups_info = { + group.get('friendly_name', group.get('id')): group for group in msg + } + + # Trigger ZigbeeMqttGroupAddedEvent for each new group + for name in groups_info.keys(): + if name not in self._info.groups: + self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) + + # Trigger ZigbeeMqttGroupRemovedEvent for each removed group + for name in self._info.groups.copy(): + if name not in groups_info: + self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) + del self._info.groups[name] + + # Reset the groups cache + self._info.groups = {group: {} for group in groups_info.keys()} + + def _process_property_update(self, device_name: str, properties: Mapping): + """ + Process an MQTT message containing a device property update. + + It will appropriately forward an + :class:`platypush.message.event.entities.EntityUpdateEvent` to the bus. + """ + device_info = self._devices_meta.get(device_name) + if not (device_info and properties): + return + + self.publish_entities( + [ + { + **device_info, + 'state': properties, + } + ] + ) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zigbee/mqtt/_listener.py b/platypush/plugins/zigbee/mqtt/_listener.py deleted file mode 100644 index 015bbc0574..0000000000 --- a/platypush/plugins/zigbee/mqtt/_listener.py +++ /dev/null @@ -1,269 +0,0 @@ -import contextlib -import json -from typing import Mapping - -from platypush.backend.mqtt import MqttBackend -from platypush.bus import Bus -from platypush.context import get_bus, get_plugin -from platypush.message.event.zigbee.mqtt import ( - ZigbeeMqttOnlineEvent, - ZigbeeMqttOfflineEvent, - ZigbeeMqttDevicePropertySetEvent, - ZigbeeMqttDevicePairingEvent, - ZigbeeMqttDeviceConnectedEvent, - ZigbeeMqttDeviceBannedEvent, - ZigbeeMqttDeviceRemovedEvent, - ZigbeeMqttDeviceRemovedFailedEvent, - ZigbeeMqttDeviceWhitelistedEvent, - ZigbeeMqttDeviceRenamedEvent, - ZigbeeMqttDeviceBindEvent, - ZigbeeMqttDeviceUnbindEvent, - ZigbeeMqttGroupAddedEvent, - ZigbeeMqttGroupAddedFailedEvent, - ZigbeeMqttGroupRemovedEvent, - ZigbeeMqttGroupRemovedFailedEvent, - ZigbeeMqttGroupRemoveAllEvent, - ZigbeeMqttGroupRemoveAllFailedEvent, - ZigbeeMqttErrorEvent, -) -from platypush.plugins.zigbee.mqtt import ZigbeeMqttPlugin - - -class ZigbeeMqttListener(MqttBackend): - """ - Listener for zigbee2mqtt events. - """ - - def __init__(self): - plugin = self._plugin - self.base_topic = plugin.base_topic # type: ignore - self._devices = {} - self._devices_info = {} - self._groups = {} - self._last_state = None - self.server_info = { - 'host': plugin.host, # type: ignore - 'port': plugin.port or self._default_mqtt_port, # type: ignore - 'tls_cafile': plugin.tls_cafile, # type: ignore - 'tls_certfile': plugin.tls_certfile, # type: ignore - 'tls_ciphers': plugin.tls_ciphers, # type: ignore - 'tls_keyfile': plugin.tls_keyfile, # type: ignore - 'tls_version': plugin.tls_version, # type: ignore - 'username': plugin.username, # type: ignore - 'password': plugin.password, # type: ignore - } - - listeners = [ - { - **self.server_info, - 'topics': [ - self.base_topic + '/' + topic - for topic in [ - 'bridge/state', - 'bridge/log', - 'bridge/logging', - 'bridge/devices', - 'bridge/groups', - ] - ], - } - ] - - super().__init__( - subscribe_default_topic=False, listeners=listeners, **self.server_info - ) - - assert self.client_id - self.client_id += '-zigbee-mqtt' - - def _process_state_message(self, client, msg): - if msg == self._last_state: - return - - if msg == 'online': - evt = ZigbeeMqttOnlineEvent - elif msg == 'offline': - evt = ZigbeeMqttOfflineEvent - self.logger.warning('zigbee2mqtt service is offline') - else: - return - - self._bus.post(evt(host=client._host, port=client._port)) - self._last_state = msg - - def _process_log_message(self, client, msg): - msg_type = msg.get('type') - text = msg.get('message') - args = {'host': client._host, 'port': client._port} - - if msg_type == 'devices': - devices = {} - for dev in text or []: - devices[dev['friendly_name']] = dev - client.subscribe(self.base_topic + '/' + dev['friendly_name']) - elif msg_type == 'pairing': - self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) - elif msg_type in ['device_ban', 'device_banned']: - self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) - elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: - force = msg_type == 'device_force_removed_failed' - self._bus.post( - ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args) - ) - elif msg_type == 'device_whitelisted': - self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) - elif msg_type == 'device_renamed': - self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) - elif msg_type == 'device_bind': - self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) - elif msg_type == 'device_unbind': - self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) - elif msg_type == 'device_group_add': - self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) - elif msg_type == 'device_group_add_failed': - self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) - elif msg_type == 'device_group_remove': - self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) - elif msg_type == 'device_group_remove_failed': - self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) - elif msg_type == 'device_group_remove_all': - self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) - elif msg_type == 'device_group_remove_all_failed': - self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) - elif msg_type == 'zigbee_publish_error': - self.logger.error('zigbee2mqtt error: {}'.format(text)) - self._bus.post(ZigbeeMqttErrorEvent(error=text, **args)) - elif msg.get('level') in ['warning', 'error']: - log = getattr(self.logger, msg['level']) - log( - 'zigbee2mqtt {}: {}'.format( - msg['level'], text or msg.get('error', msg.get('warning')) - ) - ) - - def _process_devices(self, client, msg): - devices_info = { - device.get('friendly_name', device.get('ieee_address')): device - for device in msg - } - - # noinspection PyProtectedMember - event_args = {'host': client._host, 'port': client._port} - client.subscribe( - *[self.base_topic + '/' + device for device in devices_info.keys()] - ) - - for name, device in devices_info.items(): - if name not in self._devices: - self._bus.post( - ZigbeeMqttDeviceConnectedEvent(device=name, **event_args) - ) - - exposes = (device.get('definition', {}) or {}).get('exposes', []) - payload = self._plugin._build_device_get_request(exposes) # type: ignore - if payload: - client.publish( - self.base_topic + '/' + name + '/get', - json.dumps(payload), - ) - - devices_copy = [*self._devices.keys()] - for name in devices_copy: - if name not in devices_info: - self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) - del self._devices[name] - - self._devices = {device: {} for device in devices_info.keys()} - self._devices_info = devices_info - - def _process_groups(self, client, msg): - # noinspection PyProtectedMember - event_args = {'host': client._host, 'port': client._port} - groups_info = { - group.get('friendly_name', group.get('id')): group for group in msg - } - - for name in groups_info.keys(): - if name not in self._groups: - self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) - - groups_copy = [*self._groups.keys()] - for name in groups_copy: - if name not in groups_info: - self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) - del self._groups[name] - - self._groups = {group: {} for group in groups_info.keys()} - - def on_mqtt_message(self): - def handler(client, _, msg): - topic = msg.topic[len(self.base_topic) + 1 :] - data = msg.payload.decode() - if not data: - return - - with contextlib.suppress(ValueError, TypeError): - data = json.loads(data) - - if topic == 'bridge/state': - self._process_state_message(client, data) - elif topic in ['bridge/log', 'bridge/logging']: - self._process_log_message(client, data) - elif topic == 'bridge/devices': - self._process_devices(client, data) - elif topic == 'bridge/groups': - self._process_groups(client, data) - else: - suffix = topic.split('/')[-1] - if suffix not in self._devices: - return - - name = suffix - changed_props = { - k: v for k, v in data.items() if v != self._devices[name].get(k) - } - - if changed_props: - self._process_property_update(name, data) - self._bus.post( - ZigbeeMqttDevicePropertySetEvent( - host=client._host, - port=client._port, - device=name, - properties=changed_props, - ) - ) - - self._devices[name].update(data) - - return handler - - @property - def _plugin(self) -> ZigbeeMqttPlugin: - plugin = get_plugin('zigbee.mqtt') - assert plugin, 'The zigbee.mqtt plugin is not configured' - return plugin - - @property - def _bus(self) -> Bus: - return get_bus() - - def _process_property_update(self, device_name: str, properties: Mapping): - device_info = self._devices_info.get(device_name) - if not (device_info and properties): - return - - self._plugin.publish_entities( # type: ignore - [ - { - **device_info, - 'state': properties, - } - ] - ) - - def run(self): - super().run() - - -# vim:sw=4:ts=4:et: