Added client_id in MQTT integrations

This commit is contained in:
Fabio Manganiello 2020-08-27 16:41:51 +02:00
parent 9f1128e2c0
commit beceb39b0c
2 changed files with 22 additions and 7 deletions

View file

@ -4,6 +4,7 @@ import threading
from typing import Optional
from platypush.backend import Backend
from platypush.config import Config
from platypush.context import get_plugin
from platypush.message import Message
from platypush.message.event.mqtt import MQTTMessageEvent
@ -34,7 +35,8 @@ class MqttBackend(Backend):
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None, tls_insecure: bool = False,
username: Optional[str] = None, password: Optional[str] = None, listeners=None,
username: Optional[str] = None, password: Optional[str] = None,
client_id: Optional[str] = None, listeners=None,
*args, **kwargs):
"""
:param host: MQTT broker host
@ -56,6 +58,8 @@ class MqttBackend(Backend):
:param tls_insecure: Set to True to ignore TLS insecure warnings (default: False).
:param username: Specify it if the MQTT server requires authentication (default: None)
:param password: Specify it if the MQTT server requires authentication (default: None)
:param client_id: ID used to identify the client on the MQTT server (default: None).
If None is specified then ``Config.get('device_id')`` will be used.
:param listeners: If specified then the MQTT backend will also listen for
messages on the additional configured message queues. This parameter
is a list of maps where each item supports the same arguments passed
@ -85,6 +89,7 @@ class MqttBackend(Backend):
self.subscribe_default_topic = subscribe_default_topic
self.username = username
self.password = password
self.client_id = client_id or Config.get('device_id')
self._client = None
self._listeners = []
@ -104,7 +109,7 @@ class MqttBackend(Backend):
password=self.password, tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile, tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version, tls_insecure=self.tls_insecure,
tls_ciphers=self.tls_ciphers, **kwargs)
tls_ciphers=self.tls_ciphers, client_id=self.client_id, **kwargs)
except Exception as e:
self.logger.exception(e)
@ -151,6 +156,7 @@ class MqttBackend(Backend):
topics = listener.get('topics')
username = listener.get('username')
password = listener.get('password')
client_id = listener.get('client_id', self.client_id)
tls_cafile = self._expandpath(listener.get('tls_cafile'))
if not host or not topics:
@ -158,7 +164,7 @@ class MqttBackend(Backend):
'listener n.{}'.format(i + 1))
continue
client = mqtt.Client()
client = mqtt.Client(client_id)
client.on_connect = self.on_connect(*topics)
client.on_message = self.on_mqtt_message()
@ -223,7 +229,7 @@ class MqttBackend(Backend):
self._client = None
if self.host:
self._client = mqtt.Client()
self._client = mqtt.Client(self.client_id)
if self.subscribe_default_topic:
self._client.on_connect = self.on_connect(self.topic)

View file

@ -5,6 +5,7 @@ import threading
from typing import Any, Optional, IO
from platypush.config import Config
from platypush.message import Message
from platypush.plugins import Plugin, action
@ -23,7 +24,7 @@ class MqttPlugin(Plugin):
def __init__(self, host=None, port=1883, tls_cafile=None,
tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, tls_insecure=False,
username=None, password=None, **kwargs):
username=None, password=None, client_id=None, **kwargs):
"""
:param host: If set, MQTT messages will by default routed to this host unless overridden in `send_message` (default: None)
:type host: str
@ -55,6 +56,10 @@ class MqttPlugin(Plugin):
:param password: If a default host is set and requires user authentication, specify the password ciphers (default: None)
:type password: str
:param client_id: ID used to identify the client on the MQTT server (default: None).
If None is specified then ``Config.get('device_id')`` will be used.
:type client_id: str
"""
super().__init__(**kwargs)
@ -63,6 +68,7 @@ class MqttPlugin(Plugin):
self.port = port
self.username = username
self.password = password
self.client_id = client_id or Config.get('device_id')
self.tls_cafile = os.path.abspath(os.path.expanduser(tls_cafile)) \
if tls_cafile else None
@ -100,7 +106,8 @@ class MqttPlugin(Plugin):
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None,
username: Optional[str] = None, password: Optional[str] = None):
username: Optional[str] = None, password: Optional[str] = None,
client_id: Optional[str] = None):
"""
Sends a message to a topic.
@ -124,12 +131,14 @@ class MqttPlugin(Plugin):
required, specify it here (default: None).
:param username: Specify it if the MQTT server requires authentication (default: None).
:param password: Specify it if the MQTT server requires authentication (default: None).
:param client_id: Override the default client_id (default: None).
"""
from paho.mqtt.client import Client
if not host and not self.host:
raise RuntimeError('No host specified and no default host configured')
client_id = client_id or self.client_id
if not host:
tls_cafile = self.tls_cafile
tls_certfile = self.tls_certfile
@ -145,7 +154,7 @@ class MqttPlugin(Plugin):
if tls_insecure is None:
tls_insecure = self.tls_insecure
client = Client()
client = Client(client_id)
if username and password:
client.username_pw_set(username, password)