From 8635ed8268e39020dd86c06bfe0068466d421adf Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 26 Oct 2018 13:33:23 +0000 Subject: [PATCH] Added support for MQTT backend to process responses on a topic named platypush_bus_mq//responses/ --- platypush/backend/mqtt.py | 16 +++++++++++++++- platypush/backend/tcp.py | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 899897d4..7527d42b 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -1,10 +1,12 @@ import json +import threading import paho.mqtt.client as mqtt import paho.mqtt.publish as publisher from platypush.backend import Backend from platypush.message import Message +from platypush.message.request import Request class MqttBackend(Backend): @@ -44,12 +46,21 @@ class MqttBackend(Backend): publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) - def run(self): def on_connect(client, userdata, flags, rc): client.subscribe(self.topic) def on_message(client, userdata, msg): + def response_thread(msg): + response = self.get_message_response(msg) + response_topic = '{}/responses/{}'.format(self.topic, msg.id) + + self.logger.info('Processing response on the MQTT topic {}: {}'. + format(response_topic, response)) + + publisher.single(response_topic, str(response), + hostname=self.host, port=self.port) + msg = msg.payload.decode('utf-8') try: msg = Message.build(json.loads(msg)) except: pass @@ -57,6 +68,9 @@ class MqttBackend(Backend): self.logger.info('Received message on the MQTT backend: {}'.format(msg)) self.on_message(msg) + if isinstance(msg, Request): + threading.Thread(target=response_thread, args=(msg,)).start() + super().run() client = mqtt.Client() client.on_connect = on_connect diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 37faf63e..b5a7faf1 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -88,7 +88,7 @@ class TcpBackend(Backend): finally: sock.close() - threading.Thread(target=_f_wrapper).run() + threading.Thread(target=_f_wrapper).start() def run(self): super().run()