From 70db33b4e2f0b615b7361aac6a3a843e0df0cbeb Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 24 Jul 2024 21:33:04 +0200 Subject: [PATCH] [core] Better Redis connection fail handling logic. If the connection to Redis goes down, it shouldn't take down the main thread. Instead, catch `RedisConnectionError`, and execute `poll` in a loop until the connection is restored. --- platypush/bus/redis.py | 71 +++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index c23c74622b..e5969452fe 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -1,5 +1,6 @@ import logging import threading +import time from platypush.bus import Bus from platypush.message import Message @@ -14,18 +15,24 @@ class RedisBus(Bus): DEFAULT_REDIS_QUEUE: str = 'platypush/bus' - def __init__(self, *args, on_message=None, redis_queue=None, **kwargs): - from platypush.utils import get_redis - + def __init__(self, *_, on_message=None, redis_queue=None, **kwargs): super().__init__(on_message=on_message) - self.redis = get_redis(*args, **kwargs) self.redis_args = kwargs + self._redis = None self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE self.on_message = on_message self.thread_id = threading.get_ident() self._pubsub = None self._pubsub_lock = threading.RLock() + @property + def redis(self): + from platypush.utils import get_redis + + if not self._redis: + self._redis = get_redis(**self.redis_args) + return self._redis + @property def pubsub(self): with self._pubsub_lock: @@ -37,29 +44,51 @@ class RedisBus(Bus): """ Polls the Redis queue for new messages """ + from redis.exceptions import ConnectionError as RedisConnectionError + from platypush.message.event.application import ApplicationStartedEvent + from platypush.utils import redis_pools - with self.pubsub as pubsub: - pubsub.subscribe(self.redis_queue) - self.post(ApplicationStartedEvent()) + has_error = False - try: - for msg in pubsub.listen(): - if msg.get('type') != 'message': - continue + while not self.should_stop(): + with self.pubsub as pubsub: + try: + pubsub.subscribe(self.redis_queue) + self.post(ApplicationStartedEvent()) - if self.should_stop(): - break + for msg in pubsub.listen(): + if has_error: + logger.info('Redis connection restored') + has_error = False + if msg.get('type') != 'message': + continue + + if self.should_stop(): + break + + try: + data = msg.get('data', b'').decode('utf-8') + parsed_msg = Message.build(data) + if parsed_msg and self.on_message: + self.on_message(parsed_msg) + except Exception as e: + logger.exception(e) + except RedisConnectionError as e: + if not (self.should_stop() or has_error): + logger.warning('Redis connection error: %s', e) + + has_error = True + pubsub.close() + redis_pools.clear() # Clear the connection pool + self._redis = None + time.sleep(1) + finally: try: - data = msg.get('data', b'').decode('utf-8') - parsed_msg = Message.build(data) - if parsed_msg and self.on_message: - self.on_message(parsed_msg) - except Exception as e: - logger.exception(e) - finally: - pubsub.unsubscribe(self.redis_queue) + pubsub.unsubscribe(self.redis_queue) + except RedisConnectionError: + pass def post(self, msg): """