forked from platypush/platypush
Fabio Manganiello
dba03d3e33
This class handles runnable plugins that have their own asyncio event loop, without the pain usually caused by the management of multiple threads + asyncio loops.
192 lines
5.9 KiB
Python
192 lines
5.9 KiB
Python
import asyncio
|
|
import logging
|
|
import threading
|
|
import time
|
|
|
|
from abc import ABC, abstractmethod
|
|
from functools import wraps
|
|
from typing import Optional
|
|
|
|
from platypush.bus import Bus
|
|
from platypush.common import ExtensionWithManifest
|
|
from platypush.event import EventGenerator
|
|
from platypush.message.response import Response
|
|
from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name
|
|
|
|
|
|
def action(f):
|
|
@wraps(f)
|
|
def _execute_action(*args, **kwargs):
|
|
response = Response()
|
|
result = f(*args, **kwargs)
|
|
|
|
if result and isinstance(result, Response):
|
|
result.errors = (
|
|
result.errors if isinstance(result.errors, list) else [result.errors]
|
|
)
|
|
response = result
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
response.errors = result[1] if isinstance(result[1], list) else [result[1]]
|
|
|
|
if len(response.errors) == 1 and response.errors[0] is None:
|
|
response.errors = []
|
|
response.output = result[0]
|
|
else:
|
|
response = Response(output=result, errors=[])
|
|
|
|
return response
|
|
|
|
# Propagate the docstring
|
|
_execute_action.__doc__ = f.__doc__
|
|
return _execute_action
|
|
|
|
|
|
class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init]
|
|
"""Base plugin class"""
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__()
|
|
self.logger = logging.getLogger(
|
|
'platypush:plugin:' + get_plugin_name_by_class(self.__class__)
|
|
)
|
|
if 'logging' in kwargs:
|
|
self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))
|
|
|
|
self.registered_actions = set(
|
|
get_decorators(self.__class__, climb_class_hierarchy=True).get('action', [])
|
|
)
|
|
|
|
def run(self, method, *args, **kwargs):
|
|
assert (
|
|
method in self.registered_actions
|
|
), '{} is not a registered action on {}'.format(method, self.__class__.__name__)
|
|
return getattr(self, method)(*args, **kwargs)
|
|
|
|
|
|
class RunnablePlugin(Plugin):
|
|
"""
|
|
Class for runnable plugins - i.e. plugins that have a start/stop method and can be started.
|
|
"""
|
|
|
|
def __init__(self, poll_interval: Optional[float] = None, **kwargs):
|
|
"""
|
|
:param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval).
|
|
"""
|
|
super().__init__(**kwargs)
|
|
self.poll_interval = poll_interval
|
|
self.bus: Optional[Bus] = None
|
|
self._should_stop = threading.Event()
|
|
self._thread: Optional[threading.Thread] = None
|
|
|
|
def main(self):
|
|
raise NotImplementedError()
|
|
|
|
def should_stop(self):
|
|
return self._should_stop.is_set()
|
|
|
|
def wait_stop(self, timeout=None):
|
|
return self._should_stop.wait(timeout=timeout)
|
|
|
|
def start(self):
|
|
set_thread_name(self.__class__.__name__)
|
|
self._thread = threading.Thread(target=self._runner)
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._should_stop.set()
|
|
if self._thread and self._thread.is_alive():
|
|
self.logger.info(f'Waiting for {self.__class__.__name__} to stop')
|
|
try:
|
|
if self._thread:
|
|
self._thread.join()
|
|
except Exception as e:
|
|
self.logger.warning(f'Could not join thread on stop: {e}')
|
|
|
|
self.logger.info(f'{self.__class__.__name__} stopped')
|
|
|
|
def _runner(self):
|
|
self.logger.info(f'Starting {self.__class__.__name__}')
|
|
|
|
while not self.should_stop():
|
|
try:
|
|
self.main()
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
|
|
if self.poll_interval:
|
|
time.sleep(self.poll_interval)
|
|
|
|
self._thread = None
|
|
|
|
|
|
class AsyncRunnablePlugin(RunnablePlugin, ABC):
|
|
"""
|
|
Class for runnable plugins with an asynchronous event loop attached.
|
|
"""
|
|
|
|
def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self._stop_timeout = _stop_timeout
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._loop_runner: Optional[threading.Thread] = None
|
|
self._task: Optional[asyncio.Task] = None
|
|
|
|
@property
|
|
def _should_start_runner(self):
|
|
return True
|
|
|
|
@abstractmethod
|
|
async def listen(self):
|
|
pass
|
|
|
|
async def _listen(self):
|
|
try:
|
|
await self.listen()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except RuntimeError as e:
|
|
if not (
|
|
str(e).startswith('Event loop stopped before ')
|
|
or str(e).startswith('no running event loop')
|
|
):
|
|
raise e
|
|
|
|
def _start_listener(self):
|
|
set_thread_name(self.__class__.__name__ + ':listener')
|
|
self._loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self._loop)
|
|
|
|
self._task = self._loop.create_task(self._listen())
|
|
self._task.set_name(self.__class__.__name__ + '.listen')
|
|
self._loop.run_forever()
|
|
|
|
def main(self):
|
|
if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()):
|
|
self.logger.info('The main loop is already being run/stopped')
|
|
return
|
|
|
|
if self._should_start_runner:
|
|
self._loop_runner = threading.Thread(target=self._start_listener)
|
|
self._loop_runner.start()
|
|
|
|
self.wait_stop()
|
|
|
|
def stop(self):
|
|
if self._task and self._loop and not self._task.done():
|
|
self._loop.call_soon_threadsafe(self._task.cancel)
|
|
|
|
if self._loop and self._loop.is_running():
|
|
self._loop.call_soon_threadsafe(self._loop.stop)
|
|
self._loop = None
|
|
|
|
if self._loop_runner and self._loop_runner.is_alive():
|
|
try:
|
|
self._loop_runner.join(timeout=self._stop_timeout)
|
|
finally:
|
|
self._loop_runner = None
|
|
|
|
super().stop()
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|