From 3c355352c5f00e4e117676d2ba5bc38fd805ec82 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 24 Mar 2023 16:39:30 +0100 Subject: [PATCH] Using the new `StoppableThread` API. --- platypush/plugins/bluetooth/_ble/_manager.py | 1 + .../bluetooth/_legacy/_manager/_base.py | 4 +- platypush/plugins/bluetooth/_manager.py | 17 ++------ platypush/plugins/bluetooth/_plugin.py | 40 ++++++++++--------- 4 files changed, 29 insertions(+), 33 deletions(-) diff --git a/platypush/plugins/bluetooth/_ble/_manager.py b/platypush/plugins/bluetooth/_ble/_manager.py index 9e8635616b..a2db3d47c8 100644 --- a/platypush/plugins/bluetooth/_ble/_manager.py +++ b/platypush/plugins/bluetooth/_ble/_manager.py @@ -426,6 +426,7 @@ class BLEManager(BaseBluetoothManager): Upon stop request, it stops any pending scans and closes all active connections. """ + super().stop() self._close_active_connections() if self._main_loop and self._main_loop.is_running(): self._main_loop.stop() diff --git a/platypush/plugins/bluetooth/_legacy/_manager/_base.py b/platypush/plugins/bluetooth/_legacy/_manager/_base.py index 95f01fc40f..874950bd24 100644 --- a/platypush/plugins/bluetooth/_legacy/_manager/_base.py +++ b/platypush/plugins/bluetooth/_legacy/_manager/_base.py @@ -256,7 +256,7 @@ class LegacyManager(BaseBluetoothManager): except IOError as e: self.logger.warning('Could not discover devices: %s', e) # Wait a bit before a potential retry - self._stop_event.wait(timeout=1) + self.wait_stop(timeout=1) return [] # Pre-fill the services for the devices that have already been scanned. @@ -350,6 +350,8 @@ class LegacyManager(BaseBluetoothManager): @override def stop(self): + super().stop() + # Close any active connections for conn in list(self._connections.values()): conn.close(timeout=5) diff --git a/platypush/plugins/bluetooth/_manager.py b/platypush/plugins/bluetooth/_manager.py index 06bd51139d..268c2c6ba2 100644 --- a/platypush/plugins/bluetooth/_manager.py +++ b/platypush/plugins/bluetooth/_manager.py @@ -3,8 +3,9 @@ import logging from queue import Queue import threading from typing import Collection, Optional, Type, Union -from platypush.context import get_bus +from platypush.common import StoppableThread +from platypush.context import get_bus from platypush.entities.bluetooth import BluetoothDevice from platypush.message.event.bluetooth import BluetoothDeviceEvent @@ -12,7 +13,7 @@ from ._cache import EntityCache from ._types import DevicesBlacklist, RawServiceClass -class BaseBluetoothManager(ABC, threading.Thread): +class BaseBluetoothManager(StoppableThread, ABC): """ Abstract interface for Bluetooth managers. """ @@ -22,7 +23,6 @@ class BaseBluetoothManager(ABC, threading.Thread): interface: str, poll_interval: float, connect_timeout: float, - stop_event: threading.Event, scan_lock: threading.RLock, scan_enabled: threading.Event, device_queue: Queue[BluetoothDevice], @@ -36,7 +36,6 @@ class BaseBluetoothManager(ABC, threading.Thread): :param interface: The Bluetooth interface to use. :param poll_interval: Scan interval in seconds. :param connect_timeout: Connection timeout in seconds. - :param stop_event: Event used to synchronize on whether we should stop the plugin. :param scan_lock: Lock to synchronize scanning access to the Bluetooth device. :param scan_enabled: Event used to enable/disable scanning. :param device_queue: Queue used by the ``EventHandler`` to publish @@ -55,7 +54,6 @@ class BaseBluetoothManager(ABC, threading.Thread): self._interface: Optional[str] = interface self._connect_timeout: float = connect_timeout self._service_uuids: Collection[RawServiceClass] = service_uuids or [] - self._stop_event = stop_event self._scan_lock = scan_lock self._scan_enabled = scan_enabled self._device_queue = device_queue @@ -82,9 +80,6 @@ class BaseBluetoothManager(ABC, threading.Thread): def plugins(self): return self._plugins - def should_stop(self) -> bool: - return self._stop_event.is_set() - @abstractmethod def connect( self, @@ -167,11 +162,5 @@ class BaseBluetoothManager(ABC, threading.Thread): configured `connect_timeout`). """ - @abstractmethod - def stop(self): - """ - Stop any pending tasks and terminate the thread. - """ - # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/_plugin.py b/platypush/plugins/bluetooth/_plugin.py index 6a17a5824c..c4ee9badb8 100644 --- a/platypush/plugins/bluetooth/_plugin.py +++ b/platypush/plugins/bluetooth/_plugin.py @@ -17,6 +17,7 @@ from typing import ( from typing_extensions import override +from platypush.common import StoppableThread from platypush.context import get_bus, get_plugin from platypush.entities import ( EnumSwitchEntityManager, @@ -49,6 +50,9 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager): The full list of devices natively supported can be found `here `_. + It also supports legacy Bluetooth services, as well as the transfer of + files. + Note that the support for Bluetooth low-energy devices requires a Bluetooth adapter compatible with the Bluetooth 5.0 specification or higher. @@ -59,7 +63,6 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager): * **TheengsDecoder** (``pip install TheengsDecoder``) * **pydbus** (``pip install pydbus``) * **pybluez** (``pip install git+https://github.com/pybluez/pybluez``) - * **PyOBEX** (``pip install git+https://github.com/BlackLight/PyOBEX``) Triggers: @@ -639,35 +642,36 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager): """ super().stop() - # Stop any pending scan controller timers self._cancel_scan_controller_timer() + self._stop_threads(self._managers.values()) - # Set the stop events on the manager threads - for manager in self._managers.values(): - if manager and manager.is_alive(): - self.logger.info('Waiting for %s to stop', manager.name) + def _stop_threads(self, threads: Collection[StoppableThread], timeout: float = 5): + """ + Set the stop events on active threads and wait for them to stop. + """ + # Set the stop events and call `.stop` + for thread in threads: + if thread and thread.is_alive(): + self.logger.info('Waiting for %s to stop', thread.name) try: - manager.stop() + thread.stop() except Exception as e: - self.logger.exception( - 'Error while stopping %s: %s', manager.name, e - ) + self.logger.exception('Error while stopping %s: %s', thread.name, e) # Wait for the manager threads to stop - stop_timeout = 5 wait_start = time.time() - for manager in self._managers.values(): + for thread in threads: if ( - manager - and manager.ident != threading.current_thread().ident - and manager.is_alive() + thread + and thread.ident != threading.current_thread().ident + and thread.is_alive() ): - manager.join(timeout=max(0, stop_timeout - (time.time() - wait_start))) + thread.join(timeout=max(0, timeout - (time.time() - wait_start))) - if manager and manager.is_alive(): + if thread and thread.is_alive(): self.logger.warning( - 'Timeout while waiting for %s to stop', manager.name + 'Timeout while waiting for %s to stop', thread.name )