From b88983f055038cc773ac57793f507ae5febd92b9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 5 Oct 2022 01:13:47 +0200 Subject: [PATCH] Added `qos` argument to `mqtt.publish`. --- platypush/plugins/mqtt/__init__.py | 188 ++++++++++++++++++++--------- 1 file changed, 132 insertions(+), 56 deletions(-) diff --git a/platypush/plugins/mqtt/__init__.py b/platypush/plugins/mqtt/__init__.py index d757211cc..20b7034c0 100644 --- a/platypush/plugins/mqtt/__init__.py +++ b/platypush/plugins/mqtt/__init__.py @@ -21,44 +21,68 @@ class MqttPlugin(Plugin): """ - def __init__(self, host=None, port=1883, tls_cafile=None, - tls_certfile=None, tls_keyfile=None, - tls_version=None, tls_ciphers=None, tls_insecure=False, - username=None, password=None, client_id=None, timeout=None, **kwargs): + def __init__( + self, + host=None, + port=1883, + tls_cafile=None, + tls_certfile=None, + tls_keyfile=None, + tls_version=None, + tls_ciphers=None, + tls_insecure=False, + username=None, + password=None, + client_id=None, + timeout=None, + **kwargs, + ): """ - :param host: If set, MQTT messages will by default routed to this host unless overridden in `send_message` (default: None) + :param host: If set, MQTT messages will by default routed to this host + unless overridden in `send_message` (default: None) :type host: str - :param port: If a default host is set, specify the listen port (default: 1883) + :param port: If a default host is set, specify the listen port + (default: 1883) :type port: int - :param tls_cafile: If a default host is set and requires TLS/SSL, specify the certificate authority file (default: None) + :param tls_cafile: If a default host is set and requires TLS/SSL, + specify the certificate authority file (default: None) :type tls_cafile: str - :param tls_certfile: If a default host is set and requires TLS/SSL, specify the certificate file (default: None) + :param tls_certfile: If a default host is set and requires TLS/SSL, + specify the certificate file (default: None) :type tls_certfile: str - :param tls_keyfile: If a default host is set and requires TLS/SSL, specify the key file (default: None) + :param tls_keyfile: If a default host is set and requires TLS/SSL, + specify the key file (default: None) :type tls_keyfile: str - :param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it - here (default: None). Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, ``tlsv1.2``. + :param tls_version: If TLS/SSL is enabled on the MQTT server and it + requires a certain TLS version, specify it here (default: None). + Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, + ``tlsv1.2``. :type tls_version: str - :param tls_ciphers: If a default host is set and requires TLS/SSL, specify the supported ciphers (default: None) + :param tls_ciphers: If a default host is set and requires TLS/SSL, + specify the supported ciphers (default: None) :type tls_ciphers: str - :param tls_insecure: Set to True to ignore TLS insecure warnings (default: False). + :param tls_insecure: Set to True to ignore TLS insecure warnings + (default: False). :type tls_insecure: bool - :param username: If a default host is set and requires user authentication, specify the username ciphers (default: None) + :param username: If a default host is set and requires user + authentication, specify the username ciphers (default: None) :type username: str - :param password: If a default host is set and requires user authentication, specify the password ciphers (default: None) + :param password: If a default host is set and requires user + authentication, specify the password ciphers (default: None) :type password: str - :param client_id: ID used to identify the client on the MQTT server (default: None). - If None is specified then ``Config.get('device_id')`` will be used. + :param client_id: ID used to identify the client on the MQTT server + (default: None). If None is specified then + ``Config.get('device_id')`` will be used. :type client_id: str :param timeout: Client timeout in seconds (default: None). @@ -83,10 +107,11 @@ class MqttPlugin(Plugin): @staticmethod def get_tls_version(version: Optional[str] = None): import ssl + if not version: return None - if type(version) == type(ssl.PROTOCOL_TLS): + if isinstance(version, type(ssl.PROTOCOL_TLS)): return version if isinstance(version, str): @@ -120,10 +145,17 @@ class MqttPlugin(Plugin): def _expandpath(path: Optional[str] = None) -> Optional[str]: return os.path.abspath(os.path.expanduser(path)) if path else None - def _get_client(self, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None, - username: Optional[str] = None, password: Optional[str] = None): + def _get_client( + self, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version: Optional[str] = None, + tls_ciphers: Optional[str] = None, + tls_insecure: Optional[bool] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ): from paho.mqtt.client import Client tls_cafile = self._expandpath(tls_cafile or self.tls_cafile) @@ -144,43 +176,77 @@ class MqttPlugin(Plugin): if username and password: client.username_pw_set(username, password) if tls_cafile: - client.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, - tls_version=tls_version, ciphers=tls_ciphers) + client.tls_set( + ca_certs=tls_cafile, + certfile=tls_certfile, + keyfile=tls_keyfile, + tls_version=tls_version, + ciphers=tls_ciphers, + ) client.tls_insecure_set(tls_insecure) return client @action - def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: Optional[int] = None, - reply_topic: Optional[str] = None, timeout: int = 60, - tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None, - username: Optional[str] = None, password: Optional[str] = None): + def publish( + self, + topic: str, + msg: Any, + host: Optional[str] = None, + port: Optional[int] = None, + reply_topic: Optional[str] = None, + timeout: int = 60, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version: Optional[str] = None, + tls_ciphers: Optional[str] = None, + tls_insecure: Optional[bool] = None, + username: Optional[str] = None, + password: Optional[str] = None, + qos: int = 0, + ): """ Sends a message to a topic. :param topic: Topic/channel where the message will be delivered - :param msg: Message to be sent. It can be a list, a dict, or a Message object. - :param host: MQTT broker hostname/IP (default: default host configured on the plugin). - :param port: MQTT broker port (default: default port configured on the plugin). - :param reply_topic: If a ``reply_topic`` is specified, then the action will wait for a response on this topic. - :param timeout: If ``reply_topic`` is set, use this parameter to specify the maximum amount of time to - wait for a response (default: 60 seconds). - :param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority - to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None). - :param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it - here (default: None). - :param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify - it here (default: None). - :param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it - here (default: None). Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, ``tlsv1.2``. - :param tls_insecure: Set to True to ignore TLS insecure warnings (default: False). - :param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is - required, specify it here (default: None). - :param username: Specify it if the MQTT server requires authentication (default: None). - :param password: Specify it if the MQTT server requires authentication (default: None). + :param msg: Message to be sent. It can be a list, a dict, or a Message + object. + :param host: MQTT broker hostname/IP (default: default host configured + on the plugin). + :param port: MQTT broker port (default: default port configured on the + plugin). + :param reply_topic: If a ``reply_topic`` is specified, then the action + will wait for a response on this topic. + :param timeout: If ``reply_topic`` is set, use this parameter to + specify the maximum amount of time to wait for a response (default: + 60 seconds). + :param tls_cafile: If TLS/SSL is enabled on the MQTT server and the + certificate requires a certificate authority to authenticate it, + `ssl_cafile` will point to the provided ca.crt file (default: + None). + :param tls_certfile: If TLS/SSL is enabled on the MQTT server and a + client certificate it required, specify it here (default: None). + :param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a + client certificate key it required, specify it here (default: + None). + :param tls_version: If TLS/SSL is enabled on the MQTT server and it + requires a certain TLS version, specify it here (default: None). + Supported versions: ``tls`` (automatic), ``tlsv1``, ``tlsv1.1``, + ``tlsv1.2``. + :param tls_insecure: Set to True to ignore TLS insecure warnings + (default: False). + :param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an + explicit list of supported ciphers is required, specify it here + (default: None). + :param username: Specify it if the MQTT server requires authentication + (default: None). + :param password: Specify it if the MQTT server requires authentication + (default: None). + :param qos: Quality of Service (_QoS_) for the message - see `MQTT QoS + `_ + (default: 0). """ response_buffer = io.BytesIO() client = None @@ -199,20 +265,29 @@ class MqttPlugin(Plugin): port = port or self.port or 1883 assert host, 'No host specified' - client = self._get_client(tls_cafile=tls_cafile, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, - tls_version=tls_version, tls_ciphers=tls_ciphers, tls_insecure=tls_insecure, - username=username, password=password) + client = self._get_client( + tls_cafile=tls_cafile, + tls_certfile=tls_certfile, + tls_keyfile=tls_keyfile, + tls_version=tls_version, + tls_ciphers=tls_ciphers, + tls_insecure=tls_insecure, + username=username, + password=password, + ) client.connect(host, port, keepalive=timeout) response_received = threading.Event() if reply_topic: - client.on_message = self._response_callback(reply_topic=reply_topic, - event=response_received, - buffer=response_buffer) + client.on_message = self._response_callback( + reply_topic=reply_topic, + event=response_received, + buffer=response_buffer, + ) client.subscribe(reply_topic) - client.publish(topic, str(msg)) + client.publish(topic, str(msg), qos=qos) if not reply_topic: return @@ -241,6 +316,7 @@ class MqttPlugin(Plugin): buffer.write(msg.payload) client.loop_stop() event.set() + return on_message @action