diff --git a/platypush/backend/redis/__init__.py b/platypush/backend/redis/__init__.py index d0b1e7c5..a70c3de8 100644 --- a/platypush/backend/redis/__init__.py +++ b/platypush/backend/redis/__init__.py @@ -1,5 +1,5 @@ import json -from typing import Optional +from typing import Optional, Union from redis import Redis @@ -16,7 +16,7 @@ class RedisBackend(Backend): and can't post events or requests to the application bus. """ - def __init__(self, queue='platypush_bus_mq', redis_args=None, *args, **kwargs): + def __init__(self, *args, queue='platypush_bus_mq', redis_args=None, **kwargs): """ :param queue: Queue name to listen on (default: ``platypush_bus_mq``) :type queue: str @@ -40,12 +40,21 @@ class RedisBackend(Backend): self.redis_args = redis_args self.redis: Optional[Redis] = None - def send_message(self, msg, queue_name=None, **kwargs): - msg = str(msg) - if queue_name: - self.redis.rpush(queue_name, msg) - else: - self.redis.rpush(self.queue, msg) + def send_message( + self, msg: Union[str, Message], queue_name: Optional[str] = None, **_ + ): + """ + Send a message to a Redis queue. + + :param msg: Message to send, as a ``Message`` object or a string. + :param queue_name: Queue name to send the message to (default: ``platypush_bus_mq``). + """ + + if not self.redis: + self.logger.warning('The Redis backend is not yet running.') + return + + self.redis.rpush(queue_name or self.queue, str(msg)) def get_message(self, queue_name=None): queue = queue_name or self.queue @@ -60,6 +69,7 @@ class RedisBackend(Backend): self.logger.debug(str(e)) try: import ast + msg = Message.build(ast.literal_eval(msg)) except Exception as ee: self.logger.debug(str(ee)) @@ -72,7 +82,11 @@ class RedisBackend(Backend): def run(self): super().run() - self.logger.info('Initialized Redis backend on queue {} with arguments {}'.format(self.queue, self.redis_args)) + self.logger.info( + 'Initialized Redis backend on queue %s with arguments %s', + self.queue, + self.redis_args, + ) with Redis(**self.redis_args) as self.redis: while not self.should_stop(): @@ -81,7 +95,7 @@ class RedisBackend(Backend): if not msg: continue - self.logger.info('Received message on the Redis backend: {}'.format(msg)) + self.logger.info('Received message on the Redis backend: %s', msg) self.on_message(msg) except Exception as e: self.logger.exception(e) diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index 1c64e8b3..15335902 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -1,48 +1,55 @@ import logging import threading - -from redis import Redis +from typing import Optional from platypush.bus import Bus -from platypush.config import Config from platypush.message import Message logger = logging.getLogger('platypush:bus:redis') class RedisBus(Bus): - """ Overrides the in-process in-memory local bus with a Redis bus """ + """ + Overrides the in-process in-memory local bus with a Redis bus + """ + _DEFAULT_REDIS_QUEUE = 'platypush/bus' def __init__(self, *args, on_message=None, redis_queue=None, **kwargs): + from platypush.utils import get_redis + super().__init__(on_message=on_message) - - if not args and not kwargs: - kwargs = (Config.get('backend.redis') or {}).get('redis_args', {}) - - self.redis = Redis(*args, **kwargs) + self.redis = get_redis(*args, **kwargs) self.redis_args = kwargs self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE self.on_message = on_message self.thread_id = threading.get_ident() - def get(self): - """ Reads one message from the Redis queue """ + def get(self) -> Optional[Message]: + """ + Reads one message from the Redis queue + """ + try: if self.should_stop(): - return + return None msg = self.redis.blpop(self.redis_queue, timeout=1) if not msg or msg[1] is None: - return + return None msg = msg[1].decode('utf-8') return Message.build(msg) except Exception as e: logger.exception(e) + return None + def post(self, msg): - """ Sends a message to the Redis queue """ + """ + Sends a message to the Redis queue + """ + return self.redis.rpush(self.redis_queue, str(msg)) def stop(self): diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index d2421ac6..982926e7 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -2,6 +2,7 @@ import asyncio import importlib import logging +from dataclasses import dataclass, field from threading import RLock from typing import Optional, Any @@ -11,36 +12,62 @@ from ..utils import get_enabled_plugins logger = logging.getLogger('platypush:context') -# Map: backend_name -> backend_instance -backends = {} -# Map: plugin_name -> plugin_instance -plugins = {} +@dataclass +class Context: + """ + Data class to hold the context of the application. + """ + + # backend_name -> backend_instance + backends: dict = field(default_factory=dict) + # plugin_name -> plugin_instance + plugins: dict = field(default_factory=dict) + # Reference to the main application bus + bus: Optional[Bus] = None + + +_ctx = Context() + +# # Map: backend_name -> backend_instance +# backends = {} + +# # Map: plugin_name -> plugin_instance +# plugins = {} # Map: plugin_name -> init_lock to make sure that a plugin isn't initialized # multiple times plugins_init_locks = {} # Reference to the main application bus -main_bus = None +# main_bus = None + + +def get_context() -> Context: + """ + Get the current application context. + """ + + return _ctx def register_backends(bus=None, global_scope=False, **kwargs): - """Initialize the backend objects based on the configuration and returns - a name -> backend_instance map. + """ + Initialize the backend objects based on the configuration and returns a + name -> backend_instance map. + Params: bus -- If specific (it usually should), the messages processed by the backends will be posted on this bus. - - kwargs -- Any additional key-value parameters required to initialize the backends + kwargs -- Any additional key-value parameters required to initialize + the backends """ - global main_bus if bus: - main_bus = bus + _ctx.bus = bus if global_scope: - global backends + backends = _ctx.backends else: backends = {} @@ -57,13 +84,16 @@ def register_backends(bus=None, global_scope=False, **kwargs): b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs) backends[name] = b except AttributeError as e: - logger.warning('No such class in {}: {}'.format(module.__name__, cls_name)) - raise RuntimeError(e) + logger.warning('No such class in %s: %s', module.__name__, cls_name) + raise RuntimeError(e) from e return backends def register_plugins(bus=None): + """ + Register and start all the ``RunnablePlugin`` configured implementations. + """ from ..plugins import RunnablePlugin for plugin in get_enabled_plugins().values(): @@ -75,27 +105,25 @@ def register_plugins(bus=None): def get_backend(name): """Returns the backend instance identified by name if it exists""" - global backends - return backends.get(name) + return _ctx.backends.get(name) def get_plugin(plugin_name, reload=False): - """Registers a plugin instance by name if not registered already, or - returns the registered plugin instance""" - global plugins - global plugins_init_locks - + """ + Registers a plugin instance by name if not registered already, or returns + the registered plugin instance. + """ if plugin_name not in plugins_init_locks: plugins_init_locks[plugin_name] = RLock() - if plugin_name in plugins and not reload: - return plugins[plugin_name] + if plugin_name in _ctx.plugins and not reload: + return _ctx.plugins[plugin_name] try: plugin = importlib.import_module('platypush.plugins.' + plugin_name) except ImportError as e: - logger.warning('No such plugin: {}'.format(plugin_name)) - raise RuntimeError(e) + logger.warning('No such plugin: %s', plugin_name) + raise RuntimeError(e) from e # e.g. plugins.music.mpd main class: MusicMpdPlugin cls_name = '' @@ -120,30 +148,34 @@ def get_plugin(plugin_name, reload=False): try: plugin_class = getattr(plugin, cls_name) except AttributeError as e: - logger.warning( - 'No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e)) - ) - raise RuntimeError(e) + logger.warning('No such class in %s: %s [error: %s]', plugin_name, cls_name, e) + raise RuntimeError(e) from e with plugins_init_locks[plugin_name]: - if plugins.get(plugin_name) and not reload: - return plugins[plugin_name] - plugins[plugin_name] = plugin_class(**plugin_conf) + if _ctx.plugins.get(plugin_name) and not reload: + return _ctx.plugins[plugin_name] + _ctx.plugins[plugin_name] = plugin_class(**plugin_conf) - return plugins[plugin_name] + return _ctx.plugins[plugin_name] def get_bus() -> Bus: - global main_bus - if main_bus: - return main_bus - + """ + Get or register the main application bus. + """ from platypush.bus.redis import RedisBus - return RedisBus() + if _ctx.bus: + return _ctx.bus + + _ctx.bus = RedisBus() + return _ctx.bus -def get_or_create_event_loop(): +def get_or_create_event_loop() -> asyncio.AbstractEventLoop: + """ + Get or create a new event loop + """ try: loop = asyncio.get_event_loop() except (DeprecationWarning, RuntimeError): diff --git a/platypush/plugins/redis/__init__.py b/platypush/plugins/redis/__init__.py index 56aa26a9..931581cd 100644 --- a/platypush/plugins/redis/__init__.py +++ b/platypush/plugins/redis/__init__.py @@ -71,7 +71,7 @@ class RedisPlugin(Plugin): try: return self._get_redis().mset(**kwargs) except TypeError: - # XXX commit https://github.com/andymccurdy/redis-py/commit/90a52dd5de111f0053bb3ebaa7c78f73a82a1e3e + # Commit https://github.com/andymccurdy/redis-py/commit/90a52dd5de111f0053bb3ebaa7c78f73a82a1e3e # broke back-compatibility with the previous way of passing # key-value pairs to mset directly on kwargs. This try-catch block # is to support things on all the redis-py versions diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 22bc0c06..7b46b096 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -525,7 +525,7 @@ def get_enabled_plugins() -> dict: return plugins -def get_redis() -> Redis: +def get_redis(*args, **kwargs) -> Redis: """ Get a Redis client on the basis of the Redis configuration. @@ -537,13 +537,14 @@ def get_redis() -> Redis: """ from platypush.config import Config - return Redis( - **( + if not (args or kwargs): + kwargs = ( (Config.get('backend.redis') or {}).get('redis_args', {}) or Config.get('redis') or {} ) - ) + + return Redis(*args, **kwargs) def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.datetime: