Refactored and merged backend.mqtt logic into mqtt plugin.

This commit is contained in:
Fabio Manganiello 2023-09-05 20:58:58 +02:00
parent b746d0b402
commit 33a1ef39e4
Signed by: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 667 additions and 220 deletions

View file

@ -1,16 +1,25 @@
from collections import defaultdict
import hashlib
import io
import json
import os
import threading
from typing import Any, Dict, Iterable, Optional, IO
from typing_extensions import override
from typing import Any, Optional, IO
import paho.mqtt.client as mqtt
from platypush.config import Config
from platypush.context import get_bus
from platypush.message import Message
from platypush.plugins import Plugin, action
from platypush.message.event.mqtt import MQTTMessageEvent
from platypush.message.request import Request
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_message_response
from ._client import DEFAULT_TIMEOUT, MqttCallback, MqttClient
class MqttPlugin(Plugin):
class MqttPlugin(RunnablePlugin):
"""
This plugin allows you to send custom message to a message queue compatible
with the MQTT protocol, see https://mqtt.org/
@ -19,253 +28,130 @@ class MqttPlugin(Plugin):
* **paho-mqtt** (``pip install paho-mqtt``)
Triggers:
* :class:`platypush.message.event.mqtt.MQTTMessageEvent` when a new
message is received on a subscribed topic.
"""
def __init__(
self,
host=None,
port=1883,
tls_cafile=None,
tls_certfile=None,
tls_keyfile=None,
tls_version=None,
tls_ciphers=None,
tls_insecure=False,
username=None,
password=None,
client_id=None,
timeout=None,
host: Optional[str] = None,
port: int = 1883,
topics: Optional[Iterable[str]] = None,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
timeout: Optional[int] = DEFAULT_TIMEOUT,
run_topic_prefix: Optional[str] = None,
listeners: Optional[Iterable[dict]] = None,
**kwargs,
):
"""
:param host: If set, MQTT messages will by default routed to this host
unless overridden in `send_message` (default: None)
:type host: str
:param port: If a default host is set, specify the listen port
(default: 1883)
:type port: int
:param topics: If a default ``host`` is specified, then this list will
include a default list of topics that should be subscribed on that
broker at startup.
:param tls_cafile: If a default host is set and requires TLS/SSL,
specify the certificate authority file (default: None)
:type tls_cafile: str
:param tls_certfile: If a default host is set and requires TLS/SSL,
specify the certificate file (default: None)
:type tls_certfile: str
:param tls_keyfile: If a default host is set and requires TLS/SSL,
specify the key file (default: None)
:type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it
requires a certain TLS version, specify it here (default: None).
Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
``tlsv1.2``.
:type tls_version: str
:param tls_ciphers: If a default host is set and requires TLS/SSL,
specify the supported ciphers (default: None)
:type tls_ciphers: str
:param tls_insecure: Set to True to ignore TLS insecure warnings
(default: False).
:type tls_insecure: bool
:param username: If a default host is set and requires user
authentication, specify the username ciphers (default: None)
:type username: str
:param password: If a default host is set and requires user
authentication, specify the password ciphers (default: None)
:type password: str
:param client_id: ID used to identify the client on the MQTT server
(default: None). If None is specified then
``Config.get('device_id')`` will be used.
:type client_id: str
:param timeout: Client timeout in seconds (default: 30 seconds).
:param run_topic_prefix: If specified, the MQTT plugin will listen for
messages on a topic in the format `{run_topic_prefix}/{device_id}.
When a message is received, it will interpret it as a JSON request
to execute, in the format
``{"type": "request", "action": "plugin.action", "args": {...}}``.
.. warning:: This parameter is mostly kept for backwards
compatibility, but you should avoid it - unless the MQTT broker
is on a personal safe network that you own, or it requires
user authentication and it uses SSL. The reason is that the
messages received on this topic won't be subject to token
verification, allowing unauthenticated arbitrary command
execution on the target host. If you still want the ability of
running commands remotely over an MQTT broker, then you may
consider creating a dedicated topic listener with an attached
event hook on
:class:`platypush.message.event.mqtt.MQTTMessageEvent`. The
hook can implement whichever authentication logic you like.
:param listeners: If specified, the MQTT plugin will listen for
messages on these topics. Use this parameter if you also want to
listen on other MQTT brokers other than the primary one. This
parameter supports a list of maps, where each item supports the
same arguments passed to the main configuration (host, port, topic,
password etc.). If host/port are omitted, then the host/port value
from the plugin configuration will be used. If any of the other
fields are omitted, then their default value will be used (usually
null). Example:
.. code-block:: yaml
listeners:
# This listener use the default configured host/port
- topics:
- topic1
- topic2
- topic3
# This will use a custom MQTT broker host
- host: sensors
port: 11883
username: myuser
password: secret
topics:
- topic4
- topic5
:param timeout: Client timeout in seconds (default: None).
:type timeout: int
"""
super().__init__(**kwargs)
self.host = host
self.port = port
self.username = username
self.password = password
self.client_id = client_id or Config.get('device_id')
self.tls_cafile = self._expandpath(tls_cafile) if tls_cafile else None
self.tls_certfile = self._expandpath(tls_certfile) if tls_certfile else None
self.tls_keyfile = self._expandpath(tls_keyfile) if tls_keyfile else None
self.tls_version = self.get_tls_version(tls_version)
self.tls_insecure = tls_insecure
self.tls_ciphers = tls_ciphers
self.timeout = timeout
@staticmethod
def get_tls_version(version: Optional[str] = None):
import ssl
if not version:
return None
if isinstance(version, type(ssl.PROTOCOL_TLS)):
return version
if isinstance(version, str):
version = version.lower()
if version == 'tls':
return ssl.PROTOCOL_TLS
if version == 'tlsv1':
return ssl.PROTOCOL_TLSv1
if version == 'tlsv1.1':
return ssl.PROTOCOL_TLSv1_1
if version == 'tlsv1.2':
return ssl.PROTOCOL_TLSv1_2
assert f'Unrecognized TLS version: {version}'
def _mqtt_args(self, **kwargs):
return {
'host': kwargs.get('host', self.host),
'port': kwargs.get('port', self.port),
'timeout': kwargs.get('timeout', self.timeout),
'tls_certfile': kwargs.get('tls_certfile', self.tls_certfile),
'tls_keyfile': kwargs.get('tls_keyfile', self.tls_keyfile),
'tls_version': kwargs.get('tls_version', self.tls_version),
'tls_ciphers': kwargs.get('tls_ciphers', self.tls_ciphers),
'username': kwargs.get('username', self.username),
'password': kwargs.get('password', self.password),
}
@staticmethod
def _expandpath(path: Optional[str] = None) -> Optional[str]:
return os.path.abspath(os.path.expanduser(path)) if path else None
def _get_client(
self,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: Optional[bool] = None,
username: Optional[str] = None,
password: Optional[str] = None,
):
from paho.mqtt.client import Client
tls_cafile = self._expandpath(tls_cafile or self.tls_cafile)
tls_certfile = self._expandpath(tls_certfile or self.tls_certfile)
tls_keyfile = self._expandpath(tls_keyfile or self.tls_keyfile)
tls_ciphers = tls_ciphers or self.tls_ciphers
username = username or self.username
password = password or self.password
tls_version = tls_version or self.tls_version # type: ignore[reportGeneralTypeIssues]
if tls_version:
tls_version = self.get_tls_version(tls_version) # type: ignore[reportGeneralTypeIssues]
if tls_insecure is None:
tls_insecure = self.tls_insecure
client = Client()
if username and password:
client.username_pw_set(username, password)
if tls_cafile:
client.tls_set(
ca_certs=tls_cafile,
certfile=tls_certfile,
keyfile=tls_keyfile,
tls_version=tls_version, # type: ignore[reportGeneralTypeIssues]
ciphers=tls_ciphers,
self.client_id = client_id or str(Config.get('device_id'))
self.run_topic = (
f'{run_topic_prefix}/{Config.get("device_id")}'
if run_topic_prefix
else None
)
client.tls_insecure_set(tls_insecure)
return client
@action
def publish(
self,
topic: str,
msg: Any,
host: Optional[str] = None,
port: Optional[int] = None,
reply_topic: Optional[str] = None,
timeout: int = 60,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: Optional[bool] = None,
username: Optional[str] = None,
password: Optional[str] = None,
qos: int = 0,
):
"""
Sends a message to a topic.
:param topic: Topic/channel where the message will be delivered
:param msg: Message to be sent. It can be a list, a dict, or a Message
object.
:param host: MQTT broker hostname/IP (default: default host configured
on the plugin).
:param port: MQTT broker port (default: default port configured on the
plugin).
:param reply_topic: If a ``reply_topic`` is specified, then the action
will wait for a response on this topic.
:param timeout: If ``reply_topic`` is set, use this parameter to
specify the maximum amount of time to wait for a response (default:
60 seconds).
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the
certificate requires a certificate authority to authenticate it,
`ssl_cafile` will point to the provided ca.crt file (default:
None).
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a
client certificate it required, specify it here (default: None).
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a
client certificate key it required, specify it here (default:
None).
:param tls_version: If TLS/SSL is enabled on the MQTT server and it
requires a certain TLS version, specify it here (default: None).
Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
``tlsv1.2``.
:param tls_insecure: Set to True to ignore TLS insecure warnings
(default: False).
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an
explicit list of supported ciphers is required, specify it here
(default: None).
:param username: Specify it if the MQTT server requires authentication
(default: None).
:param password: Specify it if the MQTT server requires authentication
(default: None).
:param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS
<https://assetwolf.com/learn/mqtt-qos-understanding-quality-of-service>`_
(default: 0).
"""
response_buffer = io.BytesIO()
client = None
try:
# Try to parse it as a platypush message or dump it to JSON from a dict/list
if isinstance(msg, (dict, list)):
msg = json.dumps(msg)
try:
msg = Message.build(json.loads(msg))
except Exception as e:
self.logger.debug('Not a valid JSON: %s', e)
host = host or self.host
port = port or self.port or 1883
assert host, 'No host specified'
client = self._get_client(
self._listeners_lock = defaultdict(threading.RLock)
self.listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map
self.default_listener = (
self._get_client(
host=host,
port=port,
topics=(
(tuple(topics) if topics else ())
+ ((self.run_topic,) if self.run_topic else ())
),
on_message=self.on_mqtt_message(),
tls_cafile=tls_cafile,
tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
@ -274,11 +160,257 @@ class MqttPlugin(Plugin):
tls_insecure=tls_insecure,
username=username,
password=password,
client_id=client_id,
timeout=timeout,
)
if host
else None
)
client.connect(host, port, keepalive=timeout)
for listener in listeners or []:
self._get_client(
**self._mqtt_args(on_message=self.on_mqtt_message(), **listener)
)
def _get_client_id(
self,
host: str,
port: int,
client_id: Optional[str] = None,
on_message: Optional[MqttCallback] = None,
topics: Iterable[str] = (),
**_,
) -> str:
"""
Calculates a unique client ID given an MQTT configuration.
"""
client_id = client_id or self.client_id
client_hash = hashlib.sha1(
'|'.join(
[
host,
str(port),
json.dumps(sorted(topics)),
str(id(on_message)),
]
).encode()
).hexdigest()
return f'{client_id}-{client_hash}'
def _mqtt_args(
self,
host: Optional[str] = None,
port: int = 1883,
timeout: Optional[int] = DEFAULT_TIMEOUT,
topics: Iterable[str] = (),
**kwargs,
):
"""
:return: An MQTT configuration mapping that uses either the specified
arguments (if host is specified), or falls back to the default
configurated arguments.
"""
default_conf = (
self.default_listener.configuration if self.default_listener else {}
)
if not host:
assert (
self.default_listener
), 'No host specified and no configured default host'
return {
**default_conf,
'topics': (*self.default_listener.topics, *topics),
}
return {
'host': host,
'port': port,
'timeout': timeout or default_conf.get('timeout'),
'topics': topics,
**kwargs,
}
def on_mqtt_message(self) -> MqttCallback:
"""
Default MQTT message handler. It forwards a
:class:`platypush.message.event.mqtt.MQTTMessageEvent` event to the
bus.
"""
def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
data = msg.payload
try:
data = data.decode('utf-8')
data = json.loads(data)
except (TypeError, AttributeError, ValueError):
# Not a serialized JSON
pass
if self.default_listener and msg.topic == self.run_topic:
try:
app_msg = Message.build(data)
self.on_exec_message(client, app_msg)
except Exception as e:
self.logger.warning(
'Message execution error: %s: %s', type(e).__name__, str(e)
)
else:
get_bus().post(
MQTTMessageEvent(
host=client.host, port=client.port, topic=msg.topic, msg=data
)
)
return handler
def on_exec_message(self, client: MqttClient, msg):
"""
Message handler for (legacy) application requests over MQTT.
"""
def response_thread(req: Request):
"""
A separate thread to handle the response to a request.
"""
if not self.run_topic:
return
response = get_message_response(req)
if not response:
return
response_topic = f'{self.run_topic}/responses/{req.id}'
self.logger.info(
'Processing response on the MQTT topic %s: %s',
response_topic,
response,
)
client.publish(payload=str(response), topic=response_topic)
self.logger.info('Received message on the MQTT backend: %s', msg)
try:
get_bus().post(msg)
except Exception as e:
self.logger.exception(e)
return
if isinstance(msg, Request):
threading.Thread(
target=response_thread,
name='MQTTProcessorResponseThread',
args=(msg,),
).start()
def _get_client(
self,
host: Optional[str] = None,
port: int = 1883,
topics: Iterable[str] = (),
client_id: Optional[str] = None,
on_message: Optional[MqttCallback] = None,
**kwargs,
) -> MqttClient:
"""
:return: A :class:`platypush.message.event.mqtt.MqttClient` instance.
It will return the existing client with the given inferred ID if it
already exists, or it will register a new one.
"""
if host:
kwargs['host'] = host
kwargs['port'] = port
else:
assert (
self.default_listener
), 'No host specified and no configured default host'
kwargs = self.default_listener.configuration
kwargs.update(
{
'topics': topics,
'on_message': on_message,
'client_id': client_id,
}
)
on_message = on_message or self.on_mqtt_message()
client_id = self._get_client_id(
host=kwargs['host'],
port=kwargs['port'],
client_id=client_id,
on_message=on_message,
topics=topics,
)
kwargs['client_id'] = client_id
with self._listeners_lock[client_id]:
client = self.listeners.get(client_id)
if not (client and client.is_alive()):
client = self.listeners[
client_id
] = MqttClient( # pylint: disable=E1125
**kwargs
)
if topics:
client.subscribe(*topics)
return client
@action
def publish(
self,
topic: str,
msg: Any,
qos: int = 0,
reply_topic: Optional[str] = None,
**mqtt_kwargs,
):
"""
Sends a message to a topic.
:param topic: Topic/channel where the message will be delivered
:param msg: Message to be sent. It can be a list, a dict, or a Message
object.
:param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS
<https://assetwolf.com/learn/mqtt-qos-understanding-quality-of-service>`_
(default: 0).
:param reply_topic: If a ``reply_topic`` is specified, then the action
will wait for a response on this topic.
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
password etc.). See :meth:`.__init__` parameters.
"""
response_buffer = io.BytesIO()
client = None
try:
# Try to parse it as a Platypush message or dump it to JSON from a dict/list
if isinstance(msg, (dict, list)):
msg = json.dumps(msg)
try:
msg = Message.build(json.loads(msg))
except (KeyError, TypeError, ValueError):
pass
client = self._get_client(**mqtt_kwargs)
client.connect()
response_received = threading.Event()
# If it's a request, then wait for the response
if (
isinstance(msg, Request)
and self.default_listener
and client.host == self.default_listener.host
and self.run_topic
and topic == self.run_topic
):
reply_topic = f'{self.run_topic}/responses/{msg.id}'
if reply_topic:
client.on_message = self._response_callback(
reply_topic=reply_topic,
@ -289,12 +421,13 @@ class MqttPlugin(Plugin):
client.publish(topic, str(msg), qos=qos)
if not reply_topic:
return
return None
client.loop_start()
ok = response_received.wait(timeout=timeout)
ok = response_received.wait(timeout=client.timeout)
if not ok:
raise TimeoutError('Response timed out')
return response_buffer.getvalue()
finally:
response_buffer.close()
@ -303,12 +436,50 @@ class MqttPlugin(Plugin):
try:
client.loop_stop()
except Exception as e:
self.logger.warning('Could not stop client loop: %s', e)
self.logger.warning(
'Could not stop client loop: %s: %s', type(e).__name__, e
)
client.disconnect()
@action
def subscribe(self, topic: str, **mqtt_kwargs):
"""
Programmatically subscribe to a topic on an MQTT broker.
Messages received on this topic will trigger a
:class:`platypush.message.event.mqtt.MQTTMessageEvent` event that you
can subscribe to.
:param topic: Topic to subscribe to.
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
password etc.). See :meth:`.__init__` parameters.
"""
client = self._get_client(
topics=(topic,), on_message=self.on_mqtt_message(), **mqtt_kwargs
)
if not client.is_alive():
client.start()
@action
def unsubscribe(self, topic: str, **mqtt_kwargs):
"""
Programmatically unsubscribe from a topic on an MQTT broker.
:param topic: Topic to unsubscribe from.
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
password etc.). See :meth:`.__init__` parameters.
"""
self._get_client(**mqtt_kwargs).unsubscribe(topic)
@staticmethod
def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]):
"""
A response callback that writes the response to an IOBuffer and stops
the client loop.
"""
def on_message(client, _, msg):
if msg.topic != reply_topic:
return
@ -322,9 +493,40 @@ class MqttPlugin(Plugin):
@action
def send_message(self, *args, **kwargs):
"""
Alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`.
Legacy alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`.
"""
return self.publish(*args, **kwargs)
@override
def main(self):
if self.run_topic:
self.logger.warning(
'The MQTT integration is listening for commands on the topic %s.\n'
'This approach is unsafe, as it allows any client to run unauthenticated requests.\n'
'Please only enable it in test/trusted environments.',
self.run_topic,
)
for listener in self.listeners.values():
listener.start()
self.wait_stop()
@override
def stop(self):
"""
Disconnect all the clients upon plugin stop.
"""
for listener in self.listeners.values():
listener.stop()
super().stop()
for listener in self.listeners.values():
try:
listener.join(timeout=1)
except Exception:
pass
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,244 @@
from enum import IntEnum
import logging
import os
import threading
from typing import Any, Callable, Dict, Final, Iterable, Optional, Union
import paho.mqtt.client as mqtt
from platypush.config import Config
MqttCallback = Callable[["MqttClient", Any, mqtt.MQTTMessage], Any]
DEFAULT_TIMEOUT: Final[int] = 30
class MqttClient(mqtt.Client, threading.Thread):
"""
Wrapper class for an MQTT client executed in a separate thread.
"""
def __init__(
self,
*args,
host: str,
port: int,
client_id: str,
topics: Iterable[str] = (),
on_message: Optional[MqttCallback] = None,
username: Optional[str] = None,
password: Optional[str] = None,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[Union[str, IntEnum]] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: bool = False,
timeout: int = DEFAULT_TIMEOUT,
**kwargs,
):
self.client_id = client_id or str(Config.get('device_id'))
mqtt.Client.__init__(self, *args, client_id=self.client_id, **kwargs)
threading.Thread.__init__(self, name=f'MQTTClient:{self.client_id}')
self.logger = logging.getLogger(self.__class__.__name__)
self.host = host
self.port = port
self.tls_cafile = self._expandpath(tls_cafile)
self.tls_certfile = self._expandpath(tls_certfile)
self.tls_keyfile = self._expandpath(tls_keyfile)
self.tls_version = self._get_tls_version(tls_version)
self.tls_ciphers = self._expandpath(tls_ciphers)
self.tls_insecure = tls_insecure
self.username = username
self.password = password
self.topics = set(topics or [])
self.timeout = timeout
self.on_connect = self.connect_hndl()
self.on_disconnect = self.disconnect_hndl()
if on_message:
self.on_message = on_message # type: ignore
if username and password:
self.username_pw_set(username, password)
if tls_cafile:
self.tls_set(
ca_certs=self.tls_cafile,
certfile=self.tls_certfile,
keyfile=self.tls_keyfile,
tls_version=self.tls_version,
ciphers=self.tls_ciphers,
)
self.tls_insecure_set(self.tls_insecure)
self._running = False
self._stop_scheduled = False
@staticmethod
def _expandpath(path: Optional[str] = None) -> Optional[str]:
"""
Utility method to expand a path string.
"""
return os.path.abspath(os.path.expanduser(path)) if path else None
@staticmethod
def _get_tls_version(version: Optional[Union[str, IntEnum]] = None):
"""
A utility method that normalizes an SSL version string or enum to a
standard ``_SSLMethod`` enum.
"""
import ssl
if not version:
return None
if isinstance(version, type(ssl.PROTOCOL_TLS)):
return version
if isinstance(version, str):
version = version.lower()
if version == 'tls':
return ssl.PROTOCOL_TLS
if version == 'tlsv1':
return ssl.PROTOCOL_TLSv1
if version == 'tlsv1.1':
return ssl.PROTOCOL_TLSv1_1
if version == 'tlsv1.2':
return ssl.PROTOCOL_TLSv1_2
raise AssertionError(f'Unrecognized TLS version: {version}')
def connect(
self,
*args,
host: Optional[str] = None,
port: Optional[int] = None,
keepalive: Optional[int] = None,
**kwargs,
):
"""
Overrides the default connect method.
"""
if not self.is_connected():
self.logger.debug(
'Connecting to MQTT broker %s:%d, client_id=%s...',
self.host,
self.port,
self.client_id,
)
return super().connect(
host=host or self.host,
port=port or self.port,
keepalive=keepalive or self.timeout,
*args,
**kwargs,
)
return None
@property
def configuration(self) -> Dict[str, Any]:
"""
:return: The configuration of the client.
"""
return {
'host': self.host,
'port': self.port,
'topics': self.topics,
'on_message': self.on_message,
'username': self.username,
'password': self.password,
'client_id': self.client_id,
'tls_cafile': self.tls_cafile,
'tls_certfile': self.tls_certfile,
'tls_keyfile': self.tls_keyfile,
'tls_version': self.tls_version,
'tls_ciphers': self.tls_ciphers,
'tls_insecure': self.tls_insecure,
'timeout': self.timeout,
}
def subscribe(self, *topics, **kwargs):
"""
Client subscription handler.
"""
if not topics:
topics = self.topics
self.topics.update(topics)
for topic in topics:
super().subscribe(topic, **kwargs)
def unsubscribe(self, *topics, **kwargs):
"""
Client unsubscribe handler.
"""
if not topics:
topics = self.topics
for topic in topics:
if topic not in self.topics:
self.logger.info('The topic %s is not subscribed', topic)
continue
super().unsubscribe(topic, **kwargs)
self.topics.remove(topic)
def connect_hndl(self):
"""
When the client connects, subscribe to all the registered topics.
"""
def handler(*_, **__):
self.logger.debug(
'Connected to MQTT broker %s:%d, client_id=%s',
self.host,
self.port,
self.client_id,
)
self.subscribe()
return handler
def disconnect_hndl(self):
"""
Notifies the client disconnection.
"""
def handler(*_, **__):
self.logger.debug(
'Disconnected from MQTT broker %s:%d, client_id=%s',
self.host,
self.port,
self.client_id,
)
return handler
def run(self):
"""
Connects to the MQTT server, subscribes to all the registered topics
and listens for messages.
"""
super().run()
self.connect()
self._running = True
self.loop_forever()
def stop(self):
"""
The stop method schedules the stop and disconnects the client.
"""
if not self.is_alive():
return
self._stop_scheduled = True
self.disconnect()
self._running = False
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,6 @@
manifest:
events: {}
events:
- platypush.message.event.mqtt.MQTTMessageEvent
install:
apk:
- py3-paho-mqtt