diff --git a/platypush/__main__.py b/platypush/__main__.py index 2b4e690896..737e265b75 100644 --- a/platypush/__main__.py +++ b/platypush/__main__.py @@ -1,10 +1,5 @@ import sys -from platypush.app import main +from platypush.runner import main - -if __name__ == '__main__': - main(*sys.argv[1:]) - - -# vim:sw=4:ts=4:et: +main(*sys.argv[1:]) diff --git a/platypush/app/__init__.py b/platypush/app/__init__.py new file mode 100644 index 0000000000..4bd528318e --- /dev/null +++ b/platypush/app/__init__.py @@ -0,0 +1,4 @@ +from ._app import Application, main + + +__all__ = ["Application", "main"] diff --git a/platypush/app/__main__.py b/platypush/app/__main__.py new file mode 100644 index 0000000000..ed2e42208f --- /dev/null +++ b/platypush/app/__main__.py @@ -0,0 +1,7 @@ +import sys + +from ._app import main + + +if __name__ == '__main__': + sys.exit(main(*sys.argv[1:])) diff --git a/platypush/app.py b/platypush/app/_app.py similarity index 81% rename from platypush/app.py rename to platypush/app/_app.py index d262b6158e..e95d0db3f6 100644 --- a/platypush/app.py +++ b/platypush/app/_app.py @@ -1,24 +1,26 @@ +from contextlib import contextmanager import logging import os +import signal import subprocess import sys from typing import Optional, Sequence -from .bus import Bus -from .bus.redis import RedisBus -from .cli import parse_cmdline -from .commands import CommandStream -from .config import Config -from .context import register_backends, register_plugins -from .cron.scheduler import CronScheduler -from .entities import init_entities_engine, EntitiesEngine -from .event.processor import EventProcessor -from .logger import Logger -from .message.event import Event -from .message.event.application import ApplicationStartedEvent -from .message.request import Request -from .message.response import Response -from .utils import get_enabled_plugins, get_redis_conf +from platypush.bus import Bus +from platypush.bus.redis import RedisBus +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream +from platypush.config import Config +from platypush.context import register_backends, register_plugins +from platypush.cron.scheduler import CronScheduler +from platypush.entities import init_entities_engine, EntitiesEngine +from platypush.event.processor import EventProcessor +from platypush.logger import Logger +from platypush.message.event import Event +from platypush.message.event.application import ApplicationStartedEvent +from platypush.message.request import Request +from platypush.message.response import Response +from platypush.utils import get_enabled_plugins, get_redis_conf log = logging.getLogger('platypush') @@ -87,10 +89,6 @@ class Application: """ self.pidfile = pidfile - if pidfile: - with open(pidfile, 'w') as f: - f.write(str(os.getpid())) - self.bus: Optional[Bus] = None self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE self.config_file = config_file @@ -98,7 +96,9 @@ class Application: self._logsdir = ( os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None ) + Config.init(self.config_file) + Config.set('ctrl_sock', ctrl_sock) if workdir: Config.set('workdir', os.path.abspath(os.path.expanduser(workdir))) @@ -244,36 +244,66 @@ class Application: def stop(self): """Stops the backends and the bus.""" - from .plugins import RunnablePlugin + from platypush.plugins import RunnablePlugin log.info('Stopping the application') + backends = (self.backends or {}).copy().values() + runnable_plugins = [ + plugin + for plugin in get_enabled_plugins().values() + if isinstance(plugin, RunnablePlugin) + ] - if self.backends: - for backend in self.backends.values(): - backend.stop() + for backend in backends: + backend.stop() - for plugin in get_enabled_plugins().values(): - if isinstance(plugin, RunnablePlugin): - plugin.stop() + for plugin in runnable_plugins: + plugin.stop() + + for backend in backends: + backend.wait_stop() + + for plugin in runnable_plugins: + plugin.wait_stop() + + if self.entities_engine: + self.entities_engine.stop() + self.entities_engine.wait_stop() + self.entities_engine = None + + if self.cron_scheduler: + self.cron_scheduler.stop() + self.cron_scheduler.wait_stop() + self.cron_scheduler = None if self.bus: self.bus.stop() self.bus = None - if self.cron_scheduler: - self.cron_scheduler.stop() - self.cron_scheduler = None - - if self.entities_engine: - self.entities_engine.stop() - self.entities_engine = None - if self.start_redis: self._stop_redis() - def run(self): - """Start the daemon.""" - from . import __version__ + log.info('Exiting application') + + @contextmanager + def _open_pidfile(self): + if self.pidfile: + try: + with open(self.pidfile, 'w') as f: + f.write(str(os.getpid())) + except OSError as e: + log.warning('Failed to create PID file %s: %s', self.pidfile, e) + + yield + + if self.pidfile: + try: + os.remove(self.pidfile) + except OSError as e: + log.warning('Failed to remove PID file %s: %s', self.pidfile, e) + + def _run(self): + from platypush import __version__ if not self.no_capture_stdout: sys.stdout = Logger(log.info) @@ -312,9 +342,17 @@ class Application: self.bus.poll() except KeyboardInterrupt: log.info('SIGINT received, terminating application') + # Ignore other SIGINT signals + signal.signal(signal.SIGINT, signal.SIG_IGN) finally: self.stop() + def run(self): + """Run the application.""" + + with self._open_pidfile(): + self._run() + def main(*args: str): """ @@ -327,12 +365,7 @@ def main(*args: str): except KeyboardInterrupt: pass - log.info('Application stopped') return 0 -if __name__ == '__main__': - sys.exit(main(*sys.argv[1:])) - - # vim:sw=4:ts=4:et: diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py index 555c1301fd..3ae185d3ea 100644 --- a/platypush/commands/_stream.py +++ b/platypush/commands/_stream.py @@ -31,7 +31,7 @@ class CommandStream(ControllableProcess): """Close the client socket after this amount of seconds.""" def __init__(self, path: Optional[str] = None): - super().__init__(name='platypush-cmd-stream') + super().__init__(name='platypush:cmd:stream') self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path)) self._sock: Optional[socket.socket] = None self._cmd_queue: Queue["Command"] = Queue() @@ -64,6 +64,7 @@ class CommandStream(ControllableProcess): sock.bind(self.path) os.chmod(self.path, 0o600) sock.listen(1) + self.start() return self def __exit__(self, *_, **__): diff --git a/platypush/plugins/system/manifest.yaml b/platypush/plugins/system/manifest.yaml index bf404b996d..67137e13b8 100644 --- a/platypush/plugins/system/manifest.yaml +++ b/platypush/plugins/system/manifest.yaml @@ -3,6 +3,5 @@ manifest: install: pip: - py-cpuinfo - - psutil package: platypush.plugins.system type: plugin diff --git a/platypush/process/__init__.py b/platypush/process/__init__.py index 37eddac49f..582a8e7f63 100644 --- a/platypush/process/__init__.py +++ b/platypush/process/__init__.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from logging import getLogger +import logging from multiprocessing import Event, Process, RLock from os import getpid from typing import Optional @@ -17,8 +17,9 @@ class ControllableProcess(Process, ABC): def __init__(self, *args, timeout: Optional[float] = None, **kwargs): kwargs['name'] = kwargs.get('name', self.__class__.__name__) super().__init__(*args, **kwargs) + self.timeout = timeout - self.logger = getLogger(self.name) + self.logger = logging.getLogger(self.name) self._should_stop = Event() self._stop_lock = RLock() self._should_restart = False @@ -60,23 +61,22 @@ class ControllableProcess(Process, ABC): :param timeout: The maximum time to wait for the process to stop. """ - timeout = timeout if timeout is not None else self.timeout + timeout = timeout if timeout is not None else self._kill_timeout with self._stop_lock: self._should_stop.set() self.on_stop() - if self.pid == getpid(): - return # Prevent termination deadlock - try: - self.wait_stop(timeout=timeout) + if self.pid != getpid(): + self.wait_stop(timeout=timeout) except TimeoutError: pass finally: self.terminate() try: - self.wait_stop(timeout=self._kill_timeout) + if self.pid != getpid(): + self.wait_stop(timeout=self._kill_timeout) except TimeoutError: self.logger.warning( 'The process %s is still alive after %f seconds, killing it', diff --git a/platypush/runner/__init__.py b/platypush/runner/__init__.py new file mode 100644 index 0000000000..e0c8269fde --- /dev/null +++ b/platypush/runner/__init__.py @@ -0,0 +1,17 @@ +from ._runner import ApplicationRunner + + +def main(*args: str): + """ + Main application entry point. + + This is usually the entry point that you want to use to start your + application, rather than :meth:`platypush.app.main`, as this entry point + wraps the main application in a controllable process. + """ + + app_runner = ApplicationRunner() + app_runner.run(*args) + + +__all__ = ["ApplicationRunner", "main"] diff --git a/platypush/runner/__main__.py b/platypush/runner/__main__.py new file mode 100644 index 0000000000..ab75238d54 --- /dev/null +++ b/platypush/runner/__main__.py @@ -0,0 +1,5 @@ +import sys + +from . import main + +main(sys.argv[1:]) diff --git a/platypush/runner/_app.py b/platypush/runner/_app.py new file mode 100644 index 0000000000..6ff9207be5 --- /dev/null +++ b/platypush/runner/_app.py @@ -0,0 +1,59 @@ +import logging +import os +import signal +import subprocess +import sys +from typing_extensions import override + +from platypush.process import ControllableProcess + + +class ApplicationProcess(ControllableProcess): + """ + Controllable process wrapper interface for the main application. + """ + + def __init__(self, *args: str, pidfile: str, **kwargs): + super().__init__(name='platypush', **kwargs) + + self.logger = logging.getLogger('platypush') + self.args = args + self.pidfile = pidfile + + def __enter__(self) -> "ApplicationProcess": + self.start() + return self + + def __exit__(self, *_, **__): + self.stop() + + @override + def main(self): + self.logger.info('Starting application...') + + with subprocess.Popen( + ['python', '-m', 'platypush.app', *self.args], + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + ) as app: + try: + app.wait() + except KeyboardInterrupt: + pass + + @override + def on_stop(self): + try: + with open(self.pidfile, 'r') as f: + pid = int(f.read().strip()) + except (OSError, ValueError): + pid = None + + if not pid: + return + + try: + os.kill(pid, signal.SIGINT) + except OSError: + pass diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py new file mode 100644 index 0000000000..ac713c9d25 --- /dev/null +++ b/platypush/runner/_runner.py @@ -0,0 +1,91 @@ +import logging +import sys +from threading import Thread +from typing import Optional + +from platypush.cli import parse_cmdline +from platypush.commands import CommandStream + +from ._app import ApplicationProcess + + +class ApplicationRunner: + """ + Runner for the main application. + + It wraps the main application and provides an interface to control it + externally. + """ + + _default_timeout = 0.5 + + def __init__(self): + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + self.logger = logging.getLogger('platypush:runner') + self._proc: Optional[ApplicationProcess] = None + + def _listen(self, stream: CommandStream): + """ + Listens for external application commands and executes them. + """ + while self.is_running: + cmd = stream.read(timeout=self._default_timeout) + if not cmd: + continue + + self.logger.info(cmd) + Thread(target=cmd, args=(self,)).start() + + def _print_version(self): + from platypush import __version__ + + print(__version__) + sys.exit(0) + + def _run(self, *args: str) -> None: + parsed_args = parse_cmdline(args) + + if parsed_args.version: + self._print_version() + + while True: + with ( + CommandStream(parsed_args.ctrl_sock) as stream, + ApplicationProcess( + *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout + ) as self._proc, + ): + try: + self._listen(stream) + except KeyboardInterrupt: + pass + + if self.should_restart: + self.logger.info('Restarting application...') + continue + + break + + def run(self, *args: str) -> None: + try: + self._run(*args) + finally: + self._proc = None + + def stop(self): + if self._proc is not None: + self._proc.stop() + + def restart(self): + if self._proc is not None: + self._proc.mark_for_restart() + + self.stop() + + @property + def is_running(self) -> bool: + return bool(self._proc and self._proc.is_alive()) + + @property + def should_restart(self) -> bool: + return self._proc.should_restart if self._proc is not None else False diff --git a/requirements.txt b/requirements.txt index d61383ca57..22900c9268 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ frozendict marshmallow marshmallow_dataclass paho-mqtt +psutil python-dateutil python-magic pyyaml diff --git a/setup.py b/setup.py index f71f126250..e67b5713d4 100755 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ setup( 'frozendict', 'marshmallow', 'marshmallow_dataclass', + 'psutil', 'python-dateutil', 'python-magic', 'pyyaml', @@ -224,7 +225,7 @@ setup( # Support for Graphite integration 'graphite': ['graphyte'], # Support for CPU and memory monitoring and info - 'sys': ['py-cpuinfo', 'psutil'], + 'sys': ['py-cpuinfo'], # Support for nmap integration 'nmap': ['python-nmap'], # Support for zigbee2mqtt