From 73bf2446bd195d7ee6789b05502bf2a68aebc3b8 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 19 Feb 2023 23:11:19 +0100 Subject: [PATCH] Wrap `bluetooth.connect` in a per-device locked section. --- platypush/plugins/bluetooth/ble/__init__.py | 24 ++++++++++++--------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/platypush/plugins/bluetooth/ble/__init__.py b/platypush/plugins/bluetooth/ble/__init__.py index c7a64e0c64..cd048b82ce 100644 --- a/platypush/plugins/bluetooth/ble/__init__.py +++ b/platypush/plugins/bluetooth/ble/__init__.py @@ -1,5 +1,5 @@ import base64 -from asyncio import Event, ensure_future +from asyncio import Event, Lock, ensure_future from contextlib import asynccontextmanager from threading import RLock, Timer from time import time @@ -127,6 +127,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self._scan_enabled = Event() self._scan_controller_timer: Optional[Timer] = None self._connections: Dict[str, BleakClient] = {} + self._connection_locks: Dict[str, Lock] = {} self._devices: Dict[str, BLEDevice] = {} self._device_last_updated_at: Dict[str, float] = {} self._device_name_by_addr = device_names or {} @@ -255,7 +256,6 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): or old_props.get('TxPower') != new_props.get('TxPower') ): event_types.append(BluetoothDeviceSignalUpdateEvent) - else: event_types.append(BluetoothDeviceFoundEvent) @@ -278,14 +278,18 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): timeout: Optional[float] = None, ) -> AsyncGenerator[BleakClient, None]: dev = await self._get_device(device) - async with BleakClient( - dev.address, - adapter=interface or self._interface, - timeout=timeout or self._connect_timeout, - ) as client: - self._connections[dev.address] = client - yield client - self._connections.pop(dev.address) + + async with self._connection_locks.get(dev.address, Lock()) as lock: + self._connection_locks[dev.address] = lock or Lock() + + async with BleakClient( + dev.address, + adapter=interface or self._interface, + timeout=timeout or self._connect_timeout, + ) as client: + self._connections[dev.address] = client + yield client + self._connections.pop(dev.address, None) async def _read( self,