Using the new StoppableThread API.

This commit is contained in:
Fabio Manganiello 2023-03-24 16:39:30 +01:00
parent 5ebf4e912e
commit 3c355352c5
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 29 additions and 33 deletions

View file

@ -426,6 +426,7 @@ class BLEManager(BaseBluetoothManager):
Upon stop request, it stops any pending scans and closes all active Upon stop request, it stops any pending scans and closes all active
connections. connections.
""" """
super().stop()
self._close_active_connections() self._close_active_connections()
if self._main_loop and self._main_loop.is_running(): if self._main_loop and self._main_loop.is_running():
self._main_loop.stop() self._main_loop.stop()

View file

@ -256,7 +256,7 @@ class LegacyManager(BaseBluetoothManager):
except IOError as e: except IOError as e:
self.logger.warning('Could not discover devices: %s', e) self.logger.warning('Could not discover devices: %s', e)
# Wait a bit before a potential retry # Wait a bit before a potential retry
self._stop_event.wait(timeout=1) self.wait_stop(timeout=1)
return [] return []
# Pre-fill the services for the devices that have already been scanned. # Pre-fill the services for the devices that have already been scanned.
@ -350,6 +350,8 @@ class LegacyManager(BaseBluetoothManager):
@override @override
def stop(self): def stop(self):
super().stop()
# Close any active connections # Close any active connections
for conn in list(self._connections.values()): for conn in list(self._connections.values()):
conn.close(timeout=5) conn.close(timeout=5)

View file

@ -3,8 +3,9 @@ import logging
from queue import Queue from queue import Queue
import threading import threading
from typing import Collection, Optional, Type, Union from typing import Collection, Optional, Type, Union
from platypush.context import get_bus
from platypush.common import StoppableThread
from platypush.context import get_bus
from platypush.entities.bluetooth import BluetoothDevice from platypush.entities.bluetooth import BluetoothDevice
from platypush.message.event.bluetooth import BluetoothDeviceEvent from platypush.message.event.bluetooth import BluetoothDeviceEvent
@ -12,7 +13,7 @@ from ._cache import EntityCache
from ._types import DevicesBlacklist, RawServiceClass from ._types import DevicesBlacklist, RawServiceClass
class BaseBluetoothManager(ABC, threading.Thread): class BaseBluetoothManager(StoppableThread, ABC):
""" """
Abstract interface for Bluetooth managers. Abstract interface for Bluetooth managers.
""" """
@ -22,7 +23,6 @@ class BaseBluetoothManager(ABC, threading.Thread):
interface: str, interface: str,
poll_interval: float, poll_interval: float,
connect_timeout: float, connect_timeout: float,
stop_event: threading.Event,
scan_lock: threading.RLock, scan_lock: threading.RLock,
scan_enabled: threading.Event, scan_enabled: threading.Event,
device_queue: Queue[BluetoothDevice], device_queue: Queue[BluetoothDevice],
@ -36,7 +36,6 @@ class BaseBluetoothManager(ABC, threading.Thread):
:param interface: The Bluetooth interface to use. :param interface: The Bluetooth interface to use.
:param poll_interval: Scan interval in seconds. :param poll_interval: Scan interval in seconds.
:param connect_timeout: Connection timeout in seconds. :param connect_timeout: Connection timeout in seconds.
:param stop_event: Event used to synchronize on whether we should stop the plugin.
:param scan_lock: Lock to synchronize scanning access to the Bluetooth device. :param scan_lock: Lock to synchronize scanning access to the Bluetooth device.
:param scan_enabled: Event used to enable/disable scanning. :param scan_enabled: Event used to enable/disable scanning.
:param device_queue: Queue used by the ``EventHandler`` to publish :param device_queue: Queue used by the ``EventHandler`` to publish
@ -55,7 +54,6 @@ class BaseBluetoothManager(ABC, threading.Thread):
self._interface: Optional[str] = interface self._interface: Optional[str] = interface
self._connect_timeout: float = connect_timeout self._connect_timeout: float = connect_timeout
self._service_uuids: Collection[RawServiceClass] = service_uuids or [] self._service_uuids: Collection[RawServiceClass] = service_uuids or []
self._stop_event = stop_event
self._scan_lock = scan_lock self._scan_lock = scan_lock
self._scan_enabled = scan_enabled self._scan_enabled = scan_enabled
self._device_queue = device_queue self._device_queue = device_queue
@ -82,9 +80,6 @@ class BaseBluetoothManager(ABC, threading.Thread):
def plugins(self): def plugins(self):
return self._plugins return self._plugins
def should_stop(self) -> bool:
return self._stop_event.is_set()
@abstractmethod @abstractmethod
def connect( def connect(
self, self,
@ -167,11 +162,5 @@ class BaseBluetoothManager(ABC, threading.Thread):
configured `connect_timeout`). configured `connect_timeout`).
""" """
@abstractmethod
def stop(self):
"""
Stop any pending tasks and terminate the thread.
"""
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -17,6 +17,7 @@ from typing import (
from typing_extensions import override from typing_extensions import override
from platypush.common import StoppableThread
from platypush.context import get_bus, get_plugin from platypush.context import get_bus, get_plugin
from platypush.entities import ( from platypush.entities import (
EnumSwitchEntityManager, EnumSwitchEntityManager,
@ -49,6 +50,9 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
The full list of devices natively supported can be found The full list of devices natively supported can be found
`here <https://decoder.theengs.io/devices/devices_by_brand.html>`_. `here <https://decoder.theengs.io/devices/devices_by_brand.html>`_.
It also supports legacy Bluetooth services, as well as the transfer of
files.
Note that the support for Bluetooth low-energy devices requires a Bluetooth Note that the support for Bluetooth low-energy devices requires a Bluetooth
adapter compatible with the Bluetooth 5.0 specification or higher. adapter compatible with the Bluetooth 5.0 specification or higher.
@ -59,7 +63,6 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
* **TheengsDecoder** (``pip install TheengsDecoder``) * **TheengsDecoder** (``pip install TheengsDecoder``)
* **pydbus** (``pip install pydbus``) * **pydbus** (``pip install pydbus``)
* **pybluez** (``pip install git+https://github.com/pybluez/pybluez``) * **pybluez** (``pip install git+https://github.com/pybluez/pybluez``)
* **PyOBEX** (``pip install git+https://github.com/BlackLight/PyOBEX``)
Triggers: Triggers:
@ -639,35 +642,36 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
""" """
super().stop() super().stop()
# Stop any pending scan controller timers
self._cancel_scan_controller_timer() self._cancel_scan_controller_timer()
self._stop_threads(self._managers.values())
# Set the stop events on the manager threads def _stop_threads(self, threads: Collection[StoppableThread], timeout: float = 5):
for manager in self._managers.values(): """
if manager and manager.is_alive(): Set the stop events on active threads and wait for them to stop.
self.logger.info('Waiting for %s to stop', manager.name) """
# Set the stop events and call `.stop`
for thread in threads:
if thread and thread.is_alive():
self.logger.info('Waiting for %s to stop', thread.name)
try: try:
manager.stop() thread.stop()
except Exception as e: except Exception as e:
self.logger.exception( self.logger.exception('Error while stopping %s: %s', thread.name, e)
'Error while stopping %s: %s', manager.name, e
)
# Wait for the manager threads to stop # Wait for the manager threads to stop
stop_timeout = 5
wait_start = time.time() wait_start = time.time()
for manager in self._managers.values(): for thread in threads:
if ( if (
manager thread
and manager.ident != threading.current_thread().ident and thread.ident != threading.current_thread().ident
and manager.is_alive() and thread.is_alive()
): ):
manager.join(timeout=max(0, stop_timeout - (time.time() - wait_start))) thread.join(timeout=max(0, timeout - (time.time() - wait_start)))
if manager and manager.is_alive(): if thread and thread.is_alive():
self.logger.warning( self.logger.warning(
'Timeout while waiting for %s to stop', manager.name 'Timeout while waiting for %s to stop', thread.name
) )