Added AsyncRunnablePlugin class.

This class handles runnable plugins that have their own asyncio event
loop, without the pain usually caused by the management of multiple
threads + asyncio loops.
This commit is contained in:
Fabio Manganiello 2022-08-14 22:30:49 +02:00
parent 4043878afd
commit 3b1ab78268
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,7 +1,9 @@
import asyncio
import logging
import threading
import time
from abc import ABC, abstractmethod
from functools import wraps
from typing import Optional
@ -110,4 +112,74 @@ class RunnablePlugin(Plugin):
self._thread = None
class AsyncRunnablePlugin(RunnablePlugin, ABC):
"""
Class for runnable plugins with an asynchronous event loop attached.
"""
def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs):
super().__init__(*args, **kwargs)
self._stop_timeout = _stop_timeout
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_runner: Optional[threading.Thread] = None
self._task: Optional[asyncio.Task] = None
@property
def _should_start_runner(self):
return True
@abstractmethod
async def listen(self):
pass
async def _listen(self):
try:
await self.listen()
except KeyboardInterrupt:
pass
except RuntimeError as e:
if not (
str(e).startswith('Event loop stopped before ')
or str(e).startswith('no running event loop')
):
raise e
def _start_listener(self):
set_thread_name(self.__class__.__name__ + ':listener')
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._task = self._loop.create_task(self._listen())
self._task.set_name(self.__class__.__name__ + '.listen')
self._loop.run_forever()
def main(self):
if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()):
self.logger.info('The main loop is already being run/stopped')
return
if self._should_start_runner:
self._loop_runner = threading.Thread(target=self._start_listener)
self._loop_runner.start()
self.wait_stop()
def stop(self):
if self._task and self._loop and not self._task.done():
self._loop.call_soon_threadsafe(self._task.cancel)
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop = None
if self._loop_runner and self._loop_runner.is_alive():
try:
self._loop_runner.join(timeout=self._stop_timeout)
finally:
self._loop_runner = None
super().stop()
# vim:sw=4:ts=4:et: