From 928bb3667a480bdc2ed86dc4132749e591b047e7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 22 Mar 2021 01:51:49 +0100 Subject: [PATCH] Reconnection logic for MQTT disconnections caused by temporary errors --- platypush/backend/mqtt.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 1091ff7da..427d7e554 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -1,6 +1,8 @@ import json +import logging import os import threading +import time from typing import Optional, List, Callable import paho.mqtt.client as mqtt @@ -16,6 +18,8 @@ from platypush.utils import set_thread_name class MqttClient(mqtt.Client, threading.Thread): + _reconnect_timeout = 10.0 + def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None, on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None, client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, @@ -29,6 +33,8 @@ class MqttClient(mqtt.Client, threading.Thread): self.topics = set(topics or []) self.keepalive = keepalive self.on_connect = self.connect_hndl() + self.on_disconnect = self.disconnect_hndl() + self.logger = logging.getLogger(__name__) if on_message: self.on_message = on_message @@ -73,6 +79,20 @@ class MqttClient(mqtt.Client, threading.Thread): return handler + def disconnect_hndl(self): + # noinspection PyUnusedLocal + def handler(client, userdata, rc): + if not self._stop_scheduled and rc in {mqtt.MQTT_ERR_INVAL, mqtt.MQTT_ERR_AGAIN, mqtt.MQTT_ERR_CONN_LOST, + mqtt.MQTT_ERR_CONN_REFUSED, mqtt.MQTT_ERR_NOMEM, mqtt.MQTT_ERR_ERRNO, + mqtt.MQTT_ERR_NO_CONN, mqtt.MQTT_ERR_PAYLOAD_SIZE, + mqtt.MQTT_ERR_QUEUE_SIZE, mqtt.MQTT_ERR_UNKNOWN}: + self.logger.warning('Unexpected disconnection from {}:{}. MQTT error: {}'.format( + self.host, self.port, rc)) + time.sleep(self._reconnect_timeout) + self.connect(host=self.host, port=self.port, keepalive=self.keepalive) + + return handler + def run(self): super().run() self.connect(host=self.host, port=self.port, keepalive=self.keepalive)