From dba03d3e33aea8b7faf8385ad5fc8e619484f535 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Aug 2022 22:30:49 +0200 Subject: [PATCH] Added AsyncRunnablePlugin class. This class handles runnable plugins that have their own asyncio event loop, without the pain usually caused by the management of multiple threads + asyncio loops. --- platypush/plugins/__init__.py | 72 +++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index 2d75697f5..f35f06a4e 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -1,7 +1,9 @@ +import asyncio import logging import threading import time +from abc import ABC, abstractmethod from functools import wraps from typing import Optional @@ -117,4 +119,74 @@ class RunnablePlugin(Plugin): self._thread = None +class AsyncRunnablePlugin(RunnablePlugin, ABC): + """ + Class for runnable plugins with an asynchronous event loop attached. + """ + + def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs): + super().__init__(*args, **kwargs) + + self._stop_timeout = _stop_timeout + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_runner: Optional[threading.Thread] = None + self._task: Optional[asyncio.Task] = None + + @property + def _should_start_runner(self): + return True + + @abstractmethod + async def listen(self): + pass + + async def _listen(self): + try: + await self.listen() + except KeyboardInterrupt: + pass + except RuntimeError as e: + if not ( + str(e).startswith('Event loop stopped before ') + or str(e).startswith('no running event loop') + ): + raise e + + def _start_listener(self): + set_thread_name(self.__class__.__name__ + ':listener') + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + self._task = self._loop.create_task(self._listen()) + self._task.set_name(self.__class__.__name__ + '.listen') + self._loop.run_forever() + + def main(self): + if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()): + self.logger.info('The main loop is already being run/stopped') + return + + if self._should_start_runner: + self._loop_runner = threading.Thread(target=self._start_listener) + self._loop_runner.start() + + self.wait_stop() + + def stop(self): + if self._task and self._loop and not self._task.done(): + self._loop.call_soon_threadsafe(self._task.cancel) + + if self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop = None + + if self._loop_runner and self._loop_runner.is_alive(): + try: + self._loop_runner.join(timeout=self._stop_timeout) + finally: + self._loop_runner = None + + super().stop() + + # vim:sw=4:ts=4:et: