forked from platypush/platypush
[core] New architecture for the Redis bus.
- Use pubsub pattern rather than `rpush`/`blpop` - it saves memory, it's faster, and it decreases the risk of deadlocks. - Use a connection pool. - Propagate `PLATYPUSH_REDIS_QUEUE` environment variable so any subprocesses can access it.
This commit is contained in:
parent
1ad68cac11
commit
84e06e30fe
5 changed files with 110 additions and 32 deletions
|
@ -181,6 +181,8 @@ class Application:
|
||||||
or os.environ.get('PLATYPUSH_REDIS_QUEUE')
|
or os.environ.get('PLATYPUSH_REDIS_QUEUE')
|
||||||
or RedisBus.DEFAULT_REDIS_QUEUE
|
or RedisBus.DEFAULT_REDIS_QUEUE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
os.environ['PLATYPUSH_REDIS_QUEUE'] = self.redis_queue
|
||||||
self.config_file = config_file or os.environ.get('PLATYPUSH_CONFIG')
|
self.config_file = config_file or os.environ.get('PLATYPUSH_CONFIG')
|
||||||
self.verbose = verbose
|
self.verbose = verbose
|
||||||
self.db_engine = db or os.environ.get('PLATYPUSH_DB')
|
self.db_engine = db or os.environ.get('PLATYPUSH_DB')
|
||||||
|
|
|
@ -1,24 +1,57 @@
|
||||||
|
from multiprocessing import Lock
|
||||||
|
|
||||||
from platypush.bus.redis import RedisBus
|
from platypush.bus.redis import RedisBus
|
||||||
|
from platypush.context import get_bus
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
from platypush.context import get_backend
|
|
||||||
from platypush.message import Message
|
from platypush.message import Message
|
||||||
from platypush.message.request import Request
|
from platypush.message.request import Request
|
||||||
from platypush.utils import get_redis_conf, get_message_response
|
from platypush.utils import get_message_response
|
||||||
|
|
||||||
from .logger import logger
|
from .logger import logger
|
||||||
|
|
||||||
_bus = None
|
|
||||||
|
class BusWrapper: # pylint: disable=too-few-public-methods
|
||||||
|
"""
|
||||||
|
Lazy singleton wrapper for the bus object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._redis_queue = None
|
||||||
|
self._bus = None
|
||||||
|
self._bus_lock = Lock()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bus(self) -> RedisBus:
|
||||||
|
"""
|
||||||
|
Lazy getter/initializer for the bus object.
|
||||||
|
"""
|
||||||
|
with self._bus_lock:
|
||||||
|
if not self._bus:
|
||||||
|
self._bus = get_bus()
|
||||||
|
|
||||||
|
bus_: RedisBus = self._bus # type: ignore
|
||||||
|
return bus_
|
||||||
|
|
||||||
|
def post(self, msg):
|
||||||
|
"""
|
||||||
|
Send a message to the bus.
|
||||||
|
|
||||||
|
:param msg: The message to send.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.bus.post(msg)
|
||||||
|
except Exception as e:
|
||||||
|
logger().exception(e)
|
||||||
|
|
||||||
|
|
||||||
|
_bus = BusWrapper()
|
||||||
|
|
||||||
|
|
||||||
def bus():
|
def bus():
|
||||||
"""
|
"""
|
||||||
Lazy getter/initializer for the bus object.
|
Lazy getter/initializer for the bus object.
|
||||||
"""
|
"""
|
||||||
global _bus # pylint: disable=global-statement
|
return _bus.bus
|
||||||
if _bus is None:
|
|
||||||
redis_queue = get_backend('http').bus.redis_queue # type: ignore
|
|
||||||
_bus = RedisBus(**get_redis_conf(), redis_queue=redis_queue)
|
|
||||||
return _bus
|
|
||||||
|
|
||||||
|
|
||||||
def send_message(msg, wait_for_response=True):
|
def send_message(msg, wait_for_response=True):
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from platypush.bus import Bus
|
from platypush.bus import Bus
|
||||||
from platypush.message import Message
|
from platypush.message import Message
|
||||||
|
@ -24,25 +23,39 @@ class RedisBus(Bus):
|
||||||
self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
|
self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
|
||||||
self.on_message = on_message
|
self.on_message = on_message
|
||||||
self.thread_id = threading.get_ident()
|
self.thread_id = threading.get_ident()
|
||||||
|
self._pubsub = None
|
||||||
|
self._pubsub_lock = threading.RLock()
|
||||||
|
|
||||||
def get(self) -> Optional[Message]:
|
@property
|
||||||
|
def pubsub(self):
|
||||||
|
with self._pubsub_lock:
|
||||||
|
if not self._pubsub:
|
||||||
|
self._pubsub = self.redis.pubsub()
|
||||||
|
return self._pubsub
|
||||||
|
|
||||||
|
def poll(self):
|
||||||
"""
|
"""
|
||||||
Reads one message from the Redis queue
|
Polls the Redis queue for new messages
|
||||||
"""
|
"""
|
||||||
try:
|
with self.pubsub as pubsub:
|
||||||
if self.should_stop():
|
pubsub.subscribe(self.redis_queue)
|
||||||
return None
|
try:
|
||||||
|
for msg in pubsub.listen():
|
||||||
|
if msg.get('type') != 'message':
|
||||||
|
continue
|
||||||
|
|
||||||
msg = self.redis.blpop(self.redis_queue, timeout=1)
|
if self.should_stop():
|
||||||
if not msg or msg[1] is None:
|
break
|
||||||
return None
|
|
||||||
|
|
||||||
msg = msg[1].decode('utf-8')
|
try:
|
||||||
return Message.build(msg)
|
data = msg.get('data', b'').decode('utf-8')
|
||||||
except Exception as e:
|
parsed_msg = Message.build(data)
|
||||||
logger.exception(e)
|
if parsed_msg and self.on_message:
|
||||||
|
self.on_message(parsed_msg)
|
||||||
return None
|
except Exception as e:
|
||||||
|
logger.exception(e)
|
||||||
|
finally:
|
||||||
|
pubsub.unsubscribe(self.redis_queue)
|
||||||
|
|
||||||
def post(self, msg):
|
def post(self, msg):
|
||||||
"""
|
"""
|
||||||
|
@ -51,15 +64,13 @@ class RedisBus(Bus):
|
||||||
from redis import exceptions
|
from redis import exceptions
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return self.redis.rpush(self.redis_queue, str(msg))
|
self.redis.publish(self.redis_queue, str(msg))
|
||||||
except exceptions.ConnectionError as e:
|
except exceptions.ConnectionError as e:
|
||||||
if not self.should_stop():
|
if not self.should_stop():
|
||||||
# Raise the exception only if the bus it not supposed to be
|
# Raise the exception only if the bus it not supposed to be
|
||||||
# stopped
|
# stopped
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
super().stop()
|
super().stop()
|
||||||
self.redis.close()
|
self.redis.close()
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
|
@ -194,11 +195,20 @@ def get_bus() -> Bus:
|
||||||
Get or register the main application bus.
|
Get or register the main application bus.
|
||||||
"""
|
"""
|
||||||
from platypush.bus.redis import RedisBus
|
from platypush.bus.redis import RedisBus
|
||||||
|
from platypush.utils import get_redis_conf
|
||||||
|
|
||||||
if _ctx.bus:
|
if _ctx.bus:
|
||||||
return _ctx.bus
|
return _ctx.bus
|
||||||
|
|
||||||
_ctx.bus = RedisBus()
|
redis_queue = (
|
||||||
|
os.environ.get('PLATYPUSH_REDIS_QUEUE') or RedisBus.DEFAULT_REDIS_QUEUE
|
||||||
|
)
|
||||||
|
|
||||||
|
_ctx.bus = RedisBus(
|
||||||
|
redis_queue=redis_queue,
|
||||||
|
**get_redis_conf(),
|
||||||
|
)
|
||||||
|
|
||||||
return _ctx.bus
|
return _ctx.bus
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -22,12 +22,14 @@ from threading import Event, Lock as TLock
|
||||||
from typing import Generator, Optional, Tuple, Type, Union
|
from typing import Generator, Optional, Tuple, Type, Union
|
||||||
|
|
||||||
from dateutil import parser, tz
|
from dateutil import parser, tz
|
||||||
from redis import Redis
|
from redis import ConnectionPool, Redis
|
||||||
from rsa.key import PublicKey, PrivateKey, newkeys
|
from rsa.key import PublicKey, PrivateKey, newkeys
|
||||||
|
|
||||||
logger = logging.getLogger('utils')
|
logger = logging.getLogger('utils')
|
||||||
Lock = Union[PLock, TLock] # type: ignore
|
Lock = Union[PLock, TLock] # type: ignore
|
||||||
|
|
||||||
|
redis_pools: dict[Tuple[str, int], ConnectionPool] = {}
|
||||||
|
|
||||||
|
|
||||||
def get_module_and_method_from_action(action):
|
def get_module_and_method_from_action(action):
|
||||||
"""
|
"""
|
||||||
|
@ -608,6 +610,29 @@ def get_enabled_backends() -> dict:
|
||||||
return backends
|
return backends
|
||||||
|
|
||||||
|
|
||||||
|
def get_redis_pool(*args, **kwargs) -> ConnectionPool:
|
||||||
|
"""
|
||||||
|
Get a Redis connection pool on the basis of the Redis configuration.
|
||||||
|
|
||||||
|
The Redis configuration can be loaded from:
|
||||||
|
|
||||||
|
1. The ``redis`` plugin.
|
||||||
|
2. The ``backend.redis`` configuration (``redis_args`` attribute)
|
||||||
|
|
||||||
|
"""
|
||||||
|
if not (args or kwargs):
|
||||||
|
kwargs = get_redis_conf()
|
||||||
|
|
||||||
|
pool_key = (kwargs.get('host', 'localhost'), kwargs.get('port', 6379))
|
||||||
|
pool = redis_pools.get(pool_key)
|
||||||
|
|
||||||
|
if not pool:
|
||||||
|
pool = ConnectionPool(*args, **kwargs)
|
||||||
|
redis_pools[pool_key] = pool
|
||||||
|
|
||||||
|
return pool
|
||||||
|
|
||||||
|
|
||||||
def get_redis_conf() -> dict:
|
def get_redis_conf() -> dict:
|
||||||
"""
|
"""
|
||||||
Get the Redis connection arguments from the configuration.
|
Get the Redis connection arguments from the configuration.
|
||||||
|
@ -631,10 +656,7 @@ def get_redis(*args, **kwargs) -> Redis:
|
||||||
2. The ``backend.redis`` configuration (``redis_args`` attribute)
|
2. The ``backend.redis`` configuration (``redis_args`` attribute)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if not (args or kwargs):
|
return Redis(connection_pool=get_redis_pool(*args, **kwargs))
|
||||||
kwargs = get_redis_conf()
|
|
||||||
|
|
||||||
return Redis(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.datetime:
|
def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.datetime:
|
||||||
|
|
Loading…
Reference in a new issue