diff --git a/platypush/app.py b/platypush/app.py index ec1c84cc..702d4f79 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -1,6 +1,7 @@ import argparse import logging import os +import subprocess import sys from typing import Optional @@ -16,7 +17,7 @@ from .message.event import Event from .message.event.application import ApplicationStartedEvent from .message.request import Request from .message.response import Response -from .utils import get_enabled_plugins +from .utils import get_enabled_plugins, get_redis_conf log = logging.getLogger('platypush') @@ -27,6 +28,9 @@ class Application: # Default bus queue name _default_redis_queue = 'platypush/bus' + # Default Redis port + _default_redis_port = 6379 + # backend_name => backend_obj map backends = None @@ -42,6 +46,9 @@ class Application: no_capture_stderr: bool = False, redis_queue: Optional[str] = None, verbose: bool = False, + start_redis: bool = False, + redis_host: Optional[str] = None, + redis_port: Optional[int] = None, ): """ :param config_file: Configuration file override (default: None). @@ -58,6 +65,15 @@ class Application: messages (default: platypush/bus). :param verbose: Enable debug/verbose logging, overriding the stored configuration (default: False). + :param start_redis: If set, it starts a managed Redis instance upon + boot (it requires the ``redis-server`` executable installed on the + server). This is particularly useful when running the application + inside of Docker containers, without relying on ``docker-compose`` + to start multiple containers, and in tests (default: False). + :param redis_host: Host of the Redis server to be used. It overrides + the settings in the ``redis`` section of the configuration file. + :param redis_port: Port of the local Redis server. It overrides the + settings in the ``redis`` section of the configuration file. """ self.pidfile = pidfile @@ -78,16 +94,29 @@ class Application: self.requests_to_process = requests_to_process self.processed_requests = 0 self.cron_scheduler = None + self.start_redis = start_redis + self.redis_host = redis_host + self.redis_port = redis_port + self.redis_conf = {} + self._redis_proc: Optional[subprocess.Popen] = None self._init_bus() self._init_logging() def _init_bus(self): - redis_conf = Config.get('backend.redis') or {} + self._redis_conf = get_redis_conf() + self._redis_conf['port'] = self.redis_port or self._redis_conf.get( + 'port', self._default_redis_port + ) + + if self.redis_host: + self._redis_conf['host'] = self.redis_host + + Config.set('redis', self._redis_conf) self.bus = RedisBus( redis_queue=self.redis_queue, on_message=self.on_message(), - **redis_conf.get('redis_args', {}) + **self._redis_conf, ) def _init_logging(self): @@ -96,6 +125,37 @@ class Application: logging_conf['level'] = logging.DEBUG logging.basicConfig(**logging_conf) + def _start_redis(self): + if self._redis_proc and self._redis_proc.poll() is None: + log.warning( + 'A local Redis instance is already running, refusing to start it again' + ) + return + + port = self._redis_conf['port'] + log.info('Starting local Redis instance on %s', port) + self._redis_proc = subprocess.Popen( + [ + 'redis-server', + '--bind', + 'localhost', + '--port', + str(port), + ], + stdout=subprocess.PIPE, + ) + + log.info('Waiting for Redis to start') + for line in self._redis_proc.stdout: # type: ignore + if b'Ready to accept connections' in line: + break + + def _stop_redis(self): + if self._redis_proc and self._redis_proc.poll() is None: + log.info('Stopping local Redis instance') + self._redis_proc.kill() + self._redis_proc = None + @classmethod def build(cls, *args: str): """ @@ -104,6 +164,7 @@ class Application: from . import __version__ parser = argparse.ArgumentParser() + parser.add_argument( '--config', '-c', @@ -112,6 +173,7 @@ class Application: default=None, help='Custom location for the configuration file', ) + parser.add_argument( '--version', dest='version', @@ -119,6 +181,7 @@ class Application: action='store_true', help="Print the current version and exit", ) + parser.add_argument( '--verbose', '-v', @@ -127,6 +190,7 @@ class Application: action='store_true', help="Enable verbose/debug logging", ) + parser.add_argument( '--pidfile', '-P', @@ -137,6 +201,7 @@ class Application: + "store its PID, useful if you're planning to " + "integrate it in a service", ) + parser.add_argument( '--no-capture-stdout', dest='no_capture_stdout', @@ -146,6 +211,7 @@ class Application: + "exceeded errors so stdout won't be captured by " + "the logging system", ) + parser.add_argument( '--no-capture-stderr', dest='no_capture_stderr', @@ -155,6 +221,7 @@ class Application: + "exceeded errors so stderr won't be captured by " + "the logging system", ) + parser.add_argument( '--redis-queue', dest='redis_queue', @@ -164,6 +231,34 @@ class Application: "(default: platypush/bus)", ) + parser.add_argument( + '--start-redis', + dest='start_redis', + required=False, + action='store_true', + help="Set this flag if you want to run and manage Redis internally " + "from the app rather than using an external server. It requires the " + "redis-server executable to be present in the path", + ) + + parser.add_argument( + '--redis-host', + dest='redis_host', + required=False, + default=None, + help="Overrides the host specified in the redis section of the " + "configuration file", + ) + + parser.add_argument( + '--redis-port', + dest='redis_port', + required=False, + default=None, + help="Overrides the port specified in the redis section of the " + "configuration file", + ) + opts, _ = parser.parse_known_args(args) if opts.version: print(__version__) @@ -176,6 +271,9 @@ class Application: no_capture_stderr=opts.no_capture_stderr, redis_queue=opts.redis_queue, verbose=opts.verbose, + start_redis=opts.start_redis, + redis_host=opts.redis_host, + redis_port=opts.redis_port, ) def on_message(self): @@ -234,6 +332,9 @@ class Application: self.entities_engine.stop() self.entities_engine = None + if self.start_redis: + self._stop_redis() + def run(self): """Start the daemon.""" from . import __version__ @@ -245,6 +346,10 @@ class Application: log.info('---- Starting platypush v.%s', __version__) + # Start the local Redis service if required + if self.start_redis: + self._start_redis() + # Initialize the backends and link them to the bus self.backends = register_backends(bus=self.bus, global_scope=True) diff --git a/platypush/backend/http/app/utils/bus.py b/platypush/backend/http/app/utils/bus.py index 54556ced..6571599e 100644 --- a/platypush/backend/http/app/utils/bus.py +++ b/platypush/backend/http/app/utils/bus.py @@ -5,7 +5,7 @@ from platypush.config import Config from platypush.context import get_backend from platypush.message import Message from platypush.message.request import Request -from platypush.utils import get_redis_queue_name_by_message +from platypush.utils import get_redis_conf, get_redis_queue_name_by_message from .logger import logger @@ -13,17 +13,27 @@ _bus = None def bus(): - global _bus + """ + Lazy getter/initializer for the bus object. + """ + global _bus # pylint: disable=global-statement if _bus is None: redis_queue = get_backend('http').bus.redis_queue # type: ignore - _bus = RedisBus(redis_queue=redis_queue) + _bus = RedisBus(**get_redis_conf(), redis_queue=redis_queue) return _bus def send_message(msg, wait_for_response=True): + """ + Send a message to the bus. + + :param msg: The message to send. + :param wait_for_response: If ``True``, wait for the response to be received + before returning, otherwise return immediately. + """ msg = Message.build(msg) if msg is None: - return + return None if isinstance(msg, Request): msg.origin = 'http' @@ -35,12 +45,22 @@ def send_message(msg, wait_for_response=True): if isinstance(msg, Request) and wait_for_response: response = get_message_response(msg) - logger().debug('Processing response on the HTTP backend: {}'.format(response)) + logger().debug('Processing response on the HTTP backend: %s', response) return response + return None + def send_request(action, wait_for_response=True, **kwargs): + """ + Send a request to the bus. + + :param action: The action to send. + :param wait_for_response: If ``True``, wait for the response to be received + before returning, otherwise return immediately. + :param kwargs: Additional arguments to pass to the action. + """ msg = {'type': 'request', 'action': action} if kwargs: @@ -50,10 +70,16 @@ def send_request(action, wait_for_response=True, **kwargs): def get_message_response(msg): + """ + Get the response to the given message. + + :param msg: The message to get the response for. + :return: The response to the given message. + """ redis = Redis(**bus().redis_args) redis_queue = get_redis_queue_name_by_message(msg) if not redis_queue: - return + return None response = redis.blpop(redis_queue, timeout=60) if response and len(response) > 1: diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index 15335902..e8273a12 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -29,7 +29,6 @@ class RedisBus(Bus): """ Reads one message from the Redis queue """ - try: if self.should_stop(): return None @@ -49,8 +48,17 @@ class RedisBus(Bus): """ Sends a message to the Redis queue """ + from redis import exceptions - return self.redis.rpush(self.redis_queue, str(msg)) + try: + return self.redis.rpush(self.redis_queue, str(msg)) + except exceptions.ConnectionError as e: + if not self.should_stop(): + # Raise the exception only if the bus it not supposed to be + # stopped + raise e + + return None def stop(self): super().stop() diff --git a/platypush/plugins/redis/__init__.py b/platypush/plugins/redis/__init__.py index 931581cd..9b471290 100644 --- a/platypush/plugins/redis/__init__.py +++ b/platypush/plugins/redis/__init__.py @@ -1,7 +1,7 @@ from redis import Redis -from platypush.context import get_backend from platypush.plugins import Plugin, action +from platypush.utils import get_redis_conf class RedisPlugin(Plugin): @@ -12,15 +12,7 @@ class RedisPlugin(Plugin): def __init__(self, *args, **kwargs): super().__init__() self.args = args - self.kwargs = kwargs - - if not kwargs: - try: - redis_backend = get_backend('redis') - if redis_backend and redis_backend.redis_args: - self.kwargs = redis_backend.redis_args - except Exception as e: - self.logger.debug(e) + self.kwargs = kwargs or get_redis_conf() def _get_redis(self): return Redis(*self.args, **self.kwargs) @@ -58,7 +50,7 @@ class RedisPlugin(Plugin): """ return { - keys[i]: value.decode() if value else value + keys[i]: value.decode() if isinstance(value, bytes) else value for (i, value) in enumerate(self._get_redis().mget(keys, *args)) } @@ -75,7 +67,7 @@ class RedisPlugin(Plugin): # broke back-compatibility with the previous way of passing # key-value pairs to mset directly on kwargs. This try-catch block # is to support things on all the redis-py versions - return self._get_redis().mset(mapping=kwargs) + return self._get_redis().mset(mapping=kwargs) # type: ignore @action def expire(self, key, expiration): diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 378bbb9b..fd80f1e9 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -559,24 +559,31 @@ def get_enabled_plugins() -> dict: return plugins +def get_redis_conf() -> dict: + """ + Get the Redis connection arguments from the configuration. + """ + from platypush.config import Config + + return ( + Config.get('redis') + or (Config.get('backend.redis') or {}).get('redis_args', {}) + or {} + ) + + def get_redis(*args, **kwargs) -> Redis: """ Get a Redis client on the basis of the Redis configuration. The Redis configuration can be loaded from: - 1. The ``backend.redis`` configuration (``redis_args`` attribute) - 2. The ``redis`` plugin. + 1. The ``redis`` plugin. + 2. The ``backend.redis`` configuration (``redis_args`` attribute) """ - from platypush.config import Config - if not (args or kwargs): - kwargs = ( - Config.get('redis') - or (Config.get('backend.redis') or {}).get('redis_args', {}) - or {} - ) + kwargs = get_redis_conf() return Redis(*args, **kwargs)