From 452533db177c0526cb7bf5d2dfd5ad9308fa077a Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 8 Feb 2021 01:44:26 +0100 Subject: [PATCH 1/7] Fixed MQTT over SSL default version spec in case the parameter is not a string --- platypush/plugins/mqtt.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/platypush/plugins/mqtt.py b/platypush/plugins/mqtt.py index fa6e78892c..a0c53c95cc 100644 --- a/platypush/plugins/mqtt.py +++ b/platypush/plugins/mqtt.py @@ -82,7 +82,12 @@ class MqttPlugin(Plugin): if not version: return None - version = version.lower() + if type(version) == type(ssl.PROTOCOL_TLS): + return version + + if isinstance(version, str): + version = version.lower() + if version == 'tls': return ssl.PROTOCOL_TLS if version == 'tlsv1': From 15d2e1116b08bdbb0a634fa7a985dfe15337b2bd Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 8 Feb 2021 01:45:21 +0100 Subject: [PATCH 2/7] Fixed device_get and device_rename to use the new zigbee2mqtt API --- platypush/plugins/zigbee/mqtt.py | 104 ++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 23 deletions(-) diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index 185e40fcbc..20db38299d 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -124,6 +124,10 @@ class ZigbeeMqttPlugin(MqttPlugin): self.base_topic = base_topic self.timeout = timeout + self._info = { + 'devices': {}, + 'groups': {}, + } def _get_network_info(self, **kwargs): self.logger.info('Fetching Zigbee network information') @@ -166,6 +170,18 @@ class ZigbeeMqttPlugin(MqttPlugin): if not info_ready: raise TimeoutError('A timeout occurred while fetching the Zigbee network information') + # Cache the new results + self._info['devices'] = { + device.get('friendly_name', device['ieee_address']): device + for device in info.get('devices', []) + } + + self._info['groups'] = { + group.get('name'): group + for group in info.get('groups', []) + } + + self.logger.info('Zigbee network configuration updated') return info finally: try: @@ -174,21 +190,18 @@ class ZigbeeMqttPlugin(MqttPlugin): except Exception as e: self.logger.warning('Error on MQTT client disconnection: {}'.format(str(e))) - def _mqtt_args(self, host: Optional[str] = None, **kwargs): - if not host: - return { - 'host': self.host, - 'port': self.port, - 'timeout': self.timeout, - 'tls_certfile': self.tls_certfile, - 'tls_keyfile': self.tls_keyfile, - 'tls_version': self.tls_version, - 'tls_ciphers': self.tls_ciphers, - 'username': self.username, - 'password': self.password, - } - - return kwargs + def _mqtt_args(self, **kwargs): + return { + 'host': kwargs.get('host', self.host), + 'port': kwargs.get('port', self.port), + 'timeout': kwargs.get('timeout', self.timeout), + 'tls_certfile': kwargs.get('tls_certfile', self.tls_certfile), + 'tls_keyfile': kwargs.get('tls_keyfile', self.tls_keyfile), + 'tls_version': kwargs.get('tls_version', self.tls_version), + 'tls_ciphers': kwargs.get('tls_ciphers', self.tls_ciphers), + 'username': kwargs.get('username', self.username), + 'password': kwargs.get('password', self.password), + } def _topic(self, topic): return self.base_topic + '/' + topic @@ -497,10 +510,45 @@ class ZigbeeMqttPlugin(MqttPlugin): assert not [dev for dev in devices if dev.get('friendly_name') == name], \ 'A device named {} already exists on the network'.format(name) + if device: + req = { + 'from': device, + 'to': name, + } + else: + req = { + 'last': True, + 'to': name, + } + self.publish( - topic=self._topic('bridge/config/rename{}'.format('_last' if not device else '')), - msg={'old': device, 'new': name} if device else name, - **self._mqtt_args(**kwargs)) + topic=self._topic('bridge/request/device/rename'), + msg=req, **self._mqtt_args(**kwargs)) + + @staticmethod + def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: + def extract_value(value: dict, root: dict): + if not value.get('access', 1) & 0x1: + # Property not readable + return + + if 'features' not in value: + if 'property' in value: + root[value['property']] = '' + return + + if 'property' in value: + root[value['property']] = root.get(value['property'], {}) + root = root[value['property']] + + for feature in value['features']: + extract_value(feature, root) + + ret = {} + for value in values: + extract_value(value, root=ret) + + return ret # noinspection PyShadowingBuiltins @action @@ -516,16 +564,26 @@ class ZigbeeMqttPlugin(MqttPlugin): (default: query the default configured device). :return: Key->value map of the device properties. """ - properties = self.publish(topic=self._topic(device + ('/get' if property else '')), - reply_topic=self._topic(device), - msg={property: ''} if property else '', - **self._mqtt_args(**kwargs)).output + kwargs = self._mqtt_args(**kwargs) if property: + properties = self.publish(topic=self._topic(device) + '/get/' + property, reply_topic=self._topic(device), + msg={property: ''}, **kwargs).output + assert property in properties, 'No such property: ' + property return {property: properties[property]} - return properties + if device not in self._info.get('devices', {}): + # Refresh devices info + self._get_network_info(**kwargs) + + assert self._info.get('devices', {}).get(device), 'No such device: ' + device + exposes = (self._info.get('devices', {}).get(device, {}).get('definition', {}) or {}).get('exposes', []) + if not exposes: + return {} + + return self.publish(topic=self._topic(device) + '/get', reply_topic=self._topic(device), + msg=self._build_device_get_request(exposes), **kwargs) # noinspection PyShadowingBuiltins,DuplicatedCode @action From 021dd3219003a2db051e3ca0f557c59c44f216fa Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 9 Feb 2021 02:33:43 +0100 Subject: [PATCH 3/7] Moved more zigbee2mqtt methods to the new API --- platypush/plugins/zigbee/mqtt.py | 116 +++++++++++++++++++------------ 1 file changed, 72 insertions(+), 44 deletions(-) diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index 20db38299d..48b699ab73 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -1,8 +1,9 @@ import json import threading -from typing import Optional, List, Any, Dict +from typing import Optional, List, Any, Dict, Union +from platypush.message.response import Response from platypush.plugins.mqtt import MqttPlugin, action @@ -206,6 +207,14 @@ class ZigbeeMqttPlugin(MqttPlugin): def _topic(self, topic): return self.base_topic + '/' + topic + @staticmethod + def _parse_response(response: Union[dict, Response]) -> dict: + if isinstance(response, Response): + response = response.output + + assert response.get('status') != 'error', response.get('error', 'zigbee2mqtt error') + return response + @action def devices(self, **kwargs) -> List[Dict[str, Any]]: """ @@ -380,13 +389,6 @@ class ZigbeeMqttPlugin(MqttPlugin): """ return self._get_network_info(**kwargs).get('devices') - def _permit_join_timeout_callback(self, permit: bool, **kwargs): - def callback(): - self.logger.info('Restoring permit_join state to {}'.format(permit)) - self.permit_join(permit, **kwargs) - - return callback - @action def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs): """ @@ -398,19 +400,16 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/permit_join'), msg=permit, **self._mqtt_args(**kwargs)) if timeout: - threading.Timer(timeout, self._permit_join_timeout_callback(not permit, **kwargs)).start() + return self._parse_response( + self.publish(topic=self._topic('bridge/request/permit_join'), + msg={'value': permit, 'time': timeout}, + reply_topic=self._topic('bridge/response/permit_join'), + **self._mqtt_args(**kwargs))) - @action - def reset(self, **kwargs): - """ - Reset the adapter. - - :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` - (default: query the default configured device). - """ - self.publish(topic=self._topic('bridge/config/reset'), msg='', **self._mqtt_args(**kwargs)) + return self.publish(topic=self._topic('bridge/request/permit_join'), + msg={'value': permit}, + **self._mqtt_args(**kwargs)) @action def factory_reset(self, **kwargs): @@ -421,7 +420,7 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/factory_reset'), msg='', **self._mqtt_args(**kwargs)) + self.publish(topic=self._topic('bridge/request/touchlink/factory_reset'), msg='', **self._mqtt_args(**kwargs)) @action def log_level(self, level: str, **kwargs): @@ -432,7 +431,10 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/log_level'), msg=level, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/config/log_level'), msg={'value': level}, + reply_topic=self._topic('bridge/response/config/log_level'), + **self._mqtt_args(**kwargs))) @action def device_set_option(self, device: str, option: str, value: Any, **kwargs): @@ -445,12 +447,15 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/device_options'), msg={ - 'friendly_name': device, - 'options': { - option: value, - } - }, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/options'), + reply_topic=self._topic('bridge/response/device/options'), + msg={ + 'id': device, + 'options': { + option: value, + } + }, **self._mqtt_args(**kwargs))) @action def device_remove(self, device: str, force: bool = False, **kwargs): @@ -464,8 +469,11 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - topic = self._topic('bridge/config/{}remove'.format('force_' if force else '')) - self.publish(topic=topic, msg=device, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/remove'), + msg={'id': device, 'force': force}, + reply_topic=self._topic('bridge/response/device/remove'), + **self._mqtt_args(**kwargs))) @action def device_ban(self, device: str, **kwargs): @@ -476,7 +484,11 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/ban'), msg=device, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/ban'), + reply_topic=self._topic('bridge/response/device/ban'), + msg={'id': device}, + **self._mqtt_args(**kwargs))) @action def device_whitelist(self, device: str, **kwargs): @@ -488,7 +500,11 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/whitelist'), msg=device, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/whitelist'), + reply_topic=self._topic('bridge/response/device/whitelist'), + msg={'id': device}, + **self._mqtt_args(**kwargs))) @action def device_rename(self, name: str, device: Optional[str] = None, **kwargs): @@ -521,9 +537,11 @@ class ZigbeeMqttPlugin(MqttPlugin): 'to': name, } - self.publish( - topic=self._topic('bridge/request/device/rename'), - msg=req, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/rename'), + msg=req, + reply_topic=self._topic('bridge/response/device/rename'), + **self._mqtt_args(**kwargs))) @staticmethod def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: @@ -778,11 +796,17 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - args = {'friendly_name': name} - if id is not None: - args['id'] = id + payload = name if id is None else { + 'id': id, + 'friendly_name': name, + } - self.publish(topic=self._topic('bridge/config/add_group'), msg=args, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/group/add'), + reply_topic=self._topic('bridge/response/group/add'), + msg=payload, + **self._mqtt_args(**kwargs)) + ) # noinspection PyShadowingBuiltins,DuplicatedCode @action @@ -826,10 +850,11 @@ class ZigbeeMqttPlugin(MqttPlugin): groups = {group.get('friendly_name'): group for group in self.groups().output} assert name not in groups, 'A group named {} already exists on the network'.format(name) - self.publish( - topic=self._topic('bridge/config/rename'), - msg={'old': group, 'new': name} if group else name, - **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/group/rename'), + reply_topic=self._topic('bridge/response/group/rename'), + msg={'from': group, 'to': name} if group else name, + **self._mqtt_args(**kwargs))) @action def group_remove(self, name: str, **kwargs): @@ -840,8 +865,11 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/config/remove_group'), msg=name, - **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/group/remove'), + reply_topic=self._topic('bridge/response/group/remove'), + msg=name, + **self._mqtt_args(**kwargs))) @action def group_add_device(self, group: str, device: str, **kwargs): From 7325c8706860fe07a0478a624d6ea297d42c10ae Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 10 Feb 2021 02:00:52 +0100 Subject: [PATCH 4/7] Fixed remaining Zigbee groups and binding methods to work with the new zigbee2mqtt API [see #163] --- platypush/plugins/zigbee/mqtt.py | 115 +++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 21 deletions(-) diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index 48b699ab73..8b964105c1 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -414,8 +414,9 @@ class ZigbeeMqttPlugin(MqttPlugin): @action def factory_reset(self, **kwargs): """ - Perform a factory reset of the device. Of course, you should only do it if you know what you're doing, - as you will lose all the paired devices and may also lose the Z-Stack firmware. + Perform a factory reset of a device connected to the network, following the procedure required by the particular + device (for instance, Hue bulbs require the Zigbee adapter to be close to the device while a button on the back + of the bulb is pressed). :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). @@ -628,19 +629,49 @@ class ZigbeeMqttPlugin(MqttPlugin): return properties @action - def device_groups(self, device: str, **kwargs) -> List[int]: + def device_check_ota_updates(self, device: str, **kwargs) -> dict: """ - List the groups a given device belongs to. + Check if the specified device has any OTA updates available to install. - :param device: Display name of the device. + :param device: Address or friendly name of the device. :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). - :return: List of group IDs the device is linked to. - """ - return self.publish(topic=self._topic('bridge/device/{}/get_group_membership'.format(device)), - reply_topic=self._topic(device), msg=device, **self._mqtt_args(**kwargs)). \ - output.get('group_list', []) + :return: + + .. code-block:: json + + { + "id": "", + "update_available": true, + "status": "ok" + } + + """ + ret = self._parse_response( + self.publish(topic=self._topic('bridge/request/device/ota_update/check'), + reply_topic=self._topic('bridge/response/device/ota_update/check'), + msg={'id': device}, **self._mqtt_args(**kwargs))) + + return { + 'status': ret['status'], + 'id': ret.get('data', {}).get('id'), + 'update_available': ret.get('data', {}).get('update_available', False), + } + + @action + def device_install_ota_updates(self, device: str, **kwargs): + """ + Install OTA updates for a device if available. + + :param device: Address or friendly name of the device. + :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` + (default: query the default configured device). + """ + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/ota_update/update'), + reply_topic=self._topic('bridge/response/device/ota_update/update'), + msg={'id': device}, **self._mqtt_args(**kwargs))) @action def groups(self, **kwargs) -> List[dict]: @@ -808,6 +839,32 @@ class ZigbeeMqttPlugin(MqttPlugin): **self._mqtt_args(**kwargs)) ) + @action + def group_get(self, group: str, property: Optional[str] = None, **kwargs) -> dict: + """ + Get one or more properties of a group. The compatible properties vary depending on the devices on the group. + For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``" + properties, while an environment sensor may have the "``temperature``" and "``humidity``" properties, and so on. + + :param group: Display name of the group. + :param property: Name of the property to retrieve (default: all available properties) + :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` + (default: query the default configured device). + """ + msg = {} + if property: + msg = {property: ''} + + properties = self.publish(topic=self._topic(group + '/get'), + reply_topic=self._topic(group), + msg=msg, **self._mqtt_args(**kwargs)).output + + if property: + assert property in properties, 'No such property: ' + property + return {property: properties[property]} + + return properties + # noinspection PyShadowingBuiltins,DuplicatedCode @action def group_set(self, group: str, property: str, value: Any, **kwargs): @@ -881,8 +938,13 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/group/{}/add'.format(group)), - msg=device, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/group/members/add'), + reply_topic=self._topic('bridge/response/group/members/add'), + msg={ + 'group': group, + 'device': device, + }, **self._mqtt_args(**kwargs))) @action def group_remove_device(self, group: str, device: Optional[str] = None, **kwargs): @@ -895,11 +957,16 @@ class ZigbeeMqttPlugin(MqttPlugin): :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/group/{}/remove{}'.format(group, '_all' if not device else '')), - msg=device, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/group/members/remove{}'.format('_all' if device is None else '')), + reply_topic=self._topic('bridge/response/group/members/remove{}'.format('_all' if device is None else '')), + msg={ + 'group': group, + 'device': device, + }, **self._mqtt_args(**kwargs))) @action - def bind_devices(self, source: str, target: str, endpoint: Optional[str] = None, **kwargs): + def bind_devices(self, source: str, target: str, **kwargs): """ Bind two devices. Binding makes it possible that devices can directly control each other without the intervention of zigbee2mqtt or any home automation software. You may want to use this feature to bind @@ -908,14 +975,16 @@ class ZigbeeMqttPlugin(MqttPlugin): :param source: Name of the source device. It can also be a group name, although the support is `still experimental `_. + You can also bind a specific device endpoint - for example ``MySensor/temperature``. :param target: Name of the target device. - :param endpoint: The target may support multiple endpoints (e.g. 'left', 'down', 'up' etc.). If so, - you can bind the source to a specific endpoint on the target device. + You can also bind a specific device endpoint - for example ``MyLight/state``. :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/bind/' + source + ('/' + endpoint if endpoint else '')), - msg=target, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/bind'), + reply_topic=self._topic('bridge/response/device/bind'), + msg={'from': source, 'to': target}, **self._mqtt_args(**kwargs)) ) @action def unbind_devices(self, source: str, target: str, **kwargs): @@ -923,12 +992,16 @@ class ZigbeeMqttPlugin(MqttPlugin): Un-bind two devices. :param source: Name of the source device. + You can also bind a specific device endpoint - for example ``MySensor/temperature``. :param target: Name of the target device. + You can also bind a specific device endpoint - for example ``MyLight/state``. :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - self.publish(topic=self._topic('bridge/unbind/' + source), - msg=target, **self._mqtt_args(**kwargs)) + return self._parse_response( + self.publish(topic=self._topic('bridge/request/device/unbind'), + reply_topic=self._topic('bridge/response/device/unbind'), + msg={'from': source, 'to': target}, **self._mqtt_args(**kwargs)) ) # vim:sw=4:ts=4:et: \ No newline at end of file From ade04a6ea11a5fea49d01b1f04858deb3a0572b2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 10 Feb 2021 22:26:51 +0100 Subject: [PATCH 5/7] Refactored backend.mqtt to reuse connections whenever possible, as well as programmatically subscribe/unsubscribe topics at runtime --- platypush/backend/mqtt.py | 211 +++++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 84 deletions(-) diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 80daeaa5df..53a6e10404 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -1,7 +1,9 @@ import json import os import threading -from typing import Optional +from typing import Optional, List, Callable + +import paho.mqtt.client as mqtt from platypush.backend import Backend from platypush.config import Config @@ -13,6 +15,73 @@ from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin from platypush.utils import set_thread_name +class MqttClient(mqtt.Client, threading.Thread): + def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None, + on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None, + client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, tls_version: Optional = None, tls_ciphers: Optional = None, + tls_insecure: bool = False, keepalive: Optional[int] = 60, **kwargs): + mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs) + threading.Thread.__init__(self) + + self.host = host + self.port = port + self.topics = set(topics or []) + self.keepalive = keepalive + self.on_connect = self.connect_hndl() + + if on_message: + self.on_message = on_message + + if username and password: + self.username_pw_set(username, password) + + if tls_cafile: + self.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, tls_version=tls_version, + ciphers=tls_ciphers) + + self.tls_insecure_set(tls_insecure) + + self._running = False + self._stop_scheduled = False + + def subscribe(self, *topics, **kwargs): + if not topics: + topics = self.topics + + self.topics.update(topics) + for topic in topics: + super().subscribe(topic, **kwargs) + + def unsubscribe(self, *topics, **kwargs): + if not topics: + topics = self.topics + + for topic in topics: + super().unsubscribe(topic, **kwargs) + self.topics.remove(topic) + + def connect_hndl(self): + def handler(*_, **__): + self.subscribe() + + return handler + + def run(self): + super().run() + self.connect(host=self.host, port=self.port, keepalive=self.keepalive) + self._running = True + self.loop_forever() + + def stop(self): + if not self.is_alive(): + return + + self._stop_scheduled = True + self.disconnect() + self._running = False + + class MqttBackend(Backend): """ Backend that reads messages from a configured MQTT topic (default: @@ -115,9 +184,7 @@ class MqttBackend(Backend): self.topic = '{}/{}'.format(topic, self.device_id) self.subscribe_default_topic = subscribe_default_topic - self._client = None - self._listeners = [] - + self._listeners = {} # (host, port, msg_handler) -> MqttClient map self.listeners_conf = listeners or [] def send_message(self, msg, topic: Optional[str] = None, **kwargs): @@ -128,51 +195,20 @@ class MqttBackend(Backend): password=self.password, tls_cafile=self.tls_cafile, tls_certfile=self.tls_certfile, tls_keyfile=self.tls_keyfile, tls_version=self.tls_version, tls_insecure=self.tls_insecure, - tls_ciphers=self.tls_ciphers, client_id=self.client_id, **kwargs) + tls_ciphers=self.tls_ciphers, **kwargs) except Exception as e: self.logger.exception(e) - @staticmethod - def on_connect(*topics): - # noinspection PyUnusedLocal - def handler(client, userdata, flags, rc): - for topic in topics: - client.subscribe(topic) - - return handler - - def on_mqtt_message(self): - def handler(client, _, msg): - data = msg.payload - # noinspection PyBroadException - try: - data = data.decode('utf-8') - data = json.loads(data) - except: - pass - - # noinspection PyProtectedMember - self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, topic=msg.topic, msg=data)) - - return handler - @staticmethod def _expandpath(path: str) -> str: return os.path.abspath(os.path.expanduser(path)) if path else path - def _initialize_listeners(self, listeners_conf): - import paho.mqtt.client as mqtt - - def listener_thread(client_, host, port): - client_.connect(host, port) - client_.loop_forever() - + def add_listeners(self, *listeners): # noinspection PyShadowingNames,PyUnusedLocal - for i, listener in enumerate(listeners_conf): + for i, listener in enumerate(listeners): host = listener.get('host') if host: port = listener.get('port', self._default_mqtt_port) - topics = listener.get('topics') username = listener.get('username') password = listener.get('password') tls_cafile = self._expandpath(listener.get('tls_cafile')) @@ -189,7 +225,7 @@ class MqttBackend(Backend): tls_cafile = self.tls_cafile tls_certfile = self.tls_certfile tls_keyfile = self.tls_keyfile - tls_version = self.tls_keyfile + tls_version = self.tls_version tls_ciphers = self.tls_ciphers tls_insecure = self.tls_insecure @@ -198,24 +234,48 @@ class MqttBackend(Backend): self.logger.warning('No list of topics specified for listener n.{}'.format(i+1)) continue - client = mqtt.Client() - client.on_connect = self.on_connect(*topics) - client.on_message = self.on_mqtt_message() + client = self._get_client(host=host, port=port, topics=topics, username=username, password=password, + client_id=self.client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, + tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, + tls_insecure=tls_insecure) - if username and password: - client.username_pw_set(username, password) + if not client.is_alive(): + client.start() - if tls_cafile: - client.tls_set(ca_certs=tls_cafile, - certfile=tls_certfile, - keyfile=tls_keyfile, - tls_version=tls_version, - ciphers=tls_ciphers) + def _get_client(self, host: str, port: int, topics: Optional[List[str]] = None, username: Optional[str] = None, + password: Optional[str] = None, client_id: Optional[str] = None, tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional = None, + tls_ciphers: Optional = None, tls_insecure: bool = False, on_message: Optional[Callable] = None) \ + -> MqttClient: + on_message = on_message or self.on_mqtt_message() + on_message_name = repr(on_message) + client = self._listeners.get((host, port, on_message_name)) - client.tls_insecure_set(tls_insecure) + if not (client and client.is_alive()): + client = MqttClient(host=host, port=port, topics=topics, username=username, password=password, + client_id=client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, + tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, + tls_insecure=tls_insecure, on_message=on_message) - threading.Thread(target=listener_thread, kwargs={ - 'client_': client, 'host': host, 'port': port}).start() + self._listeners[(host, port, on_message_name)] = client + + client.subscribe(*topics) + return client + + def on_mqtt_message(self): + def handler(client, __, msg): + data = msg.payload + # noinspection PyBroadException + try: + data = data.decode('utf-8') + data = json.loads(data) + except: + pass + + # noinspection PyProtectedMember + self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, topic=msg.topic, msg=data)) + + return handler def on_exec_message(self): def handler(_, __, msg): @@ -257,51 +317,34 @@ class MqttBackend(Backend): return handler def run(self): - import paho.mqtt.client as mqtt - super().run() - self._client = None if self.host: - self._client = mqtt.Client(self.client_id) - if self.subscribe_default_topic: - self._client.on_connect = self.on_connect(self.topic) + topics = [self.topic] if self.subscribe_default_topic else [] + client = self._get_client(host=self.host, port=self.port, topics=topics, username=self.username, + password=self.password, client_id=self.client_id, + tls_cafile=self.tls_cafile, tls_certfile=self.tls_certfile, + tls_keyfile=self.tls_keyfile, tls_version=self.tls_version, + tls_ciphers=self.tls_ciphers, tls_insecure=self.tls_insecure, + on_message=self.on_exec_message()) - self._client.on_message = self.on_exec_message() - if self.username and self.password: - self._client.username_pw_set(self.username, self.password) - - if self.tls_cafile: - self._client.tls_set(ca_certs=self.tls_cafile, certfile=self.tls_certfile, - keyfile=self.tls_keyfile, - tls_version=self.tls_version, - ciphers=self.tls_ciphers) - - self._client.tls_insecure_set(self.tls_insecure) - - self._client.connect(self.host, self.port, 60) + client.start() self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. format(self.host, self.port, self.topic)) - self._initialize_listeners(self.listeners_conf) - if self._client: - self._client.loop_forever() + self.add_listeners(*self.listeners_conf) def stop(self): - self.logger.info('Received STOP event on MqttBackend') - if self._client: - self._client.disconnect() - self._client.loop_stop() - self._client = None + self.logger.info('Received STOP event on the MQTT backend') - for listener in self._listeners: + for ((host, port, _), listener) in self._listeners.items(): try: listener.loop_stop() + listener.disconnect() except Exception as e: # noinspection PyProtectedMember - self.logger.warning('Could not stop listener {host}:{port}: {error}'.format( - host=listener._host, port=listener._port, - error=str(e))) + self.logger.warning('Could not stop listener {host}:{port}: {error}'. + format(host=host, port=port, error=str(e))) # vim:sw=4:ts=4:et: From 73cc742dfbd3440f27491b34e804eb8cfdb6b2d0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 11 Feb 2021 23:50:28 +0100 Subject: [PATCH 6/7] Fixed backend.zigbee.mqtt to work with the new zigbee2mqtt API --- platypush/backend/zigbee/mqtt.py | 130 ++++++++++++++++++++------ platypush/message/request/__init__.py | 4 +- platypush/plugins/zigbee/mqtt.py | 12 ++- 3 files changed, 110 insertions(+), 36 deletions(-) diff --git a/platypush/backend/zigbee/mqtt.py b/platypush/backend/zigbee/mqtt.py index 22a1c31166..2637bb759c 100644 --- a/platypush/backend/zigbee/mqtt.py +++ b/platypush/backend/zigbee/mqtt.py @@ -84,7 +84,9 @@ class ZigbeeMqttBackend(MqttBackend): plugin = get_plugin('zigbee.mqtt') self.base_topic = base_topic or plugin.base_topic - listeners = [{ + self._devices = {} + self._groups = {} + self.server_info = { 'host': host or plugin.host, 'port': port or plugin.port or self._default_mqtt_port, 'tls_cafile': tls_cafile or plugin.tls_cafile, @@ -92,11 +94,15 @@ class ZigbeeMqttBackend(MqttBackend): 'tls_ciphers': tls_ciphers or plugin.tls_ciphers, 'tls_keyfile': tls_keyfile or plugin.tls_keyfile, 'tls_version': tls_version or plugin.tls_version, - 'username': username or plugin.username , + 'username': username or plugin.username, 'password': password or plugin.password, + } + + listeners = [{ + **self.server_info, 'topics': [ self.base_topic + '/' + topic - for topic in ['bridge/state', 'bridge/log'] + for topic in ['bridge/state', 'bridge/log', 'bridge/logging', 'bridge/devices', 'bridge/groups'] ], }] @@ -116,50 +122,99 @@ class ZigbeeMqttBackend(MqttBackend): def _process_log_message(self, client, msg): msg_type = msg.get('type') - msg = msg.get('message') + text = msg.get('message') # noinspection PyProtectedMember args = {'host': client._host, 'port': client._port} if msg_type == 'devices': devices = {} - for dev in (msg or []): + for dev in (text or []): devices[dev['friendly_name']] = dev client.subscribe(self.base_topic + '/' + dev['friendly_name']) elif msg_type == 'pairing': - self.bus.post(ZigbeeMqttDevicePairingEvent(device=msg, **args)) - elif msg_type == 'device_connected': - self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) elif msg_type in ['device_ban', 'device_banned']: - self.bus.post(ZigbeeMqttDeviceBannedEvent(device=msg, **args)) - elif msg_type in ['device_removed', 'device_force_removed']: - force = msg_type == 'device_force_removed' - self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=msg, force=force, **args)) + self.bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: force = msg_type == 'device_force_removed_failed' - self.bus.post(ZigbeeMqttDeviceRemovedFailedEvent(device=msg, force=force, **args)) + self.bus.post(ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)) elif msg_type == 'device_whitelisted': - self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) elif msg_type == 'device_renamed': - self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) elif msg_type == 'device_bind': - self.bus.post(ZigbeeMqttDeviceBindEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) elif msg_type == 'device_unbind': - self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) elif msg_type == 'device_group_add': - self.bus.post(ZigbeeMqttGroupAddedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) elif msg_type == 'device_group_add_failed': - self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) elif msg_type == 'device_group_remove': - self.bus.post(ZigbeeMqttGroupRemovedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) elif msg_type == 'device_group_remove_failed': - self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) elif msg_type == 'device_group_remove_all': - self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) elif msg_type == 'device_group_remove_all_failed': - self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) elif msg_type == 'zigbee_publish_error': - self.logger.warning('zigbee2mqtt internal error: {}'.format(msg)) - self.bus.post(ZigbeeMqttErrorEvent(error=msg, **args)) + self.logger.error('zigbee2mqtt error: {}'.format(text)) + self.bus.post(ZigbeeMqttErrorEvent(error=text, **args)) + elif msg.get('level') in ['warning', 'error']: + log = getattr(self.logger, msg['level']) + log('zigbee2mqtt {}: {}'.format(msg['level'], text or msg.get('error', msg.get('warning')))) + + def _process_devices(self, client, msg): + devices_info = { + device.get('friendly_name', device.get('ieee_address')): device + for device in msg + } + + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + client.subscribe(*[ + self.base_topic + '/' + device + for device in devices_info.keys() + ]) + + for name, device in devices_info.items(): + if name not in self._devices: + self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)) + + exposes = (device.get('definition', {}) or {}).get('exposes', []) + client.publish( + self.base_topic + '/' + name + '/get', + json.dumps(get_plugin('zigbee.mqtt').build_device_get_request(exposes)) + ) + + devices_copy = [*self._devices.keys()] + for name in devices_copy: + if name not in devices_info: + self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) + del self._devices[name] + + self._devices = {device: {} for device in devices_info.keys()} + + def _process_groups(self, client, msg): + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + groups_info = { + group.get('friendly_name', group.get('id')): group + for group in msg + } + + for name in groups_info.keys(): + if name not in self._groups: + self.bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) + + groups_copy = [*self._groups.keys()] + for name in groups_copy: + if name not in groups_info: + self.bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) + del self._groups[name] + + self._groups = {group: {} for group in groups_info.keys()} def on_mqtt_message(self): def handler(client, _, msg): @@ -174,14 +229,31 @@ class ZigbeeMqttBackend(MqttBackend): if topic == 'bridge/state': self._process_state_message(client, data) - elif topic == 'bridge/log': + elif topic in ['bridge/log', 'bridge/logging']: self._process_log_message(client, data) + elif topic == 'bridge/devices': + self._process_devices(client, data) + elif topic == 'bridge/groups': + self._process_groups(client, data) else: - # noinspection PyProtectedMember - self.bus.post(ZigbeeMqttDevicePropertySetEvent(host=client._host, port=client._port, - device=topic, properties=data)) + suffix = topic.split('/')[-1] + if suffix not in self._devices: + return + + name = suffix + changed_props = {k: v for k, v in data.items() if v != self._devices[name].get(k)} + + if changed_props: + # noinspection PyProtectedMember + self.bus.post(ZigbeeMqttDevicePropertySetEvent(host=client._host, port=client._port, + device=name, properties=changed_props)) + + self._devices[name].update(data) return handler + def run(self): + super().run() + # vim:sw=4:ts=4:et: diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 0980ff0677..690b7c349d 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -242,9 +242,9 @@ class Request(Message): elif not response.disable_logging: logger.info('Processed response from action {}: {}'. format(action, str(response))) - except AssertionError as e: + except (AssertionError, TimeoutError) as e: plugin.logger.exception(e) - logger.warning('Assertion error from action [{}]: {}'.format(action, str(e))) + logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e))) response = Response(output=None, errors=[str(e)]) except Exception as e: # Retry mechanism diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index 8b964105c1..5f08eeafd4 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -38,7 +38,9 @@ class ZigbeeMqttPlugin(MqttPlugin): - You can disconnect your debugger and downloader cable once the firmware is flashed. - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or - through your package manager. Manual instructions: + through your package manager. **NOTE**: many API breaking changes have occurred on Zigbee2MQTT 1.17.0, + therefore this integration will only be compatible with the version 1.17.0 of the service or higher versions. + Manual instructions: .. code-block:: shell @@ -97,7 +99,7 @@ class ZigbeeMqttPlugin(MqttPlugin): """ - def __init__(self, host: str = 'localhost', port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 60, + def __init__(self, host: str = 'localhost', port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 10, tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, tls_ciphers: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, **kwargs): @@ -545,7 +547,7 @@ class ZigbeeMqttPlugin(MqttPlugin): **self._mqtt_args(**kwargs))) @staticmethod - def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: + def build_device_get_request(values: List[Dict[str, Any]]) -> dict: def extract_value(value: dict, root: dict): if not value.get('access', 1) & 0x1: # Property not readable @@ -553,7 +555,7 @@ class ZigbeeMqttPlugin(MqttPlugin): if 'features' not in value: if 'property' in value: - root[value['property']] = '' + root[value['property']] = 0 if value['type'] == 'numeric' else '' return if 'property' in value: @@ -602,7 +604,7 @@ class ZigbeeMqttPlugin(MqttPlugin): return {} return self.publish(topic=self._topic(device) + '/get', reply_topic=self._topic(device), - msg=self._build_device_get_request(exposes), **kwargs) + msg=self.build_device_get_request(exposes), **kwargs) # noinspection PyShadowingBuiltins,DuplicatedCode @action From 9b9334682ff212f83a3c6704f7e46eb985f605d2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 12 Feb 2021 02:19:36 +0100 Subject: [PATCH 7/7] Still support default host/port fallback on mqtt.publish --- platypush/plugins/mqtt.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/platypush/plugins/mqtt.py b/platypush/plugins/mqtt.py index a0c53c95cc..fa1b3a9b75 100644 --- a/platypush/plugins/mqtt.py +++ b/platypush/plugins/mqtt.py @@ -135,7 +135,7 @@ class MqttPlugin(Plugin): return client @action - def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: int = 1883, + def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: Optional[int] = None, reply_topic: Optional[str] = None, timeout: int = 60, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, @@ -146,8 +146,8 @@ class MqttPlugin(Plugin): :param topic: Topic/channel where the message will be delivered :param msg: Message to be sent. It can be a list, a dict, or a Message object. - :param host: MQTT broker hostname/IP. - :param port: MQTT broker port (default: 1883). + :param host: MQTT broker hostname/IP (default: default host configured on the plugin). + :param port: MQTT broker port (default: default port configured on the plugin). :param reply_topic: If a ``reply_topic`` is specified, then the action will wait for a response on this topic. :param timeout: If ``reply_topic`` is set, use this parameter to specify the maximum amount of time to wait for a response (default: 60 seconds). @@ -179,6 +179,10 @@ class MqttPlugin(Plugin): except: pass + host = host or self.host + port = port or self.port or 1883 + assert host, 'No host specified' + client = self._get_client(tls_cafile=tls_cafile, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, tls_insecure=tls_insecure, username=username, password=password)