diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index 655ead527..45d2a6225 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -1,4 +1,5 @@ import json +from typing import Optional from redis import Redis @@ -17,6 +18,7 @@ class RedisBackend(Backend): Requires: * **redis** (``pip install redis``) + """ def __init__(self, queue='platypush_bus_mq', redis_args=None, *args, **kwargs): @@ -25,7 +27,7 @@ class RedisBackend(Backend): :type queue: str :param redis_args: Arguments that will be passed to the redis-py constructor (e.g. host, port, password), see - http://redis-py.readthedocs.io/en/latest/ + https://redis-py.readthedocs.io/en/latest/ :type redis_args: dict """ @@ -41,7 +43,7 @@ class RedisBackend(Backend): redis_args = redis_plugin.kwargs self.redis_args = redis_args - self.redis = Redis(**self.redis_args) + self.redis: Optional[Redis] = None def send_message(self, msg, queue_name=None, **kwargs): msg = str(msg) @@ -50,7 +52,6 @@ class RedisBackend(Backend): else: self.redis.rpush(self.queue, msg) - # noinspection PyBroadException def get_message(self, queue_name=None): queue = queue_name or self.queue data = self.redis.blpop(queue, timeout=1) @@ -76,21 +77,19 @@ class RedisBackend(Backend): def run(self): super().run() + self.logger.info('Initialized Redis backend on queue {} with arguments {}'.format(self.queue, self.redis_args)) - self.logger.info('Initialized Redis backend on queue {} with arguments {}'. - format(self.queue, self.redis_args)) + with Redis(**self.redis_args) as self.redis: + while not self.should_stop(): + try: + msg = self.get_message() + if not msg: + continue - while not self.should_stop(): - try: - msg = self.get_message() - if not msg: - continue - - self.logger.info('Received message on the Redis backend: {}'. - format(msg)) - self.on_message(msg) - except Exception as e: - self.logger.exception(e) + self.logger.info('Received message on the Redis backend: {}'.format(msg)) + self.on_message(msg) + except Exception as e: + self.logger.exception(e) self.logger.info('Redis backend terminated')