Added AsyncRunnablePlugin class.

This class handles runnable plugins that have their own asyncio event
loop, without the pain usually caused by the management of multiple
threads + asyncio loops.
This commit is contained in:
Fabio Manganiello 2022-08-14 22:30:49 +02:00
parent f4672ce5c3
commit dba03d3e33
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,7 +1,9 @@
import asyncio
import logging import logging
import threading import threading
import time import time
from abc import ABC, abstractmethod
from functools import wraps from functools import wraps
from typing import Optional from typing import Optional
@ -117,4 +119,74 @@ class RunnablePlugin(Plugin):
self._thread = None self._thread = None
class AsyncRunnablePlugin(RunnablePlugin, ABC):
"""
Class for runnable plugins with an asynchronous event loop attached.
"""
def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs):
super().__init__(*args, **kwargs)
self._stop_timeout = _stop_timeout
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_runner: Optional[threading.Thread] = None
self._task: Optional[asyncio.Task] = None
@property
def _should_start_runner(self):
return True
@abstractmethod
async def listen(self):
pass
async def _listen(self):
try:
await self.listen()
except KeyboardInterrupt:
pass
except RuntimeError as e:
if not (
str(e).startswith('Event loop stopped before ')
or str(e).startswith('no running event loop')
):
raise e
def _start_listener(self):
set_thread_name(self.__class__.__name__ + ':listener')
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._task = self._loop.create_task(self._listen())
self._task.set_name(self.__class__.__name__ + '.listen')
self._loop.run_forever()
def main(self):
if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()):
self.logger.info('The main loop is already being run/stopped')
return
if self._should_start_runner:
self._loop_runner = threading.Thread(target=self._start_listener)
self._loop_runner.start()
self.wait_stop()
def stop(self):
if self._task and self._loop and not self._task.done():
self._loop.call_soon_threadsafe(self._task.cancel)
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop = None
if self._loop_runner and self._loop_runner.is_alive():
try:
self._loop_runner.join(timeout=self._stop_timeout)
finally:
self._loop_runner = None
super().stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: