Support for specifying the application Redis queue from the command line or service constructor

This commit is contained in:
Fabio Manganiello 2021-03-06 19:22:13 +01:00
parent 6f85318868
commit 090e7d6de8
6 changed files with 24 additions and 11 deletions

View file

@ -41,6 +41,9 @@ class Daemon:
# - plugins will post the responses they process # - plugins will post the responses they process
bus = None bus = None
# Default bus queue name
_default_redis_queue = 'platypush/bus'
pidfile = None pidfile = None
# backend_name => backend_obj map # backend_name => backend_obj map
@ -50,7 +53,7 @@ class Daemon:
n_tries = 2 n_tries = 2
def __init__(self, config_file=None, pidfile=None, requests_to_process=None, def __init__(self, config_file=None, pidfile=None, requests_to_process=None,
no_capture_stdout=False, no_capture_stderr=False): no_capture_stdout=False, no_capture_stderr=False, redis_queue=None):
""" """
Constructor Constructor
Params: Params:
@ -64,6 +67,7 @@ class Daemon:
capture by the logging system capture by the logging system
no_capture_stderr -- Set to true if you want to disable the stderr no_capture_stderr -- Set to true if you want to disable the stderr
capture by the logging system capture by the logging system
redis_queue -- Name of the (Redis) queue used for dispatching messages (default: platypush/bus).
""" """
if pidfile: if pidfile:
@ -71,12 +75,15 @@ class Daemon:
with open(self.pidfile, 'w') as f: with open(self.pidfile, 'w') as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
self.redis_queue = redis_queue or self._default_redis_queue
self.config_file = config_file self.config_file = config_file
Config.init(self.config_file) Config.init(self.config_file)
logging.basicConfig(**Config.get('logging')) logging.basicConfig(**Config.get('logging'))
redis_conf = Config.get('backend.redis') or {} redis_conf = Config.get('backend.redis') or {}
self.bus = RedisBus(on_message=self.on_message(), **redis_conf.get('redis_args', {})) self.bus = RedisBus(redis_queue=self.redis_queue, on_message=self.on_message(),
**redis_conf.get('redis_args', {}))
self.no_capture_stdout = no_capture_stdout self.no_capture_stdout = no_capture_stdout
self.no_capture_stderr = no_capture_stderr self.no_capture_stderr = no_capture_stderr
self.event_processor = EventProcessor() self.event_processor = EventProcessor()
@ -108,11 +115,17 @@ class Daemon:
help="Set this flag if you have max stack depth " + help="Set this flag if you have max stack depth " +
"exceeded errors so stderr won't be captured by " + "exceeded errors so stderr won't be captured by " +
"the logging system") "the logging system")
parser.add_argument('--redis-queue', dest='redis_queue',
required=False, action='store_true',
default=cls._default_redis_queue,
help="Name of the Redis queue to be used to internally deliver messages "
"(default: platypush/bus)")
opts, args = parser.parse_known_args(args) opts, args = parser.parse_known_args(args)
return cls(config_file=opts.config, pidfile=opts.pidfile, return cls(config_file=opts.config, pidfile=opts.pidfile,
no_capture_stdout=opts.no_capture_stdout, no_capture_stdout=opts.no_capture_stdout,
no_capture_stderr=opts.no_capture_stderr) no_capture_stderr=opts.no_capture_stderr,
redis_queue=opts.redis_queue)
def on_message(self): def on_message(self):
""" """

View file

@ -391,6 +391,7 @@ class HttpBackend(Backend):
'debug': False, 'debug': False,
} }
application.config['redis_queue'] = self.bus.redis_queue
if self.ssl_context: if self.ssl_context:
kwargs['ssl_context'] = self.ssl_context kwargs['ssl_context'] = self.ssl_context

View file

@ -28,7 +28,8 @@ base_folder = os.path.abspath(os.path.join(
template_folder = os.path.join(base_folder, 'webapp/dist') template_folder = os.path.join(base_folder, 'webapp/dist')
static_folder = os.path.join(base_folder, 'webapp/dist/static') static_folder = os.path.join(base_folder, 'webapp/dist/static')
application = Flask('platypush', template_folder=template_folder, application = Flask('platypush',
template_folder=template_folder,
static_folder=static_folder) static_folder=static_folder)
for route in get_routes(): for route in get_routes():

View file

@ -3,7 +3,7 @@ import logging
import os import os
from functools import wraps from functools import wraps
from flask import abort, request, redirect, Response from flask import abort, request, redirect, Response, current_app
from redis import Redis from redis import Redis
# NOTE: The HTTP service will *only* work on top of a Redis bus. The default # NOTE: The HTTP service will *only* work on top of a Redis bus. The default
@ -23,7 +23,7 @@ _logger = None
def bus(): def bus():
global _bus global _bus
if _bus is None: if _bus is None:
_bus = RedisBus() _bus = RedisBus(redis_queue=current_app.config['redis_queue'])
return _bus return _bus

View file

@ -16,8 +16,7 @@ class RedisBus(Bus):
""" Overrides the in-process in-memory local bus with a Redis bus """ """ Overrides the in-process in-memory local bus with a Redis bus """
_DEFAULT_REDIS_QUEUE = 'platypush/bus' _DEFAULT_REDIS_QUEUE = 'platypush/bus'
def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE, def __init__(self, on_message=None, redis_queue=None, *args, **kwargs):
*args, **kwargs):
super().__init__(on_message=on_message) super().__init__(on_message=on_message)
if not args and not kwargs: if not args and not kwargs:
@ -25,7 +24,7 @@ class RedisBus(Bus):
self.redis = Redis(*args, **kwargs) self.redis = Redis(*args, **kwargs)
self.redis_args = kwargs self.redis_args = kwargs
self.redis_queue = redis_queue self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE
self.on_message = on_message self.on_message = on_message
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()

View file

@ -16,8 +16,7 @@ def app():
logging.info('Starting Platypush test service') logging.info('Starting Platypush test service')
Config.init(config_file) Config.init(config_file)
app = Daemon(config_file=config_file) app = Daemon(config_file=config_file, redis_queue='platypush-tests/bus')
app.bus.redis_queue = 'platypush-tests/bus'
Thread(target=lambda: app.run()).start() Thread(target=lambda: app.run()).start()
logging.info('Sleeping {} seconds while waiting for the daemon to start up'.format(app_start_timeout)) logging.info('Sleeping {} seconds while waiting for the daemon to start up'.format(app_start_timeout))
time.sleep(app_start_timeout) time.sleep(app_start_timeout)