Added stand-alone connect and disconnect actions to bluetooth.

This commit is contained in:
Fabio Manganiello 2023-02-25 01:59:35 +01:00
parent 70d1bb893c
commit 15fadb93bb
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,7 +1,9 @@
import base64
from asyncio import Event, Lock, ensure_future
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from threading import RLock, Timer
import threading
from time import time
from typing import (
Any,
@ -48,6 +50,19 @@ from ._mappers import device_to_entity, parse_device_args
UUIDType = Union[str, UUID]
@dataclass
class _BluetoothConnection:
"""
A class to store information and context about a Bluetooth connection.
"""
client: BleakClient
device: BLEDevice
loop: asyncio.AbstractEventLoop
close_event: Optional[asyncio.Event] = None
thread: Optional[threading.Thread] = None
class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
"""
Plugin to interact with BLE (Bluetooth Low-Energy) devices.
@ -75,7 +90,6 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
* :class:`platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceNewDataEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDevicePairedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent`
@ -83,10 +97,11 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
* :class:`platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothScanPausedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothScanResumedEvent`
* :class:`platypush.message.event.entities.EntityUpdateEvent`
"""
_default_connect_timeout: Final[int] = 5
_default_connect_timeout: Final[int] = 10
""" Default connection timeout (in seconds) """
_rssi_update_interval: Final[int] = 30
@ -108,7 +123,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
:param interface: Name of the Bluetooth interface to use (e.g. ``hci0``
on Linux). Default: first available interface.
:param connect_timeout: Timeout in seconds for the connection to a
Bluetooth device. Default: 5 seconds.
Bluetooth device. Default: 10 seconds.
:param uuids: List of service/characteristic UUIDs to discover.
Default: all.
:param device_names: Bluetooth address -> device name mapping. If not
@ -134,10 +149,10 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
self._connect_timeout: float = connect_timeout
self._uuids: Collection[Union[str, UUID]] = uuids or []
self._scan_lock = RLock()
self._scan_enabled = Event()
self._scan_enabled = asyncio.Event()
self._scan_controller_timer: Optional[Timer] = None
self._connections: Dict[str, BleakClient] = {}
self._connection_locks: Dict[str, Lock] = {}
self._connections: Dict[str, _BluetoothConnection] = {}
self._connection_locks: Dict[str, asyncio.Lock] = {}
self._devices: Dict[str, BLEDevice] = {}
self._entities: Dict[str, BluetoothDevice] = {}
self._device_last_updated_at: Dict[str, float] = {}
@ -170,6 +185,13 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
def _post_event(
self, event_type: Type[BluetoothDeviceEvent], device: BLEDevice, **kwargs
):
if event_type == BluetoothDeviceNewDataEvent:
# Skip BluetoothDeviceNewDataEvent. They are only generated to
# infer if the device has sent some new data, but they can be very
# noisy and affect the latency - especially in case of connections
# that exchange a lot of data.
return
get_bus().post(
event_type(address=device.address, **parse_device_args(device), **kwargs)
)
@ -254,6 +276,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
self._post_event(event_type, device)
self._device_last_updated_at[device.address] = time()
# Explicitly connect the child entities to their parent before
# flushing them
for child in entity.children:
child.parent = entity
@ -284,20 +308,30 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
device: str,
interface: Optional[str] = None,
timeout: Optional[float] = None,
) -> AsyncGenerator[BleakClient, None]:
close_event: Optional[asyncio.Event] = None,
) -> AsyncGenerator[_BluetoothConnection, None]:
dev = await self._get_device(device)
async with self._connection_locks.get(dev.address, Lock()) as lock:
self._connection_locks[dev.address] = lock or Lock()
async with self._connection_locks.get(dev.address, asyncio.Lock()) as lock:
self._connection_locks[dev.address] = lock or asyncio.Lock()
async with BleakClient(
dev.address,
adapter=interface or self._interface,
timeout=timeout or self._connect_timeout,
) as client:
self._connections[dev.address] = client
yield client
assert client, f'Could not connect to the device {device}'
self.logger.info('Connected to device %s', device)
self._connections[dev.address] = _BluetoothConnection(
client=client,
device=dev,
loop=asyncio.get_event_loop(),
thread=threading.current_thread(),
close_event=close_event,
)
yield self._connections[dev.address]
self._connections.pop(dev.address, None)
self.logger.info('Disconnected from device %s', device)
async def _read(
self,
@ -306,8 +340,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
) -> bytearray:
async with self._connect(device, interface, connect_timeout) as client:
data = await client.read_gatt_char(service_uuid)
async with self._connect(device, interface, connect_timeout) as conn:
data = await conn.client.read_gatt_char(service_uuid)
return data
@ -319,8 +353,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
):
async with self._connect(device, interface, connect_timeout) as client:
await client.write_gatt_char(service_uuid, data)
async with self._connect(device, interface, connect_timeout) as conn:
await conn.client.write_gatt_char(service_uuid, data)
async def _scan(
self,
@ -370,6 +404,65 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
self._scan_controller_timer = Timer(duration, timer_callback)
self._scan_controller_timer.start()
@action
def connect(
self,
device: str,
interface: Optional[str] = None,
timeout: Optional[float] = None,
):
"""
Pair and connect to a device by address or name.
:param device: The device address or name.
:param interface: The Bluetooth interface to use (it overrides the
default ``interface``).
:param timeout: The connection timeout in seconds (it overrides the
default ``connect_timeout``).
"""
timeout = timeout or self._connect_timeout
connected_event = threading.Event()
close_event = asyncio.Event()
loop = asyncio.new_event_loop()
def connect_thread():
async def connect_wrapper():
async with self._connect(device, interface, timeout, close_event):
connected_event.set()
await close_event.wait()
asyncio.set_event_loop(loop)
loop.run_until_complete(connect_wrapper())
loop = get_or_create_event_loop()
connector = threading.Thread(target=connect_thread)
connector.start()
success = connected_event.wait(timeout=timeout)
assert success, f'Connection to {device} timed out'
@action
def disconnect(self, device: str):
"""
Close an active connection to a device.
:param device: The device address or name.
"""
address: Optional[str] = device
if not self._connections.get(device):
address = self._device_addr_by_name.get(device)
assert address, f'Device {device} not found'
connection = self._connections.get(address, None)
assert connection, f'No active connections to the device {device} were found'
if connection.close_event:
connection.close_event.set()
if connection.thread and connection.thread.is_alive():
connection.thread.join(timeout=5)
assert not (
connection.thread and connection.thread.is_alive()
), f'Disconnection from {device} timed out'
@action
def scan_pause(self, duration: Optional[float] = None):
"""
@ -379,7 +472,9 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
(default: null = indefinitely).
"""
if self._loop:
ensure_future(self._scan_state_set(False, duration), loop=self._loop)
asyncio.ensure_future(
self._scan_state_set(False, duration), loop=self._loop
)
@action
def scan_resume(self, duration: Optional[float] = None):
@ -390,7 +485,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
(default: null = indefinitely).
"""
if self._loop:
ensure_future(self._scan_state_set(True, duration), loop=self._loop)
asyncio.ensure_future(self._scan_state_set(True, duration), loop=self._loop)
@action
def scan(
@ -518,6 +613,15 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
if self._scan_controller_timer:
self._scan_controller_timer.cancel()
connections = list(self._connections.values())
for conn in connections:
try:
self.disconnect(conn.device.address)
except Exception as e:
self.logger.warning(
'Error while disconnecting from %s: %s', conn.device.address, e
)
super().stop()