diff --git a/platypush/backend/zwave/mqtt/__init__.py b/platypush/backend/zwave/mqtt/__init__.py index 7d38c8e184..ae5cb7d060 100644 --- a/platypush/backend/zwave/mqtt/__init__.py +++ b/platypush/backend/zwave/mqtt/__init__.py @@ -5,9 +5,17 @@ from typing import Optional, Type from platypush.backend.mqtt import MqttBackend from platypush.context import get_plugin -from platypush.message.event.zwave import ZwaveEvent, ZwaveNodeAddedEvent, ZwaveValueChangedEvent, \ - ZwaveNodeRemovedEvent, ZwaveNodeRenamedEvent, ZwaveNodeReadyEvent, ZwaveNodeEvent, ZwaveNodeAsleepEvent, \ - ZwaveNodeAwakeEvent +from platypush.message.event.zwave import ( + ZwaveEvent, + ZwaveNodeAddedEvent, + ZwaveValueChangedEvent, + ZwaveNodeRemovedEvent, + ZwaveNodeRenamedEvent, + ZwaveNodeReadyEvent, + ZwaveNodeEvent, + ZwaveNodeAsleepEvent, + ZwaveNodeAwakeEvent, +) class ZwaveMqttBackend(MqttBackend): @@ -41,6 +49,7 @@ class ZwaveMqttBackend(MqttBackend): """ from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin + self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt') assert self.plugin, 'The zwave.mqtt plugin is not configured' @@ -61,75 +70,117 @@ class ZwaveMqttBackend(MqttBackend): 'password': self.plugin.password, } - listeners = [{ - **self.server_info, - 'topics': [ - self.plugin.events_topic + '/node/' + topic - for topic in ['node_ready', 'node_sleep', 'node_value_updated', 'node_metadata_updated', 'node_wakeup'] - ], - }] + listeners = [ + { + **self.server_info, + 'topics': [ + self.plugin.events_topic + '/node/' + topic + for topic in [ + 'node_ready', + 'node_sleep', + 'node_value_updated', + 'node_metadata_updated', + 'node_wakeup', + ] + ], + } + ] - super().__init__(*args, subscribe_default_topic=False, listeners=listeners, client_id=client_id, **kwargs) + super().__init__( + *args, + subscribe_default_topic=False, + listeners=listeners, + client_id=client_id, + **kwargs, + ) if not client_id: self.client_id += '-zwavejs-mqtt' - def _dispatch_event(self, event_type: Type[ZwaveEvent], node: Optional[dict] = None, value: Optional[dict] = None, - **kwargs): - if value and 'id' not in value: - value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" - if 'propertyKey' in value: - value_id += '-' + str(value['propertyKey']) + def _dispatch_event( + self, + event_type: Type[ZwaveEvent], + node: dict, + value: Optional[dict] = None, + **kwargs, + ): + node_id = node.get('id') + assert node_id is not None, 'No node ID specified' - if value_id not in node.get('values', {}): - self.logger.warning(f'value_id {value_id} not found on node {node["id"]}') + # This is far from efficient (we are querying the latest version of the whole + # node for every event we receive), but this is the best we can do with recent + # versions of ZWaveJS that only transmit partial representations of the node and + # the value. The alternative would be to come up with a complex logic for merging + # cached and new values, with the risk of breaking back-compatibility with earlier + # implementations of zwavejs2mqtt. + node = kwargs['node'] = self.plugin.get_nodes(node_id).output # type: ignore + node_values = node.get('values', {}) + + if node and value: + # Infer the value_id structure if it's not provided on the event + value_id = value.get('id') + if value_id is None: + value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" + if 'propertyKey' in value: + value_id += '-' + str(value['propertyKey']) + + # Prepend the node_id to value_id if it's not available in node['values'] + # (compatibility with more recent versions of ZwaveJS that don't provide + # the value_id on the events) + if value_id not in node_values: + value_id = f"{node_id}-{value_id}" + + if value_id not in node_values: + self.logger.warning(f'value_id {value_id} not found on node {node_id}') return - value = node['values'][value_id] + value = kwargs['value'] = node_values[value_id] - if value: - kwargs['value'] = self.plugin.value_to_dict(value) - - if node: - kwargs['node'] = self.plugin.node_to_dict(node) - node_id = kwargs['node']['node_id'] - - if event_type == ZwaveNodeEvent: - if node_id not in self._nodes: - event_type = ZwaveNodeAddedEvent - elif kwargs['node']['name'] != self._nodes[node_id]['name']: - event_type = ZwaveNodeRenamedEvent + if event_type == ZwaveNodeEvent: + # If this node_id wasn't cached before, then it's a new node + if node_id not in self._nodes: + event_type = ZwaveNodeAddedEvent + # If the name has changed, we have a rename event + elif node['name'] != self._nodes[node_id]['name']: + event_type = ZwaveNodeRenamedEvent if event_type == ZwaveNodeRemovedEvent: self._nodes.pop(node_id, None) else: - self._nodes[node_id] = kwargs['node'] + self._nodes[node_id] = node evt = event_type(**kwargs) self._events_queue.put(evt) - # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, - # using two values - a read-only value called currentValue that gets updated on the - # node_value_updated topic, and a writable value called targetValue that doesn't get updated - # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5/docs/guide/migrating.md). - # To properly manage updates on writable values, propagate an event for both. - if event_type == ZwaveValueChangedEvent and kwargs.get('value', {}).get('property_id') == 'currentValue': - value = kwargs['value'].copy() - target_value_id = f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}' \ - f'-targetValue' - kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id) + if value and event_type == ZwaveValueChangedEvent: + value = value.copy() - if kwargs['value']: - kwargs['value']['data'] = value['data'] - kwargs['node']['values'][target_value_id] = kwargs['value'] - evt = event_type(**kwargs) - self._events_queue.put(evt) + # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, + # using two values - a read-only value called currentValue that gets updated on the + # node_value_updated topic, and a writable value called targetValue that doesn't get updated + # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5 \ + # /docs/guide/migrating.md). + # To properly manage updates on writable values, propagate an event for both. + if value.get('property_id') == 'currentValue': + target_value_id = ( + f'{node_id}-{value["command_class"]}-{value.get("endpoint", 0)}' + f'-targetValue' + ) + target_value = node_values.get(target_value_id) + + if target_value: + kwargs['value']['data'] = value['data'] + kwargs['node']['values'][target_value_id] = kwargs['value'] + evt = event_type(**kwargs) + self._events_queue.put(evt) def on_mqtt_message(self): def handler(_, __, msg): if not msg.topic.startswith(self.events_topic): return - topic = msg.topic[len(self.events_topic) + 1:].split('/').pop() + topic = ( + msg.topic[(len(self.events_topic) + 1) :].split('/').pop() # noqa: E203 + ) data = msg.payload.decode() if not data: return @@ -141,7 +192,9 @@ class ZwaveMqttBackend(MqttBackend): try: if topic == 'node_value_updated': - self._dispatch_event(ZwaveValueChangedEvent, node=data[0], value=data[1]) + self._dispatch_event( + ZwaveValueChangedEvent, node=data[0], value=data[1] + ) elif topic == 'node_metadata_updated': self._dispatch_event(ZwaveNodeEvent, node=data[0]) elif topic == 'node_sleep': @@ -160,7 +213,6 @@ class ZwaveMqttBackend(MqttBackend): def run(self): super().run() self.logger.debug('Refreshing Z-Wave nodes') - # noinspection PyUnresolvedReferences self._nodes = self.plugin.get_nodes().output while not self.should_stop():