Added support for multiple listeners on multiple servers and topics for general-purpose messaging on the MQTT backend

This commit is contained in:
Fabio Manganiello 2019-03-07 22:51:58 +01:00
parent 38ebe87220
commit 84099b2ab7
2 changed files with 102 additions and 3 deletions

View file

@ -5,6 +5,7 @@ import threading
from platypush.backend import Backend from platypush.backend import Backend
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message import Message from platypush.message import Message
from platypush.message.event.mqtt import MQTTMessageEvent
from platypush.message.request import Request from platypush.message.request import Request
from platypush.utils import set_thread_name from platypush.utils import set_thread_name
@ -14,15 +15,22 @@ class MqttBackend(Backend):
Backend that reads messages from a configured MQTT topic (default: Backend that reads messages from a configured MQTT topic (default:
``platypush_bus_mq/<device_id>``) and posts them to the application bus. ``platypush_bus_mq/<device_id>``) and posts them to the application bus.
Triggers:
* :class:`platypush.message.event.mqtt.MQTTMessageEvent` when a new
message is received on one of the custom listeners
Requires: Requires:
* **paho-mqtt** (``pip install paho-mqtt``) * **paho-mqtt** (``pip install paho-mqtt``)
""" """
def __init__(self, host, port=1883, topic='platypush_bus_mq', tls_cafile=None, _default_mqtt_port = 1883
tls_certfile=None, tls_keyfile=None,
def __init__(self, host, port=_default_mqtt_port, topic='platypush_bus_mq',
tls_cafile=None, tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None, tls_version=None, tls_ciphers=None, username=None,
password=None, *args, **kwargs): password=None, listeners=None, *args, **kwargs):
""" """
:param host: MQTT broker host :param host: MQTT broker host
:type host: str :type host: str
@ -53,6 +61,15 @@ class MqttBackend(Backend):
:param password: Specify it if the MQTT server requires authentication (default: None) :param password: Specify it if the MQTT server requires authentication (default: None)
:type password: str :type password: str
:param listeners: If specified then the MQTT backend will also listen for
messages on the additional configured message queues. This parameter
is a list of maps where each item supports the same arguments passed
to the main backend configuration (host, port, topic, password etc.).
Note that the message queue configured on the main configuration
will expect valid Platypush messages that then can execute, while
message queues registered to the listeners will accept any message.
:type listeners: list[dict]
""" """
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -63,6 +80,7 @@ class MqttBackend(Backend):
self.username = username self.username = username
self.password = password self.password = password
self._client = None self._client = None
self._listeners = []
self.tls_cafile = os.path.abspath(os.path.expanduser(tls_cafile)) \ self.tls_cafile = os.path.abspath(os.path.expanduser(tls_cafile)) \
if tls_cafile else None if tls_cafile else None
@ -75,6 +93,7 @@ class MqttBackend(Backend):
self.tls_version = tls_version self.tls_version = tls_version
self.tls_ciphers = tls_ciphers self.tls_ciphers = tls_ciphers
self.listeners_conf = listeners or []
def send_message(self, msg): def send_message(self, msg):
@ -90,6 +109,60 @@ class MqttBackend(Backend):
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
def _initialize_listeners(self, listeners_conf):
import paho.mqtt.client as mqtt
def listener_thread(client, host, port):
client.connect(host, port, 60)
client.loop_forever()
def on_connect(topic):
def handler(client, userdata, flags, rc):
client.subscribe(topic)
return handler
def on_message(client, userdata, msg):
data = msg.payload
try:
data = data.decode('utf-8')
data = json.loads(data)
except:
pass
self.bus.post(MQTTMessageEvent(host=client._host, port=client._port,
topic=msg.topic, msg=data))
for i, listener in enumerate(listeners_conf):
host = listener.get('host')
port = listener.get('port', self._default_mqtt_port)
topic = listener.get('topic')
username = listener.get('username')
password = listener.get('password')
tls_cafile = listener.get('tls_cafile')
if not host or not topic:
self.logger.warning('No host nor topic specified for listener ' +
'n.{}'.format(i+1))
continue
client = mqtt.Client()
client.on_connect = on_connect(topic)
client.on_message = on_message
if username and password:
client.username_pw_set(username, password)
if tls_cafile:
client.tls_set(ca_certs=tls_cafile,
certfile=listener.get('tls_certfile'),
keyfile=listener.get('tls_keyfile'),
tls_version=listener.get('tls_version'),
ciphers=listener.get('tls_ciphers'))
threading.Thread(target=listener_thread, kwargs = {
'client': client, 'host': host, 'port': port }).start()
def run(self): def run(self):
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
client.subscribe(self.topic) client.subscribe(self.topic)
@ -144,6 +217,7 @@ class MqttBackend(Backend):
self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'.
format(self.host, self.port, self.topic)) format(self.host, self.port, self.topic))
self._initialize_listeners(self.listeners_conf)
self._client.loop_forever() self._client.loop_forever()
def stop(self): def stop(self):
@ -153,4 +227,14 @@ class MqttBackend(Backend):
self._client.loop_stop() self._client.loop_stop()
self._client = None self._client = None
for listener in self._listeners:
try:
listener.loop_stop()
except Exception as e:
self.logger.warning('Could not stop listener ' +
'{host}:{port}: {error}'.format(
host=listener._host, port=listener._port,
error=str(e)))
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,15 @@
from platypush.message.event import Event
class MQTTMessageEvent(Event):
"""
MQTT message event object. Fired when :mod:`platypush.backend.mqtt` receives
a new event.
"""
def __init__(self, msg, host=None, port=None, topic=None, *args, **kwargs):
super().__init__(msg=msg, host=host, port=port, topic=topic,
*args, **kwargs)
# vim:sw=4:ts=4:et: