Added qos argument to mqtt.publish.

This commit is contained in:
Fabio Manganiello 2022-10-05 01:13:47 +02:00
parent 85f583a0ad
commit b88983f055
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -21,44 +21,68 @@ class MqttPlugin(Plugin):
""" """
def __init__(self, host=None, port=1883, tls_cafile=None, def __init__(
tls_certfile=None, tls_keyfile=None, self,
tls_version=None, tls_ciphers=None, tls_insecure=False, host=None,
username=None, password=None, client_id=None, timeout=None, **kwargs): port=1883,
tls_cafile=None,
tls_certfile=None,
tls_keyfile=None,
tls_version=None,
tls_ciphers=None,
tls_insecure=False,
username=None,
password=None,
client_id=None,
timeout=None,
**kwargs,
):
""" """
:param host: If set, MQTT messages will by default routed to this host unless overridden in `send_message` (default: None) :param host: If set, MQTT messages will by default routed to this host
unless overridden in `send_message` (default: None)
:type host: str :type host: str
:param port: If a default host is set, specify the listen port (default: 1883) :param port: If a default host is set, specify the listen port
(default: 1883)
:type port: int :type port: int
:param tls_cafile: If a default host is set and requires TLS/SSL, specify the certificate authority file (default: None) :param tls_cafile: If a default host is set and requires TLS/SSL,
specify the certificate authority file (default: None)
:type tls_cafile: str :type tls_cafile: str
:param tls_certfile: If a default host is set and requires TLS/SSL, specify the certificate file (default: None) :param tls_certfile: If a default host is set and requires TLS/SSL,
specify the certificate file (default: None)
:type tls_certfile: str :type tls_certfile: str
:param tls_keyfile: If a default host is set and requires TLS/SSL, specify the key file (default: None) :param tls_keyfile: If a default host is set and requires TLS/SSL,
specify the key file (default: None)
:type tls_keyfile: str :type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it :param tls_version: If TLS/SSL is enabled on the MQTT server and it
here (default: None). Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, ``tlsv1.2``. requires a certain TLS version, specify it here (default: None).
Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
``tlsv1.2``.
:type tls_version: str :type tls_version: str
:param tls_ciphers: If a default host is set and requires TLS/SSL, specify the supported ciphers (default: None) :param tls_ciphers: If a default host is set and requires TLS/SSL,
specify the supported ciphers (default: None)
:type tls_ciphers: str :type tls_ciphers: str
:param tls_insecure: Set to True to ignore TLS insecure warnings (default: False). :param tls_insecure: Set to True to ignore TLS insecure warnings
(default: False).
:type tls_insecure: bool :type tls_insecure: bool
:param username: If a default host is set and requires user authentication, specify the username ciphers (default: None) :param username: If a default host is set and requires user
authentication, specify the username ciphers (default: None)
:type username: str :type username: str
:param password: If a default host is set and requires user authentication, specify the password ciphers (default: None) :param password: If a default host is set and requires user
authentication, specify the password ciphers (default: None)
:type password: str :type password: str
:param client_id: ID used to identify the client on the MQTT server (default: None). :param client_id: ID used to identify the client on the MQTT server
If None is specified then ``Config.get('device_id')`` will be used. (default: None). If None is specified then
``Config.get('device_id')`` will be used.
:type client_id: str :type client_id: str
:param timeout: Client timeout in seconds (default: None). :param timeout: Client timeout in seconds (default: None).
@ -83,10 +107,11 @@ class MqttPlugin(Plugin):
@staticmethod @staticmethod
def get_tls_version(version: Optional[str] = None): def get_tls_version(version: Optional[str] = None):
import ssl import ssl
if not version: if not version:
return None return None
if type(version) == type(ssl.PROTOCOL_TLS): if isinstance(version, type(ssl.PROTOCOL_TLS)):
return version return version
if isinstance(version, str): if isinstance(version, str):
@ -120,10 +145,17 @@ class MqttPlugin(Plugin):
def _expandpath(path: Optional[str] = None) -> Optional[str]: def _expandpath(path: Optional[str] = None) -> Optional[str]:
return os.path.abspath(os.path.expanduser(path)) if path else None return os.path.abspath(os.path.expanduser(path)) if path else None
def _get_client(self, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, def _get_client(
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, self,
tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None, tls_cafile: Optional[str] = None,
username: Optional[str] = None, password: Optional[str] = None): tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: Optional[bool] = None,
username: Optional[str] = None,
password: Optional[str] = None,
):
from paho.mqtt.client import Client from paho.mqtt.client import Client
tls_cafile = self._expandpath(tls_cafile or self.tls_cafile) tls_cafile = self._expandpath(tls_cafile or self.tls_cafile)
@ -144,43 +176,77 @@ class MqttPlugin(Plugin):
if username and password: if username and password:
client.username_pw_set(username, password) client.username_pw_set(username, password)
if tls_cafile: if tls_cafile:
client.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, client.tls_set(
tls_version=tls_version, ciphers=tls_ciphers) ca_certs=tls_cafile,
certfile=tls_certfile,
keyfile=tls_keyfile,
tls_version=tls_version,
ciphers=tls_ciphers,
)
client.tls_insecure_set(tls_insecure) client.tls_insecure_set(tls_insecure)
return client return client
@action @action
def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: Optional[int] = None, def publish(
reply_topic: Optional[str] = None, timeout: int = 60, self,
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, topic: str,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, msg: Any,
tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None, host: Optional[str] = None,
username: Optional[str] = None, password: Optional[str] = None): port: Optional[int] = None,
reply_topic: Optional[str] = None,
timeout: int = 60,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: Optional[bool] = None,
username: Optional[str] = None,
password: Optional[str] = None,
qos: int = 0,
):
""" """
Sends a message to a topic. Sends a message to a topic.
:param topic: Topic/channel where the message will be delivered :param topic: Topic/channel where the message will be delivered
:param msg: Message to be sent. It can be a list, a dict, or a Message object. :param msg: Message to be sent. It can be a list, a dict, or a Message
:param host: MQTT broker hostname/IP (default: default host configured on the plugin). object.
:param port: MQTT broker port (default: default port configured on the plugin). :param host: MQTT broker hostname/IP (default: default host configured
:param reply_topic: If a ``reply_topic`` is specified, then the action will wait for a response on this topic. on the plugin).
:param timeout: If ``reply_topic`` is set, use this parameter to specify the maximum amount of time to :param port: MQTT broker port (default: default port configured on the
wait for a response (default: 60 seconds). plugin).
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority :param reply_topic: If a ``reply_topic`` is specified, then the action
to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None). will wait for a response on this topic.
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it :param timeout: If ``reply_topic`` is set, use this parameter to
here (default: None). specify the maximum amount of time to wait for a response (default:
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify 60 seconds).
it here (default: None). :param tls_cafile: If TLS/SSL is enabled on the MQTT server and the
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it certificate requires a certificate authority to authenticate it,
here (default: None). Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, ``tlsv1.2``. `ssl_cafile` will point to the provided ca.crt file (default:
:param tls_insecure: Set to True to ignore TLS insecure warnings (default: False). None).
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is :param tls_certfile: If TLS/SSL is enabled on the MQTT server and a
required, specify it here (default: None). client certificate it required, specify it here (default: None).
:param username: Specify it if the MQTT server requires authentication (default: None). :param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a
:param password: Specify it if the MQTT server requires authentication (default: None). client certificate key it required, specify it here (default:
None).
:param tls_version: If TLS/SSL is enabled on the MQTT server and it
requires a certain TLS version, specify it here (default: None).
Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``,
``tlsv1.2``.
:param tls_insecure: Set to True to ignore TLS insecure warnings
(default: False).
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an
explicit list of supported ciphers is required, specify it here
(default: None).
:param username: Specify it if the MQTT server requires authentication
(default: None).
:param password: Specify it if the MQTT server requires authentication
(default: None).
:param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS
<https://assetwolf.com/learn/mqtt-qos-understanding-quality-of-service>`_
(default: 0).
""" """
response_buffer = io.BytesIO() response_buffer = io.BytesIO()
client = None client = None
@ -199,20 +265,29 @@ class MqttPlugin(Plugin):
port = port or self.port or 1883 port = port or self.port or 1883
assert host, 'No host specified' assert host, 'No host specified'
client = self._get_client(tls_cafile=tls_cafile, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, client = self._get_client(
tls_version=tls_version, tls_ciphers=tls_ciphers, tls_insecure=tls_insecure, tls_cafile=tls_cafile,
username=username, password=password) tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
tls_version=tls_version,
tls_ciphers=tls_ciphers,
tls_insecure=tls_insecure,
username=username,
password=password,
)
client.connect(host, port, keepalive=timeout) client.connect(host, port, keepalive=timeout)
response_received = threading.Event() response_received = threading.Event()
if reply_topic: if reply_topic:
client.on_message = self._response_callback(reply_topic=reply_topic, client.on_message = self._response_callback(
event=response_received, reply_topic=reply_topic,
buffer=response_buffer) event=response_received,
buffer=response_buffer,
)
client.subscribe(reply_topic) client.subscribe(reply_topic)
client.publish(topic, str(msg)) client.publish(topic, str(msg), qos=qos)
if not reply_topic: if not reply_topic:
return return
@ -241,6 +316,7 @@ class MqttPlugin(Plugin):
buffer.write(msg.payload) buffer.write(msg.payload)
client.loop_stop() client.loop_stop()
event.set() event.set()
return on_message return on_message
@action @action