From 827b56400675acb2f601a2be88cb48109a777925 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 22:20:16 +0200 Subject: [PATCH 01/18] Using a single constant for DEFAULT_REDIS_QUEUE. Also, catch `AttributeError` on `self._proc.terminate` in the `HttpBackend`, since the process may already have been terminated and set to null by another worker process. --- platypush/app.py | 5 +---- platypush/backend/http/__init__.py | 8 ++++++-- platypush/bus/redis.py | 6 +++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/platypush/app.py b/platypush/app.py index cea035c720..9f6151b5a2 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -25,9 +25,6 @@ log = logging.getLogger('platypush') class Application: """Main class for the Platypush application.""" - # Default bus queue name - _default_redis_queue = 'platypush/bus' - # Default Redis port _default_redis_port = 6379 @@ -90,7 +87,7 @@ class Application: f.write(str(os.getpid())) self.bus: Optional[Bus] = None - self.redis_queue = redis_queue or self._default_redis_queue + self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE self.config_file = config_file self._verbose = verbose self._logsdir = ( diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 4df3ce8a7e..12ce9f2c5d 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -269,8 +269,12 @@ class HttpBackend(Backend): self._workers.pop(i) if self._server_proc: - self._server_proc.terminate() - self._server_proc.join(timeout=5) + try: + self._server_proc.terminate() + self._server_proc.join(timeout=5) + except AttributeError: + pass + self._server_proc = None if self._server_proc and self._server_proc.is_alive(): diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index e8273a1268..83ded8aff7 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Optional +from typing import Final, Optional from platypush.bus import Bus from platypush.message import Message @@ -13,7 +13,7 @@ class RedisBus(Bus): Overrides the in-process in-memory local bus with a Redis bus """ - _DEFAULT_REDIS_QUEUE = 'platypush/bus' + DEFAULT_REDIS_QUEUE: Final[str] = 'platypush/bus' def __init__(self, *args, on_message=None, redis_queue=None, **kwargs): from platypush.utils import get_redis @@ -21,7 +21,7 @@ class RedisBus(Bus): super().__init__(on_message=on_message) self.redis = get_redis(*args, **kwargs) self.redis_args = kwargs - self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE + self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE self.on_message = on_message self.thread_id = threading.get_ident() From 1819ee75ef260d468b9fa1e64e387f1ec8ed7d6e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 22:23:15 +0200 Subject: [PATCH 02/18] `s/Application.stop_app/Application.stop/g` --- platypush/app.py | 8 +++++--- tests/conftest.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/platypush/app.py b/platypush/app.py index 9f6151b5a2..bb162489d4 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -348,7 +348,7 @@ class Application: self.requests_to_process and self.processed_requests >= self.requests_to_process ): - self.stop_app() + self.stop() elif isinstance(msg, Response): msg.log() elif isinstance(msg, Event): @@ -357,10 +357,12 @@ class Application: return _f - def stop_app(self): + def stop(self): """Stops the backends and the bus.""" from .plugins import RunnablePlugin + log.info('Stopping the application') + if self.backends: for backend in self.backends.values(): backend.stop() @@ -426,7 +428,7 @@ class Application: except KeyboardInterrupt: log.info('SIGINT received, terminating application') finally: - self.stop_app() + self.stop() def main(*args: str): diff --git a/tests/conftest.py b/tests/conftest.py index 017042aed7..9e3013ddb2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,7 +45,7 @@ def app(): yield _app logging.info('Stopping Platypush test service') - _app.stop_app() + _app.stop() clear_loggers() db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :] From 97adc3f775a1f0133645d41bf0644d56de345853 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 22:27:16 +0200 Subject: [PATCH 03/18] Moved application argument parser to an external `platypush.cli` module. --- platypush/app.py | 132 ++------------------------------------------- platypush/cli.py | 138 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 127 deletions(-) create mode 100644 platypush/cli.py diff --git a/platypush/app.py b/platypush/app.py index bb162489d4..78fc8f4967 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -1,12 +1,12 @@ -import argparse import logging import os import subprocess import sys -from typing import Optional +from typing import Optional, Sequence from .bus import Bus from .bus.redis import RedisBus +from .cli import parse_cmdline from .config import Config from .context import register_backends, register_plugins from .cron.scheduler import CronScheduler @@ -184,133 +184,11 @@ class Application: self._redis_proc = None @classmethod - def build(cls, *args: str): + def from_cmdline(cls, args: Sequence[str]) -> "Application": """ Build the app from command line arguments. """ - from . import __version__ - - parser = argparse.ArgumentParser() - - parser.add_argument( - '--config', - '-c', - dest='config', - required=False, - default=None, - help='Custom location for the configuration file', - ) - - parser.add_argument( - '--workdir', - '-w', - dest='workdir', - required=False, - default=None, - help='Custom working directory to be used for the application', - ) - - parser.add_argument( - '--logsdir', - '-l', - dest='logsdir', - required=False, - default=None, - help='Store logs in the specified directory. By default, the ' - '`[logging.]filename` configuration option will be used. If not ' - 'set, logging will be sent to stdout and stderr.', - ) - - parser.add_argument( - '--version', - dest='version', - required=False, - action='store_true', - help="Print the current version and exit", - ) - - parser.add_argument( - '--verbose', - '-v', - dest='verbose', - required=False, - action='store_true', - help="Enable verbose/debug logging", - ) - - parser.add_argument( - '--pidfile', - '-P', - dest='pidfile', - required=False, - default=None, - help="File where platypush will " - + "store its PID, useful if you're planning to " - + "integrate it in a service", - ) - - parser.add_argument( - '--no-capture-stdout', - dest='no_capture_stdout', - required=False, - action='store_true', - help="Set this flag if you have max stack depth " - + "exceeded errors so stdout won't be captured by " - + "the logging system", - ) - - parser.add_argument( - '--no-capture-stderr', - dest='no_capture_stderr', - required=False, - action='store_true', - help="Set this flag if you have max stack depth " - + "exceeded errors so stderr won't be captured by " - + "the logging system", - ) - - parser.add_argument( - '--redis-queue', - dest='redis_queue', - required=False, - default=cls._default_redis_queue, - help="Name of the Redis queue to be used to internally deliver messages " - "(default: platypush/bus)", - ) - - parser.add_argument( - '--start-redis', - dest='start_redis', - required=False, - action='store_true', - help="Set this flag if you want to run and manage Redis internally " - "from the app rather than using an external server. It requires the " - "redis-server executable to be present in the path", - ) - - parser.add_argument( - '--redis-host', - dest='redis_host', - required=False, - default=None, - help="Overrides the host specified in the redis section of the " - "configuration file", - ) - - parser.add_argument( - '--redis-port', - dest='redis_port', - required=False, - default=None, - help="Overrides the port specified in the redis section of the " - "configuration file", - ) - - opts, _ = parser.parse_known_args(args) - if opts.version: - print(__version__) - sys.exit(0) - + opts = parse_cmdline(args) return cls( config_file=opts.config, workdir=opts.workdir, @@ -435,7 +313,7 @@ def main(*args: str): """ Application entry point. """ - app = Application.build(*args) + app = Application.from_cmdline(args) app.run() diff --git a/platypush/cli.py b/platypush/cli.py new file mode 100644 index 0000000000..81bc4c35f4 --- /dev/null +++ b/platypush/cli.py @@ -0,0 +1,138 @@ +import argparse +from typing import Sequence + +from platypush.bus.redis import RedisBus + + +def parse_cmdline(args: Sequence[str]) -> argparse.Namespace: + """ + Parse command-line arguments from a list of strings. + """ + parser = argparse.ArgumentParser() + + parser.add_argument( + '--config', + '-c', + dest='config', + required=False, + default=None, + help='Custom location for the configuration file', + ) + + parser.add_argument( + '--workdir', + '-w', + dest='workdir', + required=False, + default=None, + help='Custom working directory to be used for the application', + ) + + parser.add_argument( + '--logsdir', + '-l', + dest='logsdir', + required=False, + default=None, + help='Store logs in the specified directory. By default, the ' + '`[logging.]filename` configuration option will be used. If not ' + 'set, logging will be sent to stdout and stderr.', + ) + + parser.add_argument( + '--version', + dest='version', + required=False, + action='store_true', + help="Print the current version and exit", + ) + + parser.add_argument( + '--verbose', + '-v', + dest='verbose', + required=False, + action='store_true', + help="Enable verbose/debug logging", + ) + + parser.add_argument( + '--pidfile', + '-P', + dest='pidfile', + required=False, + default=None, + help="File where platypush will " + + "store its PID, useful if you're planning to " + + "integrate it in a service", + ) + + parser.add_argument( + '--no-capture-stdout', + dest='no_capture_stdout', + required=False, + action='store_true', + help="Set this flag if you have max stack depth " + + "exceeded errors so stdout won't be captured by " + + "the logging system", + ) + + parser.add_argument( + '--no-capture-stderr', + dest='no_capture_stderr', + required=False, + action='store_true', + help="Set this flag if you have max stack depth " + + "exceeded errors so stderr won't be captured by " + + "the logging system", + ) + + parser.add_argument( + '--redis-queue', + dest='redis_queue', + required=False, + default=RedisBus.DEFAULT_REDIS_QUEUE, + help="Name of the Redis queue to be used to internally deliver messages " + f"(default: {RedisBus.DEFAULT_REDIS_QUEUE})", + ) + + parser.add_argument( + '--start-redis', + dest='start_redis', + required=False, + action='store_true', + help="Set this flag if you want to run and manage Redis internally " + "from the app rather than using an external server. It requires the " + "redis-server executable to be present in the path", + ) + + parser.add_argument( + '--redis-host', + dest='redis_host', + required=False, + default=None, + help="Overrides the host specified in the redis section of the " + "configuration file", + ) + + parser.add_argument( + '--redis-port', + dest='redis_port', + required=False, + default=None, + help="Overrides the port specified in the redis section of the " + "configuration file", + ) + + parser.add_argument( + '--ctrl-sock', + dest='ctrl_sock', + required=False, + default=None, + help="If set, it identifies a path to a UNIX domain socket that " + "the application can use to send control messages (e.g. STOP and " + "RESTART) to its parent.", + ) + + opts, _ = parser.parse_known_args(args) + return opts From efef9d7bc09fd2d26d920e51ef88fbfc5cb3fe06 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:21:36 +0200 Subject: [PATCH 04/18] Added `commands` module. --- platypush/commands/__init__.py | 0 platypush/commands/_base.py | 78 ++++++++++++++++++++++++++++++++++ platypush/commands/_reader.py | 69 ++++++++++++++++++++++++++++++ platypush/commands/_writer.py | 25 +++++++++++ 4 files changed, 172 insertions(+) create mode 100644 platypush/commands/__init__.py create mode 100644 platypush/commands/_base.py create mode 100644 platypush/commands/_reader.py create mode 100644 platypush/commands/_writer.py diff --git a/platypush/commands/__init__.py b/platypush/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/platypush/commands/_base.py b/platypush/commands/_base.py new file mode 100644 index 0000000000..e029687341 --- /dev/null +++ b/platypush/commands/_base.py @@ -0,0 +1,78 @@ +from abc import ABC, abstractmethod +import json +from logging import getLogger, Logger + + +class Command(ABC): + """ + Base class for application commands. + """ + + END_OF_COMMAND = b'\x00' + """End-of-command marker.""" + + def __init__(self, **args) -> None: + self.args = args + + @property + def logger(self) -> Logger: + """ + The command class logger. + """ + return getLogger(self.__class__.__name__) + + @abstractmethod + def __call__(self, app, *_, **__): + """ + Execute the command. + """ + raise NotImplementedError() + + def __str__(self) -> str: + """ + :return: A JSON representation of the command. + """ + return json.dumps( + { + 'type': 'command', + 'command': self.__class__.__name__, + 'args': self.args, + } + ) + + def to_bytes(self): + """ + :return: A JSON representation of the command. + """ + return str(self).encode('utf-8') + self.END_OF_COMMAND + + @classmethod + def parse(cls, data: bytes) -> "Command": + """ + :param data: A JSON representation of the command. + :raise ValueError: If the data is invalid. + :return: The command instance or None if the data is invalid. + """ + import platypush.commands + + try: + json_data = json.loads(data.decode('utf-8')) + except json.JSONDecodeError as e: + raise ValueError from e + + kind = json_data.pop('type', None) + if kind != 'command': + raise ValueError(f'Invalid command type: {kind}') + + command_name = json_data.get('command') + if not command_name: + raise ValueError(f'Invalid command name: {command_name}') + + cmd_class = getattr(platypush.commands, command_name, None) + if not (cmd_class and issubclass(cmd_class, Command)): + raise ValueError(f'Invalid command class: {command_name}') + + try: + return cmd_class(**json_data.get('args', {})) + except Exception as e: + raise ValueError(e) from e diff --git a/platypush/commands/_reader.py b/platypush/commands/_reader.py new file mode 100644 index 0000000000..8eee5a3fc7 --- /dev/null +++ b/platypush/commands/_reader.py @@ -0,0 +1,69 @@ +from logging import getLogger +from socket import socket +from typing import Optional + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandReader: + """ + Reads command objects from file-like I/O objects. + """ + + _max_bufsize = 8192 + """Maximum size of a command that can be queued in the stream.""" + + _bufsize = 1024 + """Size of the buffer used to read commands from the socket.""" + + def __init__(self): + self.logger = getLogger(__name__) + self._buf = bytes() + + def _parse_command(self, data: bytes) -> Optional[Command]: + """ + Parses a command from the received data. + + :param data: Data received from the socket + :return: The parsed command + """ + try: + return Command.parse(data) + except ValueError as e: + self.logger.warning('Error while parsing command: %s', e) + return None + + def read(self, sock: socket) -> Optional[Command]: + """ + Parses the next command from the file-like I/O object. + + :param fp: The file-like I/O object to read from. + :return: The parsed command. + """ + try: + data = sock.recv(self._bufsize) + except OSError as e: + self.logger.warning( + 'Error while reading from socket %s: %s', sock.getsockname(), e + ) + return None + + for ch in data: + if bytes([ch]) == Command.END_OF_COMMAND: + cmd = self._parse_command(self._buf) + self._buf = bytes() + + if cmd: + return cmd + elif len(self._buf) >= self._max_bufsize: + self.logger.warning( + 'The received command is too long: length=%d', len(self._buf) + ) + + self._buf = bytes() + break + else: + self._buf += bytes([ch]) + + return None diff --git a/platypush/commands/_writer.py b/platypush/commands/_writer.py new file mode 100644 index 0000000000..cf701128da --- /dev/null +++ b/platypush/commands/_writer.py @@ -0,0 +1,25 @@ +from logging import getLogger +from socket import socket + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandWriter: + """ + Writes command objects to file-like I/O objects. + """ + + def __init__(self): + self.logger = getLogger(__name__) + + def write(self, cmd: Command, sock: socket): + """ + Writes a command to a file-like I/O object. + + :param cmd: The command to write. + :param fp: The file-like I/O object to write to. + """ + + buf = cmd.to_bytes() + sock.sendall(buf) From b1f244a812c1880be7fde918c8941fceb41cf973 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:33:08 +0200 Subject: [PATCH 05/18] Added `ControllableProcess` class. This class can be used to easily control the execution of underlying processes. --- platypush/process/__init__.py | 135 ++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 platypush/process/__init__.py diff --git a/platypush/process/__init__.py b/platypush/process/__init__.py new file mode 100644 index 0000000000..37eddac49f --- /dev/null +++ b/platypush/process/__init__.py @@ -0,0 +1,135 @@ +from abc import ABC, abstractmethod +from logging import getLogger +from multiprocessing import Event, Process, RLock +from os import getpid +from typing import Optional +from typing_extensions import override + + +class ControllableProcess(Process, ABC): + """ + Extends the ``Process`` class to allow for external extended control of the + underlying process. + """ + + _kill_timeout: float = 10.0 + + def __init__(self, *args, timeout: Optional[float] = None, **kwargs): + kwargs['name'] = kwargs.get('name', self.__class__.__name__) + super().__init__(*args, **kwargs) + self.timeout = timeout + self.logger = getLogger(self.name) + self._should_stop = Event() + self._stop_lock = RLock() + self._should_restart = False + + @property + def _timeout_err(self) -> TimeoutError: + return TimeoutError(f'Process {self.name} timed out') + + @property + def should_stop(self) -> bool: + """ + :return: ``True`` if the process is scheduled for stop, ``False`` + otherwise. + """ + return self._should_stop.is_set() + + def wait_stop(self, timeout: Optional[float] = None) -> None: + """ + Waits for the process to stop. + + :param timeout: The maximum time to wait for the process to stop. + """ + timeout = timeout if timeout is not None else self.timeout + stopped = self._should_stop.wait(timeout=timeout) + + if not stopped: + raise self._timeout_err + + if self.pid == getpid(): + return # Prevent termination deadlock + + self.join(timeout=timeout) + if self.is_alive(): + raise self._timeout_err + + def stop(self, timeout: Optional[float] = None) -> None: + """ + Stops the process. + + :param timeout: The maximum time to wait for the process to stop. + """ + timeout = timeout if timeout is not None else self.timeout + with self._stop_lock: + self._should_stop.set() + self.on_stop() + + if self.pid == getpid(): + return # Prevent termination deadlock + + try: + self.wait_stop(timeout=timeout) + except TimeoutError: + pass + finally: + self.terminate() + + try: + self.wait_stop(timeout=self._kill_timeout) + except TimeoutError: + self.logger.warning( + 'The process %s is still alive after %f seconds, killing it', + self.name, + self._kill_timeout, + ) + self.kill() + + def on_stop(self) -> None: + """ + Handler called when the process is stopped. + + It can be implemented by subclasses. + """ + + @property + def should_restart(self) -> bool: + """ + :return: ``True`` if the process is marked for restart after stop, + ``False`` otherwise. + """ + return self._should_restart + + def mark_for_restart(self): + """ + Marks the process for restart after stop. + """ + self._should_restart = True + + @abstractmethod + def main(self): + """ + The main function of the process. + + It must be implemented by subclasses. + """ + + def _main(self): + """ + Wrapper for the main function of the process. + """ + self._should_restart = False + return self.main() + + @override + def run(self) -> None: + """ + Executes the process. + """ + super().run() + + try: + self._main() + finally: + self._should_stop.set() + self.logger.info('Process terminated') From c89f99286711f2c7a6ee9803552471d0fd3522bc Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:36:36 +0200 Subject: [PATCH 06/18] Added `StopCommand` and `RestartCommand`. --- platypush/commands/_commands/__init__.py | 2 ++ platypush/commands/_commands/_app_ctrl.py | 25 +++++++++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 platypush/commands/_commands/__init__.py create mode 100644 platypush/commands/_commands/_app_ctrl.py diff --git a/platypush/commands/_commands/__init__.py b/platypush/commands/_commands/__init__.py new file mode 100644 index 0000000000..22cdf7beca --- /dev/null +++ b/platypush/commands/_commands/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from ._app_ctrl import * diff --git a/platypush/commands/_commands/_app_ctrl.py b/platypush/commands/_commands/_app_ctrl.py new file mode 100644 index 0000000000..780d8b15cc --- /dev/null +++ b/platypush/commands/_commands/_app_ctrl.py @@ -0,0 +1,25 @@ +from typing_extensions import override + +from platypush.commands import Command + + +class StopCommand(Command): + """ + Stop the application. + """ + + @override + def __call__(self, app, *_, **__): + self.logger.info('Received StopApplication command.') + app.stop() + + +class RestartCommand(Command): + """ + Restart the application. + """ + + @override + def __call__(self, app, *_, **__): + self.logger.info('Received RestartApplication command.') + app.restart() From 06111587f7de6bc2c987aade55634e25248a747d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:37:51 +0200 Subject: [PATCH 07/18] Added `CommandStream` class. This abstraction is used to write and read commands over a UNIX socket. --- platypush/commands/__init__.py | 5 ++ platypush/commands/_stream.py | 138 +++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 platypush/commands/_stream.py diff --git a/platypush/commands/__init__.py b/platypush/commands/__init__.py index e69de29bb2..73a4eaf420 100644 --- a/platypush/commands/__init__.py +++ b/platypush/commands/__init__.py @@ -0,0 +1,5 @@ +from ._base import Command +from ._commands import RestartCommand, StopCommand +from ._stream import CommandStream + +__all__ = ["Command", "CommandStream", "RestartCommand", "StopCommand"] diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py new file mode 100644 index 0000000000..555c1301fd --- /dev/null +++ b/platypush/commands/_stream.py @@ -0,0 +1,138 @@ +from multiprocessing import Queue +import os +from queue import Empty +import socket +import tempfile +from typing import Optional +from typing_extensions import override + +from platypush.process import ControllableProcess + +from ._base import Command +from ._reader import CommandReader +from ._writer import CommandWriter + + +class CommandStream(ControllableProcess): + """ + The command stream is an abstraction built around a UNIX socket that allows + the application to communicate commands to its controller. + + :param path: Path to the UNIX socket + """ + + _max_queue_size = 100 + """Maximum number of commands that can be queued in the stream.""" + + _default_sock_path = tempfile.gettempdir() + '/platypush-cmd-stream.sock' + """Default path to the UNIX socket.""" + + _close_timeout = 5.0 + """Close the client socket after this amount of seconds.""" + + def __init__(self, path: Optional[str] = None): + super().__init__(name='platypush-cmd-stream') + self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path)) + self._sock: Optional[socket.socket] = None + self._cmd_queue: Queue["Command"] = Queue() + + def reset(self): + if self._sock is not None: + try: + self._sock.close() + except socket.error: + pass + + self._sock = None + + try: + os.unlink(self.path) + except FileNotFoundError: + pass + + self._cmd_queue.close() + self._cmd_queue = Queue() + + @override + def close(self) -> None: + self.reset() + return super().close() + + def __enter__(self) -> "CommandStream": + self.reset() + sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(self.path) + os.chmod(self.path, 0o600) + sock.listen(1) + return self + + def __exit__(self, *_, **__): + self.terminate() + self.join() + self.close() + + def _serve(self, sock: socket.socket): + """ + Serves the command stream. + + :param sock: Client socket to serve + """ + reader = CommandReader() + cmd = reader.read(sock) + if cmd: + self._cmd_queue.put_nowait(cmd) + + def read(self, timeout: Optional[float] = None) -> Optional[Command]: + """ + Reads commands from the command stream. + + :param timeout: Maximum time to wait for a command. + :return: The command that was received, if any. + """ + try: + return self._cmd_queue.get(timeout=timeout) + except Empty: + return None + + def write(self, cmd: Command) -> None: + """ + Writes a command to the command stream. + + :param cmd: Command to write + :raise AssertionError: If the command cannot be written + """ + sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.connect(self.path) + self.logger.debug('Sending command: %s', cmd) + CommandWriter().write(cmd, sock) + except OSError as e: + raise AssertionError(f'Unable to connect to socket {self.path}: {e}') from e + finally: + sock.close() + + @staticmethod + def _close_client(sock: socket.socket): + try: + sock.close() + except OSError: + pass + + @override + def main(self): + while self._sock and not self.should_stop: + sock = self._sock + + try: + conn, _ = sock.accept() + except ConnectionResetError: + continue + except KeyboardInterrupt: + break + + try: + self._serve(conn) + except Exception as e: + self.logger.warning('Unexpected socket error: %s', e, exc_info=True) + finally: + self._close_client(conn) From 8819a0ed4c149ffef3bad21b694b584816adb0ad Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:51:21 +0200 Subject: [PATCH 08/18] Added `CommandStream` to the main `Application`. The stream will be used to communicate command messages from the application to its runner. --- platypush/app.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/platypush/app.py b/platypush/app.py index 78fc8f4967..9d218ab99b 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -7,6 +7,7 @@ from typing import Optional, Sequence from .bus import Bus from .bus.redis import RedisBus from .cli import parse_cmdline +from .commands import CommandStream from .config import Config from .context import register_backends, register_plugins from .cron.scheduler import CronScheduler @@ -48,6 +49,7 @@ class Application: start_redis: bool = False, redis_host: Optional[str] = None, redis_port: Optional[int] = None, + ctrl_sock: Optional[str] = None, ): """ :param config_file: Configuration file override (default: None). @@ -79,6 +81,9 @@ class Application: the settings in the ``redis`` section of the configuration file. :param redis_port: Port of the local Redis server. It overrides the settings in the ``redis`` section of the configuration file. + :param ctrl_sock: If set, it identifies a path to a UNIX domain socket + that the application can use to send control messages (e.g. STOP + and RESTART) to its parent. """ self.pidfile = pidfile @@ -110,6 +115,7 @@ class Application: self.redis_port = redis_port self.redis_conf = {} self._redis_proc: Optional[subprocess.Popen] = None + self.cmd_stream = CommandStream(ctrl_sock) self._init_bus() self._init_logging() @@ -201,6 +207,7 @@ class Application: start_redis=opts.start_redis, redis_host=opts.redis_host, redis_port=opts.redis_port, + ctrl_sock=opts.ctrl_sock, ) def on_message(self): From c11bc69a665c92da173968e3acab1b4fb3ee5cb9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:55:40 +0200 Subject: [PATCH 09/18] Handle `KeyboardInterrupt` and process return code in the main. --- platypush/app.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/platypush/app.py b/platypush/app.py index 9d218ab99b..d262b6158e 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -321,7 +321,18 @@ def main(*args: str): Application entry point. """ app = Application.from_cmdline(args) - app.run() + + try: + app.run() + except KeyboardInterrupt: + pass + + log.info('Application stopped') + return 0 + + +if __name__ == '__main__': + sys.exit(main(*sys.argv[1:])) # vim:sw=4:ts=4:et: From dc1a1524335ff36b09b7a6f7c49446e4c0151005 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 14 Aug 2023 10:46:27 +0200 Subject: [PATCH 10/18] Added `get_default_pid_file` utility method. --- platypush/__init__.py | 3 ++- platypush/cli.py | 5 +++-- platypush/utils/__init__.py | 8 ++++++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index f4e9ecb921..ede8fee4bc 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -5,12 +5,13 @@ Platypush .. license: MIT """ -from .app import Application, main +from .app import Application from .config import Config from .context import get_backend, get_bus, get_plugin from .message.event import Event from .message.request import Request from .message.response import Response +from .runner import main __author__ = 'Fabio Manganiello ' diff --git a/platypush/cli.py b/platypush/cli.py index 81bc4c35f4..9864c16be0 100644 --- a/platypush/cli.py +++ b/platypush/cli.py @@ -2,6 +2,7 @@ import argparse from typing import Sequence from platypush.bus.redis import RedisBus +from platypush.utils import get_default_pid_file def parse_cmdline(args: Sequence[str]) -> argparse.Namespace: @@ -61,10 +62,10 @@ def parse_cmdline(args: Sequence[str]) -> argparse.Namespace: '-P', dest='pidfile', required=False, - default=None, + default=get_default_pid_file(), help="File where platypush will " + "store its PID, useful if you're planning to " - + "integrate it in a service", + + f"integrate it in a service (default: {get_default_pid_file()})", ) parser.add_argument( diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index fd80f1e9e6..a164c982b0 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -14,6 +14,7 @@ import socket import ssl import urllib.request from threading import Lock as TLock +from tempfile import gettempdir from typing import Generator, Optional, Tuple, Union from dateutil import parser, tz @@ -625,4 +626,11 @@ def get_lock( lock.release() +def get_default_pid_file() -> str: + """ + Get the default PID file path. + """ + return os.path.join(gettempdir(), 'platypush.pid') + + # vim:sw=4:ts=4:et: From 7157936b87b22d1224141ec4d1a843e7338f628e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 14 Aug 2023 23:05:32 +0200 Subject: [PATCH 11/18] Added get_remaining_timeout utility function. --- platypush/utils/__init__.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index a164c982b0..20763fc907 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -15,7 +15,8 @@ import ssl import urllib.request from threading import Lock as TLock from tempfile import gettempdir -from typing import Generator, Optional, Tuple, Union +import time +from typing import Generator, Optional, Tuple, Type, Union from dateutil import parser, tz from redis import Redis @@ -633,4 +634,16 @@ def get_default_pid_file() -> str: return os.path.join(gettempdir(), 'platypush.pid') +def get_remaining_timeout( + timeout: Optional[float], start: float, cls: Union[Type[int], Type[float]] = float +) -> Optional[Union[int, float]]: + """ + Get the remaining timeout, given a start time. + """ + if timeout is None: + return None + + return cls(max(0, timeout - (time.time() - start))) + + # vim:sw=4:ts=4:et: From ceb9d6d1bac525bbcb1d5e7b6c9195f3efa3d1e5 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 14 Aug 2023 23:37:38 +0200 Subject: [PATCH 12/18] Removed redundant `Backend._get_redis()` method. It was used only once, and it could easily be replaced by `platypush.utils.get_redis()`. --- platypush/backend/__init__.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 1afc4385be..92929b77f0 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -27,6 +27,7 @@ from platypush.message import Message from platypush.message.event import Event from platypush.message.request import Request from platypush.message.response import Response +from platypush.utils import get_redis class Backend(Thread, EventGenerator, ExtensionWithManifest): @@ -308,22 +309,6 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): def wait_stop(self, timeout=None) -> bool: return self._stop_event.wait(timeout) - def _get_redis(self): - import redis - - redis_backend = get_backend('redis') - if not redis_backend: - self.logger.warning( - 'Redis backend not configured - some ' - 'web server features may not be working properly' - ) - redis_args = {} - else: - redis_args = redis_backend.redis_args - - redis = redis.Redis(**redis_args) - return redis - def get_message_response(self, msg): queue = get_redis_queue_name_by_message(msg) if not queue: @@ -331,7 +316,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): return None try: - redis = self._get_redis() + redis = get_redis() response = redis.blpop(queue, timeout=60) if response and len(response) > 1: response = Message.build(response[1]) From 04921c759fd0300a078ee652e1e925fa4f30ed5c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 14 Aug 2023 23:49:47 +0200 Subject: [PATCH 13/18] Added `wait_stop` method to the entities engine and cron scheduler. --- platypush/cron/scheduler.py | 19 +++++++++++++++++-- platypush/entities/_engine/__init__.py | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 55b40a98de..b7d816fe01 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -2,13 +2,14 @@ import datetime import enum import logging import threading -from typing import Dict +import time +from typing import Dict, Optional import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron +from platypush.utils import get_remaining_timeout, is_functional_cron logger = logging.getLogger('platypush:cron') @@ -198,6 +199,20 @@ class CronScheduler(threading.Thread): def should_stop(self): return self._should_stop.is_set() + def wait_stop(self, timeout: Optional[float] = None): + start = time.time() + stopped = self._should_stop.wait( + timeout=get_remaining_timeout(timeout=timeout, start=start) + ) + + if not stopped: + raise TimeoutError( + f'Timeout waiting for {self.__class__.__name__} to stop.' + ) + + if threading.get_ident() != self.ident: + self.join(timeout=get_remaining_timeout(timeout=timeout, start=start)) + def run(self): logger.info('Running cron scheduler') diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index cdd9087402..0f3a22f339 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -1,5 +1,6 @@ from logging import getLogger -from threading import Thread, Event +from threading import Thread, Event, get_ident +from time import time from typing import Dict, Optional from platypush.context import get_bus @@ -9,6 +10,7 @@ from platypush.message.event.entities import EntityUpdateEvent from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.repo import EntitiesRepository +from platypush.utils import get_remaining_timeout class EntitiesEngine(Thread): @@ -69,6 +71,20 @@ class EntitiesEngine(Thread): def stop(self): self._should_stop.set() + def wait_stop(self, timeout: Optional[float] = None): + start = time() + stopped = self._should_stop.wait( + timeout=get_remaining_timeout(timeout=timeout, start=start) + ) + + if not stopped: + raise TimeoutError( + f'Timeout waiting for {self.__class__.__name__} to stop.' + ) + + if get_ident() != self.ident: + self.join(timeout=get_remaining_timeout(timeout=timeout, start=start)) + def notify(self, *entities: Entity): """ Trigger an EntityUpdateEvent if the entity has been persisted, or queue From a8a7ceb2ac48bd9fa7a8e14ac5652b3d7c05e92b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 15 Aug 2023 00:13:34 +0200 Subject: [PATCH 14/18] Implemented `HttpBackend._stop_workers`. The Tornado WSGI container won't guarantee the termination of the spawned workers upon termination, so the code of the backend has to take care of it and terminate all the children processes of the server process when it terminates. This also means that `psutil` is now a required base dependency, as we need to expand the process subtree under the webserver launcher. --- platypush/backend/http/__init__.py | 119 ++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 27 deletions(-) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 12ce9f2c5d..6bda90657e 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -2,13 +2,17 @@ import asyncio import os import pathlib import secrets +import signal import threading +from functools import partial from multiprocessing import Process from time import time -from typing import List, Mapping, Optional -from tornado.httpserver import HTTPServer +from typing import Mapping, Optional +import psutil + +from tornado.httpserver import HTTPServer from tornado.netutil import bind_sockets from tornado.process import cpu_count, fork_processes from tornado.wsgi import WSGIContainer @@ -18,9 +22,9 @@ from platypush.backend import Backend from platypush.backend.http.app import application from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes from platypush.backend.http.app.ws.events import WSEventProxy - from platypush.bus.redis import RedisBus from platypush.config import Config +from platypush.utils import get_remaining_timeout class HttpBackend(Backend): @@ -191,6 +195,9 @@ class HttpBackend(Backend): _DEFAULT_HTTP_PORT = 8008 """The default listen port for the webserver.""" + _STOP_TIMEOUT = 5 + """How long we should wait (in seconds) before killing the worker processes.""" + def __init__( self, port: int = _DEFAULT_HTTP_PORT, @@ -227,7 +234,6 @@ class HttpBackend(Backend): self.port = port self._server_proc: Optional[Process] = None - self._workers: List[Process] = [] self._service_registry_thread = None self.bind_address = bind_address @@ -254,39 +260,37 @@ class HttpBackend(Backend): """On backend stop""" super().on_stop() self.logger.info('Received STOP event on HttpBackend') - - start_time = time() - timeout = 5 - workers = self._workers.copy() - - for i, worker in enumerate(workers[::-1]): - if worker and worker.is_alive(): - worker.terminate() - worker.join(timeout=max(0, start_time + timeout - time())) - - if worker and worker.is_alive(): - worker.kill() - self._workers.pop(i) + start = time() + remaining_time: partial[float] = partial( # type: ignore + get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start + ) if self._server_proc: - try: - self._server_proc.terminate() - self._server_proc.join(timeout=5) - except AttributeError: - pass + if self._server_proc.pid: + try: + os.kill(self._server_proc.pid, signal.SIGINT) + except OSError: + pass - self._server_proc = None + if self._server_proc and self._server_proc.is_alive(): + self._server_proc.join(timeout=remaining_time() / 2) + try: + self._server_proc.terminate() + self._server_proc.join(timeout=remaining_time() / 2) + except AttributeError: + pass if self._server_proc and self._server_proc.is_alive(): self._server_proc.kill() self._server_proc = None - self.logger.info('HTTP server terminated') if self._service_registry_thread and self._service_registry_thread.is_alive(): - self._service_registry_thread.join(timeout=5) + self._service_registry_thread.join(timeout=remaining_time()) self._service_registry_thread = None + self.logger.info('HTTP server terminated') + def notify_web_clients(self, event): """Notify all the connected web clients (over websocket) of a new event""" WSEventProxy.publish(event) # noqa: E1120 @@ -348,7 +352,10 @@ class HttpBackend(Backend): try: await asyncio.Event().wait() except (asyncio.CancelledError, KeyboardInterrupt): - return + pass + finally: + server.stop() + await server.close_all_connections() def _web_server_proc(self): self.logger.info( @@ -375,7 +382,65 @@ class HttpBackend(Backend): future = self._post_fork_main(sockets) asyncio.run(future) except (asyncio.CancelledError, KeyboardInterrupt): - return + pass + finally: + self._stop_workers() + + def _stop_workers(self): + """ + Stop all the worker processes. + + We have to run this manually on server termination because of a + long-standing issue with Tornado not being able to wind down the forked + workers when the server terminates: + https://github.com/tornadoweb/tornado/issues/1912. + """ + parent_pid = ( + self._server_proc.pid + if self._server_proc and self._server_proc.pid + else None + ) + + if not parent_pid: + return + + try: + cur_proc = psutil.Process(parent_pid) + except psutil.NoSuchProcess: + return + + # Send a SIGTERM to all the children + children = cur_proc.children() + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + os.kill(child.pid, signal.SIGTERM) + except OSError as e: + self.logger.warning( + 'Could not send SIGTERM to PID %d: %s', child.pid, e + ) + + # Initialize the timeout + start = time() + remaining_time: partial[int] = partial( # type: ignore + get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start, cls=int + ) + + # Wait for all children to terminate (with timeout) + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + child.wait(timeout=remaining_time()) + except TimeoutError: + pass + + # Send a SIGKILL to any child process that is still running + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + child.kill() + except OSError: + pass def _start_web_server(self): self._server_proc = Process(target=self._web_server_proc) From 46245e851f228ba1f1e75bbe07265773fcbee371 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 15 Aug 2023 02:06:13 +0200 Subject: [PATCH 15/18] Synchronize with the currently running stop thread (if any) in `Backend.wait_stop`. --- platypush/backend/__init__.py | 53 ++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 92929b77f0..c8b0ea65d6 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -6,28 +6,28 @@ import time from threading import Thread, Event as ThreadEvent, get_ident from typing import Optional, Dict +from platypush import __version__ from platypush.bus import Bus from platypush.common import ExtensionWithManifest from platypush.config import Config from platypush.context import get_backend +from platypush.event import EventGenerator +from platypush.message import Message +from platypush.message.event import Event from platypush.message.event.zeroconf import ( ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, ) -from platypush.utils import ( - set_timeout, - clear_timeout, - get_redis_queue_name_by_message, - get_backend_name_by_class, -) - -from platypush import __version__ -from platypush.event import EventGenerator -from platypush.message import Message -from platypush.message.event import Event from platypush.message.request import Request from platypush.message.response import Response -from platypush.utils import get_redis +from platypush.utils import ( + clear_timeout, + get_backend_name_by_class, + get_redis, + get_redis_queue_name_by_message, + get_remaining_timeout, + set_timeout, +) class Backend(Thread, EventGenerator, ExtensionWithManifest): @@ -69,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self.device_id = Config.get('device_id') self.thread_id = None self._stop_event = ThreadEvent() + self._stop_thread: Optional[Thread] = None self._kwargs = kwargs self.logger = logging.getLogger( 'platypush:backend:' + get_backend_name_by_class(self.__class__) @@ -300,14 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self._stop_event.set() self.unregister_service() self.on_stop() + self._stop_thread = None - Thread(target=_async_stop).start() + if not (self._stop_thread and self._stop_thread.is_alive()): + self._stop_thread = Thread(target=_async_stop) + self._stop_thread.start() def should_stop(self): + """ + :return: True if the backend thread should be stopped, False otherwise. + """ return self._stop_event.is_set() def wait_stop(self, timeout=None) -> bool: - return self._stop_event.wait(timeout) + """ + Waits for the backend thread to stop. + + :param timeout: The maximum time to wait for the backend thread to stop (default: None) + :return: True if the backend thread has stopped, False otherwise. + """ + start = time.time() + + if self._stop_thread: + try: + self._stop_thread.join( + get_remaining_timeout(timeout=timeout, start=start) + ) + except AttributeError: + pass + + return self._stop_event.wait( + get_remaining_timeout(timeout=timeout, start=start) + ) def get_message_response(self, msg): queue = get_redis_queue_name_by_message(msg) From f51beb271ee2ebf4227c00cd754e95434fd36eb1 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 15 Aug 2023 11:12:21 +0200 Subject: [PATCH 16/18] Large refactor + stability fixes for the external process control logic. --- platypush/__main__.py | 9 +- platypush/app/__init__.py | 4 + platypush/app/__main__.py | 7 ++ platypush/{app.py => app/_app.py} | 117 ++++++++++++++++--------- platypush/commands/_stream.py | 3 +- platypush/plugins/system/manifest.yaml | 1 - platypush/process/__init__.py | 16 ++-- platypush/runner/__init__.py | 17 ++++ platypush/runner/__main__.py | 5 ++ platypush/runner/_app.py | 59 +++++++++++++ platypush/runner/_runner.py | 91 +++++++++++++++++++ requirements.txt | 1 + setup.py | 3 +- 13 files changed, 273 insertions(+), 60 deletions(-) create mode 100644 platypush/app/__init__.py create mode 100644 platypush/app/__main__.py rename platypush/{app.py => app/_app.py} (81%) create mode 100644 platypush/runner/__init__.py create mode 100644 platypush/runner/__main__.py create mode 100644 platypush/runner/_app.py create mode 100644 platypush/runner/_runner.py diff --git a/platypush/__main__.py b/platypush/__main__.py index 2b4e690896..737e265b75 100644 --- a/platypush/__main__.py +++ b/platypush/__main__.py @@ -1,10 +1,5 @@ import sys -from platypush.app import main +from platypush.runner import main - -if __name__ == '__main__': - main(*sys.argv[1:]) - - -# vim:sw=4:ts=4:et: +main(*sys.argv[1:]) diff --git a/platypush/app/__init__.py b/platypush/app/__init__.py new file mode 100644 index 0000000000..4bd528318e --- /dev/null +++ b/platypush/app/__init__.py @@ -0,0 +1,4 @@ +from ._app import Application, main + + +__all__ = ["Application", "main"] diff --git a/platypush/app/__main__.py b/platypush/app/__main__.py new file mode 100644 index 0000000000..ed2e42208f --- /dev/null +++ b/platypush/app/__main__.py @@ -0,0 +1,7 @@ +import sys + +from ._app import main + + +if __name__ == '__main__': + sys.exit(main(*sys.argv[1:])) diff --git a/platypush/app.py b/platypush/app/_app.py similarity index 81% rename from platypush/app.py rename to platypush/app/_app.py index d262b6158e..e95d0db3f6 100644 --- a/platypush/app.py +++ b/platypush/app/_app.py @@ -1,24 +1,26 @@ +from contextlib import contextmanager import logging import os +import signal import subprocess import sys from typing import Optional, Sequence -from .bus import Bus -from .bus.redis import RedisBus -from .cli import parse_cmdline -from .commands import CommandStream -from .config import Config -from .context import register_backends, register_plugins -from .cron.scheduler import CronScheduler -from .entities import init_entities_engine, EntitiesEngine -from .event.processor import EventProcessor -from .logger import Logger -from .message.event import Event -from .message.event.application import ApplicationStartedEvent -from .message.request import Request -from .message.response import Response -from .utils import get_enabled_plugins, get_redis_conf +from platypush.bus import Bus +from platypush.bus.redis import RedisBus +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream +from platypush.config import Config +from platypush.context import register_backends, register_plugins +from platypush.cron.scheduler import CronScheduler +from platypush.entities import init_entities_engine, EntitiesEngine +from platypush.event.processor import EventProcessor +from platypush.logger import Logger +from platypush.message.event import Event +from platypush.message.event.application import ApplicationStartedEvent +from platypush.message.request import Request +from platypush.message.response import Response +from platypush.utils import get_enabled_plugins, get_redis_conf log = logging.getLogger('platypush') @@ -87,10 +89,6 @@ class Application: """ self.pidfile = pidfile - if pidfile: - with open(pidfile, 'w') as f: - f.write(str(os.getpid())) - self.bus: Optional[Bus] = None self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE self.config_file = config_file @@ -98,7 +96,9 @@ class Application: self._logsdir = ( os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None ) + Config.init(self.config_file) + Config.set('ctrl_sock', ctrl_sock) if workdir: Config.set('workdir', os.path.abspath(os.path.expanduser(workdir))) @@ -244,36 +244,66 @@ class Application: def stop(self): """Stops the backends and the bus.""" - from .plugins import RunnablePlugin + from platypush.plugins import RunnablePlugin log.info('Stopping the application') + backends = (self.backends or {}).copy().values() + runnable_plugins = [ + plugin + for plugin in get_enabled_plugins().values() + if isinstance(plugin, RunnablePlugin) + ] - if self.backends: - for backend in self.backends.values(): - backend.stop() + for backend in backends: + backend.stop() - for plugin in get_enabled_plugins().values(): - if isinstance(plugin, RunnablePlugin): - plugin.stop() + for plugin in runnable_plugins: + plugin.stop() + + for backend in backends: + backend.wait_stop() + + for plugin in runnable_plugins: + plugin.wait_stop() + + if self.entities_engine: + self.entities_engine.stop() + self.entities_engine.wait_stop() + self.entities_engine = None + + if self.cron_scheduler: + self.cron_scheduler.stop() + self.cron_scheduler.wait_stop() + self.cron_scheduler = None if self.bus: self.bus.stop() self.bus = None - if self.cron_scheduler: - self.cron_scheduler.stop() - self.cron_scheduler = None - - if self.entities_engine: - self.entities_engine.stop() - self.entities_engine = None - if self.start_redis: self._stop_redis() - def run(self): - """Start the daemon.""" - from . import __version__ + log.info('Exiting application') + + @contextmanager + def _open_pidfile(self): + if self.pidfile: + try: + with open(self.pidfile, 'w') as f: + f.write(str(os.getpid())) + except OSError as e: + log.warning('Failed to create PID file %s: %s', self.pidfile, e) + + yield + + if self.pidfile: + try: + os.remove(self.pidfile) + except OSError as e: + log.warning('Failed to remove PID file %s: %s', self.pidfile, e) + + def _run(self): + from platypush import __version__ if not self.no_capture_stdout: sys.stdout = Logger(log.info) @@ -312,9 +342,17 @@ class Application: self.bus.poll() except KeyboardInterrupt: log.info('SIGINT received, terminating application') + # Ignore other SIGINT signals + signal.signal(signal.SIGINT, signal.SIG_IGN) finally: self.stop() + def run(self): + """Run the application.""" + + with self._open_pidfile(): + self._run() + def main(*args: str): """ @@ -327,12 +365,7 @@ def main(*args: str): except KeyboardInterrupt: pass - log.info('Application stopped') return 0 -if __name__ == '__main__': - sys.exit(main(*sys.argv[1:])) - - # vim:sw=4:ts=4:et: diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py index 555c1301fd..3ae185d3ea 100644 --- a/platypush/commands/_stream.py +++ b/platypush/commands/_stream.py @@ -31,7 +31,7 @@ class CommandStream(ControllableProcess): """Close the client socket after this amount of seconds.""" def __init__(self, path: Optional[str] = None): - super().__init__(name='platypush-cmd-stream') + super().__init__(name='platypush:cmd:stream') self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path)) self._sock: Optional[socket.socket] = None self._cmd_queue: Queue["Command"] = Queue() @@ -64,6 +64,7 @@ class CommandStream(ControllableProcess): sock.bind(self.path) os.chmod(self.path, 0o600) sock.listen(1) + self.start() return self def __exit__(self, *_, **__): diff --git a/platypush/plugins/system/manifest.yaml b/platypush/plugins/system/manifest.yaml index bf404b996d..67137e13b8 100644 --- a/platypush/plugins/system/manifest.yaml +++ b/platypush/plugins/system/manifest.yaml @@ -3,6 +3,5 @@ manifest: install: pip: - py-cpuinfo - - psutil package: platypush.plugins.system type: plugin diff --git a/platypush/process/__init__.py b/platypush/process/__init__.py index 37eddac49f..582a8e7f63 100644 --- a/platypush/process/__init__.py +++ b/platypush/process/__init__.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from logging import getLogger +import logging from multiprocessing import Event, Process, RLock from os import getpid from typing import Optional @@ -17,8 +17,9 @@ class ControllableProcess(Process, ABC): def __init__(self, *args, timeout: Optional[float] = None, **kwargs): kwargs['name'] = kwargs.get('name', self.__class__.__name__) super().__init__(*args, **kwargs) + self.timeout = timeout - self.logger = getLogger(self.name) + self.logger = logging.getLogger(self.name) self._should_stop = Event() self._stop_lock = RLock() self._should_restart = False @@ -60,23 +61,22 @@ class ControllableProcess(Process, ABC): :param timeout: The maximum time to wait for the process to stop. """ - timeout = timeout if timeout is not None else self.timeout + timeout = timeout if timeout is not None else self._kill_timeout with self._stop_lock: self._should_stop.set() self.on_stop() - if self.pid == getpid(): - return # Prevent termination deadlock - try: - self.wait_stop(timeout=timeout) + if self.pid != getpid(): + self.wait_stop(timeout=timeout) except TimeoutError: pass finally: self.terminate() try: - self.wait_stop(timeout=self._kill_timeout) + if self.pid != getpid(): + self.wait_stop(timeout=self._kill_timeout) except TimeoutError: self.logger.warning( 'The process %s is still alive after %f seconds, killing it', diff --git a/platypush/runner/__init__.py b/platypush/runner/__init__.py new file mode 100644 index 0000000000..e0c8269fde --- /dev/null +++ b/platypush/runner/__init__.py @@ -0,0 +1,17 @@ +from ._runner import ApplicationRunner + + +def main(*args: str): + """ + Main application entry point. + + This is usually the entry point that you want to use to start your + application, rather than :meth:`platypush.app.main`, as this entry point + wraps the main application in a controllable process. + """ + + app_runner = ApplicationRunner() + app_runner.run(*args) + + +__all__ = ["ApplicationRunner", "main"] diff --git a/platypush/runner/__main__.py b/platypush/runner/__main__.py new file mode 100644 index 0000000000..ab75238d54 --- /dev/null +++ b/platypush/runner/__main__.py @@ -0,0 +1,5 @@ +import sys + +from . import main + +main(sys.argv[1:]) diff --git a/platypush/runner/_app.py b/platypush/runner/_app.py new file mode 100644 index 0000000000..6ff9207be5 --- /dev/null +++ b/platypush/runner/_app.py @@ -0,0 +1,59 @@ +import logging +import os +import signal +import subprocess +import sys +from typing_extensions import override + +from platypush.process import ControllableProcess + + +class ApplicationProcess(ControllableProcess): + """ + Controllable process wrapper interface for the main application. + """ + + def __init__(self, *args: str, pidfile: str, **kwargs): + super().__init__(name='platypush', **kwargs) + + self.logger = logging.getLogger('platypush') + self.args = args + self.pidfile = pidfile + + def __enter__(self) -> "ApplicationProcess": + self.start() + return self + + def __exit__(self, *_, **__): + self.stop() + + @override + def main(self): + self.logger.info('Starting application...') + + with subprocess.Popen( + ['python', '-m', 'platypush.app', *self.args], + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + ) as app: + try: + app.wait() + except KeyboardInterrupt: + pass + + @override + def on_stop(self): + try: + with open(self.pidfile, 'r') as f: + pid = int(f.read().strip()) + except (OSError, ValueError): + pid = None + + if not pid: + return + + try: + os.kill(pid, signal.SIGINT) + except OSError: + pass diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py new file mode 100644 index 0000000000..ac713c9d25 --- /dev/null +++ b/platypush/runner/_runner.py @@ -0,0 +1,91 @@ +import logging +import sys +from threading import Thread +from typing import Optional + +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream + +from ._app import ApplicationProcess + + +class ApplicationRunner: + """ + Runner for the main application. + + It wraps the main application and provides an interface to control it + externally. + """ + + _default_timeout = 0.5 + + def __init__(self): + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + self.logger = logging.getLogger('platypush:runner') + self._proc: Optional[ApplicationProcess] = None + + def _listen(self, stream: CommandStream): + """ + Listens for external application commands and executes them. + """ + while self.is_running: + cmd = stream.read(timeout=self._default_timeout) + if not cmd: + continue + + self.logger.info(cmd) + Thread(target=cmd, args=(self,)).start() + + def _print_version(self): + from platypush import __version__ + + print(__version__) + sys.exit(0) + + def _run(self, *args: str) -> None: + parsed_args = parse_cmdline(args) + + if parsed_args.version: + self._print_version() + + while True: + with ( + CommandStream(parsed_args.ctrl_sock) as stream, + ApplicationProcess( + *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout + ) as self._proc, + ): + try: + self._listen(stream) + except KeyboardInterrupt: + pass + + if self.should_restart: + self.logger.info('Restarting application...') + continue + + break + + def run(self, *args: str) -> None: + try: + self._run(*args) + finally: + self._proc = None + + def stop(self): + if self._proc is not None: + self._proc.stop() + + def restart(self): + if self._proc is not None: + self._proc.mark_for_restart() + + self.stop() + + @property + def is_running(self) -> bool: + return bool(self._proc and self._proc.is_alive()) + + @property + def should_restart(self) -> bool: + return self._proc.should_restart if self._proc is not None else False diff --git a/requirements.txt b/requirements.txt index d61383ca57..22900c9268 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ frozendict marshmallow marshmallow_dataclass paho-mqtt +psutil python-dateutil python-magic pyyaml diff --git a/setup.py b/setup.py index f71f126250..e67b5713d4 100755 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ setup( 'frozendict', 'marshmallow', 'marshmallow_dataclass', + 'psutil', 'python-dateutil', 'python-magic', 'pyyaml', @@ -224,7 +225,7 @@ setup( # Support for Graphite integration 'graphite': ['graphyte'], # Support for CPU and memory monitoring and info - 'sys': ['py-cpuinfo', 'psutil'], + 'sys': ['py-cpuinfo'], # Support for nmap integration 'nmap': ['python-nmap'], # Support for zigbee2mqtt From 00863a176eb634c65e6e0d08715eb7172a32bd24 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 15 Aug 2023 11:13:57 +0200 Subject: [PATCH 17/18] Added `application` plugin. --- docs/source/platypush/plugins/application.rst | 5 +++ docs/source/plugins.rst | 1 + platypush/plugins/application/__init__.py | 31 +++++++++++++++++++ platypush/plugins/application/manifest.yaml | 6 ++++ 4 files changed, 43 insertions(+) create mode 100644 docs/source/platypush/plugins/application.rst create mode 100644 platypush/plugins/application/__init__.py create mode 100644 platypush/plugins/application/manifest.yaml diff --git a/docs/source/platypush/plugins/application.rst b/docs/source/platypush/plugins/application.rst new file mode 100644 index 0000000000..5c0d4696b6 --- /dev/null +++ b/docs/source/platypush/plugins/application.rst @@ -0,0 +1,5 @@ +``application`` +=============== + +.. automodule:: platypush.plugins.application + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index cadf898e7d..86a68d5d39 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -8,6 +8,7 @@ Plugins platypush/plugins/adafruit.io.rst platypush/plugins/alarm.rst + platypush/plugins/application.rst platypush/plugins/arduino.rst platypush/plugins/assistant.echo.rst platypush/plugins/assistant.google.rst diff --git a/platypush/plugins/application/__init__.py b/platypush/plugins/application/__init__.py new file mode 100644 index 0000000000..85a30559eb --- /dev/null +++ b/platypush/plugins/application/__init__.py @@ -0,0 +1,31 @@ +from typing import Optional +from platypush.commands import CommandStream, RestartCommand, StopCommand +from platypush.config import Config +from platypush.plugins import Plugin, action + + +class ApplicationPlugin(Plugin): + """ + This plugin is used to control and inspect the application state. + """ + + @property + def _ctrl_sock(self) -> Optional[str]: + """ + :return: The path to the UNIX socket to control the application. + """ + return Config.get('ctrl_sock') # type: ignore + + @action + def stop(self): + """ + Stop the application. + """ + CommandStream(self._ctrl_sock).write(StopCommand()) + + @action + def restart(self): + """ + Restart the application. + """ + CommandStream(self._ctrl_sock).write(RestartCommand()) diff --git a/platypush/plugins/application/manifest.yaml b/platypush/plugins/application/manifest.yaml new file mode 100644 index 0000000000..68cddacb7f --- /dev/null +++ b/platypush/plugins/application/manifest.yaml @@ -0,0 +1,6 @@ +manifest: + events: {} + install: + pip: [] + package: platypush.plugins.application + type: plugin From fef6513cc84769a13b9db0d0f7759cd465585a49 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 15 Aug 2023 11:40:02 +0200 Subject: [PATCH 18/18] Alpine Linux requires the linux-headers package to build psutil. --- .drone.yml | 4 ++-- examples/docker/Dockerfile | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.drone.yml b/.drone.yml index 9c9f70fbbb..45b8102cf4 100644 --- a/.drone.yml +++ b/.drone.yml @@ -66,11 +66,11 @@ steps: image: python:3.11-alpine commands: - apk add --update --no-cache redis - - apk add --update --no-cache --virtual build-base g++ rust + - apk add --update --no-cache --virtual build-base g++ rust linux-headers - pip install -U pip - pip install . - pip install -r requirements-tests.txt - - apk del build-base g++ rust + - apk del build-base g++ rust linux-headers - pytest tests - name: build-ui diff --git a/examples/docker/Dockerfile b/examples/docker/Dockerfile index 56ef34b182..374ffbdc5d 100644 --- a/examples/docker/Dockerfile +++ b/examples/docker/Dockerfile @@ -7,10 +7,10 @@ FROM python:3.11-alpine RUN mkdir -p /install /app COPY . /install RUN apk add --update --no-cache redis -RUN apk add --update --no-cache --virtual build-base g++ rust +RUN apk add --update --no-cache --virtual build-base g++ rust linux-headers RUN pip install -U pip RUN cd /install && pip install . -RUN apk del build-base g++ rust +RUN apk del build-base g++ rust linux-headers EXPOSE 8008 VOLUME /app/config