Reconnection logic for MQTT disconnections caused by temporary errors

This commit is contained in:
Fabio Manganiello 2021-03-22 01:51:49 +01:00
parent 782be7794b
commit 928bb3667a
1 changed files with 20 additions and 0 deletions

View File

@ -1,6 +1,8 @@
import json import json
import logging
import os import os
import threading import threading
import time
from typing import Optional, List, Callable from typing import Optional, List, Callable
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
@ -16,6 +18,8 @@ from platypush.utils import set_thread_name
class MqttClient(mqtt.Client, threading.Thread): class MqttClient(mqtt.Client, threading.Thread):
_reconnect_timeout = 10.0
def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None, def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None,
on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None, on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None,
client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
@ -29,6 +33,8 @@ class MqttClient(mqtt.Client, threading.Thread):
self.topics = set(topics or []) self.topics = set(topics or [])
self.keepalive = keepalive self.keepalive = keepalive
self.on_connect = self.connect_hndl() self.on_connect = self.connect_hndl()
self.on_disconnect = self.disconnect_hndl()
self.logger = logging.getLogger(__name__)
if on_message: if on_message:
self.on_message = on_message self.on_message = on_message
@ -73,6 +79,20 @@ class MqttClient(mqtt.Client, threading.Thread):
return handler return handler
def disconnect_hndl(self):
# noinspection PyUnusedLocal
def handler(client, userdata, rc):
if not self._stop_scheduled and rc in {mqtt.MQTT_ERR_INVAL, mqtt.MQTT_ERR_AGAIN, mqtt.MQTT_ERR_CONN_LOST,
mqtt.MQTT_ERR_CONN_REFUSED, mqtt.MQTT_ERR_NOMEM, mqtt.MQTT_ERR_ERRNO,
mqtt.MQTT_ERR_NO_CONN, mqtt.MQTT_ERR_PAYLOAD_SIZE,
mqtt.MQTT_ERR_QUEUE_SIZE, mqtt.MQTT_ERR_UNKNOWN}:
self.logger.warning('Unexpected disconnection from {}:{}. MQTT error: {}'.format(
self.host, self.port, rc))
time.sleep(self._reconnect_timeout)
self.connect(host=self.host, port=self.port, keepalive=self.keepalive)
return handler
def run(self): def run(self):
super().run() super().run()
self.connect(host=self.host, port=self.port, keepalive=self.keepalive) self.connect(host=self.host, port=self.port, keepalive=self.keepalive)