Close Redis connection upon backend stop

This commit is contained in:
Fabio Manganiello 2021-07-26 01:06:35 +02:00
parent 8e2d4d0bce
commit 0f37102ce4
1 changed files with 15 additions and 16 deletions

View File

@ -1,4 +1,5 @@
import json import json
from typing import Optional
from redis import Redis from redis import Redis
@ -17,6 +18,7 @@ class RedisBackend(Backend):
Requires: Requires:
* **redis** (``pip install redis``) * **redis** (``pip install redis``)
""" """
def __init__(self, queue='platypush_bus_mq', redis_args=None, *args, **kwargs): def __init__(self, queue='platypush_bus_mq', redis_args=None, *args, **kwargs):
@ -25,7 +27,7 @@ class RedisBackend(Backend):
:type queue: str :type queue: str
:param redis_args: Arguments that will be passed to the redis-py constructor (e.g. host, port, password), see :param redis_args: Arguments that will be passed to the redis-py constructor (e.g. host, port, password), see
http://redis-py.readthedocs.io/en/latest/ https://redis-py.readthedocs.io/en/latest/
:type redis_args: dict :type redis_args: dict
""" """
@ -41,7 +43,7 @@ class RedisBackend(Backend):
redis_args = redis_plugin.kwargs redis_args = redis_plugin.kwargs
self.redis_args = redis_args self.redis_args = redis_args
self.redis = Redis(**self.redis_args) self.redis: Optional[Redis] = None
def send_message(self, msg, queue_name=None, **kwargs): def send_message(self, msg, queue_name=None, **kwargs):
msg = str(msg) msg = str(msg)
@ -50,7 +52,6 @@ class RedisBackend(Backend):
else: else:
self.redis.rpush(self.queue, msg) self.redis.rpush(self.queue, msg)
# noinspection PyBroadException
def get_message(self, queue_name=None): def get_message(self, queue_name=None):
queue = queue_name or self.queue queue = queue_name or self.queue
data = self.redis.blpop(queue, timeout=1) data = self.redis.blpop(queue, timeout=1)
@ -76,21 +77,19 @@ class RedisBackend(Backend):
def run(self): def run(self):
super().run() super().run()
self.logger.info('Initialized Redis backend on queue {} with arguments {}'.format(self.queue, self.redis_args))
self.logger.info('Initialized Redis backend on queue {} with arguments {}'. with Redis(**self.redis_args) as self.redis:
format(self.queue, self.redis_args)) while not self.should_stop():
try:
msg = self.get_message()
if not msg:
continue
while not self.should_stop(): self.logger.info('Received message on the Redis backend: {}'.format(msg))
try: self.on_message(msg)
msg = self.get_message() except Exception as e:
if not msg: self.logger.exception(e)
continue
self.logger.info('Received message on the Redis backend: {}'.
format(msg))
self.on_message(msg)
except Exception as e:
self.logger.exception(e)
self.logger.info('Redis backend terminated') self.logger.info('Redis backend terminated')