From 53aeb0b3b15ddbec6b9ec458bbba01d22087e83c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 2 Aug 2023 22:17:11 +0200 Subject: [PATCH] Better documentation for the Redis server + LINT fixes. 1. Added documentation to the README on the possible options to run the Redis service. 2. Show a relevant message to the user if the application is run with `--start-redis` and Redis couldn't start. 3. Some LINT/black chores on some files that hadn't been touched in a while. --- README.md | 97 +++++++++++++++++++++----- platypush/app.py | 31 +++++--- platypush/bus/__init__.py | 53 +++++++++----- platypush/common/__init__.py | 17 +++-- platypush/common/gstreamer/__init__.py | 39 +++++++---- 5 files changed, 172 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 2d0e35089..74cbe2836 100644 --- a/README.md +++ b/README.md @@ -15,16 +15,19 @@ Platypush - [Introduction](#introduction) + [What it can do](#what-it-can-do) - [Installation](#installation) - * [System installation](#system-installation) - + [Install through `pip`](#install-through-pip) - + [Install through a system package manager](#install-through-a-system-package-manager) - + [Install from sources](#install-from-sources) + * [Prerequisites](#prerequisites) + + [Docker installation](#docker-installation) + + [Use an external service](#use-an-external-service) + + [Manual installation](#manual-installation) + * [Install through `pip`](#install-through-pip) + * [Install through a system package manager](#install-through-a-system-package-manager) + * [Install from sources](#install-from-sources) * [Installing the dependencies for your extensions](#installing-the-dependencies-for-your-extensions) + [Install via `extras` name](#install-via-extras-name) + [Install via `manifest.yaml`](#install-via-manifestyaml) + [Check the instructions reported in the documentation](#check-the-instructions-reported-in-the-documentation) * [Virtual environment installation](#virtual-environment-installation) - * [Docker installation](#docker-installation) + * [Docker installation](#docker-installation-1) - [Architecture](#architecture) * [Plugins](#plugins) * [Actions](#actions) @@ -127,26 +130,82 @@ You can use Platypush to do things like: ## Installation -### System installation +### Prerequisites -Platypush uses Redis to deliver and store requests and temporary messages: +Platypush uses [Redis](https://redis.io/) to dispatch requests, responses, +events and custom messages across several processes and integrations. + +#### Docker installation + +You can run Redis on the fly on your local machine using a Docker image: + +```bash +# Expose a Redis server on port 6379 (default) +docker run --rm -p 6379:6379 --name redis redis +``` + +#### Use an external service + +You can let Platypush use an external Redis service, if you wish to avoid +running one on the same machine. + +In such scenario, simply start the application by passing custom values for +`--redis-host` and `--redis-port`, or configure these values in its +configuration file: ```yaml -# Example for Debian-based distributions -[sudo] apt-get install redis-server +redis: + host: some-ip + port: some-port +``` +If you wish to run multiple instances that use the same Redis server, you may +also want to customize the name of the default queue that they use +(`--redis-queue` command-line option) in order to avoid conflicts. + +#### Manual installation + +Unless you are running Platypush in a Docker container, or you are running +Redis in a Docker container, or you want to use a remote Redis service, the +Redis server should be installed on the same machine where Platypush runs: + +```bash +# On Debian-based distributions +sudo apt install redis-server + +# On Arch-based distributions +# The hiredis package is also advised +sudo pacman -S redis + +# On MacOS +brew install redis +``` + +Once Redis is installed, you have two options: + +1. Run it a separate service. This depends on your operating system and + supervisor/service controller. For example, on systemd: + +```bash # Enable and start the service -[sudo] systemctl enable redis -[sudo] systemctl start redis +sudo systemctl enable redis +sudo systemctl start redis ``` -#### Install through `pip` +2. Let Platypush run and control the Redis service. This is a good option if + you want Platypush to run its own service, separate from any other one + running on the same machine, and terminate it as soon as the application + ends. In this case, simply launch the application with the `--start-redis` + option (and optionally `--redis-port ` to customize the listen + port). -```shell -[sudo] pip3 install platypush +### Install through `pip` + +```bash +[sudo] pip install platypush ``` -#### Install through a system package manager +### Install through a system package manager Note: currently only Arch Linux and derived distributions are supported. @@ -157,7 +216,7 @@ latest stable version) or the (for the latest git version) through your favourite AUR package manager. For example, using `yay`: -```shell +```bash yay platypush # Or yay platypush-git @@ -166,14 +225,12 @@ yay platypush-git The Arch Linux packages on AUR are automatically updated upon new git commits or tags. -#### Install from sources +### Install from sources ```shell git clone https://git.platypush.tech/platypush/platypush.git cd platypush [sudo] pip install . -# Or -[sudo] python3 setup.py install ``` ### Installing the dependencies for your extensions @@ -227,6 +284,8 @@ You can then start the service by simply running: platypush ``` +See `platypush --help` for a full list of options. + It's advised to run it as a systemd service though - simply copy the provided [`.service` file](https://git.platypush.tech/platypush/platypush/src/branch/master/examples/systemd/platypush.service) diff --git a/platypush/app.py b/platypush/app.py index b35b62867..cea035c72 100644 --- a/platypush/app.py +++ b/platypush/app.py @@ -153,16 +153,27 @@ class Application: port = self._redis_conf['port'] log.info('Starting local Redis instance on %s', port) - self._redis_proc = subprocess.Popen( # pylint: disable=consider-using-with - [ - 'redis-server', - '--bind', - 'localhost', - '--port', - str(port), - ], - stdout=subprocess.PIPE, - ) + redis_cmd_args = [ + 'redis-server', + '--bind', + 'localhost', + '--port', + str(port), + ] + + try: + self._redis_proc = subprocess.Popen( # pylint: disable=consider-using-with + redis_cmd_args, + stdout=subprocess.PIPE, + ) + except Exception as e: + log.error( + 'Failed to start local Redis instance: "%s": %s', + ' '.join(redis_cmd_args), + e, + ) + + sys.exit(1) log.info('Waiting for Redis to start') for line in self._redis_proc.stdout: # type: ignore diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 0dd57eb03..8bfc4e6e6 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -10,8 +10,10 @@ from platypush.message.event import Event logger = logging.getLogger('platypush:bus') -class Bus(object): - """ Main local bus where the daemon will listen for new messages """ +class Bus: + """ + Main local bus where the daemon will listen for new messages. + """ _MSG_EXPIRY_TIMEOUT = 60.0 # Consider a message on the bus as expired after one minute without being picked up @@ -23,39 +25,45 @@ class Bus(object): self._should_stop = threading.Event() def post(self, msg): - """ Sends a message to the bus """ + """Sends a message to the bus""" self.bus.put(msg) def get(self): - """ Reads one message from the bus """ + """Reads one message from the bus""" try: return self.bus.get(timeout=0.1) except Empty: - return + return None def stop(self): self._should_stop.set() def _msg_executor(self, msg): def event_handler(event: Event, handler: Callable[[Event], None]): - logger.info('Triggering event handler {}'.format(handler.__name__)) + logger.info('Triggering event handler %s', handler.__name__) handler(event) def executor(): if isinstance(msg, Event): - if type(msg) in self.event_handlers: - handlers = self.event_handlers[type(msg)] - else: - handlers = {*[hndl for event_type, hndl in self.event_handlers.items() - if isinstance(msg, event_type)]} + handlers = self.event_handlers.get( + type(msg), + { + *[ + hndl + for event_type, hndl in self.event_handlers.items() + if isinstance(msg, event_type) + ] + }, + ) for hndl in handlers: threading.Thread(target=event_handler, args=(msg, hndl)) try: - self.on_message(msg) + if self.on_message: + self.on_message(msg) except Exception as e: - logger.error('Error on processing message {}'.format(msg)) + logger.error('Error on processing message %s', msg) logger.exception(e) return executor @@ -76,17 +84,24 @@ class Bus(object): if msg is None: continue - timestamp = msg.timestamp if hasattr(msg, 'timestamp') else msg.get('timestamp') + timestamp = ( + msg.timestamp if hasattr(msg, 'timestamp') else msg.get('timestamp') + ) if timestamp and time.time() - timestamp > self._MSG_EXPIRY_TIMEOUT: - logger.debug('{} seconds old message on the bus expired, ignoring it: {}'. - format(int(time.time()-msg.timestamp), msg)) + logger.debug( + '%f seconds old message on the bus expired, ignoring it: %s', + time.time() - msg.timestamp, + msg, + ) continue threading.Thread(target=self._msg_executor(msg)).start() logger.info('Bus service stopped') - def register_handler(self, event_type: Type[Event], handler: Callable[[Event], None]) -> Callable[[], None]: + def register_handler( + self, event_type: Type[Event], handler: Callable[[Event], None] + ) -> Callable[[], None]: """ Register an event handler to the bus. @@ -104,7 +119,9 @@ class Bus(object): return unregister - def unregister_handler(self, event_type: Type[Event], handler: Callable[[Event], None]) -> None: + def unregister_handler( + self, event_type: Type[Event], handler: Callable[[Event], None] + ) -> None: """ Remove an event handler. diff --git a/platypush/common/__init__.py b/platypush/common/__init__.py index 8a0f84215..b4197e2b1 100644 --- a/platypush/common/__init__.py +++ b/platypush/common/__init__.py @@ -1,6 +1,7 @@ import inspect import logging import os +from typing import Any, Callable from platypush.utils.manifest import Manifest @@ -9,7 +10,11 @@ from ._types import StoppableThread logger = logging.getLogger('platypush') -def exec_wrapper(f, *args, **kwargs): +def exec_wrapper(f: Callable[..., Any], *args, **kwargs): + """ + Utility function that runs a callable with its arguments, wraps its + response into a ``Response`` object and handles errors/exceptions. + """ from platypush import Response try: @@ -23,7 +28,13 @@ def exec_wrapper(f, *args, **kwargs): return Response(errors=[str(e)]) +# pylint: disable=too-few-public-methods class ExtensionWithManifest: + """ + This class models an extension with an associated manifest.yaml in the same + folder. + """ + def __init__(self, *_, **__): self._manifest = self.get_manifest() @@ -33,9 +44,7 @@ class ExtensionWithManifest: ) assert os.path.isfile( manifest_file - ), 'The extension {} has no associated manifest.yaml'.format( - self.__class__.__name__ - ) + ), f'The extension {self.__class__.__name__} has no associated manifest.yaml' return Manifest.from_file(manifest_file) diff --git a/platypush/common/gstreamer/__init__.py b/platypush/common/gstreamer/__init__.py index 05b9c9eda..bb4e53129 100644 --- a/platypush/common/gstreamer/__init__.py +++ b/platypush/common/gstreamer/__init__.py @@ -3,18 +3,22 @@ import threading from typing import Optional -# noinspection PyPackageRequirements import gi + gi.require_version('Gst', '1.0') gi.require_version('GstApp', '1.0') -# noinspection PyPackageRequirements,PyUnresolvedReferences +# flake8: noqa from gi.repository import GLib, Gst, GstApp Gst.init(None) class Pipeline: + """ + A GStreamer pipeline. + """ + def __init__(self): self.logger = logging.getLogger('gst-pipeline') self.pipeline = Gst.Pipeline() @@ -57,15 +61,16 @@ class Pipeline: @staticmethod def link(*elements): for i, el in enumerate(elements): - if i == len(elements)-1: + if i == len(elements) - 1: break - el.link(elements[i+1]) + el.link(elements[i + 1]) def emit(self, signal, *args, **kwargs): return self.pipeline.emit(signal, *args, **kwargs) def play(self): self.pipeline.set_state(Gst.State.PLAYING) + assert self.loop, 'No GLib loop is running' self.loop.start() def pause(self): @@ -92,7 +97,7 @@ class Pipeline: def on_buffer(self, sink): sample = GstApp.AppSink.pull_sample(sink) buffer = sample.get_buffer() - size, offset, maxsize = buffer.get_sizes() + size, offset, _ = buffer.get_sizes() self.data = buffer.extract_dup(offset, size) self.data_ready.set() return False @@ -101,9 +106,8 @@ class Pipeline: self.logger.info('End of stream event received') self.stop() - # noinspection PyUnusedLocal - def on_error(self, bus, msg): - self.logger.warning('GStreamer pipeline error: {}'.format(msg.parse_error())) + def on_error(self, _, msg): + self.logger.warning('GStreamer pipeline error: %s', msg.parse_error()) self.stop() def get_source(self): @@ -113,12 +117,11 @@ class Pipeline: return self.sink def get_state(self) -> Gst.State: - state = self.source.current_state - if not state: + if not (self.source and self.source.current_state): self.logger.warning('Unable to get pipeline state') return Gst.State.NULL - return state + return self.source.current_state def is_playing(self) -> bool: return self.get_state() == Gst.State.PLAYING @@ -127,6 +130,10 @@ class Pipeline: return self.get_state() == Gst.State.PAUSED def get_position(self) -> Optional[float]: + if not self.source: + self.logger.warning('Unable to get pipeline state') + return Gst.State.NULL + pos = self.source.query_position(Gst.Format(Gst.Format.TIME)) if not pos[0]: return None @@ -134,6 +141,7 @@ class Pipeline: return pos[1] / 1e9 def get_duration(self) -> Optional[float]: + assert self.source, 'No active source found' pos = self.source.query_duration(Gst.Format(Gst.Format.TIME)) if not pos[0]: return None @@ -157,9 +165,7 @@ class Pipeline: def seek(self, position: float): assert self.source, 'No source specified' - if position < 0: - position = 0 - + position = max(0, position) duration = self.get_duration() if duration and position > duration: position = duration @@ -169,11 +175,16 @@ class Pipeline: class Loop(threading.Thread): + """ + Wraps the execution of a GLib main loop into its own thread. + """ + def __init__(self): super().__init__() self._loop = GLib.MainLoop() def run(self): + assert self._loop, 'No GLib loop is running' self._loop.run() def is_running(self) -> bool: