Fire an EntityUpdateEvent when the zwave.mqtt backend gets a value changed message

This commit is contained in:
Fabio Manganiello 2022-04-11 01:40:49 +02:00
parent 4471001110
commit db4ad5825e
Signed by: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 89 additions and 46 deletions

View file

@ -1,3 +1,4 @@
import contextlib
import json import json
from queue import Queue, Empty from queue import Queue, Empty
from typing import Optional, Type from typing import Optional, Type
@ -5,14 +6,24 @@ from typing import Optional, Type
from platypush.backend.mqtt import MqttBackend from platypush.backend.mqtt import MqttBackend
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message.event.zwave import ZwaveEvent, ZwaveNodeAddedEvent, ZwaveValueChangedEvent, \ from platypush.message.event.zwave import (
ZwaveNodeRemovedEvent, ZwaveNodeRenamedEvent, ZwaveNodeReadyEvent, ZwaveNodeEvent, ZwaveNodeAsleepEvent, \ ZwaveEvent,
ZwaveNodeAwakeEvent ZwaveNodeAddedEvent,
ZwaveValueChangedEvent,
ZwaveNodeRemovedEvent,
ZwaveNodeRenamedEvent,
ZwaveNodeReadyEvent,
ZwaveNodeEvent,
ZwaveNodeAsleepEvent,
ZwaveNodeAwakeEvent,
)
class ZwaveMqttBackend(MqttBackend): class ZwaveMqttBackend(MqttBackend):
""" """
Listen for events on a `zwavejs2mqtt <https://github.com/zwave-js/zwavejs2mqtt>`_ service. Listen for events on a `zwavejs2mqtt <https://github.com/zwave-js/zwavejs2mqtt>`_ service.
For historical reasons, this should be enabled together with the ``zwave.mqtt`` plugin,
even though the actual configuration is only specified on the plugin.
Triggers: Triggers:
@ -41,6 +52,7 @@ class ZwaveMqttBackend(MqttBackend):
""" """
from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin
self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt') self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt')
assert self.plugin, 'The zwave.mqtt plugin is not configured' assert self.plugin, 'The zwave.mqtt plugin is not configured'
@ -61,27 +73,48 @@ class ZwaveMqttBackend(MqttBackend):
'password': self.plugin.password, 'password': self.plugin.password,
} }
listeners = [{ listeners = [
**self.server_info, {
'topics': [ **self.server_info,
self.plugin.events_topic + '/node/' + topic 'topics': [
for topic in ['node_ready', 'node_sleep', 'node_value_updated', 'node_metadata_updated', 'node_wakeup'] self.plugin.events_topic + '/node/' + topic
], for topic in [
}] 'node_ready',
'node_sleep',
'node_value_updated',
'node_metadata_updated',
'node_wakeup',
]
],
}
]
super().__init__(*args, subscribe_default_topic=False, listeners=listeners, client_id=client_id, **kwargs) super().__init__(
*args,
subscribe_default_topic=False,
listeners=listeners,
client_id=client_id,
**kwargs,
)
if not client_id: if not client_id:
self.client_id += '-zwavejs-mqtt' self.client_id += '-zwavejs-mqtt'
def _dispatch_event(self, event_type: Type[ZwaveEvent], node: Optional[dict] = None, value: Optional[dict] = None, def _dispatch_event(
**kwargs): self,
event_type: Type[ZwaveEvent],
node: Optional[dict] = None,
value: Optional[dict] = None,
**kwargs,
):
if value and 'id' not in value: if value and 'id' not in value:
value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}"
if 'propertyKey' in value: if 'propertyKey' in value:
value_id += '-' + str(value['propertyKey']) value_id += '-' + str(value['propertyKey'])
if value_id not in node.get('values', {}): if value_id not in node.get('values', {}):
self.logger.warning(f'value_id {value_id} not found on node {node["id"]}') self.logger.warning(
f'value_id {value_id} not found on node {node["id"]}'
)
return return
value = node['values'][value_id] value = node['values'][value_id]
@ -107,41 +140,47 @@ class ZwaveMqttBackend(MqttBackend):
evt = event_type(**kwargs) evt = event_type(**kwargs)
self._events_queue.put(evt) self._events_queue.put(evt)
# zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, if event_type == ZwaveValueChangedEvent:
# using two values - a read-only value called currentValue that gets updated on the # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way,
# node_value_updated topic, and a writable value called targetValue that doesn't get updated # using two values - a read-only value called currentValue that gets updated on the
# (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5/docs/guide/migrating.md). # node_value_updated topic, and a writable value called targetValue that doesn't get updated
# To properly manage updates on writable values, propagate an event for both. # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5 \
if event_type == ZwaveValueChangedEvent and kwargs.get('value', {}).get('property_id') == 'currentValue': # /docs/guide/migrating.md).
value = kwargs['value'].copy() # To properly manage updates on writable values, propagate an event for both.
target_value_id = f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}' \ if kwargs.get('value', {}).get('property_id') == 'currentValue':
f'-targetValue' value = kwargs['value'].copy()
kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id) target_value_id = (
f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}'
f'-targetValue'
)
kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id)
if kwargs['value']: if kwargs['value']:
kwargs['value']['data'] = value['data'] kwargs['value']['data'] = value['data']
kwargs['node']['values'][target_value_id] = kwargs['value'] kwargs['node']['values'][target_value_id] = kwargs['value']
evt = event_type(**kwargs) evt = event_type(**kwargs)
self._events_queue.put(evt) self._events_queue.put(evt)
self.plugin.publish_entities([kwargs['value']]) # type: ignore
def on_mqtt_message(self): def on_mqtt_message(self):
def handler(_, __, msg): def handler(_, __, msg):
if not msg.topic.startswith(self.events_topic): if not msg.topic.startswith(self.events_topic):
return return
topic = msg.topic[len(self.events_topic) + 1:].split('/').pop() topic = msg.topic[len(self.events_topic) + 1 :].split('/').pop()
data = msg.payload.decode() data = msg.payload.decode()
if not data: if not data:
return return
try: with contextlib.suppress(ValueError, TypeError):
data = json.loads(data)['data'] data = json.loads(data)['data']
except (ValueError, TypeError):
pass
try: try:
if topic == 'node_value_updated': if topic == 'node_value_updated':
self._dispatch_event(ZwaveValueChangedEvent, node=data[0], value=data[1]) self._dispatch_event(
ZwaveValueChangedEvent, node=data[0], value=data[1]
)
elif topic == 'node_metadata_updated': elif topic == 'node_metadata_updated':
self._dispatch_event(ZwaveNodeEvent, node=data[0]) self._dispatch_event(ZwaveNodeEvent, node=data[0])
elif topic == 'node_sleep': elif topic == 'node_sleep':

View file

@ -22,6 +22,10 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
This plugin allows you to manage a Z-Wave network over MQTT through This plugin allows you to manage a Z-Wave network over MQTT through
`zwavejs2mqtt <https://github.com/zwave-js/zwavejs2mqtt>`_. `zwavejs2mqtt <https://github.com/zwave-js/zwavejs2mqtt>`_.
For historical reasons, it is advised to enabled this plugin together
with the ``zwave.mqtt`` backend, or you may lose the ability to listen
to asynchronous events.
Configuration required on the zwavejs2mqtt gateway: Configuration required on the zwavejs2mqtt gateway:
* Install the gateway following the instructions reported * Install the gateway following the instructions reported
@ -1124,21 +1128,21 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
) )
@action @action
def set_value_label(self, **kwargs): def set_value_label(self, **_):
""" """
Change the label/name of a value (not implemented by zwavejs2mqtt). Change the label/name of a value (not implemented by zwavejs2mqtt).
""" """
raise _NOT_IMPLEMENTED_ERR raise _NOT_IMPLEMENTED_ERR
@action @action
def node_add_value(self, **kwargs): def node_add_value(self, **_):
""" """
Add a value to a node (not implemented by zwavejs2mqtt). Add a value to a node (not implemented by zwavejs2mqtt).
""" """
raise _NOT_IMPLEMENTED_ERR raise _NOT_IMPLEMENTED_ERR
@action @action
def node_remove_value(self, **kwargs): def node_remove_value(self, **_):
""" """
Remove a value from a node (not implemented by zwavejs2mqtt). Remove a value from a node (not implemented by zwavejs2mqtt).
""" """
@ -1492,7 +1496,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
return self._groups_cache return self._groups_cache
@action @action
def get_scenes(self, **kwargs) -> Dict[int, Dict[str, Any]]: def get_scenes(self, **_) -> Dict[int, Dict[str, Any]]:
""" """
Get the scenes configured on the network. Get the scenes configured on the network.
@ -1528,7 +1532,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
} }
@action @action
def create_scene(self, label: str, **kwargs): def create_scene(self, label: str, **_):
""" """
Create a new scene. Create a new scene.
@ -1788,7 +1792,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
) )
@action @action
def create_new_primary(self, **kwargs): def create_new_primary(self, **_):
""" """
Create a new primary controller on the network when the previous primary fails Create a new primary controller on the network when the previous primary fails
(not implemented by zwavejs2mqtt). (not implemented by zwavejs2mqtt).
@ -1799,7 +1803,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR raise _NOT_IMPLEMENTED_ERR
@action @action
def hard_reset(self, **kwargs): def hard_reset(self, **_):
""" """
Perform a hard reset of the controller. It erases its network configuration settings. Perform a hard reset of the controller. It erases its network configuration settings.
The controller becomes a primary controller ready to add devices to a new network. The controller becomes a primary controller ready to add devices to a new network.
@ -1810,7 +1814,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
self._api_request('hardReset') self._api_request('hardReset')
@action @action
def soft_reset(self, **kwargs): def soft_reset(self, **_):
""" """
Perform a soft reset of the controller. Perform a soft reset of the controller.
Resets a controller without erasing its network configuration settings (not implemented by zwavejs2). Resets a controller without erasing its network configuration settings (not implemented by zwavejs2).
@ -1821,7 +1825,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR raise _NOT_IMPLEMENTED_ERR
@action @action
def write_config(self, **kwargs): def write_config(self, **_):
""" """
Store the current configuration of the network to the user directory (not implemented by zwavejs2mqtt). Store the current configuration of the network to the user directory (not implemented by zwavejs2mqtt).
@ -1831,7 +1835,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR raise _NOT_IMPLEMENTED_ERR
@action @action
def on(self, device: str, *args, **kwargs): def on(self, device: str, *_, **kwargs):
""" """
Turn on a switch on a device. Turn on a switch on a device.
@ -1842,7 +1846,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
self.set_value(data=True, id_on_network=device, **kwargs) self.set_value(data=True, id_on_network=device, **kwargs)
@action @action
def off(self, device: str, *args, **kwargs): def off(self, device: str, *_, **kwargs):
""" """
Turn off a switch on a device. Turn off a switch on a device.
@ -1853,7 +1857,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin):
self.set_value(data=False, id_on_network=device, **kwargs) self.set_value(data=False, id_on_network=device, **kwargs)
@action @action
def toggle(self, device: str, *args, **kwargs) -> dict: def toggle(self, device: str, *_, **kwargs) -> dict:
""" """
Toggle a switch on a device. Toggle a switch on a device.