From 33a1ef39e4a5b1b93fc630dd8ae08b3cde6d04a0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 5 Sep 2023 20:58:58 +0200 Subject: [PATCH] Refactored and merged `backend.mqtt` logic into `mqtt` plugin. --- platypush/plugins/mqtt/__init__.py | 640 ++++++++++++++++++--------- platypush/plugins/mqtt/_client.py | 244 ++++++++++ platypush/plugins/mqtt/manifest.yaml | 3 +- 3 files changed, 667 insertions(+), 220 deletions(-) create mode 100644 platypush/plugins/mqtt/_client.py diff --git a/platypush/plugins/mqtt/__init__.py b/platypush/plugins/mqtt/__init__.py index 811837b8d..5043634dc 100644 --- a/platypush/plugins/mqtt/__init__.py +++ b/platypush/plugins/mqtt/__init__.py @@ -1,16 +1,25 @@ +from collections import defaultdict +import hashlib import io import json -import os import threading +from typing import Any, Dict, Iterable, Optional, IO +from typing_extensions import override -from typing import Any, Optional, IO +import paho.mqtt.client as mqtt from platypush.config import Config +from platypush.context import get_bus from platypush.message import Message -from platypush.plugins import Plugin, action +from platypush.message.event.mqtt import MQTTMessageEvent +from platypush.message.request import Request +from platypush.plugins import RunnablePlugin, action +from platypush.utils import get_message_response + +from ._client import DEFAULT_TIMEOUT, MqttCallback, MqttClient -class MqttPlugin(Plugin): +class MqttPlugin(RunnablePlugin): """ This plugin allows you to send custom message to a message queue compatible with the MQTT protocol, see https://mqtt.org/ @@ -19,253 +28,130 @@ class MqttPlugin(Plugin): * **paho-mqtt** (``pip install paho-mqtt``) + Triggers: + + * :class:`platypush.message.event.mqtt.MQTTMessageEvent` when a new + message is received on a subscribed topic. + """ def __init__( self, - host=None, - port=1883, - tls_cafile=None, - tls_certfile=None, - tls_keyfile=None, - tls_version=None, - tls_ciphers=None, - tls_insecure=False, - username=None, - password=None, - client_id=None, - timeout=None, + host: Optional[str] = None, + port: int = 1883, + topics: Optional[Iterable[str]] = None, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version: Optional[str] = None, + tls_ciphers: Optional[str] = None, + tls_insecure: bool = False, + username: Optional[str] = None, + password: Optional[str] = None, + client_id: Optional[str] = None, + timeout: Optional[int] = DEFAULT_TIMEOUT, + run_topic_prefix: Optional[str] = None, + listeners: Optional[Iterable[dict]] = None, **kwargs, ): """ :param host: If set, MQTT messages will by default routed to this host unless overridden in `send_message` (default: None) - :type host: str - :param port: If a default host is set, specify the listen port (default: 1883) - :type port: int - + :param topics: If a default ``host`` is specified, then this list will + include a default list of topics that should be subscribed on that + broker at startup. :param tls_cafile: If a default host is set and requires TLS/SSL, specify the certificate authority file (default: None) - :type tls_cafile: str - :param tls_certfile: If a default host is set and requires TLS/SSL, specify the certificate file (default: None) - :type tls_certfile: str - :param tls_keyfile: If a default host is set and requires TLS/SSL, specify the key file (default: None) - :type tls_keyfile: str - :param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None). Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, ``tlsv1.2``. - :type tls_version: str - :param tls_ciphers: If a default host is set and requires TLS/SSL, specify the supported ciphers (default: None) - :type tls_ciphers: str - :param tls_insecure: Set to True to ignore TLS insecure warnings (default: False). - :type tls_insecure: bool - :param username: If a default host is set and requires user authentication, specify the username ciphers (default: None) - :type username: str - :param password: If a default host is set and requires user authentication, specify the password ciphers (default: None) - :type password: str - :param client_id: ID used to identify the client on the MQTT server (default: None). If None is specified then ``Config.get('device_id')`` will be used. - :type client_id: str + :param timeout: Client timeout in seconds (default: 30 seconds). + :param run_topic_prefix: If specified, the MQTT plugin will listen for + messages on a topic in the format `{run_topic_prefix}/{device_id}. + When a message is received, it will interpret it as a JSON request + to execute, in the format + ``{"type": "request", "action": "plugin.action", "args": {...}}``. + + .. warning:: This parameter is mostly kept for backwards + compatibility, but you should avoid it - unless the MQTT broker + is on a personal safe network that you own, or it requires + user authentication and it uses SSL. The reason is that the + messages received on this topic won't be subject to token + verification, allowing unauthenticated arbitrary command + execution on the target host. If you still want the ability of + running commands remotely over an MQTT broker, then you may + consider creating a dedicated topic listener with an attached + event hook on + :class:`platypush.message.event.mqtt.MQTTMessageEvent`. The + hook can implement whichever authentication logic you like. + + :param listeners: If specified, the MQTT plugin will listen for + messages on these topics. Use this parameter if you also want to + listen on other MQTT brokers other than the primary one. This + parameter supports a list of maps, where each item supports the + same arguments passed to the main configuration (host, port, topic, + password etc.). If host/port are omitted, then the host/port value + from the plugin configuration will be used. If any of the other + fields are omitted, then their default value will be used (usually + null). Example: + + .. code-block:: yaml + + listeners: + # This listener use the default configured host/port + - topics: + - topic1 + - topic2 + - topic3 + + # This will use a custom MQTT broker host + - host: sensors + port: 11883 + username: myuser + password: secret + topics: + - topic4 + - topic5 - :param timeout: Client timeout in seconds (default: None). - :type timeout: int """ - super().__init__(**kwargs) - self.host = host - self.port = port - self.username = username - self.password = password - self.client_id = client_id or Config.get('device_id') - self.tls_cafile = self._expandpath(tls_cafile) if tls_cafile else None - self.tls_certfile = self._expandpath(tls_certfile) if tls_certfile else None - self.tls_keyfile = self._expandpath(tls_keyfile) if tls_keyfile else None - self.tls_version = self.get_tls_version(tls_version) - self.tls_insecure = tls_insecure - self.tls_ciphers = tls_ciphers - self.timeout = timeout + self.client_id = client_id or str(Config.get('device_id')) + self.run_topic = ( + f'{run_topic_prefix}/{Config.get("device_id")}' + if run_topic_prefix + else None + ) - @staticmethod - def get_tls_version(version: Optional[str] = None): - import ssl - - if not version: - return None - - if isinstance(version, type(ssl.PROTOCOL_TLS)): - return version - - if isinstance(version, str): - version = version.lower() - - if version == 'tls': - return ssl.PROTOCOL_TLS - if version == 'tlsv1': - return ssl.PROTOCOL_TLSv1 - if version == 'tlsv1.1': - return ssl.PROTOCOL_TLSv1_1 - if version == 'tlsv1.2': - return ssl.PROTOCOL_TLSv1_2 - - assert f'Unrecognized TLS version: {version}' - - def _mqtt_args(self, **kwargs): - return { - 'host': kwargs.get('host', self.host), - 'port': kwargs.get('port', self.port), - 'timeout': kwargs.get('timeout', self.timeout), - 'tls_certfile': kwargs.get('tls_certfile', self.tls_certfile), - 'tls_keyfile': kwargs.get('tls_keyfile', self.tls_keyfile), - 'tls_version': kwargs.get('tls_version', self.tls_version), - 'tls_ciphers': kwargs.get('tls_ciphers', self.tls_ciphers), - 'username': kwargs.get('username', self.username), - 'password': kwargs.get('password', self.password), - } - - @staticmethod - def _expandpath(path: Optional[str] = None) -> Optional[str]: - return os.path.abspath(os.path.expanduser(path)) if path else None - - def _get_client( - self, - tls_cafile: Optional[str] = None, - tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, - tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, - tls_insecure: Optional[bool] = None, - username: Optional[str] = None, - password: Optional[str] = None, - ): - from paho.mqtt.client import Client - - tls_cafile = self._expandpath(tls_cafile or self.tls_cafile) - tls_certfile = self._expandpath(tls_certfile or self.tls_certfile) - tls_keyfile = self._expandpath(tls_keyfile or self.tls_keyfile) - tls_ciphers = tls_ciphers or self.tls_ciphers - username = username or self.username - password = password or self.password - - tls_version = tls_version or self.tls_version # type: ignore[reportGeneralTypeIssues] - if tls_version: - tls_version = self.get_tls_version(tls_version) # type: ignore[reportGeneralTypeIssues] - if tls_insecure is None: - tls_insecure = self.tls_insecure - - client = Client() - - if username and password: - client.username_pw_set(username, password) - if tls_cafile: - client.tls_set( - ca_certs=tls_cafile, - certfile=tls_certfile, - keyfile=tls_keyfile, - tls_version=tls_version, # type: ignore[reportGeneralTypeIssues] - ciphers=tls_ciphers, - ) - - client.tls_insecure_set(tls_insecure) - - return client - - @action - def publish( - self, - topic: str, - msg: Any, - host: Optional[str] = None, - port: Optional[int] = None, - reply_topic: Optional[str] = None, - timeout: int = 60, - tls_cafile: Optional[str] = None, - tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, - tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, - tls_insecure: Optional[bool] = None, - username: Optional[str] = None, - password: Optional[str] = None, - qos: int = 0, - ): - """ - Sends a message to a topic. - - :param topic: Topic/channel where the message will be delivered - :param msg: Message to be sent. It can be a list, a dict, or a Message - object. - :param host: MQTT broker hostname/IP (default: default host configured - on the plugin). - :param port: MQTT broker port (default: default port configured on the - plugin). - :param reply_topic: If a ``reply_topic`` is specified, then the action - will wait for a response on this topic. - :param timeout: If ``reply_topic`` is set, use this parameter to - specify the maximum amount of time to wait for a response (default: - 60 seconds). - :param tls_cafile: If TLS/SSL is enabled on the MQTT server and the - certificate requires a certificate authority to authenticate it, - `ssl_cafile` will point to the provided ca.crt file (default: - None). - :param tls_certfile: If TLS/SSL is enabled on the MQTT server and a - client certificate it required, specify it here (default: None). - :param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a - client certificate key it required, specify it here (default: - None). - :param tls_version: If TLS/SSL is enabled on the MQTT server and it - requires a certain TLS version, specify it here (default: None). - Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, - ``tlsv1.2``. - :param tls_insecure: Set to True to ignore TLS insecure warnings - (default: False). - :param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an - explicit list of supported ciphers is required, specify it here - (default: None). - :param username: Specify it if the MQTT server requires authentication - (default: None). - :param password: Specify it if the MQTT server requires authentication - (default: None). - :param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS - `_ - (default: 0). - """ - response_buffer = io.BytesIO() - client = None - - try: - # Try to parse it as a platypush message or dump it to JSON from a dict/list - if isinstance(msg, (dict, list)): - msg = json.dumps(msg) - - try: - msg = Message.build(json.loads(msg)) - except Exception as e: - self.logger.debug('Not a valid JSON: %s', e) - - host = host or self.host - port = port or self.port or 1883 - assert host, 'No host specified' - - client = self._get_client( + self._listeners_lock = defaultdict(threading.RLock) + self.listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map + self.default_listener = ( + self._get_client( + host=host, + port=port, + topics=( + (tuple(topics) if topics else ()) + + ((self.run_topic,) if self.run_topic else ()) + ), + on_message=self.on_mqtt_message(), tls_cafile=tls_cafile, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, @@ -274,11 +160,257 @@ class MqttPlugin(Plugin): tls_insecure=tls_insecure, username=username, password=password, + client_id=client_id, + timeout=timeout, + ) + if host + else None + ) + + for listener in listeners or []: + self._get_client( + **self._mqtt_args(on_message=self.on_mqtt_message(), **listener) ) - client.connect(host, port, keepalive=timeout) + def _get_client_id( + self, + host: str, + port: int, + client_id: Optional[str] = None, + on_message: Optional[MqttCallback] = None, + topics: Iterable[str] = (), + **_, + ) -> str: + """ + Calculates a unique client ID given an MQTT configuration. + """ + client_id = client_id or self.client_id + client_hash = hashlib.sha1( + '|'.join( + [ + host, + str(port), + json.dumps(sorted(topics)), + str(id(on_message)), + ] + ).encode() + ).hexdigest() + + return f'{client_id}-{client_hash}' + + def _mqtt_args( + self, + host: Optional[str] = None, + port: int = 1883, + timeout: Optional[int] = DEFAULT_TIMEOUT, + topics: Iterable[str] = (), + **kwargs, + ): + """ + :return: An MQTT configuration mapping that uses either the specified + arguments (if host is specified), or falls back to the default + configurated arguments. + """ + default_conf = ( + self.default_listener.configuration if self.default_listener else {} + ) + + if not host: + assert ( + self.default_listener + ), 'No host specified and no configured default host' + + return { + **default_conf, + 'topics': (*self.default_listener.topics, *topics), + } + + return { + 'host': host, + 'port': port, + 'timeout': timeout or default_conf.get('timeout'), + 'topics': topics, + **kwargs, + } + + def on_mqtt_message(self) -> MqttCallback: + """ + Default MQTT message handler. It forwards a + :class:`platypush.message.event.mqtt.MQTTMessageEvent` event to the + bus. + """ + + def handler(client: MqttClient, _, msg: mqtt.MQTTMessage): + data = msg.payload + try: + data = data.decode('utf-8') + data = json.loads(data) + except (TypeError, AttributeError, ValueError): + # Not a serialized JSON + pass + + if self.default_listener and msg.topic == self.run_topic: + try: + app_msg = Message.build(data) + self.on_exec_message(client, app_msg) + except Exception as e: + self.logger.warning( + 'Message execution error: %s: %s', type(e).__name__, str(e) + ) + else: + get_bus().post( + MQTTMessageEvent( + host=client.host, port=client.port, topic=msg.topic, msg=data + ) + ) + + return handler + + def on_exec_message(self, client: MqttClient, msg): + """ + Message handler for (legacy) application requests over MQTT. + """ + + def response_thread(req: Request): + """ + A separate thread to handle the response to a request. + """ + if not self.run_topic: + return + + response = get_message_response(req) + if not response: + return + + response_topic = f'{self.run_topic}/responses/{req.id}' + self.logger.info( + 'Processing response on the MQTT topic %s: %s', + response_topic, + response, + ) + + client.publish(payload=str(response), topic=response_topic) + + self.logger.info('Received message on the MQTT backend: %s', msg) + + try: + get_bus().post(msg) + except Exception as e: + self.logger.exception(e) + return + + if isinstance(msg, Request): + threading.Thread( + target=response_thread, + name='MQTTProcessorResponseThread', + args=(msg,), + ).start() + + def _get_client( + self, + host: Optional[str] = None, + port: int = 1883, + topics: Iterable[str] = (), + client_id: Optional[str] = None, + on_message: Optional[MqttCallback] = None, + **kwargs, + ) -> MqttClient: + """ + :return: A :class:`platypush.message.event.mqtt.MqttClient` instance. + It will return the existing client with the given inferred ID if it + already exists, or it will register a new one. + """ + if host: + kwargs['host'] = host + kwargs['port'] = port + else: + assert ( + self.default_listener + ), 'No host specified and no configured default host' + kwargs = self.default_listener.configuration + + kwargs.update( + { + 'topics': topics, + 'on_message': on_message, + 'client_id': client_id, + } + ) + + on_message = on_message or self.on_mqtt_message() + client_id = self._get_client_id( + host=kwargs['host'], + port=kwargs['port'], + client_id=client_id, + on_message=on_message, + topics=topics, + ) + + kwargs['client_id'] = client_id + with self._listeners_lock[client_id]: + client = self.listeners.get(client_id) + if not (client and client.is_alive()): + client = self.listeners[ + client_id + ] = MqttClient( # pylint: disable=E1125 + **kwargs + ) + + if topics: + client.subscribe(*topics) + + return client + + @action + def publish( + self, + topic: str, + msg: Any, + qos: int = 0, + reply_topic: Optional[str] = None, + **mqtt_kwargs, + ): + """ + Sends a message to a topic. + + :param topic: Topic/channel where the message will be delivered + :param msg: Message to be sent. It can be a list, a dict, or a Message + object. + :param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS + `_ + (default: 0). + :param reply_topic: If a ``reply_topic`` is specified, then the action + will wait for a response on this topic. + :param mqtt_kwargs: MQTT broker configuration (host, port, username, + password etc.). See :meth:`.__init__` parameters. + """ + response_buffer = io.BytesIO() + client = None + + try: + # Try to parse it as a Platypush message or dump it to JSON from a dict/list + if isinstance(msg, (dict, list)): + msg = json.dumps(msg) + + try: + msg = Message.build(json.loads(msg)) + except (KeyError, TypeError, ValueError): + pass + + client = self._get_client(**mqtt_kwargs) + client.connect() response_received = threading.Event() + # If it's a request, then wait for the response + if ( + isinstance(msg, Request) + and self.default_listener + and client.host == self.default_listener.host + and self.run_topic + and topic == self.run_topic + ): + reply_topic = f'{self.run_topic}/responses/{msg.id}' + if reply_topic: client.on_message = self._response_callback( reply_topic=reply_topic, @@ -289,12 +421,13 @@ class MqttPlugin(Plugin): client.publish(topic, str(msg), qos=qos) if not reply_topic: - return + return None client.loop_start() - ok = response_received.wait(timeout=timeout) + ok = response_received.wait(timeout=client.timeout) if not ok: raise TimeoutError('Response timed out') + return response_buffer.getvalue() finally: response_buffer.close() @@ -303,12 +436,50 @@ class MqttPlugin(Plugin): try: client.loop_stop() except Exception as e: - self.logger.warning('Could not stop client loop: %s', e) + self.logger.warning( + 'Could not stop client loop: %s: %s', type(e).__name__, e + ) client.disconnect() + @action + def subscribe(self, topic: str, **mqtt_kwargs): + """ + Programmatically subscribe to a topic on an MQTT broker. + + Messages received on this topic will trigger a + :class:`platypush.message.event.mqtt.MQTTMessageEvent` event that you + can subscribe to. + + :param topic: Topic to subscribe to. + :param mqtt_kwargs: MQTT broker configuration (host, port, username, + password etc.). See :meth:`.__init__` parameters. + """ + client = self._get_client( + topics=(topic,), on_message=self.on_mqtt_message(), **mqtt_kwargs + ) + + if not client.is_alive(): + client.start() + + @action + def unsubscribe(self, topic: str, **mqtt_kwargs): + """ + Programmatically unsubscribe from a topic on an MQTT broker. + + :param topic: Topic to unsubscribe from. + :param mqtt_kwargs: MQTT broker configuration (host, port, username, + password etc.). See :meth:`.__init__` parameters. + """ + self._get_client(**mqtt_kwargs).unsubscribe(topic) + @staticmethod def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]): + """ + A response callback that writes the response to an IOBuffer and stops + the client loop. + """ + def on_message(client, _, msg): if msg.topic != reply_topic: return @@ -322,9 +493,40 @@ class MqttPlugin(Plugin): @action def send_message(self, *args, **kwargs): """ - Alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`. + Legacy alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`. """ return self.publish(*args, **kwargs) + @override + def main(self): + if self.run_topic: + self.logger.warning( + 'The MQTT integration is listening for commands on the topic %s.\n' + 'This approach is unsafe, as it allows any client to run unauthenticated requests.\n' + 'Please only enable it in test/trusted environments.', + self.run_topic, + ) + + for listener in self.listeners.values(): + listener.start() + + self.wait_stop() + + @override + def stop(self): + """ + Disconnect all the clients upon plugin stop. + """ + for listener in self.listeners.values(): + listener.stop() + + super().stop() + + for listener in self.listeners.values(): + try: + listener.join(timeout=1) + except Exception: + pass + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mqtt/_client.py b/platypush/plugins/mqtt/_client.py new file mode 100644 index 000000000..5d813bf7b --- /dev/null +++ b/platypush/plugins/mqtt/_client.py @@ -0,0 +1,244 @@ +from enum import IntEnum +import logging +import os +import threading +from typing import Any, Callable, Dict, Final, Iterable, Optional, Union + +import paho.mqtt.client as mqtt + +from platypush.config import Config + +MqttCallback = Callable[["MqttClient", Any, mqtt.MQTTMessage], Any] +DEFAULT_TIMEOUT: Final[int] = 30 + + +class MqttClient(mqtt.Client, threading.Thread): + """ + Wrapper class for an MQTT client executed in a separate thread. + """ + + def __init__( + self, + *args, + host: str, + port: int, + client_id: str, + topics: Iterable[str] = (), + on_message: Optional[MqttCallback] = None, + username: Optional[str] = None, + password: Optional[str] = None, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version: Optional[Union[str, IntEnum]] = None, + tls_ciphers: Optional[str] = None, + tls_insecure: bool = False, + timeout: int = DEFAULT_TIMEOUT, + **kwargs, + ): + self.client_id = client_id or str(Config.get('device_id')) + mqtt.Client.__init__(self, *args, client_id=self.client_id, **kwargs) + threading.Thread.__init__(self, name=f'MQTTClient:{self.client_id}') + + self.logger = logging.getLogger(self.__class__.__name__) + self.host = host + self.port = port + self.tls_cafile = self._expandpath(tls_cafile) + self.tls_certfile = self._expandpath(tls_certfile) + self.tls_keyfile = self._expandpath(tls_keyfile) + self.tls_version = self._get_tls_version(tls_version) + self.tls_ciphers = self._expandpath(tls_ciphers) + self.tls_insecure = tls_insecure + self.username = username + self.password = password + self.topics = set(topics or []) + self.timeout = timeout + self.on_connect = self.connect_hndl() + self.on_disconnect = self.disconnect_hndl() + + if on_message: + self.on_message = on_message # type: ignore + + if username and password: + self.username_pw_set(username, password) + + if tls_cafile: + self.tls_set( + ca_certs=self.tls_cafile, + certfile=self.tls_certfile, + keyfile=self.tls_keyfile, + tls_version=self.tls_version, + ciphers=self.tls_ciphers, + ) + + self.tls_insecure_set(self.tls_insecure) + + self._running = False + self._stop_scheduled = False + + @staticmethod + def _expandpath(path: Optional[str] = None) -> Optional[str]: + """ + Utility method to expand a path string. + """ + return os.path.abspath(os.path.expanduser(path)) if path else None + + @staticmethod + def _get_tls_version(version: Optional[Union[str, IntEnum]] = None): + """ + A utility method that normalizes an SSL version string or enum to a + standard ``_SSLMethod`` enum. + """ + import ssl + + if not version: + return None + + if isinstance(version, type(ssl.PROTOCOL_TLS)): + return version + + if isinstance(version, str): + version = version.lower() + + if version == 'tls': + return ssl.PROTOCOL_TLS + if version == 'tlsv1': + return ssl.PROTOCOL_TLSv1 + if version == 'tlsv1.1': + return ssl.PROTOCOL_TLSv1_1 + if version == 'tlsv1.2': + return ssl.PROTOCOL_TLSv1_2 + + raise AssertionError(f'Unrecognized TLS version: {version}') + + def connect( + self, + *args, + host: Optional[str] = None, + port: Optional[int] = None, + keepalive: Optional[int] = None, + **kwargs, + ): + """ + Overrides the default connect method. + """ + if not self.is_connected(): + self.logger.debug( + 'Connecting to MQTT broker %s:%d, client_id=%s...', + self.host, + self.port, + self.client_id, + ) + + return super().connect( + host=host or self.host, + port=port or self.port, + keepalive=keepalive or self.timeout, + *args, + **kwargs, + ) + + return None + + @property + def configuration(self) -> Dict[str, Any]: + """ + :return: The configuration of the client. + """ + return { + 'host': self.host, + 'port': self.port, + 'topics': self.topics, + 'on_message': self.on_message, + 'username': self.username, + 'password': self.password, + 'client_id': self.client_id, + 'tls_cafile': self.tls_cafile, + 'tls_certfile': self.tls_certfile, + 'tls_keyfile': self.tls_keyfile, + 'tls_version': self.tls_version, + 'tls_ciphers': self.tls_ciphers, + 'tls_insecure': self.tls_insecure, + 'timeout': self.timeout, + } + + def subscribe(self, *topics, **kwargs): + """ + Client subscription handler. + """ + if not topics: + topics = self.topics + + self.topics.update(topics) + for topic in topics: + super().subscribe(topic, **kwargs) + + def unsubscribe(self, *topics, **kwargs): + """ + Client unsubscribe handler. + """ + if not topics: + topics = self.topics + + for topic in topics: + if topic not in self.topics: + self.logger.info('The topic %s is not subscribed', topic) + continue + + super().unsubscribe(topic, **kwargs) + self.topics.remove(topic) + + def connect_hndl(self): + """ + When the client connects, subscribe to all the registered topics. + """ + + def handler(*_, **__): + self.logger.debug( + 'Connected to MQTT broker %s:%d, client_id=%s', + self.host, + self.port, + self.client_id, + ) + self.subscribe() + + return handler + + def disconnect_hndl(self): + """ + Notifies the client disconnection. + """ + + def handler(*_, **__): + self.logger.debug( + 'Disconnected from MQTT broker %s:%d, client_id=%s', + self.host, + self.port, + self.client_id, + ) + + return handler + + def run(self): + """ + Connects to the MQTT server, subscribes to all the registered topics + and listens for messages. + """ + super().run() + self.connect() + self._running = True + self.loop_forever() + + def stop(self): + """ + The stop method schedules the stop and disconnects the client. + """ + if not self.is_alive(): + return + + self._stop_scheduled = True + self.disconnect() + self._running = False + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mqtt/manifest.yaml b/platypush/plugins/mqtt/manifest.yaml index 6a49eca3a..9eb63c375 100644 --- a/platypush/plugins/mqtt/manifest.yaml +++ b/platypush/plugins/mqtt/manifest.yaml @@ -1,5 +1,6 @@ manifest: - events: {} + events: + - platypush.message.event.mqtt.MQTTMessageEvent install: apk: - py3-paho-mqtt