Defined a unique stop_timeout (default=5) for RunnablePlugin

This commit is contained in:
Fabio Manganiello 2023-01-22 14:28:16 +01:00
parent 6d4cf64253
commit bb637a1411
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -13,6 +13,8 @@ from platypush.event import EventGenerator
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name
stop_timeout = 5 # Plugin stop timeout in seconds
def action(f): def action(f):
@wraps(f) @wraps(f)
@ -68,14 +70,23 @@ class RunnablePlugin(Plugin):
Class for runnable plugins - i.e. plugins that have a start/stop method and can be started. Class for runnable plugins - i.e. plugins that have a start/stop method and can be started.
""" """
def __init__(self, poll_interval: Optional[float] = None, **kwargs): def __init__(
self,
poll_interval: Optional[float] = None,
stop_timeout: Optional[float] = stop_timeout,
**kwargs,
):
""" """
:param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval). :param poll_interval: How often the :meth:`.loop` function should be
execute (default: None, no pause/interval).
:param stop_timeout: How long we should wait for any running
threads/processes to stop before exiting (default: 5 seconds).
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.poll_interval = poll_interval self.poll_interval = poll_interval
self.bus: Optional[Bus] = None self.bus: Optional[Bus] = None
self._should_stop = threading.Event() self._should_stop = threading.Event()
self._stop_timeout = stop_timeout
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
def main(self): def main(self):
@ -98,7 +109,16 @@ class RunnablePlugin(Plugin):
self.logger.info(f'Waiting for {self.__class__.__name__} to stop') self.logger.info(f'Waiting for {self.__class__.__name__} to stop')
try: try:
if self._thread: if self._thread:
self._thread.join() self._thread.join(timeout=self._stop_timeout)
if self._thread.is_alive():
self.logger.warning(
f'Timeout (seconds={self._stop_timeout}) on '
'exit for the plugin '
+ (
get_plugin_name_by_class(self.__class__)
or self.__class__.__name__
)
)
except Exception as e: except Exception as e:
self.logger.warning(f'Could not join thread on stop: {e}') self.logger.warning(f'Could not join thread on stop: {e}')
@ -124,10 +144,9 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC):
Class for runnable plugins with an asynchronous event loop attached. Class for runnable plugins with an asynchronous event loop attached.
""" """
def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._stop_timeout = _stop_timeout
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_runner: Optional[threading.Thread] = None self._loop_runner: Optional[threading.Thread] = None
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None