diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 92929b77..c8b0ea65 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -6,28 +6,28 @@ import time from threading import Thread, Event as ThreadEvent, get_ident from typing import Optional, Dict +from platypush import __version__ from platypush.bus import Bus from platypush.common import ExtensionWithManifest from platypush.config import Config from platypush.context import get_backend +from platypush.event import EventGenerator +from platypush.message import Message +from platypush.message.event import Event from platypush.message.event.zeroconf import ( ZeroconfServiceAddedEvent, ZeroconfServiceRemovedEvent, ) -from platypush.utils import ( - set_timeout, - clear_timeout, - get_redis_queue_name_by_message, - get_backend_name_by_class, -) - -from platypush import __version__ -from platypush.event import EventGenerator -from platypush.message import Message -from platypush.message.event import Event from platypush.message.request import Request from platypush.message.response import Response -from platypush.utils import get_redis +from platypush.utils import ( + clear_timeout, + get_backend_name_by_class, + get_redis, + get_redis_queue_name_by_message, + get_remaining_timeout, + set_timeout, +) class Backend(Thread, EventGenerator, ExtensionWithManifest): @@ -69,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self.device_id = Config.get('device_id') self.thread_id = None self._stop_event = ThreadEvent() + self._stop_thread: Optional[Thread] = None self._kwargs = kwargs self.logger = logging.getLogger( 'platypush:backend:' + get_backend_name_by_class(self.__class__) @@ -300,14 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self._stop_event.set() self.unregister_service() self.on_stop() + self._stop_thread = None - Thread(target=_async_stop).start() + if not (self._stop_thread and self._stop_thread.is_alive()): + self._stop_thread = Thread(target=_async_stop) + self._stop_thread.start() def should_stop(self): + """ + :return: True if the backend thread should be stopped, False otherwise. + """ return self._stop_event.is_set() def wait_stop(self, timeout=None) -> bool: - return self._stop_event.wait(timeout) + """ + Waits for the backend thread to stop. + + :param timeout: The maximum time to wait for the backend thread to stop (default: None) + :return: True if the backend thread has stopped, False otherwise. + """ + start = time.time() + + if self._stop_thread: + try: + self._stop_thread.join( + get_remaining_timeout(timeout=timeout, start=start) + ) + except AttributeError: + pass + + return self._stop_event.wait( + get_remaining_timeout(timeout=timeout, start=start) + ) def get_message_response(self, msg): queue = get_redis_queue_name_by_message(msg)