Fixed backend.zwave event logic dispatch for recent versions of ZWaveJS.

ZWaveJS has broken back-compatibility with zwavejs2mqtt when it comes to
events format.

Only a partial representation of the node and value objects is
forwarded, and that's often not sufficient to infer the full state of
the node with its values.

The `_dispatch_event` logic has therefore been modified to accommodate
both the implementation.

This means that we have to go conservative in order to preserve
back-compatibility and not over-complicate things, even if it (slightly)
comes at the expense of performance.
This commit is contained in:
Fabio Manganiello 2022-12-10 14:52:10 +01:00
parent a17bc3c474
commit 6713bf6994
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -5,9 +5,17 @@ from typing import Optional, Type
from platypush.backend.mqtt import MqttBackend from platypush.backend.mqtt import MqttBackend
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message.event.zwave import ZwaveEvent, ZwaveNodeAddedEvent, ZwaveValueChangedEvent, \ from platypush.message.event.zwave import (
ZwaveNodeRemovedEvent, ZwaveNodeRenamedEvent, ZwaveNodeReadyEvent, ZwaveNodeEvent, ZwaveNodeAsleepEvent, \ ZwaveEvent,
ZwaveNodeAwakeEvent ZwaveNodeAddedEvent,
ZwaveValueChangedEvent,
ZwaveNodeRemovedEvent,
ZwaveNodeRenamedEvent,
ZwaveNodeReadyEvent,
ZwaveNodeEvent,
ZwaveNodeAsleepEvent,
ZwaveNodeAwakeEvent,
)
class ZwaveMqttBackend(MqttBackend): class ZwaveMqttBackend(MqttBackend):
@ -41,6 +49,7 @@ class ZwaveMqttBackend(MqttBackend):
""" """
from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin
self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt') self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt')
assert self.plugin, 'The zwave.mqtt plugin is not configured' assert self.plugin, 'The zwave.mqtt plugin is not configured'
@ -61,75 +70,117 @@ class ZwaveMqttBackend(MqttBackend):
'password': self.plugin.password, 'password': self.plugin.password,
} }
listeners = [{ listeners = [
**self.server_info, {
'topics': [ **self.server_info,
self.plugin.events_topic + '/node/' + topic 'topics': [
for topic in ['node_ready', 'node_sleep', 'node_value_updated', 'node_metadata_updated', 'node_wakeup'] self.plugin.events_topic + '/node/' + topic
], for topic in [
}] 'node_ready',
'node_sleep',
'node_value_updated',
'node_metadata_updated',
'node_wakeup',
]
],
}
]
super().__init__(*args, subscribe_default_topic=False, listeners=listeners, client_id=client_id, **kwargs) super().__init__(
*args,
subscribe_default_topic=False,
listeners=listeners,
client_id=client_id,
**kwargs,
)
if not client_id: if not client_id:
self.client_id += '-zwavejs-mqtt' self.client_id += '-zwavejs-mqtt'
def _dispatch_event(self, event_type: Type[ZwaveEvent], node: Optional[dict] = None, value: Optional[dict] = None, def _dispatch_event(
**kwargs): self,
if value and 'id' not in value: event_type: Type[ZwaveEvent],
value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" node: dict,
if 'propertyKey' in value: value: Optional[dict] = None,
value_id += '-' + str(value['propertyKey']) **kwargs,
):
node_id = node.get('id')
assert node_id is not None, 'No node ID specified'
if value_id not in node.get('values', {}): # This is far from efficient (we are querying the latest version of the whole
self.logger.warning(f'value_id {value_id} not found on node {node["id"]}') # node for every event we receive), but this is the best we can do with recent
# versions of ZWaveJS that only transmit partial representations of the node and
# the value. The alternative would be to come up with a complex logic for merging
# cached and new values, with the risk of breaking back-compatibility with earlier
# implementations of zwavejs2mqtt.
node = kwargs['node'] = self.plugin.get_nodes(node_id).output # type: ignore
node_values = node.get('values', {})
if node and value:
# Infer the value_id structure if it's not provided on the event
value_id = value.get('id')
if value_id is None:
value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}"
if 'propertyKey' in value:
value_id += '-' + str(value['propertyKey'])
# Prepend the node_id to value_id if it's not available in node['values']
# (compatibility with more recent versions of ZwaveJS that don't provide
# the value_id on the events)
if value_id not in node_values:
value_id = f"{node_id}-{value_id}"
if value_id not in node_values:
self.logger.warning(f'value_id {value_id} not found on node {node_id}')
return return
value = node['values'][value_id] value = kwargs['value'] = node_values[value_id]
if value: if event_type == ZwaveNodeEvent:
kwargs['value'] = self.plugin.value_to_dict(value) # If this node_id wasn't cached before, then it's a new node
if node_id not in self._nodes:
if node: event_type = ZwaveNodeAddedEvent
kwargs['node'] = self.plugin.node_to_dict(node) # If the name has changed, we have a rename event
node_id = kwargs['node']['node_id'] elif node['name'] != self._nodes[node_id]['name']:
event_type = ZwaveNodeRenamedEvent
if event_type == ZwaveNodeEvent:
if node_id not in self._nodes:
event_type = ZwaveNodeAddedEvent
elif kwargs['node']['name'] != self._nodes[node_id]['name']:
event_type = ZwaveNodeRenamedEvent
if event_type == ZwaveNodeRemovedEvent: if event_type == ZwaveNodeRemovedEvent:
self._nodes.pop(node_id, None) self._nodes.pop(node_id, None)
else: else:
self._nodes[node_id] = kwargs['node'] self._nodes[node_id] = node
evt = event_type(**kwargs) evt = event_type(**kwargs)
self._events_queue.put(evt) self._events_queue.put(evt)
# zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, if value and event_type == ZwaveValueChangedEvent:
# using two values - a read-only value called currentValue that gets updated on the value = value.copy()
# node_value_updated topic, and a writable value called targetValue that doesn't get updated
# (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5/docs/guide/migrating.md).
# To properly manage updates on writable values, propagate an event for both.
if event_type == ZwaveValueChangedEvent and kwargs.get('value', {}).get('property_id') == 'currentValue':
value = kwargs['value'].copy()
target_value_id = f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}' \
f'-targetValue'
kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id)
if kwargs['value']: # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way,
kwargs['value']['data'] = value['data'] # using two values - a read-only value called currentValue that gets updated on the
kwargs['node']['values'][target_value_id] = kwargs['value'] # node_value_updated topic, and a writable value called targetValue that doesn't get updated
evt = event_type(**kwargs) # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5 \
self._events_queue.put(evt) # /docs/guide/migrating.md).
# To properly manage updates on writable values, propagate an event for both.
if value.get('property_id') == 'currentValue':
target_value_id = (
f'{node_id}-{value["command_class"]}-{value.get("endpoint", 0)}'
f'-targetValue'
)
target_value = node_values.get(target_value_id)
if target_value:
kwargs['value']['data'] = value['data']
kwargs['node']['values'][target_value_id] = kwargs['value']
evt = event_type(**kwargs)
self._events_queue.put(evt)
def on_mqtt_message(self): def on_mqtt_message(self):
def handler(_, __, msg): def handler(_, __, msg):
if not msg.topic.startswith(self.events_topic): if not msg.topic.startswith(self.events_topic):
return return
topic = msg.topic[len(self.events_topic) + 1:].split('/').pop() topic = (
msg.topic[(len(self.events_topic) + 1) :].split('/').pop() # noqa: E203
)
data = msg.payload.decode() data = msg.payload.decode()
if not data: if not data:
return return
@ -141,7 +192,9 @@ class ZwaveMqttBackend(MqttBackend):
try: try:
if topic == 'node_value_updated': if topic == 'node_value_updated':
self._dispatch_event(ZwaveValueChangedEvent, node=data[0], value=data[1]) self._dispatch_event(
ZwaveValueChangedEvent, node=data[0], value=data[1]
)
elif topic == 'node_metadata_updated': elif topic == 'node_metadata_updated':
self._dispatch_event(ZwaveNodeEvent, node=data[0]) self._dispatch_event(ZwaveNodeEvent, node=data[0])
elif topic == 'node_sleep': elif topic == 'node_sleep':
@ -160,7 +213,6 @@ class ZwaveMqttBackend(MqttBackend):
def run(self): def run(self):
super().run() super().run()
self.logger.debug('Refreshing Z-Wave nodes') self.logger.debug('Refreshing Z-Wave nodes')
# noinspection PyUnresolvedReferences
self._nodes = self.plugin.get_nodes().output self._nodes = self.plugin.get_nodes().output
while not self.should_stop(): while not self.should_stop():