Fixed management of state on `zigbee.mqtt`.

Before the merge of the plugin and the listener those components
used to have their own separate state, which led to inconsistencies.
This commit is contained in:
Fabio Manganiello 2023-09-14 23:05:27 +02:00
parent 5a514fdcce
commit ac72b2f7a8
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
3 changed files with 139 additions and 126 deletions

View File

@ -25,8 +25,8 @@ class EntitiesEngine(Thread):
together (preventing excessive writes and throttling events), and
prevents race conditions when SQLite is used.
2. Merge any existing entities with their newer representations.
3. Update the entities taxonomy.
4. Persist the new state to the entities database.
3. Update the entities' taxonomy.
4. Persist the new state to the entities' database.
5. Trigger events for the updated entities.
"""

View File

@ -1,11 +1,9 @@
import contextlib
from dataclasses import dataclass, field
from enum import Enum
import json
import re
import threading
from queue import Queue
from queue import Empty, Queue
from typing import (
Any,
Collection,
@ -75,82 +73,7 @@ from platypush.message.event.zigbee.mqtt import (
)
from platypush.message.response import Response
from platypush.plugins.mqtt import DEFAULT_TIMEOUT, MqttClient, MqttPlugin, action
class BridgeState(Enum):
"""
Known bridge states.
"""
ONLINE = 'online'
OFFLINE = 'offline'
@dataclass
class ZigbeeDevicesInfo:
"""
Cached information about the devices on the network.
"""
by_address: Dict[str, dict] = field(default_factory=dict)
by_name: Dict[str, dict] = field(default_factory=dict)
def __contains__(self, name: str) -> bool:
"""
:return: True if the device with the given name exists in the cache.
"""
return name in self.by_name or name in self.by_address
def get(self, name: str) -> Optional[dict]:
"""
Retrieves a cached device record either by name or by address.
"""
return self.by_address.get(name, self.by_name.get(name))
def add(self, device: dict):
"""
Adds a device record to the cache.
"""
if device.get('ieee_address'):
self.by_address[device['ieee_address']] = device
if device.get('friendly_name'):
self.by_name[device['friendly_name']] = device
def remove(self, device: Union[str, dict]):
"""
Removes a device record from the cache.
"""
if isinstance(device, str):
dev = self.get(device)
if not dev:
return # No such device
else:
dev = device
if dev.get('ieee_address'):
self.by_address.pop(dev['ieee_address'], None)
if dev.get('friendly_name'):
self.by_name.pop(dev['friendly_name'], None)
def reset(self, *keys: str):
"""
Reset the state for the devices with the given keys.
"""
for k in keys:
self.by_address[k] = {}
self.by_name[k] = {}
@dataclass
class ZigbeeInfo:
"""
Cached information about the devices and groups on the network.
"""
devices: ZigbeeDevicesInfo = field(default_factory=ZigbeeDevicesInfo)
groups: Dict[str, dict] = field(default_factory=dict)
from ._state import BridgeState, ZigbeeState
# pylint: disable=too-many-ancestors
@ -331,7 +254,7 @@ class ZigbeeMqttPlugin(
"""
if base_topic:
self.logger.warning(
'base_topic is deprprecated, please use topic_prefix instead'
'base_topic is deprecated, please use topic_prefix instead'
)
topic_prefix = base_topic
@ -354,17 +277,12 @@ class ZigbeeMqttPlugin(
tls_ciphers=tls_ciphers,
username=username,
password=password,
timeout=timeout,
**kwargs,
)
# Append a unique suffix to the client ID to avoid client name clashes
# with other MQTT plugins.
self.client_id += '-zigbee-mqtt'
self.topic_prefix = topic_prefix
self.timeout = timeout
self._info = ZigbeeInfo()
self._devices_meta: Dict[str, dict] = {}
self._bridge_state = BridgeState.OFFLINE
self._info = ZigbeeState()
@staticmethod
def _get_properties(device: dict) -> dict:
@ -626,7 +544,7 @@ class ZigbeeMqttPlugin(
**kwargs,
) -> dict:
"""
Sends a request/message to the Zigbeebee2MQTT bridge and waits for a
Sends a request/message to the Zigbee2MQTT bridge and waits for a
response.
"""
return self._parse_response(
@ -677,7 +595,7 @@ class ZigbeeMqttPlugin(
{
"date_code": "20180906",
"friendly_name": "My Lightbulb",
"friendly_name": "My Light Bulb",
"ieee_address": "0x00123456789abcdf",
"network_address": 52715,
"power_source": "Mains (single phase)",
@ -1013,26 +931,25 @@ class ZigbeeMqttPlugin(
converted.
"""
def extract_value(value: dict, root: dict, depth: int = 0):
for feature in value.get('features', []):
def extract_value(val: dict, root: dict, depth: int = 0):
for feature in val.get('features', []):
new_root = root
if depth > 0:
new_root = root[value['property']] = root.get(value['property'], {})
new_root = root[val['property']] = root.get(val['property'], {})
extract_value(feature, new_root, depth=depth + 1)
if not value.get('access', 1) & 0x4:
if not val.get('access', 1) & 0x4:
# Property not readable/query-able
return
if 'features' not in value:
if 'property' in value:
root[value['property']] = 0 if value['type'] == 'numeric' else ''
if 'features' not in val:
if 'property' in val:
root[val['property']] = 0 if val['type'] == 'numeric' else ''
return
if 'property' in value:
root[value['property']] = root.get(value['property'], {})
root = root[value['property']]
if 'property' in val:
root[val['property']] = root.get(val['property'], {})
ret: Dict[str, Any] = {}
for value in values:
@ -1125,10 +1042,7 @@ class ZigbeeMqttPlugin(
# If the device has no queryable properties, don't specify a reply
# topic to listen on
req = self._build_device_get_request(exposes)
reply_topic = self._topic(device)
if not req:
reply_topic = None
reply_topic = self._topic(device) if req else None
return self._run_request(
topic=self._topic(device) + '/get',
reply_topic=reply_topic,
@ -1174,7 +1088,7 @@ class ZigbeeMqttPlugin(
)
def worker(device: str, q: Queue):
q.put(self.device_get(device, **kwargs).output) # type: ignore
q.put_nowait(self.device_get(device, **kwargs).output) # type: ignore
queues: Dict[str, Queue] = {}
workers = {}
@ -1188,9 +1102,16 @@ class ZigbeeMqttPlugin(
workers[device].start()
for device in devices:
timeout = kwargs.get('timeout')
try:
response[device] = queues[device].get(timeout=kwargs.get('timeout'))
workers[device].join(timeout=kwargs.get('timeout'))
response[device] = queues[device].get(timeout=timeout)
workers[device].join(timeout=timeout)
except Empty:
self.logger.warning(
'Could not get the status of the device %s: timeout after %f seconds',
device,
timeout,
)
except Exception as e:
self.logger.warning(
'An error occurred while getting the status of the device %s: %s',
@ -1198,6 +1119,8 @@ class ZigbeeMqttPlugin(
e,
)
self.logger.exception(e)
return response
@action
@ -1288,6 +1211,7 @@ class ZigbeeMqttPlugin(
:param property: Name of the property to set. If not specified here, it
should be specified on ``device`` in ``<address>:<property>``
format.
:param data: Value to set for the property.
:param kwargs: Extra arguments to be passed to
:meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
the default configured device).
@ -1429,7 +1353,7 @@ class ZigbeeMqttPlugin(
"device_options": {},
"devices": {
"0x00123456789abcdf": {
"friendly_name": "My Lightbulb"
"friendly_name": "My Light Bulb"
}
},
"experimental": {
@ -1828,7 +1752,7 @@ class ZigbeeMqttPlugin(
@staticmethod
def _is_query_disabled(feature: dict) -> bool:
"""
Utility method that checks if a feature doesn't support programmating
Utility method that checks if a feature doesn't support programmatic
querying (i.e. it will only broadcast its state when available) on the
basis of its access flags.
"""
@ -1854,9 +1778,9 @@ class ZigbeeMqttPlugin(
# IEEE address + property format
if re.search(r'^0x[0-9a-fA-F]{16}:', dev):
parts = dev.split(':')
return (parts[0], parts[1] if len(parts) > 1 else None)
return parts[0], parts[1] if len(parts) > 1 else None
return (dev, None)
return dev, None
@classmethod
def _ieee_address(cls, device: Union[dict, str]) -> str:
@ -2220,7 +2144,8 @@ class ZigbeeMqttPlugin(
"""
def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
topic = msg.topic[len(self.topic_prefix) + 1 :]
topic_idx = len(self.topic_prefix) + 1
topic = msg.topic[topic_idx:]
data = msg.payload.decode()
if not data:
return
@ -2244,7 +2169,9 @@ class ZigbeeMqttPlugin(
dev = self._info.devices.get(name)
assert dev is not None, f'No such device: {name}'
changed_props = {k: v for k, v in data.items() if v != dev.get(k)}
changed_props = {
k: v for k, v in data.items() if v != dev.get('state', {}).get(k)
}
if changed_props:
self._process_property_update(name, data)
@ -2257,11 +2184,11 @@ class ZigbeeMqttPlugin(
)
)
device_meta = self._devices_meta.get(name)
if device_meta:
data['friendly_name'] = device_meta.get('friendly_name')
data['ieee_address'] = device_meta.get('ieee_address')
self._info.devices.add(data)
dev = self._info.devices.get(name)
if dev:
self._info.devices.set_state(
dev.get('friendly_name') or dev.get('ieee_address'), data
)
return handler
@ -2276,15 +2203,15 @@ class ZigbeeMqttPlugin(
"""
Process a state message.
"""
if msg == self._bridge_state:
if msg == self._info.bridge_state:
return
if msg == 'online':
evt = ZigbeeMqttOnlineEvent
self._bridge_state = BridgeState.ONLINE
self._info.bridge_state = BridgeState.ONLINE
elif msg == 'offline':
evt = ZigbeeMqttOfflineEvent
self._bridge_state = BridgeState.OFFLINE
self._info.bridge_state = BridgeState.OFFLINE
self.logger.warning('The zigbee2mqtt service is offline')
else:
return
@ -2294,7 +2221,7 @@ class ZigbeeMqttPlugin(
# pylint: disable=too-many-branches
def _process_log_message(self, client, msg):
"""
Process a logevent.
Process a log event.
"""
msg_type = msg.get('type')
@ -2384,8 +2311,8 @@ class ZigbeeMqttPlugin(
self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args))
self._info.devices.remove(name)
self._info.devices.reset(*devices_info)
self._devices_meta = devices_info
for dev in devices_info.values():
self._info.devices.add(dev)
def _process_groups(self, client: MqttClient, msg):
"""
@ -2417,7 +2344,7 @@ class ZigbeeMqttPlugin(
It will appropriately forward an
:class:`platypush.message.event.entities.EntityUpdateEvent` to the bus.
"""
device_info = self._devices_meta.get(device_name)
device_info = self._info.devices.get(device_name)
if not (device_info and properties):
return

View File

@ -0,0 +1,86 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional, Union
class BridgeState(Enum):
"""
Known bridge states.
"""
ONLINE = 'online'
OFFLINE = 'offline'
@dataclass
class ZigbeeDevices:
"""
Cached information about the devices on the network.
"""
by_address: Dict[str, dict] = field(default_factory=dict)
by_name: Dict[str, dict] = field(default_factory=dict)
def __contains__(self, name: str) -> bool:
"""
:return: True if the device with the given name exists in the cache.
"""
return name in self.by_name or name in self.by_address
def get(self, name: str) -> Optional[dict]:
"""
Retrieves a cached device record either by name or by address.
"""
return self.by_address.get(name, self.by_name.get(name))
def add(self, device: dict):
"""
Adds a device record to the cache.
"""
if device.get('ieee_address'):
self.by_address[device['ieee_address']] = device
if device.get('friendly_name'):
self.by_name[device['friendly_name']] = device
if not device.get('state'):
device['state'] = {}
def remove(self, device: Union[str, dict]):
"""
Removes a device record from the cache.
"""
if isinstance(device, str):
dev = self.get(device)
if not dev:
return # No such device
else:
dev = device
if dev.get('ieee_address'):
self.by_address.pop(dev['ieee_address'], None)
if dev.get('friendly_name'):
self.by_name.pop(dev['friendly_name'], None)
def set_state(self, device: str, state: dict):
"""
Updates the state of a device in the cache.
:param device: Name or address of the device.
:param state: Map containing the new state.
"""
dev = self.get(device)
if not dev:
return
dev['state'] = state
@dataclass
class ZigbeeState:
"""
Cached information about the devices and groups on the network.
"""
devices: ZigbeeDevices = field(default_factory=ZigbeeDevices)
groups: Dict[str, dict] = field(default_factory=dict)
bridge_state: BridgeState = BridgeState.OFFLINE