From b76f141b615ea26362c47d9a472269f6a617ed5d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 27 Sep 2023 11:23:55 +0200 Subject: [PATCH] Catch response write errors in the MQTT callback. If the client that forwarded the request is no longer available (either because an exception or a timeout was raised) then its I/O buffer and event loop may be closed. In this case, the response callback should handle and report the exception, and still set the event, so that any other threads waiting for the response can move on. --- platypush/plugins/mqtt/__init__.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/platypush/plugins/mqtt/__init__.py b/platypush/plugins/mqtt/__init__.py index a43c037732..d6dc1dd08e 100644 --- a/platypush/plugins/mqtt/__init__.py +++ b/platypush/plugins/mqtt/__init__.py @@ -479,8 +479,9 @@ class MqttPlugin(RunnablePlugin): client.stop() del client - @staticmethod - def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]): + def _response_callback( + self, reply_topic: str, event: threading.Event, buffer: IO[bytes] + ): """ A response callback that writes the response to an IOBuffer and stops the client loop. @@ -490,9 +491,15 @@ class MqttPlugin(RunnablePlugin): if msg.topic != reply_topic: return - buffer.write(msg.payload) - client.loop_stop() - event.set() + try: + buffer.write(msg.payload) + client.loop_stop() + except Exception as e: + self.logger.warning( + 'Could not write the response back to the MQTT client: %s', e + ) + finally: + event.set() return on_message