diff --git a/.drone.yml b/.drone.yml
index 9c9f70fbb..45b8102cf 100644
--- a/.drone.yml
+++ b/.drone.yml
@@ -66,11 +66,11 @@ steps:
   image: python:3.11-alpine
   commands:
     - apk add --update --no-cache redis
-    - apk add --update --no-cache --virtual build-base g++ rust
+    - apk add --update --no-cache --virtual build-base g++ rust linux-headers
     - pip install -U pip
     - pip install .
     - pip install -r requirements-tests.txt
-    - apk del build-base g++ rust
+    - apk del build-base g++ rust linux-headers
     - pytest tests
 
 - name: build-ui
diff --git a/docs/source/platypush/plugins/application.rst b/docs/source/platypush/plugins/application.rst
new file mode 100644
index 000000000..5c0d4696b
--- /dev/null
+++ b/docs/source/platypush/plugins/application.rst
@@ -0,0 +1,5 @@
+``application``
+===============
+
+.. automodule:: platypush.plugins.application
+    :members:
diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index cadf898e7..86a68d5d3 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -8,6 +8,7 @@ Plugins
 
     platypush/plugins/adafruit.io.rst
     platypush/plugins/alarm.rst
+    platypush/plugins/application.rst
     platypush/plugins/arduino.rst
     platypush/plugins/assistant.echo.rst
     platypush/plugins/assistant.google.rst
diff --git a/examples/docker/Dockerfile b/examples/docker/Dockerfile
index 56ef34b18..374ffbdc5 100644
--- a/examples/docker/Dockerfile
+++ b/examples/docker/Dockerfile
@@ -7,10 +7,10 @@ FROM python:3.11-alpine
 RUN mkdir -p /install /app
 COPY . /install
 RUN apk add --update --no-cache redis
-RUN apk add --update --no-cache --virtual build-base g++ rust
+RUN apk add --update --no-cache --virtual build-base g++ rust linux-headers
 RUN pip install -U pip
 RUN cd /install && pip install .
-RUN apk del build-base g++ rust
+RUN apk del build-base g++ rust linux-headers
 
 EXPOSE 8008
 VOLUME /app/config
diff --git a/platypush/__init__.py b/platypush/__init__.py
index f4e9ecb92..ede8fee4b 100644
--- a/platypush/__init__.py
+++ b/platypush/__init__.py
@@ -5,12 +5,13 @@ Platypush
 .. license: MIT
 """
 
-from .app import Application, main
+from .app import Application
 from .config import Config
 from .context import get_backend, get_bus, get_plugin
 from .message.event import Event
 from .message.request import Request
 from .message.response import Response
+from .runner import main
 
 
 __author__ = 'Fabio Manganiello <fabio@manganiello.tech>'
diff --git a/platypush/__main__.py b/platypush/__main__.py
index 2b4e69089..737e265b7 100644
--- a/platypush/__main__.py
+++ b/platypush/__main__.py
@@ -1,10 +1,5 @@
 import sys
 
-from platypush.app import main
+from platypush.runner import main
 
-
-if __name__ == '__main__':
-    main(*sys.argv[1:])
-
-
-# vim:sw=4:ts=4:et:
+main(*sys.argv[1:])
diff --git a/platypush/app/__init__.py b/platypush/app/__init__.py
new file mode 100644
index 000000000..4bd528318
--- /dev/null
+++ b/platypush/app/__init__.py
@@ -0,0 +1,4 @@
+from ._app import Application, main
+
+
+__all__ = ["Application", "main"]
diff --git a/platypush/app/__main__.py b/platypush/app/__main__.py
new file mode 100644
index 000000000..ed2e42208
--- /dev/null
+++ b/platypush/app/__main__.py
@@ -0,0 +1,7 @@
+import sys
+
+from ._app import main
+
+
+if __name__ == '__main__':
+    sys.exit(main(*sys.argv[1:]))
diff --git a/platypush/app.py b/platypush/app/_app.py
similarity index 64%
rename from platypush/app.py
rename to platypush/app/_app.py
index cea035c72..e95d0db3f 100644
--- a/platypush/app.py
+++ b/platypush/app/_app.py
@@ -1,23 +1,26 @@
-import argparse
+from contextlib import contextmanager
 import logging
 import os
+import signal
 import subprocess
 import sys
-from typing import Optional
+from typing import Optional, Sequence
 
-from .bus import Bus
-from .bus.redis import RedisBus
-from .config import Config
-from .context import register_backends, register_plugins
-from .cron.scheduler import CronScheduler
-from .entities import init_entities_engine, EntitiesEngine
-from .event.processor import EventProcessor
-from .logger import Logger
-from .message.event import Event
-from .message.event.application import ApplicationStartedEvent
-from .message.request import Request
-from .message.response import Response
-from .utils import get_enabled_plugins, get_redis_conf
+from platypush.bus import Bus
+from platypush.bus.redis import RedisBus
+from platypush.cli import parse_cmdline
+from platypush.commands import CommandStream
+from platypush.config import Config
+from platypush.context import register_backends, register_plugins
+from platypush.cron.scheduler import CronScheduler
+from platypush.entities import init_entities_engine, EntitiesEngine
+from platypush.event.processor import EventProcessor
+from platypush.logger import Logger
+from platypush.message.event import Event
+from platypush.message.event.application import ApplicationStartedEvent
+from platypush.message.request import Request
+from platypush.message.response import Response
+from platypush.utils import get_enabled_plugins, get_redis_conf
 
 log = logging.getLogger('platypush')
 
@@ -25,9 +28,6 @@ log = logging.getLogger('platypush')
 class Application:
     """Main class for the Platypush application."""
 
-    # Default bus queue name
-    _default_redis_queue = 'platypush/bus'
-
     # Default Redis port
     _default_redis_port = 6379
 
@@ -51,6 +51,7 @@ class Application:
         start_redis: bool = False,
         redis_host: Optional[str] = None,
         redis_port: Optional[int] = None,
+        ctrl_sock: Optional[str] = None,
     ):
         """
         :param config_file: Configuration file override (default: None).
@@ -82,21 +83,22 @@ class Application:
             the settings in the ``redis`` section of the configuration file.
         :param redis_port: Port of the local Redis server. It overrides the
             settings in the ``redis`` section of the configuration file.
+        :param ctrl_sock: If set, it identifies a path to a UNIX domain socket
+            that the application can use to send control messages (e.g. STOP
+            and RESTART) to its parent.
         """
 
         self.pidfile = pidfile
-        if pidfile:
-            with open(pidfile, 'w') as f:
-                f.write(str(os.getpid()))
-
         self.bus: Optional[Bus] = None
-        self.redis_queue = redis_queue or self._default_redis_queue
+        self.redis_queue = redis_queue or RedisBus.DEFAULT_REDIS_QUEUE
         self.config_file = config_file
         self._verbose = verbose
         self._logsdir = (
             os.path.abspath(os.path.expanduser(logsdir)) if logsdir else None
         )
+
         Config.init(self.config_file)
+        Config.set('ctrl_sock', ctrl_sock)
 
         if workdir:
             Config.set('workdir', os.path.abspath(os.path.expanduser(workdir)))
@@ -113,6 +115,7 @@ class Application:
         self.redis_port = redis_port
         self.redis_conf = {}
         self._redis_proc: Optional[subprocess.Popen] = None
+        self.cmd_stream = CommandStream(ctrl_sock)
 
         self._init_bus()
         self._init_logging()
@@ -187,133 +190,11 @@ class Application:
             self._redis_proc = None
 
     @classmethod
-    def build(cls, *args: str):
+    def from_cmdline(cls, args: Sequence[str]) -> "Application":
         """
         Build the app from command line arguments.
         """
-        from . import __version__
-
-        parser = argparse.ArgumentParser()
-
-        parser.add_argument(
-            '--config',
-            '-c',
-            dest='config',
-            required=False,
-            default=None,
-            help='Custom location for the configuration file',
-        )
-
-        parser.add_argument(
-            '--workdir',
-            '-w',
-            dest='workdir',
-            required=False,
-            default=None,
-            help='Custom working directory to be used for the application',
-        )
-
-        parser.add_argument(
-            '--logsdir',
-            '-l',
-            dest='logsdir',
-            required=False,
-            default=None,
-            help='Store logs in the specified directory. By default, the '
-            '`[logging.]filename` configuration option will be used. If not '
-            'set, logging will be sent to stdout and stderr.',
-        )
-
-        parser.add_argument(
-            '--version',
-            dest='version',
-            required=False,
-            action='store_true',
-            help="Print the current version and exit",
-        )
-
-        parser.add_argument(
-            '--verbose',
-            '-v',
-            dest='verbose',
-            required=False,
-            action='store_true',
-            help="Enable verbose/debug logging",
-        )
-
-        parser.add_argument(
-            '--pidfile',
-            '-P',
-            dest='pidfile',
-            required=False,
-            default=None,
-            help="File where platypush will "
-            + "store its PID, useful if you're planning to "
-            + "integrate it in a service",
-        )
-
-        parser.add_argument(
-            '--no-capture-stdout',
-            dest='no_capture_stdout',
-            required=False,
-            action='store_true',
-            help="Set this flag if you have max stack depth "
-            + "exceeded errors so stdout won't be captured by "
-            + "the logging system",
-        )
-
-        parser.add_argument(
-            '--no-capture-stderr',
-            dest='no_capture_stderr',
-            required=False,
-            action='store_true',
-            help="Set this flag if you have max stack depth "
-            + "exceeded errors so stderr won't be captured by "
-            + "the logging system",
-        )
-
-        parser.add_argument(
-            '--redis-queue',
-            dest='redis_queue',
-            required=False,
-            default=cls._default_redis_queue,
-            help="Name of the Redis queue to be used to internally deliver messages "
-            "(default: platypush/bus)",
-        )
-
-        parser.add_argument(
-            '--start-redis',
-            dest='start_redis',
-            required=False,
-            action='store_true',
-            help="Set this flag if you want to run and manage Redis internally "
-            "from the app rather than using an external server. It requires the "
-            "redis-server executable to be present in the path",
-        )
-
-        parser.add_argument(
-            '--redis-host',
-            dest='redis_host',
-            required=False,
-            default=None,
-            help="Overrides the host specified in the redis section of the "
-            "configuration file",
-        )
-
-        parser.add_argument(
-            '--redis-port',
-            dest='redis_port',
-            required=False,
-            default=None,
-            help="Overrides the port specified in the redis section of the "
-            "configuration file",
-        )
-
-        opts, _ = parser.parse_known_args(args)
-        if opts.version:
-            print(__version__)
-            sys.exit(0)
-
+        opts = parse_cmdline(args)
         return cls(
             config_file=opts.config,
             workdir=opts.workdir,
@@ -326,6 +207,7 @@ class Application:
             start_redis=opts.start_redis,
             redis_host=opts.redis_host,
             redis_port=opts.redis_port,
+            ctrl_sock=opts.ctrl_sock,
         )
 
     def on_message(self):
@@ -351,7 +233,7 @@ class Application:
                     self.requests_to_process
                     and self.processed_requests >= self.requests_to_process
                 ):
-                    self.stop_app()
+                    self.stop()
             elif isinstance(msg, Response):
                 msg.log()
             elif isinstance(msg, Event):
@@ -360,36 +242,68 @@ class Application:
 
         return _f
 
-    def stop_app(self):
+    def stop(self):
         """Stops the backends and the bus."""
-        from .plugins import RunnablePlugin
+        from platypush.plugins import RunnablePlugin
 
-        if self.backends:
-            for backend in self.backends.values():
-                backend.stop()
+        log.info('Stopping the application')
+        backends = (self.backends or {}).copy().values()
+        runnable_plugins = [
+            plugin
+            for plugin in get_enabled_plugins().values()
+            if isinstance(plugin, RunnablePlugin)
+        ]
 
-        for plugin in get_enabled_plugins().values():
-            if isinstance(plugin, RunnablePlugin):
-                plugin.stop()
+        for backend in backends:
+            backend.stop()
+
+        for plugin in runnable_plugins:
+            plugin.stop()
+
+        for backend in backends:
+            backend.wait_stop()
+
+        for plugin in runnable_plugins:
+            plugin.wait_stop()
+
+        if self.entities_engine:
+            self.entities_engine.stop()
+            self.entities_engine.wait_stop()
+            self.entities_engine = None
+
+        if self.cron_scheduler:
+            self.cron_scheduler.stop()
+            self.cron_scheduler.wait_stop()
+            self.cron_scheduler = None
 
         if self.bus:
             self.bus.stop()
             self.bus = None
 
-        if self.cron_scheduler:
-            self.cron_scheduler.stop()
-            self.cron_scheduler = None
-
-        if self.entities_engine:
-            self.entities_engine.stop()
-            self.entities_engine = None
-
         if self.start_redis:
             self._stop_redis()
 
-    def run(self):
-        """Start the daemon."""
-        from . import __version__
+        log.info('Exiting application')
+
+    @contextmanager
+    def _open_pidfile(self):
+        if self.pidfile:
+            try:
+                with open(self.pidfile, 'w') as f:
+                    f.write(str(os.getpid()))
+            except OSError as e:
+                log.warning('Failed to create PID file %s: %s', self.pidfile, e)
+
+        yield
+
+        if self.pidfile:
+            try:
+                os.remove(self.pidfile)
+            except OSError as e:
+                log.warning('Failed to remove PID file %s: %s', self.pidfile, e)
+
+    def _run(self):
+        from platypush import __version__
 
         if not self.no_capture_stdout:
             sys.stdout = Logger(log.info)
@@ -428,16 +342,30 @@ class Application:
             self.bus.poll()
         except KeyboardInterrupt:
             log.info('SIGINT received, terminating application')
+            # Ignore other SIGINT signals
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
         finally:
-            self.stop_app()
+            self.stop()
+
+    def run(self):
+        """Run the application."""
+
+        with self._open_pidfile():
+            self._run()
 
 
 def main(*args: str):
     """
     Application entry point.
     """
-    app = Application.build(*args)
-    app.run()
+    app = Application.from_cmdline(args)
+
+    try:
+        app.run()
+    except KeyboardInterrupt:
+        pass
+
+    return 0
 
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py
index 1afc4385b..c8b0ea65d 100644
--- a/platypush/backend/__init__.py
+++ b/platypush/backend/__init__.py
@@ -6,27 +6,28 @@ import time
 from threading import Thread, Event as ThreadEvent, get_ident
 from typing import Optional, Dict
 
+from platypush import __version__
 from platypush.bus import Bus
 from platypush.common import ExtensionWithManifest
 from platypush.config import Config
 from platypush.context import get_backend
+from platypush.event import EventGenerator
+from platypush.message import Message
+from platypush.message.event import Event
 from platypush.message.event.zeroconf import (
     ZeroconfServiceAddedEvent,
     ZeroconfServiceRemovedEvent,
 )
-from platypush.utils import (
-    set_timeout,
-    clear_timeout,
-    get_redis_queue_name_by_message,
-    get_backend_name_by_class,
-)
-
-from platypush import __version__
-from platypush.event import EventGenerator
-from platypush.message import Message
-from platypush.message.event import Event
 from platypush.message.request import Request
 from platypush.message.response import Response
+from platypush.utils import (
+    clear_timeout,
+    get_backend_name_by_class,
+    get_redis,
+    get_redis_queue_name_by_message,
+    get_remaining_timeout,
+    set_timeout,
+)
 
 
 class Backend(Thread, EventGenerator, ExtensionWithManifest):
@@ -68,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
         self.device_id = Config.get('device_id')
         self.thread_id = None
         self._stop_event = ThreadEvent()
+        self._stop_thread: Optional[Thread] = None
         self._kwargs = kwargs
         self.logger = logging.getLogger(
             'platypush:backend:' + get_backend_name_by_class(self.__class__)
@@ -299,30 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
             self._stop_event.set()
             self.unregister_service()
             self.on_stop()
+            self._stop_thread = None
 
-        Thread(target=_async_stop).start()
+        if not (self._stop_thread and self._stop_thread.is_alive()):
+            self._stop_thread = Thread(target=_async_stop)
+            self._stop_thread.start()
 
     def should_stop(self):
+        """
+        :return: True if the backend thread should be stopped, False otherwise.
+        """
         return self._stop_event.is_set()
 
     def wait_stop(self, timeout=None) -> bool:
-        return self._stop_event.wait(timeout)
+        """
+        Waits for the backend thread to stop.
 
-    def _get_redis(self):
-        import redis
+        :param timeout: The maximum time to wait for the backend thread to stop (default: None)
+        :return: True if the backend thread has stopped, False otherwise.
+        """
+        start = time.time()
 
-        redis_backend = get_backend('redis')
-        if not redis_backend:
-            self.logger.warning(
-                'Redis backend not configured - some '
-                'web server features may not be working properly'
-            )
-            redis_args = {}
-        else:
-            redis_args = redis_backend.redis_args
+        if self._stop_thread:
+            try:
+                self._stop_thread.join(
+                    get_remaining_timeout(timeout=timeout, start=start)
+                )
+            except AttributeError:
+                pass
 
-        redis = redis.Redis(**redis_args)
-        return redis
+        return self._stop_event.wait(
+            get_remaining_timeout(timeout=timeout, start=start)
+        )
 
     def get_message_response(self, msg):
         queue = get_redis_queue_name_by_message(msg)
@@ -331,7 +341,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
             return None
 
         try:
-            redis = self._get_redis()
+            redis = get_redis()
             response = redis.blpop(queue, timeout=60)
             if response and len(response) > 1:
                 response = Message.build(response[1])
diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py
index 4df3ce8a7..6bda90657 100644
--- a/platypush/backend/http/__init__.py
+++ b/platypush/backend/http/__init__.py
@@ -2,13 +2,17 @@ import asyncio
 import os
 import pathlib
 import secrets
+import signal
 import threading
 
+from functools import partial
 from multiprocessing import Process
 from time import time
-from typing import List, Mapping, Optional
-from tornado.httpserver import HTTPServer
+from typing import Mapping, Optional
 
+import psutil
+
+from tornado.httpserver import HTTPServer
 from tornado.netutil import bind_sockets
 from tornado.process import cpu_count, fork_processes
 from tornado.wsgi import WSGIContainer
@@ -18,9 +22,9 @@ from platypush.backend import Backend
 from platypush.backend.http.app import application
 from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes
 from platypush.backend.http.app.ws.events import WSEventProxy
-
 from platypush.bus.redis import RedisBus
 from platypush.config import Config
+from platypush.utils import get_remaining_timeout
 
 
 class HttpBackend(Backend):
@@ -191,6 +195,9 @@ class HttpBackend(Backend):
     _DEFAULT_HTTP_PORT = 8008
     """The default listen port for the webserver."""
 
+    _STOP_TIMEOUT = 5
+    """How long we should wait (in seconds) before killing the worker processes."""
+
     def __init__(
         self,
         port: int = _DEFAULT_HTTP_PORT,
@@ -227,7 +234,6 @@ class HttpBackend(Backend):
 
         self.port = port
         self._server_proc: Optional[Process] = None
-        self._workers: List[Process] = []
         self._service_registry_thread = None
         self.bind_address = bind_address
 
@@ -254,35 +260,37 @@ class HttpBackend(Backend):
         """On backend stop"""
         super().on_stop()
         self.logger.info('Received STOP event on HttpBackend')
-
-        start_time = time()
-        timeout = 5
-        workers = self._workers.copy()
-
-        for i, worker in enumerate(workers[::-1]):
-            if worker and worker.is_alive():
-                worker.terminate()
-                worker.join(timeout=max(0, start_time + timeout - time()))
-
-            if worker and worker.is_alive():
-                worker.kill()
-                self._workers.pop(i)
+        start = time()
+        remaining_time: partial[float] = partial(  # type: ignore
+            get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start
+        )
 
         if self._server_proc:
-            self._server_proc.terminate()
-            self._server_proc.join(timeout=5)
-            self._server_proc = None
+            if self._server_proc.pid:
+                try:
+                    os.kill(self._server_proc.pid, signal.SIGINT)
+                except OSError:
+                    pass
+
+            if self._server_proc and self._server_proc.is_alive():
+                self._server_proc.join(timeout=remaining_time() / 2)
+                try:
+                    self._server_proc.terminate()
+                    self._server_proc.join(timeout=remaining_time() / 2)
+                except AttributeError:
+                    pass
 
         if self._server_proc and self._server_proc.is_alive():
             self._server_proc.kill()
 
         self._server_proc = None
-        self.logger.info('HTTP server terminated')
 
         if self._service_registry_thread and self._service_registry_thread.is_alive():
-            self._service_registry_thread.join(timeout=5)
+            self._service_registry_thread.join(timeout=remaining_time())
             self._service_registry_thread = None
 
+        self.logger.info('HTTP server terminated')
+
     def notify_web_clients(self, event):
         """Notify all the connected web clients (over websocket) of a new event"""
         WSEventProxy.publish(event)  # noqa: E1120
@@ -344,7 +352,10 @@ class HttpBackend(Backend):
         try:
             await asyncio.Event().wait()
         except (asyncio.CancelledError, KeyboardInterrupt):
-            return
+            pass
+        finally:
+            server.stop()
+            await server.close_all_connections()
 
     def _web_server_proc(self):
         self.logger.info(
@@ -371,7 +382,65 @@ class HttpBackend(Backend):
                 future = self._post_fork_main(sockets)
                 asyncio.run(future)
             except (asyncio.CancelledError, KeyboardInterrupt):
-                return
+                pass
+            finally:
+                self._stop_workers()
+
+    def _stop_workers(self):
+        """
+        Stop all the worker processes.
+
+        We have to run this manually on server termination because of a
+        long-standing issue with Tornado not being able to wind down the forked
+        workers when the server terminates:
+        https://github.com/tornadoweb/tornado/issues/1912.
+        """
+        parent_pid = (
+            self._server_proc.pid
+            if self._server_proc and self._server_proc.pid
+            else None
+        )
+
+        if not parent_pid:
+            return
+
+        try:
+            cur_proc = psutil.Process(parent_pid)
+        except psutil.NoSuchProcess:
+            return
+
+        # Send a SIGTERM to all the children
+        children = cur_proc.children()
+        for child in children:
+            if child.pid != parent_pid and child.is_running():
+                try:
+                    os.kill(child.pid, signal.SIGTERM)
+                except OSError as e:
+                    self.logger.warning(
+                        'Could not send SIGTERM to PID %d: %s', child.pid, e
+                    )
+
+        # Initialize the timeout
+        start = time()
+        remaining_time: partial[int] = partial(  # type: ignore
+            get_remaining_timeout, timeout=self._STOP_TIMEOUT, start=start, cls=int
+        )
+
+        # Wait for all children to terminate (with timeout)
+        for child in children:
+            if child.pid != parent_pid and child.is_running():
+                try:
+                    child.wait(timeout=remaining_time())
+                except TimeoutError:
+                    pass
+
+        # Send a SIGKILL to any child process that is still running
+        for child in children:
+            if child.pid != parent_pid and child.is_running():
+                try:
+                    child.kill()
+                except OSError:
+                    pass
 
     def _start_web_server(self):
         self._server_proc = Process(target=self._web_server_proc)
diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py
index e8273a126..83ded8aff 100644
--- a/platypush/bus/redis.py
+++ b/platypush/bus/redis.py
@@ -1,6 +1,6 @@
 import logging
 import threading
-from typing import Optional
+from typing import Final, Optional
 
 from platypush.bus import Bus
 from platypush.message import Message
@@ -13,7 +13,7 @@ class RedisBus(Bus):
     Overrides the in-process in-memory local bus with a Redis bus
     """
 
-    _DEFAULT_REDIS_QUEUE = 'platypush/bus'
+    DEFAULT_REDIS_QUEUE: Final[str] = 'platypush/bus'
 
     def __init__(self, *args, on_message=None, redis_queue=None, **kwargs):
         from platypush.utils import get_redis
@@ -21,7 +21,7 @@ class RedisBus(Bus):
         super().__init__(on_message=on_message)
         self.redis = get_redis(*args, **kwargs)
         self.redis_args = kwargs
-        self.redis_queue = redis_queue or self._DEFAULT_REDIS_QUEUE
+        self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
         self.on_message = on_message
         self.thread_id = threading.get_ident()
 
diff --git a/platypush/cli.py b/platypush/cli.py
new file mode 100644
index 000000000..9864c16be
--- /dev/null
+++ b/platypush/cli.py
@@ -0,0 +1,139 @@
+import argparse
+from typing import Sequence
+
+from platypush.bus.redis import RedisBus
+from platypush.utils import get_default_pid_file
+
+
+def parse_cmdline(args: Sequence[str]) -> argparse.Namespace:
+    """
+    Parse command-line arguments from a list of strings.
+    """
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        '--config',
+        '-c',
+        dest='config',
+        required=False,
+        default=None,
+        help='Custom location for the configuration file',
+    )
+
+    parser.add_argument(
+        '--workdir',
+        '-w',
+        dest='workdir',
+        required=False,
+        default=None,
+        help='Custom working directory to be used for the application',
+    )
+
+    parser.add_argument(
+        '--logsdir',
+        '-l',
+        dest='logsdir',
+        required=False,
+        default=None,
+        help='Store logs in the specified directory. By default, the '
+        '`[logging.]filename` configuration option will be used. If not '
+        'set, logging will be sent to stdout and stderr.',
+    )
+
+    parser.add_argument(
+        '--version',
+        dest='version',
+        required=False,
+        action='store_true',
+        help="Print the current version and exit",
+    )
+
+    parser.add_argument(
+        '--verbose',
+        '-v',
+        dest='verbose',
+        required=False,
+        action='store_true',
+        help="Enable verbose/debug logging",
+    )
+
+    parser.add_argument(
+        '--pidfile',
+        '-P',
+        dest='pidfile',
+        required=False,
+        default=get_default_pid_file(),
+        help="File where platypush will "
+        + "store its PID, useful if you're planning to "
+        + f"integrate it in a service (default: {get_default_pid_file()})",
+    )
+
+    parser.add_argument(
+        '--no-capture-stdout',
+        dest='no_capture_stdout',
+        required=False,
+        action='store_true',
+        help="Set this flag if you have max stack depth "
+        + "exceeded errors so stdout won't be captured by "
+        + "the logging system",
+    )
+
+    parser.add_argument(
+        '--no-capture-stderr',
+        dest='no_capture_stderr',
+        required=False,
+        action='store_true',
+        help="Set this flag if you have max stack depth "
+        + "exceeded errors so stderr won't be captured by "
+        + "the logging system",
+    )
+
+    parser.add_argument(
+        '--redis-queue',
+        dest='redis_queue',
+        required=False,
+        default=RedisBus.DEFAULT_REDIS_QUEUE,
+        help="Name of the Redis queue to be used to internally deliver messages "
+        f"(default: {RedisBus.DEFAULT_REDIS_QUEUE})",
+    )
+
+    parser.add_argument(
+        '--start-redis',
+        dest='start_redis',
+        required=False,
+        action='store_true',
+        help="Set this flag if you want to run and manage Redis internally "
+        "from the app rather than using an external server. It requires the "
+        "redis-server executable to be present in the path",
+    )
+
+    parser.add_argument(
+        '--redis-host',
+        dest='redis_host',
+        required=False,
+        default=None,
+        help="Overrides the host specified in the redis section of the "
+        "configuration file",
+    )
+
+    parser.add_argument(
+        '--redis-port',
+        dest='redis_port',
+        required=False,
+        default=None,
+        help="Overrides the port specified in the redis section of the "
+        "configuration file",
+    )
+
+    parser.add_argument(
+        '--ctrl-sock',
+        dest='ctrl_sock',
+        required=False,
+        default=None,
+        help="If set, it identifies a path to a UNIX domain socket that "
+        "the application can use to send control messages (e.g. STOP and "
+        "RESTART) to its parent.",
+    )
+
+    opts, _ = parser.parse_known_args(args)
+    return opts
diff --git a/platypush/commands/__init__.py b/platypush/commands/__init__.py
new file mode 100644
index 000000000..73a4eaf42
--- /dev/null
+++ b/platypush/commands/__init__.py
@@ -0,0 +1,5 @@
+from ._base import Command
+from ._commands import RestartCommand, StopCommand
+from ._stream import CommandStream
+
+__all__ = ["Command", "CommandStream", "RestartCommand", "StopCommand"]
diff --git a/platypush/commands/_base.py b/platypush/commands/_base.py
new file mode 100644
index 000000000..e02968734
--- /dev/null
+++ b/platypush/commands/_base.py
@@ -0,0 +1,78 @@
+from abc import ABC, abstractmethod
+import json
+from logging import getLogger, Logger
+
+
+class Command(ABC):
+    """
+    Base class for application commands.
+    """
+
+    END_OF_COMMAND = b'\x00'
+    """End-of-command marker."""
+
+    def __init__(self, **args) -> None:
+        self.args = args
+
+    @property
+    def logger(self) -> Logger:
+        """
+        The command class logger.
+        """
+        return getLogger(self.__class__.__name__)
+
+    @abstractmethod
+    def __call__(self, app, *_, **__):
+        """
+        Execute the command.
+        """
+        raise NotImplementedError()
+
+    def __str__(self) -> str:
+        """
+        :return: A JSON representation of the command.
+        """
+        return json.dumps(
+            {
+                'type': 'command',
+                'command': self.__class__.__name__,
+                'args': self.args,
+            }
+        )
+
+    def to_bytes(self):
+        """
+        :return: A JSON representation of the command.
+        """
+        return str(self).encode('utf-8') + self.END_OF_COMMAND
+
+    @classmethod
+    def parse(cls, data: bytes) -> "Command":
+        """
+        :param data: A JSON representation of the command.
+        :raise ValueError: If the data is invalid.
+        :return: The command instance or None if the data is invalid.
+        """
+        import platypush.commands
+
+        try:
+            json_data = json.loads(data.decode('utf-8'))
+        except json.JSONDecodeError as e:
+            raise ValueError from e
+
+        kind = json_data.pop('type', None)
+        if kind != 'command':
+            raise ValueError(f'Invalid command type: {kind}')
+
+        command_name = json_data.get('command')
+        if not command_name:
+            raise ValueError(f'Invalid command name: {command_name}')
+
+        cmd_class = getattr(platypush.commands, command_name, None)
+        if not (cmd_class and issubclass(cmd_class, Command)):
+            raise ValueError(f'Invalid command class: {command_name}')
+
+        try:
+            return cmd_class(**json_data.get('args', {}))
+        except Exception as e:
+            raise ValueError(e) from e
diff --git a/platypush/commands/_commands/__init__.py b/platypush/commands/_commands/__init__.py
new file mode 100644
index 000000000..22cdf7bec
--- /dev/null
+++ b/platypush/commands/_commands/__init__.py
@@ -0,0 +1,2 @@
+# flake8: noqa
+from ._app_ctrl import *
diff --git a/platypush/commands/_commands/_app_ctrl.py b/platypush/commands/_commands/_app_ctrl.py
new file mode 100644
index 000000000..780d8b15c
--- /dev/null
+++ b/platypush/commands/_commands/_app_ctrl.py
@@ -0,0 +1,25 @@
+from typing_extensions import override
+
+from platypush.commands import Command
+
+
+class StopCommand(Command):
+    """
+    Stop the application.
+    """
+
+    @override
+    def __call__(self, app, *_, **__):
+        self.logger.info('Received StopApplication command.')
+        app.stop()
+
+
+class RestartCommand(Command):
+    """
+    Restart the application.
+    """
+
+    @override
+    def __call__(self, app, *_, **__):
+        self.logger.info('Received RestartApplication command.')
+        app.restart()
diff --git a/platypush/commands/_reader.py b/platypush/commands/_reader.py
new file mode 100644
index 000000000..8eee5a3fc
--- /dev/null
+++ b/platypush/commands/_reader.py
@@ -0,0 +1,69 @@
+from logging import getLogger
+from socket import socket
+from typing import Optional
+
+from platypush.commands import Command
+
+
+# pylint: disable=too-few-public-methods
+class CommandReader:
+    """
+    Reads command objects from file-like I/O objects.
+    """
+
+    _max_bufsize = 8192
+    """Maximum size of a command that can be queued in the stream."""
+
+    _bufsize = 1024
+    """Size of the buffer used to read commands from the socket."""
+
+    def __init__(self):
+        self.logger = getLogger(__name__)
+        self._buf = bytes()
+
+    def _parse_command(self, data: bytes) -> Optional[Command]:
+        """
+        Parses a command from the received data.
+
+        :param data: Data received from the socket
+        :return: The parsed command
+        """
+        try:
+            return Command.parse(data)
+        except ValueError as e:
+            self.logger.warning('Error while parsing command: %s', e)
+            return None
+
+    def read(self, sock: socket) -> Optional[Command]:
+        """
+        Parses the next command from the file-like I/O object.
+
+        :param fp: The file-like I/O object to read from.
+        :return: The parsed command.
+        """
+        try:
+            data = sock.recv(self._bufsize)
+        except OSError as e:
+            self.logger.warning(
+                'Error while reading from socket %s: %s', sock.getsockname(), e
+            )
+            return None
+
+        for ch in data:
+            if bytes([ch]) == Command.END_OF_COMMAND:
+                cmd = self._parse_command(self._buf)
+                self._buf = bytes()
+
+                if cmd:
+                    return cmd
+            elif len(self._buf) >= self._max_bufsize:
+                self.logger.warning(
+                    'The received command is too long: length=%d', len(self._buf)
+                )
+
+                self._buf = bytes()
+                break
+            else:
+                self._buf += bytes([ch])
+
+        return None
diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py
new file mode 100644
index 000000000..3ae185d3e
--- /dev/null
+++ b/platypush/commands/_stream.py
@@ -0,0 +1,139 @@
+from multiprocessing import Queue
+import os
+from queue import Empty
+import socket
+import tempfile
+from typing import Optional
+from typing_extensions import override
+
+from platypush.process import ControllableProcess
+
+from ._base import Command
+from ._reader import CommandReader
+from ._writer import CommandWriter
+
+
+class CommandStream(ControllableProcess):
+    """
+    The command stream is an abstraction built around a UNIX socket that allows
+    the application to communicate commands to its controller.
+
+    :param path: Path to the UNIX socket
+    """
+
+    _max_queue_size = 100
+    """Maximum number of commands that can be queued in the stream."""
+
+    _default_sock_path = tempfile.gettempdir() + '/platypush-cmd-stream.sock'
+    """Default path to the UNIX socket."""
+
+    _close_timeout = 5.0
+    """Close the client socket after this amount of seconds."""
+
+    def __init__(self, path: Optional[str] = None):
+        super().__init__(name='platypush:cmd:stream')
+        self.path = os.path.abspath(os.path.expanduser(path or self._default_sock_path))
+        self._sock: Optional[socket.socket] = None
+        self._cmd_queue: Queue["Command"] = Queue()
+
+    def reset(self):
+        if self._sock is not None:
+            try:
+                self._sock.close()
+            except socket.error:
+                pass
+
+            self._sock = None
+
+        try:
+            os.unlink(self.path)
+        except FileNotFoundError:
+            pass
+
+        self._cmd_queue.close()
+        self._cmd_queue = Queue()
+
+    @override
+    def close(self) -> None:
+        self.reset()
+        return super().close()
+
+    def __enter__(self) -> "CommandStream":
+        self.reset()
+        sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        sock.bind(self.path)
+        os.chmod(self.path, 0o600)
+        sock.listen(1)
+        self.start()
+        return self
+
+    def __exit__(self, *_, **__):
+        self.terminate()
+        self.join()
+        self.close()
+
+    def _serve(self, sock: socket.socket):
+        """
+        Serves the command stream.
+
+        :param sock: Client socket to serve
+        """
+        reader = CommandReader()
+        cmd = reader.read(sock)
+        if cmd:
+            self._cmd_queue.put_nowait(cmd)
+
+    def read(self, timeout: Optional[float] = None) -> Optional[Command]:
+        """
+        Reads commands from the command stream.
+
+        :param timeout: Maximum time to wait for a command.
+        :return: The command that was received, if any.
+        """
+        try:
+            return self._cmd_queue.get(timeout=timeout)
+        except Empty:
+            return None
+
+    def write(self, cmd: Command) -> None:
+        """
+        Writes a command to the command stream.
+
+        :param cmd: Command to write
+        :raise AssertionError: If the command cannot be written
+        """
+        sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            sock.connect(self.path)
+            self.logger.debug('Sending command: %s', cmd)
+            CommandWriter().write(cmd, sock)
+        except OSError as e:
+            raise AssertionError(f'Unable to connect to socket {self.path}: {e}') from e
+        finally:
+            sock.close()
+
+    @staticmethod
+    def _close_client(sock: socket.socket):
+        try:
+            sock.close()
+        except OSError:
+            pass
+
+    @override
+    def main(self):
+        while self._sock and not self.should_stop:
+            sock = self._sock
+
+            try:
+                conn, _ = sock.accept()
+            except ConnectionResetError:
+                continue
+            except KeyboardInterrupt:
+                break
+
+            try:
+                self._serve(conn)
+            except Exception as e:
+                self.logger.warning('Unexpected socket error: %s', e, exc_info=True)
+            finally:
+                self._close_client(conn)
diff --git a/platypush/commands/_writer.py b/platypush/commands/_writer.py
new file mode 100644
index 000000000..cf701128d
--- /dev/null
+++ b/platypush/commands/_writer.py
@@ -0,0 +1,25 @@
+from logging import getLogger
+from socket import socket
+
+from platypush.commands import Command
+
+
+# pylint: disable=too-few-public-methods
+class CommandWriter:
+    """
+    Writes command objects to file-like I/O objects.
+    """
+
+    def __init__(self):
+        self.logger = getLogger(__name__)
+
+    def write(self, cmd: Command, sock: socket):
+        """
+        Writes a command to a file-like I/O object.
+
+        :param cmd: The command to write.
+        :param fp: The file-like I/O object to write to.
+        """
+
+        buf = cmd.to_bytes()
+        sock.sendall(buf)
diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py
index 55b40a98d..b7d816fe0 100644
--- a/platypush/cron/scheduler.py
+++ b/platypush/cron/scheduler.py
@@ -2,13 +2,14 @@ import datetime
 import enum
 import logging
 import threading
-from typing import Dict
+import time
+from typing import Dict, Optional
 
 import croniter
 from dateutil.tz import gettz
 
 from platypush.procedure import Procedure
-from platypush.utils import is_functional_cron
+from platypush.utils import get_remaining_timeout, is_functional_cron
 
 logger = logging.getLogger('platypush:cron')
 
@@ -198,6 +199,20 @@ class CronScheduler(threading.Thread):
     def should_stop(self):
         return self._should_stop.is_set()
 
+    def wait_stop(self, timeout: Optional[float] = None):
+        start = time.time()
+        stopped = self._should_stop.wait(
+            timeout=get_remaining_timeout(timeout=timeout, start=start)
+        )
+
+        if not stopped:
+            raise TimeoutError(
+                f'Timeout waiting for {self.__class__.__name__} to stop.'
+            )
+
+        if threading.get_ident() != self.ident:
+            self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
+
     def run(self):
         logger.info('Running cron scheduler')
 
diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py
index cdd908740..0f3a22f33 100644
--- a/platypush/entities/_engine/__init__.py
+++ b/platypush/entities/_engine/__init__.py
@@ -1,5 +1,6 @@
 from logging import getLogger
-from threading import Thread, Event
+from threading import Thread, Event, get_ident
+from time import time
 from typing import Dict, Optional
 
 from platypush.context import get_bus
@@ -9,6 +10,7 @@ from platypush.message.event.entities import EntityUpdateEvent
 from platypush.entities._base import EntityKey, EntitySavedCallback
 from platypush.entities._engine.queue import EntitiesQueue
 from platypush.entities._engine.repo import EntitiesRepository
+from platypush.utils import get_remaining_timeout
 
 
 class EntitiesEngine(Thread):
@@ -69,6 +71,20 @@ class EntitiesEngine(Thread):
     def stop(self):
         self._should_stop.set()
 
+    def wait_stop(self, timeout: Optional[float] = None):
+        start = time()
+        stopped = self._should_stop.wait(
+            timeout=get_remaining_timeout(timeout=timeout, start=start)
+        )
+
+        if not stopped:
+            raise TimeoutError(
+                f'Timeout waiting for {self.__class__.__name__} to stop.'
+            )
+
+        if get_ident() != self.ident:
+            self.join(timeout=get_remaining_timeout(timeout=timeout, start=start))
+
     def notify(self, *entities: Entity):
         """
         Trigger an EntityUpdateEvent if the entity has been persisted, or queue
diff --git a/platypush/plugins/application/__init__.py b/platypush/plugins/application/__init__.py
new file mode 100644
index 000000000..85a30559e
--- /dev/null
+++ b/platypush/plugins/application/__init__.py
@@ -0,0 +1,31 @@
+from typing import Optional
+from platypush.commands import CommandStream, RestartCommand, StopCommand
+from platypush.config import Config
+from platypush.plugins import Plugin, action
+
+
+class ApplicationPlugin(Plugin):
+    """
+    This plugin is used to control and inspect the application state.
+    """
+
+    @property
+    def _ctrl_sock(self) -> Optional[str]:
+        """
+        :return: The path to the UNIX socket to control the application.
+        """
+        return Config.get('ctrl_sock')  # type: ignore
+
+    @action
+    def stop(self):
+        """
+        Stop the application.
+        """
+        CommandStream(self._ctrl_sock).write(StopCommand())
+
+    @action
+    def restart(self):
+        """
+        Restart the application.
+        """
+        CommandStream(self._ctrl_sock).write(RestartCommand())
diff --git a/platypush/plugins/application/manifest.yaml b/platypush/plugins/application/manifest.yaml
new file mode 100644
index 000000000..68cddacb7
--- /dev/null
+++ b/platypush/plugins/application/manifest.yaml
@@ -0,0 +1,6 @@
+manifest:
+  events: {}
+  install:
+    pip: []
+  package: platypush.plugins.application
+  type: plugin
diff --git a/platypush/plugins/system/manifest.yaml b/platypush/plugins/system/manifest.yaml
index bf404b996..67137e13b 100644
--- a/platypush/plugins/system/manifest.yaml
+++ b/platypush/plugins/system/manifest.yaml
@@ -3,6 +3,5 @@ manifest:
   install:
     pip:
     - py-cpuinfo
-    - psutil
   package: platypush.plugins.system
   type: plugin
diff --git a/platypush/process/__init__.py b/platypush/process/__init__.py
new file mode 100644
index 000000000..582a8e7f6
--- /dev/null
+++ b/platypush/process/__init__.py
@@ -0,0 +1,135 @@
+from abc import ABC, abstractmethod
+import logging
+from multiprocessing import Event, Process, RLock
+from os import getpid
+from typing import Optional
+from typing_extensions import override
+
+
+class ControllableProcess(Process, ABC):
+    """
+    Extends the ``Process`` class to allow for external extended control of the
+    underlying process.
+    """
+
+    _kill_timeout: float = 10.0
+
+    def __init__(self, *args, timeout: Optional[float] = None, **kwargs):
+        kwargs['name'] = kwargs.get('name', self.__class__.__name__)
+        super().__init__(*args, **kwargs)
+
+        self.timeout = timeout
+        self.logger = logging.getLogger(self.name)
+        self._should_stop = Event()
+        self._stop_lock = RLock()
+        self._should_restart = False
+
+    @property
+    def _timeout_err(self) -> TimeoutError:
+        return TimeoutError(f'Process {self.name} timed out')
+
+    @property
+    def should_stop(self) -> bool:
+        """
+        :return: ``True`` if the process is scheduled for stop, ``False``
+            otherwise.
+        """
+        return self._should_stop.is_set()
+
+    def wait_stop(self, timeout: Optional[float] = None) -> None:
+        """
+        Waits for the process to stop.
+
+        :param timeout: The maximum time to wait for the process to stop.
+        """
+        timeout = timeout if timeout is not None else self.timeout
+        stopped = self._should_stop.wait(timeout=timeout)
+
+        if not stopped:
+            raise self._timeout_err
+
+        if self.pid == getpid():
+            return  # Prevent termination deadlock
+
+        self.join(timeout=timeout)
+        if self.is_alive():
+            raise self._timeout_err
+
+    def stop(self, timeout: Optional[float] = None) -> None:
+        """
+        Stops the process.
+
+        :param timeout: The maximum time to wait for the process to stop.
+        """
+        timeout = timeout if timeout is not None else self._kill_timeout
+        with self._stop_lock:
+            self._should_stop.set()
+            self.on_stop()
+
+            try:
+                if self.pid != getpid():
+                    self.wait_stop(timeout=timeout)
+            except TimeoutError:
+                pass
+            finally:
+                self.terminate()
+
+            try:
+                if self.pid != getpid():
+                    self.wait_stop(timeout=self._kill_timeout)
+            except TimeoutError:
+                self.logger.warning(
+                    'The process %s is still alive after %f seconds, killing it',
+                    self.name,
+                    self._kill_timeout,
+                )
+                self.kill()
+
+    def on_stop(self) -> None:
+        """
+        Handler called when the process is stopped.
+
+        It can be implemented by subclasses.
+        """
+
+    @property
+    def should_restart(self) -> bool:
+        """
+        :return: ``True`` if the process is marked for restart after stop,
+            ``False`` otherwise.
+        """
+        return self._should_restart
+
+    def mark_for_restart(self):
+        """
+        Marks the process for restart after stop.
+        """
+        self._should_restart = True
+
+    @abstractmethod
+    def main(self):
+        """
+        The main function of the process.
+
+        It must be implemented by subclasses.
+        """
+
+    def _main(self):
+        """
+        Wrapper for the main function of the process.
+        """
+        self._should_restart = False
+        return self.main()
+
+    @override
+    def run(self) -> None:
+        """
+        Executes the process.
+        """
+        super().run()
+
+        try:
+            self._main()
+        finally:
+            self._should_stop.set()
+            self.logger.info('Process terminated')
diff --git a/platypush/runner/__init__.py b/platypush/runner/__init__.py
new file mode 100644
index 000000000..e0c8269fd
--- /dev/null
+++ b/platypush/runner/__init__.py
@@ -0,0 +1,17 @@
+from ._runner import ApplicationRunner
+
+
+def main(*args: str):
+    """
+    Main application entry point.
+
+    This is usually the entry point that you want to use to start your
+    application, rather than :meth:`platypush.app.main`, as this entry point
+    wraps the main application in a controllable process.
+    """
+
+    app_runner = ApplicationRunner()
+    app_runner.run(*args)
+
+
+__all__ = ["ApplicationRunner", "main"]
diff --git a/platypush/runner/__main__.py b/platypush/runner/__main__.py
new file mode 100644
index 000000000..ab75238d5
--- /dev/null
+++ b/platypush/runner/__main__.py
@@ -0,0 +1,5 @@
+import sys
+
+from . import main
+
+main(sys.argv[1:])
diff --git a/platypush/runner/_app.py b/platypush/runner/_app.py
new file mode 100644
index 000000000..6ff9207be
--- /dev/null
+++ b/platypush/runner/_app.py
@@ -0,0 +1,59 @@
+import logging
+import os
+import signal
+import subprocess
+import sys
+from typing_extensions import override
+
+from platypush.process import ControllableProcess
+
+
+class ApplicationProcess(ControllableProcess):
+    """
+    Controllable process wrapper interface for the main application.
+    """
+
+    def __init__(self, *args: str, pidfile: str, **kwargs):
+        super().__init__(name='platypush', **kwargs)
+
+        self.logger = logging.getLogger('platypush')
+        self.args = args
+        self.pidfile = pidfile
+
+    def __enter__(self) -> "ApplicationProcess":
+        self.start()
+        return self
+
+    def __exit__(self, *_, **__):
+        self.stop()
+
+    @override
+    def main(self):
+        self.logger.info('Starting application...')
+
+        with subprocess.Popen(
+            ['python', '-m', 'platypush.app', *self.args],
+            stdin=sys.stdin,
+            stdout=sys.stdout,
+            stderr=sys.stderr,
+        ) as app:
+            try:
+                app.wait()
+            except KeyboardInterrupt:
+                pass
+
+    @override
+    def on_stop(self):
+        try:
+            with open(self.pidfile, 'r') as f:
+                pid = int(f.read().strip())
+        except (OSError, ValueError):
+            pid = None
+
+        if not pid:
+            return
+
+        try:
+            os.kill(pid, signal.SIGINT)
+        except OSError:
+            pass
diff --git a/platypush/runner/_runner.py b/platypush/runner/_runner.py
new file mode 100644
index 000000000..ac713c9d2
--- /dev/null
+++ b/platypush/runner/_runner.py
@@ -0,0 +1,91 @@
+import logging
+import sys
+from threading import Thread
+from typing import Optional
+
+from platypush.cli import parse_cmdline
+from platypush.commands import CommandStream
+
+from ._app import ApplicationProcess
+
+
+class ApplicationRunner:
+    """
+    Runner for the main application.
+
+    It wraps the main application and provides an interface to control it
+    externally.
+    """
+
+    _default_timeout = 0.5
+
+    def __init__(self):
+        logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+        self.logger = logging.getLogger('platypush:runner')
+        self._proc: Optional[ApplicationProcess] = None
+
+    def _listen(self, stream: CommandStream):
+        """
+        Listens for external application commands and executes them.
+        """
+        while self.is_running:
+            cmd = stream.read(timeout=self._default_timeout)
+            if not cmd:
+                continue
+
+            self.logger.info(cmd)
+            Thread(target=cmd, args=(self,)).start()
+
+    def _print_version(self):
+        from platypush import __version__
+
+        print(__version__)
+        sys.exit(0)
+
+    def _run(self, *args: str) -> None:
+        parsed_args = parse_cmdline(args)
+
+        if parsed_args.version:
+            self._print_version()
+
+        while True:
+            with (
+                CommandStream(parsed_args.ctrl_sock) as stream,
+                ApplicationProcess(
+                    *args, pidfile=parsed_args.pidfile, timeout=self._default_timeout
+                ) as self._proc,
+            ):
+                try:
+                    self._listen(stream)
+                except KeyboardInterrupt:
+                    pass
+
+                if self.should_restart:
+                    self.logger.info('Restarting application...')
+                    continue
+
+                break
+
+    def run(self, *args: str) -> None:
+        try:
+            self._run(*args)
+        finally:
+            self._proc = None
+
+    def stop(self):
+        if self._proc is not None:
+            self._proc.stop()
+
+    def restart(self):
+        if self._proc is not None:
+            self._proc.mark_for_restart()
+
+        self.stop()
+
+    @property
+    def is_running(self) -> bool:
+        return bool(self._proc and self._proc.is_alive())
+
+    @property
+    def should_restart(self) -> bool:
+        return self._proc.should_restart if self._proc is not None else False
diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py
index fd80f1e9e..20763fc90 100644
--- a/platypush/utils/__init__.py
+++ b/platypush/utils/__init__.py
@@ -14,7 +14,9 @@ import socket
 import ssl
 import urllib.request
 from threading import Lock as TLock
-from typing import Generator, Optional, Tuple, Union
+from tempfile import gettempdir
+import time
+from typing import Generator, Optional, Tuple, Type, Union
 
 from dateutil import parser, tz
 from redis import Redis
@@ -625,4 +627,23 @@ def get_lock(
             lock.release()
 
 
+def get_default_pid_file() -> str:
+    """
+    Get the default PID file path.
+    """
+    return os.path.join(gettempdir(), 'platypush.pid')
+
+
+def get_remaining_timeout(
+    timeout: Optional[float], start: float, cls: Union[Type[int], Type[float]] = float
+) -> Optional[Union[int, float]]:
+    """
+    Get the remaining timeout, given a start time.
+    """
+    if timeout is None:
+        return None
+
+    return cls(max(0, timeout - (time.time() - start)))
+
+
 # vim:sw=4:ts=4:et:
diff --git a/requirements.txt b/requirements.txt
index d61383ca5..22900c926 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ frozendict
 marshmallow
 marshmallow_dataclass
 paho-mqtt
+psutil
 python-dateutil
 python-magic
 pyyaml
diff --git a/setup.py b/setup.py
index f71f12625..e67b5713d 100755
--- a/setup.py
+++ b/setup.py
@@ -68,6 +68,7 @@ setup(
         'frozendict',
         'marshmallow',
         'marshmallow_dataclass',
+        'psutil',
         'python-dateutil',
         'python-magic',
         'pyyaml',
@@ -224,7 +225,7 @@ setup(
         # Support for Graphite integration
         'graphite': ['graphyte'],
         # Support for CPU and memory monitoring and info
-        'sys': ['py-cpuinfo', 'psutil'],
+        'sys': ['py-cpuinfo'],
         # Support for nmap integration
         'nmap': ['python-nmap'],
         # Support for zigbee2mqtt
diff --git a/tests/conftest.py b/tests/conftest.py
index 017042aed..9e3013ddb 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -45,7 +45,7 @@ def app():
     yield _app
 
     logging.info('Stopping Platypush test service')
-    _app.stop_app()
+    _app.stop()
     clear_loggers()
     db = (Config.get('main.db') or {}).get('engine', '')[len('sqlite:///') :]