platypush/platypush/backend/mqtt.py

87 lines
2.5 KiB
Python
Raw Normal View History

import json
import threading
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publisher
from platypush.backend import Backend
from platypush.message import Message
from platypush.message.request import Request
class MqttBackend(Backend):
    """
    Backend that reads messages from a configured MQTT topic (default:
    ``platypush_bus_mq/<device_id>``) and posts them to the application bus.

    Requires:

        * **paho-mqtt** (``pip install paho-mqtt``)
    """

    def __init__(self, host, port=1883, topic='platypush_bus_mq', *args, **kwargs):
        """
        :param host: MQTT broker host
        :type host: str

        :param port: MQTT broker port (default: 1883)
        :type port: int

        :param topic: Topic prefix to read messages from (default:
            ``platypush_bus_mq``). The device ID is appended to it, so the
            effective topic is ``<topic>/<device_id>``
        :type topic: str
        """
        super().__init__(*args, **kwargs)

        self.host = host
        self.port = port
        # NOTE(review): device_id is not set in this class; presumably it is
        # provided by Backend.__init__ -- confirm.
        self.topic = '{}/{}'.format(topic, self.device_id)

    def send_message(self, msg):
        """
        Publish a message on this backend's MQTT topic.

        :param msg: Message to publish. A JSON string is parsed and passed
            through ``Message.build`` first; any other value, or a string
            that cannot be converted, is published as ``str(msg)``.
        """
        # json.loads only accepts str/bytes/bytearray; any other object
        # (e.g. an already built message) is published as it is.
        if isinstance(msg, (str, bytes, bytearray)):
            try:
                msg = Message.build(json.loads(msg))
            except Exception as e:
                # Best effort: fall back to publishing the raw value
                self.logger.debug(
                    'Could not build a message from the payload, '
                    'sending it as it is: {}'.format(e))

        publisher.single(self.topic, str(msg), hostname=self.host, port=self.port)

    def run(self):
        """
        Connect to the MQTT broker, subscribe to the backend topic and process
        the incoming messages until the client loop terminates.

        Each received message is passed to ``self.on_message``. When it is a
        ``Request``, its response is published on
        ``<topic>/responses/<request id>`` from a separate thread.
        """
        def on_connect(client, userdata, flags, rc):
            # A non-zero return code means that the broker refused the
            # connection, so there is nothing to subscribe to.
            if rc != 0:
                self.logger.warning(
                    'Connection to the MQTT broker {}:{} failed, return code: {}'.
                    format(self.host, self.port, rc))
                return

            client.subscribe(self.topic)

        def on_message(client, userdata, msg):
            def response_thread(msg):
                try:
                    response = self.get_message_response(msg)
                    response_topic = '{}/responses/{}'.format(self.topic, msg.id)

                    self.logger.info('Processing response on the MQTT topic {}: {}'.
                                     format(response_topic, response))

                    publisher.single(response_topic, str(response),
                                     hostname=self.host, port=self.port)
                except Exception:
                    # Log the failure instead of letting the thread die silently
                    self.logger.exception(
                        'Could not deliver the response to the MQTT request')

            try:
                msg = msg.payload.decode('utf-8')
            except UnicodeDecodeError as e:
                # A malformed payload must not break the client loop
                self.logger.warning(
                    'Discarding non UTF-8 payload received on the MQTT backend: {}'.
                    format(e))
                return

            try:
                msg = Message.build(json.loads(msg))
            except Exception as e:
                # Best effort: payloads that are not valid messages are
                # forwarded as plain strings
                self.logger.debug(
                    'Could not build a message from the MQTT payload: {}'.format(e))

            self.logger.info('Received message on the MQTT backend: {}'.format(msg))
            self.on_message(msg)

            # Only requests expect a response to be sent back
            if isinstance(msg, Request):
                threading.Thread(target=response_thread, args=(msg,)).start()

        super().run()

        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(self.host, self.port, 60)

        self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'.
                         format(self.host, self.port, self.topic))

        # Blocks while handling network traffic and dispatching the callbacks
        client.loop_forever()
# vim:sw=4:ts=4:et: