forked from platypush/platypush
Fabio Manganiello
b76f141b61
If the client that forwarded the request is no longer available (either because an exception or a timeout was raised) then its I/O buffer and event loop may be closed. In this case, the response callback should handle and report the exception, and still set the event, so that any other threads waiting for the response can move on.
544 lines
18 KiB
Python
544 lines
18 KiB
Python
from collections import defaultdict
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import threading
|
|
from typing import Any, Dict, Iterable, Optional, IO
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from platypush.config import Config
|
|
from platypush.context import get_bus
|
|
from platypush.message import Message
|
|
from platypush.message.event.mqtt import MQTTMessageEvent
|
|
from platypush.message.request import Request
|
|
from platypush.plugins import RunnablePlugin, action
|
|
from platypush.utils import get_message_response
|
|
|
|
from ._client import DEFAULT_TIMEOUT, MqttCallback, MqttClient
|
|
|
|
|
|
class MqttPlugin(RunnablePlugin):
|
|
"""
|
|
This plugin allows you to send custom message to a message queue compatible
|
|
with the MQTT protocol, see https://mqtt.org/
|
|
|
|
Requires:
|
|
|
|
* **paho-mqtt** (``pip install paho-mqtt``)
|
|
|
|
Triggers:
|
|
|
|
* :class:`platypush.message.event.mqtt.MQTTMessageEvent` when a new
|
|
message is received on a subscribed topic.
|
|
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
host: Optional[str] = None,
|
|
port: int = 1883,
|
|
topics: Optional[Iterable[str]] = None,
|
|
tls_cafile: Optional[str] = None,
|
|
tls_certfile: Optional[str] = None,
|
|
tls_keyfile: Optional[str] = None,
|
|
tls_version: Optional[str] = None,
|
|
tls_ciphers: Optional[str] = None,
|
|
tls_insecure: bool = False,
|
|
username: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
client_id: Optional[str] = None,
|
|
timeout: Optional[int] = DEFAULT_TIMEOUT,
|
|
run_topic_prefix: Optional[str] = None,
|
|
listeners: Optional[Iterable[dict]] = None,
|
|
**kwargs,
|
|
):
|
|
"""
|
|
:param host: If set, MQTT messages will by default routed to this host
|
|
unless overridden in `send_message` (default: None)
|
|
:param port: If a default host is set, specify the listen port
|
|
(default: 1883)
|
|
:param topics: If a default ``host`` is specified, then this list will
|
|
include a default list of topics that should be subscribed on that
|
|
broker at startup.
|
|
:param tls_cafile: If a default host is set and requires TLS/SSL,
|
|
specify the certificate authority file (default: None)
|
|
:param tls_certfile: If a default host is set and requires TLS/SSL,
|
|
specify the certificate file (default: None)
|
|
:param tls_keyfile: If a default host is set and requires TLS/SSL,
|
|
specify the key file (default: None)
|
|
:param tls_version: If TLS/SSL is enabled on the MQTT server and it
|
|
requires a certain TLS version, specify it here (default: None).
|
|
Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
|
|
``tlsv1.2``.
|
|
:param tls_ciphers: If a default host is set and requires TLS/SSL,
|
|
specify the supported ciphers (default: None)
|
|
:param tls_insecure: Set to True to ignore TLS insecure warnings
|
|
(default: False).
|
|
:param username: If a default host is set and requires user
|
|
authentication, specify the username ciphers (default: None)
|
|
:param password: If a default host is set and requires user
|
|
authentication, specify the password ciphers (default: None)
|
|
:param client_id: ID used to identify the client on the MQTT server
|
|
(default: None). If None is specified then
|
|
``Config.get('device_id')`` will be used.
|
|
:param timeout: Client timeout in seconds (default: 30 seconds).
|
|
:param run_topic_prefix: If specified, the MQTT plugin will listen for
|
|
messages on a topic in the format `{run_topic_prefix}/{device_id}.
|
|
When a message is received, it will interpret it as a JSON request
|
|
to execute, in the format
|
|
``{"type": "request", "action": "plugin.action", "args": {...}}``.
|
|
|
|
.. warning:: This parameter is mostly kept for backwards
|
|
compatibility, but you should avoid it - unless the MQTT broker
|
|
is on a personal safe network that you own, or it requires
|
|
user authentication and it uses SSL. The reason is that the
|
|
messages received on this topic won't be subject to token
|
|
verification, allowing unauthenticated arbitrary command
|
|
execution on the target host. If you still want the ability of
|
|
running commands remotely over an MQTT broker, then you may
|
|
consider creating a dedicated topic listener with an attached
|
|
event hook on
|
|
:class:`platypush.message.event.mqtt.MQTTMessageEvent`. The
|
|
hook can implement whichever authentication logic you like.
|
|
|
|
:param listeners: If specified, the MQTT plugin will listen for
|
|
messages on these topics. Use this parameter if you also want to
|
|
listen on other MQTT brokers other than the primary one. This
|
|
parameter supports a list of maps, where each item supports the
|
|
same arguments passed to the main configuration (host, port, topic,
|
|
password etc.). If host/port are omitted, then the host/port value
|
|
from the plugin configuration will be used. If any of the other
|
|
fields are omitted, then their default value will be used (usually
|
|
null). Example:
|
|
|
|
.. code-block:: yaml
|
|
|
|
listeners:
|
|
# This listener use the default configured host/port
|
|
- topics:
|
|
- topic1
|
|
- topic2
|
|
- topic3
|
|
|
|
# This will use a custom MQTT broker host
|
|
- host: sensors
|
|
port: 11883
|
|
username: myuser
|
|
password: secret
|
|
topics:
|
|
- topic4
|
|
- topic5
|
|
|
|
"""
|
|
super().__init__(**kwargs)
|
|
|
|
self.client_id = client_id or str(Config.get('device_id'))
|
|
self.run_topic = (
|
|
f'{run_topic_prefix}/{Config.get("device_id")}'
|
|
if type(self) == MqttPlugin and run_topic_prefix
|
|
else None
|
|
)
|
|
|
|
self._listeners_lock = defaultdict(threading.RLock)
|
|
self.listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map
|
|
self.timeout = timeout
|
|
self.default_listener = (
|
|
self._get_client(
|
|
host=host,
|
|
port=port,
|
|
topics=(
|
|
(tuple(topics) if topics else ())
|
|
+ ((self.run_topic,) if self.run_topic else ())
|
|
),
|
|
on_message=self.on_mqtt_message(),
|
|
tls_cafile=tls_cafile,
|
|
tls_certfile=tls_certfile,
|
|
tls_keyfile=tls_keyfile,
|
|
tls_version=tls_version,
|
|
tls_ciphers=tls_ciphers,
|
|
tls_insecure=tls_insecure,
|
|
username=username,
|
|
password=password,
|
|
client_id=client_id,
|
|
timeout=timeout,
|
|
)
|
|
if host
|
|
else None
|
|
)
|
|
|
|
for listener in listeners or []:
|
|
self._get_client(
|
|
**self._mqtt_args(on_message=self.on_mqtt_message(), **listener)
|
|
)
|
|
|
|
def _get_client_id(
|
|
self,
|
|
host: str,
|
|
port: int,
|
|
client_id: Optional[str] = None,
|
|
topics: Iterable[str] = (),
|
|
**_,
|
|
) -> str:
|
|
"""
|
|
Calculates a unique client ID given an MQTT configuration.
|
|
"""
|
|
client_id = client_id or self.client_id
|
|
client_hash = hashlib.sha1(
|
|
'|'.join(
|
|
[
|
|
self.__class__.__name__,
|
|
host,
|
|
str(port),
|
|
json.dumps(sorted(topics)),
|
|
]
|
|
).encode()
|
|
).hexdigest()
|
|
|
|
return f'{client_id}-{client_hash}'
|
|
|
|
def _mqtt_args(
|
|
self,
|
|
host: Optional[str] = None,
|
|
port: int = 1883,
|
|
timeout: Optional[int] = DEFAULT_TIMEOUT,
|
|
topics: Iterable[str] = (),
|
|
**kwargs,
|
|
):
|
|
"""
|
|
:return: An MQTT configuration mapping that uses either the specified
|
|
arguments (if host is specified), or falls back to the default
|
|
configurated arguments.
|
|
"""
|
|
default_conf = (
|
|
self.default_listener.configuration if self.default_listener else {}
|
|
)
|
|
|
|
if not host:
|
|
assert (
|
|
self.default_listener
|
|
), 'No host specified and no configured default host'
|
|
|
|
return {
|
|
**default_conf,
|
|
'topics': (*self.default_listener.topics, *topics),
|
|
}
|
|
|
|
return {
|
|
'host': host,
|
|
'port': port,
|
|
'timeout': timeout or default_conf.get('timeout'),
|
|
'topics': topics,
|
|
**kwargs,
|
|
}
|
|
|
|
def on_mqtt_message(self) -> MqttCallback:
|
|
"""
|
|
Default MQTT message handler. It forwards a
|
|
:class:`platypush.message.event.mqtt.MQTTMessageEvent` event to the
|
|
bus.
|
|
"""
|
|
|
|
def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
|
|
data = msg.payload
|
|
try:
|
|
data = data.decode('utf-8')
|
|
data = json.loads(data)
|
|
except (TypeError, AttributeError, ValueError):
|
|
# Not a serialized JSON
|
|
pass
|
|
|
|
if self.default_listener and msg.topic == self.run_topic:
|
|
try:
|
|
app_msg = Message.build(data)
|
|
self.on_exec_message(client, app_msg)
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
'Message execution error: %s: %s', type(e).__name__, str(e)
|
|
)
|
|
else:
|
|
get_bus().post(
|
|
MQTTMessageEvent(
|
|
host=client.host, port=client.port, topic=msg.topic, msg=data
|
|
)
|
|
)
|
|
|
|
return handler
|
|
|
|
def on_exec_message(self, client: MqttClient, msg):
|
|
"""
|
|
Message handler for (legacy) application requests over MQTT.
|
|
"""
|
|
|
|
def response_thread(req: Request):
|
|
"""
|
|
A separate thread to handle the response to a request.
|
|
"""
|
|
if not self.run_topic:
|
|
return
|
|
|
|
response = get_message_response(req)
|
|
if not response:
|
|
return
|
|
|
|
response_topic = f'{self.run_topic}/responses/{req.id}'
|
|
self.logger.info(
|
|
'Processing response on the MQTT topic %s: %s',
|
|
response_topic,
|
|
response,
|
|
)
|
|
|
|
client.publish(payload=str(response), topic=response_topic)
|
|
|
|
self.logger.info('Received message on the MQTT backend: %s', msg)
|
|
|
|
try:
|
|
get_bus().post(msg)
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
return
|
|
|
|
if isinstance(msg, Request):
|
|
threading.Thread(
|
|
target=response_thread,
|
|
name='MQTTProcessorResponseThread',
|
|
args=(msg,),
|
|
).start()
|
|
|
|
def _get_client(
|
|
self,
|
|
host: Optional[str] = None,
|
|
port: int = 1883,
|
|
topics: Iterable[str] = (),
|
|
client_id: Optional[str] = None,
|
|
on_message: Optional[MqttCallback] = None,
|
|
**kwargs,
|
|
) -> MqttClient:
|
|
"""
|
|
:return: A :class:`platypush.message.event.mqtt.MqttClient` instance.
|
|
It will return the existing client with the given inferred ID if it
|
|
already exists, or it will register a new one.
|
|
"""
|
|
if host:
|
|
kwargs['host'] = host
|
|
kwargs['port'] = port
|
|
else:
|
|
assert (
|
|
self.default_listener
|
|
), 'No host specified and no configured default host'
|
|
kwargs = self.default_listener.configuration
|
|
|
|
kwargs.update(
|
|
{
|
|
'topics': topics,
|
|
'on_message': on_message,
|
|
'client_id': client_id,
|
|
}
|
|
)
|
|
|
|
on_message = on_message or self.on_mqtt_message()
|
|
client_id = self._get_client_id(
|
|
host=kwargs['host'],
|
|
port=kwargs['port'],
|
|
client_id=client_id,
|
|
topics=topics,
|
|
)
|
|
|
|
kwargs['client_id'] = client_id
|
|
with self._listeners_lock[client_id]:
|
|
client = self.listeners.get(client_id)
|
|
if not (client and client.is_alive()):
|
|
client = self.listeners[
|
|
client_id
|
|
] = MqttClient( # pylint: disable=E1125
|
|
**kwargs
|
|
)
|
|
|
|
if topics:
|
|
client.subscribe(*topics)
|
|
|
|
return client
|
|
|
|
@action
|
|
def publish(
|
|
self,
|
|
topic: str,
|
|
msg: Any,
|
|
qos: int = 0,
|
|
reply_topic: Optional[str] = None,
|
|
**mqtt_kwargs,
|
|
):
|
|
"""
|
|
Sends a message to a topic.
|
|
|
|
:param topic: Topic/channel where the message will be delivered
|
|
:param msg: Message to be sent. It can be a list, a dict, or a Message
|
|
object.
|
|
:param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS
|
|
<https://assetwolf.com/learn/mqtt-qos-understanding-quality-of-service>`_
|
|
(default: 0).
|
|
:param reply_topic: If a ``reply_topic`` is specified, then the action
|
|
will wait for a response on this topic.
|
|
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
|
|
password etc.). See :meth:`.__init__` parameters.
|
|
"""
|
|
response_buffer = io.BytesIO()
|
|
client = None
|
|
|
|
try:
|
|
# Try to parse it as a Platypush message or dump it to JSON from a dict/list
|
|
if isinstance(msg, (dict, list)):
|
|
msg = json.dumps(msg)
|
|
|
|
try:
|
|
msg = Message.build(json.loads(msg))
|
|
except (KeyError, TypeError, ValueError):
|
|
pass
|
|
|
|
client = self._get_client(**mqtt_kwargs)
|
|
client.connect()
|
|
response_received = threading.Event()
|
|
|
|
# If it's a request, then wait for the response
|
|
if (
|
|
isinstance(msg, Request)
|
|
and self.default_listener
|
|
and client.host == self.default_listener.host
|
|
and self.run_topic
|
|
and topic == self.run_topic
|
|
):
|
|
reply_topic = f'{self.run_topic}/responses/{msg.id}'
|
|
|
|
if reply_topic:
|
|
client.on_message = self._response_callback(
|
|
reply_topic=reply_topic,
|
|
event=response_received,
|
|
buffer=response_buffer,
|
|
)
|
|
client.subscribe(reply_topic)
|
|
|
|
client.publish(topic, str(msg), qos=qos)
|
|
if not reply_topic:
|
|
return None
|
|
|
|
client.loop_start()
|
|
ok = response_received.wait(timeout=client.timeout)
|
|
if not ok:
|
|
raise TimeoutError('Response timed out')
|
|
|
|
return response_buffer.getvalue()
|
|
finally:
|
|
response_buffer.close()
|
|
|
|
if client:
|
|
client.stop()
|
|
del client
|
|
|
|
@action
|
|
def subscribe(self, topic: str, **mqtt_kwargs):
|
|
"""
|
|
Programmatically subscribe to a topic on an MQTT broker.
|
|
|
|
Messages received on this topic will trigger a
|
|
:class:`platypush.message.event.mqtt.MQTTMessageEvent` event that you
|
|
can subscribe to.
|
|
|
|
:param topic: Topic to subscribe to.
|
|
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
|
|
password etc.). See :meth:`.__init__` parameters.
|
|
"""
|
|
client = self._get_client(
|
|
topics=(topic,), on_message=self.on_mqtt_message(), **mqtt_kwargs
|
|
)
|
|
|
|
if not client.is_alive():
|
|
client.start()
|
|
|
|
@action
|
|
def unsubscribe(self, topic: str, **mqtt_kwargs):
|
|
"""
|
|
Programmatically unsubscribe from a topic on an MQTT broker.
|
|
|
|
:param topic: Topic to unsubscribe from.
|
|
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
|
|
password etc.). See :meth:`.__init__` parameters.
|
|
"""
|
|
client_id = self._get_client_id(
|
|
topics=(topic,),
|
|
**mqtt_kwargs,
|
|
)
|
|
|
|
with self._listeners_lock[client_id]:
|
|
client = self.listeners.get(client_id)
|
|
|
|
if not client:
|
|
self.logger.info('No subscriptions found for topic %s', topic)
|
|
return
|
|
|
|
client.unsubscribe(topic)
|
|
client.stop()
|
|
del client
|
|
|
|
def _response_callback(
|
|
self, reply_topic: str, event: threading.Event, buffer: IO[bytes]
|
|
):
|
|
"""
|
|
A response callback that writes the response to an IOBuffer and stops
|
|
the client loop.
|
|
"""
|
|
|
|
def on_message(client, _, msg):
|
|
if msg.topic != reply_topic:
|
|
return
|
|
|
|
try:
|
|
buffer.write(msg.payload)
|
|
client.loop_stop()
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
'Could not write the response back to the MQTT client: %s', e
|
|
)
|
|
finally:
|
|
event.set()
|
|
|
|
return on_message
|
|
|
|
@action
|
|
def send_message(self, *args, **kwargs):
|
|
"""
|
|
Legacy alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`.
|
|
"""
|
|
return self.publish(*args, **kwargs)
|
|
|
|
def main(self):
|
|
if self.run_topic:
|
|
self.logger.warning(
|
|
'The MQTT integration is listening for commands on the topic %s.\n'
|
|
'This approach is unsafe, as it allows any client to run unauthenticated requests.\n'
|
|
'Please only enable it in test/trusted environments.',
|
|
self.run_topic,
|
|
)
|
|
|
|
for listener in self.listeners.values():
|
|
listener.start()
|
|
|
|
self.wait_stop()
|
|
|
|
def stop(self):
|
|
"""
|
|
Disconnect all the clients upon plugin stop.
|
|
"""
|
|
for listener in self.listeners.values():
|
|
listener.stop()
|
|
|
|
super().stop()
|
|
|
|
for listener in self.listeners.values():
|
|
try:
|
|
listener.join(timeout=1)
|
|
del listener
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|