# Source: platypush/platypush/plugins/__init__.py (193 lines, 5.9 KiB, Python)

import asyncio
2017-12-11 03:53:26 +01:00
import logging
2021-07-22 01:02:15 +02:00
import threading
import time
from abc import ABC, abstractmethod
2018-11-01 19:42:40 +01:00
from functools import wraps
2021-07-22 01:02:15 +02:00
from typing import Optional
2018-11-01 19:42:40 +01:00
2021-07-22 01:02:15 +02:00
from platypush.bus import Bus
from platypush.common import ExtensionWithManifest
from platypush.event import EventGenerator
from platypush.message.response import Response
2021-07-22 01:02:15 +02:00
from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name
2019-12-17 00:56:28 +01:00
def action(f):
    """
    Decorator that wraps the return value of ``f`` into a :class:`Response`.

    The value returned by the wrapped callable is handled as follows:

    - A truthy ``Response`` instance is returned as it is, except that a
      non-list ``errors`` attribute is wrapped into a single-item list.
    - A 2-item tuple is read as ``(output, errors)``. ``errors`` is wrapped
      into a list if it isn't one already, and a list that only contains
      ``None`` is treated as "no errors".
    - Anything else becomes the ``output`` of a new ``Response`` with an
      empty list of errors.

    :param f: Callable to be wrapped.
    :return: The wrapping function, which always returns a ``Response``.
    """

    @wraps(f)
    def _execute_action(*args, **kwargs):
        # Instantiated up front, before the wrapped callable is invoked.
        response = Response()
        result = f(*args, **kwargs)

        # Case 1: the callable already produced a Response object.
        if result and isinstance(result, Response):
            if not isinstance(result.errors, list):
                result.errors = [result.errors]
            return result

        # Case 2: (output, errors) pair.
        if isinstance(result, tuple) and len(result) == 2:
            output, errors = result
            if not isinstance(errors, list):
                errors = [errors]
            # A lone None in the errors slot means that nothing went wrong.
            if len(errors) == 1 and errors[0] is None:
                errors = []

            response.errors = errors
            response.output = output
            return response

        # Case 3: plain return value.
        return Response(output=result, errors=[])

    # Propagate the docstring
    _execute_action.__doc__ = f.__doc__
    return _execute_action
class Plugin(EventGenerator, ExtensionWithManifest):  # lgtm [py/missing-call-to-init]
    """
    Base plugin class.

    On construction it sets up a dedicated logger and records, in
    ``registered_actions``, the names reported under the ``'action'`` key by
    ``get_decorators`` for this class and its ancestors. :meth:`run` only
    dispatches to names contained in that set.
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: Plugin configuration. The only key read here is
            ``logging``: if present, it must be the name of a ``logging``
            level (case-insensitive, e.g. ``"debug"``) and it is applied to
            the plugin logger.
        """
        super().__init__()
        self.logger = logging.getLogger(
            'platypush:plugin:' + get_plugin_name_by_class(self.__class__)
        )

        if 'logging' in kwargs:
            self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))

        self.registered_actions = set(
            get_decorators(self.__class__, climb_class_hierarchy=True).get('action', [])
        )

    def run(self, method, *args, **kwargs):
        """
        Invoke a registered action by name.

        :param method: Name of the action to run.
        :param args: Positional arguments forwarded to the action.
        :param kwargs: Keyword arguments forwarded to the action.
        :return: Whatever the action returns.
        :raises AssertionError: If ``method`` is not in
            ``registered_actions``.
        """
        # Raised explicitly instead of through an ``assert`` statement:
        # asserts are stripped when Python runs with -O, which would let any
        # attribute of the plugin be reached through getattr() below. The
        # exception type and message are the same an assert would produce.
        if method not in self.registered_actions:
            raise AssertionError(
                f'{method} is not a registered action on {self.__class__.__name__}'
            )

        return getattr(self, method)(*args, **kwargs)
