[core] Better Redis connection fail handling logic.

If the connection to Redis goes down, it shouldn't take down the main
thread.

Instead, catch `RedisConnectionError`, and execute `poll` in a loop
until the connection is restored.
This commit is contained in:
Fabio Manganiello 2024-07-24 21:33:04 +02:00
parent 357d92b479
commit 70db33b4e2
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,5 +1,6 @@
import logging
import threading
import time
from platypush.bus import Bus
from platypush.message import Message
@ -14,18 +15,24 @@ class RedisBus(Bus):
DEFAULT_REDIS_QUEUE: str = 'platypush/bus'
def __init__(self, *args, on_message=None, redis_queue=None, **kwargs):
from platypush.utils import get_redis
def __init__(self, *_, on_message=None, redis_queue=None, **kwargs):
super().__init__(on_message=on_message)
self.redis = get_redis(*args, **kwargs)
self.redis_args = kwargs
self._redis = None
self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
self.on_message = on_message
self.thread_id = threading.get_ident()
self._pubsub = None
self._pubsub_lock = threading.RLock()
@property
def redis(self):
from platypush.utils import get_redis
if not self._redis:
self._redis = get_redis(**self.redis_args)
return self._redis
@property
def pubsub(self):
with self._pubsub_lock:
@ -37,29 +44,51 @@ class RedisBus(Bus):
"""
Polls the Redis queue for new messages
"""
from redis.exceptions import ConnectionError as RedisConnectionError
from platypush.message.event.application import ApplicationStartedEvent
from platypush.utils import redis_pools
with self.pubsub as pubsub:
pubsub.subscribe(self.redis_queue)
self.post(ApplicationStartedEvent())
has_error = False
try:
for msg in pubsub.listen():
if msg.get('type') != 'message':
continue
while not self.should_stop():
with self.pubsub as pubsub:
try:
pubsub.subscribe(self.redis_queue)
self.post(ApplicationStartedEvent())
if self.should_stop():
break
for msg in pubsub.listen():
if has_error:
logger.info('Redis connection restored')
has_error = False
if msg.get('type') != 'message':
continue
if self.should_stop():
break
try:
data = msg.get('data', b'').decode('utf-8')
parsed_msg = Message.build(data)
if parsed_msg and self.on_message:
self.on_message(parsed_msg)
except Exception as e:
logger.exception(e)
except RedisConnectionError as e:
if not (self.should_stop() or has_error):
logger.warning('Redis connection error: %s', e)
has_error = True
pubsub.close()
redis_pools.clear() # Clear the connection pool
self._redis = None
time.sleep(1)
finally:
try:
data = msg.get('data', b'').decode('utf-8')
parsed_msg = Message.build(data)
if parsed_msg and self.on_message:
self.on_message(parsed_msg)
except Exception as e:
logger.exception(e)
finally:
pubsub.unsubscribe(self.redis_queue)
pubsub.unsubscribe(self.redis_queue)
except RedisConnectionError:
pass
def post(self, msg):
"""