[#272] Support for external stop/restart control on the application #273

Merged
blacklight merged 18 commits from 272/external-proc-controller into master 2023-08-15 21:34:44 +02:00
34 changed files with 1142 additions and 243 deletions

View File

@ -66,11 +66,11 @@ steps:
image: python:3.11-alpine image: python:3.11-alpine
commands: commands:
- apk add --update --no-cache redis - apk add --update --no-cache redis
- apk add --update --no-cache --virtual build-base g++ rust - apk add --update --no-cache --virtual build-base g++ rust linux-headers
- pip install -U pip - pip install -U pip
- pip install . - pip install .
- pip install -r requirements-tests.txt - pip install -r requirements-tests.txt
- apk del build-base g++ rust - apk del build-base g++ rust linux-headers
- pytest tests - pytest tests
- name: build-ui - name: build-ui

View File

@ -0,0 +1,5 @@
``application``
===============
.. automodule:: platypush.plugins.application
:members:

View File

@ -8,6 +8,7 @@ Plugins
platypush/plugins/adafruit.io.rst platypush/plugins/adafruit.io.rst
platypush/plugins/alarm.rst platypush/plugins/alarm.rst
platypush/plugins/application.rst
platypush/plugins/arduino.rst platypush/plugins/arduino.rst
platypush/plugins/assistant.echo.rst platypush/plugins/assistant.echo.rst
platypush/plugins/assistant.google.rst platypush/plugins/assistant.google.rst

View File

@ -7,10 +7,10 @@ FROM python:3.11-alpine
RUN mkdir -p /install /app RUN mkdir -p /install /app
COPY . /install COPY . /install
RUN apk add --update --no-cache redis RUN apk add --update --no-cache redis
RUN apk add --update --no-cache --virtual build-base g++ rust RUN apk add --update --no-cache --virtual build-base g++ rust linux-headers
RUN pip install -U pip RUN pip install -U pip
RUN cd /install && pip install . RUN cd /install && pip install .
RUN apk del build-base g++ rust RUN apk del build-base g++ rust linux-headers
EXPOSE 8008 EXPOSE 8008
VOLUME /app/config VOLUME /app/config

View File

@ -5,12 +5,13 @@ Platypush
.. license: MIT .. license: MIT
""" """
from .app import Application, main from .app import Application
from .config import Config from .config import Config
from .context import get_backend, get_bus, get_plugin from .context import get_backend, get_bus, get_plugin
from .message.event import Event from .message.event import Event
from .message.request import Request from .message.request import Request
from .message.response import Response from .message.response import Response
from .runner import main
__author__ = 'Fabio Manganiello <fabio@manganiello.tech>' __author__ = 'Fabio Manganiello <fabio@manganiello.tech>'

View File

@ -1,10 +1,5 @@
import sys import sys
from platypush.app import main from platypush.runner import main
main(*sys.argv[1:])
if __name__ == '__main__':
main(*sys.argv[1:])
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,4 @@
from ._app import Application, main
__all__ = ["Application", "main"]

View File

@ -0,0 +1,7 @@
import sys
from ._app import main
if __name__ == '__main__':
sys.exit(main(*sys.argv[1:]))

View File

@ -1,23 +1,26 @@
import argparse from contextlib import contextmanager
import logging import logging
import os import os
import signal
import subprocess import subprocess
import sys import sys
from typing import Optional from typing import Optional, Sequence
from .bus import Bus from platypush.bus import Bus
from .bus.redis import RedisBus from platypush.bus.redis import RedisBus
from .config import Config from platypush.cli import parse_cmdline
from .context import register_backends, register_plugins from platypush.commands import CommandStream
from .cron.scheduler import CronScheduler from platypush.config import Config
from .entities import init_entities_engine, EntitiesEngine from platypush.context import register_backends, register_plugins
from .event.processor import EventProcessor from platypush.cron.scheduler import CronScheduler
from .logger import Logger from platypush.entities import init_entities_engine, EntitiesEngine
from .message.event import Event from platypush.event.processor import EventProcessor
from .message.event.application import ApplicationStartedEvent from platypush.logger import Logger
from .message.request import Request from platypush.message.event import Event
from .message.response import Response from platypush.message.event.application import ApplicationStartedEvent
from .utils import get_enabled_plugins, get_redis_conf from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import get_enabled_plugins, get_redis_conf
log = logging.getLogger('platypush') log = logging.getLogger('platypush')
@ -25,9 +28,6 @@ log = logging.getLogger('platypush')
class Application: class Application:
"""Main class for the Platypush application.""" """Main class for the Platypush application."""
# Default bus queue name
_default_redis_queue = 'platypush/bus'
# Default Redis port # Default Redis port
_default_redis_port = 6379 _default_redis_port = 6379
@ -51,6 +51,7 @@ class Application:
start_redis: bool = False, start_redis: bool = False,
redis_host: Optional[str] = None, redis_host: Optional[str] = None,
redis_port: Optional[int] = None, redis_port: Optional[int] = None,
ctrl_sock: Optional[str] = None,
): ):
""" """
:param config_file: Configuration file override (default: None). :param config_file: Configuration file override (default: None).
@ -82,21 +83,22 @@ class Application:
the settings in the ``redis`` section of the configuration file. the settings in the ``redis`` section of the configuration file.
:param redis_port: Port of the local Redis server. It overrides the :param redis_port: Port of the local Redis server. It overrides the
settings in the ``redis`` section of the configuration file. settings in the ``redis`` section of the configuration file.
:param ctrl_sock: If set, it identifies a path to a UNIX domain socket
that the application can use to send control messages (e.g. STOP
and RESTART) to its parent.
""" """
self.pidfile = pidfile self.pidfile = pidfile
if pidfile:
with open(pidfile, 'w') as f:
f.write(str(os.getpid()))
self.bus: Optional[Bus] = None self.bus: Optional[Bus] = None
self.redis_queue = redis_queue or self._default_redis_queue self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE
self.config_file = config_file self.config_file = config_file
self._verbose = verbose self._verbose = verbose
self._logsdir = ( self._logsdir = (
os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None
) )
Config.init(self.config_file) Config.init(self.config_file)
Config.set('ctrl_sock', ctrl_sock)
if workdir: if workdir:
Config.set('workdir', os.path.abspath(os.path.expanduser(workdir))) Config.set('workdir', os.path.abspath(os.path.expanduser(workdir)))
@ -113,6 +115,7 @@ class Application:
self.redis_port = redis_port self.redis_port = redis_port
self.redis_conf = {} self.redis_conf = {}
self._redis_proc: Optional[subprocess.Popen] = None self._redis_proc: Optional[subprocess.Popen] = None
self.cmd_stream = CommandStream(ctrl_sock)
self._init_bus() self._init_bus()
self._init_logging() self._init_logging()
@ -187,133 +190,11 @@ class Application:
self._redis_proc = None self._redis_proc = None
@classmethod @classmethod
def build(cls, *args: str): def from_cmdline(cls, args: Sequence[str]) -> "Application":
""" """
Build the app from command line arguments. Build the app from command line arguments.
""" """
from . import __version__ opts = parse_cmdline(args)
parser = argparse.ArgumentParser()
parser.add_argument(
'--config',
'-c',
dest='config',
required=False,
default=None,
help='Custom location for the configuration file',
)
parser.add_argument(
'--workdir',
'-w',
dest='workdir',
required=False,
default=None,
help='Custom working directory to be used for the application',
)
parser.add_argument(
'--logsdir',
'-l',
dest='logsdir',
required=False,
default=None,
help='Store logs in the specified directory. By default, the '
'`[logging.]filename` configuration option will be used. If not '
'set, logging will be sent to stdout and stderr.',
)
parser.add_argument(
'--version',
dest='version',
required=False,
action='store_true',
help="Print the current version and exit",
)
parser.add_argument(
'--verbose',
'-v',
dest='verbose',
required=False,
action='store_true',
help="Enable verbose/debug logging",
)
parser.add_argument(
'--pidfile',
'-P',
dest='pidfile',
required=False,
default=None,
help="File where platypush will "
+ "store its PID, useful if you're planning to "
+ "integrate it in a service",
)
parser.add_argument(
'--no-capture-stdout',
dest='no_capture_stdout',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stdout won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--no-capture-stderr',
dest='no_capture_stderr',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stderr won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--redis-queue',
dest='redis_queue',
required=False,
default=cls._default_redis_queue,
help="Name of the Redis queue to be used to internally deliver messages "
"(default: platypush/bus)",
)
parser.add_argument(
'--start-redis',
dest='start_redis',
required=False,
action='store_true',
help="Set this flag if you want to run and manage Redis internally "
"from the app rather than using an external server. It requires the "
"redis-server executable to be present in the path",
)
parser.add_argument(
'--redis-host',
dest='redis_host',
required=False,
default=None,
help="Overrides the host specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--redis-port',
dest='redis_port',
required=False,
default=None,
help="Overrides the port specified in the redis section of the "
"configuration file",
)
opts, _ = parser.parse_known_args(args)
if opts.version:
print(__version__)
sys.exit(0)
return cls( return cls(
config_file=opts.config, config_file=opts.config,
workdir=opts.workdir, workdir=opts.workdir,
@ -326,6 +207,7 @@ class Application:
start_redis=opts.start_redis, start_redis=opts.start_redis,
redis_host=opts.redis_host, redis_host=opts.redis_host,
redis_port=opts.redis_port, redis_port=opts.redis_port,
ctrl_sock=opts.ctrl_sock,
) )
def on_message(self): def on_message(self):
@ -351,7 +233,7 @@ class Application:
self.requests_to_process self.requests_to_process
and self.processed_requests >= self.requests_to_process and self.processed_requests >= self.requests_to_process
): ):
self.stop_app() self.stop()
elif isinstance(msg, Response): elif isinstance(msg, Response):
msg.log() msg.log()
elif isinstance(msg, Event): elif isinstance(msg, Event):
@ -360,36 +242,68 @@ class Application:
return _f return _f
def stop_app(self): def stop(self):
"""Stops the backends and the bus.""" """Stops the backends and the bus."""
from .plugins import RunnablePlugin from platypush.plugins import RunnablePlugin
if self.backends: log.info('Stopping the application')
for backend in self.backends.values(): backends = (self.backends or {}).copy().values()
backend.stop() runnable_plugins = [
plugin
for plugin in get_enabled_plugins().values()
if isinstance(plugin, RunnablePlugin)
]
for plugin in get_enabled_plugins().values(): for backend in backends:
if isinstance(plugin, RunnablePlugin): backend.stop()
plugin.stop()
for plugin in runnable_plugins:
plugin.stop()
for backend in backends:
backend.wait_stop()
for plugin in runnable_plugins:
plugin.wait_stop()
if self.entities_engine:
self.entities_engine.stop()
self.entities_engine.wait_stop()
self.entities_engine = None
if self.cron_scheduler:
self.cron_scheduler.stop()
self.cron_scheduler.wait_stop()
self.cron_scheduler = None
if self.bus: if self.bus:
self.bus.stop() self.bus.stop()
self.bus = None self.bus = None
if self.cron_scheduler:
self.cron_scheduler.stop()
self.cron_scheduler = None
if self.entities_engine:
self.entities_engine.stop()
self.entities_engine = None
if self.start_redis: if self.start_redis:
self._stop_redis() self._stop_redis()
def run(self): log.info('Exiting application')
"""Start the daemon."""
from . import __version__ @contextmanager
def _open_pidfile(self):
if self.pidfile:
try:
with open(self.pidfile, 'w') as f:
f.write(str(os.getpid()))
except OSError as e:
log.warning('Failed to create PID file %s: %s', self.pidfile, e)
yield
if self.pidfile:
try:
os.remove(self.pidfile)
except OSError as e:
log.warning('Failed to remove PID file %s: %s', self.pidfile, e)
def _run(self):
from platypush import __version__
if not self.no_capture_stdout: if not self.no_capture_stdout:
sys.stdout = Logger(log.info) sys.stdout = Logger(log.info)
@ -428,16 +342,30 @@ class Application:
self.bus.poll() self.bus.poll()
except KeyboardInterrupt: except KeyboardInterrupt:
log.info('SIGINT received, terminating application') log.info('SIGINT received, terminating application')
# Ignore other SIGINT signals
signal.signal(signal.SIGINT, signal.SIG_IGN)
finally: finally:
self.stop_app() self.stop()
def run(self):
"""Run the application."""
with self._open_pidfile():
self._run()
def main(*args: str): def main(*args: str):
""" """
Application entry point. Application entry point.
""" """
app = Application.build(*args) app = Application.from_cmdline(args)
app.run()
try:
app.run()
except KeyboardInterrupt:
pass
return 0
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View File

@ -6,27 +6,28 @@ import time
from threading import Thread, Event as ThreadEvent, get_ident from threading import Thread, Event as ThreadEvent, get_ident
from typing import Optional, Dict from typing import Optional, Dict
from platypush import __version__
from platypush.bus import Bus from platypush.bus import Bus
from platypush.common import ExtensionWithManifest from platypush.common import ExtensionWithManifest
from platypush.config import Config from platypush.config import Config
from platypush.context import get_backend from platypush.context import get_backend
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.event.zeroconf import ( from platypush.message.event.zeroconf import (
ZeroconfServiceAddedEvent, ZeroconfServiceAddedEvent,
ZeroconfServiceRemovedEvent, ZeroconfServiceRemovedEvent,
) )
from platypush.utils import (
set_timeout,
clear_timeout,
get_redis_queue_name_by_message,
get_backend_name_by_class,
)
from platypush import __version__
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.request import Request from platypush.message.request import Request
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import (
clear_timeout,
get_backend_name_by_class,
get_redis,
get_redis_queue_name_by_message,
get_remaining_timeout,
set_timeout,
)
class Backend(Thread, EventGenerator, ExtensionWithManifest): class Backend(Thread, EventGenerator, ExtensionWithManifest):
@ -68,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self.device_id = Config.get('device_id') self.device_id = Config.get('device_id')
self.thread_id = None self.thread_id = None
self._stop_event = ThreadEvent() self._stop_event = ThreadEvent()
self._stop_thread: Optional[Thread] = None
self._kwargs = kwargs self._kwargs = kwargs
self.logger = logging.getLogger( self.logger = logging.getLogger(
'platypush:backend:' + get_backend_name_by_class(self.__class__) 'platypush:backend:' + get_backend_name_by_class(self.__class__)
@ -299,30 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self._stop_event.set() self._stop_event.set()
self.unregister_service() self.unregister_service()
self.on_stop() self.on_stop()
self._stop_thread = None
Thread(target=_async_stop).start() if not (self._stop_thread and self._stop_thread.is_alive()):
self._stop_thread = Thread(target=_async_stop)
self._stop_thread.start()
def should_stop(self): def should_stop(self):
"""
:return: True if the backend thread should be stopped, False otherwise.
"""
return self._stop_event.is_set() return self._stop_event.is_set()
def wait_stop(self, timeout=None) -> bool: def wait_stop(self, timeout=None) -> bool:
return self._stop_event.wait(timeout) """
Waits for the backend thread to stop.
def _get_redis(self): :param timeout: The maximum time to wait for the backend thread to stop (default: None)
import redis :return: True if the backend thread has stopped, False otherwise.
"""
start = time.time()
redis_backend = get_backend('redis') if self._stop_thread:
if not redis_backend: try:
self.logger.warning( self._stop_thread.join(
'Redis backend not configured - some ' get_remaining_timeout(timeout=timeout, start=start)
'web server features may not be working properly' )
) except AttributeError:
redis_args = {} pass
else:
redis_args = redis_backend.redis_args
redis = redis.Redis(**redis_args) return self._stop_event.wait(
return redis get_remaining_timeout(timeout=timeout, start=start)
)
def get_message_response(self, msg): def get_message_response(self, msg):
queue = get_redis_queue_name_by_message(msg) queue = get_redis_queue_name_by_message(msg)
@ -331,7 +341,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
return None return None
try: try:
redis = self._get_redis() redis = get_redis()
response = redis.blpop(queue, timeout=60) response = redis.blpop(queue, timeout=60)
if response and len(response) > 1: if response and len(response) > 1:
response = Message.build(response[1]) response = Message.build(response[1])

View File

@ -2,13 +2,17 @@ import asyncio
import os import os
import pathlib import pathlib
import secrets import secrets
import signal
import threading import threading
from functools import partial
from multiprocessing import Process from multiprocessing import Process
from time import time from time import time
from typing import List, Mapping, Optional from typing import Mapping, Optional
from tornado.httpserver import HTTPServer
import psutil
from tornado.httpserver import HTTPServer
from tornado.netutil import bind_sockets from tornado.netutil import bind_sockets
from tornado.process import cpu_count, fork_processes from tornado.process import cpu_count, fork_processes
from tornado.wsgi import WSGIContainer from tornado.wsgi import WSGIContainer
@ -18,9 +22,9 @@ from platypush.backend import Backend
from platypush.backend.http.app import application from platypush.backend.http.app import application
from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes
from platypush.backend.http.app.ws.events import WSEventProxy from platypush.backend.http.app.ws.events import WSEventProxy
from platypush.bus.redis import RedisBus from platypush.bus.redis import RedisBus
from platypush.config import Config from platypush.config import Config
from platypush.utils import get_remaining_timeout
class HttpBackend(Backend): class HttpBackend(Backend):
@ -191,6 +195,9 @@ class HttpBackend(Backend):
_DEFAULT_HTTP_PORT = 8008 _DEFAULT_HTTP_PORT = 8008
"""The default listen port for the webserver.""" """The default listen port for the webserver."""
_STOP_TIMEOUT = 5
"""How long we should wait (in seconds) before killing the worker processes."""
def __init__( def __init__(
self, self,
port: int = _DEFAULT_HTTP_PORT, port: int = _DEFAULT_HTTP_PORT,
@ -227,7 +234,6 @@ class HttpBackend(Backend):
self.port = port self.port = port
self._server_proc: Optional[Process] = None self._server_proc: Optional[Process] = None
self._workers: List[Process] = []
self._service_registry_thread = None self._service_registry_thread = None
self.bind_address = bind_address self.bind_address = bind_address
@ -254,35 +260,37 @@ class HttpBackend(Backend):
"""On backend stop""" """On backend stop"""
super().on_stop() super().on_stop()
self.logger.info('Received STOP event on HttpBackend') self.logger.info('Received STOP event on HttpBackend')
start = time()
start_time = time() remaining_time: partial[float] = partial( # type: ignore
timeout = 5 get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start
workers = self._workers.copy() )
for i, worker in enumerate(workers[::-1]):
if worker and worker.is_alive():
worker.terminate()
worker.join(timeout=max(0, start_time + timeout - time()))
if worker and worker.is_alive():
worker.kill()
self._workers.pop(i)
if self._server_proc: if self._server_proc:
self._server_proc.terminate() if self._server_proc.pid:
self._server_proc.join(timeout=5) try:
self._server_proc = None os.kill(self._server_proc.pid, signal.SIGINT)
except OSError:
pass
if self._server_proc and self._server_proc.is_alive():
self._server_proc.join(timeout=remaining_time() / 2)
try:
self._server_proc.terminate()
self._server_proc.join(timeout=remaining_time() / 2)
except AttributeError:
pass
if self._server_proc and self._server_proc.is_alive(): if self._server_proc and self._server_proc.is_alive():
self._server_proc.kill() self._server_proc.kill()
self._server_proc = None self._server_proc = None
self.logger.info('HTTP server terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive(): if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5) self._service_registry_thread.join(timeout=remaining_time())
self._service_registry_thread = None self._service_registry_thread = None
self.logger.info('HTTP server terminated')
def notify_web_clients(self, event): def notify_web_clients(self, event):
"""Notify all the connected web clients (over websocket) of a new event""" """Notify all the connected web clients (over websocket) of a new event"""
WSEventProxy.publish(event) # noqa: E1120 WSEventProxy.publish(event) # noqa: E1120
@ -344,7 +352,10 @@ class HttpBackend(Backend):
try: try:
await asyncio.Event().wait() await asyncio.Event().wait()
except (asyncio.CancelledError, KeyboardInterrupt): except (asyncio.CancelledError, KeyboardInterrupt):
return pass
finally:
server.stop()
await server.close_all_connections()
def _web_server_proc(self): def _web_server_proc(self):
self.logger.info( self.logger.info(
@ -371,7 +382,65 @@ class HttpBackend(Backend):
future = self._post_fork_main(sockets) future = self._post_fork_main(sockets)
asyncio.run(future) asyncio.run(future)
except (asyncio.CancelledError, KeyboardInterrupt): except (asyncio.CancelledError, KeyboardInterrupt):
return pass
finally:
self._stop_workers()
def _stop_workers(self):
"""
Stop all the worker processes.
We have to run this manually on server termination because of a
long-standing issue with Tornado not being able to wind down the forked
workers when the server terminates:
https://github.com/tornadoweb/tornado/issues/1912.
"""
parent_pid = (
self._server_proc.pid
if self._server_proc and self._server_proc.pid
else None
)
if not parent_pid:
return
try:
cur_proc = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
# Send a SIGTERM to all the children
children = cur_proc.children()
for child in children:
if child.pid != parent_pid and child.is_running():
try:
os.kill(child.pid, signal.SIGTERM)
except OSError as e:
self.logger.warning(
'Could not send SIGTERM to PID %d: %s', child.pid, e
)
# Initialize the timeout
start = time()
remaining_time: partial[int] = partial( # type: ignore
get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start, cls=int
)
# Wait for all children to terminate (with timeout)
for child in children:
if child.pid != parent_pid and child.is_running():
try:
child.wait(timeout=remaining_time())
except TimeoutError:
pass
# Send a SIGKILL to any child process that is still running
for child in children:
if child.pid != parent_pid and child.is_running():
try:
child.kill()
except OSError:
pass
def _start_web_server(self): def _start_web_server(self):
self._server_proc = Process(target=self._web_server_proc) self._server_proc = Process(target=self._web_server_proc)

View File

@ -1,6 +1,6 @@
import logging import logging
import threading import threading
from typing import Optional from typing import Final, Optional
from platypush.bus import Bus from platypush.bus import Bus
from platypush.message import Message from platypush.message import Message
@ -13,7 +13,7 @@ class RedisBus(Bus):
Overrides the in-process in-memory local bus with a Redis bus Overrides the in-process in-memory local bus with a Redis bus
""" """
_DEFAULT_REDIS_QUEUE = 'platypush/bus' DEFAULT_REDIS_QUEUE: Final[str] = 'platypush/bus'
def __init__(self, *args, on_message=None, redis_queue=None, **kwargs): def __init__(self, *args, on_message=None, redis_queue=None, **kwargs):
from platypush.utils import get_redis from platypush.utils import get_redis
@ -21,7 +21,7 @@ class RedisBus(Bus):
super().__init__(on_message=on_message) super().__init__(on_message=on_message)
self.redis = get_redis(*args, **kwargs) self.redis = get_redis(*args, **kwargs)
self.redis_args = kwargs self.redis_args = kwargs
self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
self.on_message = on_message self.on_message = on_message
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()

139
platypush/cli.py Normal file
View File

@ -0,0 +1,139 @@
import argparse
from typing import Sequence
from platypush.bus.redis import RedisBus
from platypush.utils import get_default_pid_file
def parse_cmdline(args: Sequence[str]) -> argparse.Namespace:
"""
Parse command-line arguments from a list of strings.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--config',
'-c',
dest='config',
required=False,
default=None,
help='Custom location for the configuration file',
)
parser.add_argument(
'--workdir',
'-w',
dest='workdir',
required=False,
default=None,
help='Custom working directory to be used for the application',
)
parser.add_argument(
'--logsdir',
'-l',
dest='logsdir',
required=False,
default=None,
help='Store logs in the specified directory. By default, the '
'`[logging.]filename` configuration option will be used. If not '
'set, logging will be sent to stdout and stderr.',
)
parser.add_argument(
'--version',
dest='version',
required=False,
action='store_true',
help="Print the current version and exit",
)
parser.add_argument(
'--verbose',
'-v',
dest='verbose',
required=False,
action='store_true',
help="Enable verbose/debug logging",
)
parser.add_argument(
'--pidfile',
'-P',
dest='pidfile',
required=False,
default=get_default_pid_file(),
help="File where platypush will "
+ "store its PID, useful if you're planning to "
+ f"integrate it in a service (default: {get_default_pid_file()})",
)
parser.add_argument(
'--no-capture-stdout',
dest='no_capture_stdout',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stdout won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--no-capture-stderr',
dest='no_capture_stderr',
required=False,
action='store_true',
help="Set this flag if you have max stack depth "
+ "exceeded errors so stderr won't be captured by "
+ "the logging system",
)
parser.add_argument(
'--redis-queue',
dest='redis_queue',
required=False,
default=RedisBus.DEFAULT_REDIS_QUEUE,
help="Name of the Redis queue to be used to internally deliver messages "
f"(default: {RedisBus.DEFAULT_REDIS_QUEUE})",
)
parser.add_argument(
'--start-redis',
dest='start_redis',
required=False,
action='store_true',
help="Set this flag if you want to run and manage Redis internally "
"from the app rather than using an external server. It requires the "
"redis-server executable to be present in the path",
)
parser.add_argument(
'--redis-host',
dest='redis_host',
required=False,
default=None,
help="Overrides the host specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--redis-port',
dest='redis_port',
required=False,
default=None,
help="Overrides the port specified in the redis section of the "
"configuration file",
)
parser.add_argument(
'--ctrl-sock',
dest='ctrl_sock',
required=False,
default=None,
help="If set, it identifies a path to a UNIX domain socket that "
"the application can use to send control messages (e.g. STOP and "
"RESTART) to its parent.",
)
opts, _ = parser.parse_known_args(args)
return opts

View File

@ -0,0 +1,5 @@
from ._base import Command
from ._commands import RestartCommand, StopCommand
from ._stream import CommandStream
__all__ = ["Command", "CommandStream", "RestartCommand", "StopCommand"]

View File

@ -0,0 +1,78 @@
from abc import ABC, abstractmethod
import json
from logging import getLogger, Logger
class Command(ABC):
"""
Base class for application commands.
"""
END_OF_COMMAND = b'\x00'
"""End-of-command marker."""
def __init__(self, **args) -> None:
self.args = args
@property
def logger(self) -> Logger:
"""
The command class logger.
"""
return getLogger(self.__class__.__name__)
@abstractmethod
def __call__(self, app, *_, **__):
"""
Execute the command.
"""
raise NotImplementedError()
def __str__(self) -> str:
"""
:return: A JSON representation of the command.
"""
return json.dumps(
{
'type': 'command',
'command': self.__class__.__name__,
'args': self.args,
}
)
def to_bytes(self):
"""
:return: A JSON representation of the command.
"""
return str(self).encode('utf-8') + self.END_OF_COMMAND
@classmethod
def parse(cls, data: bytes) -> "Command":
"""
:param data: A JSON representation of the command.
:raise ValueError: If the data is invalid.
:return: The command instance or None if the data is invalid.
"""
import platypush.commands
try:
json_data = json.loads(data.decode('utf-8'))
except json.JSONDecodeError as e:
raise ValueError from e
kind = json_data.pop('type', None)
if kind != 'command':
raise ValueError(f'Invalid command type: {kind}')
command_name = json_data.get('command')
if not command_name:
raise ValueError(f'Invalid command name: {command_name}')
cmd_class = getattr(platypush.commands, command_name, None)
if not (cmd_class and issubclass(cmd_class, Command)):
raise ValueError(f'Invalid command class: {command_name}')
try:
return cmd_class(**json_data.get('args', {}))
except Exception as e:
raise ValueError(e) from e

View File

@ -0,0 +1,2 @@
# flake8: noqa
from ._app_ctrl import *

View File

@ -0,0 +1,25 @@
from typing_extensions import override
from platypush.commands import Command
class StopCommand(Command):
"""
Stop the application.
"""
@override
def __call__(self, app, *_, **__):
self.logger.info('Received StopApplication command.')
app.stop()
class RestartCommand(Command):
"""
Restart the application.
"""
@override
def __call__(self, app, *_, **__):
self.logger.info('Received RestartApplication command.')
app.restart()

View File

@ -0,0 +1,69 @@
from logging import getLogger
from socket import socket
from typing import Optional
from platypush.commands import Command
# pylint: disable=too-few-public-methods
class CommandReader:
"""
Reads command objects from file-like I/O objects.
"""
_max_bufsize = 8192
"""Maximum size of a command that can be queued in the stream."""
_bufsize = 1024
"""Size of the buffer used to read commands from the socket."""
def __init__(self):
self.logger = getLogger(__name__)
self._buf = bytes()
def _parse_command(self, data: bytes) -> Optional[Command]:
"""
Parses a command from the received data.
:param data: Data received from the socket
:return: The parsed command
"""
try:
return Command.parse(data)
except ValueError as e:
self.logger.warning('Error while parsing command: %s', e)
return None
def read(self, sock: socket) -> Optional[Command]:
"""
Parses the next command from the file-like I/O object.
:param fp: The file-like I/O object to read from.
:return: The parsed command.
"""
try:
data = sock.recv(self._bufsize)
except OSError as e:
self.logger.warning(
'Error while reading from socket %s: %s', sock.getsockname(), e
)
return None
for ch in data:
if bytes([ch]) == Command.END_OF_COMMAND:
cmd = self._parse_command(self._buf)
self._buf = bytes()
if cmd:
return cmd
elif len(self._buf) >= self._max_bufsize:
self.logger.warning(
'The received command is too long: length=%d', len(self._buf)
)
self._buf = bytes()
break
else:
self._buf += bytes([ch])
return None

View File

@ -0,0 +1,139 @@
from multiprocessing import Queue
import os
from queue import Empty
import socket
import tempfile
from typing import Optional
from typing_extensions import override
from platypush.process import ControllableProcess
from ._base import Command
from ._reader import CommandReader
from ._writer import CommandWriter
class CommandStream(ControllableProcess):
"""
The command stream is an abstraction built around a UNIX socket that allows
the application to communicate commands to its controller.
:param path: Path to the UNIX socket
"""
_max_queue_size = 100
"""Maximum number of commands that can be queued in the stream."""
_default_sock_path = tempfile.gettempdir() + '/platypush-cmd-stream.sock'
"""Default path to the UNIX socket."""
_close_timeout = 5.0
"""Close the client socket after this amount of seconds."""
def __init__(self, path: Optional[str] = None):
super().__init__(name='platypush:cmd:stream')
self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path))
self._sock: Optional[socket.socket] = None
self._cmd_queue: Queue["Command"] = Queue()
def reset(self):
if self._sock is not None:
try:
self._sock.close()
except socket.error:
pass
self._sock = None
try:
os.unlink(self.path)
except FileNotFoundError:
pass
self._cmd_queue.close()
self._cmd_queue = Queue()
@override
def close(self) -> None:
self.reset()
return super().close()
def __enter__(self) -> "CommandStream":
self.reset()
sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.path)
os.chmod(self.path, 0o600)
sock.listen(1)
self.start()
return self
def __exit__(self, *_, **__):
self.terminate()
self.join()
self.close()
def _serve(self, sock: socket.socket):
"""
Serves the command stream.
:param sock: Client socket to serve
"""
reader = CommandReader()
cmd = reader.read(sock)
if cmd:
self._cmd_queue.put_nowait(cmd)
def read(self, timeout: Optional[float] = None) -> Optional[Command]:
"""
Reads commands from the command stream.
:param timeout: Maximum time to wait for a command.
:return: The command that was received, if any.
"""
try:
return self._cmd_queue.get(timeout=timeout)
except Empty:
return None
def write(self, cmd: Command) -> None:
"""
Writes a command to the command stream.
:param cmd: Command to write
:raise AssertionError: If the command cannot be written
"""
sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(self.path)
self.logger.debug('Sending command: %s', cmd)
CommandWriter().write(cmd, sock)
except OSError as e:
raise AssertionError(f'Unable to connect to socket {self.path}: {e}') from e
finally:
sock.close()
@staticmethod
def _close_client(sock: socket.socket):
try:
sock.close()
except OSError:
pass
@override
def main(self):
while self._sock and not self.should_stop:
sock = self._sock
try:
conn, _ = sock.accept()
except ConnectionResetError:
continue
except KeyboardInterrupt:
break
try:
self._serve(conn)
except Exception as e:
self.logger.warning('Unexpected socket error: %s', e, exc_info=True)
finally:
self._close_client(conn)

View File

@ -0,0 +1,25 @@
from logging import getLogger
from socket import socket
from platypush.commands import Command
# pylint: disable=too-few-public-methods
class CommandWriter:
"""
Writes command objects to file-like I/O objects.
"""
def __init__(self):
self.logger = getLogger(__name__)
def write(self, cmd: Command, sock: socket):
"""
Writes a command to a file-like I/O object.
:param cmd: The command to write.
:param fp: The file-like I/O object to write to.
"""
buf = cmd.to_bytes()
sock.sendall(buf)

View File

@ -2,13 +2,14 @@ import datetime
import enum import enum
import logging import logging
import threading import threading
from typing import Dict import time
from typing import Dict, Optional
import croniter import croniter
from dateutil.tz import gettz from dateutil.tz import gettz
from platypush.procedure import Procedure from platypush.procedure import Procedure
from platypush.utils import is_functional_cron from platypush.utils import get_remaining_timeout, is_functional_cron
logger = logging.getLogger('platypush:cron') logger = logging.getLogger('platypush:cron')
@ -198,6 +199,20 @@ class CronScheduler(threading.Thread):
def should_stop(self): def should_stop(self):
return self._should_stop.is_set() return self._should_stop.is_set()
def wait_stop(self, timeout: Optional[float] = None):
start = time.time()
stopped = self._should_stop.wait(
timeout=get_remaining_timeout(timeout=timeout, start=start)
)
if not stopped:
raise TimeoutError(
f'Timeout waiting for {self.__class__.__name__} to stop.'
)
if threading.get_ident() != self.ident:
self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
def run(self): def run(self):
logger.info('Running cron scheduler') logger.info('Running cron scheduler')

View File

@ -1,5 +1,6 @@
from logging import getLogger from logging import getLogger
from threading import Thread, Event from threading import Thread, Event, get_ident
from time import time
from typing import Dict, Optional from typing import Dict, Optional
from platypush.context import get_bus from platypush.context import get_bus
@ -9,6 +10,7 @@ from platypush.message.event.entities import EntityUpdateEvent
from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._base import EntityKey, EntitySavedCallback
from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository from platypush.entities._engine.repo import EntitiesRepository
from platypush.utils import get_remaining_timeout
class EntitiesEngine(Thread): class EntitiesEngine(Thread):
@ -69,6 +71,20 @@ class EntitiesEngine(Thread):
def stop(self): def stop(self):
self._should_stop.set() self._should_stop.set()
def wait_stop(self, timeout: Optional[float] = None):
start = time()
stopped = self._should_stop.wait(
timeout=get_remaining_timeout(timeout=timeout, start=start)
)
if not stopped:
raise TimeoutError(
f'Timeout waiting for {self.__class__.__name__} to stop.'
)
if get_ident() != self.ident:
self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
def notify(self, *entities: Entity): def notify(self, *entities: Entity):
""" """
Trigger an EntityUpdateEvent if the entity has been persisted, or queue Trigger an EntityUpdateEvent if the entity has been persisted, or queue

View File

@ -0,0 +1,31 @@
from typing import Optional
from platypush.commands import CommandStream, RestartCommand, StopCommand
from platypush.config import Config
from platypush.plugins import Plugin, action
class ApplicationPlugin(Plugin):
"""
This plugin is used to control and inspect the application state.
"""
@property
def _ctrl_sock(self) -> Optional[str]:
"""
:return: The path to the UNIX socket to control the application.
"""
return Config.get('ctrl_sock') # type: ignore
@action
def stop(self):
"""
Stop the application.
"""
CommandStream(self._ctrl_sock).write(StopCommand())
@action
def restart(self):
"""
Restart the application.
"""
CommandStream(self._ctrl_sock).write(RestartCommand())

View File

@ -0,0 +1,6 @@
manifest:
events: {}
install:
pip: []
package: platypush.plugins.application
type: plugin

View File

@ -3,6 +3,5 @@ manifest:
install: install:
pip: pip:
- py-cpuinfo - py-cpuinfo
- psutil
package: platypush.plugins.system package: platypush.plugins.system
type: plugin type: plugin

View File

@ -0,0 +1,135 @@
from abc import ABC, abstractmethod
import logging
from multiprocessing import Event, Process, RLock
from os import getpid
from typing import Optional
from typing_extensions import override
class ControllableProcess(Process, ABC):
"""
Extends the ``Process`` class to allow for external extended control of the
underlying process.
"""
_kill_timeout: float = 10.0
def __init__(self, *args, timeout: Optional[float] = None, **kwargs):
kwargs['name'] = kwargs.get('name', self.__class__.__name__)
super().__init__(*args, **kwargs)
self.timeout = timeout
self.logger = logging.getLogger(self.name)
self._should_stop = Event()
self._stop_lock = RLock()
self._should_restart = False
@property
def _timeout_err(self) -> TimeoutError:
return TimeoutError(f'Process {self.name} timed out')
@property
def should_stop(self) -> bool:
"""
:return: ``True`` if the process is scheduled for stop, ``False``
otherwise.
"""
return self._should_stop.is_set()
def wait_stop(self, timeout: Optional[float] = None) -> None:
"""
Waits for the process to stop.
:param timeout: The maximum time to wait for the process to stop.
"""
timeout = timeout if timeout is not None else self.timeout
stopped = self._should_stop.wait(timeout=timeout)
if not stopped:
raise self._timeout_err
if self.pid == getpid():
return # Prevent termination deadlock
self.join(timeout=timeout)
if self.is_alive():
raise self._timeout_err
def stop(self, timeout: Optional[float] = None) -> None:
"""
Stops the process.
:param timeout: The maximum time to wait for the process to stop.
"""
timeout = timeout if timeout is not None else self._kill_timeout
with self._stop_lock:
self._should_stop.set()
self.on_stop()
try:
if self.pid != getpid():
self.wait_stop(timeout=timeout)
except TimeoutError:
pass
finally:
self.terminate()
try:
if self.pid != getpid():
self.wait_stop(timeout=self._kill_timeout)
except TimeoutError:
self.logger.warning(
'The process %s is still alive after %f seconds, killing it',
self.name,
self._kill_timeout,
)
self.kill()
def on_stop(self) -> None:
"""
Handler called when the process is stopped.
It can be implemented by subclasses.
"""
@property
def should_restart(self) -> bool:
"""
:return: ``True`` if the process is marked for restart after stop,
``False`` otherwise.
"""
return self._should_restart
def mark_for_restart(self):
"""
Marks the process for restart after stop.
"""
self._should_restart = True
@abstractmethod
def main(self):
"""
The main function of the process.
It must be implemented by subclasses.
"""
def _main(self):
"""
Wrapper for the main function of the process.
"""
self._should_restart = False
return self.main()
@override
def run(self) -> None:
"""
Executes the process.
"""
super().run()
try:
self._main()
finally:
self._should_stop.set()
self.logger.info('Process terminated')

View File

@ -0,0 +1,17 @@
from ._runner import ApplicationRunner
def main(*args: str):
"""
Main application entry point.
This is usually the entry point that you want to use to start your
application, rather than :meth:`platypush.app.main`, as this entry point
wraps the main application in a controllable process.
"""
app_runner = ApplicationRunner()
app_runner.run(*args)
__all__ = ["ApplicationRunner", "main"]

View File

@ -0,0 +1,5 @@
import sys
from . import main
main(sys.argv[1:])

59
platypush/runner/_app.py Normal file
View File

@ -0,0 +1,59 @@
import logging
import os
import signal
import subprocess
import sys
from typing_extensions import override
from platypush.process import ControllableProcess
class ApplicationProcess(ControllableProcess):
"""
Controllable process wrapper interface for the main application.
"""
def __init__(self, *args: str, pidfile: str, **kwargs):
super().__init__(name='platypush', **kwargs)
self.logger = logging.getLogger('platypush')
self.args = args
self.pidfile = pidfile
def __enter__(self) -> "ApplicationProcess":
self.start()
return self
def __exit__(self, *_, **__):
self.stop()
@override
def main(self):
self.logger.info('Starting application...')
with subprocess.Popen(
['python', '-m', 'platypush.app', *self.args],
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
) as app:
try:
app.wait()
except KeyboardInterrupt:
pass
@override
def on_stop(self):
try:
with open(self.pidfile, 'r') as f:
pid = int(f.read().strip())
except (OSError, ValueError):
pid = None
if not pid:
return
try:
os.kill(pid, signal.SIGINT)
except OSError:
pass

View File

@ -0,0 +1,91 @@
import logging
import sys
from threading import Thread
from typing import Optional
from platypush.cli import parse_cmdline
from platypush.commands import CommandStream
from ._app import ApplicationProcess
class ApplicationRunner:
"""
Runner for the main application.
It wraps the main application and provides an interface to control it
externally.
"""
_default_timeout = 0.5
def __init__(self):
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
self.logger = logging.getLogger('platypush:runner')
self._proc: Optional[ApplicationProcess] = None
def _listen(self, stream: CommandStream):
"""
Listens for external application commands and executes them.
"""
while self.is_running:
cmd = stream.read(timeout=self._default_timeout)
if not cmd:
continue
self.logger.info(cmd)
Thread(target=cmd, args=(self,)).start()
def _print_version(self):
from platypush import __version__
print(__version__)
sys.exit(0)
def _run(self, *args: str) -> None:
parsed_args = parse_cmdline(args)
if parsed_args.version:
self._print_version()
while True:
with (
CommandStream(parsed_args.ctrl_sock) as stream,
ApplicationProcess(
*args, pidfile=parsed_args.pidfile, timeout=self._default_timeout
) as self._proc,
):
try:
self._listen(stream)
except KeyboardInterrupt:
pass
if self.should_restart:
self.logger.info('Restarting application...')
continue
break
def run(self, *args: str) -> None:
try:
self._run(*args)
finally:
self._proc = None
def stop(self):
if self._proc is not None:
self._proc.stop()
def restart(self):
if self._proc is not None:
self._proc.mark_for_restart()
self.stop()
@property
def is_running(self) -> bool:
return bool(self._proc and self._proc.is_alive())
@property
def should_restart(self) -> bool:
return self._proc.should_restart if self._proc is not None else False

View File

@ -14,7 +14,9 @@ import socket
import ssl import ssl
import urllib.request import urllib.request
from threading import Lock as TLock from threading import Lock as TLock
from typing import Generator, Optional, Tuple, Union from tempfile import gettempdir
import time
from typing import Generator, Optional, Tuple, Type, Union
from dateutil import parser, tz from dateutil import parser, tz
from redis import Redis from redis import Redis
@ -625,4 +627,23 @@ def get_lock(
lock.release() lock.release()
def get_default_pid_file() -> str:
"""
Get the default PID file path.
"""
return os.path.join(gettempdir(), 'platypush.pid')
def get_remaining_timeout(
timeout: Optional[float], start: float, cls: Union[Type[int], Type[float]] = float
) -> Optional[Union[int, float]]:
"""
Get the remaining timeout, given a start time.
"""
if timeout is None:
return None
return cls(max(0, timeout - (time.time() - start)))
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View File

@ -11,6 +11,7 @@ frozendict
marshmallow marshmallow
marshmallow_dataclass marshmallow_dataclass
paho-mqtt paho-mqtt
psutil
python-dateutil python-dateutil
python-magic python-magic
pyyaml pyyaml

View File

@ -68,6 +68,7 @@ setup(
'frozendict', 'frozendict',
'marshmallow', 'marshmallow',
'marshmallow_dataclass', 'marshmallow_dataclass',
'psutil',
'python-dateutil', 'python-dateutil',
'python-magic', 'python-magic',
'pyyaml', 'pyyaml',
@ -224,7 +225,7 @@ setup(
# Support for Graphite integration # Support for Graphite integration
'graphite': ['graphyte'], 'graphite': ['graphyte'],
# Support for CPU and memory monitoring and info # Support for CPU and memory monitoring and info
'sys': ['py-cpuinfo', 'psutil'], 'sys': ['py-cpuinfo'],
# Support for nmap integration # Support for nmap integration
'nmap': ['python-nmap'], 'nmap': ['python-nmap'],
# Support for zigbee2mqtt # Support for zigbee2mqtt

View File

@ -45,7 +45,7 @@ def app():
yield _app yield _app
logging.info('Stopping Platypush test service') logging.info('Stopping Platypush test service')
_app.stop_app() _app.stop()
clear_loggers() clear_loggers()
db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :] db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :]