Merge pull request '[#272] Support for external stop/restart control on the application' (#273) from 272/external-proc-controller into master

Reviewed-on: platypush/platypush#273
This commit is contained in:
Fabio Manganiello 2023-08-15 21:34:43 +02:00
commit c2a17f0d75
34 changed files with 1142 additions and 243 deletions

View file

@ -66,11 +66,11 @@ steps:
image: python:3.11-alpine
commands:
- apk add --update --no-cache redis
- apk add --update --no-cache --virtual build-base g++ rust
- apk add --update --no-cache --virtual build-base g++ rust linux-headers
- pip install -U pip
- pip install .
- pip install -r requirements-tests.txt
- apk del build-base g++ rust
- apk del build-base g++ rust linux-headers
- pytest tests
- name: build-ui

View file

@ -0,0 +1,5 @@
``application``
===============
.. automodule:: platypush.plugins.application
:members:

View file

@ -8,6 +8,7 @@ Plugins
platypush/plugins/adafruit.io.rst
platypush/plugins/alarm.rst
platypush/plugins/application.rst
platypush/plugins/arduino.rst
platypush/plugins/assistant.echo.rst
platypush/plugins/assistant.google.rst

View file

@ -7,10 +7,10 @@ FROM python:3.11-alpine
RUN mkdir -p /install /app
COPY . /install
RUN apk add --update --no-cache redis
RUN apk add --update --no-cache --virtual build-base g++ rust
RUN apk add --update --no-cache --virtual build-base g++ rust linux-headers
RUN pip install -U pip
RUN cd /install && pip install .
RUN apk del build-base g++ rust
RUN apk del build-base g++ rust linux-headers
EXPOSE 8008
VOLUME /app/config

View file

@ -5,12 +5,13 @@ Platypush
.. license: MIT
"""
from .app import Application, main
from .app import Application
from .config import Config
from .context import get_backend, get_bus, get_plugin
from .message.event import Event
from .message.request import Request
from .message.response import Response
from .runner import main
__author__ = 'Fabio Manganiello <fabio@manganiello.tech>'

View file

@ -1,10 +1,5 @@
import sys
from platypush.app import main
from platypush.runner import main
if __name__ == '__main__':
main(*sys.argv[1:])
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,4 @@
from ._app import Application, main
__all__ = ["Application", "main"]

View file

@ -0,0 +1,7 @@
import sys
from ._app import main
if __name__ == '__main__':
sys.exit(main(*sys.argv[1:]))

View file

@ -1,23 +1,26 @@
import argparse
from contextlib import contextmanager
import logging
import os
import signal
import subprocess
import sys
from typing import Optional
from typing import Optional, Sequence
from .bus import Bus
from .bus.redis import RedisBus
from .config import Config
from .context import register_backends, register_plugins
from .cron.scheduler import CronScheduler
from .entities import init_entities_engine, EntitiesEngine
from .event.processor import EventProcessor
from .logger import Logger
from .message.event import Event
from .message.event.application import ApplicationStartedEvent
from .message.request import Request
from .message.response import Response
from .utils import get_enabled_plugins, get_redis_conf
from platypush.bus import Bus
from platypush.bus.redis import RedisBus
from platypush.cli import parse_cmdline
from platypush.commands import CommandStream
from platypush.config import Config
from platypush.context import register_backends, register_plugins
from platypush.cron.scheduler import CronScheduler
from platypush.entities import init_entities_engine, EntitiesEngine
from platypush.event.processor import EventProcessor
from platypush.logger import Logger
from platypush.message.event import Event
from platypush.message.event.application import ApplicationStartedEvent
from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import get_enabled_plugins, get_redis_conf
log = logging.getLogger('platypush')
@ -25,9 +28,6 @@ log = logging.getLogger('platypush')
class Application:
"""Main class for the Platypush application."""
# Default bus queue name
_default_redis_queue = 'platypush/bus'
# Default Redis port
_default_redis_port = 6379
@ -51,6 +51,7 @@ class Application:
start_redis: bool = False,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
ctrl_sock: Optional[str] = None,
):
"""
:param config_file: Configuration file override (default: None).
@ -82,21 +83,22 @@ class Application:
the settings in the ``redis`` section of the configuration file.
:param redis_port: Port of the local Redis server. It overrides the
settings in the ``redis`` section of the configuration file.
:param ctrl_sock: If set, it identifies a path to a UNIX domain socket
that the application can use to send control messages (e.g. STOP
and RESTART) to its parent.
"""
self.pidfile = pidfile
if pidfile:
with open(pidfile, 'w') as f:
f.write(str(os.getpid()))
self.bus: Optional[Bus] = None
self.redis_queue = redis_queue or self._default_redis_queue
self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE
self.config_file = config_file
self._verbose = verbose
self._logsdir = (
os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None
)
Config.init(self.config_file)
Config.set('ctrl_sock', ctrl_sock)
if workdir:
Config.set('workdir', os.path.abspath(os.path.expanduser(workdir)))
@ -113,6 +115,7 @@ class Application:
self.redis_port = redis_port
self.redis_conf = {}
self._redis_proc: Optional[subprocess.Popen] = None
self.cmd_stream = CommandStream(ctrl_sock)
self._init_bus()
self._init_logging()
@ -187,133 +190,11 @@ class Application:
self._redis_proc = None
@classmethod
def build(cls, *args: str):
def from_cmdline(cls, args: Sequence[str]) -> "Application":
"""
Build the app from command line arguments.
"""
from . import __version__
parser = argparse.ArgumentParser()
parser.add_argument(
'--config',
'-c',
dest='config',
required=False,
default=None,
help='Custom location for the configuration file',
)
parser.add_argument(
'--workdir',
'-w',
dest='workdir',
required=False,
default=None,
help='Custom working directory to be used for the application',
)
parser.add_argument(
'--logsdir',
'-l',
dest='logsdir',
required=False,
default=None,
help='Store logs in the specified directory. By default, the '
'`[logging.]filename` configuration option will be used. If not '
'set, logging will be sent to stdout and stderr.',
)
parser.add_argument(
'--version',
dest='version',
required=False,
action='store_true',
help="Print the current version and exit",
)
parser.add_argument(
'--verbose',
'-v',
dest='verbose',
required=False,
action='store_true',
help="Enable verbose/debug logging",
)
parser.add_argument(
'--pidfile',
'-P',
dest='pidfile',
required=False,
default=None,
help="File where platypush will "
+ "store its PID, useful if you're planning to "
+ "integrate it in a service",
)
parser.add_argument(
'--no-capture-stdout',
dest='no_capture_stdout',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stdout won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--no-capture-stderr',
dest='no_capture_stderr',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stderr won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--redis-queue',
dest='redis_queue',
required=False,
default=cls._default_redis_queue,
help="Name of the Redis queue to be used to internally deliver messages "
"(default: platypush/bus)",
)
parser.add_argument(
'--start-redis',
dest='start_redis',
required=False,
action='store_true',
help="Set this flag if you want to run and manage Redis internally "
"from the app rather than using an external server. It requires the "
"redis-server executable to be present in the path",
)
parser.add_argument(
'--redis-host',
dest='redis_host',
required=False,
default=None,
help="Overrides the host specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--redis-port',
dest='redis_port',
required=False,
default=None,
help="Overrides the port specified in the redis section of the "
"configuration file",
)
opts, _ = parser.parse_known_args(args)
if opts.version:
print(__version__)
sys.exit(0)
opts = parse_cmdline(args)
return cls(
config_file=opts.config,
workdir=opts.workdir,
@ -326,6 +207,7 @@ class Application:
start_redis=opts.start_redis,
redis_host=opts.redis_host,
redis_port=opts.redis_port,
ctrl_sock=opts.ctrl_sock,
)
def on_message(self):
@ -351,7 +233,7 @@ class Application:
self.requests_to_process
and self.processed_requests >= self.requests_to_process
):
self.stop_app()
self.stop()
elif isinstance(msg, Response):
msg.log()
elif isinstance(msg, Event):
@ -360,36 +242,68 @@ class Application:
return _f
def stop_app(self):
def stop(self):
"""Stops the backends and the bus."""
from .plugins import RunnablePlugin
from platypush.plugins import RunnablePlugin
if self.backends:
for backend in self.backends.values():
log.info('Stopping the application')
backends = (self.backends or {}).copy().values()
runnable_plugins = [
plugin
for plugin in get_enabled_plugins().values()
if isinstance(plugin, RunnablePlugin)
]
for backend in backends:
backend.stop()
for plugin in get_enabled_plugins().values():
if isinstance(plugin, RunnablePlugin):
for plugin in runnable_plugins:
plugin.stop()
for backend in backends:
backend.wait_stop()
for plugin in runnable_plugins:
plugin.wait_stop()
if self.entities_engine:
self.entities_engine.stop()
self.entities_engine.wait_stop()
self.entities_engine = None
if self.cron_scheduler:
self.cron_scheduler.stop()
self.cron_scheduler.wait_stop()
self.cron_scheduler = None
if self.bus:
self.bus.stop()
self.bus = None
if self.cron_scheduler:
self.cron_scheduler.stop()
self.cron_scheduler = None
if self.entities_engine:
self.entities_engine.stop()
self.entities_engine = None
if self.start_redis:
self._stop_redis()
def run(self):
"""Start the daemon."""
from . import __version__
log.info('Exiting application')
@contextmanager
def _open_pidfile(self):
if self.pidfile:
try:
with open(self.pidfile, 'w') as f:
f.write(str(os.getpid()))
except OSError as e:
log.warning('Failed to create PID file %s: %s', self.pidfile, e)
yield
if self.pidfile:
try:
os.remove(self.pidfile)
except OSError as e:
log.warning('Failed to remove PID file %s: %s', self.pidfile, e)
def _run(self):
from platypush import __version__
if not self.no_capture_stdout:
sys.stdout = Logger(log.info)
@ -428,16 +342,30 @@ class Application:
self.bus.poll()
except KeyboardInterrupt:
log.info('SIGINT received, terminating application')
# Ignore other SIGINT signals
signal.signal(signal.SIGINT, signal.SIG_IGN)
finally:
self.stop_app()
self.stop()
def run(self):
"""Run the application."""
with self._open_pidfile():
self._run()
def main(*args: str):
"""
Application entry point.
"""
app = Application.build(*args)
app = Application.from_cmdline(args)
try:
app.run()
except KeyboardInterrupt:
pass
return 0
# vim:sw=4:ts=4:et:

View file

@ -6,27 +6,28 @@ import time
from threading import Thread, Event as ThreadEvent, get_ident
from typing import Optional, Dict
from platypush import __version__
from platypush.bus import Bus
from platypush.common import ExtensionWithManifest
from platypush.config import Config
from platypush.context import get_backend
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.event.zeroconf import (
ZeroconfServiceAddedEvent,
ZeroconfServiceRemovedEvent,
)
from platypush.utils import (
set_timeout,
clear_timeout,
get_redis_queue_name_by_message,
get_backend_name_by_class,
)
from platypush import __version__
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import (
clear_timeout,
get_backend_name_by_class,
get_redis,
get_redis_queue_name_by_message,
get_remaining_timeout,
set_timeout,
)
class Backend(Thread, EventGenerator, ExtensionWithManifest):
@ -68,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self.device_id = Config.get('device_id')
self.thread_id = None
self._stop_event = ThreadEvent()
self._stop_thread: Optional[Thread] = None
self._kwargs = kwargs
self.logger = logging.getLogger(
'platypush:backend:' + get_backend_name_by_class(self.__class__)
@ -299,30 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self._stop_event.set()
self.unregister_service()
self.on_stop()
self._stop_thread = None
Thread(target=_async_stop).start()
if not (self._stop_thread and self._stop_thread.is_alive()):
self._stop_thread = Thread(target=_async_stop)
self._stop_thread.start()
def should_stop(self):
"""
:return: True if the backend thread should be stopped, False otherwise.
"""
return self._stop_event.is_set()
def wait_stop(self, timeout=None) -> bool:
return self._stop_event.wait(timeout)
"""
Waits for the backend thread to stop.
def _get_redis(self):
import redis
:param timeout: The maximum time to wait for the backend thread to stop (default: None)
:return: True if the backend thread has stopped, False otherwise.
"""
start = time.time()
redis_backend = get_backend('redis')
if not redis_backend:
self.logger.warning(
'Redis backend not configured - some '
'web server features may not be working properly'
if self._stop_thread:
try:
self._stop_thread.join(
get_remaining_timeout(timeout=timeout, start=start)
)
redis_args = {}
else:
redis_args = redis_backend.redis_args
except AttributeError:
pass
redis = redis.Redis(**redis_args)
return redis
return self._stop_event.wait(
get_remaining_timeout(timeout=timeout, start=start)
)
def get_message_response(self, msg):
queue = get_redis_queue_name_by_message(msg)
@ -331,7 +341,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
return None
try:
redis = self._get_redis()
redis = get_redis()
response = redis.blpop(queue, timeout=60)
if response and len(response) > 1:
response = Message.build(response[1])

View file

@ -2,13 +2,17 @@ import asyncio
import os
import pathlib
import secrets
import signal
import threading
from functools import partial
from multiprocessing import Process
from time import time
from typing import List, Mapping, Optional
from tornado.httpserver import HTTPServer
from typing import Mapping, Optional
import psutil
from tornado.httpserver import HTTPServer
from tornado.netutil import bind_sockets
from tornado.process import cpu_count, fork_processes
from tornado.wsgi import WSGIContainer
@ -18,9 +22,9 @@ from platypush.backend import Backend
from platypush.backend.http.app import application
from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes
from platypush.backend.http.app.ws.events import WSEventProxy
from platypush.bus.redis import RedisBus
from platypush.config import Config
from platypush.utils import get_remaining_timeout
class HttpBackend(Backend):
@ -191,6 +195,9 @@ class HttpBackend(Backend):
_DEFAULT_HTTP_PORT = 8008
"""The default listen port for the webserver."""
_STOP_TIMEOUT = 5
"""How long we should wait (in seconds) before killing the worker processes."""
def __init__(
self,
port: int = _DEFAULT_HTTP_PORT,
@ -227,7 +234,6 @@ class HttpBackend(Backend):
self.port = port
self._server_proc: Optional[Process] = None
self._workers: List[Process] = []
self._service_registry_thread = None
self.bind_address = bind_address
@ -254,35 +260,37 @@ class HttpBackend(Backend):
"""On backend stop"""
super().on_stop()
self.logger.info('Received STOP event on HttpBackend')
start_time = time()
timeout = 5
workers = self._workers.copy()
for i, worker in enumerate(workers[::-1]):
if worker and worker.is_alive():
worker.terminate()
worker.join(timeout=max(0, start_time + timeout - time()))
if worker and worker.is_alive():
worker.kill()
self._workers.pop(i)
start = time()
remaining_time: partial[float] = partial( # type: ignore
get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start
)
if self._server_proc:
if self._server_proc.pid:
try:
os.kill(self._server_proc.pid, signal.SIGINT)
except OSError:
pass
if self._server_proc and self._server_proc.is_alive():
self._server_proc.join(timeout=remaining_time() / 2)
try:
self._server_proc.terminate()
self._server_proc.join(timeout=5)
self._server_proc = None
self._server_proc.join(timeout=remaining_time() / 2)
except AttributeError:
pass
if self._server_proc and self._server_proc.is_alive():
self._server_proc.kill()
self._server_proc = None
self.logger.info('HTTP server terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5)
self._service_registry_thread.join(timeout=remaining_time())
self._service_registry_thread = None
self.logger.info('HTTP server terminated')
def notify_web_clients(self, event):
"""Notify all the connected web clients (over websocket) of a new event"""
WSEventProxy.publish(event) # noqa: E1120
@ -344,7 +352,10 @@ class HttpBackend(Backend):
try:
await asyncio.Event().wait()
except (asyncio.CancelledError, KeyboardInterrupt):
return
pass
finally:
server.stop()
await server.close_all_connections()
def _web_server_proc(self):
self.logger.info(
@ -371,8 +382,66 @@ class HttpBackend(Backend):
future = self._post_fork_main(sockets)
asyncio.run(future)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
finally:
self._stop_workers()
def _stop_workers(self):
"""
Stop all the worker processes.
We have to run this manually on server termination because of a
long-standing issue with Tornado not being able to wind down the forked
workers when the server terminates:
https://github.com/tornadoweb/tornado/issues/1912.
"""
parent_pid = (
self._server_proc.pid
if self._server_proc and self._server_proc.pid
else None
)
if not parent_pid:
return
try:
cur_proc = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
# Send a SIGTERM to all the children
children = cur_proc.children()
for child in children:
if child.pid != parent_pid and child.is_running():
try:
os.kill(child.pid, signal.SIGTERM)
except OSError as e:
self.logger.warning(
'Could not send SIGTERM to PID %d: %s', child.pid, e
)
# Initialize the timeout
start = time()
remaining_time: partial[int] = partial( # type: ignore
get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start, cls=int
)
# Wait for all children to terminate (with timeout)
for child in children:
if child.pid != parent_pid and child.is_running():
try:
child.wait(timeout=remaining_time())
except TimeoutError:
pass
# Send a SIGKILL to any child process that is still running
for child in children:
if child.pid != parent_pid and child.is_running():
try:
child.kill()
except OSError:
pass
def _start_web_server(self):
self._server_proc = Process(target=self._web_server_proc)
self._server_proc.start()

View file

@ -1,6 +1,6 @@
import logging
import threading
from typing import Optional
from typing import Final, Optional
from platypush.bus import Bus
from platypush.message import Message
@ -13,7 +13,7 @@ class RedisBus(Bus):
Overrides the in-process in-memory local bus with a Redis bus
"""
_DEFAULT_REDIS_QUEUE = 'platypush/bus'
DEFAULT_REDIS_QUEUE: Final[str] = 'platypush/bus'
def __init__(self, *args, on_message=None, redis_queue=None, **kwargs):
from platypush.utils import get_redis
@ -21,7 +21,7 @@ class RedisBus(Bus):
super().__init__(on_message=on_message)
self.redis = get_redis(*args, **kwargs)
self.redis_args = kwargs
self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE
self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
self.on_message = on_message
self.thread_id = threading.get_ident()

139
platypush/cli.py Normal file
View file

@ -0,0 +1,139 @@
import argparse
from typing import Sequence
from platypush.bus.redis import RedisBus
from platypush.utils import get_default_pid_file
def parse_cmdline(args: Sequence[str]) -> argparse.Namespace:
"""
Parse command-line arguments from a list of strings.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--config',
'-c',
dest='config',
required=False,
default=None,
help='Custom location for the configuration file',
)
parser.add_argument(
'--workdir',
'-w',
dest='workdir',
required=False,
default=None,
help='Custom working directory to be used for the application',
)
parser.add_argument(
'--logsdir',
'-l',
dest='logsdir',
required=False,
default=None,
help='Store logs in the specified directory. By default, the '
'`[logging.]filename` configuration option will be used. If not '
'set, logging will be sent to stdout and stderr.',
)
parser.add_argument(
'--version',
dest='version',
required=False,
action='store_true',
help="Print the current version and exit",
)
parser.add_argument(
'--verbose',
'-v',
dest='verbose',
required=False,
action='store_true',
help="Enable verbose/debug logging",
)
parser.add_argument(
'--pidfile',
'-P',
dest='pidfile',
required=False,
default=get_default_pid_file(),
help="File where platypush will "
+ "store its PID, useful if you're planning to "
+ f"integrate it in a service (default: {get_default_pid_file()})",
)
parser.add_argument(
'--no-capture-stdout',
dest='no_capture_stdout',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stdout won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--no-capture-stderr',
dest='no_capture_stderr',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stderr won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--redis-queue',
dest='redis_queue',
required=False,
default=RedisBus.DEFAULT_REDIS_QUEUE,
help="Name of the Redis queue to be used to internally deliver messages "
f"(default: {RedisBus.DEFAULT_REDIS_QUEUE})",
)
parser.add_argument(
'--start-redis',
dest='start_redis',
required=False,
action='store_true',
help="Set this flag if you want to run and manage Redis internally "
"from the app rather than using an external server. It requires the "
"redis-server executable to be present in the path",
)
parser.add_argument(
'--redis-host',
dest='redis_host',
required=False,
default=None,
help="Overrides the host specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--redis-port',
dest='redis_port',
required=False,
default=None,
help="Overrides the port specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--ctrl-sock',
dest='ctrl_sock',
required=False,
default=None,
help="If set, it identifies a path to a UNIX domain socket that "
"the application can use to send control messages (e.g. STOP and "
"RESTART) to its parent.",
)
opts, _ = parser.parse_known_args(args)
return opts

View file

@ -0,0 +1,5 @@
from ._base import Command
from ._commands import RestartCommand, StopCommand
from ._stream import CommandStream
__all__ = ["Command", "CommandStream", "RestartCommand", "StopCommand"]

View file

@ -0,0 +1,78 @@
from abc import ABC, abstractmethod
import json
from logging import getLogger, Logger
class Command(ABC):
"""
Base class for application commands.
"""
END_OF_COMMAND = b'\x00'
"""End-of-command marker."""
def __init__(self, **args) -> None:
self.args = args
@property
def logger(self) -> Logger:
"""
The command class logger.
"""
return getLogger(self.__class__.__name__)
@abstractmethod
def __call__(self, app, *_, **__):
"""
Execute the command.
"""
raise NotImplementedError()
def __str__(self) -> str:
"""
:return: A JSON representation of the command.
"""
return json.dumps(
{
'type': 'command',
'command': self.__class__.__name__,
'args': self.args,
}
)
def to_bytes(self):
"""
:return: A JSON representation of the command.
"""
return str(self).encode('utf-8') + self.END_OF_COMMAND
@classmethod
def parse(cls, data: bytes) -> "Command":
"""
:param data: A JSON representation of the command.
:raise ValueError: If the data is invalid.
:return: The command instance or None if the data is invalid.
"""
import platypush.commands
try:
json_data = json.loads(data.decode('utf-8'))
except json.JSONDecodeError as e:
raise ValueError from e
kind = json_data.pop('type', None)
if kind != 'command':
raise ValueError(f'Invalid command type: {kind}')
command_name = json_data.get('command')
if not command_name:
raise ValueError(f'Invalid command name: {command_name}')
cmd_class = getattr(platypush.commands, command_name, None)
if not (cmd_class and issubclass(cmd_class, Command)):
raise ValueError(f'Invalid command class: {command_name}')
try:
return cmd_class(**json_data.get('args', {}))
except Exception as e:
raise ValueError(e) from e

View file

@ -0,0 +1,2 @@
# flake8: noqa
from ._app_ctrl import *

View file

@ -0,0 +1,25 @@
from typing_extensions import override
from platypush.commands import Command
class StopCommand(Command):
"""
Stop the application.
"""
@override
def __call__(self, app, *_, **__):
self.logger.info('Received StopApplication command.')
app.stop()
class RestartCommand(Command):
"""
Restart the application.
"""
@override
def __call__(self, app, *_, **__):
self.logger.info('Received RestartApplication command.')
app.restart()

View file

@ -0,0 +1,69 @@
from logging import getLogger
from socket import socket
from typing import Optional
from platypush.commands import Command
# pylint: disable=too-few-public-methods
class CommandReader:
"""
Reads command objects from file-like I/O objects.
"""
_max_bufsize = 8192
"""Maximum size of a command that can be queued in the stream."""
_bufsize = 1024
"""Size of the buffer used to read commands from the socket."""
def __init__(self):
self.logger = getLogger(__name__)
self._buf = bytes()
def _parse_command(self, data: bytes) -> Optional[Command]:
"""
Parses a command from the received data.
:param data: Data received from the socket
:return: The parsed command
"""
try:
return Command.parse(data)
except ValueError as e:
self.logger.warning('Error while parsing command: %s', e)
return None
def read(self, sock: socket) -> Optional[Command]:
"""
Parses the next command from the file-like I/O object.
:param fp: The file-like I/O object to read from.
:return: The parsed command.
"""
try:
data = sock.recv(self._bufsize)
except OSError as e:
self.logger.warning(
'Error while reading from socket %s: %s', sock.getsockname(), e
)
return None
for ch in data:
if bytes([ch]) == Command.END_OF_COMMAND:
cmd = self._parse_command(self._buf)
self._buf = bytes()
if cmd:
return cmd
elif len(self._buf) >= self._max_bufsize:
self.logger.warning(
'The received command is too long: length=%d', len(self._buf)
)
self._buf = bytes()
break
else:
self._buf += bytes([ch])
return None

View file

@ -0,0 +1,139 @@
from multiprocessing import Queue
import os
from queue import Empty
import socket
import tempfile
from typing import Optional
from typing_extensions import override
from platypush.process import ControllableProcess
from ._base import Command
from ._reader import CommandReader
from ._writer import CommandWriter
class CommandStream(ControllableProcess):
"""
The command stream is an abstraction built around a UNIX socket that allows
the application to communicate commands to its controller.
:param path: Path to the UNIX socket
"""
_max_queue_size = 100
"""Maximum number of commands that can be queued in the stream."""
_default_sock_path = tempfile.gettempdir() + '/platypush-cmd-stream.sock'
"""Default path to the UNIX socket."""
_close_timeout = 5.0
"""Close the client socket after this amount of seconds."""
def __init__(self, path: Optional[str] = None):
super().__init__(name='platypush:cmd:stream')
self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path))
self._sock: Optional[socket.socket] = None
self._cmd_queue: Queue["Command"] = Queue()
def reset(self):
if self._sock is not None:
try:
self._sock.close()
except socket.error:
pass
self._sock = None
try:
os.unlink(self.path)
except FileNotFoundError:
pass
self._cmd_queue.close()
self._cmd_queue = Queue()
@override
def close(self) -> None:
self.reset()
return super().close()
def __enter__(self) -> "CommandStream":
self.reset()
sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.path)
os.chmod(self.path, 0o600)
sock.listen(1)
self.start()
return self
def __exit__(self, *_, **__):
self.terminate()
self.join()
self.close()
def _serve(self, sock: socket.socket):
"""
Serves the command stream.
:param sock: Client socket to serve
"""
reader = CommandReader()
cmd = reader.read(sock)
if cmd:
self._cmd_queue.put_nowait(cmd)
def read(self, timeout: Optional[float] = None) -> Optional[Command]:
"""
Reads commands from the command stream.
:param timeout: Maximum time to wait for a command.
:return: The command that was received, if any.
"""
try:
return self._cmd_queue.get(timeout=timeout)
except Empty:
return None
def write(self, cmd: Command) -> None:
"""
Writes a command to the command stream.
:param cmd: Command to write
:raise AssertionError: If the command cannot be written
"""
sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(self.path)
self.logger.debug('Sending command: %s', cmd)
CommandWriter().write(cmd, sock)
except OSError as e:
raise AssertionError(f'Unable to connect to socket {self.path}: {e}') from e
finally:
sock.close()
@staticmethod
def _close_client(sock: socket.socket):
try:
sock.close()
except OSError:
pass
@override
def main(self):
while self._sock and not self.should_stop:
sock = self._sock
try:
conn, _ = sock.accept()
except ConnectionResetError:
continue
except KeyboardInterrupt:
break
try:
self._serve(conn)
except Exception as e:
self.logger.warning('Unexpected socket error: %s', e, exc_info=True)
finally:
self._close_client(conn)

View file

@ -0,0 +1,25 @@
from logging import getLogger
from socket import socket
from platypush.commands import Command
# pylint: disable=too-few-public-methods
class CommandWriter:
"""
Writes command objects to file-like I/O objects.
"""
def __init__(self):
self.logger = getLogger(__name__)
def write(self, cmd: Command, sock: socket):
"""
Writes a command to a file-like I/O object.
:param cmd: The command to write.
:param fp: The file-like I/O object to write to.
"""
buf = cmd.to_bytes()
sock.sendall(buf)

View file

@ -2,13 +2,14 @@ import datetime
import enum
import logging
import threading
from typing import Dict
import time
from typing import Dict, Optional
import croniter
from dateutil.tz import gettz
from platypush.procedure import Procedure
from platypush.utils import is_functional_cron
from platypush.utils import get_remaining_timeout, is_functional_cron
logger = logging.getLogger('platypush:cron')
@ -198,6 +199,20 @@ class CronScheduler(threading.Thread):
def should_stop(self):
return self._should_stop.is_set()
def wait_stop(self, timeout: Optional[float] = None):
start = time.time()
stopped = self._should_stop.wait(
timeout=get_remaining_timeout(timeout=timeout, start=start)
)
if not stopped:
raise TimeoutError(
f'Timeout waiting for {self.__class__.__name__} to stop.'
)
if threading.get_ident() != self.ident:
self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
def run(self):
logger.info('Running cron scheduler')

View file

@ -1,5 +1,6 @@
from logging import getLogger
from threading import Thread, Event
from threading import Thread, Event, get_ident
from time import time
from typing import Dict, Optional
from platypush.context import get_bus
@ -9,6 +10,7 @@ from platypush.message.event.entities import EntityUpdateEvent
from platypush.entities._base import EntityKey, EntitySavedCallback
from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository
from platypush.utils import get_remaining_timeout
class EntitiesEngine(Thread):
@ -69,6 +71,20 @@ class EntitiesEngine(Thread):
def stop(self):
self._should_stop.set()
def wait_stop(self, timeout: Optional[float] = None):
start = time()
stopped = self._should_stop.wait(
timeout=get_remaining_timeout(timeout=timeout, start=start)
)
if not stopped:
raise TimeoutError(
f'Timeout waiting for {self.__class__.__name__} to stop.'
)
if get_ident() != self.ident:
self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
def notify(self, *entities: Entity):
"""
Trigger an EntityUpdateEvent if the entity has been persisted, or queue

View file

@ -0,0 +1,31 @@
from typing import Optional
from platypush.commands import CommandStream, RestartCommand, StopCommand
from platypush.config import Config
from platypush.plugins import Plugin, action
class ApplicationPlugin(Plugin):
"""
This plugin is used to control and inspect the application state.
"""
@property
def _ctrl_sock(self) -> Optional[str]:
"""
:return: The path to the UNIX socket to control the application.
"""
return Config.get('ctrl_sock') # type: ignore
@action
def stop(self):
"""
Stop the application.
"""
CommandStream(self._ctrl_sock).write(StopCommand())
@action
def restart(self):
"""
Restart the application.
"""
CommandStream(self._ctrl_sock).write(RestartCommand())

View file

@ -0,0 +1,6 @@
manifest:
events: {}
install:
pip: []
package: platypush.plugins.application
type: plugin

View file

@ -3,6 +3,5 @@ manifest:
install:
pip:
- py-cpuinfo
- psutil
package: platypush.plugins.system
type: plugin

View file

@ -0,0 +1,135 @@
from abc import ABC, abstractmethod
import logging
from multiprocessing import Event, Process, RLock
from os import getpid
from typing import Optional
from typing_extensions import override
class ControllableProcess(Process, ABC):
"""
Extends the ``Process`` class to allow for external extended control of the
underlying process.
"""
_kill_timeout: float = 10.0
def __init__(self, *args, timeout: Optional[float] = None, **kwargs):
kwargs['name'] = kwargs.get('name', self.__class__.__name__)
super().__init__(*args, **kwargs)
self.timeout = timeout
self.logger = logging.getLogger(self.name)
self._should_stop = Event()
self._stop_lock = RLock()
self._should_restart = False
@property
def _timeout_err(self) -> TimeoutError:
return TimeoutError(f'Process {self.name} timed out')
@property
def should_stop(self) -> bool:
"""
:return: ``True`` if the process is scheduled for stop, ``False``
otherwise.
"""
return self._should_stop.is_set()
def wait_stop(self, timeout: Optional[float] = None) -> None:
"""
Waits for the process to stop.
:param timeout: The maximum time to wait for the process to stop.
"""
timeout = timeout if timeout is not None else self.timeout
stopped = self._should_stop.wait(timeout=timeout)
if not stopped:
raise self._timeout_err
if self.pid == getpid():
return # Prevent termination deadlock
self.join(timeout=timeout)
if self.is_alive():
raise self._timeout_err
def stop(self, timeout: Optional[float] = None) -> None:
"""
Stops the process.
:param timeout: The maximum time to wait for the process to stop.
"""
timeout = timeout if timeout is not None else self._kill_timeout
with self._stop_lock:
self._should_stop.set()
self.on_stop()
try:
if self.pid != getpid():
self.wait_stop(timeout=timeout)
except TimeoutError:
pass
finally:
self.terminate()
try:
if self.pid != getpid():
self.wait_stop(timeout=self._kill_timeout)
except TimeoutError:
self.logger.warning(
'The process %s is still alive after %f seconds, killing it',
self.name,
self._kill_timeout,
)
self.kill()
def on_stop(self) -> None:
"""
Handler called when the process is stopped.
It can be implemented by subclasses.
"""
@property
def should_restart(self) -> bool:
"""
:return: ``True`` if the process is marked for restart after stop,
``False`` otherwise.
"""
return self._should_restart
def mark_for_restart(self):
"""
Marks the process for restart after stop.
"""
self._should_restart = True
@abstractmethod
def main(self):
"""
The main function of the process.
It must be implemented by subclasses.
"""
def _main(self):
"""
Wrapper for the main function of the process.
"""
self._should_restart = False
return self.main()
@override
def run(self) -> None:
"""
Executes the process.
"""
super().run()
try:
self._main()
finally:
self._should_stop.set()
self.logger.info('Process terminated')

View file

@ -0,0 +1,17 @@
from ._runner import ApplicationRunner
def main(*args: str):
"""
Main application entry point.
This is usually the entry point that you want to use to start your
application, rather than :meth:`platypush.app.main`, as this entry point
wraps the main application in a controllable process.
"""
app_runner = ApplicationRunner()
app_runner.run(*args)
__all__ = ["ApplicationRunner", "main"]

View file

@ -0,0 +1,5 @@
import sys
from . import main
main(sys.argv[1:])

59
platypush/runner/_app.py Normal file
View file

@ -0,0 +1,59 @@
import logging
import os
import signal
import subprocess
import sys
from typing_extensions import override
from platypush.process import ControllableProcess
class ApplicationProcess(ControllableProcess):
"""
Controllable process wrapper interface for the main application.
"""
def __init__(self, *args: str, pidfile: str, **kwargs):
super().__init__(name='platypush', **kwargs)
self.logger = logging.getLogger('platypush')
self.args = args
self.pidfile = pidfile
def __enter__(self) -> "ApplicationProcess":
self.start()
return self
def __exit__(self, *_, **__):
self.stop()
@override
def main(self):
self.logger.info('Starting application...')
with subprocess.Popen(
['python', '-m', 'platypush.app', *self.args],
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
) as app:
try:
app.wait()
except KeyboardInterrupt:
pass
@override
def on_stop(self):
try:
with open(self.pidfile, 'r') as f:
pid = int(f.read().strip())
except (OSError, ValueError):
pid = None
if not pid:
return
try:
os.kill(pid, signal.SIGINT)
except OSError:
pass

View file

@ -0,0 +1,91 @@
import logging
import sys
from threading import Thread
from typing import Optional
from platypush.cli import parse_cmdline
from platypush.commands import CommandStream
from ._app import ApplicationProcess
class ApplicationRunner:
"""
Runner for the main application.
It wraps the main application and provides an interface to control it
externally.
"""
_default_timeout = 0.5
def __init__(self):
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
self.logger = logging.getLogger('platypush:runner')
self._proc: Optional[ApplicationProcess] = None
def _listen(self, stream: CommandStream):
"""
Listens for external application commands and executes them.
"""
while self.is_running:
cmd = stream.read(timeout=self._default_timeout)
if not cmd:
continue
self.logger.info(cmd)
Thread(target=cmd, args=(self,)).start()
def _print_version(self):
from platypush import __version__
print(__version__)
sys.exit(0)
def _run(self, *args: str) -> None:
parsed_args = parse_cmdline(args)
if parsed_args.version:
self._print_version()
while True:
with (
CommandStream(parsed_args.ctrl_sock) as stream,
ApplicationProcess(
*args, pidfile=parsed_args.pidfile, timeout=self._default_timeout
) as self._proc,
):
try:
self._listen(stream)
except KeyboardInterrupt:
pass
if self.should_restart:
self.logger.info('Restarting application...')
continue
break
def run(self, *args: str) -> None:
try:
self._run(*args)
finally:
self._proc = None
def stop(self):
if self._proc is not None:
self._proc.stop()
def restart(self):
if self._proc is not None:
self._proc.mark_for_restart()
self.stop()
@property
def is_running(self) -> bool:
return bool(self._proc and self._proc.is_alive())
@property
def should_restart(self) -> bool:
return self._proc.should_restart if self._proc is not None else False

View file

@ -14,7 +14,9 @@ import socket
import ssl
import urllib.request
from threading import Lock as TLock
from typing import Generator, Optional, Tuple, Union
from tempfile import gettempdir
import time
from typing import Generator, Optional, Tuple, Type, Union
from dateutil import parser, tz
from redis import Redis
@ -625,4 +627,23 @@ def get_lock(
lock.release()
def get_default_pid_file() -> str:
"""
Get the default PID file path.
"""
return os.path.join(gettempdir(), 'platypush.pid')
def get_remaining_timeout(
timeout: Optional[float], start: float, cls: Union[Type[int], Type[float]] = float
) -> Optional[Union[int, float]]:
"""
Get the remaining timeout, given a start time.
"""
if timeout is None:
return None
return cls(max(0, timeout - (time.time() - start)))
# vim:sw=4:ts=4:et:

View file

@ -11,6 +11,7 @@ frozendict
marshmallow
marshmallow_dataclass
paho-mqtt
psutil
python-dateutil
python-magic
pyyaml

View file

@ -68,6 +68,7 @@ setup(
'frozendict',
'marshmallow',
'marshmallow_dataclass',
'psutil',
'python-dateutil',
'python-magic',
'pyyaml',
@ -224,7 +225,7 @@ setup(
# Support for Graphite integration
'graphite': ['graphyte'],
# Support for CPU and memory monitoring and info
'sys': ['py-cpuinfo', 'psutil'],
'sys': ['py-cpuinfo'],
# Support for nmap integration
'nmap': ['python-nmap'],
# Support for zigbee2mqtt

View file

@ -45,7 +45,7 @@ def app():
yield _app
logging.info('Stopping Platypush test service')
_app.stop_app()
_app.stop()
clear_loggers()
db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :]