Added support for MQTT backend to process responses on a topic named platypush_bus_mq/<device>/responses/<msg_id>

This commit is contained in:
Fabio Manganiello 2018-10-26 13:33:23 +00:00
parent 284e0638f8
commit 8635ed8268
2 changed files with 16 additions and 2 deletions

View File

@ -1,10 +1,12 @@
import json import json
import threading
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import paho.mqtt.publish as publisher import paho.mqtt.publish as publisher
from platypush.backend import Backend from platypush.backend import Backend
from platypush.message import Message from platypush.message import Message
from platypush.message.request import Request
class MqttBackend(Backend): class MqttBackend(Backend):
@ -44,12 +46,21 @@ class MqttBackend(Backend):
publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) publisher.single(self.topic, str(msg), hostname=self.host, port=self.port)
def run(self): def run(self):
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
client.subscribe(self.topic) client.subscribe(self.topic)
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
def response_thread(msg):
response = self.get_message_response(msg)
response_topic = '{}/responses/{}'.format(self.topic, msg.id)
self.logger.info('Processing response on the MQTT topic {}: {}'.
format(response_topic, response))
publisher.single(response_topic, str(response),
hostname=self.host, port=self.port)
msg = msg.payload.decode('utf-8') msg = msg.payload.decode('utf-8')
try: msg = Message.build(json.loads(msg)) try: msg = Message.build(json.loads(msg))
except: pass except: pass
@ -57,6 +68,9 @@ class MqttBackend(Backend):
self.logger.info('Received message on the MQTT backend: {}'.format(msg)) self.logger.info('Received message on the MQTT backend: {}'.format(msg))
self.on_message(msg) self.on_message(msg)
if isinstance(msg, Request):
threading.Thread(target=response_thread, args=(msg,)).start()
super().run() super().run()
client = mqtt.Client() client = mqtt.Client()
client.on_connect = on_connect client.on_connect = on_connect

View File

@ -88,7 +88,7 @@ class TcpBackend(Backend):
finally: finally:
sock.close() sock.close()
threading.Thread(target=_f_wrapper).run() threading.Thread(target=_f_wrapper).start()
def run(self): def run(self):
super().run() super().run()