From 8aff1819564a55bbf540f8f6286673c01294de8b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 29 Jan 2023 02:34:48 +0100 Subject: [PATCH] Merged `zwave.mqtt` backend into the `zwave.mqtt` plugin --- platypush/backend/zigbee/mqtt/__init__.py | 7 +- platypush/backend/zwave/mqtt/__init__.py | 226 ++------------------- platypush/entities/_registry.py | 6 +- platypush/plugins/zigbee/mqtt/__init__.py | 75 +++---- platypush/plugins/zwave/mqtt/__init__.py | 63 +++--- platypush/plugins/zwave/mqtt/_listener.py | 197 ++++++++++++++++++ platypush/plugins/zwave/mqtt/manifest.yaml | 18 +- 7 files changed, 316 insertions(+), 276 deletions(-) create mode 100644 platypush/plugins/zwave/mqtt/_listener.py diff --git a/platypush/backend/zigbee/mqtt/__init__.py b/platypush/backend/zigbee/mqtt/__init__.py index 983fbe65e..823d0a840 100644 --- a/platypush/backend/zigbee/mqtt/__init__.py +++ b/platypush/backend/zigbee/mqtt/__init__.py @@ -21,9 +21,10 @@ class ZigbeeMqttBackend(Backend): super().run() warnings.warn( ''' - The zigbee.mqtt has been merged into the zigbee.mqtt plugin. It is - now deprecated and it will be removed in a future version. Remove - any references to it from your configuration. + The zigbee.mqtt backend has been merged into the zigbee.mqtt + plugin. It is now deprecated and it will be removed in a future + version. + Please remove any references to it from your configuration. ''', DeprecationWarning, ) diff --git a/platypush/backend/zwave/mqtt/__init__.py b/platypush/backend/zwave/mqtt/__init__.py index a00b4aeee..9bf93da5c 100644 --- a/platypush/backend/zwave/mqtt/__init__.py +++ b/platypush/backend/zwave/mqtt/__init__.py @@ -1,222 +1,34 @@ -import contextlib -import json -from queue import Queue, Empty -from typing import Optional, Type +import warnings -from platypush.backend.mqtt import MqttBackend -from platypush.context import get_plugin - -from platypush.config import Config -from platypush.message.event.zwave import ( - ZwaveEvent, - ZwaveNodeAddedEvent, - ZwaveValueChangedEvent, - ZwaveNodeRemovedEvent, - ZwaveNodeRenamedEvent, - ZwaveNodeReadyEvent, - ZwaveNodeEvent, - ZwaveNodeAsleepEvent, - ZwaveNodeAwakeEvent, - ZwaveValueRemovedEvent, -) +from platypush.backend import Backend -class ZwaveMqttBackend(MqttBackend): +class ZwaveMqttBackend(Backend): """ - Listen for events on a `zwave-js-ui `_ - service. For historical reasons, this should be enabled together with the - ``zwave.mqtt`` plugin, even though the actual configuration is only - specified on the plugin. For this reason, this backend will be deprecated in - the near future and merged with its associated plugin. + Listen for events on a zwave2mqtt service. - Triggers: + **WARNING**: This backend is **DEPRECATED** and it will be removed in a + future version. - * :class:`platypush.message.event.zwave.ZwaveNodeEvent` when a node attribute changes. - * :class:`platypush.message.event.zwave.ZwaveNodeAddedEvent` when a node is added to the network. - * :class:`platypush.message.event.zwave.ZwaveNodeRemovedEvent` when a node is removed from the network. - * :class:`platypush.message.event.zwave.ZwaveNodeRenamedEvent` when a node is renamed. - * :class:`platypush.message.event.zwave.ZwaveNodeReadyEvent` when a node is ready. - * :class:`platypush.message.event.zwave.ZwaveValueChangedEvent` when the value of a node on the network - changes. - * :class:`platypush.message.event.zwave.ZwaveNodeAsleepEvent` when a node goes into sleep mode. - * :class:`platypush.message.event.zwave.ZwaveNodeAwakeEvent` when a node goes back into awake mode. - - Requires: - - * **paho-mqtt** (``pip install paho-mqtt``) - * A `zwave-js-ui instance `_. - * The :class:`platypush.plugins.zwave.mqtt.ZwaveMqttPlugin` plugin configured. + It has been merged with + :class:`platypush.plugins.zwave.mqtt.ZwaveMqttPlugin`. + Now you can simply configure the `zwave.mqtt` plugin in order to enable + the Zwave integration - no need to enable both the plugin and the backend. """ - def __init__(self, client_id: Optional[str] = None, *args, **kwargs): - """ - :param client_id: MQTT client ID (default: ``-zwavejs-mqtt``, to prevent clashes with the - :class:`platypush.backend.mqtt.MqttBackend` ``client_id``. - """ - - from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin - - plugin: Optional[ZwaveMqttPlugin] = get_plugin('zwave.mqtt') - assert plugin, 'The zwave.mqtt plugin is not configured' - self.plugin = plugin - - self._nodes = {} - self._groups = {} - self._last_state = None - self._events_queue = Queue() - self.events_topic = self.plugin.events_topic - self.server_info = { - 'host': self.plugin.host, - 'port': self.plugin.port or self._default_mqtt_port, - 'tls_cafile': self.plugin.tls_cafile, - 'tls_certfile': self.plugin.tls_certfile, - 'tls_ciphers': self.plugin.tls_ciphers, - 'tls_keyfile': self.plugin.tls_keyfile, - 'tls_version': self.plugin.tls_version, - 'username': self.plugin.username, - 'password': self.plugin.password, - } - - listeners = [ - { - **self.server_info, - 'topics': [ - self.plugin.events_topic + '/node/' + topic - for topic in [ - 'node_ready', - 'node_sleep', - 'node_value_updated', - 'node_metadata_updated', - 'node_wakeup', - ] - ], - } - ] - - super().__init__( - *args, - subscribe_default_topic=False, - listeners=listeners, - client_id=client_id, - **kwargs, - ) - if not client_id: - self.client_id = ( - str(self.client_id or Config.get('device_id')) + '-zwavejs-mqtt' - ) - - def _dispatch_event( - self, - event_type: Type[ZwaveEvent], - node: dict, - value: Optional[dict] = None, - **kwargs, - ): - node_id = node.get('id') - assert node_id is not None, 'No node ID specified' - - # This is far from efficient (we are querying the latest version of the whole - # node for every event we receive), but this is the best we can do with recent - # versions of ZWaveJS that only transmit partial representations of the node and - # the value. The alternative would be to come up with a complex logic for merging - # cached and new values, with the risk of breaking back-compatibility with earlier - # implementations of zwavejs2mqtt. - node = kwargs['node'] = self.plugin.get_nodes(node_id).output # type: ignore - node_values = node.get('values', {}) - - if node and value: - # Infer the value_id structure if it's not provided on the event - value_id = value.get('id') - if value_id is None: - value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" - if 'propertyKey' in value: - value_id += '-' + str(value['propertyKey']) - - # Prepend the node_id to value_id if it's not available in node['values'] - # (compatibility with more recent versions of ZwaveJS that don't provide - # the value_id on the events) - if value_id not in node_values: - value_id = f"{node_id}-{value_id}" - - if value_id not in node_values: - self.logger.warning(f'value_id {value_id} not found on node {node_id}') - return - - value = kwargs['value'] = node_values[value_id] - - if issubclass(event_type, ZwaveNodeEvent): - # If the node has been removed, remove it from the cache - if event_type == ZwaveNodeRemovedEvent: - self._nodes.pop(node_id, None) - # If this node_id wasn't cached before, then it's a new node - elif node_id not in self._nodes: - event_type = ZwaveNodeAddedEvent - # If the name has changed, we have a rename event - elif node['name'] != self._nodes[node_id]['name']: - event_type = ZwaveNodeRenamedEvent - # If nothing relevant has changed, update the cached instance and return - else: - self._nodes[node_id] = node - return - - evt = event_type(**kwargs) - self._events_queue.put(evt) - - if ( - value - and issubclass(event_type, ZwaveValueChangedEvent) - and event_type != ZwaveValueRemovedEvent - ): - self.plugin.publish_entities([kwargs['value']]) # type: ignore - - def on_mqtt_message(self): - def handler(_, __, msg): - if not msg.topic.startswith(self.events_topic): - return - - topic = ( - msg.topic[(len(self.events_topic) + 1) :].split('/').pop() # noqa: E203 - ) - data = msg.payload.decode() - if not data: - return - - with contextlib.suppress(ValueError, TypeError): - data = json.loads(data)['data'] - - try: - if topic == 'node_value_updated': - self._dispatch_event( - ZwaveValueChangedEvent, node=data[0], value=data[1] - ) - elif topic == 'node_metadata_updated': - self._dispatch_event(ZwaveNodeEvent, node=data[0]) - elif topic == 'node_sleep': - self._dispatch_event(ZwaveNodeAsleepEvent, node=data[0]) - elif topic == 'node_wakeup': - self._dispatch_event(ZwaveNodeAwakeEvent, node=data[0]) - elif topic == 'node_ready': - self._dispatch_event(ZwaveNodeReadyEvent, node=data[0]) - elif topic == 'node_removed': - self._dispatch_event(ZwaveNodeRemovedEvent, node=data[0]) - except Exception as e: - self.logger.exception(e) - - return handler - def run(self): super().run() - self.logger.debug('Refreshing Z-Wave nodes') - self._nodes = self.plugin.get_nodes().output # type: ignore + warnings.warn( + ''' + The zwave.mqtt backend has been merged into the zwave.mqtt plugin. + It is now deprecated and it will be removed in a future version. + Please remove any references to it from your configuration. + ''', + DeprecationWarning, + ) - while not self.should_stop(): - try: - evt = self._events_queue.get(block=True, timeout=1) - except Empty: - continue - - self.bus.post(evt) + self.wait_stop() # vim:sw=4:ts=4:et: diff --git a/platypush/entities/_registry.py b/platypush/entities/_registry.py index 1bee049d4..baaeabc95 100644 --- a/platypush/entities/_registry.py +++ b/platypush/entities/_registry.py @@ -106,8 +106,8 @@ class EntityManagerMixin: """ from . import publish_entities - entities = self.transform_entities(entities) - publish_entities(entities) + transformed_entities = self.transform_entities(entities) + publish_entities(transformed_entities) def manages(*entities: Type[Entity]): @@ -125,7 +125,7 @@ def manages(*entities: Type[Entity]): init(self, *args, **kwargs) - plugin.__init__ = __init__ + plugin.__init__ = __init__ # type: ignore # Inject the EntityManagerMixin if EntityManagerMixin not in plugin.__bases__: plugin.__bases__ = (EntityManagerMixin,) + plugin.__bases__ diff --git a/platypush/plugins/zigbee/mqtt/__init__.py b/platypush/plugins/zigbee/mqtt/__init__.py index c00747c39..0501bbc23 100644 --- a/platypush/plugins/zigbee/mqtt/__init__.py +++ b/platypush/plugins/zigbee/mqtt/__init__.py @@ -3,7 +3,15 @@ import re import threading from queue import Queue -from typing import Optional, List, Any, Dict, Type, Union, Tuple +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, + Type, + Union, +) from platypush.entities import Entity, manages from platypush.entities.batteries import Battery @@ -210,7 +218,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- self.base_topic = base_topic self.timeout = timeout - self._info = { + self._info: Dict[str, dict] = { 'devices_by_addr': {}, 'devices_by_name': {}, 'groups': {}, @@ -346,7 +354,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- def _get_device_url(device_info: dict) -> Optional[str]: model = device_info.get('definition', {}).get('model') if not model: - return + return None return f'https://www.zigbee2mqtt.io/devices/{model}.html' @@ -354,7 +362,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- def _get_image_url(device_info: dict) -> Optional[str]: model = device_info.get('definition', {}).get('model') if not model: - return + return None return f'https://www.zigbee2mqtt.io/images/devices/{model}.jpg' @@ -366,7 +374,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- if 'timeout' in mqtt_args: timeout = mqtt_args.pop('timeout') - info = { + info: Dict[str, Any] = { 'state': None, 'info': {}, 'config': {}, @@ -374,7 +382,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- 'groups': [], } - info_ready_events = {topic: threading.Event() for topic in info.keys()} + info_ready_events = {topic: threading.Event() for topic in info} def _on_message(): def callback(_, __, msg): @@ -426,9 +434,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- client.loop_stop() client.disconnect() except Exception as e: - self.logger.warning( - 'Error on MQTT client disconnection: {}'.format(str(e)) - ) + self.logger.warning('Error on MQTT client disconnection: %s', e) return info @@ -438,12 +444,12 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- @staticmethod def _parse_response(response: Union[dict, Response]) -> dict: if isinstance(response, Response): - response = response.output # type: ignore[reportGeneralTypeIssues] + response = dict(response.output) - assert response.get('status') != 'error', response.get( # type: ignore[reportGeneralTypeIssues] + assert response.get('status') != 'error', response.get( 'error', 'zigbee2mqtt error' ) - return response # type: ignore[reportGeneralTypeIssues] + return response @action def devices(self, **kwargs) -> List[Dict[str, Any]]: @@ -784,10 +790,10 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- devices = self.devices().output # type: ignore[reportGeneralTypeIssues] assert not [ dev for dev in devices if dev.get('friendly_name') == name - ], 'A device named {} already exists on the network'.format(name) + ], f'A device named {name} already exists on the network' if device: - req = { + req: Dict[str, Any] = { 'from': device, 'to': name, } @@ -807,7 +813,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- ) @staticmethod - def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: + def _build_device_get_request(values: List[Dict[str, Any]]) -> Dict[str, Any]: def extract_value(value: dict, root: dict, depth: int = 0): for feature in value.get('features', []): new_root = root @@ -829,7 +835,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- root[value['property']] = root.get(value['property'], {}) root = root[value['property']] - ret = {} + ret: Dict[str, Any] = {} for value in values: extract_value(value, root=ret) @@ -954,7 +960,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- def worker(device: str, q: Queue): q.put(self.device_get(device, **kwargs).output) # type: ignore[reportGeneralTypeIssues] - queues = {} + queues: Dict[str, Queue] = {} workers = {} response = {} @@ -971,15 +977,15 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- workers[device].join(timeout=kwargs.get('timeout')) except Exception as e: self.logger.warning( - 'An error occurred while getting the status of the device {}: {}'.format( - device, str(e) - ) + 'An error occurred while getting the status of the device %s: %s', + device, + e, ) return response @action - def status(self, device: Optional[str] = None, *args, **kwargs): + def status(self, *args, device: Optional[str] = None, **kwargs): """ Get the status of a device (by friendly name) or of all the connected devices (it wraps :meth:`.devices_get`). @@ -1369,9 +1375,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- for group in self.groups().output # type: ignore[reportGeneralTypeIssues] } - assert ( - name not in groups - ), 'A group named {} already exists on the network'.format(name) + assert name not in groups, f'A group named {name} already exists on the network' return self._parse_response( self.publish( # type: ignore[reportGeneralTypeIssues] @@ -1433,17 +1437,14 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ + remove_suffix = '_all' if device is None else '' return self._parse_response( self.publish( # type: ignore[reportGeneralTypeIssues] topic=self._topic( - 'bridge/request/group/members/remove{}'.format( - '_all' if device is None else '' - ) + f'bridge/request/group/members/remove{remove_suffix}' ), reply_topic=self._topic( - 'bridge/response/group/members/remove{}'.format( - '_all' if device is None else '' - ) + f'bridge/response/group/members/remove{remove_suffix}' ), msg={ 'group': group, @@ -1505,9 +1506,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- Turn on/set to true a switch, a binary property or an option. """ device, prop_info = self._get_switch_info(device) - self.device_set( + return self.device_set( device, prop_info['property'], prop_info.get('value_on', 'ON') - ).output # type: ignore[reportGeneralTypeIssues] + ) @action def off(self, device, *_, **__): @@ -1515,9 +1516,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- Turn off/set to false a switch, a binary property or an option. """ device, prop_info = self._get_switch_info(device) - self.device_set( + return self.device_set( device, prop_info['property'], prop_info.get('value_off', 'OFF') - ).output # type: ignore[reportGeneralTypeIssues] + ) @action def toggle(self, device, *_, **__): @@ -1527,7 +1528,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- device, prop_info = self._get_switch_info(device) prop = prop_info['property'] device_state = self.device_get(device).output # type: ignore - self.device_set( + return self.device_set( device, prop, prop_info.get( @@ -1536,7 +1537,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- if device_state.get(prop) == prop_info.get('value_on', 'ON') else 'ON', ), - ).output # type: ignore[reportGeneralTypeIssues] + ) def _get_switch_info(self, name: str) -> Tuple[str, dict]: name, prop = self._ieee_address(name, with_property=True) @@ -1924,6 +1925,8 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to- listener = ZigbeeMqttListener() listener.start() self.wait_stop() + + listener.stop() listener.join() diff --git a/platypush/plugins/zwave/mqtt/__init__.py b/platypush/plugins/zwave/mqtt/__init__.py index 14d8a5940..1334dca12 100644 --- a/platypush/plugins/zwave/mqtt/__init__.py +++ b/platypush/plugins/zwave/mqtt/__init__.py @@ -36,8 +36,9 @@ from platypush.entities.switches import EnumSwitch, Switch from platypush.entities.temperature import TemperatureSensor from platypush.message.event.zwave import ZwaveNodeRenamedEvent, ZwaveNodeEvent -from platypush.context import get_backend, get_bus +from platypush.context import get_bus from platypush.message.response import Response +from platypush.plugins import RunnablePlugin from platypush.plugins.mqtt import MqttPlugin, action from platypush.plugins.zwave._base import ZwaveBasePlugin from platypush.plugins.zwave._constants import command_class_by_name @@ -45,7 +46,7 @@ from platypush.plugins.zwave._constants import command_class_by_name _NOT_IMPLEMENTED_ERR = NotImplementedError('Not implemented by zwave.mqtt') -class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): +class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin): """ This plugin allows you to manage a Z-Wave network over MQTT through `zwave-js-ui `_. @@ -76,6 +77,18 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): * **paho-mqtt** (``pip install paho-mqtt``) + Triggers: + + * :class:`platypush.message.event.zwave.ZwaveNodeEvent` when a node attribute changes. + * :class:`platypush.message.event.zwave.ZwaveNodeAddedEvent` when a node is added to the network. + * :class:`platypush.message.event.zwave.ZwaveNodeRemovedEvent` when a node is removed from the network. + * :class:`platypush.message.event.zwave.ZwaveNodeRenamedEvent` when a node is renamed. + * :class:`platypush.message.event.zwave.ZwaveNodeReadyEvent` when a node is ready. + * :class:`platypush.message.event.zwave.ZwaveValueChangedEvent` when the value of a node on the network + changes. + * :class:`platypush.message.event.zwave.ZwaveNodeAsleepEvent` when a node goes into sleep mode. + * :class:`platypush.message.event.zwave.ZwaveNodeAwakeEvent` when a node goes back into awake mode. + """ # These classes are ignored by the entity parsing logic @@ -156,40 +169,30 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): self.base_topic = topic_prefix + '/{}/ZWAVE_GATEWAY-' + name self.events_topic = self.base_topic.format('_EVENTS') self.timeout = timeout - self._info = { + self._info: Mapping[str, dict] = { 'devices': {}, 'groups': {}, } - self._nodes_cache = { + self._nodes_cache: Dict[str, dict] = { 'by_id': {}, 'by_name': {}, } - self._values_cache = { + self._values_cache: Dict[str, dict] = { 'by_id': {}, 'by_label': {}, } - self._scenes_cache = { + self._scenes_cache: Dict[str, dict] = { 'by_id': {}, 'by_label': {}, } - self._groups_cache = {} - - @staticmethod - def _get_backend(): - backend = get_backend('zwave.mqtt') - if not backend: - raise AssertionError('zwave.mqtt backend not configured') - return backend + self._groups_cache: Dict[str, dict] = {} def _api_topic(self, api: str) -> str: - return self.base_topic.format('_CLIENTS') + '/api/{}'.format(api) - - def _topic(self, topic): - return self.base_topic + '/' + topic + return self.base_topic.format('_CLIENTS') + f'/api/{api}' @staticmethod def _parse_response(response: Union[dict, Response]) -> dict: @@ -222,6 +225,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): def _convert_timestamp(t: Optional[int]) -> Optional[datetime]: if t: return datetime.fromtimestamp(t / 1000) + return None def _get_scene( self, @@ -375,7 +379,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): 'device_id': device_id.replace('0x', ''), 'name': node.get('name'), 'capabilities': capabilities, - 'manufacturer_id': '0x{:04x}'.format(node['manufacturerId']) + 'manufacturer_id': f'0x{node["manufacturerId"]:04x}' if node.get('manufacturerId') else None, 'manufacturer_name': node.get('manufacturer'), @@ -395,7 +399,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): 'is_security_device': node.get('supportsSecurity'), 'is_sleeping': node.get('ready') and node.get('status') == 'Asleep', 'last_update': cls._convert_timestamp(node.get('lastActive')), - 'product_id': '0x{:04x}'.format(node['productId']) + 'product_id': f'0x{node["productId"]:04x}' if node.get('productId') else None, 'product_type': '0x{:04x}'.format(node['productType']) @@ -785,7 +789,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): product_type = node.get('product_type') firmware_version = node.get('firmware_version', '0.0') if not (manufacturer_id and product_id and product_type): - return + return None return ( 'https://devices.zwave-js.io/?jumpTo=' @@ -849,9 +853,6 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): return list(new_values.values()) - def _topic_by_value_id(self, value_id: str) -> str: - return self.topic_prefix + '/' + '/'.join(value_id.split('-')) - def _filter_values( self, command_classes: Optional[Iterable[str]] = None, # type: ignore[reportGeneralTypeIssues] @@ -1080,7 +1081,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): self._api_request('refreshInfo', node_id, **kwargs) @action - def request_node_neighbour_update(self, **kwargs): + def request_node_neighbour_update(self, *_, **kwargs): """ Request a neighbours list update. @@ -1271,7 +1272,7 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): return nodes @action - def get_node_stats(self, **_): + def get_node_stats(self, *_, **__): """ Get the statistics of a node on the network (not implemented by zwavejs2mqtt). """ @@ -2286,5 +2287,15 @@ class ZwaveMqttPlugin(MqttPlugin, ZwaveBasePlugin): for dev in devices ] + def main(self): + from ._listener import ZwaveMqttListener + + listener = ZwaveMqttListener() + listener.start() + self.wait_stop() + + listener.stop() + listener.join() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zwave/mqtt/_listener.py b/platypush/plugins/zwave/mqtt/_listener.py new file mode 100644 index 000000000..d85f76cba --- /dev/null +++ b/platypush/plugins/zwave/mqtt/_listener.py @@ -0,0 +1,197 @@ +import contextlib +import json +from queue import Queue, Empty +from typing import Optional, Type + +from platypush.backend.mqtt import MqttBackend +from platypush.context import get_bus, get_plugin + +from platypush.config import Config +from platypush.message.event.zwave import ( + ZwaveEvent, + ZwaveNodeAddedEvent, + ZwaveValueChangedEvent, + ZwaveNodeRemovedEvent, + ZwaveNodeRenamedEvent, + ZwaveNodeReadyEvent, + ZwaveNodeEvent, + ZwaveNodeAsleepEvent, + ZwaveNodeAwakeEvent, + ZwaveValueRemovedEvent, +) + + +class ZwaveMqttListener(MqttBackend): + """ + Internal MQTT listener for ``zwave.mqtt`` events. + """ + + def __init__(self, *args, **kwargs): + self._nodes = {} + self._groups = {} + self._last_state = None + self._events_queue = Queue() + self.events_topic = self.plugin.events_topic + self.server_info = { + 'host': self.plugin.host, + 'port': self.plugin.port or self._default_mqtt_port, + 'tls_cafile': self.plugin.tls_cafile, + 'tls_certfile': self.plugin.tls_certfile, + 'tls_ciphers': self.plugin.tls_ciphers, + 'tls_keyfile': self.plugin.tls_keyfile, + 'tls_version': self.plugin.tls_version, + 'username': self.plugin.username, + 'password': self.plugin.password, + } + + listeners = [ + { + **self.server_info, + 'topics': [ + self.plugin.events_topic + '/node/' + topic + for topic in [ + 'node_ready', + 'node_sleep', + 'node_value_updated', + 'node_metadata_updated', + 'node_wakeup', + ] + ], + } + ] + + super().__init__( + *args, + subscribe_default_topic=False, + listeners=listeners, + **kwargs, + ) + + self.client_id = ( + str(self.client_id or Config.get('device_id')) + '-zwavejs-mqtt' + ) + + @property + def plugin(self): + from platypush.plugins.zwave.mqtt import ZwaveMqttPlugin + + plugin: Optional[ZwaveMqttPlugin] = get_plugin('zwave.mqtt') + assert plugin, 'The zwave.mqtt plugin is not configured' + return plugin + + def _dispatch_event( + self, + event_type: Type[ZwaveEvent], + node: dict, + value: Optional[dict] = None, + **kwargs, + ): + node_id = node.get('id') + assert node_id is not None, 'No node ID specified' + + # This is far from efficient (we are querying the latest version of the whole + # node for every event we receive), but this is the best we can do with recent + # versions of ZWaveJS that only transmit partial representations of the node and + # the value. The alternative would be to come up with a complex logic for merging + # cached and new values, with the risk of breaking back-compatibility with earlier + # implementations of zwavejs2mqtt. + node = kwargs['node'] = self.plugin.get_nodes(node_id).output # type: ignore + node_values = node.get('values', {}) + + if node and value: + # Infer the value_id structure if it's not provided on the event + value_id = value.get('id') + if value_id is None: + value_id = f"{value['commandClass']}-{value.get('endpoint', 0)}-{value['property']}" + if 'propertyKey' in value: + value_id += '-' + str(value['propertyKey']) + + # Prepend the node_id to value_id if it's not available in node['values'] + # (compatibility with more recent versions of ZwaveJS that don't provide + # the value_id on the events) + if value_id not in node_values: + value_id = f"{node_id}-{value_id}" + + if value_id not in node_values: + self.logger.warning( + 'value_id %s not found on node %s', value_id, node_id + ) + return + + value = kwargs['value'] = node_values[value_id] + + if issubclass(event_type, ZwaveNodeEvent): + # If the node has been removed, remove it from the cache + if event_type == ZwaveNodeRemovedEvent: + self._nodes.pop(node_id, None) + # If this node_id wasn't cached before, then it's a new node + elif node_id not in self._nodes: + event_type = ZwaveNodeAddedEvent + # If the name has changed, we have a rename event + elif node['name'] != self._nodes[node_id]['name']: + event_type = ZwaveNodeRenamedEvent + # If nothing relevant has changed, update the cached instance and return + else: + self._nodes[node_id] = node + return + + evt = event_type(**kwargs) + self._events_queue.put(evt) + + if ( + value + and issubclass(event_type, ZwaveValueChangedEvent) + and event_type != ZwaveValueRemovedEvent + ): + self.plugin.publish_entities([kwargs['value']]) # type: ignore + + def on_mqtt_message(self): + def handler(_, __, msg): + if not msg.topic.startswith(self.events_topic): + return + + topic = ( + msg.topic[(len(self.events_topic) + 1) :].split('/').pop() # noqa: E203 + ) + data = msg.payload.decode() + if not data: + return + + with contextlib.suppress(ValueError, TypeError): + data = json.loads(data)['data'] + + try: + if topic == 'node_value_updated': + self._dispatch_event( + ZwaveValueChangedEvent, node=data[0], value=data[1] + ) + elif topic == 'node_metadata_updated': + self._dispatch_event(ZwaveNodeEvent, node=data[0]) + elif topic == 'node_sleep': + self._dispatch_event(ZwaveNodeAsleepEvent, node=data[0]) + elif topic == 'node_wakeup': + self._dispatch_event(ZwaveNodeAwakeEvent, node=data[0]) + elif topic == 'node_ready': + self._dispatch_event(ZwaveNodeReadyEvent, node=data[0]) + elif topic == 'node_removed': + self._dispatch_event(ZwaveNodeRemovedEvent, node=data[0]) + except Exception as e: + self.logger.exception(e) + + return handler + + def run(self): + super().run() + self.logger.debug('Refreshing Z-Wave nodes') + self._nodes = self.plugin.get_nodes().output # type: ignore + + while not self.should_stop(): + try: + evt = self._events_queue.get(block=True, timeout=1) + except Empty: + continue + + get_bus().post(evt) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zwave/mqtt/manifest.yaml b/platypush/plugins/zwave/mqtt/manifest.yaml index 9cce3fd0c..96cccdaa4 100644 --- a/platypush/plugins/zwave/mqtt/manifest.yaml +++ b/platypush/plugins/zwave/mqtt/manifest.yaml @@ -1,5 +1,21 @@ manifest: - events: {} + events: + platypush.message.event.zwave.ZwaveNodeAddedEvent: > + when a node is added to the network. + platypush.message.event.zwave.ZwaveNodeAsleepEvent: > + when a node goes into sleep mode. + platypush.message.event.zwave.ZwaveNodeAwakeEvent: > + when a node goes back into awake mode. + platypush.message.event.zwave.ZwaveNodeEvent: > + when a node attribute changes. + platypush.message.event.zwave.ZwaveNodeReadyEvent: > + when a node is ready. + platypush.message.event.zwave.ZwaveNodeRemovedEvent: > + when a node is removed from the network. + platypush.message.event.zwave.ZwaveNodeRenamedEvent: > + when a node is renamed. + platypush.message.event.zwave.ZwaveValueChangedEvent: > + when the value of a node on the network changes. install: pip: - paho-mqtt