Synchronize with the currently running stop thread (if any) in `Backend.wait_stop`.

This commit is contained in:
Fabio Manganiello 2023-08-15 02:06:13 +02:00
parent a8a7ceb2ac
commit 46245e851f
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 39 additions and 14 deletions

View File

@ -6,28 +6,28 @@ import time
from threading import Thread, Event as ThreadEvent, get_ident
from typing import Optional, Dict
from platypush import __version__
from platypush.bus import Bus
from platypush.common import ExtensionWithManifest
from platypush.config import Config
from platypush.context import get_backend
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.event.zeroconf import (
ZeroconfServiceAddedEvent,
ZeroconfServiceRemovedEvent,
)
from platypush.utils import (
set_timeout,
clear_timeout,
get_redis_queue_name_by_message,
get_backend_name_by_class,
)
from platypush import __version__
from platypush.event import EventGenerator
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import get_redis
from platypush.utils import (
clear_timeout,
get_backend_name_by_class,
get_redis,
get_redis_queue_name_by_message,
get_remaining_timeout,
set_timeout,
)
class Backend(Thread, EventGenerator, ExtensionWithManifest):
@ -69,6 +69,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self.device_id = Config.get('device_id')
self.thread_id = None
self._stop_event = ThreadEvent()
self._stop_thread: Optional[Thread] = None
self._kwargs = kwargs
self.logger = logging.getLogger(
'platypush:backend:' + get_backend_name_by_class(self.__class__)
@ -300,14 +301,38 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self._stop_event.set()
self.unregister_service()
self.on_stop()
self._stop_thread = None
Thread(target=_async_stop).start()
if not (self._stop_thread and self._stop_thread.is_alive()):
self._stop_thread = Thread(target=_async_stop)
self._stop_thread.start()
def should_stop(self):
"""
:return: True if the backend thread should be stopped, False otherwise.
"""
return self._stop_event.is_set()
def wait_stop(self, timeout=None) -> bool:
return self._stop_event.wait(timeout)
"""
Waits for the backend thread to stop.
:param timeout: The maximum time to wait for the backend thread to stop (default: None)
:return: True if the backend thread has stopped, False otherwise.
"""
start = time.time()
if self._stop_thread:
try:
self._stop_thread.join(
get_remaining_timeout(timeout=timeout, start=start)
)
except AttributeError:
pass
return self._stop_event.wait(
get_remaining_timeout(timeout=timeout, start=start)
)
def get_message_response(self, msg):
queue = get_redis_queue_name_by_message(msg)