From aa0b909fffabe38568930a2d6d89c064b0c58f11 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 20 Feb 2023 20:27:17 +0100 Subject: [PATCH] Use the TheengsDecoder to parse Bluetooth packets and map services to native entities. --- docs/source/conf.py | 1 + platypush/plugins/bluetooth/ble/__init__.py | 158 +++++------ platypush/plugins/bluetooth/ble/_mappers.py | 253 ++++++++++++++++++ platypush/plugins/bluetooth/ble/manifest.yaml | 1 + 4 files changed, 334 insertions(+), 79 deletions(-) create mode 100644 platypush/plugins/bluetooth/ble/_mappers.py diff --git a/docs/source/conf.py b/docs/source/conf.py index bc25e1b3f0..ecc0ca5445 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -298,6 +298,7 @@ autodoc_mock_imports = [ 'async_lru', 'bleak', 'bluetooth_numbers', + 'TheengsGateway', ] sys.path.insert(0, os.path.abspath('../..')) diff --git a/platypush/plugins/bluetooth/ble/__init__.py b/platypush/plugins/bluetooth/ble/__init__.py index cd048b82ce..2dd95ee932 100644 --- a/platypush/plugins/bluetooth/ble/__init__.py +++ b/platypush/plugins/bluetooth/ble/__init__.py @@ -19,8 +19,6 @@ from uuid import UUID from bleak import BleakClient, BleakScanner from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData -from bleak.uuids import uuidstr_to_str -from bluetooth_numbers import company from typing_extensions import override from platypush.context import get_bus, get_or_create_event_loop @@ -44,6 +42,8 @@ from platypush.message.event.bluetooth import ( ) from platypush.plugins import AsyncRunnablePlugin, action +from ._mappers import device_to_entity, parse_device_args + UUIDType = Union[str, UUID] @@ -58,6 +58,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): * **bleak** (``pip install bleak``) * **bluetooth-numbers** (``pip install bluetooth-numbers``) + * **TheengsGateway** (``pip install git+https://github.com/BlackLight/TheengsGateway``) Triggers: @@ -129,6 +130,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self._connections: Dict[str, BleakClient] = {} self._connection_locks: Dict[str, Lock] = {} self._devices: Dict[str, BLEDevice] = {} + self._entities: Dict[str, BluetoothDevice] = {} self._device_last_updated_at: Dict[str, float] = {} self._device_name_by_addr = device_names or {} self._device_addr_by_name = { @@ -156,104 +158,68 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): assert dev is not None, f'Unknown device: "{device}"' return dev - def _get_device_name(self, device: BLEDevice) -> str: - return ( - self._device_name_by_addr.get(device.address) - or device.name - or device.address - ) - def _post_event( self, event_type: Type[BluetoothDeviceEvent], device: BLEDevice, **kwargs ): get_bus().post( - event_type( - address=device.address, **self._parse_device_args(device), **kwargs - ) + event_type(address=device.address, **parse_device_args(device), **kwargs) ) - def _parse_device_args(self, device: BLEDevice) -> Dict[str, Any]: - props = device.details.get('props', {}) - return { - 'name': self._get_device_name(device), - 'connected': props.get('Connected', False), - 'paired': props.get('Paired', False), - 'blocked': props.get('Blocked', False), - 'trusted': props.get('Trusted', False), - 'rssi': device.rssi, - 'tx_power': props.get('TxPower'), - 'uuids': { - uuid: uuidstr_to_str(uuid) for uuid in device.metadata.get('uuids', []) - }, - 'manufacturers': { - manufacturer_id: company.get(manufacturer_id, 'Unknown') - for manufacturer_id in sorted( - device.metadata.get('manufacturer_data', {}).keys() - ) - }, - 'manufacturer_data': self._parse_manufacturer_data(device), - 'service_data': self._parse_service_data(device), - } + def _on_device_event(self, device: BLEDevice, data: AdvertisementData): + """ + Device advertisement packet callback handler. - @staticmethod - def _parse_manufacturer_data(device: BLEDevice) -> Dict[int, str]: - return { - manufacturer_id: ':'.join([f'{x:02x}' for x in value]) - for manufacturer_id, value in device.metadata.get( - 'manufacturer_data', {} - ).items() - } + 1. It generates the relevant + :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` if the + state of the device has changed. - @staticmethod - def _parse_service_data(device: BLEDevice) -> Dict[str, str]: - return { - service_uuid: ':'.join([f'{x:02x}' for x in value]) - for service_uuid, value in device.details.get('props', {}) - .get('ServiceData', {}) - .items() - } + 2. It builds the relevant + :class:`platypush.entity.bluetooth.BluetoothDevice` entity object + populated with children entities that contain the supported + properties. + + :param device: The Bluetooth device. + :param data: The advertisement data. + """ - def _on_device_event(self, device: BLEDevice, _: AdvertisementData): event_types: List[Type[BluetoothDeviceEvent]] = [] - existing_device = self._devices.get(device.address) + existing_entity = self._entities.get(device.address) + entity = device_to_entity(device, data) - if existing_device: - old_props = existing_device.details.get('props', {}) - new_props = device.details.get('props', {}) - - if old_props.get('Paired') != new_props.get('Paired'): + if existing_entity: + if existing_entity.paired != entity.paired: event_types.append( BluetoothDevicePairedEvent - if new_props.get('Paired') + if entity.paired else BluetoothDeviceUnpairedEvent ) - if old_props.get('Connected') != new_props.get('Connected'): + if existing_entity.connected != entity.connected: event_types.append( BluetoothDeviceConnectedEvent - if new_props.get('Connected') + if entity.connected else BluetoothDeviceDisconnectedEvent ) - if old_props.get('Blocked') != new_props.get('Blocked'): + if existing_entity.blocked != entity.blocked: event_types.append( BluetoothDeviceBlockedEvent - if new_props.get('Blocked') + if entity.blocked else BluetoothDeviceUnblockedEvent ) - if old_props.get('Trusted') != new_props.get('Trusted'): + if existing_entity.trusted != entity.trusted: event_types.append( BluetoothDeviceTrustedEvent - if new_props.get('Trusted') + if entity.trusted else BluetoothDeviceUntrustedEvent ) if ( time() - self._device_last_updated_at.get(device.address, 0) ) >= self._rssi_update_interval and ( - existing_device.rssi != device.rssi - or old_props.get('TxPower') != new_props.get('TxPower') + existing_entity.rssi != device.rssi + or existing_entity.tx_power != entity.tx_power ): event_types.append(BluetoothDeviceSignalUpdateEvent) else: @@ -267,9 +233,32 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): if event_types: for event_type in event_types: self._post_event(event_type, device) - self.publish_entities([device]) self._device_last_updated_at[device.address] = time() + for child in entity.children: + child.parent = entity + + self.publish_entities([entity]) + + def _has_changed(self, entity: BluetoothDevice) -> bool: + existing_entity = self._entities.get(entity.id or entity.external_id) + + # If the entity didn't exist before, it's a new device. + if not existing_entity: + return True + + entity_dict = entity.to_json() + existing_entity_dict = entity.to_json() + + # Check if any of the root attributes changed, excluding those that are + # managed by the entities engine). + return any( + attr + for attr, value in entity_dict.items() + if value != existing_entity_dict.get(attr) + and attr not in {'id', 'external_id', 'plugin', 'updated_at'} + ) + @asynccontextmanager async def _connect( self, @@ -318,7 +307,6 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): self, duration: Optional[float] = None, uuids: Optional[Collection[UUIDType]] = None, - publish_entities: bool = False, ) -> Collection[Entity]: with self._scan_lock: timeout = duration or self.poll_interval or 5 @@ -330,11 +318,14 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): ) self._devices.update({dev.address: dev for dev in devices}) - return ( - self.publish_entities(devices) - if publish_entities - else self.transform_entities(devices) - ) + addresses = {dev.address.lower() for dev in devices} + return [ + dev + for addr, dev in self._entities.items() + if isinstance(dev, BluetoothDevice) + and addr.lower() in addresses + and dev.reachable + ] async def _scan_state_set(self, state: bool, duration: Optional[float] = None): def timer_callback(): @@ -397,9 +388,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): :param uuids: List of characteristic UUIDs to discover. Default: all. """ loop = get_or_create_event_loop() - return loop.run_until_complete( - self._scan(duration, uuids, publish_entities=True) - ) + return loop.run_until_complete(self._scan(duration, uuids)) @action def read( @@ -460,15 +449,25 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): """ return self.scan().output + @override + def publish_entities( + self, entities: Optional[Collection[Any]] + ) -> Collection[Entity]: + self._entities.update({entity.id: entity for entity in (entities or [])}) + + return super().publish_entities(entities) + @override def transform_entities( - self, entities: Collection[BLEDevice] + self, entities: Collection[Union[BLEDevice, BluetoothDevice]] ) -> Collection[BluetoothDevice]: return [ BluetoothDevice( id=dev.address, - **self._parse_device_args(dev), + **parse_device_args(dev), ) + if isinstance(dev, BLEDevice) + else dev for dev in entities ] @@ -480,7 +479,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): await self._scan_enabled.wait() entities = await self._scan() - new_device_addresses = {e.id for e in entities} + new_device_addresses = {e.external_id for e in entities} missing_device_addresses = device_addresses - new_device_addresses missing_devices = [ dev @@ -491,6 +490,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): for dev in missing_devices: self._post_event(BluetoothDeviceLostEvent, dev) self._devices.pop(dev.address, None) + self._entities.pop(dev.address, None) device_addresses = new_device_addresses diff --git a/platypush/plugins/bluetooth/ble/_mappers.py b/platypush/plugins/bluetooth/ble/_mappers.py new file mode 100644 index 0000000000..ac5d4a330f --- /dev/null +++ b/platypush/plugins/bluetooth/ble/_mappers.py @@ -0,0 +1,253 @@ +import json +import struct +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Optional + +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData +from bleak.uuids import uuidstr_to_str +from bluetooth_numbers import company + +# pylint: disable=no-name-in-module +from TheengsGateway._decoder import decodeBLE, getAttribute, getProperties + +from platypush.entities import Entity +from platypush.entities.batteries import Battery +from platypush.entities.bluetooth import BluetoothDevice +from platypush.entities.electricity import ( + CurrentSensor, + EnergySensor, + PowerSensor, + VoltageSensor, +) +from platypush.entities.humidity import HumiditySensor +from platypush.entities.illuminance import IlluminanceSensor +from platypush.entities.motion import MotionSensor +from platypush.entities.sensors import BinarySensor, NumericSensor, RawSensor +from platypush.entities.temperature import TemperatureSensor + + +@dataclass +class TheengsEntity: + """ + Utility class to store the data parsed from the Theengs library. + """ + + data: dict = field(default_factory=dict) + properties: dict = field(default_factory=dict) + brand: Optional[str] = None + model: Optional[str] = None + model_id: Optional[str] = None + + +# Maps property names to transformer methods (first mapper choice). +_property_to_entity: Dict[str, Callable[[Any, Dict[str, Any]], Entity]] = { + 'battery': lambda value, conf: Battery( + value=value, + unit=conf.get('unit', '%'), + min=conf.get('min', 0), + max=conf.get('min', 100), + ), + 'current': lambda value, conf: CurrentSensor( + value=value, + unit=conf.get('unit', 'A'), + ), + 'energy': lambda value, conf: EnergySensor( + value=value, + unit=conf.get('unit', 'kWh'), + ), + 'humidity': lambda value, conf: HumiditySensor( + value=value, + unit=conf.get('unit', '%'), + min=conf.get('min', 0), + max=conf.get('min', 100), + ), + 'light level': lambda value, _: IlluminanceSensor(value=value), + 'power': lambda value, conf: PowerSensor( + value=value, + unit=conf.get('unit', 'W'), + ), + 'motion': lambda value, _: MotionSensor(value=value), + 'temperature': lambda value, conf: TemperatureSensor( + value=value, + unit=conf.get('unit', 'C'), + ), + 'voltage': lambda value, conf: VoltageSensor( + value=value, + unit=conf.get('unit', 'V'), + ), +} + +# Maps reported units to transformer methods (second mapper choice). +_unit_to_entity: Dict[str, Callable[[Any, Dict[str, Any]], Entity]] = { + 'status': lambda value, _: BinarySensor(value=value), + 'int': lambda value, _: NumericSensor(value=value), + '%': lambda value, conf: NumericSensor( + value=value, + unit='%', + min=conf.get('min', 0), + max=conf.get('min', 100), + ), +} + + +# Maps value types to transformer methods (third mapper choice). +_value_type_to_entity: Dict[type, Callable[[Any, Dict[str, Any]], Entity]] = { + bool: lambda value, _: BinarySensor(value=value), + int: lambda value, _: NumericSensor(value=value), + float: lambda value, _: NumericSensor(value=value), +} + + +def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDevice: + """ + Convert the data received from a Bluetooth advertisement packet into a + compatible Platypush :class:`platypush.entity.bluetooth.BluetoothDevice` + entity, with the discovered services and characteristics exposed as children + entities. + """ + + theengs_entity = _parse_advertisement_data(data) + parent_entity = BluetoothDevice( + id=device.address, + model=theengs_entity.model, + brand=theengs_entity.brand, + reachable=True, + **parse_device_args(device), + ) + + parsed_entities = { + # Check if we can infer an entity mapper from the property name. + conf.get('name', name): _property_to_entity.get( + conf.get('name'), + # If not, check if we can infer an entity mapper from the reported unit. + _unit_to_entity.get( + conf.get('unit'), + # If not, check if we can infer an entity mapper from the value type. + _value_type_to_entity.get( + type(theengs_entity.data.get(name)), + # If not, default to a raw sensor. + lambda value, _: RawSensor(value=value), + ), + ), + )(theengs_entity.data.get(name), conf) + for name, conf in theengs_entity.properties.items() + } + + for prop, entity in parsed_entities.items(): + entity.id = f'{parent_entity.id}:{prop}' + entity.name = prop + parent_entity.children.append(entity) + + return parent_entity + + +def _parse_advertisement_data(data: AdvertisementData) -> TheengsEntity: + """ + :param data: The data received from a Bluetooth advertisement packet. + :return: A :class:`platypush.entity.bluetooth.TheengsEntity` instance that + maps the parsed attributes. + """ + + entity_args, properties, brand, model, model_id = ({}, {}, None, None, None) + + if data.service_data: + parsed_data = list(data.service_data.keys())[0] + # TheengsDecoder only accepts 16 bit uuid's, this converts the 128 bit uuid to 16 bit. + entity_args['servicedatauuid'] = parsed_data[4:8] + parsed_data = str(list(data.service_data.values())[0].hex()) + entity_args['servicedata'] = parsed_data + + if data.manufacturer_data: + parsed_data = str( + struct.pack(' Dict[str, Any]: + """ + :param device: The device to parse. + :return: The mapped device arguments required to initialize a + :class:`platypush.entity.bluetooth.BluetoothDevice` or + :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` + object. + """ + + props = device.details.get('props', {}) + return { + 'name': device.name or device.address, + 'connected': props.get('Connected', False), + 'paired': props.get('Paired', False), + 'blocked': props.get('Blocked', False), + 'trusted': props.get('Trusted', False), + 'rssi': device.rssi, + 'tx_power': props.get('TxPower'), + 'uuids': { + uuid: uuidstr_to_str(uuid) for uuid in device.metadata.get('uuids', []) + }, + 'manufacturers': { + manufacturer_id: company.get(manufacturer_id, 'Unknown') + for manufacturer_id in sorted( + device.metadata.get('manufacturer_data', {}).keys() + ) + }, + 'manufacturer_data': _parse_manufacturer_data(device), + 'service_data': _parse_service_data(device), + } + + +def _parse_manufacturer_data(device: BLEDevice) -> Dict[int, str]: + """ + :param device: The device to parse. + :return: The manufacturer data as a ``manufacturer_id -> hex_string`` + mapping. + """ + return { + manufacturer_id: ''.join([f'{x:02x}' for x in value]) + for manufacturer_id, value in device.metadata.get( + 'manufacturer_data', {} + ).items() + } + + +def _parse_service_data(device: BLEDevice) -> Dict[str, str]: + """ + :param device: The device to parse. + :return: The service data as a ``service_uuid -> hex_string`` mapping. + """ + return { + service_uuid: ''.join([f'{x:02x}' for x in value]) + for service_uuid, value in device.details.get('props', {}) + .get('ServiceData', {}) + .items() + } diff --git a/platypush/plugins/bluetooth/ble/manifest.yaml b/platypush/plugins/bluetooth/ble/manifest.yaml index 4e65f46b2f..e1ada83059 100644 --- a/platypush/plugins/bluetooth/ble/manifest.yaml +++ b/platypush/plugins/bluetooth/ble/manifest.yaml @@ -16,5 +16,6 @@ manifest: pip: - bleak - bluetooth-numbers + - git+https://github.com/BlackLight/TheengsGateway package: platypush.plugins.bluetooth.ble type: plugin