diff --git a/platypush/plugins/mqtt/__init__.py b/platypush/plugins/mqtt/__init__.py index a40c6253e6..ac9e22a176 100644 --- a/platypush/plugins/mqtt/__init__.py +++ b/platypush/plugins/mqtt/__init__.py @@ -178,7 +178,6 @@ class MqttPlugin(RunnablePlugin): host: str, port: int, client_id: Optional[str] = None, - on_message: Optional[MqttCallback] = None, topics: Iterable[str] = (), **_, ) -> str: @@ -193,7 +192,6 @@ class MqttPlugin(RunnablePlugin): host, str(port), json.dumps(sorted(topics)), - str(id(on_message)), ] ).encode() ).hexdigest() @@ -344,7 +342,6 @@ class MqttPlugin(RunnablePlugin): host=kwargs['host'], port=kwargs['port'], client_id=client_id, - on_message=on_message, topics=topics, ) @@ -435,14 +432,8 @@ class MqttPlugin(RunnablePlugin): response_buffer.close() if client: - try: - client.loop_stop() - except Exception as e: - self.logger.warning( - 'Could not stop client loop: %s: %s', type(e).__name__, e - ) - - client.disconnect() + client.stop() + del client @action def subscribe(self, topic: str, **mqtt_kwargs): @@ -473,7 +464,21 @@ class MqttPlugin(RunnablePlugin): :param mqtt_kwargs: MQTT broker configuration (host, port, username, password etc.). See :meth:`.__init__` parameters. """ - self._get_client(**mqtt_kwargs).unsubscribe(topic) + client_id = self._get_client_id( + topics=(topic,), + **mqtt_kwargs, + ) + + with self._listeners_lock[client_id]: + client = self.listeners.get(client_id) + + if not client: + self.logger.info('No subscriptions found for topic %s', topic) + return + + client.unsubscribe(topic) + client.stop() + del client @staticmethod def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]): @@ -527,6 +532,7 @@ class MqttPlugin(RunnablePlugin): for listener in self.listeners.values(): try: listener.join(timeout=1) + del listener except Exception: pass diff --git a/platypush/plugins/mqtt/_client.py b/platypush/plugins/mqtt/_client.py index 5d813bf7bc..5ef0c9d4d8 100644 --- a/platypush/plugins/mqtt/_client.py +++ b/platypush/plugins/mqtt/_client.py @@ -236,6 +236,11 @@ class MqttClient(mqtt.Client, threading.Thread): if not self.is_alive(): return + try: + self.loop_stop() + except Exception as e: + self.logger.debug('Could not stop client loop: %s: %s', type(e).__name__, e) + self._stop_scheduled = True self.disconnect() self._running = False