From 090e7d6de8e167d2eeaf39713c00227aa27a040e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 6 Mar 2021 19:22:13 +0100 Subject: [PATCH] Support for specifying the application Redis queue from the command line or service constructor --- platypush/__init__.py | 19 ++++++++++++++++--- platypush/backend/http/__init__.py | 1 + platypush/backend/http/app/__init__.py | 3 ++- platypush/backend/http/app/utils.py | 4 ++-- platypush/bus/redis.py | 5 ++--- tests/conftest.py | 3 +-- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index e0505f40f..ac22e93fb 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -41,6 +41,9 @@ class Daemon: # - plugins will post the responses they process bus = None + # Default bus queue name + _default_redis_queue = 'platypush/bus' + pidfile = None # backend_name => backend_obj map @@ -50,7 +53,7 @@ class Daemon: n_tries = 2 def __init__(self, config_file=None, pidfile=None, requests_to_process=None, - no_capture_stdout=False, no_capture_stderr=False): + no_capture_stdout=False, no_capture_stderr=False, redis_queue=None): """ Constructor Params: @@ -64,6 +67,7 @@ class Daemon: capture by the logging system no_capture_stderr -- Set to true if you want to disable the stderr capture by the logging system + redis_queue -- Name of the (Redis) queue used for dispatching messages (default: platypush/bus). """ if pidfile: @@ -71,12 +75,15 @@ class Daemon: with open(self.pidfile, 'w') as f: f.write(str(os.getpid())) + self.redis_queue = redis_queue or self._default_redis_queue self.config_file = config_file Config.init(self.config_file) logging.basicConfig(**Config.get('logging')) redis_conf = Config.get('backend.redis') or {} - self.bus = RedisBus(on_message=self.on_message(), **redis_conf.get('redis_args', {})) + self.bus = RedisBus(redis_queue=self.redis_queue, on_message=self.on_message(), + **redis_conf.get('redis_args', {})) + self.no_capture_stdout = no_capture_stdout self.no_capture_stderr = no_capture_stderr self.event_processor = EventProcessor() @@ -108,11 +115,17 @@ class Daemon: help="Set this flag if you have max stack depth " + "exceeded errors so stderr won't be captured by " + "the logging system") + parser.add_argument('--redis-queue', dest='redis_queue', + required=False, action='store_true', + default=cls._default_redis_queue, + help="Name of the Redis queue to be used to internally deliver messages " + "(default: platypush/bus)") opts, args = parser.parse_known_args(args) return cls(config_file=opts.config, pidfile=opts.pidfile, no_capture_stdout=opts.no_capture_stdout, - no_capture_stderr=opts.no_capture_stderr) + no_capture_stderr=opts.no_capture_stderr, + redis_queue=opts.redis_queue) def on_message(self): """ diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index d3995b0c8..c898ab20e 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -391,6 +391,7 @@ class HttpBackend(Backend): 'debug': False, } + application.config['redis_queue'] = self.bus.redis_queue if self.ssl_context: kwargs['ssl_context'] = self.ssl_context diff --git a/platypush/backend/http/app/__init__.py b/platypush/backend/http/app/__init__.py index e04208df9..25ef34471 100644 --- a/platypush/backend/http/app/__init__.py +++ b/platypush/backend/http/app/__init__.py @@ -28,7 +28,8 @@ base_folder = os.path.abspath(os.path.join( template_folder = os.path.join(base_folder, 'webapp/dist') static_folder = os.path.join(base_folder, 'webapp/dist/static') -application = Flask('platypush', template_folder=template_folder, +application = Flask('platypush', + template_folder=template_folder, static_folder=static_folder) for route in get_routes(): diff --git a/platypush/backend/http/app/utils.py b/platypush/backend/http/app/utils.py index c075c188f..c46511aa7 100644 --- a/platypush/backend/http/app/utils.py +++ b/platypush/backend/http/app/utils.py @@ -3,7 +3,7 @@ import logging import os from functools import wraps -from flask import abort, request, redirect, Response +from flask import abort, request, redirect, Response, current_app from redis import Redis # NOTE: The HTTP service will *only* work on top of a Redis bus. The default @@ -23,7 +23,7 @@ _logger = None def bus(): global _bus if _bus is None: - _bus = RedisBus() + _bus = RedisBus(redis_queue=current_app.config['redis_queue']) return _bus diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index 302b31822..3dc48a5ac 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -16,8 +16,7 @@ class RedisBus(Bus): """ Overrides the in-process in-memory local bus with a Redis bus """ _DEFAULT_REDIS_QUEUE = 'platypush/bus' - def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE, - *args, **kwargs): + def __init__(self, on_message=None, redis_queue=None, *args, **kwargs): super().__init__(on_message=on_message) if not args and not kwargs: @@ -25,7 +24,7 @@ class RedisBus(Bus): self.redis = Redis(*args, **kwargs) self.redis_args = kwargs - self.redis_queue = redis_queue + self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE self.on_message = on_message self.thread_id = threading.get_ident() diff --git a/tests/conftest.py b/tests/conftest.py index a6c7a7bae..bdeb1e988 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,8 +16,7 @@ def app(): logging.info('Starting Platypush test service') Config.init(config_file) - app = Daemon(config_file=config_file) - app.bus.redis_queue = 'platypush-tests/bus' + app = Daemon(config_file=config_file, redis_queue='platypush-tests/bus') Thread(target=lambda: app.run()).start() logging.info('Sleeping {} seconds while waiting for the daemon to start up'.format(app_start_timeout)) time.sleep(app_start_timeout)