Support for TLS/SSL and user authentication on MQTT

This commit is contained in:
Fabio Manganiello 2018-11-02 15:15:48 +00:00
parent e2ff62f15d
commit 788a2652c8
2 changed files with 114 additions and 11 deletions

View file

@ -1,10 +1,12 @@
import json import json
import os
import threading import threading
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import paho.mqtt.publish as publisher import paho.mqtt.publish as publisher
from platypush.backend import Backend from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message import Message from platypush.message import Message
from platypush.message.request import Request from platypush.message.request import Request
@ -19,7 +21,10 @@ class MqttBackend(Backend):
* **paho-mqtt** (``pip install paho-mqtt``) * **paho-mqtt** (``pip install paho-mqtt``)
""" """
def __init__(self, host, port=1883, topic='platypush_bus_mq', *args, **kwargs): def __init__(self, host, port=1883, topic='platypush_bus_mq', tls_cafile=None,
tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None,
password=None, *args, **kwargs):
""" """
:param host: MQTT broker host :param host: MQTT broker host
:type host: str :type host: str
@ -29,6 +34,27 @@ class MqttBackend(Backend):
:param topic: Topic to read messages from (default: ``platypush_bus_mq/<device_id>``) :param topic: Topic to read messages from (default: ``platypush_bus_mq/<device_id>``)
:type topic: str :type topic: str
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:type tls_cafile: str
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it here (default: None)
:type tls_certfile: str
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify it here (default: None)
:type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None)
:type tls_version: str
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is required, specify it here (default: None)
:type tls_ciphers: str
:param username: Specify it if the MQTT server requires authentication (default: None)
:type username: str
:param password: Specify it if the MQTT server requires authentication (default: None)
:type password: str
""" """
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -36,15 +62,31 @@ class MqttBackend(Backend):
self.host = host self.host = host
self.port = port self.port = port
self.topic = '{}/{}'.format(topic, self.device_id) self.topic = '{}/{}'.format(topic, self.device_id)
self.username = username
self.password = password
self.tls_cafile = os.path.abspath(os.path.expanduser(tls_cafile)) \
if tls_cafile else None
self.tls_certfile = os.path.abspath(os.path.expanduser(tls_certfile)) \
if tls_certfile else None
self.tls_keyfile = os.path.abspath(os.path.expanduser(tls_keyfile)) \
if tls_keyfile else None
self.tls_version = tls_version
self.tls_ciphers = tls_ciphers
def send_message(self, msg): def send_message(self, msg):
try: client = get_plugin('mqtt')
msg = Message.build(json.loads(msg)) client.send_message(topic=self.topic, msg=msg, host=self.host,
except: port=self.port, username=self.username,
pass password=self.password, tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile,
publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version,
tls_ciphers=self.tls_ciphers)
def run(self): def run(self):
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
@ -58,8 +100,14 @@ class MqttBackend(Backend):
self.logger.info('Processing response on the MQTT topic {}: {}'. self.logger.info('Processing response on the MQTT topic {}: {}'.
format(response_topic, response)) format(response_topic, response))
publisher.single(response_topic, str(response), client = get_plugin('mqtt')
hostname=self.host, port=self.port) client.send_message(topic=self.topic, msg=msg, host=self.host,
port=self.port, username=self.username,
password=self.password, tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile,
tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version,
tls_ciphers=self.tls_ciphers)
msg = msg.payload.decode('utf-8') msg = msg.payload.decode('utf-8')
try: msg = Message.build(json.loads(msg)) try: msg = Message.build(json.loads(msg))
@ -75,6 +123,15 @@ class MqttBackend(Backend):
client = mqtt.Client() client = mqtt.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message client.on_message = on_message
if self.username and self.password:
client.username_pw_set(self.username, self.password)
if self.tls_cafile:
client.tls_set(ca_certs=self.tls_cafile, certfile=self.tls_certfile,
keyfile=self.tls_keyfile, tls_version=self.tls_version,
ciphers=self.tls_ciphers)
client.connect(self.host, self.port, 60) client.connect(self.host, self.port, 60)
self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'.
format(self.host, self.port, self.topic)) format(self.host, self.port, self.topic))

View file

@ -15,7 +15,10 @@ class MqttPlugin(Plugin):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@action @action
def send_message(self, topic, msg, host, port=1883, *args, **kwargs): def send_message(self, topic, msg, host, port=1883, tls_cafile=None,
tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None,
password=None, *args, **kwargs):
""" """
Sends a message to a topic/channel. Sends a message to a topic/channel.
@ -29,15 +32,58 @@ class MqttPlugin(Plugin):
:param port: MQTT broker port (default: 1883) :param port: MQTT broker port (default: 1883)
:type port: int :type port: int
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:type tls_cafile: str
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it here (default: None)
:type tls_certfile: str
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify it here (default: None)
:type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None)
:type tls_version: str
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is required, specify it here (default: None)
:type tls_ciphers: str
:param username: Specify it if the MQTT server requires authentication (default: None)
:type username: str
:param password: Specify it if the MQTT server requires authentication (default: None)
:type password: str
""" """
publisher_args = {
'hostname': host,
'port': port,
}
if username and password:
publisher_args['auth'] = {
'username': username,
'password': password,
}
if tls_cafile:
publisher_args['tls'] = { 'ca_certs': tls_cafile }
if tls_certfile:
publishers_args['tls']['certfile'] = tls_certfile
if tls_keyfile:
publishers_args['tls']['keyfile'] = tls_keyfile
if tls_version:
publishers_args['tls']['tls_version'] = tls_version
if tls_ciphers:
publishers_args['tls']['ciphers'] = tls_ciphers
try: msg = json.dumps(msg) try: msg = json.dumps(msg)
except: pass except: pass
try: msg = Message.build(json.loads(msg)) try: msg = Message.build(json.loads(msg))
except: pass except: pass
publisher.single(topic, str(msg), hostname=host, port=port) publisher.single(topic, str(msg), **publisher_args)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: