diff --git a/platypush/plugins/bluetooth/ble/__init__.py b/platypush/plugins/bluetooth/ble/__init__.py index 4bc4391c3..545d7e782 100644 --- a/platypush/plugins/bluetooth/ble/__init__.py +++ b/platypush/plugins/bluetooth/ble/__init__.py @@ -1,7 +1,9 @@ import base64 -from asyncio import Event, Lock, ensure_future +import asyncio from contextlib import asynccontextmanager +from dataclasses import dataclass from threading import RLock, Timer +import threading from time import time from typing import ( Any, @@ -48,6 +50,19 @@ from ._mappers import device_to_entity, parse_device_args UUIDType = Union[str, UUID] +@dataclass +class _BluetoothConnection: + """ + A class to store information and context about a Bluetooth connection. + """ + + client: BleakClient + device: BLEDevice + loop: asyncio.AbstractEventLoop + close_event: Optional[asyncio.Event] = None + thread: Optional[threading.Thread] = None + + class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): """ Plugin to interact with BLE (Bluetooth Low-Energy) devices. @@ -75,7 +90,6 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): * :class:`platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent` * :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent` * :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceNewDataEvent` * :class:`platypush.message.event.bluetooth.BluetoothDevicePairedEvent` * :class:`platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent` * :class:`platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent` @@ -83,10 +97,11 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): * :class:`platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent` * :class:`platypush.message.event.bluetooth.BluetoothScanPausedEvent` * :class:`platypush.message.event.bluetooth.BluetoothScanResumedEvent` + * :class:`platypush.message.event.entities.EntityUpdateEvent` """ - _default_connect_timeout: Final[int] = 5 + _default_connect_timeout: Final[int] = 10 """ Default connection timeout (in seconds) """ _rssi_update_interval: Final[int] = 30 @@ -108,7 +123,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): :param interface: Name of the Bluetooth interface to use (e.g. ``hci0`` on Linux). Default: first available interface. :param connect_timeout: Timeout in seconds for the connection to a - Bluetooth device. Default: 5 seconds. + Bluetooth device. Default: 10 seconds. :param uuids: List of service/characteristic UUIDs to discover. Default: all. :param device_names: Bluetooth address -> device name mapping. If not @@ -134,10 +149,10 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self._connect_timeout: float = connect_timeout self._uuids: Collection[Union[str, UUID]] = uuids or [] self._scan_lock = RLock() - self._scan_enabled = Event() + self._scan_enabled = asyncio.Event() self._scan_controller_timer: Optional[Timer] = None - self._connections: Dict[str, BleakClient] = {} - self._connection_locks: Dict[str, Lock] = {} + self._connections: Dict[str, _BluetoothConnection] = {} + self._connection_locks: Dict[str, asyncio.Lock] = {} self._devices: Dict[str, BLEDevice] = {} self._entities: Dict[str, BluetoothDevice] = {} self._device_last_updated_at: Dict[str, float] = {} @@ -170,6 +185,13 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): def _post_event( self, event_type: Type[BluetoothDeviceEvent], device: BLEDevice, **kwargs ): + if event_type == BluetoothDeviceNewDataEvent: + # Skip BluetoothDeviceNewDataEvent. They are only generated to + # infer if the device has sent some new data, but they can be very + # noisy and affect the latency - especially in case of connections + # that exchange a lot of data. + return + get_bus().post( event_type(address=device.address, **parse_device_args(device), **kwargs) ) @@ -254,6 +276,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self._post_event(event_type, device) self._device_last_updated_at[device.address] = time() + # Explicitly connect the child entities to their parent before + # flushing them for child in entity.children: child.parent = entity @@ -284,20 +308,30 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): device: str, interface: Optional[str] = None, timeout: Optional[float] = None, - ) -> AsyncGenerator[BleakClient, None]: + close_event: Optional[asyncio.Event] = None, + ) -> AsyncGenerator[_BluetoothConnection, None]: dev = await self._get_device(device) - async with self._connection_locks.get(dev.address, Lock()) as lock: - self._connection_locks[dev.address] = lock or Lock() + async with self._connection_locks.get(dev.address, asyncio.Lock()) as lock: + self._connection_locks[dev.address] = lock or asyncio.Lock() async with BleakClient( dev.address, adapter=interface or self._interface, timeout=timeout or self._connect_timeout, ) as client: - self._connections[dev.address] = client - yield client + assert client, f'Could not connect to the device {device}' + self.logger.info('Connected to device %s', device) + self._connections[dev.address] = _BluetoothConnection( + client=client, + device=dev, + loop=asyncio.get_event_loop(), + thread=threading.current_thread(), + close_event=close_event, + ) + yield self._connections[dev.address] self._connections.pop(dev.address, None) + self.logger.info('Disconnected from device %s', device) async def _read( self, @@ -306,8 +340,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): interface: Optional[str] = None, connect_timeout: Optional[float] = None, ) -> bytearray: - async with self._connect(device, interface, connect_timeout) as client: - data = await client.read_gatt_char(service_uuid) + async with self._connect(device, interface, connect_timeout) as conn: + data = await conn.client.read_gatt_char(service_uuid) return data @@ -319,8 +353,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): interface: Optional[str] = None, connect_timeout: Optional[float] = None, ): - async with self._connect(device, interface, connect_timeout) as client: - await client.write_gatt_char(service_uuid, data) + async with self._connect(device, interface, connect_timeout) as conn: + await conn.client.write_gatt_char(service_uuid, data) async def _scan( self, @@ -370,6 +404,65 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self._scan_controller_timer = Timer(duration, timer_callback) self._scan_controller_timer.start() + @action + def connect( + self, + device: str, + interface: Optional[str] = None, + timeout: Optional[float] = None, + ): + """ + Pair and connect to a device by address or name. + + :param device: The device address or name. + :param interface: The Bluetooth interface to use (it overrides the + default ``interface``). + :param timeout: The connection timeout in seconds (it overrides the + default ``connect_timeout``). + """ + timeout = timeout or self._connect_timeout + connected_event = threading.Event() + close_event = asyncio.Event() + loop = asyncio.new_event_loop() + + def connect_thread(): + async def connect_wrapper(): + async with self._connect(device, interface, timeout, close_event): + connected_event.set() + await close_event.wait() + + asyncio.set_event_loop(loop) + loop.run_until_complete(connect_wrapper()) + + loop = get_or_create_event_loop() + connector = threading.Thread(target=connect_thread) + connector.start() + success = connected_event.wait(timeout=timeout) + assert success, f'Connection to {device} timed out' + + @action + def disconnect(self, device: str): + """ + Close an active connection to a device. + + :param device: The device address or name. + """ + address: Optional[str] = device + if not self._connections.get(device): + address = self._device_addr_by_name.get(device) + + assert address, f'Device {device} not found' + connection = self._connections.get(address, None) + assert connection, f'No active connections to the device {device} were found' + + if connection.close_event: + connection.close_event.set() + if connection.thread and connection.thread.is_alive(): + connection.thread.join(timeout=5) + assert not ( + connection.thread and connection.thread.is_alive() + ), f'Disconnection from {device} timed out' + @action def scan_pause(self, duration: Optional[float] = None): """ @@ -379,7 +472,9 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): (default: null = indefinitely). """ if self._loop: - ensure_future(self._scan_state_set(False, duration), loop=self._loop) + asyncio.ensure_future( + self._scan_state_set(False, duration), loop=self._loop + ) @action def scan_resume(self, duration: Optional[float] = None): @@ -390,7 +485,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): (default: null = indefinitely). """ if self._loop: - ensure_future(self._scan_state_set(True, duration), loop=self._loop) + asyncio.ensure_future(self._scan_state_set(True, duration), loop=self._loop) @action def scan( @@ -518,6 +613,15 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): if self._scan_controller_timer: self._scan_controller_timer.cancel() + connections = list(self._connections.values()) + for conn in connections: + try: + self.disconnect(conn.device.address) + except Exception as e: + self.logger.warning( + 'Error while disconnecting from %s: %s', conn.device.address, e + ) + super().stop()