diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 9422bd62d1..56b4347689 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -5,6 +5,7 @@ import threading from platypush.backend import Backend from platypush.context import get_plugin from platypush.message import Message +from platypush.message.event.mqtt import MQTTMessageEvent from platypush.message.request import Request from platypush.utils import set_thread_name @@ -14,15 +15,22 @@ class MqttBackend(Backend): Backend that reads messages from a configured MQTT topic (default: ``platypush_bus_mq/``) and posts them to the application bus. + Triggers: + + * :class:`platypush.message.event.mqtt.MQTTMessageEvent` when a new + message is received on one of the custom listeners + Requires: * **paho-mqtt** (``pip install paho-mqtt``) """ - def __init__(self, host, port=1883, topic='platypush_bus_mq', tls_cafile=None, - tls_certfile=None, tls_keyfile=None, + _default_mqtt_port = 1883 + + def __init__(self, host, port=_default_mqtt_port, topic='platypush_bus_mq', + tls_cafile=None, tls_certfile=None, tls_keyfile=None, tls_version=None, tls_ciphers=None, username=None, - password=None, *args, **kwargs): + password=None, listeners=None, *args, **kwargs): """ :param host: MQTT broker host :type host: str @@ -53,6 +61,15 @@ class MqttBackend(Backend): :param password: Specify it if the MQTT server requires authentication (default: None) :type password: str + + :param listeners: If specified then the MQTT backend will also listen for + messages on the additional configured message queues. This parameter + is a list of maps where each item supports the same arguments passed + to the main backend configuration (host, port, topic, password etc.). + Note that the message queue configured on the main configuration + will expect valid Platypush messages that then can execute, while + message queues registered to the listeners will accept any message. + :type listeners: list[dict] """ super().__init__(*args, **kwargs) @@ -63,6 +80,7 @@ class MqttBackend(Backend): self.username = username self.password = password self._client = None + self._listeners = [] self.tls_cafile = os.path.abspath(os.path.expanduser(tls_cafile)) \ if tls_cafile else None @@ -75,6 +93,7 @@ class MqttBackend(Backend): self.tls_version = tls_version self.tls_ciphers = tls_ciphers + self.listeners_conf = listeners or [] def send_message(self, msg): @@ -90,6 +109,60 @@ class MqttBackend(Backend): except Exception as e: self.logger.exception(e) + def _initialize_listeners(self, listeners_conf): + import paho.mqtt.client as mqtt + + def listener_thread(client, host, port): + client.connect(host, port, 60) + client.loop_forever() + + def on_connect(topic): + def handler(client, userdata, flags, rc): + client.subscribe(topic) + return handler + + def on_message(client, userdata, msg): + data = msg.payload + try: + data = data.decode('utf-8') + data = json.loads(data) + except: + pass + + self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, + topic=msg.topic, msg=data)) + + for i, listener in enumerate(listeners_conf): + host = listener.get('host') + port = listener.get('port', self._default_mqtt_port) + topic = listener.get('topic') + username = listener.get('username') + password = listener.get('password') + tls_cafile = listener.get('tls_cafile') + + if not host or not topic: + self.logger.warning('No host nor topic specified for listener ' + + 'n.{}'.format(i+1)) + continue + + client = mqtt.Client() + client.on_connect = on_connect(topic) + client.on_message = on_message + + if username and password: + client.username_pw_set(username, password) + + if tls_cafile: + client.tls_set(ca_certs=tls_cafile, + certfile=listener.get('tls_certfile'), + keyfile=listener.get('tls_keyfile'), + tls_version=listener.get('tls_version'), + ciphers=listener.get('tls_ciphers')) + + threading.Thread(target=listener_thread, kwargs = { + 'client': client, 'host': host, 'port': port }).start() + + def run(self): def on_connect(client, userdata, flags, rc): client.subscribe(self.topic) @@ -144,6 +217,7 @@ class MqttBackend(Backend): self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. format(self.host, self.port, self.topic)) + self._initialize_listeners(self.listeners_conf) self._client.loop_forever() def stop(self): @@ -153,4 +227,14 @@ class MqttBackend(Backend): self._client.loop_stop() self._client = None + for listener in self._listeners: + try: + listener.loop_stop() + except Exception as e: + self.logger.warning('Could not stop listener ' + + '{host}:{port}: {error}'.format( + host=listener._host, port=listener._port, + error=str(e))) + + # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/mqtt.py b/platypush/message/event/mqtt.py new file mode 100644 index 0000000000..ec31bf8667 --- /dev/null +++ b/platypush/message/event/mqtt.py @@ -0,0 +1,15 @@ +from platypush.message.event import Event + + +class MQTTMessageEvent(Event): + """ + MQTT message event object. Fired when :mod:`platypush.backend.mqtt` receives + a new event. + """ + + def __init__(self, msg, host=None, port=None, topic=None, *args, **kwargs): + super().__init__(msg=msg, host=host, port=port, topic=topic, + *args, **kwargs) + + +# vim:sw=4:ts=4:et: