diff --git a/platypush/plugins/zigbee/mqtt/__init__.py b/platypush/plugins/zigbee/mqtt/__init__.py
index 8a5fc6cbf..f1a1e7107 100644
--- a/platypush/plugins/zigbee/mqtt/__init__.py
+++ b/platypush/plugins/zigbee/mqtt/__init__.py
@@ -1,3 +1,6 @@
+import contextlib
+from dataclasses import dataclass, field
+from enum import Enum
import json
import re
import threading
@@ -8,6 +11,7 @@ from typing import (
Collection,
Dict,
List,
+ Mapping,
Optional,
Tuple,
Type,
@@ -15,6 +19,10 @@ from typing import (
)
from typing_extensions import override
+import paho.mqtt.client as mqtt
+
+from platypush.bus import Bus
+from platypush.context import get_bus
from platypush.entities import (
DimmerEntityManager,
Entity,
@@ -44,14 +52,109 @@ from platypush.entities.sensors import (
)
from platypush.entities.switches import Switch, EnumSwitch
from platypush.entities.temperature import TemperatureSensor
+from platypush.message.event.zigbee.mqtt import (
+ ZigbeeMqttOnlineEvent,
+ ZigbeeMqttOfflineEvent,
+ ZigbeeMqttDevicePairingEvent,
+ ZigbeeMqttDeviceConnectedEvent,
+ ZigbeeMqttDeviceBannedEvent,
+ ZigbeeMqttDeviceRemovedEvent,
+ ZigbeeMqttDeviceRemovedFailedEvent,
+ ZigbeeMqttDeviceWhitelistedEvent,
+ ZigbeeMqttDeviceRenamedEvent,
+ ZigbeeMqttDeviceBindEvent,
+ ZigbeeMqttDevicePropertySetEvent,
+ ZigbeeMqttDeviceUnbindEvent,
+ ZigbeeMqttGroupAddedEvent,
+ ZigbeeMqttGroupAddedFailedEvent,
+ ZigbeeMqttGroupRemovedEvent,
+ ZigbeeMqttGroupRemovedFailedEvent,
+ ZigbeeMqttGroupRemoveAllEvent,
+ ZigbeeMqttGroupRemoveAllFailedEvent,
+ ZigbeeMqttErrorEvent,
+)
from platypush.message.response import Response
-from platypush.plugins import RunnablePlugin
-from platypush.plugins.mqtt import MqttPlugin, action
+from platypush.plugins.mqtt import DEFAULT_TIMEOUT, MqttClient, MqttPlugin, action
+
+
+class BridgeState(Enum):
+ """
+ Known bridge states.
+ """
+
+ ONLINE = 'online'
+ OFFLINE = 'offline'
+
+
+@dataclass
+class ZigbeeDevicesInfo:
+ """
+ Cached information about the devices on the network.
+ """
+
+ by_address: Dict[str, dict] = field(default_factory=dict)
+ by_name: Dict[str, dict] = field(default_factory=dict)
+
+ def __contains__(self, name: str) -> bool:
+ """
+ :return: True if the device with the given name exists in the cache.
+ """
+ return name in self.by_name or name in self.by_address
+
+ def get(self, name: str) -> Optional[dict]:
+ """
+ Retrieves a cached device record either by name or by address.
+ """
+ return self.by_address.get(name, self.by_name.get(name))
+
+ def add(self, device: dict):
+ """
+ Adds a device record to the cache.
+ """
+ if device.get('ieee_address'):
+ self.by_address[device['ieee_address']] = device
+ if device.get('friendly_name'):
+ self.by_name[device['friendly_name']] = device
+
+ def remove(self, device: Union[str, dict]):
+ """
+ Removes a device record from the cache.
+ """
+ if isinstance(device, str):
+ dev = self.get(device)
+ if not dev:
+ return # No such device
+ else:
+ dev = device
+
+ if dev.get('ieee_address'):
+ self.by_address.pop(dev['ieee_address'], None)
+
+ if dev.get('friendly_name'):
+ self.by_name.pop(dev['friendly_name'], None)
+
+ def reset(self, *keys: str):
+ """
+ Reset the state for the devices with the given keys.
+ """
+
+ for k in keys:
+ self.by_address[k] = {}
+ self.by_name[k] = {}
+
+
+@dataclass
+class ZigbeeInfo:
+ """
+ Cached information about the devices and groups on the network.
+ """
+
+ devices: ZigbeeDevicesInfo = field(default_factory=ZigbeeDevicesInfo)
+ groups: Dict[str, dict] = field(default_factory=dict)
# pylint: disable=too-many-ancestors
class ZigbeeMqttPlugin(
- RunnablePlugin,
MqttPlugin,
DimmerEntityManager,
EnumSwitchEntityManager,
@@ -60,7 +163,7 @@ class ZigbeeMqttPlugin(
SwitchEntityManager,
):
"""
- This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
+ Support for Zigbee devices using any Zigbee adapter compatible with
`zigbee2mqtt `_.
In order to get started you'll need:
@@ -71,15 +174,18 @@ class ZigbeeMqttPlugin(
Instructions:
- - Install `cc-tool `_ either from sources or from a package manager.
+ - Install `cc-tool `_ either from
+ sources or from a package manager.
- Connect the Zigbee to your PC/RaspberryPi in this way: ::
USB -> CC debugger -> downloader cable -> CC2531 -> USB
- - The debugger and the adapter should be connected *at the same time*. If the later ``cc-tool`` command throws
- up an error, put the device in sync while connected by pressing the _Reset_ button on the debugger.
+ - The debugger and the adapter should be connected *at the same time*.
+ If the later ``cc-tool`` command throws up an error, put the device in
+ sync while connected by pressing the _Reset_ button on the debugger.
- Check where the device is mapped. On Linux it will usually be ``/dev/ttyACM0``.
- - Download the latest `Z-Stack firmware `_
+ - Download the latest `Z-Stack firmware
+ `_
to your device. Instructions for a CC2531 device:
.. code-block:: shell
@@ -91,10 +197,12 @@ class ZigbeeMqttPlugin(
- You can disconnect your debugger and downloader cable once the firmware is flashed.
- - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or
- through your package manager. **NOTE**: many API breaking changes have occurred on Zigbee2MQTT 1.17.0,
- therefore this integration will only be compatible with the version 1.17.0 of the service or higher versions.
- Manual instructions:
+ - Install ``zigbee2mqtt``. First install a node/npm environment, then
+ either install ``zigbee2mqtt`` manually or through your package
+ manager. **NOTE**: many API breaking changes have occurred on
+ Zigbee2MQTT 1.17.0, therefore this integration will only be compatible
+ with the version 1.17.0 of the service or higher versions. Manual
+ instructions:
.. code-block:: shell
@@ -106,10 +214,12 @@ class ZigbeeMqttPlugin(
cd /opt/zigbee2mqtt
npm install
- - You need to have an MQTT broker running somewhere. If not, you can install
- `Mosquitto `_ through your package manager on any device in your network.
+ - You need to have an MQTT broker running somewhere. If not, you can
+ install `Mosquitto `_ through your package
+ manager on any device in your network.
- - Edit the ``/opt/zigbee2mqtt/data/configuration.yaml`` file to match the configuration of your MQTT broker:
+ - Edit the ``/opt/zigbee2mqtt/data/configuration.yaml`` file to match
+ the configuration of your MQTT broker:
.. code-block:: yaml
@@ -123,27 +233,32 @@ class ZigbeeMqttPlugin(
# user: my_user
# password: my_password
- - Also make sure that ``permit_join`` is set to ``True``, in order to allow Zigbee devices to join the network
- while you're configuring it. It's equally important to set ``permit_join`` to ``False`` once you have
- configured your network, to prevent accidental/malignant joins from outer Zigbee devices.
+ - Also make sure that ``permit_join`` is set to ``True``, in order to
+ allow Zigbee devices to join the network while you're configuring it.
+ It's equally important to set ``permit_join`` to ``False`` once you
+ have configured your network, to prevent accidental/malignant joins
+ from outer Zigbee devices.
- - Start the ``zigbee2mqtt`` daemon on your device (the
- `official documentation `_
- also contains instructions on how to configure it as a ``systemd`` service:
+ - Start the ``zigbee2mqtt`` daemon on your device (the `official
+ documentation
+ `_
+ also contains instructions on how to configure it as a ``systemd``
+ service:
.. code-block:: shell
cd /opt/zigbee2mqtt
npm start
- - If you have Zigbee devices that are paired to other bridges, unlink them or do a factory reset to pair them
- to your new bridge.
+ - If you have Zigbee devices that are paired to other bridges, unlink
+ them or do a factory reset to pair them to your new bridge.
- - If it all goes fine, once the daemon is running and a new device is found you should see traces like this in
- the output of ``zigbee2mqtt``::
+ - If it all goes fine, once the daemon is running and a new device is
+ found you should see traces like this in the output of
+ ``zigbee2mqtt``::
- zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', device has
- successfully been paired
+ zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a',
+ device has successfully been paired
- You are now ready to use this integration.
@@ -153,45 +268,31 @@ class ZigbeeMqttPlugin(
Triggers:
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a
- connected device change.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects
- to the network.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned
- from the network.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed
- from the network.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to
- remove a device from the network fails.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is
- whitelisted on the network.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is
- renamed on the network.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event
- occurs.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event
- occurs.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to
- add a new group fails.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to
- remove a group fails.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices
- are removed from a group.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to
- remove all the devices from a group fails.
- * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs
- on the zigbee2mqtt service.
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent`
+ * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent`
""" # noqa: E501
def __init__(
self,
- host: str = 'localhost',
+ host: str,
port: int = 1883,
base_topic: str = 'zigbee2mqtt',
timeout: int = 10,
@@ -204,25 +305,40 @@ class ZigbeeMqttPlugin(
**kwargs,
):
"""
- :param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages (default: ``localhost``).
+ :param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages.
:param port: Broker listen port (default: 1883).
- :param base_topic: Topic prefix, as specified in ``/opt/zigbee2mqtt/data/configuration.yaml``
- (default: '``base_topic``').
- :param timeout: If the command expects from a response, then this timeout value will be used
- (default: 60 seconds).
- :param tls_cafile: If the connection requires TLS/SSL, specify the certificate authority file
- (default: None)
- :param tls_certfile: If the connection requires TLS/SSL, specify the certificate file (default: None)
- :param tls_keyfile: If the connection requires TLS/SSL, specify the key file (default: None)
- :param tls_version: If the connection requires TLS/SSL, specify the minimum TLS supported version
- (default: None)
- :param tls_ciphers: If the connection requires TLS/SSL, specify the supported ciphers (default: None)
- :param username: If the connection requires user authentication, specify the username (default: None)
- :param password: If the connection requires user authentication, specify the password (default: None)
+ :param base_topic: Topic prefix, as specified in
+ ``/opt/zigbee2mqtt/data/configuration.yaml`` (default: '``zigbee2mqtt``').
+ :param timeout: If the command expects from a response, then this
+ timeout value will be used (default: 60 seconds).
+ :param tls_cafile: If the connection requires TLS/SSL, specify the
+ certificate authority file (default: None)
+ :param tls_certfile: If the connection requires TLS/SSL, specify the
+ certificate file (default: None)
+ :param tls_keyfile: If the connection requires TLS/SSL, specify the key
+ file (default: None)
+ :param tls_version: If the connection requires TLS/SSL, specify the
+ minimum TLS supported version (default: None)
+ :param tls_ciphers: If the connection requires TLS/SSL, specify the
+ supported ciphers (default: None)
+ :param username: If the connection requires user authentication, specify
+ the username (default: None)
+ :param password: If the connection requires user authentication, specify
+ the password (default: None)
"""
super().__init__(
host=host,
port=port,
+ topics=[
+ f'{base_topic}/{topic}'
+ for topic in [
+ 'bridge/state',
+ 'bridge/log',
+ 'bridge/logging',
+ 'bridge/devices',
+ 'bridge/groups',
+ ]
+ ],
tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
tls_version=tls_version,
@@ -232,16 +348,21 @@ class ZigbeeMqttPlugin(
**kwargs,
)
+ # Append a unique suffix to the client ID to avoid client name clashes
+ # with other MQTT plugins.
+ self.client_id += '-zigbee-mqtt'
self.base_topic = base_topic
self.timeout = timeout
- self._info: Dict[str, dict] = {
- 'devices_by_addr': {},
- 'devices_by_name': {},
- 'groups': {},
- }
+ self._info = ZigbeeInfo()
+ self._devices_meta: Dict[str, dict] = {}
+ self._bridge_state = BridgeState.OFFLINE
@staticmethod
def _get_properties(device: dict) -> dict:
+ """
+ Static method that parses the properties of a device from its received
+ definition.
+ """
exposes = (device.get('definition') or {}).get('exposes', []).copy()
properties = {}
@@ -255,12 +376,17 @@ class ZigbeeMqttPlugin(
@staticmethod
def _get_options(device: dict) -> dict:
+ """
+ Static method that parses the options of a device from its received
+ definition.
+ """
return {
option['property']: option
for option in (device.get('definition') or {}).get('options', [])
if option.get('property')
}
+ @override
def transform_entities(self, entities: Collection[dict]) -> List[Entity]:
compatible_entities = []
for dev in entities:
@@ -368,6 +494,10 @@ class ZigbeeMqttPlugin(
@staticmethod
def _get_device_url(device_info: dict) -> Optional[str]:
+ """
+ Static method that returns the zigbee2mqtt URL with the information
+ about a certain device, if the model is available in its definition.
+ """
model = device_info.get('definition', {}).get('model')
if not model:
return None
@@ -376,6 +506,10 @@ class ZigbeeMqttPlugin(
@staticmethod
def _get_image_url(device_info: dict) -> Optional[str]:
+ """
+ Static method that returns the zigbee2mqtt URL of the image of a
+ certain device, if the model is available in its definition.
+ """
model = device_info.get('definition', {}).get('model')
if not model:
return None
@@ -383,13 +517,13 @@ class ZigbeeMqttPlugin(
return f'https://www.zigbee2mqtt.io/images/devices/{model}.jpg'
def _get_network_info(self, **kwargs) -> dict:
+ """
+ Refreshes the network information.
+ """
self.logger.info('Fetching Zigbee network information')
client = None
mqtt_args = self._mqtt_args(**kwargs)
- timeout = 30
- if 'timeout' in mqtt_args:
- timeout = mqtt_args.pop('timeout')
-
+ timeout = mqtt_args.pop('timeout', DEFAULT_TIMEOUT)
info: Dict[str, Any] = {
'state': None,
'info': {},
@@ -400,27 +534,22 @@ class ZigbeeMqttPlugin(
info_ready_events = {topic: threading.Event() for topic in info}
- def _on_message():
- def callback(_, __, msg):
- topic = msg.topic.split('/')[-1]
- if topic in info:
- info[topic] = (
- msg.payload.decode()
- if topic == 'state'
- else json.loads(msg.payload.decode())
- )
- info_ready_events[topic].set()
-
- return callback
+ def msg_callback(_, __, msg):
+ topic = msg.topic.split('/')[-1]
+ if topic in info:
+ info[topic] = (
+ msg.payload.decode()
+ if topic == 'state'
+ else json.loads(msg.payload.decode())
+ )
+ info_ready_events[topic].set()
try:
- host = mqtt_args.pop('host')
- port = mqtt_args.pop('port')
client = self._get_client( # pylint: disable=unexpected-keyword-arg
**mqtt_args
)
- client.on_message = _on_message()
- client.connect(host, port, keepalive=timeout)
+ client.on_message = msg_callback
+ client.connect()
client.subscribe(self.base_topic + '/bridge/#')
client.loop_start()
@@ -432,16 +561,16 @@ class ZigbeeMqttPlugin(
)
# Cache the new results
- self._info['devices_by_name'] = {
+ self._info.devices.by_name = {
self._preferred_name(device): device
for device in info.get('devices', [])
}
- self._info['devices_by_addr'] = {
+ self._info.devices.by_address = {
device['ieee_address']: device for device in info.get('devices', [])
}
- self._info['groups'] = {
+ self._info.groups = {
group.get('name'): group for group in info.get('groups', [])
}
@@ -456,27 +585,59 @@ class ZigbeeMqttPlugin(
return info
- def _topic(self, topic):
- return self.base_topic + '/' + topic
+ def _topic(self, topic: str) -> str:
+ """
+ Utility method that construct a topic prefixed by the configured base
+ topic.
+ """
+ return f'{self.base_topic}/{topic}'
@staticmethod
def _parse_response(response: Union[dict, Response]) -> dict:
+ """
+ Utility method that flattens a response received on a zigbee2mqtt topic
+ into a dictionary.
+ """
if isinstance(response, Response):
rs: dict = response.output # type: ignore
response = rs
- assert response.get('status') != 'error', response.get(
- 'error', 'zigbee2mqtt error'
+ if isinstance(response, dict):
+ assert response.get('status') != 'error', response.get(
+ 'error', 'zigbee2mqtt error'
+ )
+
+ return response or {}
+
+ def _run_request(
+ self,
+ topic: str,
+ msg: Union[dict, str],
+ reply_topic: Optional[str] = None,
+ **kwargs,
+ ) -> dict:
+ """
+ Sends a request/message to the Zigbeebee2MQTT bridge and waits for a
+ response.
+ """
+ return self._parse_response(
+ self.publish( # type: ignore
+ topic=topic,
+ msg=msg,
+ reply_topic=reply_topic,
+ **self._mqtt_args(**kwargs),
+ )
+ or {}
)
- return response
@action
def devices(self, **kwargs) -> List[Dict[str, Any]]:
"""
Get the list of devices registered to the service.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
:return: List of paired devices. Example output:
@@ -648,23 +809,22 @@ class ZigbeeMqttPlugin(
self, permit: bool = True, timeout: Optional[float] = None, **kwargs
):
"""
- Enable/disable devices from joining the network. This is not persistent (will not be saved to
- ``configuration.yaml``).
+ Enable/disable devices from joining the network.
+
+ This is not persistent (it will not be saved to ``configuration.yaml``).
:param permit: Set to True to allow joins, False otherwise.
:param timeout: Allow/disallow joins only for this amount of time.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
if timeout:
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/permit_join'),
- msg={'value': permit, 'time': timeout},
- reply_topic=self._topic('bridge/response/permit_join'),
- **self._mqtt_args(**kwargs),
- )
- or {}
+ return self._run_request(
+ topic=self._topic('bridge/request/permit_join'),
+ msg={'value': permit, 'time': timeout},
+ reply_topic=self._topic('bridge/response/permit_join'),
+ **self._mqtt_args(**kwargs),
)
return self.publish(
@@ -676,12 +836,14 @@ class ZigbeeMqttPlugin(
@action
def factory_reset(self, **kwargs):
"""
- Perform a factory reset of a device connected to the network, following the procedure required by the particular
- device (for instance, Hue bulbs require the Zigbee adapter to be close to the device while a button on the back
- of the bulb is pressed).
+ Perform a factory reset of a device connected to the network, following
+ the procedure required by the particular device (for instance, Hue bulbs
+ require the Zigbee adapter to be close to the device while a button on
+ the back of the bulb is pressed).
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
self.publish(
topic=self._topic('bridge/request/touchlink/factory_reset'),
@@ -692,44 +854,46 @@ class ZigbeeMqttPlugin(
@action
def log_level(self, level: str, **kwargs):
"""
- Change the log level at runtime. This change will not be persistent.
+ Change the log level at runtime.
+
+ This change will not be persistent.
:param level: Possible values: 'debug', 'info', 'warn', 'error'.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/config/log_level'),
- msg={'value': level},
- reply_topic=self._topic('bridge/response/config/log_level'),
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/config/log_level'),
+ msg={'value': level},
+ reply_topic=self._topic('bridge/response/config/log_level'),
+ **self._mqtt_args(**kwargs),
)
@action
def device_set_option(self, device: str, option: str, value: Any, **kwargs):
"""
- Change the options of a device. Options can only be changed, not added or deleted.
+ Change the options of a device.
+
+ Options can only be changed, not added or deleted.
:param device: Display name or IEEE address of the device.
:param option: Option name.
:param value: New value.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/options'),
- reply_topic=self._topic('bridge/response/device/options'),
- msg={
- 'id': device,
- 'options': {
- option: value,
- },
+ return self._run_request(
+ topic=self._topic('bridge/request/device/options'),
+ reply_topic=self._topic('bridge/response/device/options'),
+ msg={
+ 'id': device,
+ 'options': {
+ option: value,
},
- **self._mqtt_args(**kwargs),
- )
+ },
+ **self._mqtt_args(**kwargs),
)
@action
@@ -738,19 +902,19 @@ class ZigbeeMqttPlugin(
Remove a device from the network.
:param device: Display name of the device.
- :param force: Force the remove also if the removal wasn't acknowledged by the device. Note: a forced remove
- only removes the entry from the internal database, but the device is likely to connect again when
+ :param force: Force the remove also if the removal wasn't acknowledged
+ by the device. Note: a forced remove only removes the entry from the
+ internal database, but the device is likely to connect again when
restarted unless it's factory reset (default: False).
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/remove'),
- msg={'id': device, 'force': force},
- reply_topic=self._topic('bridge/response/device/remove'),
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/remove'),
+ msg={'id': device, 'force': force},
+ reply_topic=self._topic('bridge/response/device/remove'),
+ **self._mqtt_args(**kwargs),
)
@action
@@ -759,35 +923,35 @@ class ZigbeeMqttPlugin(
Ban a device from the network.
:param device: Display name of the device.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/ban'),
- reply_topic=self._topic('bridge/response/device/ban'),
- msg={'id': device},
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/ban'),
+ reply_topic=self._topic('bridge/response/device/ban'),
+ msg={'id': device},
+ **self._mqtt_args(**kwargs),
)
@action
def device_whitelist(self, device: str, **kwargs):
"""
- Whitelist a device on the network. Note: once at least a device is whitelisted, all the other non-whitelisted
- devices will be removed from the network.
+ Whitelist a device on the network.
+
+ Note: once at least a device is whitelisted, all the other
+ non-whitelisted devices will be removed from the network.
:param device: Display name of the device.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/whitelist'),
- reply_topic=self._topic('bridge/response/device/whitelist'),
- msg={'id': device},
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/whitelist'),
+ reply_topic=self._topic('bridge/response/device/whitelist'),
+ msg={'id': device},
+ **self._mqtt_args(**kwargs),
)
@action
@@ -796,16 +960,18 @@ class ZigbeeMqttPlugin(
Rename a device on the network.
:param name: New name.
- :param device: Current name of the device to rename. If no name is specified then the rename will
- affect the last device that joined the network.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param device: Current name of the device to rename. If no name is
+ specified then the rename will affect the last device that joined
+ the network.
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
if name == device:
self.logger.info('Old and new name are the same: nothing to do')
- return
+ return None
- devices = self.devices().output # type: ignore[reportGeneralTypeIssues]
+ devices: dict = self.devices().output # type: ignore
assert not [
dev for dev in devices if dev.get('friendly_name') == name
], f'A device named {name} already exists on the network'
@@ -821,17 +987,23 @@ class ZigbeeMqttPlugin(
'to': name,
}
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/rename'),
- msg=req,
- reply_topic=self._topic('bridge/response/device/rename'),
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/rename'),
+ msg=req,
+ reply_topic=self._topic('bridge/response/device/rename'),
+ **self._mqtt_args(**kwargs),
)
@staticmethod
def _build_device_get_request(values: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """
+ Prepares a ``device_get`` request, as a dictionary to be sent down to
+ the bridge.
+
+ This makes sure that the properties and options are properly mapped and
+ converted.
+ """
+
def extract_value(value: dict, root: dict, depth: int = 0):
for feature in value.get('features', []):
new_root = root
@@ -860,11 +1032,14 @@ class ZigbeeMqttPlugin(
return ret
def _get_device_info(self, device: str, **kwargs) -> dict:
- device_info = self._info['devices_by_name'].get(
+ """
+ Get or retrieve the information about a device.
+ """
+ device_info = self._info.devices.by_name.get(
# First: check by friendly name
device,
# Second: check by address
- self._info['devices_by_addr'].get(device, {}),
+ self._info.devices.by_address.get(device, {}),
)
if not device_info:
@@ -883,10 +1058,18 @@ class ZigbeeMqttPlugin(
@staticmethod
def _preferred_name(device: dict) -> str:
+ """
+ Utility method that returns the preferred name of a device, on the basis
+ of which attributes are exposed (friendly name or IEEE address).
+ """
return device.get('friendly_name') or device.get('ieee_address') or ''
@classmethod
def _device_name_matches(cls, name: str, device: dict) -> bool:
+ """
+ Utility method that checks if either the friendly name or IEEE address
+ of a device match a certain string.
+ """
name = str(cls._ieee_address(name))
return name == device.get('friendly_name') or name == device.get('ieee_address')
@@ -895,14 +1078,19 @@ class ZigbeeMqttPlugin(
self, device: str, property: Optional[str] = None, **kwargs
) -> Dict[str, Any]:
"""
- Get the properties of a device. The returned keys vary depending on the device. For example, a light bulb
- may have the "``state``" and "``brightness``" properties, while an environment sensor may have the
- "``temperature``" and "``humidity``" properties, and so on.
+ Get the properties of a device.
+
+ The returned keys vary depending on the device. For example, a light
+ bulb may have the "``state``" and "``brightness``" properties, while an
+ environment sensor may have the "``temperature``" and "``humidity``"
+ properties, and so on.
:param device: Display name of the device.
- :param property: Name of the property that should be retrieved (default: all).
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param property: Name of the property that should be retrieved (default:
+ all).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
:return: Key->value map of the device properties.
"""
kwargs = self._mqtt_args(**kwargs)
@@ -911,12 +1099,12 @@ class ZigbeeMqttPlugin(
device = self._preferred_name(device_info)
if property:
- properties = self.publish(
+ properties = self._run_request(
topic=self._topic(device) + f'/get/{property}',
reply_topic=self._topic(device),
msg={property: ''},
**kwargs,
- ).output # type: ignore[reportGeneralTypeIssues]
+ )
assert property in properties, f'No such property: {property}'
return {property: properties[property]}
@@ -932,12 +1120,12 @@ class ZigbeeMqttPlugin(
if not req:
reply_topic = None
- return self.publish(
+ return self._run_request(
topic=self._topic(device) + '/get',
reply_topic=reply_topic,
msg=req,
**kwargs,
- ).output # type: ignore[reportGeneralTypeIssues]
+ )
@action
def devices_get(
@@ -946,10 +1134,11 @@ class ZigbeeMqttPlugin(
"""
Get the properties of the devices connected to the network.
- :param devices: If set, then only the status of these devices (by friendly name) will be retrieved (default:
- retrieve all).
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param devices: If set, then only the status of these devices (by
+ friendly name) will be retrieved (default: retrieve all).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
:return: Key->value map of the device properties:
.. code-block:: json
@@ -970,13 +1159,13 @@ class ZigbeeMqttPlugin(
devices = list(
{
self._preferred_name(device)
- for device in self.devices(**kwargs).output # type: ignore[reportGeneralTypeIssues]
+ for device in list(self.devices(**kwargs).output) # type: ignore
if self._preferred_name(device)
}
)
def worker(device: str, q: Queue):
- q.put(self.device_get(device, **kwargs).output) # type: ignore[reportGeneralTypeIssues]
+ q.put(self.device_get(device, **kwargs).output) # type: ignore
queues: Dict[str, Queue] = {}
workers = {}
@@ -1005,7 +1194,8 @@ class ZigbeeMqttPlugin(
@action
def status(self, *args, device: Optional[str] = None, **kwargs):
"""
- Get the status of a device (by friendly name) or of all the connected devices (it wraps :meth:`.devices_get`).
+ Get the status of a device (by friendly name) or of all the connected
+ devices (it wraps :meth:`.devices_get`).
:param device: Device friendly name (default: get all devices).
"""
@@ -1021,16 +1211,21 @@ class ZigbeeMqttPlugin(
**kwargs,
):
"""
- Set a properties on a device. The compatible properties vary depending on the device. For example, a light bulb
- may have the "``state``" and "``brightness``" properties, while an environment sensor may have the
- "``temperature``" and "``humidity``" properties, and so on.
+ Set a properties on a device.
+
+ The compatible properties vary depending on the device. For example, a
+ light bulb may have the "``state``" and "``brightness``" properties,
+ while an environment sensor may have the "``temperature``" and
+ "``humidity``" properties, and so on.
:param device: Display name of the device.
:param property: Name of the property that should be set.
:param value: New value of the property.
- :param values: If you want to set multiple values, then pass this mapping instead of ``property``+``value``.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param values: If you want to set multiple values, then pass this
+ mapping instead of ``property``+``value``.
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
msg = (values or {}).copy()
reply_topic = None
@@ -1056,12 +1251,12 @@ class ZigbeeMqttPlugin(
if self._is_write_only(stored_property):
reply_topic = None
- properties = self.publish(
+ properties = self._run_request(
topic=self._topic(device + '/set'),
reply_topic=reply_topic,
msg=msg,
**self._mqtt_args(**kwargs),
- ).output # type: ignore[reportGeneralTypeIssues]
+ )
if property and reply_topic:
assert (
@@ -1105,8 +1300,9 @@ class ZigbeeMqttPlugin(
Check if the specified device has any OTA updates available to install.
:param device: Address or friendly name of the device.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
:return:
@@ -1119,13 +1315,11 @@ class ZigbeeMqttPlugin(
}
"""
- ret = self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/ota_update/check'),
- reply_topic=self._topic('bridge/response/device/ota_update/check'),
- msg={'id': device},
- **self._mqtt_args(**kwargs),
- )
+ ret = self._run_request(
+ topic=self._topic('bridge/request/device/ota_update/check'),
+ reply_topic=self._topic('bridge/response/device/ota_update/check'),
+ msg={'id': device},
+ **self._mqtt_args(**kwargs),
)
return {
@@ -1140,16 +1334,15 @@ class ZigbeeMqttPlugin(
Install OTA updates for a device if available.
:param device: Address or friendly name of the device.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/ota_update/update'),
- reply_topic=self._topic('bridge/response/device/ota_update/update'),
- msg={'id': device},
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/ota_update/update'),
+ reply_topic=self._topic('bridge/response/device/ota_update/update'),
+ msg={'id': device},
+ **self._mqtt_args(**kwargs),
)
@action
@@ -1157,8 +1350,9 @@ class ZigbeeMqttPlugin(
"""
Get the groups registered on the device.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
return self._get_network_info(**kwargs).get('groups', [])
@@ -1167,8 +1361,9 @@ class ZigbeeMqttPlugin(
"""
Get the information, configuration and state of the network.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
:return: Example:
@@ -1305,8 +1500,9 @@ class ZigbeeMqttPlugin(
:param name: Display name of the group.
:param id: Optional numeric ID (default: auto-generated).
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
payload = (
name
@@ -1317,13 +1513,11 @@ class ZigbeeMqttPlugin(
}
)
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/group/add'),
- reply_topic=self._topic('bridge/response/group/add'),
- msg=payload,
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/group/add'),
+ reply_topic=self._topic('bridge/response/group/add'),
+ msg=payload,
+ **self._mqtt_args(**kwargs),
)
@action
@@ -1331,28 +1525,34 @@ class ZigbeeMqttPlugin(
self, group: str, property: Optional[str] = None, **kwargs
) -> dict:
"""
- Get one or more properties of a group. The compatible properties vary depending on the devices on the group.
- For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``"
- properties, while an environment sensor may have the "``temperature``" and "``humidity``" properties, and so on.
+ Get one or more properties of a group.
+
+ The compatible properties vary depending on the devices on the group.
+ For example, a light bulb may have the "``state``" (with values ``"ON"``
+ and ``"OFF"``) and "``brightness``" properties, while an environment
+ sensor may have the "``temperature``" and "``humidity``" properties, and
+ so on.
:param group: Display name of the group.
- :param property: Name of the property to retrieve (default: all available properties)
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param property: Name of the property to retrieve (default: all
+ available properties)
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
msg = {}
if property:
msg = {property: ''}
- properties = self.publish(
+ properties = self._run_request(
topic=self._topic(group + '/get'),
reply_topic=self._topic(group),
msg=msg,
**self._mqtt_args(**kwargs),
- ).output # type: ignore[reportGeneralTypeIssues]
+ )
if property:
- assert property in properties, 'No such property: ' + property
+ assert property in properties, f'No such property: {property}'
return {property: properties[property]}
return properties
@@ -1362,25 +1562,31 @@ class ZigbeeMqttPlugin(
self, group: str, property: str, value: Any, **kwargs
):
"""
- Set a properties on a group. The compatible properties vary depending on the devices on the group.
- For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``"
- properties, while an environment sensor may have the "``temperature``" and "``humidity``" properties, and so on.
+ Set a properties on a group.
+
+ The compatible properties vary depending on the devices on the group.
+
+ For example, a light bulb may have the "``state``" (with values ``"ON"``
+ and ``"OFF"``) and "``brightness``" properties, while an environment
+ sensor may have the "``temperature``" and "``humidity``" properties, and
+ so on.
:param group: Display name of the group.
:param property: Name of the property that should be set.
:param value: New value of the property.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- properties = self.publish(
+ properties = self._run_request(
topic=self._topic(group + '/set'),
reply_topic=self._topic(group),
msg={property: value},
**self._mqtt_args(**kwargs),
- ).output # type: ignore[reportGeneralTypeIssues]
+ )
if property:
- assert property in properties, 'No such property: ' + property
+ assert property in properties, f'No such property: {property}'
return {property: properties[property]}
return properties
@@ -1392,27 +1598,26 @@ class ZigbeeMqttPlugin(
:param name: New name.
:param group: Current name of the group to rename.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
if name == group:
self.logger.info('Old and new name are the same: nothing to do')
- return
+ return None
groups = {
- group.get('friendly_name'): group
- for group in self.groups().output # type: ignore[reportGeneralTypeIssues]
+ g.get('friendly_name'): g
+ for g in dict(self.groups().output) # type: ignore
}
assert name not in groups, f'A group named {name} already exists on the network'
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/group/rename'),
- reply_topic=self._topic('bridge/response/group/rename'),
- msg={'from': group, 'to': name} if group else name,
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/group/rename'),
+ reply_topic=self._topic('bridge/response/group/rename'),
+ msg={'from': group, 'to': name} if group else name,
+ **self._mqtt_args(**kwargs),
)
@action
@@ -1421,16 +1626,15 @@ class ZigbeeMqttPlugin(
Remove a group.
:param name: Display name of the group.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/group/remove'),
- reply_topic=self._topic('bridge/response/group/remove'),
- msg=name,
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/group/remove'),
+ reply_topic=self._topic('bridge/response/group/remove'),
+ msg=name,
+ **self._mqtt_args(**kwargs),
)
@action
@@ -1440,19 +1644,18 @@ class ZigbeeMqttPlugin(
:param group: Display name of the group.
:param device: Display name of the device to be added.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/group/members/add'),
- reply_topic=self._topic('bridge/response/group/members/add'),
- msg={
- 'group': group,
- 'device': device,
- },
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/group/members/add'),
+ reply_topic=self._topic('bridge/response/group/members/add'),
+ msg={
+ 'group': group,
+ 'device': device,
+ },
+ **self._mqtt_args(**kwargs),
)
@action
@@ -1461,78 +1664,78 @@ class ZigbeeMqttPlugin(
Remove a device from a group.
:param group: Display name of the group.
- :param device: Display name of the device to be removed. If none is specified then all the devices registered
- to the specified group will be removed.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param device: Display name of the device to be removed. If none is
+ specified then all the devices registered to the specified group
+ will be removed.
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
remove_suffix = '_all' if device is None else ''
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic(
- f'bridge/request/group/members/remove{remove_suffix}'
- ),
- reply_topic=self._topic(
- f'bridge/response/group/members/remove{remove_suffix}'
- ),
- msg={
- 'group': group,
- 'device': device,
- },
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic(f'bridge/request/group/members/remove{remove_suffix}'),
+ reply_topic=self._topic(
+ f'bridge/response/group/members/remove{remove_suffix}'
+ ),
+ msg={
+ 'group': group,
+ 'device': device,
+ },
+ **self._mqtt_args(**kwargs),
)
@action
def bind_devices(self, source: str, target: str, **kwargs):
"""
- Bind two devices. Binding makes it possible that devices can directly control each other without the
- intervention of zigbee2mqtt or any home automation software. You may want to use this feature to bind
- for example an IKEA/Philips Hue dimmer switch to a light bulb, or a Zigbee remote to a thermostat.
- Read more on the `zigbee2mqtt binding page `_.
+ Bind two devices.
- :param source: Name of the source device. It can also be a group name, although the support is
- `still experimental `_.
- You can also bind a specific device endpoint - for example ``MySensor/temperature``.
- :param target: Name of the target device.
- You can also bind a specific device endpoint - for example ``MyLight/state``.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ Binding makes it possible that devices can directly control each other
+ without the intervention of zigbee2mqtt or any home automation software.
+ You may want to use this feature to bind for example an IKEA/Philips Hue
+ dimmer switch to a light bulb, or a Zigbee remote to a thermostat. Read
+ more on the `zigbee2mqtt binding page
+ `_.
+
+ :param source: Name of the source device. It can also be a group name,
+ although the support is `still experimental
+ `_.
+ You can also bind a specific device endpoint - for example
+ ``MySensor/temperature``.
+ :param target: Name of the target device. You can also bind a specific
+ device endpoint - for example ``MyLight/state``.
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/bind'),
- reply_topic=self._topic('bridge/response/device/bind'),
- msg={'from': source, 'to': target},
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/bind'),
+ reply_topic=self._topic('bridge/response/device/bind'),
+ msg={'from': source, 'to': target},
+ **self._mqtt_args(**kwargs),
)
@action
def unbind_devices(self, source: str, target: str, **kwargs):
"""
- Un-bind two devices.
+ Remove a binding between two devices.
- :param source: Name of the source device.
- You can also bind a specific device endpoint - for example ``MySensor/temperature``.
- :param target: Name of the target device.
- You can also bind a specific device endpoint - for example ``MyLight/state``.
- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
- (default: query the default configured device).
+ :param source: Name of the source device. You can also bind a specific
+ device endpoint - for example ``MySensor/temperature``.
+ :param target: Name of the target device. You can also bind a specific
+ device endpoint - for example ``MyLight/state``.
+ :param kwargs: Extra arguments to be passed to
+ :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
+ the default configured device).
"""
- return self._parse_response(
- self.publish( # type: ignore[reportGeneralTypeIssues]
- topic=self._topic('bridge/request/device/unbind'),
- reply_topic=self._topic('bridge/response/device/unbind'),
- msg={'from': source, 'to': target},
- **self._mqtt_args(**kwargs),
- )
+ return self._run_request(
+ topic=self._topic('bridge/request/device/unbind'),
+ reply_topic=self._topic('bridge/response/device/unbind'),
+ msg={'from': source, 'to': target},
+ **self._mqtt_args(**kwargs),
)
@action
- def on( # pylint: disable=redefined-builtin,arguments-differ
- self, device, *_, **__
- ):
+ def on(self, device, *_, **__): # pylint: disable=arguments-differ
"""
Turn on/set to true a switch, a binary property or an option.
"""
@@ -1542,9 +1745,7 @@ class ZigbeeMqttPlugin(
)
@action
- def off( # pylint: disable=redefined-builtin,arguments-differ
- self, device, *_, **__
- ):
+ def off(self, device, *_, **__): # pylint: disable=arguments-differ
"""
Turn off/set to false a switch, a binary property or an option.
"""
@@ -1554,15 +1755,13 @@ class ZigbeeMqttPlugin(
)
@action
- def toggle( # pylint: disable=redefined-builtin,arguments-differ
- self, device, *_, **__
- ):
+ def toggle(self, device, *_, **__): # pylint: disable=arguments-differ
"""
Toggles the state of a switch, a binary property or an option.
"""
device, prop_info = self._get_switch_info(device)
prop = prop_info['property']
- device_state = self.device_get(device).output # type: ignore
+ device_state: dict = self.device_get(device).output # type: ignore
return self.device_set(
device,
prop,
@@ -1575,6 +1774,10 @@ class ZigbeeMqttPlugin(
)
def _get_switch_info(self, name: str) -> Tuple[str, dict]:
+ """
+ Get the information about a switch or switch-like device by name or
+ address.
+ """
name, prop = self._ieee_address_and_property(name)
if not prop or prop == 'light':
prop = 'state'
@@ -1593,6 +1796,10 @@ class ZigbeeMqttPlugin(
@staticmethod
def _is_read_only(feature: dict) -> bool:
+ """
+ Utility method that checks if a feature is read-only on the basis of its
+ access flags.
+ """
return bool(feature.get('access', 0) & 2) == 0 and (
bool(feature.get('access', 0) & 1) == 1
or bool(feature.get('access', 0) & 4) == 1
@@ -1600,6 +1807,10 @@ class ZigbeeMqttPlugin(
@staticmethod
def _is_write_only(feature: dict) -> bool:
+ """
+ Utility method that checks if a feature is write-only on the basis of
+ its access flags.
+ """
return bool(feature.get('access', 0) & 2) == 1 and (
bool(feature.get('access', 0) & 1) == 0
or bool(feature.get('access', 0) & 4) == 0
@@ -1607,12 +1818,22 @@ class ZigbeeMqttPlugin(
@staticmethod
def _is_query_disabled(feature: dict) -> bool:
+ """
+ Utility method that checks if a feature doesn't support programmating
+ querying (i.e. it will only broadcast its state when available) on the
+ basis of its access flags.
+ """
return bool(feature.get('access', 0) & 4) == 0
@staticmethod
def _ieee_address_and_property(
device: Union[dict, str]
) -> Tuple[str, Optional[str]]:
+ """
+ Given a device property, as a dictionary containing the full device
+ definition or a string containing the device address and property,
+ return a tuple in the format ``(device_address, property_name)``.
+ """
# Entity value IDs are stored in the `:`
# format. Therefore, we need to split by `:` if we want to
# retrieve the original address.
@@ -1630,12 +1851,20 @@ class ZigbeeMqttPlugin(
@classmethod
def _ieee_address(cls, device: Union[dict, str]) -> str:
+ """
+ :return: The IEEE address of a device, given its full definition or
+ common name.
+ """
return cls._ieee_address_and_property(device)[0]
@classmethod
def _get_switches(
cls, device_info: dict, props: dict, options: dict
) -> List[Switch]:
+ """
+ A utility method that parses the properties of a device that can be
+ mapped to switches (or switch-like entities).
+ """
return [
cls._to_entity(
Switch,
@@ -1660,9 +1889,13 @@ class ZigbeeMqttPlugin(
]
@classmethod
- def _get_sensors(
+ def _get_sensors( # pylint: disable=too-many-branches
cls, device_info: dict, props: dict, options: dict
) -> List[Sensor]:
+ """
+ A utility method that parses the properties of a device that can be
+ mapped to sensors (or sensor-like entities).
+ """
sensors = []
properties = [
prop
@@ -1727,6 +1960,10 @@ class ZigbeeMqttPlugin(
def _get_dimmers(
cls, device_info: dict, props: dict, options: dict
) -> List[Dimmer]:
+ """
+ A utility method that parses the properties of a device that can be
+ mapped to dimmers (or dimmer-like entities).
+ """
return [
cls._to_entity(
Dimmer,
@@ -1750,6 +1987,10 @@ class ZigbeeMqttPlugin(
def _get_enum_switches(
cls, device_info: dict, props: dict, options: dict
) -> List[EnumSwitch]:
+ """
+ A utility method that parses the properties of a device that can be
+ mapped to switches with enum values.
+ """
return [
cls._to_entity(
EnumSwitch,
@@ -1776,6 +2017,10 @@ class ZigbeeMqttPlugin(
options: dict,
**kwargs,
) -> Entity:
+ """
+ Give the information about a device and its properties and options, it
+ builds an entity of the right type.
+ """
return entity_type(
id=f'{device_info["ieee_address"]}:{property["property"]}',
name=property.get('description', ''),
@@ -1788,8 +2033,12 @@ class ZigbeeMqttPlugin(
@classmethod
def _get_light_meta(cls, device_info: dict) -> dict:
- exposes = (device_info.get('definition', {}) or {}).get('exposes', [])
- for exposed in exposes:
+ """
+ Parse the attributes of a device that can be mapped to lights (or
+ light-like entities).
+ """
+ # pylint: disable=too-many-nested-blocks
+ for exposed in (device_info.get('definition', {}) or {}).get('exposes', []):
if exposed.get('type') == 'light':
features = exposed.get('features', [])
switch = {}
@@ -1814,7 +2063,7 @@ class ZigbeeMqttPlugin(
'value_on': feature['value_on'],
'value_off': feature['value_off'],
'state_name': feature['name'],
- 'value_toggle': feature.get('value_toggle', None),
+ 'value_toggle': feature.get('value_toggle'),
**data,
}
elif (
@@ -1954,15 +2203,223 @@ class ZigbeeMqttPlugin(
self.device_set(self._preferred_name(dev), values=data)
- def main(self):
- from ._listener import ZigbeeMqttListener
+ @override
+ def on_mqtt_message(self):
+ """
+ Overrides :meth:`platypush.plugins.mqtt.MqttPlugin.on_mqtt_message` to
+ handle messages from the zigbee2mqtt integration.
+ """
- listener = ZigbeeMqttListener()
- listener.start()
- self.wait_stop()
+ def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
+ topic = msg.topic[len(self.base_topic) + 1 :]
+ data = msg.payload.decode()
+ if not data:
+ return
- listener.stop()
- listener.join()
+ with contextlib.suppress(ValueError, TypeError):
+ data = json.loads(data)
+
+ if topic == 'bridge/state':
+ self._process_state_message(client, data)
+ elif topic in ['bridge/log', 'bridge/logging']:
+ self._process_log_message(client, data)
+ elif topic == 'bridge/devices':
+ self._process_devices(client, data)
+ elif topic == 'bridge/groups':
+ self._process_groups(client, data)
+ elif isinstance(data, dict):
+ name = topic.split('/')[-1]
+ if name not in self._info.devices:
+ self.logger.debug('Skipping unknown topic: %s', topic)
+ return
+
+ dev = self._info.devices.get(name)
+ assert dev is not None, f'No such device: {name}'
+ changed_props = {k: v for k, v in data.items() if v != dev.get(k)}
+
+ if changed_props:
+ self._process_property_update(name, data)
+ self._bus.post(
+ ZigbeeMqttDevicePropertySetEvent(
+ host=client.host,
+ port=client.port,
+ device=name,
+ properties=changed_props,
+ )
+ )
+
+ device_meta = self._devices_meta.get(name)
+ if device_meta:
+ data['friendly_name'] = device_meta.get('friendly_name')
+ data['ieee_address'] = device_meta.get('ieee_address')
+ self._info.devices.add(data)
+
+ return handler
+
+ @property
+ def _bus(self) -> Bus:
+ """
+ Utility property for the bus.
+ """
+ return get_bus()
+
+ def _process_state_message(self, client: MqttClient, msg: str):
+ """
+ Process a state message.
+ """
+ if msg == self._bridge_state:
+ return
+
+ if msg == 'online':
+ evt = ZigbeeMqttOnlineEvent
+ self._bridge_state = BridgeState.ONLINE
+ elif msg == 'offline':
+ evt = ZigbeeMqttOfflineEvent
+ self._bridge_state = BridgeState.OFFLINE
+ self.logger.warning('The zigbee2mqtt service is offline')
+ else:
+ return
+
+ self._bus.post(evt(host=client.host, port=client.port))
+
+ # pylint: disable=too-many-branches
+ def _process_log_message(self, client, msg):
+ """
+ Process a logevent.
+ """
+
+ msg_type = msg.get('type')
+ text = msg.get('message')
+ args = {'host': client._host, 'port': client._port}
+
+ if msg_type == 'devices':
+ devices = {}
+ for dev in text or []:
+ devices[dev['friendly_name']] = dev
+ client.subscribe(self.base_topic + '/' + dev['friendly_name'])
+ elif msg_type == 'pairing':
+ self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args))
+ elif msg_type in ['device_ban', 'device_banned']:
+ self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args))
+ elif msg_type in ['device_removed_failed', 'device_force_removed_failed']:
+ force = msg_type == 'device_force_removed_failed'
+ self._bus.post(
+ ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)
+ )
+ elif msg_type == 'device_whitelisted':
+ self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args))
+ elif msg_type == 'device_renamed':
+ self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args))
+ elif msg_type == 'device_bind':
+ self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args))
+ elif msg_type == 'device_unbind':
+ self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args))
+ elif msg_type == 'device_group_add':
+ self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args))
+ elif msg_type == 'device_group_add_failed':
+ self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args))
+ elif msg_type == 'device_group_remove':
+ self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args))
+ elif msg_type == 'device_group_remove_failed':
+ self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args))
+ elif msg_type == 'device_group_remove_all':
+ self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args))
+ elif msg_type == 'device_group_remove_all_failed':
+ self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args))
+ elif msg_type == 'zigbee_publish_error':
+ self.logger.error('zigbee2mqtt error: {}'.format(text))
+ self._bus.post(ZigbeeMqttErrorEvent(error=text, **args))
+ elif msg.get('level') in ['warning', 'error']:
+ log = getattr(self.logger, msg['level'])
+ log(
+ 'zigbee2mqtt {}: {}'.format(
+ msg['level'], text or msg.get('error', msg.get('warning'))
+ )
+ )
+
+ def _process_devices(self, client: MqttClient, msg):
+ """
+ Process a list of devices received on the zigbee2mqtt bridge.
+ """
+ devices_info = {
+ device.get('friendly_name', device.get('ieee_address')): device
+ for device in msg
+ }
+
+ # Subscribe to updates from all the known devices
+ event_args = {'host': client.host, 'port': client.port}
+ client.subscribe(
+ *[self.base_topic + '/' + device for device in devices_info.keys()]
+ )
+
+ for name, device in devices_info.items():
+ # If we haven't cached this device yet, then notify about the
+ # connection of a new device.
+ if not self._info.devices.get(name):
+ self._bus.post(
+ ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)
+ )
+
+ # Send a request to fetch all the known properties of this device
+ exposes = (device.get('definition', {}) or {}).get('exposes', [])
+ payload = self._build_device_get_request(exposes)
+ if payload:
+ client.publish(
+ self.base_topic + '/' + name + '/get',
+ json.dumps(payload),
+ )
+
+ # Send a request to fetch all the known properties of this device
+ for name in self._info.devices.by_name.copy():
+ if name not in devices_info:
+ self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args))
+ self._info.devices.remove(name)
+
+ self._info.devices.reset(*devices_info)
+ self._devices_meta = devices_info
+
+ def _process_groups(self, client: MqttClient, msg):
+ """
+ Process an MQTT message containing an updated list of groups.
+ """
+ event_args = {'host': client.host, 'port': client.port}
+ groups_info = {
+ group.get('friendly_name', group.get('id')): group for group in msg
+ }
+
+ # Trigger ZigbeeMqttGroupAddedEvent for each new group
+ for name in groups_info.keys():
+ if name not in self._info.groups:
+ self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args))
+
+ # Trigger ZigbeeMqttGroupRemovedEvent for each removed group
+ for name in self._info.groups.copy():
+ if name not in groups_info:
+ self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args))
+ del self._info.groups[name]
+
+ # Reset the groups cache
+ self._info.groups = {group: {} for group in groups_info.keys()}
+
+ def _process_property_update(self, device_name: str, properties: Mapping):
+ """
+ Process an MQTT message containing a device property update.
+
+ It will appropriately forward an
+ :class:`platypush.message.event.entities.EntityUpdateEvent` to the bus.
+ """
+ device_info = self._devices_meta.get(device_name)
+ if not (device_info and properties):
+ return
+
+ self.publish_entities(
+ [
+ {
+ **device_info,
+ 'state': properties,
+ }
+ ]
+ )
# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/zigbee/mqtt/_listener.py b/platypush/plugins/zigbee/mqtt/_listener.py
deleted file mode 100644
index 015bbc057..000000000
--- a/platypush/plugins/zigbee/mqtt/_listener.py
+++ /dev/null
@@ -1,269 +0,0 @@
-import contextlib
-import json
-from typing import Mapping
-
-from platypush.backend.mqtt import MqttBackend
-from platypush.bus import Bus
-from platypush.context import get_bus, get_plugin
-from platypush.message.event.zigbee.mqtt import (
- ZigbeeMqttOnlineEvent,
- ZigbeeMqttOfflineEvent,
- ZigbeeMqttDevicePropertySetEvent,
- ZigbeeMqttDevicePairingEvent,
- ZigbeeMqttDeviceConnectedEvent,
- ZigbeeMqttDeviceBannedEvent,
- ZigbeeMqttDeviceRemovedEvent,
- ZigbeeMqttDeviceRemovedFailedEvent,
- ZigbeeMqttDeviceWhitelistedEvent,
- ZigbeeMqttDeviceRenamedEvent,
- ZigbeeMqttDeviceBindEvent,
- ZigbeeMqttDeviceUnbindEvent,
- ZigbeeMqttGroupAddedEvent,
- ZigbeeMqttGroupAddedFailedEvent,
- ZigbeeMqttGroupRemovedEvent,
- ZigbeeMqttGroupRemovedFailedEvent,
- ZigbeeMqttGroupRemoveAllEvent,
- ZigbeeMqttGroupRemoveAllFailedEvent,
- ZigbeeMqttErrorEvent,
-)
-from platypush.plugins.zigbee.mqtt import ZigbeeMqttPlugin
-
-
-class ZigbeeMqttListener(MqttBackend):
- """
- Listener for zigbee2mqtt events.
- """
-
- def __init__(self):
- plugin = self._plugin
- self.base_topic = plugin.base_topic # type: ignore
- self._devices = {}
- self._devices_info = {}
- self._groups = {}
- self._last_state = None
- self.server_info = {
- 'host': plugin.host, # type: ignore
- 'port': plugin.port or self._default_mqtt_port, # type: ignore
- 'tls_cafile': plugin.tls_cafile, # type: ignore
- 'tls_certfile': plugin.tls_certfile, # type: ignore
- 'tls_ciphers': plugin.tls_ciphers, # type: ignore
- 'tls_keyfile': plugin.tls_keyfile, # type: ignore
- 'tls_version': plugin.tls_version, # type: ignore
- 'username': plugin.username, # type: ignore
- 'password': plugin.password, # type: ignore
- }
-
- listeners = [
- {
- **self.server_info,
- 'topics': [
- self.base_topic + '/' + topic
- for topic in [
- 'bridge/state',
- 'bridge/log',
- 'bridge/logging',
- 'bridge/devices',
- 'bridge/groups',
- ]
- ],
- }
- ]
-
- super().__init__(
- subscribe_default_topic=False, listeners=listeners, **self.server_info
- )
-
- assert self.client_id
- self.client_id += '-zigbee-mqtt'
-
- def _process_state_message(self, client, msg):
- if msg == self._last_state:
- return
-
- if msg == 'online':
- evt = ZigbeeMqttOnlineEvent
- elif msg == 'offline':
- evt = ZigbeeMqttOfflineEvent
- self.logger.warning('zigbee2mqtt service is offline')
- else:
- return
-
- self._bus.post(evt(host=client._host, port=client._port))
- self._last_state = msg
-
- def _process_log_message(self, client, msg):
- msg_type = msg.get('type')
- text = msg.get('message')
- args = {'host': client._host, 'port': client._port}
-
- if msg_type == 'devices':
- devices = {}
- for dev in text or []:
- devices[dev['friendly_name']] = dev
- client.subscribe(self.base_topic + '/' + dev['friendly_name'])
- elif msg_type == 'pairing':
- self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args))
- elif msg_type in ['device_ban', 'device_banned']:
- self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args))
- elif msg_type in ['device_removed_failed', 'device_force_removed_failed']:
- force = msg_type == 'device_force_removed_failed'
- self._bus.post(
- ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)
- )
- elif msg_type == 'device_whitelisted':
- self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args))
- elif msg_type == 'device_renamed':
- self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args))
- elif msg_type == 'device_bind':
- self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args))
- elif msg_type == 'device_unbind':
- self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args))
- elif msg_type == 'device_group_add':
- self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args))
- elif msg_type == 'device_group_add_failed':
- self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args))
- elif msg_type == 'device_group_remove':
- self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args))
- elif msg_type == 'device_group_remove_failed':
- self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args))
- elif msg_type == 'device_group_remove_all':
- self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args))
- elif msg_type == 'device_group_remove_all_failed':
- self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args))
- elif msg_type == 'zigbee_publish_error':
- self.logger.error('zigbee2mqtt error: {}'.format(text))
- self._bus.post(ZigbeeMqttErrorEvent(error=text, **args))
- elif msg.get('level') in ['warning', 'error']:
- log = getattr(self.logger, msg['level'])
- log(
- 'zigbee2mqtt {}: {}'.format(
- msg['level'], text or msg.get('error', msg.get('warning'))
- )
- )
-
- def _process_devices(self, client, msg):
- devices_info = {
- device.get('friendly_name', device.get('ieee_address')): device
- for device in msg
- }
-
- # noinspection PyProtectedMember
- event_args = {'host': client._host, 'port': client._port}
- client.subscribe(
- *[self.base_topic + '/' + device for device in devices_info.keys()]
- )
-
- for name, device in devices_info.items():
- if name not in self._devices:
- self._bus.post(
- ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)
- )
-
- exposes = (device.get('definition', {}) or {}).get('exposes', [])
- payload = self._plugin._build_device_get_request(exposes) # type: ignore
- if payload:
- client.publish(
- self.base_topic + '/' + name + '/get',
- json.dumps(payload),
- )
-
- devices_copy = [*self._devices.keys()]
- for name in devices_copy:
- if name not in devices_info:
- self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args))
- del self._devices[name]
-
- self._devices = {device: {} for device in devices_info.keys()}
- self._devices_info = devices_info
-
- def _process_groups(self, client, msg):
- # noinspection PyProtectedMember
- event_args = {'host': client._host, 'port': client._port}
- groups_info = {
- group.get('friendly_name', group.get('id')): group for group in msg
- }
-
- for name in groups_info.keys():
- if name not in self._groups:
- self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args))
-
- groups_copy = [*self._groups.keys()]
- for name in groups_copy:
- if name not in groups_info:
- self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args))
- del self._groups[name]
-
- self._groups = {group: {} for group in groups_info.keys()}
-
- def on_mqtt_message(self):
- def handler(client, _, msg):
- topic = msg.topic[len(self.base_topic) + 1 :]
- data = msg.payload.decode()
- if not data:
- return
-
- with contextlib.suppress(ValueError, TypeError):
- data = json.loads(data)
-
- if topic == 'bridge/state':
- self._process_state_message(client, data)
- elif topic in ['bridge/log', 'bridge/logging']:
- self._process_log_message(client, data)
- elif topic == 'bridge/devices':
- self._process_devices(client, data)
- elif topic == 'bridge/groups':
- self._process_groups(client, data)
- else:
- suffix = topic.split('/')[-1]
- if suffix not in self._devices:
- return
-
- name = suffix
- changed_props = {
- k: v for k, v in data.items() if v != self._devices[name].get(k)
- }
-
- if changed_props:
- self._process_property_update(name, data)
- self._bus.post(
- ZigbeeMqttDevicePropertySetEvent(
- host=client._host,
- port=client._port,
- device=name,
- properties=changed_props,
- )
- )
-
- self._devices[name].update(data)
-
- return handler
-
- @property
- def _plugin(self) -> ZigbeeMqttPlugin:
- plugin = get_plugin('zigbee.mqtt')
- assert plugin, 'The zigbee.mqtt plugin is not configured'
- return plugin
-
- @property
- def _bus(self) -> Bus:
- return get_bus()
-
- def _process_property_update(self, device_name: str, properties: Mapping):
- device_info = self._devices_info.get(device_name)
- if not (device_info and properties):
- return
-
- self._plugin.publish_entities( # type: ignore
- [
- {
- **device_info,
- 'state': properties,
- }
- ]
- )
-
- def run(self):
- super().run()
-
-
-# vim:sw=4:ts=4:et: