Unified interface to retrieve the Redis bus configuration.

A common `utils.get_redis_conf` has been created to handle the cascade
fallback logic used to retrive the default Redis configuration.
This commit is contained in:
Fabio Manganiello 2023-07-24 01:04:13 +02:00
parent 77ffefdccb
commit e9a568fdd2
Signed by: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 170 additions and 32 deletions

View file

@ -1,6 +1,7 @@
import argparse import argparse
import logging import logging
import os import os
import subprocess
import sys import sys
from typing import Optional from typing import Optional
@ -16,7 +17,7 @@ from .message.event import Event
from .message.event.application import ApplicationStartedEvent from .message.event.application import ApplicationStartedEvent
from .message.request import Request from .message.request import Request
from .message.response import Response from .message.response import Response
from .utils import get_enabled_plugins from .utils import get_enabled_plugins, get_redis_conf
log = logging.getLogger('platypush') log = logging.getLogger('platypush')
@ -27,6 +28,9 @@ class Application:
# Default bus queue name # Default bus queue name
_default_redis_queue = 'platypush/bus' _default_redis_queue = 'platypush/bus'
# Default Redis port
_default_redis_port = 6379
# backend_name => backend_obj map # backend_name => backend_obj map
backends = None backends = None
@ -42,6 +46,9 @@ class Application:
no_capture_stderr: bool = False, no_capture_stderr: bool = False,
redis_queue: Optional[str] = None, redis_queue: Optional[str] = None,
verbose: bool = False, verbose: bool = False,
start_redis: bool = False,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
): ):
""" """
:param config_file: Configuration file override (default: None). :param config_file: Configuration file override (default: None).
@ -58,6 +65,15 @@ class Application:
messages (default: platypush/bus). messages (default: platypush/bus).
:param verbose: Enable debug/verbose logging, overriding the stored :param verbose: Enable debug/verbose logging, overriding the stored
configuration (default: False). configuration (default: False).
:param start_redis: If set, it starts a managed Redis instance upon
boot (it requires the ``redis-server`` executable installed on the
server). This is particularly useful when running the application
inside of Docker containers, without relying on ``docker-compose``
to start multiple containers, and in tests (default: False).
:param redis_host: Host of the Redis server to be used. It overrides
the settings in the ``redis`` section of the configuration file.
:param redis_port: Port of the local Redis server. It overrides the
settings in the ``redis`` section of the configuration file.
""" """
self.pidfile = pidfile self.pidfile = pidfile
@ -78,16 +94,29 @@ class Application:
self.requests_to_process = requests_to_process self.requests_to_process = requests_to_process
self.processed_requests = 0 self.processed_requests = 0
self.cron_scheduler = None self.cron_scheduler = None
self.start_redis = start_redis
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_conf = {}
self._redis_proc: Optional[subprocess.Popen] = None
self._init_bus() self._init_bus()
self._init_logging() self._init_logging()
def _init_bus(self): def _init_bus(self):
redis_conf = Config.get('backend.redis') or {} self._redis_conf = get_redis_conf()
self._redis_conf['port'] = self.redis_port or self._redis_conf.get(
'port', self._default_redis_port
)
if self.redis_host:
self._redis_conf['host'] = self.redis_host
Config.set('redis', self._redis_conf)
self.bus = RedisBus( self.bus = RedisBus(
redis_queue=self.redis_queue, redis_queue=self.redis_queue,
on_message=self.on_message(), on_message=self.on_message(),
**redis_conf.get('redis_args', {}) **self._redis_conf,
) )
def _init_logging(self): def _init_logging(self):
@ -96,6 +125,37 @@ class Application:
logging_conf['level'] = logging.DEBUG logging_conf['level'] = logging.DEBUG
logging.basicConfig(**logging_conf) logging.basicConfig(**logging_conf)
def _start_redis(self):
if self._redis_proc and self._redis_proc.poll() is None:
log.warning(
'A local Redis instance is already running, refusing to start it again'
)
return
port = self._redis_conf['port']
log.info('Starting local Redis instance on %s', port)
self._redis_proc = subprocess.Popen(
[
'redis-server',
'--bind',
'localhost',
'--port',
str(port),
],
stdout=subprocess.PIPE,
)
log.info('Waiting for Redis to start')
for line in self._redis_proc.stdout: # type: ignore
if b'Ready to accept connections' in line:
break
def _stop_redis(self):
if self._redis_proc and self._redis_proc.poll() is None:
log.info('Stopping local Redis instance')
self._redis_proc.kill()
self._redis_proc = None
@classmethod @classmethod
def build(cls, *args: str): def build(cls, *args: str):
""" """
@ -104,6 +164,7 @@ class Application:
from . import __version__ from . import __version__
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
'--config', '--config',
'-c', '-c',
@ -112,6 +173,7 @@ class Application:
default=None, default=None,
help='Custom location for the configuration file', help='Custom location for the configuration file',
) )
parser.add_argument( parser.add_argument(
'--version', '--version',
dest='version', dest='version',
@ -119,6 +181,7 @@ class Application:
action='store_true', action='store_true',
help="Print the current version and exit", help="Print the current version and exit",
) )
parser.add_argument( parser.add_argument(
'--verbose', '--verbose',
'-v', '-v',
@ -127,6 +190,7 @@ class Application:
action='store_true', action='store_true',
help="Enable verbose/debug logging", help="Enable verbose/debug logging",
) )
parser.add_argument( parser.add_argument(
'--pidfile', '--pidfile',
'-P', '-P',
@ -137,6 +201,7 @@ class Application:
+ "store its PID, useful if you're planning to " + "store its PID, useful if you're planning to "
+ "integrate it in a service", + "integrate it in a service",
) )
parser.add_argument( parser.add_argument(
'--no-capture-stdout', '--no-capture-stdout',
dest='no_capture_stdout', dest='no_capture_stdout',
@ -146,6 +211,7 @@ class Application:
+ "exceeded errors so stdout won't be captured by " + "exceeded errors so stdout won't be captured by "
+ "the logging system", + "the logging system",
) )
parser.add_argument( parser.add_argument(
'--no-capture-stderr', '--no-capture-stderr',
dest='no_capture_stderr', dest='no_capture_stderr',
@ -155,6 +221,7 @@ class Application:
+ "exceeded errors so stderr won't be captured by " + "exceeded errors so stderr won't be captured by "
+ "the logging system", + "the logging system",
) )
parser.add_argument( parser.add_argument(
'--redis-queue', '--redis-queue',
dest='redis_queue', dest='redis_queue',
@ -164,6 +231,34 @@ class Application:
"(default: platypush/bus)", "(default: platypush/bus)",
) )
parser.add_argument(
'--start-redis',
dest='start_redis',
required=False,
action='store_true',
help="Set this flag if you want to run and manage Redis internally "
"from the app rather than using an external server. It requires the "
"redis-server executable to be present in the path",
)
parser.add_argument(
'--redis-host',
dest='redis_host',
required=False,
default=None,
help="Overrides the host specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--redis-port',
dest='redis_port',
required=False,
default=None,
help="Overrides the port specified in the redis section of the "
"configuration file",
)
opts, _ = parser.parse_known_args(args) opts, _ = parser.parse_known_args(args)
if opts.version: if opts.version:
print(__version__) print(__version__)
@ -176,6 +271,9 @@ class Application:
no_capture_stderr=opts.no_capture_stderr, no_capture_stderr=opts.no_capture_stderr,
redis_queue=opts.redis_queue, redis_queue=opts.redis_queue,
verbose=opts.verbose, verbose=opts.verbose,
start_redis=opts.start_redis,
redis_host=opts.redis_host,
redis_port=opts.redis_port,
) )
def on_message(self): def on_message(self):
@ -234,6 +332,9 @@ class Application:
self.entities_engine.stop() self.entities_engine.stop()
self.entities_engine = None self.entities_engine = None
if self.start_redis:
self._stop_redis()
def run(self): def run(self):
"""Start the daemon.""" """Start the daemon."""
from . import __version__ from . import __version__
@ -245,6 +346,10 @@ class Application:
log.info('---- Starting platypush v.%s', __version__) log.info('---- Starting platypush v.%s', __version__)
# Start the local Redis service if required
if self.start_redis:
self._start_redis()
# Initialize the backends and link them to the bus # Initialize the backends and link them to the bus
self.backends = register_backends(bus=self.bus, global_scope=True) self.backends = register_backends(bus=self.bus, global_scope=True)

View file

@ -5,7 +5,7 @@ from platypush.config import Config
from platypush.context import get_backend from platypush.context import get_backend
from platypush.message import Message from platypush.message import Message
from platypush.message.request import Request from platypush.message.request import Request
from platypush.utils import get_redis_queue_name_by_message from platypush.utils import get_redis_conf, get_redis_queue_name_by_message
from .logger import logger from .logger import logger
@ -13,17 +13,27 @@ _bus = None
def bus(): def bus():
global _bus """
Lazy getter/initializer for the bus object.
"""
global _bus # pylint: disable=global-statement
if _bus is None: if _bus is None:
redis_queue = get_backend('http').bus.redis_queue # type: ignore redis_queue = get_backend('http').bus.redis_queue # type: ignore
_bus = RedisBus(redis_queue=redis_queue) _bus = RedisBus(**get_redis_conf(), redis_queue=redis_queue)
return _bus return _bus
def send_message(msg, wait_for_response=True): def send_message(msg, wait_for_response=True):
"""
Send a message to the bus.
:param msg: The message to send.
:param wait_for_response: If ``True``, wait for the response to be received
before returning, otherwise return immediately.
"""
msg = Message.build(msg) msg = Message.build(msg)
if msg is None: if msg is None:
return return None
if isinstance(msg, Request): if isinstance(msg, Request):
msg.origin = 'http' msg.origin = 'http'
@ -35,12 +45,22 @@ def send_message(msg, wait_for_response=True):
if isinstance(msg, Request) and wait_for_response: if isinstance(msg, Request) and wait_for_response:
response = get_message_response(msg) response = get_message_response(msg)
logger().debug('Processing response on the HTTP backend: {}'.format(response)) logger().debug('Processing response on the HTTP backend: %s', response)
return response return response
return None
def send_request(action, wait_for_response=True, **kwargs): def send_request(action, wait_for_response=True, **kwargs):
"""
Send a request to the bus.
:param action: The action to send.
:param wait_for_response: If ``True``, wait for the response to be received
before returning, otherwise return immediately.
:param kwargs: Additional arguments to pass to the action.
"""
msg = {'type': 'request', 'action': action} msg = {'type': 'request', 'action': action}
if kwargs: if kwargs:
@ -50,10 +70,16 @@ def send_request(action, wait_for_response=True, **kwargs):
def get_message_response(msg): def get_message_response(msg):
"""
Get the response to the given message.
:param msg: The message to get the response for.
:return: The response to the given message.
"""
redis = Redis(**bus().redis_args) redis = Redis(**bus().redis_args)
redis_queue = get_redis_queue_name_by_message(msg) redis_queue = get_redis_queue_name_by_message(msg)
if not redis_queue: if not redis_queue:
return return None
response = redis.blpop(redis_queue, timeout=60) response = redis.blpop(redis_queue, timeout=60)
if response and len(response) > 1: if response and len(response) > 1:

View file

@ -29,7 +29,6 @@ class RedisBus(Bus):
""" """
Reads one message from the Redis queue Reads one message from the Redis queue
""" """
try: try:
if self.should_stop(): if self.should_stop():
return None return None
@ -49,8 +48,17 @@ class RedisBus(Bus):
""" """
Sends a message to the Redis queue Sends a message to the Redis queue
""" """
from redis import exceptions
return self.redis.rpush(self.redis_queue, str(msg)) try:
return self.redis.rpush(self.redis_queue, str(msg))
except exceptions.ConnectionError as e:
if not self.should_stop():
# Raise the exception only if the bus it not supposed to be
# stopped
raise e
return None
def stop(self): def stop(self):
super().stop() super().stop()

View file

@ -1,7 +1,7 @@
from redis import Redis from redis import Redis
from platypush.context import get_backend
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
from platypush.utils import get_redis_conf
class RedisPlugin(Plugin): class RedisPlugin(Plugin):
@ -12,15 +12,7 @@ class RedisPlugin(Plugin):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__() super().__init__()
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs or get_redis_conf()
if not kwargs:
try:
redis_backend = get_backend('redis')
if redis_backend and redis_backend.redis_args:
self.kwargs = redis_backend.redis_args
except Exception as e:
self.logger.debug(e)
def _get_redis(self): def _get_redis(self):
return Redis(*self.args, **self.kwargs) return Redis(*self.args, **self.kwargs)
@ -58,7 +50,7 @@ class RedisPlugin(Plugin):
""" """
return { return {
keys[i]: value.decode() if value else value keys[i]: value.decode() if isinstance(value, bytes) else value
for (i, value) in enumerate(self._get_redis().mget(keys, *args)) for (i, value) in enumerate(self._get_redis().mget(keys, *args))
} }
@ -75,7 +67,7 @@ class RedisPlugin(Plugin):
# broke back-compatibility with the previous way of passing # broke back-compatibility with the previous way of passing
# key-value pairs to mset directly on kwargs. This try-catch block # key-value pairs to mset directly on kwargs. This try-catch block
# is to support things on all the redis-py versions # is to support things on all the redis-py versions
return self._get_redis().mset(mapping=kwargs) return self._get_redis().mset(mapping=kwargs) # type: ignore
@action @action
def expire(self, key, expiration): def expire(self, key, expiration):

View file

@ -559,24 +559,31 @@ def get_enabled_plugins() -> dict:
return plugins return plugins
def get_redis_conf() -> dict:
"""
Get the Redis connection arguments from the configuration.
"""
from platypush.config import Config
return (
Config.get('redis')
or (Config.get('backend.redis') or {}).get('redis_args', {})
or {}
)
def get_redis(*args, **kwargs) -> Redis: def get_redis(*args, **kwargs) -> Redis:
""" """
Get a Redis client on the basis of the Redis configuration. Get a Redis client on the basis of the Redis configuration.
The Redis configuration can be loaded from: The Redis configuration can be loaded from:
1. The ``backend.redis`` configuration (``redis_args`` attribute) 1. The ``redis`` plugin.
2. The ``redis`` plugin. 2. The ``backend.redis`` configuration (``redis_args`` attribute)
""" """
from platypush.config import Config
if not (args or kwargs): if not (args or kwargs):
kwargs = ( kwargs = get_redis_conf()
Config.get('redis')
or (Config.get('backend.redis') or {}).get('redis_args', {})
or {}
)
return Redis(*args, **kwargs) return Redis(*args, **kwargs)