- Addressed comments in #174

- Replaced active_scan flag with a list of track_devices
This commit is contained in:
Fabio Manganiello 2021-03-09 19:02:50 +01:00
parent b3606a8ac3
commit 093bac3a60

View file

@ -1,6 +1,6 @@
import time
from threading import Thread, RLock
from typing import Dict, Optional
from typing import Dict, Optional, List
from platypush.backend.sensor import SensorBackend
from platypush.context import get_plugin
@ -23,13 +23,12 @@ class BluetoothScannerBackend(SensorBackend):
"""
def __init__(self, device_id: Optional[int] = None, scan_duration: int = 10, active_scan: bool = False, **kwargs):
def __init__(self, device_id: Optional[int] = None, scan_duration: int = 10,
track_devices: Optional[List[str]] = None, **kwargs):
"""
:param device_id: Bluetooth adapter ID to use (default configured on the ``bluetooth`` plugin if None).
:param scan_duration: How long the scan should run (default: 10 seconds).
:param active_scan: Set to True if you want to actively track the devices through a periodic
``.lookup_name`` call. This is likely to produce a more accurate tracking, but it may negatively impact the
duration of devices that run on battery.
:param track_devices: List of addresses of devices to actively track, even if they aren't discoverable.
"""
super().__init__(plugin='bluetooth', plugin_args={
'device_id': device_id,
@ -37,10 +36,9 @@ class BluetoothScannerBackend(SensorBackend):
}, **kwargs)
self._last_seen_devices = {}
self._devices_to_track = set()
self._tracking_thread: Optional[Thread] = None
self._bt_lock = RLock()
self.active_scan = active_scan
self.track_devices = set(track_devices or [])
self.scan_duration = scan_duration
def _add_last_seen_device(self, dev):
@ -49,9 +47,6 @@ class BluetoothScannerBackend(SensorBackend):
self.bus.post(BluetoothDeviceFoundEvent(address=addr, **dev))
self._last_seen_devices[addr] = {'addr': addr, **dev}
if self.active_scan and addr not in self._devices_to_track:
self._devices_to_track.add(addr)
def _remove_last_seen_device(self, addr: str):
dev = self._last_seen_devices.get(addr)
if not dev:
@ -62,7 +57,7 @@ class BluetoothScannerBackend(SensorBackend):
def _addr_tracker(self, addr):
with self._bt_lock:
name = get_plugin('bluetooth').lookup_name(addr, timeout=self.scan_duration)
name = get_plugin('bluetooth').lookup_name(addr, timeout=self.scan_duration).name
if name is None:
self._remove_last_seen_device(addr)
@ -73,7 +68,7 @@ class BluetoothScannerBackend(SensorBackend):
self.logger.info('Starting Bluetooth tracker')
while not self.should_stop():
trackers = []
for addr in self._devices_to_track:
for addr in self.track_devices:
tracker = Thread(target=self._addr_tracker, args=(addr,))
tracker.start()
trackers.append(tracker)
@ -93,16 +88,13 @@ class BluetoothScannerBackend(SensorBackend):
for addr, dev in data.items():
self._add_last_seen_device(dev)
if not self.active_scan:
for addr, dev in self._last_seen_devices.copy().items():
if addr not in data:
self._remove_last_seen_device(addr)
for addr, dev in self._last_seen_devices.copy().items():
if addr not in data and addr not in self.track_devices:
self._remove_last_seen_device(addr)
def run(self):
if self.active_scan:
self._tracking_thread = Thread(target=self._bt_tracker)
self._tracking_thread.start()
self._tracking_thread = Thread(target=self._bt_tracker)
self._tracking_thread.start()
super().run()
def on_stop(self):