2017-10-31 09:20:35 +01:00
class RunnablePlugin(Plugin):
    """
    Class for runnable plugins - i.e. plugins that have a start/stop method
    and can be started.

    :meth:`start` spawns a thread that keeps calling :meth:`main` until
    :meth:`stop` is invoked. Subclasses are expected to override
    :meth:`main`.
    """

    def __init__(self, poll_interval: Optional[float] = None, **kwargs):
        """
        :param poll_interval: How long to pause, in seconds, between two
            consecutive executions of :meth:`main` (default: None, no
            pause/interval).
        :param kwargs: Forwarded to the parent constructor.
        """
        super().__init__(**kwargs)
        self.poll_interval = poll_interval
        self.bus: Optional[Bus] = None
        # Set by stop() to ask the runner thread to exit.
        self._should_stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def main(self):
        """
        Body of the plugin, called repeatedly by the runner thread.

        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError()

    def should_stop(self) -> bool:
        """
        :return: True if :meth:`stop` has been called.
        """
        return self._should_stop.is_set()

    def wait_stop(self, timeout=None) -> bool:
        """
        Block until the plugin is asked to stop.

        :param timeout: Maximum time to wait, in seconds (default: None,
            wait indefinitely).
        :return: True if the stop event was set, False if the timeout
            expired first.
        """
        return self._should_stop.wait(timeout=timeout)

    def start(self):
        """
        Start the runner thread.
        """
        # NOTE(review): set_thread_name is invoked here, in the caller's
        # thread, before the runner thread is created - confirm that this
        # is the thread that is meant to be renamed.
        set_thread_name(self.__class__.__name__)
        self._thread = threading.Thread(target=self._runner)
        self._thread.start()

    def stop(self):
        """
        Ask the runner thread to stop and wait for it to terminate.
        """
        self._should_stop.set()

        # Work on a local snapshot: the runner resets self._thread to None
        # right before exiting, so re-reading the attribute between the
        # liveness check and the join could hit None.
        thread = self._thread
        if thread is not None and thread.is_alive():
            self.logger.info(f'Waiting for {self.__class__.__name__} to stop')
            try:
                thread.join()
            except Exception as e:
                # E.g. join() raises RuntimeError when called from the
                # runner thread itself.
                self.logger.warning(f'Could not join thread on stop: {e}')

        self.logger.info(f'{self.__class__.__name__} stopped')

    def _runner(self):
        """
        Implementation of the runner thread: call :meth:`main` in a loop
        until a stop is requested, logging (and surviving) any exception.
        """
        self.logger.info(f'Starting {self.__class__.__name__}')

        while not self.should_stop():
            try:
                self.main()
            except Exception as e:
                self.logger.exception(e)

            if self.poll_interval:
                # Pause on the stop event rather than with a blind sleep, so
                # that stop() doesn't have to wait for a whole interval to
                # elapse before the thread can be joined.
                self.wait_stop(self.poll_interval)

        self._thread = None
class AsyncRunnablePlugin(RunnablePlugin, ABC):
    """
    Class for runnable plugins with an asynchronous event loop attached.

    :meth:`main` spawns a listener thread that creates a new event loop,
    schedules :meth:`listen` on it as a task and runs the loop until
    :meth:`stop` is called.
    """

    def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs):
        """
        :param _stop_timeout: Maximum time, in seconds, that :meth:`stop`
            waits for the listener thread to terminate (default: 30).
        :param args: Forwarded to the parent constructor.
        :param kwargs: Forwarded to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self._stop_timeout = _stop_timeout
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_runner: Optional[threading.Thread] = None
        self._task: Optional[asyncio.Task] = None

    @property
    def _should_start_runner(self):
        """
        Whether :meth:`main` should spawn the listener thread. It is always
        True here; it is a property so that subclasses can override it.
        """
        return True

    @abstractmethod
    async def listen(self):
        """
        Coroutine executed on the plugin's event loop. Must be implemented
        by subclasses.
        """

    async def _listen(self):
        """
        Wrapper around :meth:`listen` that swallows ``KeyboardInterrupt``
        and the ``RuntimeError`` instances whose message starts with one of
        the prefixes listed below. Any other exception is propagated.
        """
        try:
            await self.listen()
        except KeyboardInterrupt:
            pass
        except RuntimeError as e:
            # Presumably these are the errors reported when the loop goes
            # away underneath a pending coroutine - matched by message since
            # they have no dedicated exception type.
            ignored_prefixes = (
                'Event loop stopped before ',
                'no running event loop',
            )
            if not str(e).startswith(ignored_prefixes):
                raise

    def _start_listener(self):
        """
        Entry point of the listener thread: create the event loop, schedule
        :meth:`listen` on it and run it until it is stopped.
        """
        set_thread_name(self.__class__.__name__ + ':listener')
        # Keep a local reference: stop() may reset self._loop from another
        # thread while this one is still running.
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)

        try:
            self._task = loop.create_task(self._listen())
            self._task.set_name(self.__class__.__name__ + '.listen')
            loop.run_forever()
        finally:
            # A new loop is created every time the listener starts, so this
            # one must be closed once it's no longer running, or its
            # selector and self-pipe sockets are never released.
            loop.close()

    def main(self):
        """
        Start the listener thread (unless it's already running, a stop has
        been requested, or :attr:`_should_start_runner` is false) and block
        until the plugin is asked to stop.
        """
        if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()):
            self.logger.info('The main loop is already being run/stopped')
            return

        if self._should_start_runner:
            self._loop_runner = threading.Thread(target=self._start_listener)
            self._loop_runner.start()

        self.wait_stop()

    def stop(self):
        """
        Cancel the listen task, stop the event loop, wait up to
        ``_stop_timeout`` seconds for the listener thread to exit and then
        stop the runner thread through the parent implementation.
        """
        loop, task = self._loop, self._task

        # Nothing can be scheduled on a loop that the listener thread has
        # already closed.
        if loop is not None and not loop.is_closed():
            try:
                if task and not task.done():
                    loop.call_soon_threadsafe(task.cancel)

                if loop.is_running():
                    loop.call_soon_threadsafe(loop.stop)
                    self._loop = None
            except RuntimeError as e:
                # call_soon_threadsafe raises RuntimeError if the loop got
                # closed between the check above and the call.
                self.logger.debug(f'Event loop already closed on stop: {e}')

        if self._loop_runner and self._loop_runner.is_alive():
            try:
                self._loop_runner.join(timeout=self._stop_timeout)
            finally:
                self._loop_runner = None

        super().stop()
2017-10-31 09:20:35 +01:00
# vim:sw=4:ts=4:et: