diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py new file mode 100644 index 0000000000..16b3b2f8f8 --- /dev/null +++ b/platypush/backend/mqtt.py @@ -0,0 +1,61 @@ +import logging +import json + +import paho.mqtt.client as mqtt +import paho.mqtt.publish as publisher + +from platypush.backend import Backend +from platypush.message import Message + + +class MqttBackend(Backend): + """ + Backend that reads messages from a configured MQTT topic + (default: `platypush_bus_mq/`) and posts them to the application bus. + """ + + def __init__(self, host, port=1883, topic='platypush_bus_mq', *args, **kwargs): + """ + Params: + host -- MQTT broker host + port -- MQTT broker port (default: 1883) + topic -- Topic to read messages from (default: platypush_bus_mq/) + """ + super().__init__(*args, **kwargs) + + self.host = host + self.port = port + self.topic = '{}/{}'.format(topic, self.device_id) + + + def send_message(self, msg): + publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) + + + def run(self): + def on_connect(client, userdata, flags, rc): + client.subscribe(self.topic) + + def on_message(client, userdata, msg): + msg = msg.payload.decode('utf-8') + try: + msg = Message.build(json.loads(msg)) + except: + pass + + logging.info('Received message on the MQTT backend: {}'.format(msg)) + self.bus.post(msg) + + super().run() + client = mqtt.Client() + client.on_connect = on_connect + client.on_message = on_message + client.connect(self.host, self.port, 60) + logging.info('Initialized MQTT backend on host {}:{}, topic {}'. + format(self.host, self.port, self.topic)) + + client.loop_forever() + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/plugins/mqtt.py b/platypush/plugins/mqtt.py new file mode 100644 index 0000000000..487d913ac1 --- /dev/null +++ b/platypush/plugins/mqtt.py @@ -0,0 +1,16 @@ +import logging + +import paho.mqtt.publish as publisher + +from platypush.message.response import Response +from platypush.plugins import Plugin + + +class MqttPlugin(Plugin): + def send_message(self, topic, msg, host, port=1883, *args, **kwargs): + publisher.single(topic, str(msg), hostname=host, port=port) + return Response(output={'state': 'ok'}) + + +# vim:sw=4:ts=4:et: +