diff --git a/platypush/backend/zwave/mqtt/__init__.py b/platypush/backend/zwave/mqtt/__init__.py index 7d38c8e18..5a5d426c3 100644 --- a/platypush/backend/zwave/mqtt/__init__.py +++ b/platypush/backend/zwave/mqtt/__init__.py @@ -1,3 +1,4 @@ +import contextlib import json from queue import Queue, Empty from typing import Optional, Type @@ -5,14 +6,24 @@ from typing import Optional, Type from platypush.backend.mqtt import MqttBackend from platypush.context import get_plugin -from platypush.message.event.zwave import ZwaveEvent, ZwaveNodeAddedEvent, ZwaveValueChangedEvent, \ - ZwaveNodeRemovedEvent, ZwaveNodeRenamedEvent, ZwaveNodeReadyEvent, ZwaveNodeEvent, ZwaveNodeAsleepEvent, \ - ZwaveNodeAwakeEvent +from platypush.message.event.zwave import ( + ZwaveEvent, + ZwaveNodeAddedEvent, + ZwaveValueChangedEvent, + ZwaveNodeRemovedEvent, + ZwaveNodeRenamedEvent, + ZwaveNodeReadyEvent, + ZwaveNodeEvent, + ZwaveNodeAsleepEvent, + ZwaveNodeAwakeEvent, +) class ZwaveMqttBackend(MqttBackend): """ Listen for events on a `zwavejs2mqtt `_ service. + For historical reasons, this should be enabled together with the ``zwave.mqtt`` plugin, + even though the actual configuration is only specified on the plugin. Triggers: @@ -41,6 +52,7 @@ class ZwaveMqttBackend(MqttBackend): """ from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin + self.plugin: ZwaveMqttPlugin = get_plugin('zwave.mqtt') assert self.plugin, 'The zwave.mqtt plugin is not configured' @@ -61,27 +73,48 @@ class ZwaveMqttBackend(MqttBackend): 'password': self.plugin.password, } - listeners = [{ - **self.server_info, - 'topics': [ - self.plugin.events_topic + '/node/' + topic - for topic in ['node_ready', 'node_sleep', 'node_value_updated', 'node_metadata_updated', 'node_wakeup'] - ], - }] + listeners = [ + { + **self.server_info, + 'topics': [ + self.plugin.events_topic + '/node/' + topic + for topic in [ + 'node_ready', + 'node_sleep', + 'node_value_updated', + 'node_metadata_updated', + 'node_wakeup', + ] + ], + } + ] - super().__init__(*args, subscribe_default_topic=False, listeners=listeners, client_id=client_id, **kwargs) + super().__init__( + *args, + subscribe_default_topic=False, + listeners=listeners, + client_id=client_id, + **kwargs, + ) if not client_id: self.client_id += '-zwavejs-mqtt' - def _dispatch_event(self, event_type: Type[ZwaveEvent], node: Optional[dict] = None, value: Optional[dict] = None, - **kwargs): + def _dispatch_event( + self, + event_type: Type[ZwaveEvent], + node: Optional[dict] = None, + value: Optional[dict] = None, + **kwargs, + ): if value and 'id' not in value: value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" if 'propertyKey' in value: value_id += '-' + str(value['propertyKey']) if value_id not in node.get('values', {}): - self.logger.warning(f'value_id {value_id} not found on node {node["id"]}') + self.logger.warning( + f'value_id {value_id} not found on node {node["id"]}' + ) return value = node['values'][value_id] @@ -107,41 +140,47 @@ class ZwaveMqttBackend(MqttBackend): evt = event_type(**kwargs) self._events_queue.put(evt) - # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, - # using two values - a read-only value called currentValue that gets updated on the - # node_value_updated topic, and a writable value called targetValue that doesn't get updated - # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5/docs/guide/migrating.md). - # To properly manage updates on writable values, propagate an event for both. - if event_type == ZwaveValueChangedEvent and kwargs.get('value', {}).get('property_id') == 'currentValue': - value = kwargs['value'].copy() - target_value_id = f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}' \ - f'-targetValue' - kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id) + if event_type == ZwaveValueChangedEvent: + # zwavejs2mqtt currently treats some values (e.g. binary switches) in an inconsistent way, + # using two values - a read-only value called currentValue that gets updated on the + # node_value_updated topic, and a writable value called targetValue that doesn't get updated + # (see https://github.com/zwave-js/zwavejs2mqtt/blob/4a6a5c5f1274763fd3aced4cae2c72ea060716b5 \ + # /docs/guide/migrating.md). + # To properly manage updates on writable values, propagate an event for both. + if kwargs.get('value', {}).get('property_id') == 'currentValue': + value = kwargs['value'].copy() + target_value_id = ( + f'{kwargs["node"]["node_id"]}-{value["command_class"]}-{value.get("endpoint", 0)}' + f'-targetValue' + ) + kwargs['value'] = kwargs['node'].get('values', {}).get(target_value_id) - if kwargs['value']: - kwargs['value']['data'] = value['data'] - kwargs['node']['values'][target_value_id] = kwargs['value'] - evt = event_type(**kwargs) - self._events_queue.put(evt) + if kwargs['value']: + kwargs['value']['data'] = value['data'] + kwargs['node']['values'][target_value_id] = kwargs['value'] + evt = event_type(**kwargs) + self._events_queue.put(evt) + + self.plugin.publish_entities([kwargs['value']]) # type: ignore def on_mqtt_message(self): def handler(_, __, msg): if not msg.topic.startswith(self.events_topic): return - topic = msg.topic[len(self.events_topic) + 1:].split('/').pop() + topic = msg.topic[len(self.events_topic) + 1 :].split('/').pop() data = msg.payload.decode() if not data: return - try: + with contextlib.suppress(ValueError, TypeError): data = json.loads(data)['data'] - except (ValueError, TypeError): - pass try: if topic == 'node_value_updated': - self._dispatch_event(ZwaveValueChangedEvent, node=data[0], value=data[1]) + self._dispatch_event( + ZwaveValueChangedEvent, node=data[0], value=data[1] + ) elif topic == 'node_metadata_updated': self._dispatch_event(ZwaveNodeEvent, node=data[0]) elif topic == 'node_sleep': diff --git a/platypush/plugins/zwave/mqtt/__init__.py b/platypush/plugins/zwave/mqtt/__init__.py index 772a046df..4c1bbab14 100644 --- a/platypush/plugins/zwave/mqtt/__init__.py +++ b/platypush/plugins/zwave/mqtt/__init__.py @@ -22,6 +22,10 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): This plugin allows you to manage a Z-Wave network over MQTT through `zwavejs2mqtt `_. + For historical reasons, it is advised to enabled this plugin together + with the ``zwave.mqtt`` backend, or you may lose the ability to listen + to asynchronous events. + Configuration required on the zwavejs2mqtt gateway: * Install the gateway following the instructions reported @@ -1124,21 +1128,21 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): ) @action - def set_value_label(self, **kwargs): + def set_value_label(self, **_): """ Change the label/name of a value (not implemented by zwavejs2mqtt). """ raise _NOT_IMPLEMENTED_ERR @action - def node_add_value(self, **kwargs): + def node_add_value(self, **_): """ Add a value to a node (not implemented by zwavejs2mqtt). """ raise _NOT_IMPLEMENTED_ERR @action - def node_remove_value(self, **kwargs): + def node_remove_value(self, **_): """ Remove a value from a node (not implemented by zwavejs2mqtt). """ @@ -1492,7 +1496,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): return self._groups_cache @action - def get_scenes(self, **kwargs) -> Dict[int, Dict[str, Any]]: + def get_scenes(self, **_) -> Dict[int, Dict[str, Any]]: """ Get the scenes configured on the network. @@ -1528,7 +1532,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): } @action - def create_scene(self, label: str, **kwargs): + def create_scene(self, label: str, **_): """ Create a new scene. @@ -1788,7 +1792,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): ) @action - def create_new_primary(self, **kwargs): + def create_new_primary(self, **_): """ Create a new primary controller on the network when the previous primary fails (not implemented by zwavejs2mqtt). @@ -1799,7 +1803,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): raise _NOT_IMPLEMENTED_ERR @action - def hard_reset(self, **kwargs): + def hard_reset(self, **_): """ Perform a hard reset of the controller. It erases its network configuration settings. The controller becomes a primary controller ready to add devices to a new network. @@ -1810,7 +1814,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): self._api_request('hardReset') @action - def soft_reset(self, **kwargs): + def soft_reset(self, **_): """ Perform a soft reset of the controller. Resets a controller without erasing its network configuration settings (not implemented by zwavejs2). @@ -1821,7 +1825,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): raise _NOT_IMPLEMENTED_ERR @action - def write_config(self, **kwargs): + def write_config(self, **_): """ Store the current configuration of the network to the user directory (not implemented by zwavejs2mqtt). @@ -1831,7 +1835,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): raise _NOT_IMPLEMENTED_ERR @action - def on(self, device: str, *args, **kwargs): + def on(self, device: str, *_, **kwargs): """ Turn on a switch on a device. @@ -1842,7 +1846,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): self.set_value(data=True, id_on_network=device, **kwargs) @action - def off(self, device: str, *args, **kwargs): + def off(self, device: str, *_, **kwargs): """ Turn off a switch on a device. @@ -1853,7 +1857,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): self.set_value(data=False, id_on_network=device, **kwargs) @action - def toggle(self, device: str, *args, **kwargs) -> dict: + def toggle(self, device: str, *_, **kwargs) -> dict: """ Toggle a switch on a device.