Added support for Bluetooth devices blacklist.

Based on device address, name or manufacturer.
This commit is contained in:
Fabio Manganiello 2023-03-24 01:52:39 +01:00
parent 0cebcf4f9b
commit d46d4e2300
Signed by: blacklight
GPG key ID: D90FBA7F76362774
6 changed files with 71 additions and 6 deletions

View file

@ -20,6 +20,7 @@ from platypush.message.event.bluetooth import (
from .._cache import EntityCache
from .._model import ServiceClass
from .._plugins import BaseBluetoothPlugin
from .._types import DevicesBlacklist
from ._cache import DeviceCache
from ._mappers import device_to_entity
@ -118,18 +119,21 @@ class EventHandler:
entity_cache: EntityCache,
plugins: Collection[BaseBluetoothPlugin],
exclude_known_noisy_beacons: bool,
blacklist: DevicesBlacklist,
):
"""
:param device_queue: Queue used to publish updated devices upstream.
:param device_cache: Device cache.
:param entity_cache: Entity cache.
:param exclude_known_noisy_beacons: Exclude known noisy beacons.
:param blacklist: Blacklist rules.
"""
self._device_queue = device_queue
self._device_cache = device_cache
self._entity_cache = entity_cache
self._plugins = plugins
self._exclude_known_noisy_beacons = exclude_known_noisy_beacons
self._blacklist = blacklist
def __call__(self, device: BLEDevice, data: AdvertisementData):
"""
@ -158,6 +162,10 @@ class EventHandler:
)
return
if self._blacklist.matches(new_entity):
logger.debug('Ignoring blacklisted device: %s', device.address)
return
# Extend the new entity with children entities added by the plugins
for plugin in self._plugins:
plugin.extend_device(new_entity)

View file

@ -68,6 +68,7 @@ class BLEManager(BaseBluetoothManager):
device_cache=self._device_cache,
entity_cache=self._cache,
plugins=self._plugins,
blacklist=self._blacklist,
exclude_known_noisy_beacons=self._exclude_known_noisy_beacons,
)
""" Bluetooth device event handler """
@ -206,7 +207,7 @@ class BLEManager(BaseBluetoothManager):
detection_callback=self._event_handler,
)
addresses = {dev.address for dev in devices}
addresses = {dev.address.lower() for dev in devices}
return [
dev
for addr, dev in self._cache.items()

View file

@ -294,7 +294,10 @@ class LegacyManager(BaseBluetoothManager):
for dev in devices.values():
self._service_scanned_devices[dev.address] = True
self._device_queue.put_nowait(dev)
if self._blacklist.matches(dev):
self.logger.debug('Ignoring blacklisted device: %s', dev.address)
else:
self._device_queue.put_nowait(dev)
return list(devices.values())

View file

@ -9,7 +9,7 @@ from platypush.entities.bluetooth import BluetoothDevice
from platypush.message.event.bluetooth import BluetoothDeviceEvent
from ._cache import EntityCache
from ._types import RawServiceClass
from ._types import DevicesBlacklist, RawServiceClass
class BaseBluetoothManager(ABC, threading.Thread):
@ -29,6 +29,7 @@ class BaseBluetoothManager(ABC, threading.Thread):
service_uuids: Optional[Collection[RawServiceClass]] = None,
device_cache: Optional[EntityCache] = None,
exclude_known_noisy_beacons: bool = True,
blacklist: Optional[DevicesBlacklist] = None,
**kwargs,
):
"""
@ -42,6 +43,7 @@ class BaseBluetoothManager(ABC, threading.Thread):
updates with the new parsed device entities.
:param device_cache: Cache used to keep track of discovered devices.
:param exclude_known_noisy_beacons: Exclude known noisy beacons.
:param blacklist: Blacklist of devices to exclude from discovery.
"""
from ._plugins import scan_plugins
@ -58,6 +60,7 @@ class BaseBluetoothManager(ABC, threading.Thread):
self._scan_enabled = scan_enabled
self._device_queue = device_queue
self._exclude_known_noisy_beacons = exclude_known_noisy_beacons
self._blacklist = blacklist or DevicesBlacklist()
self._cache = device_cache or EntityCache()
""" Cache of discovered devices. """

View file

@ -33,7 +33,7 @@ from platypush.plugins.db import DbPlugin
from ._ble import BLEManager
from ._cache import EntityCache
from ._legacy import LegacyManager
from ._types import RawServiceClass
from ._types import DevicesBlacklist, RawServiceClass
from ._manager import BaseBluetoothManager
@ -58,7 +58,7 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
* **bluetooth-numbers** (``pip install bluetooth-numbers``)
* **TheengsDecoder** (``pip install TheengsDecoder``)
* **pybluez** (``pip install git+https://github.com/pybluez/pybluez``)
* **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``)
* **PyOBEX** (``pip install git+https://github.com/BlackLight/PyOBEX``)
Triggers:
@ -90,6 +90,9 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
scan_paused_on_start: bool = False,
poll_interval: float = _default_scan_duration,
exclude_known_noisy_beacons: bool = True,
ignored_device_addresses: Optional[Collection[str]] = None,
ignored_device_names: Optional[Collection[str]] = None,
ignored_device_manufacturers: Optional[Collection[str]] = None,
**kwargs,
):
"""
@ -110,6 +113,10 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
Disable this flag if you need to track BLE beacons from these
devices, but beware that you may need periodically clean up your
list of scanned devices.
:param ignored_device_addresses: List of device addresses to ignore.
:param ignored_device_names: List of device names to ignore.
:param ignored_device_manufacturers: List of device manufacturers to
ignore.
"""
kwargs['poll_interval'] = poll_interval
super().__init__(**kwargs)
@ -136,6 +143,13 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
self._excluded_known_noisy_beacons = exclude_known_noisy_beacons
""" Exclude known noisy BLE beacons. """
self._blacklist = DevicesBlacklist(
addresses=set(ignored_device_addresses or []),
names=set(ignored_device_names or []),
manufacturers=set(ignored_device_manufacturers or []),
)
""" Blacklist rules for the devices to ignore. """
self._managers: Dict[Type[BaseBluetoothManager], BaseBluetoothManager] = {}
"""
Bluetooth managers threads, one for BLE devices and one for non-BLE
@ -175,6 +189,7 @@ class BluetoothPlugin(RunnablePlugin, EnumSwitchEntityManager):
'service_uuids': list(map(BluetoothService.to_uuid, self._service_uuids)),
'device_cache': self._device_cache,
'exclude_known_noisy_beacons': self._excluded_known_noisy_beacons,
'blacklist': self._blacklist,
}
self._managers = {

View file

@ -1,4 +1,5 @@
from typing import Union
from dataclasses import dataclass, field
from typing import Collection, Union
from uuid import UUID
RawServiceClass = Union[UUID, int]
@ -6,3 +7,37 @@ RawServiceClass = Union[UUID, int]
Raw type for service classes received by pybluez.
Can be either a 16-bit integer or a UUID.
"""
@dataclass
class DevicesBlacklist:
"""
A data class representing the rules for blacklisting devices.
"""
addresses: Collection[str] = field(default_factory=set[str])
""" MAC addresses to ignore. """
names: Collection[str] = field(default_factory=set[str])
""" Device names to ignore. """
manufacturers: Collection[str] = field(default_factory=set[str])
""" Manufacturers strings to ignore. """
def __post_init__(self):
"""
Normalize the blacklist rules.
"""
# Normalize case-sensitivity for addresses
self.addresses = set(map(str.lower, self.addresses))
# Make sure that all the collections are sets
self.names = set(self.names)
self.manufacturers = set(self.manufacturers)
def matches(self, device) -> bool:
"""
:type device: :class:`platypush.entities.bluetooth.BluetoothDevice`.
"""
return (
device.address.lower() in self.addresses
or device.name in self.names
or device.manufacturer in self.manufacturers
)