import time from threading import Thread, RLock from typing import Dict, Optional from platypush.backend.sensor import SensorBackend from platypush.context import get_plugin from platypush.message.event.bluetooth import BluetoothDeviceFoundEvent, BluetoothDeviceLostEvent class BluetoothScannerBackend(SensorBackend): """ This backend periodically scans for available bluetooth devices and returns events when a devices enter or exits the range. Triggers: * :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent` when a new bluetooth device is found. * :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent` when a bluetooth device is lost. Requires: * The :class:`platypush.plugins.bluetooth.BluetoothPlugin` plugin working. """ def __init__(self, device_id: Optional[int] = None, scan_duration: int = 10, active_scan: bool = False, **kwargs): """ :param device_id: Bluetooth adapter ID to use (default configured on the ``bluetooth`` plugin if None). :param scan_duration: How long the scan should run (default: 10 seconds). :param active_scan: Set to True if you want to actively track the devices through a periodic ``.lookup_name`` call. This is likely to produce a more accurate tracking, but it may negatively impact the duration of devices that run on battery. """ super().__init__(plugin='bluetooth', plugin_args={ 'device_id': device_id, 'duration': scan_duration, }, **kwargs) self._last_seen_devices = {} self._devices_to_track = set() self._tracking_thread: Optional[Thread] = None self._bt_lock = RLock() self.active_scan = active_scan self.scan_duration = scan_duration def _add_last_seen_device(self, dev): addr = dev.pop('addr') if addr not in self._last_seen_devices: self.bus.post(BluetoothDeviceFoundEvent(address=addr, **dev)) self._last_seen_devices[addr] = {'addr': addr, **dev} if self.active_scan and addr not in self._devices_to_track: self._devices_to_track.add(addr) def _remove_last_seen_device(self, addr: str): dev = self._last_seen_devices.get(addr) if not dev: return self.bus.post(BluetoothDeviceLostEvent(address=addr, **dev)) del self._last_seen_devices[addr] def _addr_tracker(self, addr): with self._bt_lock: name = get_plugin('bluetooth').lookup_name(addr, timeout=self.scan_duration) if name is None: self._remove_last_seen_device(addr) else: self._add_last_seen_device({'addr': addr, 'name': name}) def _bt_tracker(self): self.logger.info('Starting Bluetooth tracker') while not self.should_stop(): trackers = [] for addr in self._devices_to_track: tracker = Thread(target=self._addr_tracker, args=(addr,)) tracker.start() trackers.append(tracker) for tracker in trackers: tracker.join(timeout=self.scan_duration) time.sleep(self.scan_duration) self.logger.info('Bluetooth tracker stopped') def get_measurement(self): with self._bt_lock: return super().get_measurement() def process_data(self, data: Dict[str, dict], new_data: Dict[str, dict]): for addr, dev in data.items(): self._add_last_seen_device(dev) if not self.active_scan: for addr, dev in self._last_seen_devices.copy().items(): if addr not in data: self._remove_last_seen_device(addr) def run(self): if self.active_scan: self._tracking_thread = Thread(target=self._bt_tracker) self._tracking_thread.start() super().run() def on_stop(self): super().on_stop() if self._tracking_thread and self._tracking_thread.is_alive(): self.logger.info('Waiting for the Bluetooth tracking thread to stop') self._tracking_thread.join(timeout=self.scan_duration) # vim:sw=4:ts=4:et: