diff --git a/.drone.yml b/.drone.yml index 9c9f70fb..45b8102c 100644 --- a/.drone.yml +++ b/.drone.yml @@ -66,11 +66,11 @@ steps: image: python:3.11-alpine commands: - apk add --update --no-cache redis - - apk add --update --no-cache --virtual build-base g++ rust + - apk add --update --no-cache --virtual build-base g++ rust linux-headers - pip install -U pip - pip install . - pip install -r requirements-tests.txt - - apk del build-base g++ rust + - apk del build-base g++ rust linux-headers - pytest tests - name: build-ui diff --git a/docs/source/platypush/plugins/application.rst b/docs/source/platypush/plugins/application.rst new file mode 100644 index 00000000..5c0d4696 --- /dev/null +++ b/docs/source/platypush/plugins/application.rst @@ -0,0 +1,5 @@ +``application`` +=============== + +.. automodule:: platypush.plugins.application + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index cadf898e..86a68d5d 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -8,6 +8,7 @@ Plugins platypush/plugins/adafruit.io.rst platypush/plugins/alarm.rst + platypush/plugins/application.rst platypush/plugins/arduino.rst platypush/plugins/assistant.echo.rst platypush/plugins/assistant.google.rst diff --git a/examples/docker/Dockerfile b/examples/docker/Dockerfile index 56ef34b1..374ffbdc 100644 --- a/examples/docker/Dockerfile +++ b/examples/docker/Dockerfile @@ -7,10 +7,10 @@ FROM python:3.11-alpine RUN mkdir -p /install /app COPY . /install RUN apk add --update --no-cache redis -RUN apk add --update --no-cache --virtual build-base g++ rust +RUN apk add --update --no-cache --virtual build-base g++ rust linux-headers RUN pip install -U pip RUN cd /install && pip install . -RUN apk del build-base g++ rust +RUN apk del build-base g++ rust linux-headers EXPOSE 8008 VOLUME /app/config diff --git a/platypush/__init__.py b/platypush/__init__.py index f4e9ecb9..ede8fee4 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -5,12 +5,13 @@ Platypush .. license: MIT """ -from .app import Application, main +from .app import Application from .config import Config from .context import get_backend, get_bus, get_plugin from .message.event import Event from .message.request import Request from .message.response import Response +from .runner import main __author__ = 'Fabio Manganiello ' diff --git a/platypush/__main__.py b/platypush/__main__.py index 2b4e6908..737e265b 100644 --- a/platypush/__main__.py +++ b/platypush/__main__.py @@ -1,10 +1,5 @@ import sys -from platypush.app import main +from platypush.runner import main - -if __name__ == '__main__': - main(*sys.argv[1:]) - - -# vim:sw=4:ts=4:et: +main(*sys.argv[1:]) diff --git a/platypush/app/__init__.py b/platypush/app/__init__.py new file mode 100644 index 00000000..4bd52831 --- /dev/null +++ b/platypush/app/__init__.py @@ -0,0 +1,4 @@ +from ._app import Application, main + + +__all__ = ["Application", "main"] diff --git a/platypush/app/__main__.py b/platypush/app/__main__.py new file mode 100644 index 00000000..ed2e4220 --- /dev/null +++ b/platypush/app/__main__.py @@ -0,0 +1,7 @@ +import sys + +from ._app import main + + +if __name__ == '__main__': + sys.exit(main(*sys.argv[1:])) diff --git a/platypush/app.py b/platypush/app/_app.py similarity index 64% rename from platypush/app.py rename to platypush/app/_app.py index cea035c7..e95d0db3 100644 --- a/platypush/app.py +++ b/platypush/app/_app.py @@ -1,23 +1,26 @@ -import argparse +from contextlib import contextmanager import logging import os +import signal import subprocess import sys -from typing import Optional +from typing import Optional, Sequence -from .bus import Bus -from .bus.redis import RedisBus -from .config import Config -from .context import register_backends, register_plugins -from .cron.scheduler import CronScheduler -from .entities import init_entities_engine, EntitiesEngine -from .event.processor import EventProcessor -from .logger import Logger -from .message.event import Event -from .message.event.application import ApplicationStartedEvent -from .message.request import Request -from .message.response import Response -from .utils import get_enabled_plugins, get_redis_conf +from platypush.bus import Bus +from platypush.bus.redis import RedisBus +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream +from platypush.config import Config +from platypush.context import register_backends, register_plugins +from platypush.cron.scheduler import CronScheduler +from platypush.entities import init_entities_engine, EntitiesEngine +from platypush.event.processor import EventProcessor +from platypush.logger import Logger +from platypush.message.event import Event +from platypush.message.event.application import ApplicationStartedEvent +from platypush.message.request import Request +from platypush.message.response import Response +from platypush.utils import get_enabled_plugins, get_redis_conf log = logging.getLogger('platypush') @@ -25,9 +28,6 @@ log = logging.getLogger('platypush') class Application: """Main class for the Platypush application.""" - # Default bus queue name - _default_redis_queue = 'platypush/bus' - # Default Redis port _default_redis_port = 6379 @@ -51,6 +51,7 @@ class Application: start_redis: bool = False, redis_host: Optional[str] = None, redis_port: Optional[int] = None, + ctrl_sock: Optional[str] = None, ): """ :param config_file: Configuration file override (default: None). @@ -82,21 +83,22 @@ class Application: the settings in the ``redis`` section of the configuration file. :param redis_port: Port of the local Redis server. It overrides the settings in the ``redis`` section of the configuration file. + :param ctrl_sock: If set, it identifies a path to a UNIX domain socket + that the application can use to send control messages (e.g. STOP + and RESTART) to its parent. """ self.pidfile = pidfile - if pidfile: - with open(pidfile, 'w') as f: - f.write(str(os.getpid())) - self.bus: Optional[Bus] = None - self.redis_queue = redis_queue or self._default_redis_queue + self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE self.config_file = config_file self._verbose = verbose self._logsdir = ( os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None ) + Config.init(self.config_file) + Config.set('ctrl_sock', ctrl_sock) if workdir: Config.set('workdir', os.path.abspath(os.path.expanduser(workdir))) @@ -113,6 +115,7 @@ class Application: self.redis_port = redis_port self.redis_conf = {} self._redis_proc: Optional[subprocess.Popen] = None + self.cmd_stream = CommandStream(ctrl_sock) self._init_bus() self._init_logging() @@ -187,133 +190,11 @@ class Application: self._redis_proc = None @classmethod - def build(cls, *args: str): + def from_cmdline(cls, args: Sequence[str]) -> "Application": """ Build the app from command line arguments. """ - from . import __version__ - - parser = argparse.ArgumentParser() - - parser.add_argument( - '--config', - '-c', - dest='config', - required=False, - default=None, - help='Custom location for the configuration file', - ) - - parser.add_argument( - '--workdir', - '-w', - dest='workdir', - required=False, - default=None, - help='Custom working directory to be used for the application', - ) - - parser.add_argument( - '--logsdir', - '-l', - dest='logsdir', - required=False, - default=None, - help='Store logs in the specified directory. By default, the ' - '`[logging.]filename` configuration option will be used. If not ' - 'set, logging will be sent to stdout and stderr.', - ) - - parser.add_argument( - '--version', - dest='version', - required=False, - action='store_true', - help="Print the current version and exit", - ) - - parser.add_argument( - '--verbose', - '-v', - dest='verbose', - required=False, - action='store_true', - help="Enable verbose/debug logging", - ) - - parser.add_argument( - '--pidfile', - '-P', - dest='pidfile', - required=False, - default=None, - help="File where platypush will " - + "store its PID, useful if you're planning to " - + "integrate it in a service", - ) - - parser.add_argument( - '--no-capture-stdout', - dest='no_capture_stdout', - required=False, - action='store_true', - help="Set this flag if you have max stack depth " - + "exceeded errors so stdout won't be captured by " - + "the logging system", - ) - - parser.add_argument( - '--no-capture-stderr', - dest='no_capture_stderr', - required=False, - action='store_true', - help="Set this flag if you have max stack depth " - + "exceeded errors so stderr won't be captured by " - + "the logging system", - ) - - parser.add_argument( - '--redis-queue', - dest='redis_queue', - required=False, - default=cls._default_redis_queue, - help="Name of the Redis queue to be used to internally deliver messages " - "(default: platypush/bus)", - ) - - parser.add_argument( - '--start-redis', - dest='start_redis', - required=False, - action='store_true', - help="Set this flag if you want to run and manage Redis internally " - "from the app rather than using an external server. It requires the " - "redis-server executable to be present in the path", - ) - - parser.add_argument( - '--redis-host', - dest='redis_host', - required=False, - default=None, - help="Overrides the host specified in the redis section of the " - "configuration file", - ) - - parser.add_argument( - '--redis-port', - dest='redis_port', - required=False, - default=None, - help="Overrides the port specified in the redis section of the " - "configuration file", - ) - - opts, _ = parser.parse_known_args(args) - if opts.version: - print(__version__) - sys.exit(0) - + opts = parse_cmdline(args) return cls( config_file=opts.config, workdir=opts.workdir, @@ -326,6 +207,7 @@ class Application: start_redis=opts.start_redis, redis_host=opts.redis_host, redis_port=opts.redis_port, + ctrl_sock=opts.ctrl_sock, ) def on_message(self): @@ -351,7 +233,7 @@ class Application: self.requests_to_process and self.processed_requests >= self.requests_to_process ): - self.stop_app() + self.stop() elif isinstance(msg, Response): msg.log() elif isinstance(msg, Event): @@ -360,36 +242,68 @@ class Application: return _f - def stop_app(self): + def stop(self): """Stops the backends and the bus.""" - from .plugins import RunnablePlugin + from platypush.plugins import RunnablePlugin - if self.backends: - for backend in self.backends.values(): - backend.stop() + log.info('Stopping the application') + backends = (self.backends or {}).copy().values() + runnable_plugins = [ + plugin + for plugin in get_enabled_plugins().values() + if isinstance(plugin, RunnablePlugin) + ] - for plugin in get_enabled_plugins().values(): - if isinstance(plugin, RunnablePlugin): - plugin.stop() + for backend in backends: + backend.stop() + + for plugin in runnable_plugins: + plugin.stop() + + for backend in backends: + backend.wait_stop() + + for plugin in runnable_plugins: + plugin.wait_stop() + + if self.entities_engine: + self.entities_engine.stop() + self.entities_engine.wait_stop() + self.entities_engine = None + + if self.cron_scheduler: + self.cron_scheduler.stop() + self.cron_scheduler.wait_stop() + self.cron_scheduler = None if self.bus: self.bus.stop() self.bus = None - if self.cron_scheduler: - self.cron_scheduler.stop() - self.cron_scheduler = None - - if self.entities_engine: - self.entities_engine.stop() - self.entities_engine = None - if self.start_redis: self._stop_redis() - def run(self): - """Start the daemon.""" - from . import __version__ + log.info('Exiting application') + + @contextmanager + def _open_pidfile(self): + if self.pidfile: + try: + with open(self.pidfile, 'w') as f: + f.write(str(os.getpid())) + except OSError as e: + log.warning('Failed to create PID file %s: %s', self.pidfile, e) + + yield + + if self.pidfile: + try: + os.remove(self.pidfile) + except OSError as e: + log.warning('Failed to remove PID file %s: %s', self.pidfile, e) + + def _run(self): + from platypush import __version__ if not self.no_capture_stdout: sys.stdout = Logger(log.info) @@ -428,16 +342,30 @@ class Application: self.bus.poll() except KeyboardInterrupt: log.info('SIGINT received, terminating application') + # Ignore other SIGINT signals + signal.signal(signal.SIGINT, signal.SIG_IGN) finally: - self.stop_app() + self.stop() + + def run(self): + """Run the application.""" + + with self._open_pidfile(): + self._run() def main(*args: str): """ Application entry point. """ - app = Application.build(*args) - app.run() + app = Application.from_cmdline(args) + + try: + app.run() + except KeyboardInterrupt: + pass + + return 0 # vim:sw=4:ts=4:et: diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 1afc4385..c8b0ea65 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -6,27 +6,28 @@ import time from threading import Thread, Event as ThreadEvent, get_ident from typing import Optional, Dict +from platypush import __version__ from platypush.bus import Bus from platypush.common import ExtensionWithManifest from platypush.config import Config from platypush.context import get_backend +from platypush.event import EventGenerator +from platypush.message import Message +from platypush.message.event import Event from platypush.message.event.zeroconf import ( ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, ) -from platypush.utils import ( - set_timeout, - clear_timeout, - get_redis_queue_name_by_message, - get_backend_name_by_class, -) - -from platypush import __version__ -from platypush.event import EventGenerator -from platypush.message import Message -from platypush.message.event import Event from platypush.message.request import Request from platypush.message.response import Response +from platypush.utils import ( + clear_timeout, + get_backend_name_by_class, + get_redis, + get_redis_queue_name_by_message, + get_remaining_timeout, + set_timeout, +) class Backend(Thread, EventGenerator, ExtensionWithManifest): @@ -68,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self.device_id = Config.get('device_id') self.thread_id = None self._stop_event = ThreadEvent() + self._stop_thread: Optional[Thread] = None self._kwargs = kwargs self.logger = logging.getLogger( 'platypush:backend:' + get_backend_name_by_class(self.__class__) @@ -299,30 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self._stop_event.set() self.unregister_service() self.on_stop() + self._stop_thread = None - Thread(target=_async_stop).start() + if not (self._stop_thread and self._stop_thread.is_alive()): + self._stop_thread = Thread(target=_async_stop) + self._stop_thread.start() def should_stop(self): + """ + :return: True if the backend thread should be stopped, False otherwise. + """ return self._stop_event.is_set() def wait_stop(self, timeout=None) -> bool: - return self._stop_event.wait(timeout) + """ + Waits for the backend thread to stop. - def _get_redis(self): - import redis + :param timeout: The maximum time to wait for the backend thread to stop (default: None) + :return: True if the backend thread has stopped, False otherwise. + """ + start = time.time() - redis_backend = get_backend('redis') - if not redis_backend: - self.logger.warning( - 'Redis backend not configured - some ' - 'web server features may not be working properly' - ) - redis_args = {} - else: - redis_args = redis_backend.redis_args + if self._stop_thread: + try: + self._stop_thread.join( + get_remaining_timeout(timeout=timeout, start=start) + ) + except AttributeError: + pass - redis = redis.Redis(**redis_args) - return redis + return self._stop_event.wait( + get_remaining_timeout(timeout=timeout, start=start) + ) def get_message_response(self, msg): queue = get_redis_queue_name_by_message(msg) @@ -331,7 +341,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): return None try: - redis = self._get_redis() + redis = get_redis() response = redis.blpop(queue, timeout=60) if response and len(response) > 1: response = Message.build(response[1]) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 4df3ce8a..6bda9065 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -2,13 +2,17 @@ import asyncio import os import pathlib import secrets +import signal import threading +from functools import partial from multiprocessing import Process from time import time -from typing import List, Mapping, Optional -from tornado.httpserver import HTTPServer +from typing import Mapping, Optional +import psutil + +from tornado.httpserver import HTTPServer from tornado.netutil import bind_sockets from tornado.process import cpu_count, fork_processes from tornado.wsgi import WSGIContainer @@ -18,9 +22,9 @@ from platypush.backend import Backend from platypush.backend.http.app import application from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes from platypush.backend.http.app.ws.events import WSEventProxy - from platypush.bus.redis import RedisBus from platypush.config import Config +from platypush.utils import get_remaining_timeout class HttpBackend(Backend): @@ -191,6 +195,9 @@ class HttpBackend(Backend): _DEFAULT_HTTP_PORT = 8008 """The default listen port for the webserver.""" + _STOP_TIMEOUT = 5 + """How long we should wait (in seconds) before killing the worker processes.""" + def __init__( self, port: int = _DEFAULT_HTTP_PORT, @@ -227,7 +234,6 @@ class HttpBackend(Backend): self.port = port self._server_proc: Optional[Process] = None - self._workers: List[Process] = [] self._service_registry_thread = None self.bind_address = bind_address @@ -254,35 +260,37 @@ class HttpBackend(Backend): """On backend stop""" super().on_stop() self.logger.info('Received STOP event on HttpBackend') - - start_time = time() - timeout = 5 - workers = self._workers.copy() - - for i, worker in enumerate(workers[::-1]): - if worker and worker.is_alive(): - worker.terminate() - worker.join(timeout=max(0, start_time + timeout - time())) - - if worker and worker.is_alive(): - worker.kill() - self._workers.pop(i) + start = time() + remaining_time: partial[float] = partial( # type: ignore + get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start + ) if self._server_proc: - self._server_proc.terminate() - self._server_proc.join(timeout=5) - self._server_proc = None + if self._server_proc.pid: + try: + os.kill(self._server_proc.pid, signal.SIGINT) + except OSError: + pass + + if self._server_proc and self._server_proc.is_alive(): + self._server_proc.join(timeout=remaining_time() / 2) + try: + self._server_proc.terminate() + self._server_proc.join(timeout=remaining_time() / 2) + except AttributeError: + pass if self._server_proc and self._server_proc.is_alive(): self._server_proc.kill() self._server_proc = None - self.logger.info('HTTP server terminated') if self._service_registry_thread and self._service_registry_thread.is_alive(): - self._service_registry_thread.join(timeout=5) + self._service_registry_thread.join(timeout=remaining_time()) self._service_registry_thread = None + self.logger.info('HTTP server terminated') + def notify_web_clients(self, event): """Notify all the connected web clients (over websocket) of a new event""" WSEventProxy.publish(event) # noqa: E1120 @@ -344,7 +352,10 @@ class HttpBackend(Backend): try: await asyncio.Event().wait() except (asyncio.CancelledError, KeyboardInterrupt): - return + pass + finally: + server.stop() + await server.close_all_connections() def _web_server_proc(self): self.logger.info( @@ -371,7 +382,65 @@ class HttpBackend(Backend): future = self._post_fork_main(sockets) asyncio.run(future) except (asyncio.CancelledError, KeyboardInterrupt): - return + pass + finally: + self._stop_workers() + + def _stop_workers(self): + """ + Stop all the worker processes. + + We have to run this manually on server termination because of a + long-standing issue with Tornado not being able to wind down the forked + workers when the server terminates: + https://github.com/tornadoweb/tornado/issues/1912. + """ + parent_pid = ( + self._server_proc.pid + if self._server_proc and self._server_proc.pid + else None + ) + + if not parent_pid: + return + + try: + cur_proc = psutil.Process(parent_pid) + except psutil.NoSuchProcess: + return + + # Send a SIGTERM to all the children + children = cur_proc.children() + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + os.kill(child.pid, signal.SIGTERM) + except OSError as e: + self.logger.warning( + 'Could not send SIGTERM to PID %d: %s', child.pid, e + ) + + # Initialize the timeout + start = time() + remaining_time: partial[int] = partial( # type: ignore + get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start, cls=int + ) + + # Wait for all children to terminate (with timeout) + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + child.wait(timeout=remaining_time()) + except TimeoutError: + pass + + # Send a SIGKILL to any child process that is still running + for child in children: + if child.pid != parent_pid and child.is_running(): + try: + child.kill() + except OSError: + pass def _start_web_server(self): self._server_proc = Process(target=self._web_server_proc) diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index e8273a12..83ded8af 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Optional +from typing import Final, Optional from platypush.bus import Bus from platypush.message import Message @@ -13,7 +13,7 @@ class RedisBus(Bus): Overrides the in-process in-memory local bus with a Redis bus """ - _DEFAULT_REDIS_QUEUE = 'platypush/bus' + DEFAULT_REDIS_QUEUE: Final[str] = 'platypush/bus' def __init__(self, *args, on_message=None, redis_queue=None, **kwargs): from platypush.utils import get_redis @@ -21,7 +21,7 @@ class RedisBus(Bus): super().__init__(on_message=on_message) self.redis = get_redis(*args, **kwargs) self.redis_args = kwargs - self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE + self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE self.on_message = on_message self.thread_id = threading.get_ident() diff --git a/platypush/cli.py b/platypush/cli.py new file mode 100644 index 00000000..9864c16b --- /dev/null +++ b/platypush/cli.py @@ -0,0 +1,139 @@ +import argparse +from typing import Sequence + +from platypush.bus.redis import RedisBus +from platypush.utils import get_default_pid_file + + +def parse_cmdline(args: Sequence[str]) -> argparse.Namespace: + """ + Parse command-line arguments from a list of strings. + """ + parser = argparse.ArgumentParser() + + parser.add_argument( + '--config', + '-c', + dest='config', + required=False, + default=None, + help='Custom location for the configuration file', + ) + + parser.add_argument( + '--workdir', + '-w', + dest='workdir', + required=False, + default=None, + help='Custom working directory to be used for the application', + ) + + parser.add_argument( + '--logsdir', + '-l', + dest='logsdir', + required=False, + default=None, + help='Store logs in the specified directory. By default, the ' + '`[logging.]filename` configuration option will be used. If not ' + 'set, logging will be sent to stdout and stderr.', + ) + + parser.add_argument( + '--version', + dest='version', + required=False, + action='store_true', + help="Print the current version and exit", + ) + + parser.add_argument( + '--verbose', + '-v', + dest='verbose', + required=False, + action='store_true', + help="Enable verbose/debug logging", + ) + + parser.add_argument( + '--pidfile', + '-P', + dest='pidfile', + required=False, + default=get_default_pid_file(), + help="File where platypush will " + + "store its PID, useful if you're planning to " + + f"integrate it in a service (default: {get_default_pid_file()})", + ) + + parser.add_argument( + '--no-capture-stdout', + dest='no_capture_stdout', + required=False, + action='store_true', + help="Set this flag if you have max stack depth " + + "exceeded errors so stdout won't be captured by " + + "the logging system", + ) + + parser.add_argument( + '--no-capture-stderr', + dest='no_capture_stderr', + required=False, + action='store_true', + help="Set this flag if you have max stack depth " + + "exceeded errors so stderr won't be captured by " + + "the logging system", + ) + + parser.add_argument( + '--redis-queue', + dest='redis_queue', + required=False, + default=RedisBus.DEFAULT_REDIS_QUEUE, + help="Name of the Redis queue to be used to internally deliver messages " + f"(default: {RedisBus.DEFAULT_REDIS_QUEUE})", + ) + + parser.add_argument( + '--start-redis', + dest='start_redis', + required=False, + action='store_true', + help="Set this flag if you want to run and manage Redis internally " + "from the app rather than using an external server. It requires the " + "redis-server executable to be present in the path", + ) + + parser.add_argument( + '--redis-host', + dest='redis_host', + required=False, + default=None, + help="Overrides the host specified in the redis section of the " + "configuration file", + ) + + parser.add_argument( + '--redis-port', + dest='redis_port', + required=False, + default=None, + help="Overrides the port specified in the redis section of the " + "configuration file", + ) + + parser.add_argument( + '--ctrl-sock', + dest='ctrl_sock', + required=False, + default=None, + help="If set, it identifies a path to a UNIX domain socket that " + "the application can use to send control messages (e.g. STOP and " + "RESTART) to its parent.", + ) + + opts, _ = parser.parse_known_args(args) + return opts diff --git a/platypush/commands/__init__.py b/platypush/commands/__init__.py new file mode 100644 index 00000000..73a4eaf4 --- /dev/null +++ b/platypush/commands/__init__.py @@ -0,0 +1,5 @@ +from ._base import Command +from ._commands import RestartCommand, StopCommand +from ._stream import CommandStream + +__all__ = ["Command", "CommandStream", "RestartCommand", "StopCommand"] diff --git a/platypush/commands/_base.py b/platypush/commands/_base.py new file mode 100644 index 00000000..e0296873 --- /dev/null +++ b/platypush/commands/_base.py @@ -0,0 +1,78 @@ +from abc import ABC, abstractmethod +import json +from logging import getLogger, Logger + + +class Command(ABC): + """ + Base class for application commands. + """ + + END_OF_COMMAND = b'\x00' + """End-of-command marker.""" + + def __init__(self, **args) -> None: + self.args = args + + @property + def logger(self) -> Logger: + """ + The command class logger. + """ + return getLogger(self.__class__.__name__) + + @abstractmethod + def __call__(self, app, *_, **__): + """ + Execute the command. + """ + raise NotImplementedError() + + def __str__(self) -> str: + """ + :return: A JSON representation of the command. + """ + return json.dumps( + { + 'type': 'command', + 'command': self.__class__.__name__, + 'args': self.args, + } + ) + + def to_bytes(self): + """ + :return: A JSON representation of the command. + """ + return str(self).encode('utf-8') + self.END_OF_COMMAND + + @classmethod + def parse(cls, data: bytes) -> "Command": + """ + :param data: A JSON representation of the command. + :raise ValueError: If the data is invalid. + :return: The command instance or None if the data is invalid. + """ + import platypush.commands + + try: + json_data = json.loads(data.decode('utf-8')) + except json.JSONDecodeError as e: + raise ValueError from e + + kind = json_data.pop('type', None) + if kind != 'command': + raise ValueError(f'Invalid command type: {kind}') + + command_name = json_data.get('command') + if not command_name: + raise ValueError(f'Invalid command name: {command_name}') + + cmd_class = getattr(platypush.commands, command_name, None) + if not (cmd_class and issubclass(cmd_class, Command)): + raise ValueError(f'Invalid command class: {command_name}') + + try: + return cmd_class(**json_data.get('args', {})) + except Exception as e: + raise ValueError(e) from e diff --git a/platypush/commands/_commands/__init__.py b/platypush/commands/_commands/__init__.py new file mode 100644 index 00000000..22cdf7be --- /dev/null +++ b/platypush/commands/_commands/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from ._app_ctrl import * diff --git a/platypush/commands/_commands/_app_ctrl.py b/platypush/commands/_commands/_app_ctrl.py new file mode 100644 index 00000000..780d8b15 --- /dev/null +++ b/platypush/commands/_commands/_app_ctrl.py @@ -0,0 +1,25 @@ +from typing_extensions import override + +from platypush.commands import Command + + +class StopCommand(Command): + """ + Stop the application. + """ + + @override + def __call__(self, app, *_, **__): + self.logger.info('Received StopApplication command.') + app.stop() + + +class RestartCommand(Command): + """ + Restart the application. + """ + + @override + def __call__(self, app, *_, **__): + self.logger.info('Received RestartApplication command.') + app.restart() diff --git a/platypush/commands/_reader.py b/platypush/commands/_reader.py new file mode 100644 index 00000000..8eee5a3f --- /dev/null +++ b/platypush/commands/_reader.py @@ -0,0 +1,69 @@ +from logging import getLogger +from socket import socket +from typing import Optional + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandReader: + """ + Reads command objects from file-like I/O objects. + """ + + _max_bufsize = 8192 + """Maximum size of a command that can be queued in the stream.""" + + _bufsize = 1024 + """Size of the buffer used to read commands from the socket.""" + + def __init__(self): + self.logger = getLogger(__name__) + self._buf = bytes() + + def _parse_command(self, data: bytes) -> Optional[Command]: + """ + Parses a command from the received data. + + :param data: Data received from the socket + :return: The parsed command + """ + try: + return Command.parse(data) + except ValueError as e: + self.logger.warning('Error while parsing command: %s', e) + return None + + def read(self, sock: socket) -> Optional[Command]: + """ + Parses the next command from the file-like I/O object. + + :param fp: The file-like I/O object to read from. + :return: The parsed command. + """ + try: + data = sock.recv(self._bufsize) + except OSError as e: + self.logger.warning( + 'Error while reading from socket %s: %s', sock.getsockname(), e + ) + return None + + for ch in data: + if bytes([ch]) == Command.END_OF_COMMAND: + cmd = self._parse_command(self._buf) + self._buf = bytes() + + if cmd: + return cmd + elif len(self._buf) >= self._max_bufsize: + self.logger.warning( + 'The received command is too long: length=%d', len(self._buf) + ) + + self._buf = bytes() + break + else: + self._buf += bytes([ch]) + + return None diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py new file mode 100644 index 00000000..3ae185d3 --- /dev/null +++ b/platypush/commands/_stream.py @@ -0,0 +1,139 @@ +from multiprocessing import Queue +import os +from queue import Empty +import socket +import tempfile +from typing import Optional +from typing_extensions import override + +from platypush.process import ControllableProcess + +from ._base import Command +from ._reader import CommandReader +from ._writer import CommandWriter + + +class CommandStream(ControllableProcess): + """ + The command stream is an abstraction built around a UNIX socket that allows + the application to communicate commands to its controller. + + :param path: Path to the UNIX socket + """ + + _max_queue_size = 100 + """Maximum number of commands that can be queued in the stream.""" + + _default_sock_path = tempfile.gettempdir() + '/platypush-cmd-stream.sock' + """Default path to the UNIX socket.""" + + _close_timeout = 5.0 + """Close the client socket after this amount of seconds.""" + + def __init__(self, path: Optional[str] = None): + super().__init__(name='platypush:cmd:stream') + self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path)) + self._sock: Optional[socket.socket] = None + self._cmd_queue: Queue["Command"] = Queue() + + def reset(self): + if self._sock is not None: + try: + self._sock.close() + except socket.error: + pass + + self._sock = None + + try: + os.unlink(self.path) + except FileNotFoundError: + pass + + self._cmd_queue.close() + self._cmd_queue = Queue() + + @override + def close(self) -> None: + self.reset() + return super().close() + + def __enter__(self) -> "CommandStream": + self.reset() + sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(self.path) + os.chmod(self.path, 0o600) + sock.listen(1) + self.start() + return self + + def __exit__(self, *_, **__): + self.terminate() + self.join() + self.close() + + def _serve(self, sock: socket.socket): + """ + Serves the command stream. + + :param sock: Client socket to serve + """ + reader = CommandReader() + cmd = reader.read(sock) + if cmd: + self._cmd_queue.put_nowait(cmd) + + def read(self, timeout: Optional[float] = None) -> Optional[Command]: + """ + Reads commands from the command stream. + + :param timeout: Maximum time to wait for a command. + :return: The command that was received, if any. + """ + try: + return self._cmd_queue.get(timeout=timeout) + except Empty: + return None + + def write(self, cmd: Command) -> None: + """ + Writes a command to the command stream. + + :param cmd: Command to write + :raise AssertionError: If the command cannot be written + """ + sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.connect(self.path) + self.logger.debug('Sending command: %s', cmd) + CommandWriter().write(cmd, sock) + except OSError as e: + raise AssertionError(f'Unable to connect to socket {self.path}: {e}') from e + finally: + sock.close() + + @staticmethod + def _close_client(sock: socket.socket): + try: + sock.close() + except OSError: + pass + + @override + def main(self): + while self._sock and not self.should_stop: + sock = self._sock + + try: + conn, _ = sock.accept() + except ConnectionResetError: + continue + except KeyboardInterrupt: + break + + try: + self._serve(conn) + except Exception as e: + self.logger.warning('Unexpected socket error: %s', e, exc_info=True) + finally: + self._close_client(conn) diff --git a/platypush/commands/_writer.py b/platypush/commands/_writer.py new file mode 100644 index 00000000..cf701128 --- /dev/null +++ b/platypush/commands/_writer.py @@ -0,0 +1,25 @@ +from logging import getLogger +from socket import socket + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandWriter: + """ + Writes command objects to file-like I/O objects. + """ + + def __init__(self): + self.logger = getLogger(__name__) + + def write(self, cmd: Command, sock: socket): + """ + Writes a command to a file-like I/O object. + + :param cmd: The command to write. + :param fp: The file-like I/O object to write to. + """ + + buf = cmd.to_bytes() + sock.sendall(buf) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 55b40a98..b7d816fe 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -2,13 +2,14 @@ import datetime import enum import logging import threading -from typing import Dict +import time +from typing import Dict, Optional import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron +from platypush.utils import get_remaining_timeout, is_functional_cron logger = logging.getLogger('platypush:cron') @@ -198,6 +199,20 @@ class CronScheduler(threading.Thread): def should_stop(self): return self._should_stop.is_set() + def wait_stop(self, timeout: Optional[float] = None): + start = time.time() + stopped = self._should_stop.wait( + timeout=get_remaining_timeout(timeout=timeout, start=start) + ) + + if not stopped: + raise TimeoutError( + f'Timeout waiting for {self.__class__.__name__} to stop.' + ) + + if threading.get_ident() != self.ident: + self.join(timeout=get_remaining_timeout(timeout=timeout, start=start)) + def run(self): logger.info('Running cron scheduler') diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index cdd90874..0f3a22f3 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -1,5 +1,6 @@ from logging import getLogger -from threading import Thread, Event +from threading import Thread, Event, get_ident +from time import time from typing import Dict, Optional from platypush.context import get_bus @@ -9,6 +10,7 @@ from platypush.message.event.entities import EntityUpdateEvent from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.repo import EntitiesRepository +from platypush.utils import get_remaining_timeout class EntitiesEngine(Thread): @@ -69,6 +71,20 @@ class EntitiesEngine(Thread): def stop(self): self._should_stop.set() + def wait_stop(self, timeout: Optional[float] = None): + start = time() + stopped = self._should_stop.wait( + timeout=get_remaining_timeout(timeout=timeout, start=start) + ) + + if not stopped: + raise TimeoutError( + f'Timeout waiting for {self.__class__.__name__} to stop.' + ) + + if get_ident() != self.ident: + self.join(timeout=get_remaining_timeout(timeout=timeout, start=start)) + def notify(self, *entities: Entity): """ Trigger an EntityUpdateEvent if the entity has been persisted, or queue diff --git a/platypush/plugins/application/__init__.py b/platypush/plugins/application/__init__.py new file mode 100644 index 00000000..85a30559 --- /dev/null +++ b/platypush/plugins/application/__init__.py @@ -0,0 +1,31 @@ +from typing import Optional +from platypush.commands import CommandStream, RestartCommand, StopCommand +from platypush.config import Config +from platypush.plugins import Plugin, action + + +class ApplicationPlugin(Plugin): + """ + This plugin is used to control and inspect the application state. + """ + + @property + def _ctrl_sock(self) -> Optional[str]: + """ + :return: The path to the UNIX socket to control the application. + """ + return Config.get('ctrl_sock') # type: ignore + + @action + def stop(self): + """ + Stop the application. + """ + CommandStream(self._ctrl_sock).write(StopCommand()) + + @action + def restart(self): + """ + Restart the application. + """ + CommandStream(self._ctrl_sock).write(RestartCommand()) diff --git a/platypush/plugins/application/manifest.yaml b/platypush/plugins/application/manifest.yaml new file mode 100644 index 00000000..68cddacb --- /dev/null +++ b/platypush/plugins/application/manifest.yaml @@ -0,0 +1,6 @@ +manifest: + events: {} + install: + pip: [] + package: platypush.plugins.application + type: plugin diff --git a/platypush/plugins/system/manifest.yaml b/platypush/plugins/system/manifest.yaml index bf404b99..67137e13 100644 --- a/platypush/plugins/system/manifest.yaml +++ b/platypush/plugins/system/manifest.yaml @@ -3,6 +3,5 @@ manifest: install: pip: - py-cpuinfo - - psutil package: platypush.plugins.system type: plugin diff --git a/platypush/process/__init__.py b/platypush/process/__init__.py new file mode 100644 index 00000000..582a8e7f --- /dev/null +++ b/platypush/process/__init__.py @@ -0,0 +1,135 @@ +from abc import ABC, abstractmethod +import logging +from multiprocessing import Event, Process, RLock +from os import getpid +from typing import Optional +from typing_extensions import override + + +class ControllableProcess(Process, ABC): + """ + Extends the ``Process`` class to allow for external extended control of the + underlying process. + """ + + _kill_timeout: float = 10.0 + + def __init__(self, *args, timeout: Optional[float] = None, **kwargs): + kwargs['name'] = kwargs.get('name', self.__class__.__name__) + super().__init__(*args, **kwargs) + + self.timeout = timeout + self.logger = logging.getLogger(self.name) + self._should_stop = Event() + self._stop_lock = RLock() + self._should_restart = False + + @property + def _timeout_err(self) -> TimeoutError: + return TimeoutError(f'Process {self.name} timed out') + + @property + def should_stop(self) -> bool: + """ + :return: ``True`` if the process is scheduled for stop, ``False`` + otherwise. + """ + return self._should_stop.is_set() + + def wait_stop(self, timeout: Optional[float] = None) -> None: + """ + Waits for the process to stop. + + :param timeout: The maximum time to wait for the process to stop. + """ + timeout = timeout if timeout is not None else self.timeout + stopped = self._should_stop.wait(timeout=timeout) + + if not stopped: + raise self._timeout_err + + if self.pid == getpid(): + return # Prevent termination deadlock + + self.join(timeout=timeout) + if self.is_alive(): + raise self._timeout_err + + def stop(self, timeout: Optional[float] = None) -> None: + """ + Stops the process. + + :param timeout: The maximum time to wait for the process to stop. + """ + timeout = timeout if timeout is not None else self._kill_timeout + with self._stop_lock: + self._should_stop.set() + self.on_stop() + + try: + if self.pid != getpid(): + self.wait_stop(timeout=timeout) + except TimeoutError: + pass + finally: + self.terminate() + + try: + if self.pid != getpid(): + self.wait_stop(timeout=self._kill_timeout) + except TimeoutError: + self.logger.warning( + 'The process %s is still alive after %f seconds, killing it', + self.name, + self._kill_timeout, + ) + self.kill() + + def on_stop(self) -> None: + """ + Handler called when the process is stopped. + + It can be implemented by subclasses. + """ + + @property + def should_restart(self) -> bool: + """ + :return: ``True`` if the process is marked for restart after stop, + ``False`` otherwise. + """ + return self._should_restart + + def mark_for_restart(self): + """ + Marks the process for restart after stop. + """ + self._should_restart = True + + @abstractmethod + def main(self): + """ + The main function of the process. + + It must be implemented by subclasses. + """ + + def _main(self): + """ + Wrapper for the main function of the process. + """ + self._should_restart = False + return self.main() + + @override + def run(self) -> None: + """ + Executes the process. + """ + super().run() + + try: + self._main() + finally: + self._should_stop.set() + self.logger.info('Process terminated') diff --git a/platypush/runner/__init__.py b/platypush/runner/__init__.py new file mode 100644 index 00000000..e0c8269f --- /dev/null +++ b/platypush/runner/__init__.py @@ -0,0 +1,17 @@ +from ._runner import ApplicationRunner + + +def main(*args: str): + """ + Main application entry point. + + This is usually the entry point that you want to use to start your + application, rather than :meth:`platypush.app.main`, as this entry point + wraps the main application in a controllable process. + """ + + app_runner = ApplicationRunner() + app_runner.run(*args) + + +__all__ = ["ApplicationRunner", "main"] diff --git a/platypush/runner/__main__.py b/platypush/runner/__main__.py new file mode 100644 index 00000000..ab75238d --- /dev/null +++ b/platypush/runner/__main__.py @@ -0,0 +1,5 @@ +import sys + +from . import main + +main(sys.argv[1:]) diff --git a/platypush/runner/_app.py b/platypush/runner/_app.py new file mode 100644 index 00000000..6ff9207b --- /dev/null +++ b/platypush/runner/_app.py @@ -0,0 +1,59 @@ +import logging +import os +import signal +import subprocess +import sys +from typing_extensions import override + +from platypush.process import ControllableProcess + + +class ApplicationProcess(ControllableProcess): + """ + Controllable process wrapper interface for the main application. + """ + + def __init__(self, *args: str, pidfile: str, **kwargs): + super().__init__(name='platypush', **kwargs) + + self.logger = logging.getLogger('platypush') + self.args = args + self.pidfile = pidfile + + def __enter__(self) -> "ApplicationProcess": + self.start() + return self + + def __exit__(self, *_, **__): + self.stop() + + @override + def main(self): + self.logger.info('Starting application...') + + with subprocess.Popen( + ['python', '-m', 'platypush.app', *self.args], + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + ) as app: + try: + app.wait() + except KeyboardInterrupt: + pass + + @override + def on_stop(self): + try: + with open(self.pidfile, 'r') as f: + pid = int(f.read().strip()) + except (OSError, ValueError): + pid = None + + if not pid: + return + + try: + os.kill(pid, signal.SIGINT) + except OSError: + pass diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py new file mode 100644 index 00000000..ac713c9d --- /dev/null +++ b/platypush/runner/_runner.py @@ -0,0 +1,91 @@ +import logging +import sys +from threading import Thread +from typing import Optional + +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream + +from ._app import ApplicationProcess + + +class ApplicationRunner: + """ + Runner for the main application. + + It wraps the main application and provides an interface to control it + externally. + """ + + _default_timeout = 0.5 + + def __init__(self): + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + self.logger = logging.getLogger('platypush:runner') + self._proc: Optional[ApplicationProcess] = None + + def _listen(self, stream: CommandStream): + """ + Listens for external application commands and executes them. + """ + while self.is_running: + cmd = stream.read(timeout=self._default_timeout) + if not cmd: + continue + + self.logger.info(cmd) + Thread(target=cmd, args=(self,)).start() + + def _print_version(self): + from platypush import __version__ + + print(__version__) + sys.exit(0) + + def _run(self, *args: str) -> None: + parsed_args = parse_cmdline(args) + + if parsed_args.version: + self._print_version() + + while True: + with ( + CommandStream(parsed_args.ctrl_sock) as stream, + ApplicationProcess( + *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout + ) as self._proc, + ): + try: + self._listen(stream) + except KeyboardInterrupt: + pass + + if self.should_restart: + self.logger.info('Restarting application...') + continue + + break + + def run(self, *args: str) -> None: + try: + self._run(*args) + finally: + self._proc = None + + def stop(self): + if self._proc is not None: + self._proc.stop() + + def restart(self): + if self._proc is not None: + self._proc.mark_for_restart() + + self.stop() + + @property + def is_running(self) -> bool: + return bool(self._proc and self._proc.is_alive()) + + @property + def should_restart(self) -> bool: + return self._proc.should_restart if self._proc is not None else False diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index fd80f1e9..20763fc9 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -14,7 +14,9 @@ import socket import ssl import urllib.request from threading import Lock as TLock -from typing import Generator, Optional, Tuple, Union +from tempfile import gettempdir +import time +from typing import Generator, Optional, Tuple, Type, Union from dateutil import parser, tz from redis import Redis @@ -625,4 +627,23 @@ def get_lock( lock.release() +def get_default_pid_file() -> str: + """ + Get the default PID file path. + """ + return os.path.join(gettempdir(), 'platypush.pid') + + +def get_remaining_timeout( + timeout: Optional[float], start: float, cls: Union[Type[int], Type[float]] = float +) -> Optional[Union[int, float]]: + """ + Get the remaining timeout, given a start time. + """ + if timeout is None: + return None + + return cls(max(0, timeout - (time.time() - start))) + + # vim:sw=4:ts=4:et: diff --git a/requirements.txt b/requirements.txt index d61383ca..22900c92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ frozendict marshmallow marshmallow_dataclass paho-mqtt +psutil python-dateutil python-magic pyyaml diff --git a/setup.py b/setup.py index f71f1262..e67b5713 100755 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ setup( 'frozendict', 'marshmallow', 'marshmallow_dataclass', + 'psutil', 'python-dateutil', 'python-magic', 'pyyaml', @@ -224,7 +225,7 @@ setup( # Support for Graphite integration 'graphite': ['graphyte'], # Support for CPU and memory monitoring and info - 'sys': ['py-cpuinfo', 'psutil'], + 'sys': ['py-cpuinfo'], # Support for nmap integration 'nmap': ['python-nmap'], # Support for zigbee2mqtt diff --git a/tests/conftest.py b/tests/conftest.py index 017042ae..9e3013dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,7 +45,7 @@ def app(): yield _app logging.info('Stopping Platypush test service') - _app.stop_app() + _app.stop() clear_loggers() db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :]