diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index fa7a06fc..fe9410fd 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -13,6 +13,8 @@ from platypush.event import EventGenerator from platypush.message.response import Response from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name +stop_timeout = 5 # Plugin stop timeout in seconds + def action(f): @wraps(f) @@ -68,14 +70,23 @@ class RunnablePlugin(Plugin): Class for runnable plugins - i.e. plugins that have a start/stop method and can be started. """ - def __init__(self, poll_interval: Optional[float] = None, **kwargs): + def __init__( + self, + poll_interval: Optional[float] = None, + stop_timeout: Optional[float] = stop_timeout, + **kwargs, + ): """ - :param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval). + :param poll_interval: How often the :meth:`.loop` function should be + execute (default: None, no pause/interval). + :param stop_timeout: How long we should wait for any running + threads/processes to stop before exiting (default: 5 seconds). """ super().__init__(**kwargs) self.poll_interval = poll_interval self.bus: Optional[Bus] = None self._should_stop = threading.Event() + self._stop_timeout = stop_timeout self._thread: Optional[threading.Thread] = None def main(self): @@ -98,7 +109,16 @@ class RunnablePlugin(Plugin): self.logger.info(f'Waiting for {self.__class__.__name__} to stop') try: if self._thread: - self._thread.join() + self._thread.join(timeout=self._stop_timeout) + if self._thread.is_alive(): + self.logger.warning( + f'Timeout (seconds={self._stop_timeout}) on ' + 'exit for the plugin ' + + ( + get_plugin_name_by_class(self.__class__) + or self.__class__.__name__ + ) + ) except Exception as e: self.logger.warning(f'Could not join thread on stop: {e}') @@ -124,10 +144,9 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC): Class for runnable plugins with an asynchronous event loop attached. """ - def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs): + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._stop_timeout = _stop_timeout self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop_runner: Optional[threading.Thread] = None self._task: Optional[asyncio.Task] = None