diff --git a/platypush/app/_app.py b/platypush/app/_app.py index bf005415..cf74bbdf 100644 --- a/platypush/app/_app.py +++ b/platypush/app/_app.py @@ -1,5 +1,6 @@ from contextlib import contextmanager import logging +import pathlib import os import signal import subprocess @@ -41,6 +42,7 @@ class Application: self, config_file: Optional[str] = None, workdir: Optional[str] = None, + db: Optional[str] = None, logsdir: Optional[str] = None, cachedir: Optional[str] = None, device_id: Optional[str] = None, @@ -56,30 +58,87 @@ class Application: ctrl_sock: Optional[str] = None, ): """ - :param config_file: Configuration file override (default: None). - :param workdir: Overrides the ``workdir`` setting in the configuration - file (default: None). - :param logsdir: Set logging directory. If not specified, the - ``filename`` setting under the ``logging`` section of the - configuration file is used. If not set, logging will be sent to - stdout and stderr. - :param cachedir: Overrides the ``cachedir`` setting in the configuration - file (default: None). - :param device_id: Override the device ID used to identify this - instance. If not passed here, it is inferred from the configuration - (device_id field). If not present there either, it is inferred from - the hostname. + :param config_file: Configuration file. The order of precedence is: + + - The ``config_file`` parameter (or the ``-c``/``--config`` command + line argument). + - The ``PLATYPUSH_CONFIG`` environment variable. + - ``./config.yaml`` + - ``~/.config/platypush/config.yaml`` + - ``/etc/platypush/config.yaml`` + + :param workdir: Working directory where the application will store its + data and integration plugins will store their data. The order of + precedence is: + + - The ``workdir`` parameter (or the ``-w``/``--workdir`` command + line argument). + - The ``PLATYPUSH_WORKDIR`` environment variable. + - The ``workdir`` field in the configuration file. + - ``~/.local/share/platypush`` + + :param db: Main database engine for the application. Supports SQLAlchemy + engine strings. The order of precedence is: + + - The ``db`` parameter (or the ``--main-db``/``--db`` command + line argument). + - The ``PLATYPUSH_DB`` environment variable. + - The ``db`` field in the configuration file. + - ``sqlite:////main.db`` + + :param logsdir: Logs directory where the application will store its logs. + The order of precedence is: + + - The ``logsdir`` parameter (or the ``-l``/``--logsdir`` command + line argument). + - The ``PLATYPUSH_LOGSDIR`` environment variable. + - The ``logging`` -> ``filename`` field in the configuration + file (the ``format`` and ``level`` fields can be set as well + using Python logging configuration). + - stdout and stderr + + :param cachedir: Directory where the application and the plugins will store + their cache data. The order of precedence is: + + - The ``cachedir`` parameter (or the ``--cachedir`` command line + argument). + - The ``PLATYPUSH_CACHEDIR`` environment variable. + - The ``cachedir`` field in the configuration file. + - ``~/.cache/platypush`` + + :param device_id: Device ID used to identify this instance. The order + of precedence is: + + - The ``device_id`` parameter (or the ``--device-id`` command + line argument). + - The ``PLATYPUSH_DEVICE_ID`` environment variable. + - The ``device_id`` field in the configuration file. + - The hostname of the machine. + :param pidfile: File where platypush will store its PID upon launch, useful if you're planning to integrate the application within a - service or a launcher script (default: None). + service or a launcher script. Order of precedence: + + - The ``pidfile`` parameter (or the ``-P``/``--pidfile`` command + line argument). + - The ``PLATYPUSH_PIDFILE`` environment variable. + - No PID file. + :param requests_to_process: Exit after processing the specified - number of requests (default: None, loop forever). + number of requests (default: None, loop forever). This is usually + useful for testing purposes. :param no_capture_stdout: Set to true if you want to disable the stdout capture by the logging system (default: False). :param no_capture_stderr: Set to true if you want to disable the stderr capture by the logging system (default: False). :param redis_queue: Name of the (Redis) queue used for dispatching - messages (default: platypush/bus). + messages. Order of precedence: + + - The ``redis_queue`` parameter (or the ``--redis-queue`` command + line argument). + - The ``PLATYPUSH_REDIS_QUEUE`` environment variable. + - ``platypush/bus`` + :param verbose: Enable debug/verbose logging, overriding the stored configuration (default: False). :param start_redis: If set, it starts a managed Redis instance upon @@ -87,34 +146,57 @@ class Application: server). This is particularly useful when running the application inside of Docker containers, without relying on ``docker-compose`` to start multiple containers, and in tests (default: False). - :param redis_host: Host of the Redis server to be used. It overrides - the settings in the ``redis`` section of the configuration file. - :param redis_port: Port of the local Redis server. It overrides the - settings in the ``redis`` section of the configuration file. + :param redis_host: Host of the Redis server to be used. The order of + precedence is: + + - The ``redis_host`` parameter (or the ``--redis-host`` command + line argument). + - The ``PLATYPUSH_REDIS_HOST`` environment variable. + - The ``redis`` -> ``host`` field in the configuration file. + - The ``backend.redis`` -> ``redis_args`` -> ``host`` field in + the configuration file. + - ``localhost`` + + :param redis_port: Port of the Redis server to be used. The order of + precedence is: + + - The ``redis_port`` parameter (or the ``--redis-port`` command + line argument). + - The ``PLATYPUSH_REDIS_PORT`` environment variable. + - The ``redis`` -> ``port`` field in the configuration file. + - The ``backend.redis`` -> ``redis_args`` -> ``port`` field in + the configuration file. + - ``6379`` + :param ctrl_sock: If set, it identifies a path to a UNIX domain socket that the application can use to send control messages (e.g. STOP and RESTART) to its parent. """ - self.pidfile = pidfile + self.pidfile = pidfile or os.environ.get('PLATYPUSH_PIDFILE') self.bus: Optional[Bus] = None - self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE - self.config_file = config_file - self._verbose = verbose - self._logsdir = ( - os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None + self.redis_queue = ( + redis_queue + or os.environ.get('PLATYPUSH_REDIS_QUEUE') + or RedisBus.DEFAULT_REDIS_QUEUE ) + self.config_file = config_file or os.environ.get('PLATYPUSH_CONFIG') + self.verbose = verbose + self.db_engine = db or os.environ.get('PLATYPUSH_DB') + self.device_id = device_id or os.environ.get('PLATYPUSH_DEVICE_ID') + self.logsdir = self.expand_path(logsdir or os.environ.get('PLATYPUSH_LOGSDIR')) + self.workdir = self.expand_path(workdir or os.environ.get('PLATYPUSH_WORKDIR')) + self.cachedir = self.expand_path( + cachedir or os.environ.get('PLATYPUSH_CACHEDIR') + ) Config.init( self.config_file, - device_id=device_id, - workdir=os.path.abspath(os.path.expanduser(workdir)) if workdir else None, - cachedir=os.path.abspath(os.path.expanduser(cachedir)) - if cachedir - else None, - ctrl_sock=os.path.abspath(os.path.expanduser(ctrl_sock)) - if ctrl_sock - else None, + device_id=self.device_id, + workdir=self.workdir, + db=self.db_engine, + cachedir=self.cachedir, + ctrl_sock=self.expand_path(ctrl_sock), ) self.no_capture_stdout = no_capture_stdout @@ -125,24 +207,25 @@ class Application: self.processed_requests = 0 self.cron_scheduler = None self.start_redis = start_redis - self.redis_host = redis_host - self.redis_port = redis_port - self.redis_conf = {} + self.redis_host = redis_host or os.environ.get('PLATYPUSH_REDIS_HOST') + self.redis_port = redis_port or os.environ.get('PLATYPUSH_REDIS_PORT') + self._redis_conf = { + 'host': self.redis_host or 'localhost', + 'port': self.redis_port or self._default_redis_port, + } + self._redis_proc: Optional[subprocess.Popen] = None self.cmd_stream = CommandStream(ctrl_sock) self._init_bus() self._init_logging() + @staticmethod + def expand_path(path: Optional[str]) -> Optional[str]: + return os.path.abspath(os.path.expanduser(path)) if path else None + def _init_bus(self): - self._redis_conf = get_redis_conf() - self._redis_conf['port'] = self.redis_port or self._redis_conf.get( - 'port', self._default_redis_port - ) - - if self.redis_host: - self._redis_conf['host'] = self.redis_host - + self._redis_conf = {**self._redis_conf, **get_redis_conf()} Config.set('redis', self._redis_conf) self.bus = RedisBus( redis_queue=self.redis_queue, @@ -152,12 +235,18 @@ class Application: def _init_logging(self): logging_conf = Config.get('logging') or {} - if self._verbose: + if self.verbose: logging_conf['level'] = logging.DEBUG - if self._logsdir: - logging_conf['filename'] = os.path.join(self._logsdir, 'platypush.log') + + if self.logsdir: + logging_conf['filename'] = os.path.join(self.logsdir, 'platypush.log') logging_conf.pop('stream', None) + if logging_conf.get('filename'): + pathlib.Path(os.path.dirname(logging_conf['filename'])).mkdir( + parents=True, exist_ok=True + ) + Config.set('logging', logging_conf) logging.basicConfig(**logging_conf) @@ -214,6 +303,7 @@ class Application: workdir=opts.workdir, cachedir=opts.cachedir, logsdir=opts.logsdir, + db=opts.db_engine, device_id=opts.device_id, pidfile=opts.pidfile, no_capture_stdout=opts.no_capture_stdout, diff --git a/platypush/backend/redis/__init__.py b/platypush/backend/redis/__init__.py index a70c3de8..d5ad3b77 100644 --- a/platypush/backend/redis/__init__.py +++ b/platypush/backend/redis/__init__.py @@ -1,43 +1,41 @@ import json -from typing import Optional, Union +from typing import Any, Dict, Optional, Union from redis import Redis from platypush.backend import Backend -from platypush.context import get_plugin from platypush.message import Message +from platypush.utils import get_redis_conf class RedisBackend(Backend): """ Backend that reads messages from a configured Redis queue (default: - ``platypush_bus_mq``) and posts them to the application bus. Very - useful when you have plugin whose code is executed in another process + ``platypush/backend/redis``) and forwards them to the application bus. + + Useful when you have plugin whose code is executed in another process and can't post events or requests to the application bus. """ - def __init__(self, *args, queue='platypush_bus_mq', redis_args=None, **kwargs): + def __init__( + self, + *args, + queue: str = 'platypush/backend/redis', + redis_args: Optional[Dict[str, Any]] = None, + **kwargs + ): """ - :param queue: Queue name to listen on (default: ``platypush_bus_mq``) - :type queue: str + :param queue: Name of the Redis queue to listen to (default: + ``platypush/backend/redis``). - :param redis_args: Arguments that will be passed to the redis-py constructor (e.g. host, port, password), see - https://redis-py.readthedocs.io/en/latest/ - :type redis_args: dict + :param redis_args: Arguments that will be passed to the redis-py + constructor (e.g. host, port, password). See + https://redis-py.readthedocs.io/en/latest/connections.html#redis.Redis + for supported parameters. """ - super().__init__(*args, **kwargs) - if redis_args is None: - redis_args = {} - + self.redis_args = redis_args or get_redis_conf() self.queue = queue - - if not redis_args: - redis_plugin = get_plugin('redis') - if redis_plugin and redis_plugin.kwargs: - redis_args = redis_plugin.kwargs - - self.redis_args = redis_args self.redis: Optional[Redis] = None def send_message( @@ -47,20 +45,21 @@ class RedisBackend(Backend): Send a message to a Redis queue. :param msg: Message to send, as a ``Message`` object or a string. - :param queue_name: Queue name to send the message to (default: ``platypush_bus_mq``). + :param queue_name: Queue name to send the message to (default: + configured ``queue`` value). """ - if not self.redis: self.logger.warning('The Redis backend is not yet running.') return self.redis.rpush(queue_name or self.queue, str(msg)) - def get_message(self, queue_name=None): + def get_message(self, queue_name: Optional[str] = None) -> Optional[Message]: queue = queue_name or self.queue + assert self.redis, 'The Redis backend is not yet running.' data = self.redis.blpop(queue, timeout=1) if data is None: - return + return None msg = data[1].decode() try: @@ -68,15 +67,9 @@ class RedisBackend(Backend): except Exception as e: self.logger.debug(str(e)) try: - import ast - - msg = Message.build(ast.literal_eval(msg)) + msg = json.loads(msg) except Exception as ee: - self.logger.debug(str(ee)) - try: - msg = json.loads(msg) - except Exception as eee: - self.logger.exception(eee) + self.logger.exception(ee) return msg diff --git a/platypush/cli.py b/platypush/cli.py index b2d0a346..b7c36844 100644 --- a/platypush/cli.py +++ b/platypush/cli.py @@ -32,6 +32,18 @@ def parse_cmdline(args: Sequence[str]) -> argparse.Namespace: help='Custom working directory to be used for the application', ) + parser.add_argument( + '--main-db', + '--db', + dest='db_engine', + required=False, + default=None, + help='Custom database engine to be used for the application ' + '(e.g. sqlite:///:memory: or sqlite:///path/to/db.sqlite). ' + 'If missing, it falls back to the value of the `main.db` setting in the configuration file. ' + 'If missing, it falls back to sqlite:///main.db.', + ) + parser.add_argument( '--cachedir', dest='cachedir', diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 6ef459d4..1fb543d7 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -97,6 +97,7 @@ class Config: cfgfile: Optional[str] = None, workdir: Optional[str] = None, cachedir: Optional[str] = None, + db: Optional[str] = None, ): """ Constructor. Always use the class as a singleton (i.e. through @@ -106,6 +107,7 @@ class Config: location in _cfgfile_locations). :param workdir: Overrides the default working directory. :param cachedir: Overrides the default cache directory. + :param db: Overrides the default database connection string. """ self.backends = {} @@ -124,7 +126,7 @@ class Config: self._init_secrets() self._init_dirs(workdir=workdir, cachedir=cachedir) - self._init_db() + self._init_db(db=db) self._init_logging() self._init_device_id() self._init_environment() @@ -175,7 +177,14 @@ class Config: self._config['logging'] = logging_config - def _init_db(self): + def _init_db(self, db: Optional[str] = None): + # If the db connection string is passed as an argument, use it + if db: + self._config['db'] = { + 'engine': db, + } + return + # Initialize the default db connection string db_engine = self._config.get('main.db', '') if db_engine: @@ -474,6 +483,7 @@ class Config: cfgfile: Optional[str] = None, workdir: Optional[str] = None, cachedir: Optional[str] = None, + db: Optional[str] = None, force_reload: bool = False, ) -> "Config": """ @@ -481,7 +491,7 @@ class Config: """ if force_reload or cls._instance is None: cfg_args = [cfgfile] if cfgfile else [] - cls._instance = Config(*cfg_args, workdir=workdir, cachedir=cachedir) + cls._instance = Config(*cfg_args, workdir=workdir, cachedir=cachedir, db=db) return cls._instance @classmethod @@ -546,6 +556,7 @@ class Config: device_id: Optional[str] = None, workdir: Optional[str] = None, cachedir: Optional[str] = None, + db: Optional[str] = None, ctrl_sock: Optional[str] = None, **_, ): @@ -556,10 +567,11 @@ class Config: :param device_id: Override the configured device_id. :param workdir: Override the configured working directory. :param cachedir: Override the configured cache directory. + :param db: Override the configured database connection string. :param ctrl_sock: Override the configured control socket. """ cfg = cls._get_instance( - cfgfile, workdir=workdir, cachedir=cachedir, force_reload=True + cfgfile, workdir=workdir, cachedir=cachedir, db=db, force_reload=True ) if device_id: cfg.set('device_id', device_id) @@ -567,6 +579,8 @@ class Config: cfg.set('workdir', workdir) if cachedir: cfg.set('cachedir', cachedir) + if db: + cfg.set('db', db) if ctrl_sock: cfg.set('ctrl_sock', ctrl_sock) diff --git a/platypush/config/config.yaml b/platypush/config/config.yaml index 6af32c3b..8fbbbe15 100644 --- a/platypush/config/config.yaml +++ b/platypush/config/config.yaml @@ -87,15 +87,20 @@ # # `workdir`. You can specify any other engine string here - the application has # # been tested against SQLite, Postgres and MariaDB/MySQL >= 8. # # +# # You can also specify a custom database engine at runtime (SQLAlchemy syntax +# # supported) through the `--db` argument otherwise. +# # # # NOTE: If you want to use a DBMS other than SQLite, then you will also need to # # ensure that a compatible Python driver is installed on the system where # # Platypush is running. For example, Postgres will require the Python pg8000, # # psycopg or another compatible driver. # # main.db: -# engine: sqlite:///home/user/.local/share/platypush/main.db +# engine: sqlite:////home/user/.local/share/platypush/main.db # # OR, if you want to use e.g. Postgres with the pg8000 driver: # engine: postgresql+pg8000://dbuser:dbpass@dbhost/dbname +# +# # NOTE: The short syntax `main.db: ` is also supported. ### ### --------------------- diff --git a/platypush/plugins/redis/__init__.py b/platypush/plugins/redis/__init__.py index 9b471290..b8327522 100644 --- a/platypush/plugins/redis/__init__.py +++ b/platypush/plugins/redis/__init__.py @@ -7,6 +7,9 @@ from platypush.utils import get_redis_conf class RedisPlugin(Plugin): """ Plugin to send messages on Redis queues. + + See https://redis-py.readthedocs.io/en/latest/connections.html#redis.Redis + for supported parameters. """ def __init__(self, *args, **kwargs): @@ -18,24 +21,19 @@ class RedisPlugin(Plugin): return Redis(*self.args, **self.kwargs) @action - def send_message(self, queue, msg, *args, **kwargs): + def send_message(self, queue: str, msg, *args, **kwargs): """ Send a message to a Redis queue. :param queue: Queue name - :type queue: str - :param msg: Message to be sent - :type msg: str, bytes, list, dict, Message object - - :param args: Args passed to the Redis constructor (see https://redis-py.readthedocs.io/en/latest/#redis.Redis) - :type args: list + :type msg: str, bytes, list, dict, :class:`platypush.message.Message` + :param args: Args passed to the Redis constructor (see + https://redis-py.readthedocs.io/en/latest/connections.html#redis.Redis) :param kwargs: Kwargs passed to the Redis constructor (see - https://redis-py.readthedocs.io/en/latest/#redis.Redis) - :type kwargs: dict + https://redis-py.readthedocs.io/en/latest/connections.html#redis.Redis) """ - if args or kwargs: redis = Redis(*args, **kwargs) else: @@ -48,7 +46,6 @@ class RedisPlugin(Plugin): """ :returns: The values specified in keys as a key/value dict (wraps MGET) """ - return { keys[i]: value.decode() if isinstance(value, bytes) else value for (i, value) in enumerate(self._get_redis().mget(keys, *args)) @@ -59,7 +56,6 @@ class RedisPlugin(Plugin): """ Set key/values based on mapping (wraps MSET) """ - try: return self._get_redis().mset(**kwargs) except TypeError: @@ -80,7 +76,6 @@ class RedisPlugin(Plugin): :param expiration: Expiration timeout (in seconds) :type expiration: int """ - return self._get_redis().expire(key, expiration) @action @@ -90,7 +85,6 @@ class RedisPlugin(Plugin): :param args: Keys to delete """ - return self._get_redis().delete(*args)