diff --git a/platypush/entities/bluetooth.py b/platypush/entities/bluetooth.py deleted file mode 100644 index 0a546459f..000000000 --- a/platypush/entities/bluetooth.py +++ /dev/null @@ -1,79 +0,0 @@ -from sqlalchemy import ( - Boolean, - Column, - ForeignKey, - Integer, - JSON, - String, -) - -from platypush.common.db import Base - -from .devices import Device - - -if 'bluetooth_device' not in Base.metadata: - - class BluetoothDevice(Device): - """ - Entity that represents a Bluetooth device. - """ - - __tablename__ = 'bluetooth_device' - - id = Column( - Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True - ) - - connected = Column(Boolean, default=False) - """ Whether the device is connected. """ - - paired = Column(Boolean, default=False) - """ Whether the device is paired. """ - - trusted = Column(Boolean, default=False) - """ Whether the device is trusted. """ - - blocked = Column(Boolean, default=False) - """ Whether the device is blocked. """ - - rssi = Column(Integer, default=None) - """ Received Signal Strength Indicator. """ - - tx_power = Column(Integer, default=None) - """ Reported transmission power. """ - - manufacturers = Column(JSON) - """ Registered manufacturers for the device, as an ID -> Name map. """ - - uuids = Column(JSON) - """ - Service/characteristic UUIDs exposed by the device, as a - UUID -> Name map. - """ - - brand = Column(String) - """ Device brand, as a string. """ - - model = Column(String) - """ Device model, as a string. """ - - model_id = Column(String) - """ Device model ID. """ - - manufacturer_data = Column(JSON) - """ - Latest manufacturer data published by the device, as a - ``manufacturer_id -> data`` map, where ``data`` is a hexadecimal - string. - """ - - service_data = Column(JSON) - """ - Latest service data published by the device, as a ``service_uuid -> - data`` map, where ``data`` is a hexadecimal string. - """ - - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } diff --git a/platypush/entities/bluetooth/__init__.py b/platypush/entities/bluetooth/__init__.py new file mode 100644 index 000000000..48a3fc791 --- /dev/null +++ b/platypush/entities/bluetooth/__init__.py @@ -0,0 +1,7 @@ +from ._device import BluetoothDevice +from ._service import BluetoothService + +__all__ = [ + "BluetoothDevice", + "BluetoothService", +] diff --git a/platypush/entities/bluetooth/_device.py b/platypush/entities/bluetooth/_device.py new file mode 100644 index 000000000..0c4f2a98e --- /dev/null +++ b/platypush/entities/bluetooth/_device.py @@ -0,0 +1,167 @@ +from typing import Dict, Iterable, List +from typing_extensions import override + +from sqlalchemy import ( + Boolean, + Column, + ForeignKey, + Integer, + JSON, + String, +) + +from platypush.common.db import Base +from platypush.plugins.bluetooth.model import ( + MajorDeviceClass, + MajorServiceClass, + MinorDeviceClass, + ServiceClass, +) + +from ..devices import Device +from ._service import BluetoothService + + +if 'bluetooth_device' not in Base.metadata: + + class BluetoothDevice(Device): + """ + Entity that represents a Bluetooth device. + """ + + __tablename__ = 'bluetooth_device' + + id = Column( + Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True + ) + + address = Column(String, nullable=False) + """ Device address. """ + + connected = Column(Boolean, default=False) + """ Whether the device is connected. """ + + supports_ble = Column(Boolean, default=False) + """ + Whether the device supports the Bluetooth Low-Energy specification. + """ + + supports_legacy = Column(Boolean, default=False) + """ + Whether the device supports the legacy (non-BLE) specification. + """ + + rssi = Column(Integer, default=None) + """ Received Signal Strength Indicator. """ + + tx_power = Column(Integer, default=None) + """ Reported transmission power. """ + + _major_service_classes = Column("major_service_classes", JSON, default=None) + """ The reported major service classes, as a list of strings. """ + + _major_device_class = Column("major_device_class", String, default=None) + """ The reported major device class. """ + + _minor_device_classes = Column("minor_device_classes", JSON, default=None) + """ The reported minor device classes, as a list of strings. """ + + manufacturer = Column(String, default=None) + """ Device manufacturer, as a string. """ + + model = Column(String, default=None) + """ Device model, as a string. """ + + model_id = Column(String, default=None) + """ Device model ID. """ + + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + @property + def major_device_class(self) -> MajorDeviceClass: + ret = MajorDeviceClass.UNKNOWN + if self._major_device_class: + matches = [ + cls + for cls in MajorDeviceClass + if cls.value.name == self._major_device_class + ] + + if matches: + ret = matches[0] + + return ret + + @major_device_class.setter + def major_device_class(self, value: MajorDeviceClass): + self._major_device_class = value.value.name + + @property + def minor_device_classes(self) -> List[MinorDeviceClass]: + ret = [] + for dev_cls in self._minor_device_classes or []: + matches = [cls for cls in MinorDeviceClass if cls.value.name == dev_cls] + + if matches: + ret.append(matches[0]) + + return ret + + @minor_device_classes.setter + def minor_device_classes(self, value: Iterable[MinorDeviceClass]): + self._minor_device_classes = [cls.value.name for cls in (value or [])] + + @property + def major_service_classes(self) -> List[MajorServiceClass]: + ret = [] + for dev_cls in self._major_service_classes or []: + matches = [ + cls for cls in MajorServiceClass if cls.value.name == dev_cls + ] + + if matches: + ret.append(matches[0]) + + return ret + + @major_service_classes.setter + def major_service_classes(self, value: Iterable[MajorServiceClass]): + self._major_service_classes = [cls.value.name for cls in (value or [])] + + @property + def services(self) -> List[BluetoothService]: + """ + :return: List of + :class:`platypush.plugins.bluetooth.model.BluetoothService` mapping + all the services exposed by the device. + """ + return [ + child for child in self.children if isinstance(child, BluetoothService) + ] + + @property + def known_services(self) -> Dict[ServiceClass, "BluetoothService"]: + """ + Known services exposed by the device, indexed by + :class:`platypush.plugins.bluetooth.model.ServiceClass` enum value. + """ + return { + child.service_class: child + for child in self.children + if isinstance(child, BluetoothService) + and child.service_class != ServiceClass.UNKNOWN + } + + @override + def to_dict(self): + """ + Overwrites ``to_dict`` to transform private column names into their + public representation, and also include the exposed services and + child entities. + """ + return { + **{k.lstrip('_'): v for k, v in super().to_dict().items()}, + 'children': [child.to_dict() for child in self.children], + } diff --git a/platypush/entities/bluetooth/_service.py b/platypush/entities/bluetooth/_service.py new file mode 100644 index 000000000..699bc91ba --- /dev/null +++ b/platypush/entities/bluetooth/_service.py @@ -0,0 +1,133 @@ +from typing import Union +from typing_extensions import override +from uuid import UUID + +from sqlalchemy import ( + Boolean, + Column, + ForeignKey, + Integer, + String, +) + +from platypush.common.db import Base +from platypush.entities import Entity +from platypush.plugins.bluetooth.model import ( + Protocol, + RawServiceClass, + ServiceClass, +) + +if 'bluetooth_service' not in Base.metadata: + + class BluetoothService(Entity): + """ + Entity that represents a Bluetooth service. + """ + + __tablename__ = 'bluetooth_service' + + id = Column( + Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True + ) + + _uuid = Column('uuid', String, nullable=False) + """ + The service class UUID. It can be either a 16-bit or a 128-bit UUID. + """ + + _protocol = Column('protocol', String, default=None) + """ The protocol used by the service. """ + + port = Column(Integer, default=None) + """ The port used by the service. """ + + version = Column(Integer, default=None) + """ The version of the service profile. """ + + is_ble = Column(Boolean, default=False) + """ Whether the service is a BLE service. """ + + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + @staticmethod + def to_uuid(value: Union[str, RawServiceClass]) -> RawServiceClass: + """ + Convert a raw UUID string to a service class UUID. + """ + # If it's already a UUID or an int, just return it. + if isinstance(value, (UUID, int)): + return value + try: + # If it's formatted like a 128-bit UUID, convert it to a UUID + # object. + return UUID(value) + except ValueError: + # Hex string case + return int(value, 16) + + @property + def uuid(self) -> RawServiceClass: + """ + Getter for the service class UUID. + """ + return self.to_uuid(self._uuid) + + @uuid.setter + def uuid(self, value: Union[RawServiceClass, str]): + """ + Setter for the service class UUID. + """ + uuid: Union[RawServiceClass, str] = self.to_uuid(value) + if isinstance(uuid, int): + # Hex-encoded 16-bit UUID case + uuid = f'{uuid:04X}' + + self._uuid = str(uuid) + + @property + def protocol(self) -> Protocol: + """ + Getter for the protocol used by the service. + """ + try: + return Protocol(self._protocol) + except ValueError: + return Protocol.UNKNOWN + + @protocol.setter + def protocol(self, value: Union[str, Protocol]): + """ + Setter for the protocol used by the service. + """ + protocol = Protocol.UNKNOWN + if isinstance(value, Protocol): + protocol = value + else: + try: + protocol = Protocol(value) + except ValueError: + pass + + self._protocol = protocol.value + + @property + def service_class(self) -> ServiceClass: + """ + The :class:`platypush.plugins.bluetooth.model.ServiceClass` enum + value. + """ + try: + return ServiceClass.get(self.uuid) + except (TypeError, ValueError): + return ServiceClass.UNKNOWN + + @override + def to_dict(self) -> dict: + return { + **{k.lstrip('_'): v for k, v in super().to_dict().items()}, + # Human-readable service class name + 'service_class': str(self.service_class), + } diff --git a/platypush/message/event/bluetooth/__init__.py b/platypush/message/event/bluetooth/__init__.py index 5109c06ad..11be2cd3f 100644 --- a/platypush/message/event/bluetooth/__init__.py +++ b/platypush/message/event/bluetooth/__init__.py @@ -1,5 +1,6 @@ -from typing import Dict, Optional +from typing import Iterable, Optional +from platypush.entities.bluetooth import BluetoothDevice from platypush.message.event import Event @@ -27,19 +28,7 @@ class BluetoothScanResumedEvent(BluetoothEvent): super().__init__(*args, duration=duration, **kwargs) -class BluetoothWithPortEvent(Event): - """ - Base class for Bluetooth events with an associated port. - """ - - def __init__(self, *args, port: Optional[str] = None, **kwargs): - """ - :param port: The communication port of the device. - """ - super().__init__(*args, port=port, **kwargs) - - -class BluetoothDeviceEvent(BluetoothWithPortEvent): +class BluetoothDeviceEvent(BluetoothEvent): """ Base class for Bluetooth device events. """ @@ -49,59 +38,52 @@ class BluetoothDeviceEvent(BluetoothWithPortEvent): *args, address: str, connected: bool, - paired: bool, - trusted: bool, - blocked: bool, name: Optional[str] = None, - uuids: Optional[Dict[str, str]] = None, rssi: Optional[int] = None, tx_power: Optional[int] = None, - manufacturers: Optional[Dict[int, str]] = None, - manufacturer_data: Optional[Dict[int, str]] = None, - service_data: Optional[Dict[str, str]] = None, + manufacturer: Optional[str] = None, + services: Optional[Iterable[dict]] = None, **kwargs ): """ :param address: The Bluetooth address of the device. :param connected: Whether the device is connected. - :param paired: Whether the device is paired. - :param trusted: Whether the device is trusted. - :param blocked: Whether the device is blocked. :param name: The name of the device. - :param uuids: The UUIDs of the services exposed by the device. :param rssi: Received Signal Strength Indicator. :param tx_power: Transmission power. :param manufacturers: The manufacturers published by the device, as a ``manufacturer_id -> registered_name`` map. - :param manufacturer_data: The manufacturer data published by the - device, as a ``manufacturer_id -> data`` map, where ``data`` is a - hexadecimal string. - :param service_data: The service data published by the device, as a - ``service_uuid -> data`` map, where ``data`` is a hexadecimal string. + :param services: The services published by the device. """ super().__init__( *args, address=address, name=name, connected=connected, - paired=paired, - blocked=blocked, - trusted=trusted, - uuids=uuids or {}, rssi=rssi, tx_power=tx_power, - manufacturers=manufacturers or {}, - manufacturer_data=manufacturer_data or {}, - service_data=service_data or {}, + manufacturer=manufacturer, + services=services, **kwargs ) + @classmethod + def from_device(cls, device: BluetoothDevice, **kwargs) -> "BluetoothDeviceEvent": + """ + Initialize a Bluetooth event from the parameters of a device. -class BluetoothDeviceNewDataEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device publishes new manufacturer/service - data. - """ + :param device: Bluetooth device. + """ + return cls( + address=device.address, + name=device.name, + connected=device.connected, + rssi=device.rssi, + tx_power=device.tx_power, + manufacturer=device.manufacturer, + services=[srv.to_dict() for srv in device.services], + **kwargs + ) class BluetoothDeviceFoundEvent(BluetoothDeviceEvent): @@ -128,73 +110,61 @@ class BluetoothDeviceDisconnectedEvent(BluetoothDeviceEvent): """ -class BluetoothDevicePairedEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device is paired. - """ - - -class BluetoothDeviceUnpairedEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device is unpaired. - """ - - -class BluetoothDeviceBlockedEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device is blocked. - """ - - -class BluetoothDeviceUnblockedEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device is unblocked. - """ - - -class BluetoothDeviceTrustedEvent(BluetoothDeviceEvent): - """ - Event triggered when a Bluetooth device is trusted. - """ - - class BluetoothDeviceSignalUpdateEvent(BluetoothDeviceEvent): """ Event triggered when the RSSI/TX power of a Bluetooth device is updated. """ -class BluetoothDeviceUntrustedEvent(BluetoothDeviceEvent): +class BluetoothConnectionFailedEvent(BluetoothDeviceEvent): """ - Event triggered when a Bluetooth device is untrusted. + Event triggered when a Bluetooth connection fails. """ -class BluetoothConnectionRejectedEvent(BluetoothDeviceEvent): +class BluetoothFileEvent(BluetoothDeviceEvent): """ - Event triggered when a Bluetooth connection is rejected. + Base class for Bluetooth file events. + """ + + def __init__(self, *args, file: str, **kwargs): + super().__init__(*args, file=file, **kwargs) + + +class BluetoothFileTransferStartedEvent(BluetoothFileEvent): + """ + Event triggered when a file transfer is initiated. """ -class BluetoothFilePutRequestEvent(BluetoothWithPortEvent): +class BluetoothFileTransferCancelledEvent(BluetoothFileEvent): + """ + Event triggered when a file transfer is cancelled. + """ + + +class BluetoothFileReceivedEvent(BluetoothFileEvent): + """ + Event triggered when a file download is completed. + """ + + +class BluetoothFileSentEvent(BluetoothFileEvent): + """ + Event triggered when a file upload is completed. + """ + + +class BluetoothFilePutRequestEvent(BluetoothFileEvent): """ Event triggered when a file put request is received. """ -class BluetoothFileGetRequestEvent(BluetoothWithPortEvent): +class BluetoothFileGetRequestEvent(BluetoothFileEvent): """ Event triggered when a file get request is received. """ -class BluetoothFileReceivedEvent(BluetoothEvent): - """ - Event triggered when a file transfer is completed. - """ - - def __init__(self, *args, path: str, **kwargs): - super().__init__(*args, path=path, **kwargs) - - # vim:sw=4:ts=4:et: diff --git a/platypush/message/response/bluetooth.py b/platypush/message/response/bluetooth.py deleted file mode 100644 index 3b42be385..000000000 --- a/platypush/message/response/bluetooth.py +++ /dev/null @@ -1,202 +0,0 @@ -from platypush.message.response import Response - - -class BluetoothResponse(Response): - pass - - -class BluetoothScanResponse(BluetoothResponse): - def __init__(self, devices, *args, **kwargs): - if isinstance(devices, list): - self.devices = [ - { - 'addr': dev[0], - 'name': dev[1] if len(dev) > 1 else None, - 'class': hex(dev[2]) if len(dev) > 2 else None, - } - for dev in devices - ] - elif isinstance(devices, dict): - self.devices = [ - { - 'addr': addr, - 'name': name or None, - 'class': 'BLE', - } - for addr, name in devices.items() - ] - else: - raise ValueError('devices must be either a list of tuples or a dict') - - super().__init__(output=self.devices, *args, **kwargs) - - -class BluetoothLookupNameResponse(BluetoothResponse): - def __init__(self, addr: str, name: str, *args, **kwargs): - self.addr = addr - self.name = name - super().__init__(output={ - 'addr': self.addr, - 'name': self.name - }, *args, **kwargs) - - -class BluetoothLookupServiceResponse(BluetoothResponse): - """ - Example services response output: - - .. code-block:: json - - [ - { - "service-classes": [ - "1801" - ], - "profiles": [], - "name": "Service name #1", - "description": null, - "provider": null, - "service-id": null, - "protocol": "L2CAP", - "port": 31, - "host": "00:11:22:33:44:55" - }, - { - "service-classes": [ - "1800" - ], - "profiles": [], - "name": "Service name #2", - "description": null, - "provider": null, - "service-id": null, - "protocol": "L2CAP", - "port": 31, - "host": "00:11:22:33:44:56" - }, - { - "service-classes": [ - "1112", - "1203" - ], - "profiles": [ - [ - "1108", - 258 - ] - ], - "name": "Headset Gateway", - "description": null, - "provider": null, - "service-id": null, - "protocol": "RFCOMM", - "port": 2, - "host": "00:11:22:33:44:57" - } - ] - - """ - def __init__(self, services: list, *args, **kwargs): - self.services = services - super().__init__(output=self.services, *args, **kwargs) - - -class BluetoothDiscoverPrimaryResponse(BluetoothResponse): - """ - Example services response output: - - .. code-block:: json - - [ - { - "uuid": "00001800-0000-1000-8000-00805f9b34fb", - "start": 1, - "end": 7 - }, - { - "uuid": "00001801-0000-1000-8000-00805f9b34fb", - "start": 8, - "end": 8 - }, - { - "uuid": "0000fee7-0000-1000-8000-00805f9b34fb", - "start": 9, - "end": 16 - }, - { - "uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b", - "start": 17, - "end": 65535 - } - ] - - """ - def __init__(self, services: list, *args, **kwargs): - self.services = services - super().__init__(output=self.services, *args, **kwargs) - - -class BluetoothDiscoverCharacteristicsResponse(BluetoothResponse): - """ - Example services response output: - - .. code-block:: json - - [ - { - "uuid": "00002a00-0000-1000-8000-00805f9b34fb", - "handle": 2, - "properties": 10, - "value_handle": 3 - }, - { - "uuid": "00002a01-0000-1000-8000-00805f9b34fb", - "handle": 4, - "properties": 2, - "value_handle": 5 - }, - { - "uuid": "00002a04-0000-1000-8000-00805f9b34fb", - "handle": 6, - "properties": 2, - "value_handle": 7 - }, - { - "uuid": "0000fec8-0000-1000-8000-00805f9b34fb", - "handle": 10, - "properties": 32, - "value_handle": 11 - }, - { - "uuid": "0000fec7-0000-1000-8000-00805f9b34fb", - "handle": 13, - "properties": 8, - "value_handle": 14 - }, - { - "uuid": "0000fec9-0000-1000-8000-00805f9b34fb", - "handle": 15, - "properties": 2, - "value_handle": 16 - }, - { - "uuid": "cba20003-224d-11e6-9fb8-0002a5d5c51b", - "handle": 18, - "properties": 16, - "value_handle": 19 - }, - { - "uuid": "cba20002-224d-11e6-9fb8-0002a5d5c51b", - "handle": 21, - "properties": 12, - "value_handle": 22 - } - ] - - """ - def __init__(self, characteristics: list, *args, **kwargs): - self.characteristics = characteristics - super().__init__(output=self.characteristics, *args, **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/__init__.py b/platypush/plugins/bluetooth/__init__.py index f221243b0..e91e061b2 100644 --- a/platypush/plugins/bluetooth/__init__.py +++ b/platypush/plugins/bluetooth/__init__.py @@ -1,434 +1,3 @@ -import base64 -import os -import re -import select +from ._plugin import BluetoothPlugin -from typing import Dict, Optional - -from platypush.plugins.sensor import SensorPlugin - -from platypush.plugins import action -from platypush.message.response.bluetooth import BluetoothScanResponse, \ - BluetoothLookupNameResponse, BluetoothLookupServiceResponse, BluetoothResponse - - -class BluetoothPlugin(SensorPlugin): - """ - Bluetooth plugin - - Requires: - - * **pybluez** (``pip install pybluez``) - * **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``) - - """ - - import bluetooth - - class _DeviceDiscoverer(bluetooth.DeviceDiscoverer): - def __init__(self, name, *args, **kwargs): - # noinspection PyArgumentList - super().__init__(*args, **kwargs) - self.name = name - self.device = {} - self.done = True - - def pre_inquiry(self): - self.done = False - - # noinspection PyUnusedLocal - def device_discovered(self, dev_addr, dev_class, rssi, dev_name): - dev_name = dev_name.decode() - if dev_name == self.name: - self.device = { - 'addr': dev_addr, - 'name': dev_name, - 'class': dev_class, - } - - self.done = True - - def inquiry_complete(self): - self.done = True - - def __init__(self, device_id: int = -1, **kwargs): - """ - :param device_id: Default adapter device_id to be used (default: -1, auto) - """ - super().__init__(**kwargs) - self.device_id = device_id - self._devices = [] - self._devices_by_addr = {} - self._devices_by_name = {} - self._port_and_protocol_by_addr_and_srv_uuid = {} - self._port_and_protocol_by_addr_and_srv_name = {} - self._socks = {} - - def _get_device_addr(self, device): - if re.match('([0-9A-F]{2}:){5}[0-9A-F]{2}', device, re.IGNORECASE): - return device - if device in self._devices_by_name: - return self._devices_by_name[device]['addr'] - - return self.lookup_address(device).output['addr'] - - @action - def scan(self, device_id: Optional[int] = None, duration: int = 10) -> BluetoothScanResponse: - """ - Scan for nearby bluetooth devices - - :param device_id: Bluetooth adapter ID to use (default configured if None) - :param duration: Scan duration in seconds - """ - from bluetooth import discover_devices - - if device_id is None: - device_id = self.device_id - - self.logger.debug('Discovering devices on adapter {}, duration: {} seconds'.format( - device_id, duration)) - - devices = discover_devices(duration=duration, lookup_names=True, lookup_class=True, device_id=device_id, - flush_cache=True) - response = BluetoothScanResponse(devices) - - self._devices = response.devices - self._devices_by_addr = {dev['addr']: dev for dev in self._devices} - self._devices_by_name = {dev['name']: dev for dev in self._devices if dev.get('name')} - return response - - @action - def get_measurement(self, device_id: Optional[int] = None, duration: Optional[int] = 10, *args, **kwargs) \ - -> Dict[str, dict]: - """ - Wrapper for ``scan`` that returns bluetooth devices in a format usable by sensor backends. - - :param device_id: Bluetooth adapter ID to use (default configured if None) - :param duration: Scan duration in seconds - :return: Device address -> info map. - """ - devices = self.scan(device_id=device_id, duration=duration).output - return {device['addr']: device for device in devices} - - @action - def lookup_name(self, addr: str, timeout: int = 10) -> BluetoothLookupNameResponse: - """ - Look up the name of a nearby bluetooth device given the address - - :param addr: Device address - :param timeout: Lookup timeout (default: 10 seconds) - """ - from bluetooth import lookup_name - - self.logger.debug('Looking up name for device {}'.format(addr)) - name = lookup_name(addr, timeout=timeout) - - dev = { - 'addr': addr, - 'name': name, - 'class': self._devices_by_addr.get(addr, {}).get('class'), - } - - self._devices_by_addr[addr] = dev - if name: - self._devices_by_name[name] = dev - - return BluetoothLookupNameResponse(addr=addr, name=name) - - @action - def lookup_address(self, name: str, timeout: int = 10) -> BluetoothLookupNameResponse: - """ - Look up the address of a nearby bluetooth device given the name - - :param name: Device name - :param timeout: Lookup timeout (default: 10 seconds) - """ - - self.logger.info('Looking up address for device {}'.format(name)) - discoverer = self._DeviceDiscoverer(name) - discoverer.find_devices(lookup_names=True, duration=timeout) - readfiles = [discoverer] - - while True: - rfds = select.select(readfiles, [], [])[0] - if discoverer in rfds: - discoverer.process_event() - - if discoverer.done: - break - - dev = discoverer.device - if not dev: - raise RuntimeError('No such device: {}'.format(name)) - - addr = dev.get('addr') - self._devices_by_addr[addr] = dev - self._devices_by_name[name] = dev - return BluetoothLookupNameResponse(addr=addr, name=name) - - @action - def find_service(self, name: str = None, addr: str = None, uuid: str = None) -> BluetoothLookupServiceResponse: - """ - Look up for a service published by a nearby bluetooth device. If all the parameters are null then all the - published services on the nearby devices will be returned. See - `:class:platypush.message.response.bluetoothBluetoothLookupServiceResponse` for response structure reference. - - :param name: Service name - :param addr: Service/device address - :param uuid: Service UUID - """ - - import bluetooth - from bluetooth import find_service - services = find_service(name=name, address=addr, uuid=uuid) - - self._port_and_protocol_by_addr_and_srv_uuid.update({ - (srv['host'], srv['service-id']): (srv['port'], getattr(bluetooth, srv['protocol'])) - for srv in services if srv.get('service-id') - }) - - self._port_and_protocol_by_addr_and_srv_name.update({ - (srv['host'], srv['name']): (srv['port'], getattr(bluetooth, srv['protocol'])) - for srv in services if srv.get('name') - }) - - return BluetoothLookupServiceResponse(services) - - def _get_sock(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None, - service_name: str = None, connect_if_closed=False): - sock = None - addr = self._get_device_addr(device) - - if not (addr and port and protocol): - addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port, - service_uuid=service_uuid, service_name=service_name) - - if (addr, port) in self._socks: - sock = self._socks[(addr, port)] - elif connect_if_closed: - self.connect(protocol=protocol, device=device, port=port, service_uuid=service_uuid, - service_name=service_name) - sock = self._socks[(addr, port)] - - return sock - - def _get_addr_port_protocol(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None, - service_name: str = None) -> tuple: - import bluetooth - - addr = self._get_device_addr(device) if device else None - if service_uuid or service_name: - if addr: - if service_uuid: - (port, protocol) = self._port_and_protocol_by_addr_and_srv_uuid[(addr, service_uuid)] \ - if (addr, service_uuid) in self._port_and_protocol_by_addr_and_srv_uuid else \ - (None, None) - else: - (port, protocol) = self._port_and_protocol_by_addr_and_srv_name[(addr, service_name)] \ - if (addr, service_name) in self._port_and_protocol_by_addr_and_srv_name else \ - (None, None) - - if not (addr and port): - self.logger.info('Discovering devices, service_name={name}, uuid={uuid}, address={addr}'.format( - name=service_name, uuid=service_uuid, addr=addr)) - - services = [ - srv for srv in self.find_service().services - if (service_name is None or srv.get('name') == service_name) and - (addr is None or srv.get('host') == addr) and - (service_uuid is None or srv.get('service-id') == service_uuid) - ] - - if not services: - raise RuntimeError('No such service: name={name} uuid={uuid} address={addr}'.format( - name=service_name, uuid=service_uuid, addr=addr)) - - service = services[0] - addr = service['host'] - port = service['port'] - protocol = getattr(bluetooth, service['protocol']) - elif protocol: - if isinstance(protocol, str): - protocol = getattr(bluetooth, protocol) - else: - raise RuntimeError('No service name/UUID nor bluetooth protocol (RFCOMM/L2CAP) specified') - - if not (addr and port): - raise RuntimeError('No valid device name/address, port, service name or UUID specified') - - return addr, port, protocol - - @action - def connect(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None, - service_name: str = None): - """ - Connect to a bluetooth device. - You can query the advertised services through ``find_service``. - - :param protocol: Supported values: either 'RFCOMM'/'L2CAP' (str) or bluetooth.RFCOMM/bluetooth.L2CAP - int constants (int) - :param device: Device address or name - :param port: Port number - :param service_uuid: Service UUID - :param service_name: Service name - """ - from bluetooth import BluetoothSocket - - addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port, - service_uuid=service_uuid, service_name=service_name) - sock = self._get_sock(protocol=protocol, device=addr, port=port) - if sock: - self.close(device=addr, port=port) - - sock = BluetoothSocket(protocol) - self.logger.info('Opening connection to device {} on port {}'.format(addr, port)) - sock.connect((addr, port)) - self.logger.info('Connected to device {} on port {}'.format(addr, port)) - self._socks[(addr, port)] = sock - - @action - def close(self, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None): - """ - Close an active bluetooth connection - - :param device: Device address or name - :param port: Port number - :param service_uuid: Service UUID - :param service_name: Service name - """ - sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name) - - if not sock: - self.logger.debug('Close on device {}({}) that is not connected'.format(device, port)) - return - - try: - sock.close() - except Exception as e: - self.logger.warning('Exception while closing previous connection to {}({}): {}'.format( - device, port, str(e))) - - @action - def send(self, data, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None, - binary: bool = False): - """ - Send data to an active bluetooth connection - - :param data: Data to be sent - :param device: Device address or name - :param service_uuid: Service UUID - :param service_name: Service name - :param port: Port number - :param binary: Set to true if msg is a base64-encoded binary string - """ - from bluetooth import BluetoothError - - sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name, - connect_if_closed=True) - - if binary: - data = base64.decodebytes(data.encode() if isinstance(data, str) else data) - - try: - sock.send(data) - except BluetoothError as e: - self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name) - raise e - - @action - def recv(self, device: str, port: int, service_uuid: str = None, service_name: str = None, size: int = 1024, - binary: bool = False) -> BluetoothResponse: - """ - Send data to an active bluetooth connection - - :param device: Device address or name - :param port: Port number - :param service_uuid: Service UUID - :param service_name: Service name - :param size: Maximum number of bytes to be read - :param binary: Set to true to return a base64-encoded binary string - """ - from bluetooth import BluetoothError - - sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name, - connect_if_closed=True) - - if not sock: - self.connect(device=device, port=port, service_uuid=service_uuid, service_name=service_name) - sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name) - - try: - data = sock.recv(size) - except BluetoothError as e: - self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name) - raise e - - if binary: - data = base64.encodebytes(data) - - return BluetoothResponse(output=data.decode()) - - @action - def set_l2cap_mtu(self, mtu: int, device: str = None, port: int = None, service_name: str = None, - service_uuid: str = None): - """ - Set the L2CAP MTU (Maximum Transmission Unit) value for a connected bluetooth device. - Both the devices usually use the same MTU value over a connection. - - :param device: Device address or name - :param port: Port number - :param service_uuid: Service UUID - :param service_name: Service name - :param mtu: New MTU value - """ - from bluetooth import BluetoothError, set_l2cap_mtu, L2CAP - - sock = self._get_sock(protocol=L2CAP, device=device, port=port, service_uuid=service_uuid, - service_name=service_name, connect_if_closed=True) - - if not sock: - raise RuntimeError('set_l2cap_mtu: device not connected') - - try: - set_l2cap_mtu(sock, mtu) - except BluetoothError as e: - self.close(device=device, port=port, service_name=service_name, service_uuid=service_uuid) - raise e - - @action - def send_file(self, filename: str, device: str, port: int = None, data=None, service_name='OBEX Object Push', - binary: bool = False): - """ - Send a local file to a device that exposes an OBEX Object Push service - - :param filename: Path of the file to be sent - :param data: Alternatively to a file on disk you can send raw (string or binary) content - :param device: Device address or name - :param port: Port number - :param service_name: Service name - :param binary: Set to true if data is a base64-encoded binary string - """ - from PyOBEX.client import Client - - if not data: - filename = os.path.abspath(os.path.expanduser(filename)) - with open(filename, 'r') as f: - data = f.read() - filename = os.path.basename(filename) - else: - if binary: - data = base64.decodebytes(data.encode() if isinstance(data, str) else data) - - addr, port, protocol = self._get_addr_port_protocol(device=device, port=port, - service_name=service_name) - - client = Client(addr, port) - self.logger.info('Connecting to device {}'.format(addr)) - client.connect() - self.logger.info('Sending file {} to device {}'.format(filename, addr)) - client.put(filename, data) - self.logger.info('File {} sent to device {}'.format(filename, addr)) - client.disconnect() - - -# vim:sw=4:ts=4:et: +__all__ = ["BluetoothPlugin"] diff --git a/platypush/plugins/bluetooth/_ble/__init__.py b/platypush/plugins/bluetooth/_ble/__init__.py new file mode 100644 index 000000000..e314ebecd --- /dev/null +++ b/platypush/plugins/bluetooth/_ble/__init__.py @@ -0,0 +1,5 @@ +from ._manager import BLEManager + +__all__ = ["BLEManager"] + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/_ble/_cache.py b/platypush/plugins/bluetooth/_ble/_cache.py new file mode 100644 index 000000000..c875cd8f9 --- /dev/null +++ b/platypush/plugins/bluetooth/_ble/_cache.py @@ -0,0 +1,41 @@ +from typing import Dict, Iterable, Optional, Tuple +from typing_extensions import override + +from bleak.backends.device import BLEDevice + +from .._cache import BaseCache + + +class DeviceCache(BaseCache): + """ + Cache used to store scanned Bluetooth devices as :class:`BLEDevice`. + """ + + _by_address: Dict[str, BLEDevice] + _by_name: Dict[str, BLEDevice] + + @property + @override + def _address_field(self) -> str: + return 'address' + + @property + @override + def _name_field(self) -> str: + return 'name' + + @override + def get(self, device: str) -> Optional[BLEDevice]: + return super().get(device) + + @override + def add(self, device: BLEDevice) -> BLEDevice: + return super().add(device) + + @override + def values(self) -> Iterable[BLEDevice]: + return super().values() + + @override + def items(self) -> Iterable[Tuple[str, BLEDevice]]: + return super().items() diff --git a/platypush/plugins/bluetooth/_ble/_connection.py b/platypush/plugins/bluetooth/_ble/_connection.py new file mode 100644 index 000000000..008aec54c --- /dev/null +++ b/platypush/plugins/bluetooth/_ble/_connection.py @@ -0,0 +1,20 @@ +import asyncio +from dataclasses import dataclass +import threading +from typing import Optional + +from bleak import BleakClient +from bleak.backends.device import BLEDevice + + +@dataclass +class BluetoothConnection: + """ + A class to store information and context about a Bluetooth connection. + """ + + client: BleakClient + device: BLEDevice + loop: asyncio.AbstractEventLoop + close_event: Optional[asyncio.Event] = None + thread: Optional[threading.Thread] = None diff --git a/platypush/plugins/bluetooth/_ble/_event_handler.py b/platypush/plugins/bluetooth/_ble/_event_handler.py new file mode 100644 index 000000000..b4c20d390 --- /dev/null +++ b/platypush/plugins/bluetooth/_ble/_event_handler.py @@ -0,0 +1,135 @@ +from datetime import datetime, timedelta +from queue import Queue +from typing import Callable, Dict, Final, List, Optional, Type + +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData + +from platypush.entities.bluetooth import BluetoothDevice +from platypush.message.event.bluetooth import ( + BluetoothDeviceConnectedEvent, + BluetoothDeviceDisconnectedEvent, + BluetoothDeviceFoundEvent, + BluetoothDeviceSignalUpdateEvent, + BluetoothDeviceEvent, +) + +from platypush.context import get_bus + +from .._cache import EntityCache +from ._cache import DeviceCache +from ._mappers import device_to_entity + +_rssi_update_interval: Final[float] = 30.0 +""" How often to trigger RSSI update events for a device. """ + + +def _has_changed( + old: Optional[BluetoothDevice], new: BluetoothDevice, attr: str +) -> bool: + """ + Returns True if the given attribute has changed on the device. + """ + if old is None: + return False # No previous value + + old_value = getattr(old, attr) + new_value = getattr(new, attr) + return old_value != new_value + + +def _has_been_set( + old: Optional[BluetoothDevice], new: BluetoothDevice, attr: str, value: bool +) -> bool: + """ + Returns True if the given attribute has changed and its new value matches + the given value. + """ + if not _has_changed(old, new, attr): + return False + + new_value = getattr(new, attr) + return new_value == value + + +event_matchers: Dict[ + Type[BluetoothDeviceEvent], + Callable[[Optional[BluetoothDevice], BluetoothDevice], bool], +] = { + BluetoothDeviceConnectedEvent: lambda old, new: _has_been_set( + old, new, 'connected', True + ), + BluetoothDeviceDisconnectedEvent: lambda old, new: old is not None + and old.connected + and _has_been_set(old, new, 'connected', False), + BluetoothDeviceFoundEvent: lambda old, new: old is None + or (old.reachable is False and new.reachable is True), + BluetoothDeviceSignalUpdateEvent: lambda old, new: ( + (new.rssi is not None or new.tx_power is not None) + and (_has_changed(old, new, 'rssi') or _has_changed(old, new, 'tx_power')) + and ( + not (old and old.updated_at) + or datetime.utcnow() - old.updated_at + >= timedelta(seconds=_rssi_update_interval) + ) + ), +} +""" A static ``BluetoothDeviceEvent -> MatchCallback`` mapping. """ + + +# pylint: disable=too-few-public-methods +class EventHandler: + """ + Event handler for BLE devices. + """ + + def __init__( + self, + device_queue: Queue, + device_cache: DeviceCache, + entity_cache: EntityCache, + ): + """ + :param device_queue: Queue used to publish updated devices upstream. + :param device_cache: Device cache. + :param entity_cache: Entity cache. + """ + self._device_queue = device_queue + self._device_cache = device_cache + self._entity_cache = entity_cache + + def __call__(self, device: BLEDevice, data: AdvertisementData): + """ + Handler for Bluetooth device advertisement packets. + + 1. It generates the relevant + :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` if the + state of the device has changed. + + 2. It builds the relevant + :class:`platypush.entity.bluetooth.BluetoothDevice` entity object + populated with children entities that contain the supported + properties. + + 3. Publishes the updated entity to the upstream queue. + + :param device: The Bluetooth device. + :param data: The advertised data. + """ + + events: List[BluetoothDeviceEvent] = [] + existing_entity = self._entity_cache.get(device.address) + new_entity = device_to_entity(device, data) + + events += [ + event_type.from_device(new_entity) + for event_type, matcher in event_matchers.items() + if matcher(existing_entity, new_entity) + ] + + self._device_cache.add(device) + for event in events: + get_bus().post(event) + + if events: + self._device_queue.put_nowait(new_entity) diff --git a/platypush/plugins/bluetooth/_ble/_manager.py b/platypush/plugins/bluetooth/_ble/_manager.py new file mode 100644 index 000000000..55fcdd5fe --- /dev/null +++ b/platypush/plugins/bluetooth/_ble/_manager.py @@ -0,0 +1,375 @@ +import asyncio +from contextlib import asynccontextmanager +import threading +from typing import ( + AsyncGenerator, + Collection, + Final, + List, + Optional, + Dict, + Union, +) + +from bleak import BleakClient, BleakScanner +from bleak.backends.device import BLEDevice +from typing_extensions import override + +from platypush.context import get_or_create_event_loop +from platypush.entities.bluetooth import BluetoothDevice +from platypush.message.event.bluetooth import ( + BluetoothConnectionFailedEvent, + BluetoothDeviceDisconnectedEvent, + BluetoothDeviceLostEvent, +) + +from ._cache import DeviceCache +from ._connection import BluetoothConnection +from ._event_handler import EventHandler +from .._manager import BaseBluetoothManager +from .._types import RawServiceClass + + +class BLEManager(BaseBluetoothManager): + """ + Integration for Bluetooth Low Energy (BLE) devices. + """ + + _rssi_update_interval: Final[int] = 30 + """ + How long we should wait before triggering an update event upon a new + RSSI update, in seconds. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self._connections: Dict[str, BluetoothConnection] = {} + """ + ``address -> BluetoothConnection`` mapping containing the active + connections + """ + self._connection_locks: Dict[str, asyncio.Lock] = {} + """ + ``address -> Lock`` locks used to synchronize concurrent access to + the devices + """ + self._device_cache = DeviceCache() + """ Cache of discovered ``BLEDevice`` objects. """ + self._event_handler = EventHandler( + device_queue=self._device_queue, + device_cache=self._device_cache, + entity_cache=self._cache, + ) + """ Bluetooth device event handler """ + self._main_loop: Optional[asyncio.AbstractEventLoop] = None + """ Main event loop """ + + async def _get_device(self, device: str) -> BLEDevice: + """ + Utility method to get a device by name or address. + """ + dev = self._device_cache.get(device) + if not dev: + self.logger.info('Scanning for unknown device "%s"', device) + await self._scan() + dev = self._device_cache.get(device) + + assert dev, f'Unknown device: "{device}"' + return dev + + def _disconnected_callback(self, client: BleakClient): + self._connections.pop(client.address, None) + dev = self._cache.get(client.address) + if not dev: + return # Unknown device + + dev.connected = False + self.notify(BluetoothDeviceDisconnectedEvent, dev) + + @asynccontextmanager + async def _connect( + self, + device: str, + interface: Optional[str] = None, + timeout: Optional[float] = None, + close_event: Optional[asyncio.Event] = None, + ) -> AsyncGenerator[BluetoothConnection, None]: + """ + Asynchronous context manager that wraps a BLE device connection. + """ + dev = await self._get_device(device) + + async with self._connection_locks.get(dev.address, asyncio.Lock()) as lock: + self._connection_locks[dev.address] = lock or asyncio.Lock() + + async with BleakClient( + dev.address, + adapter=interface or self._interface, + timeout=timeout or self._connect_timeout, + disconnected_callback=self._disconnected_callback, + ) as client: + entity = self._cache.get(client.address) + if not client: + if entity: + entity.connected = False + self.notify(BluetoothConnectionFailedEvent, entity) + + raise AssertionError(f'Could not connect to the device {device}') + + # Yield the BluetoothConnection object + self._connections[dev.address] = BluetoothConnection( + client=client, + device=dev, + loop=asyncio.get_event_loop(), + thread=threading.current_thread(), + close_event=close_event, + ) + yield self._connections[dev.address] + + async def _read( + self, + device: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ) -> bytearray: + """ + Asynchronously read the next chunk of raw bytes from a BLE device given + a service UUID. + """ + async with self._connect(device, interface, connect_timeout) as conn: + data = await conn.client.read_gatt_char(service_uuid) + + return data + + async def _write( + self, + device: str, + data: bytes, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ): + """ + Asynchronously write a chunk of raw bytes to a BLE device given a + service UUID. + """ + async with self._connect(device, interface, connect_timeout) as conn: + await conn.client.write_gatt_char(service_uuid, data) + + async def _scan( + self, + duration: Optional[float] = None, + service_uuids: Optional[Collection[RawServiceClass]] = None, + ) -> List[BluetoothDevice]: + """ + Asynchronously scan for BLE devices and return the discovered devices + as a list of :class:`platypush.entities.bluetooth.BluetoothDevice` + entities. + """ + with self._scan_lock: + timeout = duration or self.poll_interval + devices = await BleakScanner.discover( + adapter=self._interface, + timeout=timeout, + service_uuids=list( + map(str, service_uuids or self._service_uuids or []) + ), + detection_callback=self._event_handler, + ) + + addresses = {dev.address for dev in devices} + return [ + dev + for addr, dev in self._cache.items() + if addr.lower() in addresses and dev.reachable + ] + + def _close_active_connections(self): + """ + Terminates all active connections. + """ + connections = list(self._connections.values()) + for conn in connections: + try: + self.disconnect(conn.device.address) + except Exception as e: + self.logger.warning( + 'Error while disconnecting from %s: %s', conn.device.address, e + ) + + @override + def connect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + interface: Optional[str] = None, + timeout: Optional[float] = None, + ): + timeout = timeout or self._connect_timeout + connected_event = threading.Event() + close_event = asyncio.Event() + loop = asyncio.new_event_loop() + + def connect_thread(): + """ + The connection thread. It wraps an asyncio loop with a connect + context manager. + """ + + async def connect_wrapper(): + """ + The asyncio connect wrapper. + """ + async with self._connect(device, interface, timeout, close_event): + connected_event.set() + await close_event.wait() + + asyncio.set_event_loop(loop) + loop.run_until_complete(connect_wrapper()) + + # Initialize the loop and start the connection thread + loop = get_or_create_event_loop() + connector = threading.Thread( + target=connect_thread, + name=f'Bluetooth:connect@{device}', + ) + connector.start() + + # Wait for the connection to succeed + success = connected_event.wait(timeout=timeout) + assert success, f'Connection to {device} timed out' + + @override + def disconnect(self, device: str, *_, **__): + # Get the device + loop = get_or_create_event_loop() + dev = loop.run_until_complete(self._get_device(device)) + assert dev, f'Device {device} not found' + + # Check if there are any active connections + connection = self._connections.get(dev.address, None) + assert connection, f'No active connections to the device {device} were found' + + # Set the close event and wait for any connection thread to terminate + if connection.close_event: + connection.close_event.set() + if connection.thread and connection.thread.is_alive(): + connection.thread.join(timeout=5) + assert not ( + connection.thread and connection.thread.is_alive() + ), f'Disconnection from {device} timed out' + + @override + def scan( + self, + duration: Optional[float] = None, + service_uuids: Optional[Collection[RawServiceClass]] = None, + ) -> List[BluetoothDevice]: + """ + Scan for Bluetooth devices nearby and return the results as a list of + entities. + + :param duration: Scan duration in seconds (default: same as the plugin's + `poll_interval` configuration parameter) + :param service_uuids: List of service UUIDs to discover. Default: any. + """ + loop = get_or_create_event_loop() + return loop.run_until_complete(self._scan(duration, service_uuids)) + + @override + def read( + self, + device: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ) -> bytearray: + """ + :param device: Name or address of the device to read from. + :param service_uuid: Service UUID. + :param interface: Bluetooth adapter name to use (default configured if None). + :param connect_timeout: Connection timeout in seconds (default: same as the + configured `connect_timeout`). + """ + loop = get_or_create_event_loop() + return loop.run_until_complete( + self._read(device, service_uuid, interface, connect_timeout) + ) + + @override + def write( + self, + device: str, + data: Union[bytes, bytearray], + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ): + get_or_create_event_loop().run_until_complete( + self._write(device, data, service_uuid, interface, connect_timeout) + ) + + async def listen(self): + """ + Main loop listener. + """ + self.logger.info('Starting BLE scanner') + device_addresses = set() + + while not self.should_stop(): + self._scan_enabled.wait() + if self.should_stop(): + break + + entities = await self._scan(service_uuids=self._service_uuids) + new_device_addresses = {e.external_id for e in entities} + missing_device_addresses = device_addresses - new_device_addresses + missing_devices = [ + dev + for addr, dev in self._cache.items() + if addr in missing_device_addresses + ] + + for dev in missing_devices: + dev.reachable = False + dev.connected = False + self.notify(BluetoothDeviceLostEvent, dev) + + device_addresses = new_device_addresses + + @override + def run(self): + super().run() + + self._main_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._main_loop) + try: + self._main_loop.run_until_complete(self.listen()) + except Exception as e: + if not self.should_stop(): + self.logger.warning('The main loop failed unexpectedly: %s', e) + self.logger.exception(e) + finally: + try: + # Try and release the scan lock if acquired + self._scan_lock.release() + except Exception: + pass + + @override + def stop(self): + """ + Upon stop request, it stops any pending scans and closes all active + connections. + """ + self._close_active_connections() + if self._main_loop and self._main_loop.is_running(): + self._main_loop.stop() + + self.logger.info('Stopped the BLE scanner') + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/ble/_mappers.py b/platypush/plugins/bluetooth/_ble/_mappers.py similarity index 80% rename from platypush/plugins/bluetooth/ble/_mappers.py rename to platypush/plugins/bluetooth/_ble/_mappers.py index 974513568..7d413b61b 100644 --- a/platypush/plugins/bluetooth/ble/_mappers.py +++ b/platypush/plugins/bluetooth/_ble/_mappers.py @@ -1,11 +1,10 @@ import json import struct from dataclasses import dataclass, field -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, List, Optional from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData -from bleak.uuids import uuidstr_to_str from bluetooth_numbers import company # pylint: disable=no-name-in-module @@ -13,7 +12,7 @@ from TheengsGateway._decoder import decodeBLE, getAttribute, getProperties from platypush.entities import Entity from platypush.entities.batteries import Battery -from platypush.entities.bluetooth import BluetoothDevice +from platypush.entities.bluetooth import BluetoothDevice, BluetoothService from platypush.entities.contact import ContactSensor from platypush.entities.electricity import ( CurrentSensor, @@ -33,6 +32,8 @@ from platypush.entities.temperature import TemperatureSensor from platypush.entities.time import TimeDurationSensor from platypush.entities.weight import WeightSensor +from .._model import Protocol, ServiceClass + @dataclass class TheengsEntity: @@ -42,7 +43,7 @@ class TheengsEntity: data: dict = field(default_factory=dict) properties: dict = field(default_factory=dict) - brand: Optional[str] = None + manufacturer: Optional[str] = None model: Optional[str] = None model_id: Optional[str] = None @@ -196,6 +197,34 @@ _value_type_to_entity: Dict[type, Callable[[Any, Dict[str, Any]], Entity]] = { } +def _parse_services(device: BLEDevice) -> List[BluetoothService]: + """ + :param device: The target device. + :return: The parsed BLE services as a list of + :class:`platypush.entities.bluetooth.BluetoothService`. + """ + services: List[BluetoothService] = [] + for srv in device.metadata.get('uuids', []): + try: + uuid = BluetoothService.to_uuid(srv) + except (TypeError, ValueError): + # Not a valid UUID. + continue + + srv_cls = ServiceClass.get(uuid) + services.append( + BluetoothService( + id=f'{device.address}:{uuid}', + uuid=uuid, + name=str(srv_cls), + protocol=Protocol.L2CAP, + is_ble=True, + ) + ) + + return services + + def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDevice: """ Convert the data received from a Bluetooth advertisement packet into a @@ -205,12 +234,26 @@ def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDev """ theengs_entity = _parse_advertisement_data(data) + props = (device.details or {}).get('props', {}) + manufacturer = theengs_entity.manufacturer or company.get( + list(device.metadata['manufacturer_data'].keys())[0] + if device.metadata.get('manufacturer_data', {}) + else None + ) + parent_entity = BluetoothDevice( id=device.address, model=theengs_entity.model, - brand=theengs_entity.brand, reachable=True, - **parse_device_args(device), + supports_ble=True, + supports_legacy=False, + address=device.address, + name=device.name or device.address, + connected=props.get('Connected', False), + rssi=device.rssi, + tx_power=props.get('TxPower'), + manufacturer=manufacturer, + children=_parse_services(device), ) parsed_entities = { @@ -239,6 +282,7 @@ def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDev entity.id = f'{parent_entity.id}:{prop}' entity.name = prop parent_entity.children.append(entity) + entity.parent = parent_entity return parent_entity @@ -250,7 +294,7 @@ def _parse_advertisement_data(data: AdvertisementData) -> TheengsEntity: maps the parsed attributes. """ - entity_args, properties, brand, model, model_id = ({}, {}, None, None, None) + entity_args, properties, manufacturer, model, model_id = ({}, {}, None, None, None) if data.service_data: parsed_data = list(data.service_data.keys())[0] @@ -286,67 +330,7 @@ def _parse_advertisement_data(data: AdvertisementData) -> TheengsEntity: return TheengsEntity( data=entity_args, properties=properties, - brand=brand, + manufacturer=manufacturer, model=model, model_id=model_id, ) - - -def parse_device_args(device: BLEDevice) -> Dict[str, Any]: - """ - :param device: The device to parse. - :return: The mapped device arguments required to initialize a - :class:`platypush.entity.bluetooth.BluetoothDevice` or - :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` - object. - """ - - props = (device.details or {}).get('props', {}) - return { - 'name': device.name or device.address, - 'connected': props.get('Connected', False), - 'paired': props.get('Paired', False), - 'blocked': props.get('Blocked', False), - 'trusted': props.get('Trusted', False), - 'rssi': device.rssi, - 'tx_power': props.get('TxPower'), - 'uuids': { - uuid: uuidstr_to_str(uuid) for uuid in device.metadata.get('uuids', []) - }, - 'manufacturers': { - manufacturer_id: company.get(manufacturer_id, 'Unknown') - for manufacturer_id in sorted( - device.metadata.get('manufacturer_data', {}).keys() - ) - }, - 'manufacturer_data': _parse_manufacturer_data(device), - 'service_data': _parse_service_data(device), - } - - -def _parse_manufacturer_data(device: BLEDevice) -> Dict[int, str]: - """ - :param device: The device to parse. - :return: The manufacturer data as a ``manufacturer_id -> hex_string`` - mapping. - """ - return { - manufacturer_id: ''.join([f'{x:02x}' for x in value]) - for manufacturer_id, value in device.metadata.get( - 'manufacturer_data', {} - ).items() - } - - -def _parse_service_data(device: BLEDevice) -> Dict[str, str]: - """ - :param device: The device to parse. - :return: The service data as a ``service_uuid -> hex_string`` mapping. - """ - return { - service_uuid: ''.join([f'{x:02x}' for x in value]) - for service_uuid, value in (device.details or {}) - .get('props', {}) - .get('ServiceData', {}) - .items() - } diff --git a/platypush/plugins/bluetooth/_cache.py b/platypush/plugins/bluetooth/_cache.py new file mode 100644 index 000000000..4c6923b4c --- /dev/null +++ b/platypush/plugins/bluetooth/_cache.py @@ -0,0 +1,207 @@ +from abc import ABC, abstractmethod +from collections import defaultdict +from threading import RLock +from typing import Any, Dict, Iterable, Optional, Tuple, Union +from typing_extensions import override + +from platypush.entities.bluetooth import BluetoothDevice + +from ._model import MajorDeviceClass + + +class BaseCache(ABC): + """ + Base cache class for Bluetooth devices and entities. + """ + + _by_address: Dict[str, Any] + """ Device cache by address. """ + _by_name: Dict[str, Any] + """ Device cache by name. """ + + def __init__(self): + self._by_address = {} + self._by_name = {} + self._insert_locks = defaultdict(RLock) + """ Locks for inserting devices into the cache. """ + + @property + @abstractmethod + def _address_field(self) -> str: + """ + Name of the field that contains the address of the device. + """ + + @property + @abstractmethod + def _name_field(self) -> str: + """ + Name of the field that contains the name of the device. + """ + + def get(self, device: str) -> Optional[Any]: + """ + Get a device by address or name. + """ + dev = self._by_address.get(device) + if not dev: + dev = self._by_name.get(device) + return dev + + def add(self, device: Any) -> Any: + """ + Cache a device. + """ + with self._insert_locks[device.address]: + addr = getattr(device, self._address_field) + name = getattr(device, self._name_field) + self._by_address[addr] = device + if name: + self._by_name[name] = device + + return device + + def keys(self) -> Iterable[str]: + """ + :return: All the cached device addresses. + """ + return list(self._by_address.keys()) + + def values(self) -> Iterable[Any]: + """ + :return: All the cached device entities. + """ + return list(self._by_address.values()) + + def items(self) -> Iterable[Tuple[str, Any]]: + """ + :return: All the cached items, as ``(address, device)`` tuples. + """ + return list(self._by_address.items()) + + def __contains__(self, device: str) -> bool: + """ + :return: ``True`` if the entry is cached, ``False`` otherwise. + """ + return self.get(device) is not None + + +class EntityCache(BaseCache): + """ + Cache used to store scanned Bluetooth devices as :class:`BluetoothDevice`. + """ + + _by_address: Dict[str, BluetoothDevice] + _by_name: Dict[str, BluetoothDevice] + + @property + @override + def _address_field(self) -> str: + return 'address' + + @property + @override + def _name_field(self) -> str: + return 'name' + + @override + def get(self, device: Union[str, BluetoothDevice]) -> Optional[BluetoothDevice]: + dev_filter = device.address if isinstance(device, BluetoothDevice) else device + return super().get(dev_filter) + + @override + def add(self, device: BluetoothDevice) -> BluetoothDevice: + with self._insert_locks[device.address]: + existing_device = self.get(device) + if existing_device: + self._merge_properties(device, existing_device) + self._merge_children(device, existing_device) + device = existing_device + + return super().add(device) + + @override + def values(self) -> Iterable[BluetoothDevice]: + return super().values() + + @override + def items(self) -> Iterable[Tuple[str, BluetoothDevice]]: + return super().items() + + @override + def __contains__(self, device: Union[str, BluetoothDevice]) -> bool: + """ + Override the default ``__contains__`` to support lookup by partial + :class:`BluetoothDevice` objects. + """ + return super().__contains__(device) + + def _merge_properties( + self, device: BluetoothDevice, existing_device: BluetoothDevice + ): + """ + Coalesce the properties of the two device representations. + """ + # Coalesce the major device class + if existing_device.major_device_class == MajorDeviceClass.UNKNOWN: + existing_device.major_device_class = device.major_device_class + + # Coalesce the other device and service classes + for attr in ('major_service_classes', 'minor_device_classes'): + setattr( + existing_device, + attr, + list( + { + *getattr(existing_device, attr, []), + *getattr(device, attr, []), + } + ), + ) + + # Coalesce mutually exclusive supports_* flags + for attr in ('supports_ble', 'supports_legacy'): + if not getattr(existing_device, attr, None): + setattr(existing_device, attr, getattr(device, attr, None) or False) + + # Merge the connected property + existing_device.connected = ( + device.connected + if device.connected is not None + else existing_device.connected + ) + + # Coalesce other manager-specific properties + for attr in ('rssi', 'tx_power'): + if getattr(existing_device, attr, None) is None: + setattr(existing_device, attr, getattr(device, attr, None)) + + # Merge the data and meta dictionaries + for attr in ('data', 'meta'): + setattr( + existing_device, + attr, + { + **(getattr(existing_device, attr) or {}), + **(getattr(device, attr) or {}), + }, + ) + + def _merge_children( + self, device: BluetoothDevice, existing_device: BluetoothDevice + ): + """ + Merge the device's children upon set without overwriting the + existing ones. + """ + # Map of the existing children + existing_children = { + child.external_id: child for child in existing_device.children + } + + # Map of the new children + new_children = {child.id: child for child in device.children} + + # Merge the existing children with the new ones without overwriting them + existing_children.update(new_children) + existing_device.children = list(existing_children.values()) diff --git a/platypush/plugins/bluetooth/_file/__init__.py b/platypush/plugins/bluetooth/_file/__init__.py new file mode 100644 index 000000000..86794d129 --- /dev/null +++ b/platypush/plugins/bluetooth/_file/__init__.py @@ -0,0 +1,4 @@ +from .sender import FileSender + + +__all__ = ["FileSender"] diff --git a/platypush/plugins/bluetooth/_file/sender.py b/platypush/plugins/bluetooth/_file/sender.py new file mode 100644 index 000000000..1fffd0ad2 --- /dev/null +++ b/platypush/plugins/bluetooth/_file/sender.py @@ -0,0 +1,105 @@ +import logging +import os +from typing import Any, Type + +from PyOBEX.client import Client + +from platypush.context import get_bus +from platypush.entities.bluetooth import BluetoothDevice +from platypush.message.event.bluetooth import ( + BluetoothConnectionFailedEvent, + BluetoothDeviceEvent, + BluetoothFileSentEvent, + BluetoothFileTransferCancelledEvent, + BluetoothFileTransferStartedEvent, +) + +from platypush.plugins.bluetooth._legacy import LegacyManager +from platypush.plugins.bluetooth.model import ServiceClass + + +# pylint: disable=too-few-public-methods +class FileSender: + """ + Wrapper for the Bluetooth file send OBEX service. + """ + + def __init__(self, scanner: LegacyManager): + self._scanner = scanner + self.logger = logging.getLogger(__name__) + + def send_file( + self, + file: str, + device: str, + data: bytes, + ): + """ + Send a file to a device. + + :param file: Name/path of the file to send. + :param device: Name or address of the device to send the file to. + :param data: File data. + """ + + dev = self._scanner.get_device(device) + service = dev.known_services.get(ServiceClass.OBEX_OBJECT_PUSH) + assert service, ( + f'The device {device} does not expose the service ' + f'{str(ServiceClass.OBEX_OBJECT_PUSH)}' + ) + + port = service.port + client = self._connect(dev, port) + self._post_event(BluetoothFileTransferStartedEvent, dev, file=file) + self._send_file(client, dev, file, data) + + def _send_file( + self, + client: Client, + dev: BluetoothDevice, + file: str, + data: bytes, + ): + filename = os.path.basename(file) + + try: + client.put(filename, data) + self._post_event(BluetoothFileSentEvent, dev, file=file) + except Exception as e: + self._post_event( + BluetoothFileTransferCancelledEvent, + dev, + reason=str(e), + file=file, + ) + + raise AssertionError( + f'Failed to send file {file} to device {dev.address}: {e}' + ) from e + finally: + client.disconnect() + + def _connect(self, dev: BluetoothDevice, port: str) -> Client: + client = Client(dev.address, port) + + try: + client.connect() + assert ( + client.connection_id is not None + ), 'Could not establish a connection to the device' + except Exception as e: + self._post_event(BluetoothConnectionFailedEvent, dev, reason=str(e)) + raise AssertionError( + f'Connection to device {dev.address} failed: {e}' + ) from e + + return client + + def _post_event( + self, + event_type: Type[BluetoothDeviceEvent], + device: BluetoothDevice, + **event_args: Any, + ): + get_bus().post(event_type.from_device(device, **event_args)) diff --git a/platypush/plugins/bluetooth/_legacy/__init__.py b/platypush/plugins/bluetooth/_legacy/__init__.py index a100fedcb..76875cc52 100644 --- a/platypush/plugins/bluetooth/_legacy/__init__.py +++ b/platypush/plugins/bluetooth/_legacy/__init__.py @@ -1,11 +1,3 @@ -from ._model import BluetoothDevice -from ._scanner import DeviceScanner +from ._manager import LegacyManager - -__all__ = [ - "BluetoothDevice", - "DeviceScanner", -] - - -# vim:sw=4:ts=4:et: +__all__ = ["LegacyManager"] diff --git a/platypush/plugins/bluetooth/_legacy/_manager/__init__.py b/platypush/plugins/bluetooth/_legacy/_manager/__init__.py new file mode 100644 index 000000000..228dbdb62 --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_manager/__init__.py @@ -0,0 +1,3 @@ +from ._base import LegacyManager + +__all__ = ["LegacyManager"] diff --git a/platypush/plugins/bluetooth/_legacy/_manager/_base.py b/platypush/plugins/bluetooth/_legacy/_manager/_base.py new file mode 100644 index 000000000..12bf4eddd --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_manager/_base.py @@ -0,0 +1,346 @@ +from collections import defaultdict +from contextlib import contextmanager +from queue import Empty, Queue +from threading import Event, RLock, Thread, current_thread +from typing import ( + Dict, + Final, + Generator, + List, + Optional, + Union, +) +from typing_extensions import override + +import bluetooth + +from platypush.entities.bluetooth import BluetoothDevice, BluetoothService +from platypush.message.event.bluetooth import ( + BluetoothConnectionFailedEvent, + BluetoothDeviceConnectedEvent, + BluetoothDeviceDisconnectedEvent, +) + +from ..._manager import BaseBluetoothManager +from ..._types import RawServiceClass +from .._model import BluetoothDeviceBuilder +from ._connection import BluetoothConnection +from ._service import ServiceDiscoverer +from ._types import ConnectionId + + +class LegacyManager(BaseBluetoothManager): + """ + Scanner for Bluetooth non-low-energy devices. + """ + + _service_discovery_timeout: Final[int] = 30 + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._connections: Dict[ConnectionId, BluetoothConnection] = {} + """ Maps (address, port) pairs to Bluetooth connections. """ + self._connection_locks: Dict[ConnectionId, RLock] = defaultdict(RLock) + """ Maps (address, port) pairs to connection locks. """ + self._service_scanned_devices: Dict[str, bool] = {} + """ Maps the addresses of the devices whose services have been scanned. """ + + def get_device( + self, + device: str, + scan_duration: Optional[int] = None, + _fail_if_not_cached: bool = False, + ) -> BluetoothDevice: + """ + Get/discover a device by its address or name. + + :param device: Device address or name. + :param scan_duration: Overrides the duration of the scan. + :param _fail_if_not_cached: Throw an assertion error if the device + hasn't been cached yet. + """ + duration = scan_duration or self.poll_interval + dev = self._cache.get(device) + if dev: + return dev # If it's already cached, just return it. + + assert not _fail_if_not_cached, f'Device "{device}" not found' + + # Otherwise, scan for the device. + self.logger.info('Scanning for device "%s"...', device) + self.scan(duration=duration) + + # Run the method again, but this time fail if the device has not been + # found in the latest scan. + return self.get_device(device, scan_duration, _fail_if_not_cached=True) + + def _get_matching_services( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + ) -> List[BluetoothService]: + """ + Given a device and a port or service UUID, return a list of matching + services. + """ + assert port or service_uuid, 'Please specify at least one of port/service_uuid' + dev = self.get_device(device) + assert dev, f'Device "{device}" not found' + + matching_services = [] + if port: + matching_services = [srv for srv in dev.services if srv.port == port] + elif service_uuid: + uuid = BluetoothService.to_uuid(service_uuid) + matching_services = [srv for srv in dev.services if uuid == srv.uuid] + + return matching_services + + def _connect_thread( + self, + conn_queue: Queue[BluetoothConnection], + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + ): + """ + Connection thread that asynchronously pushes a + :class:`BluetoothConnection` object to a queue when connected, so the + caller can wait for the connection to complete and handle timeouts. + """ + dev = self.get_device(device) + matching_services = self._get_matching_services(device, port, service_uuid) + assert matching_services, ( + f'No services found on {dev.address} for ' + f'UUID={service_uuid} port={port}' + ) + + conn = BluetoothConnection( + address=dev.address, + service=matching_services[0], + thread=current_thread(), + ) + + existing_conn = self._connections.get(conn.key) + if existing_conn and existing_conn.socket: + conn = existing_conn + else: + with self._connection_locks[conn.key]: + addr = conn.address + port_ = conn.service.port + self.logger.info( + 'Opening connection to device %s on port %s', addr, port_ + ) + + # Connect to the specified address and port. + conn.socket.connect((addr, port_)) + self.logger.info('Connected to device %s on port %s', addr, port_) + self._connections[conn.key] = conn + + conn_queue.put_nowait(conn) + + @contextmanager + def _connect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + timeout: Optional[float] = None, + ) -> Generator[BluetoothConnection, None, None]: + """ + Wraps the connection thread in a context manager with timeout support. + """ + dev = self.get_device(device) + # Queue where the connection object is pushed once the socket is ready + conn_queue: Queue[BluetoothConnection] = Queue() + + # Start the connection thread + conn_thread = Thread( + target=self._connect_thread, + name=f'Bluetooth:connect@{device}', + args=(conn_queue, device, port, service_uuid), + ) + + conn_thread.start() + + # Wait for the connection object + timeout = timeout or self._connect_timeout + try: + conn = conn_queue.get(timeout=timeout) + except Empty as e: + dev.connected = False + self.notify(BluetoothConnectionFailedEvent, dev, reason=str(e)) + raise AssertionError(f'Connection to {device} timed out') from e + + dev.connected = True + self.notify(BluetoothDeviceConnectedEvent, dev) + yield conn + + # Close the connection once the context is over + with self._connection_locks[conn.key]: + conn.close() + self._connections.pop(conn.key, None) + + dev.connected = False + self.notify(BluetoothDeviceDisconnectedEvent, dev) + + @override + def connect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + interface: Optional[str] = None, + timeout: Optional[float] = None, + ): + connected = Event() + + def connect_thread(): + with self._connect(device, port, service_uuid) as conn: + connected.set() + conn.stop_event.wait() + + self.logger.info('Connection to %s successfully terminated', conn.address) + + # Start the connection thread + Thread( + target=connect_thread, + name=f'Bluetooth:connect:wrapper@{device}', + ).start() + + # Wait for the connected event + timeout = timeout or self._connect_timeout + conn_success = connected.wait(timeout=timeout) + assert conn_success, f'Connection to {device} timed out' + + @override + def disconnect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + ): + matching_connections = [ + conn + for conn in self._connections.values() + if conn.address == device + and (port is None or conn.service.port == port) + and (service_uuid is None or conn.service.uuid == service_uuid) + ] + + assert matching_connections, f'No active connections found to {device}' + for conn in matching_connections: + conn.close() + + @override + def scan(self, duration: Optional[float] = None) -> List[BluetoothDevice]: + duration = duration or self.poll_interval + assert duration, 'Scan duration must be set' + duration = int(max(duration, 1)) + + with self._scan_lock: + # Discover all devices. + try: + info = bluetooth.discover_devices( + duration=duration, lookup_names=True, lookup_class=True + ) + except IOError as e: + self.logger.warning('Could not discover devices: %s', e) + # Wait a bit before a potential retry + self._stop_event.wait(timeout=1) + return [] + + # Pre-fill the services for the devices that have already been scanned. + services: Dict[str, List[BluetoothService]] = { + addr: self._cache.get(addr).services # type: ignore + for addr, _, __ in info + if self._cache.get(addr) is not None + } + + # Check if there are any devices that have not been scanned yet. + unknown_devices = [ + addr + for addr, _, __ in info + if not self._service_scanned_devices.get(addr, False) + ] + + # Discover the services for the devices that have not been scanned. + if unknown_devices: + services.update( + ServiceDiscoverer().discover( + *unknown_devices, timeout=self._service_discovery_timeout + ) + ) + + # Initialize the BluetoothDevice objects. + devices = { + addr: BluetoothDeviceBuilder.build( + address=addr, + name=name, + raw_class=class_, + services=services.get(addr, []), + ) + for addr, name, class_ in info + } + + for dev in devices.values(): + self._service_scanned_devices[dev.address] = True + self._device_queue.put_nowait(dev) + + return list(devices.values()) + + @override + def read( + self, + device: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + size: int = 1024, + ) -> bytearray: + """ + :param size: Number of bytes to read. + """ + with self._connect( + device, service_uuid=service_uuid, timeout=connect_timeout + ) as conn: + try: + return conn.socket.recv(size) + except bluetooth.BluetoothError as e: + raise AssertionError(f'Error reading from {device}: {e}') from e + + @override + def write( + self, + device: str, + data: Union[bytes, bytearray], + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ): + with self._connect( + device, service_uuid=service_uuid, timeout=connect_timeout + ) as conn: + try: + return conn.socket.send(data) + except bluetooth.BluetoothError as e: + raise AssertionError(f'Error reading from {device}: {e}') from e + + @override + def run(self): + super().run() + self.logger.info('Starting legacy Bluetooth scanner') + + while not self.should_stop(): + scan_enabled = self._scan_enabled.wait(timeout=1) + if scan_enabled: + self.scan(duration=self.poll_interval) + + @override + def stop(self): + # Close any active connections + for conn in list(self._connections.values()): + conn.close(timeout=5) + + self.logger.info('Stopped the Bluetooth legacy scanner') diff --git a/platypush/plugins/bluetooth/_legacy/_manager/_connection.py b/platypush/plugins/bluetooth/_legacy/_manager/_connection.py new file mode 100644 index 000000000..750cf3255 --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_manager/_connection.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass, field +from logging import getLogger +from threading import Event, Thread, current_thread +from typing import Optional + +import bluetooth + +from platypush.entities.bluetooth import BluetoothService + +from ._types import ConnectionId + +_logger = getLogger(__name__) + + +@dataclass +class BluetoothConnection: + """ + Models a connection to a Bluetooth device. + """ + + address: str + service: BluetoothService + socket: bluetooth.BluetoothSocket = field( + init=False, default_factory=bluetooth.BluetoothSocket + ) + thread: Thread = field(default_factory=current_thread) + stop_event: Event = field(default_factory=Event) + + def __post_init__(self): + """ + Initialize the Bluetooth socket with the given protocol. + """ + self.socket = bluetooth.BluetoothSocket(self.service.protocol.value) + + @property + def key(self) -> ConnectionId: + return self.address, self.service.port + + def close(self, timeout: Optional[float] = None): + _logger.info('Closing connection to %s', self.address) + + # Set the stop event + self.stop_event.set() + + # Close the socket + if self.socket: + try: + self.socket.close() + except Exception as e: + _logger.warning( + 'Failed to close Bluetooth socket on %s: %s', + self.address, + e, + ) + + # Avoid deadlocking by waiting for our own thread to terminate + if current_thread() is self.thread: + return + + # Wait for the connection thread to terminate + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=timeout) + if self.thread and self.thread.is_alive(): + _logger.warning('Connection to %s still alive after closing', self.address) diff --git a/platypush/plugins/bluetooth/_legacy/_manager/_service.py b/platypush/plugins/bluetooth/_legacy/_manager/_service.py new file mode 100644 index 000000000..924d55d04 --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_manager/_service.py @@ -0,0 +1,64 @@ +from concurrent.futures import ProcessPoolExecutor +from logging import getLogger +from typing import Dict, List, Optional + +import bluetooth + +from platypush.entities.bluetooth import BluetoothService + +from .._model import BluetoothServicesBuilder + + +# pylint: disable=too-few-public-methods +class ServiceDiscoverer: + """ + Runs the service discovery processes in a pool. + """ + + def __init__(self, max_workers: int = 4, timeout: float = 30): + self._max_workers = max_workers + self._timeout = timeout + self.logger = getLogger(__name__) + + def _discover(self, address: str) -> List[BluetoothService]: + """ + Inner implementation of the service discovery for a specific device. + """ + self.logger.info("Discovering services for %s...", address) + try: + return BluetoothServicesBuilder.build( + bluetooth.find_service(address=address) + ) + except Exception as e: + self.logger.warning( + "Failed to discover services for the device %s: %s", address, e + ) + return [] + finally: + self.logger.info("Service discovery for %s completed", address) + + def discover( + self, *addresses: str, timeout: Optional[float] = None + ) -> Dict[str, List[BluetoothService]]: + """ + Discover the services for the given addresses. Discovery will happen in + parallel through a process pool. + + :param addresses: The addresses to scan. + :param timeout: The timeout in seconds. + :return: An ``{address: [services]}`` dictionary with the discovered + services per device. + """ + discovered_services: Dict[str, List[BluetoothService]] = {} + with ProcessPoolExecutor(max_workers=self._max_workers) as executor: + try: + for i, services in enumerate( + executor.map( + self._discover, addresses, timeout=timeout or self._timeout + ) + ): + discovered_services[addresses[i]] = services + except TimeoutError: + self.logger.warning("Service discovery timed out.") + + return discovered_services diff --git a/platypush/plugins/bluetooth/_legacy/_manager/_types.py b/platypush/plugins/bluetooth/_legacy/_manager/_types.py new file mode 100644 index 000000000..6a21c1a0d --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_manager/_types.py @@ -0,0 +1,5 @@ +from typing import Tuple + + +ConnectionId = Tuple[str, int] +""" (address, port) pair. """ diff --git a/platypush/plugins/bluetooth/_legacy/_model/__init__.py b/platypush/plugins/bluetooth/_legacy/_model/__init__.py index f226124b0..4c69312e6 100644 --- a/platypush/plugins/bluetooth/_legacy/_model/__init__.py +++ b/platypush/plugins/bluetooth/_legacy/_model/__init__.py @@ -1,8 +1,8 @@ -from ._device import BluetoothDevice -from ._service import BluetoothService +from ._device import BluetoothDeviceBuilder +from ._services import BluetoothServicesBuilder __all__ = [ - "BluetoothDevice", - "BluetoothService", + "BluetoothDeviceBuilder", + "BluetoothServicesBuilder", ] diff --git a/platypush/plugins/bluetooth/_legacy/_model/_device.py b/platypush/plugins/bluetooth/_legacy/_model/_device.py new file mode 100644 index 000000000..197078597 --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_model/_device.py @@ -0,0 +1,76 @@ +from typing import Iterable, List, Optional + +from bluetooth_numbers import oui + +from platypush.entities.bluetooth import BluetoothDevice, BluetoothService + +from platypush.plugins.bluetooth.model import ( + MajorServiceClass, + MajorDeviceClass, + MinorDeviceClass, +) + + +# pylint: disable=too-few-public-methods +class BluetoothDeviceBuilder: + """ + :class:`platypush.entity.bluetooth.BluetoothDevice` entity builder from the + raw pybluez data. + """ + + @classmethod + def build( + cls, + address: str, + name: str, + raw_class: int, + services: Optional[Iterable[BluetoothService]] = None, + ) -> BluetoothDevice: + """ + Builds a :class:`platypush.entity.bluetooth.BluetoothDevice` from the + raw pybluez data. + """ + return BluetoothDevice( + id=address, + address=address, + name=name, + major_service_classes=cls._parse_major_service_classes(raw_class), + major_device_class=cls._parse_major_device_class(raw_class), + minor_device_classes=cls._parse_minor_device_classes(raw_class), + manufacturer=cls._parse_manufacturer(address), + supports_legacy=True, + supports_ble=False, + children=services, + ) + + @staticmethod + def _parse_major_device_class(raw_class: int) -> MajorDeviceClass: + """ + Parse the device major class from the raw exposed class value. + """ + device_classes = [ + cls for cls in MajorDeviceClass if cls.value.matches(raw_class) + ] + + return device_classes[0] if device_classes else MajorDeviceClass.UNKNOWN + + @staticmethod + def _parse_minor_device_classes(raw_class: int) -> List[MinorDeviceClass]: + """ + Parse the device minor classes from the raw exposed class value. + """ + return [cls for cls in MinorDeviceClass if cls.value.matches(raw_class)] + + @staticmethod + def _parse_major_service_classes(raw_class: int) -> List[MajorServiceClass]: + """ + Parse the device major service classes from the raw exposed class value. + """ + return [cls for cls in MajorServiceClass if cls.value.matches(raw_class)] + + @staticmethod + def _parse_manufacturer(address: str) -> Optional[str]: + """ + Parse the device manufacturer name from the raw MAC address. + """ + return oui.get(':'.join(address.split(':')[:3]).upper()) diff --git a/platypush/plugins/bluetooth/_legacy/_model/_service/__init__.py b/platypush/plugins/bluetooth/_legacy/_model/_service/__init__.py deleted file mode 100644 index 66d5d6e27..000000000 --- a/platypush/plugins/bluetooth/_legacy/_model/_service/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from ._base import BluetoothService - - -__all__ = ['BluetoothService'] diff --git a/platypush/plugins/bluetooth/_legacy/_model/_service/_base.py b/platypush/plugins/bluetooth/_legacy/_model/_service/_base.py deleted file mode 100644 index fa7796355..000000000 --- a/platypush/plugins/bluetooth/_legacy/_model/_service/_base.py +++ /dev/null @@ -1,127 +0,0 @@ -from dataclasses import dataclass -from typing import Any, Dict, Iterable, Optional, Set, Tuple -from uuid import UUID - -from .._protocol import Protocol -from ._directory import ServiceClass -from ._types import RawServiceClass - -VersionedServices = Dict[ServiceClass, Optional[int]] -""" Service -> Version mapping. """ - - -@dataclass -class BluetoothService: - """ - Models a discovered Bluetooth service. - """ - - address: str - """ The address of the service that exposes the service. """ - port: int - """ The Bluetooth port associated to the service. """ - protocol: Protocol - """ The service protocol. """ - name: Optional[str] - """ The name of the service. """ - description: Optional[str] - """ The description of the service. """ - service_id: Optional[str] - """ The ID of the service. """ - service_classes: VersionedServices - """ - The compatible classes exposed by the service - see - https://btprodspecificationrefs.blob.core.windows.net/assigned-numbers/Assigned%20Number%20Types/Assigned%20Numbers.pdf, - Section 5. - """ - unknown_service_classes: Iterable[RawServiceClass] - """ Service classes that are not supported. """ - - @classmethod - def build(cls, service: Dict[str, Any]) -> 'BluetoothService': - """ - Builds a :class:`BluetoothService` from a service dictionary returned by - pybluez. - """ - return cls( - address=service['host'], - port=service['port'], - protocol=Protocol(service['protocol']), - name=service['name'], - description=service['description'], - service_id=service['service-id'], - service_classes=cls._parse_services( - service['service-classes'], service['profiles'] - ), - unknown_service_classes=cls._parse_unknown_services( - service['service-classes'], - ), - ) - - @classmethod - def _parse_services( - cls, service_classes: Iterable[str], profiles: Iterable[Tuple[str, int]] - ) -> VersionedServices: - """ - Parses the services. - - :param service_classes: The service classes returned by pybluez. - :param profiles: The profiles returned by pybluez as a list of - ``[(service, version)]`` tuples. - :return: A list of parsed service classes. - """ - # Parse the service classes - parsed_services: Dict[RawServiceClass, ServiceClass] = {} - for srv in service_classes: - srv_class = cls._parse_service_class(srv) - parsed_services[srv_class.value] = srv_class - - # Parse the service classes versions - versioned_classes: VersionedServices = {} - for srv, version in profiles: - value = cls._parse_service_class(srv).value - parsed_srv = parsed_services.get(value) - if parsed_srv: - versioned_classes[parsed_srv] = version - - return { - srv: versioned_classes.get(srv) - for srv in parsed_services.values() - if srv != ServiceClass.UNKNOWN - } - - @classmethod - def _parse_unknown_services( - cls, service_classes: Iterable[str] - ) -> Set[RawServiceClass]: - return { - cls._uuid(srv) - for srv in service_classes - if cls._parse_service_class(srv) == ServiceClass.UNKNOWN - } - - @classmethod - def _parse_service_class(cls, srv: str) -> ServiceClass: - """ - :param srv: The service class returned by pybluez as a string (either - hex-encoded or UUID). - :return: The parsed :class:`ServiceClass` object or ``ServiceClass.UNKNOWN``. - """ - srv_class: ServiceClass = ServiceClass.UNKNOWN - try: - srv_class = ServiceClass.get(cls._uuid(srv)) - except (TypeError, ValueError): - pass - - return srv_class - - @staticmethod - def _uuid(s: str) -> RawServiceClass: - """ - :param s: The service class returned by pybluez as a string. - :return: The UUID of the service class as a 16-bit or 128-bit identifier. - """ - try: - return UUID(s) - except ValueError: - return int(s, 16) diff --git a/platypush/plugins/bluetooth/_legacy/_model/_services.py b/platypush/plugins/bluetooth/_legacy/_model/_services.py new file mode 100644 index 000000000..133ad5d16 --- /dev/null +++ b/platypush/plugins/bluetooth/_legacy/_model/_services.py @@ -0,0 +1,47 @@ +from typing import Any, Dict, Iterable, List +from platypush.entities.bluetooth import BluetoothService + +from ..._model import ServiceClass + + +# pylint: disable=too-few-public-methods +class BluetoothServicesBuilder: + """ + Builds a list of :class:`platypush.entities.bluetooth.BluetoothService` + entities from the list of dictionaries returned by + ``bluetooth.find_services()``. + """ + + @classmethod + def build(cls, services: Iterable[Dict[str, Any]]) -> List[BluetoothService]: + """ + Parse the services exposed by the device from the raw pybluez data. + """ + parsed_services = {} + + for srv in services: + service_args = { + key: srv.get(key) for key in ['name', 'description', 'port', 'protocol'] + } + + classes = srv.get('service-classes', []) + versions = dict(srv.get('profiles', [])) + + for srv_cls in classes: + uuid = BluetoothService.to_uuid(srv_cls) + parsed_service = parsed_services[uuid] = BluetoothService( + id=f'{srv["host"]}::{srv_cls}', + uuid=uuid, + version=versions.get(srv_cls), + **service_args, + ) + + # Ensure that the service name is always set + if not parsed_service.name: + parsed_service.name = ( + str(parsed_service.service_class) + if parsed_service.service_class != ServiceClass.UNKNOWN + else f'[{parsed_service.uuid}]' + ) + + return list(parsed_services.values()) diff --git a/platypush/plugins/bluetooth/_manager.py b/platypush/plugins/bluetooth/_manager.py new file mode 100644 index 000000000..5cd663391 --- /dev/null +++ b/platypush/plugins/bluetooth/_manager.py @@ -0,0 +1,163 @@ +from abc import ABC, abstractmethod +import logging +from queue import Queue +import threading +from typing import Collection, Optional, Type, Union +from platypush.context import get_bus + +from platypush.entities.bluetooth import BluetoothDevice +from platypush.message.event.bluetooth import BluetoothDeviceEvent + +from ._cache import EntityCache +from ._types import RawServiceClass + + +class BaseBluetoothManager(ABC, threading.Thread): + """ + Abstract interface for Bluetooth managers. + """ + + def __init__( + self, + interface: str, + poll_interval: float, + connect_timeout: float, + stop_event: threading.Event, + scan_lock: threading.RLock, + scan_enabled: threading.Event, + device_queue: Queue[BluetoothDevice], + service_uuids: Optional[Collection[RawServiceClass]] = None, + device_cache: Optional[EntityCache] = None, + **kwargs, + ): + """ + :param interface: The Bluetooth interface to use. + :param poll_interval: Scan interval in seconds. + :param connect_timeout: Connection timeout in seconds. + :param stop_event: Event used to synchronize on whether we should stop the plugin. + :param scan_lock: Lock to synchronize scanning access to the Bluetooth device. + :param scan_enabled: Event used to enable/disable scanning. + :param device_queue: Queue used by the ``EventHandler`` to publish + updates with the new parsed device entities. + :param device_cache: Cache used to keep track of discovered devices. + """ + kwargs['name'] = f'Bluetooth:{self.__class__.__name__}' + super().__init__(**kwargs) + + self.logger = logging.getLogger(__name__) + self.poll_interval = poll_interval + self._interface: Optional[str] = interface + self._connect_timeout: float = connect_timeout + self._service_uuids: Collection[RawServiceClass] = service_uuids or [] + self._stop_event = stop_event + self._scan_lock = scan_lock + self._scan_enabled = scan_enabled + self._device_queue = device_queue + + self._cache = device_cache or EntityCache() + """ Cache of discovered devices. """ + + def notify( + self, event_type: Type[BluetoothDeviceEvent], device: BluetoothDevice, **kwargs + ): + """ + Notify about a device update event by posting a + :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` event on + the bus and pushing the updated entity upstream. + """ + get_bus().post(event_type.from_device(device=device, **kwargs)) + self._device_queue.put_nowait(device) + + def should_stop(self) -> bool: + return self._stop_event.is_set() + + @abstractmethod + def connect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + interface: Optional[str] = None, + timeout: Optional[float] = None, + ): + """ + Pair and connect to a device by address or name. + + :param device: The device address or name. + :param port: The Bluetooth port to use. + :param service_uuid: The service UUID to connect to. + :param interface: The Bluetooth interface to use (it overrides the + default ``interface``). + :param timeout: The connection timeout in seconds (it overrides the + default ``connect_timeout``). + """ + + @abstractmethod + def disconnect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + ): + """ + Close an active connection to a device. + + :param device: The device address or name. + :param port: If connected to a non-BLE device, the optional port to + disconnect. Either ``port`` or ``service_uuid`` is required for + non-BLE devices. + :param service_uuid: The UUID of the service to disconnect from. + """ + + @abstractmethod + def scan(self, duration: Optional[float] = None) -> Collection[BluetoothDevice]: + """ + Scan for Bluetooth devices nearby and return the results as a list of + entities. + + :param duration: Scan duration in seconds (default: same as the plugin's + `poll_interval` configuration parameter) + """ + + @abstractmethod + def read( + self, + device: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ) -> bytearray: + """ + :param device: Name or address of the device to read from. + :param service_uuid: Service UUID. + :param interface: Bluetooth adapter name to use (default configured if None). + :param connect_timeout: Connection timeout in seconds (default: same as the + configured `connect_timeout`). + """ + + @abstractmethod + def write( + self, + device: str, + data: Union[bytes, bytearray], + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ): + """ + :param device: Name or address of the device to read from. + :param data: Raw data to be sent. + :param service_uuid: Service UUID. + :param interface: Bluetooth adapter name to use (default configured if None) + :param connect_timeout: Connection timeout in seconds (default: same as the + configured `connect_timeout`). + """ + + @abstractmethod + def stop(self): + """ + Stop any pending tasks and terminate the thread. + """ + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/_model/__init__.py b/platypush/plugins/bluetooth/_model/__init__.py new file mode 100644 index 000000000..a01d80b50 --- /dev/null +++ b/platypush/plugins/bluetooth/_model/__init__.py @@ -0,0 +1,11 @@ +from ._classes import MajorDeviceClass, MajorServiceClass, MinorDeviceClass +from ._protocol import Protocol +from ._service import ServiceClass + +__all__ = [ + "MajorDeviceClass", + "MajorServiceClass", + "MinorDeviceClass", + "Protocol", + "ServiceClass", +] diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/__init__.py b/platypush/plugins/bluetooth/_model/_classes/__init__.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/__init__.py rename to platypush/plugins/bluetooth/_model/_classes/__init__.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/_base.py b/platypush/plugins/bluetooth/_model/_classes/_base.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/_base.py rename to platypush/plugins/bluetooth/_model/_classes/_base.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/_device/__init__.py b/platypush/plugins/bluetooth/_model/_classes/_device/__init__.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/_device/__init__.py rename to platypush/plugins/bluetooth/_model/_classes/_device/__init__.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/_device/_major.py b/platypush/plugins/bluetooth/_model/_classes/_device/_major.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/_device/_major.py rename to platypush/plugins/bluetooth/_model/_classes/_device/_major.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/_device/_minor.py b/platypush/plugins/bluetooth/_model/_classes/_device/_minor.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/_device/_minor.py rename to platypush/plugins/bluetooth/_model/_classes/_device/_minor.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_classes/_service.py b/platypush/plugins/bluetooth/_model/_classes/_service.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_classes/_service.py rename to platypush/plugins/bluetooth/_model/_classes/_service.py diff --git a/platypush/plugins/bluetooth/_legacy/_model/_protocol.py b/platypush/plugins/bluetooth/_model/_protocol.py similarity index 97% rename from platypush/plugins/bluetooth/_legacy/_model/_protocol.py rename to platypush/plugins/bluetooth/_model/_protocol.py index db69bf468..e94cefe8c 100644 --- a/platypush/plugins/bluetooth/_legacy/_model/_protocol.py +++ b/platypush/plugins/bluetooth/_model/_protocol.py @@ -6,6 +6,7 @@ class Protocol(Enum): Models a Bluetooth protocol. """ + UNKNOWN = 'UNKNOWN' RFCOMM = 'RFCOMM' L2CAP = 'L2CAP' TCP = 'TCP' diff --git a/platypush/plugins/bluetooth/_model/_service/__init__.py b/platypush/plugins/bluetooth/_model/_service/__init__.py new file mode 100644 index 000000000..3eb506eff --- /dev/null +++ b/platypush/plugins/bluetooth/_model/_service/__init__.py @@ -0,0 +1,6 @@ +from ._directory import ServiceClass + + +__all__ = [ + "ServiceClass", +] diff --git a/platypush/plugins/bluetooth/_legacy/_model/_service/_directory.py b/platypush/plugins/bluetooth/_model/_service/_directory.py similarity index 88% rename from platypush/plugins/bluetooth/_legacy/_model/_service/_directory.py rename to platypush/plugins/bluetooth/_model/_service/_directory.py index cb32c988d..bc0c70820 100644 --- a/platypush/plugins/bluetooth/_legacy/_model/_service/_directory.py +++ b/platypush/plugins/bluetooth/_model/_service/_directory.py @@ -1,10 +1,12 @@ import re from enum import Enum from typing import Dict +from uuid import UUID import bluetooth_numbers +from bleak.uuids import uuid16_dict, uuid128_dict -from ._types import RawServiceClass +from platypush.plugins.bluetooth._types import RawServiceClass def _service_name_to_enum_name(service_name: str) -> str: @@ -111,6 +113,10 @@ Section 3.3. # Section 3.4 _service_classes.update(bluetooth_numbers.service) +# Extend the service classes with the GATT service UUIDs defined in Bleak +_service_classes.update(uuid16_dict) # type: ignore +_service_classes.update({UUID(uuid): name for uuid, name in uuid128_dict.items()}) + _service_classes_by_name: Dict[str, RawServiceClass] = { name: cls for cls, name in _service_classes.items() } @@ -134,6 +140,12 @@ class _ServiceClassMeta: try: return ServiceClass(value) except ValueError: + try: + if isinstance(value, UUID): + return ServiceClass(int(str(value).upper()[4:8], 16)) + except ValueError: + pass + return ServiceClass.UNKNOWN # type: ignore @classmethod @@ -149,7 +161,9 @@ class _ServiceClassMeta: ) def __str__(self) -> str: - return _service_classes.get(self.value, ServiceClass(0).value) + return _service_classes.get( + self.value, ServiceClass.UNKNOWN.value # type: ignore + ) def __repr__(self) -> str: return f"<{self.value}: {str(self)}>" diff --git a/platypush/plugins/bluetooth/_legacy/_model/_service/_directory.pyi b/platypush/plugins/bluetooth/_model/_service/_directory.pyi similarity index 84% rename from platypush/plugins/bluetooth/_legacy/_model/_service/_directory.pyi rename to platypush/plugins/bluetooth/_model/_service/_directory.pyi index 37aea1bbb..ee2186327 100644 --- a/platypush/plugins/bluetooth/_legacy/_model/_service/_directory.pyi +++ b/platypush/plugins/bluetooth/_model/_service/_directory.pyi @@ -2,7 +2,7 @@ from enum import Enum -from ._types import RawServiceClass +from platypush.plugins.bluetooth._types import RawServiceClass class ServiceClass(Enum): """ @@ -14,6 +14,8 @@ class ServiceClass(Enum): UNKNOWN = ... """ A class for unknown services. """ + OBEX_OBJECT_PUSH = ... + """ Class for the OBEX Object Push service. """ @classmethod def get(cls, value: RawServiceClass) -> "ServiceClass": diff --git a/platypush/plugins/bluetooth/_plugin.py b/platypush/plugins/bluetooth/_plugin.py new file mode 100644 index 000000000..4e22ee737 --- /dev/null +++ b/platypush/plugins/bluetooth/_plugin.py @@ -0,0 +1,603 @@ +import base64 +import os +from queue import Empty, Queue +import threading +import time +from typing import ( + Collection, + Dict, + Final, + List, + Optional, + Union, + Type, +) + +from typing_extensions import override + +from platypush.context import get_bus, get_plugin +from platypush.entities import EntityManager, get_entities_engine +from platypush.entities.bluetooth import BluetoothDevice, BluetoothService +from platypush.message.event.bluetooth import ( + BluetoothScanPausedEvent, + BluetoothScanResumedEvent, +) +from platypush.plugins import RunnablePlugin, action +from platypush.plugins.db import DbPlugin + +from ._ble import BLEManager +from ._cache import EntityCache +from ._legacy import LegacyManager +from ._types import RawServiceClass +from ._manager import BaseBluetoothManager + + +class BluetoothPlugin(RunnablePlugin, EntityManager): + """ + Plugin to interact with Bluetooth devices. + + This plugin uses `_Bleak_ `_ to interact + with the Bluetooth stack and `_Theengs_ `_ + to map the services exposed by the devices into native entities. + + The full list of devices natively supported can be found + `here `_. + + Note that the support for Bluetooth low-energy devices requires a Bluetooth + adapter compatible with the Bluetooth 5.0 specification or higher. + + Requires: + + * **bleak** (``pip install bleak``) + * **bluetooth-numbers** (``pip install bluetooth-numbers``) + * **TheengsGateway** (``pip install git+https://github.com/theengs/gateway``) + * **pybluez** (``pip install git+https://github.com/pybluez/pybluez``) + * **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``) + + Triggers: + + * :class:`platypush.message.event.bluetooth.BluetoothConnectionFailedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent` + * :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent` + * :class:`platypush.message.event.bluetooth.BluetoothFileReceivedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothFileSentEvent` + * :class:`platypush.message.event.bluetooth.BluetoothFileTransferStartedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothScanPausedEvent` + * :class:`platypush.message.event.bluetooth.BluetoothScanResumedEvent` + * :class:`platypush.message.event.entities.EntityUpdateEvent` + + """ + + _default_connect_timeout: Final[int] = 20 + """ Default connection timeout (in seconds) """ + + _default_scan_duration: Final[float] = 10.0 + """ Default duration of a discovery session (in seconds) """ + + def __init__( + self, + interface: Optional[str] = None, + connect_timeout: float = _default_connect_timeout, + service_uuids: Optional[Collection[RawServiceClass]] = None, + scan_paused_on_start: bool = False, + poll_interval: float = _default_scan_duration, + **kwargs, + ): + """ + :param interface: Name of the Bluetooth interface to use (e.g. ``hci0`` + on Linux). Default: first available interface. + :param connect_timeout: Timeout in seconds for the connection to a + Bluetooth device. Default: 20 seconds. + :param service_uuids: List of service UUIDs to discover. + Default: all. + :param scan_paused_on_start: If ``True``, the plugin will not the + scanning thread until :meth:`.scan_resume` is called (default: + ``False``). + + """ + kwargs['poll_interval'] = poll_interval + super().__init__(**kwargs) + + self._interface: Optional[str] = interface + """ Default Bluetooth interface to use """ + self._connect_timeout: float = connect_timeout + """ Connection timeout in seconds """ + self._service_uuids: Collection[RawServiceClass] = service_uuids or [] + """ UUIDs to discover """ + self._scan_lock = threading.RLock() + """ Lock to synchronize scanning access to the Bluetooth device """ + self._scan_enabled = threading.Event() + """ Event used to enable/disable scanning """ + self._device_queue: Queue[BluetoothDevice] = Queue() + """ + Queue used by the Bluetooth managers to published the discovered + Bluetooth devices. + """ + self._device_cache = EntityCache() + """ + Cache of the devices discovered by the plugin. + """ + + self._managers: Dict[Type[BaseBluetoothManager], BaseBluetoothManager] = {} + """ + Bluetooth managers threads, one for BLE devices and one for non-BLE + devices. + """ + + self._scan_controller_timer: Optional[threading.Timer] = None + """ Timer used to temporarily pause the discovery process """ + + if not scan_paused_on_start: + self._scan_enabled.set() + + def _refresh_cache(self) -> None: + # Wait for the entities engine to start + get_entities_engine().wait_start() + + with get_plugin(DbPlugin).get_session( + autoflush=False, autocommit=False, expire_on_commit=False + ) as session: + existing_devices = [d.copy() for d in session.query(BluetoothDevice).all()] + + for dev in existing_devices: + self._device_cache.add(dev) + + def _init_bluetooth_managers(self): + """ + Initializes the Bluetooth managers threads. + """ + manager_args = { + 'interface': self._interface, + 'poll_interval': self.poll_interval, + 'connect_timeout': self._connect_timeout, + 'stop_event': self._should_stop, + 'scan_lock': self._scan_lock, + 'scan_enabled': self._scan_enabled, + 'device_queue': self._device_queue, + 'service_uuids': list(map(BluetoothService.to_uuid, self._service_uuids)), + 'device_cache': self._device_cache, + } + + self._managers = { + BLEManager: BLEManager(**manager_args), + LegacyManager: LegacyManager(**manager_args), + } + + def _scan_state_set(self, state: bool, duration: Optional[float] = None): + """ + Set the state of the scanning process. + + :param state: ``True`` to enable the scanning process, ``False`` to + disable it. + :param duration: The duration of the pause (in seconds) or ``None``. + """ + + def timer_callback(): + if state: + self.scan_pause() + else: + self.scan_resume() + + self._scan_controller_timer = None + + with self._scan_lock: + if not state and self._scan_enabled.is_set(): + get_bus().post(BluetoothScanPausedEvent(duration=duration)) + elif state and not self._scan_enabled.is_set(): + get_bus().post(BluetoothScanResumedEvent(duration=duration)) + + if state: + self._scan_enabled.set() + else: + self._scan_enabled.clear() + + if duration and not self._scan_controller_timer: + self._scan_controller_timer = threading.Timer(duration, timer_callback) + self._scan_controller_timer.start() + + def _cancel_scan_controller_timer(self): + """ + Cancels a scan controller timer if scheduled. + """ + if self._scan_controller_timer: + self._scan_controller_timer.cancel() + + def _manager_by_device( + self, + device: BluetoothDevice, + port: Optional[int] = None, + service_uuid: Optional[Union[str, RawServiceClass]] = None, + ) -> BaseBluetoothManager: + """ + :param device: A discovered Bluetooth device. + :param port: The port to connect to. + :param service_uuid: The UUID of the service to connect to. + :return: The manager associated with the device (BLE or legacy). + """ + # No port nor service UUID -> use the BLE manager for direct connection + if not (port or service_uuid): + return self._managers[BLEManager] + + uuid = BluetoothService.to_uuid(service_uuid) if service_uuid else None + matching_services = ( + [srv for srv in device.services if srv.port == port] + if port + else [srv for srv in device.services if srv.uuid == uuid] + ) + + assert matching_services, ( + f'No service found on the device {device} for port={port}, ' + f'service_uuid={service_uuid}' + ) + + srv = matching_services[0] + return ( + self._managers[BLEManager] if srv.is_ble else self._managers[LegacyManager] + ) + + def _get_device(self, device: str, _fail_if_not_cached=False) -> BluetoothDevice: + """ + Get a device by its address or name, and scan for it if it's not + cached. + """ + dev = self._device_cache.get(device) + if dev: + return dev + + assert not _fail_if_not_cached, f'Device {device} not found' + self.logger.info('Scanning for unknown device %s', device) + self.scan() + return self._get_device(device, _fail_if_not_cached=True) + + @action + def connect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[Union[RawServiceClass, str]] = None, + interface: Optional[str] = None, + timeout: Optional[float] = None, + ): + """ + Pair and connect to a device by address or name. + + :param device: The device address or name. + :param port: The port to connect to. Either ``port`` or + ``service_uuid`` is required for non-BLE devices. + :param service_uuid: The UUID of the service to connect to. Either + ``port`` or ``service_uuid`` is required for non-BLE devices. + :param interface: The Bluetooth interface to use (it overrides the + default ``interface``). + :param timeout: The connection timeout in seconds (it overrides the + default ``connect_timeout``). + """ + dev = self._get_device(device) + manager = self._manager_by_device(dev, port=port, service_uuid=service_uuid) + uuid = BluetoothService.to_uuid(service_uuid) if service_uuid else None + manager.connect( + dev.address, + port=port, + service_uuid=uuid, + interface=interface, + timeout=timeout, + ) + + @action + def disconnect( + self, + device: str, + port: Optional[int] = None, + service_uuid: Optional[RawServiceClass] = None, + ): + """ + Close an active connection to a device. + + Note that this method can only close connections that have been + initiated by the application. It can't close connections owned by + other applications or agents. + + :param device: The device address or name. + :param port: If connected to a non-BLE device, the optional port to + disconnect. + :param service_uuid: The optional UUID of the service to disconnect + from, for non-BLE devices. + """ + dev = self._get_device(device) + uuid = BluetoothService.to_uuid(service_uuid) if service_uuid else None + err = None + success = False + + for manager in self._managers.values(): + try: + manager.disconnect(dev.address, port=port, service_uuid=uuid) + success = True + except Exception as e: + err = e + + assert success, f'Could not disconnect from {device}: {err}' + + @action + def scan_pause(self, duration: Optional[float] = None): + """ + Pause the scanning thread. + + :param duration: For how long the scanning thread should be paused + (default: null = indefinitely). + """ + self._scan_state_set(False, duration) + + @action + def scan_resume(self, duration: Optional[float] = None): + """ + Resume the scanning thread, if inactive. + + :param duration: For how long the scanning thread should be running + (default: null = indefinitely). + """ + self._scan_state_set(True, duration) + + @action + def scan( + self, + duration: Optional[float] = None, + devices: Optional[Collection[str]] = None, + service_uuids: Optional[Collection[RawServiceClass]] = None, + ) -> List[BluetoothDevice]: + """ + Scan for Bluetooth devices nearby and return the results as a list of + entities. + + :param duration: Scan duration in seconds (default: same as the plugin's + `poll_interval` configuration parameter) + :param devices: List of device addresses or names to scan for. + :param service_uuids: List of service UUIDs to discover. Default: all. + """ + scanned_device_addresses = set() + duration = duration or self.poll_interval or self._default_scan_duration + uuids = {BluetoothService.to_uuid(uuid) for uuid in (service_uuids or [])} + + for manager in self._managers.values(): + scanned_device_addresses.update( + [ + device.address + for device in manager.scan(duration=duration // len(self._managers)) + if (not uuids or any(srv.uuid in uuids for srv in device.services)) + and ( + not devices + or device.address in devices + or device.name in devices + ) + ] + ) + + with get_plugin(DbPlugin).get_session( + autoflush=False, autocommit=False, expire_on_commit=False + ) as session: + return [ + d.copy() + for d in session.query(BluetoothDevice).all() + if d.address in scanned_device_addresses + ] + + @action + def read( + self, + device: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ) -> str: + """ + Read a message from a device. + + :param device: Name or address of the device to read from. + :param service_uuid: Service UUID. + :param interface: Bluetooth adapter name to use (default configured if None). + :param connect_timeout: Connection timeout in seconds (default: same as the + configured `connect_timeout`). + :return: The base64-encoded response received from the device. + """ + dev = self._get_device(device) + uuid = BluetoothService.to_uuid(service_uuid) + manager = self._manager_by_device(dev, service_uuid=uuid) + data = manager.read( + dev.address, uuid, interface=interface, connect_timeout=connect_timeout + ) + return base64.b64encode(data).decode() + + @action + def write( + self, + device: str, + data: str, + service_uuid: RawServiceClass, + interface: Optional[str] = None, + connect_timeout: Optional[float] = None, + ): + """ + Writes data to a device + + :param device: Name or address of the device to read from. + :param data: Data to be written, as a base64-encoded string. + :param service_uuid: Service UUID. + :param interface: Bluetooth adapter name to use (default configured if None) + :param connect_timeout: Connection timeout in seconds (default: same as the + configured `connect_timeout`). + """ + binary_data = base64.b64decode(data.encode()) + dev = self._get_device(device) + uuid = BluetoothService.to_uuid(service_uuid) + manager = self._manager_by_device(dev, service_uuid=uuid) + manager.write( + dev.address, + binary_data, + service_uuid=uuid, + interface=interface, + connect_timeout=connect_timeout, + ) + + @action + def send_file( + self, + file: str, + device: str, + data: Optional[Union[str, bytes, bytearray]] = None, + binary: bool = False, + ): + """ + Send a file to a device that exposes an OBEX Object Push service. + + :param file: Path of the file to be sent. If ``data`` is specified + then ``file`` should include the proposed file on the + receiving host. + :param data: Alternatively to a file on disk you can send raw (string + or binary) content. + :param device: Device address or name. + :param binary: Set to true if data is a base64-encoded binary string. + """ + from ._file import FileSender + + if not data: + file = os.path.abspath(os.path.expanduser(file)) + with open(file, 'rb') as f: + binary_data = f.read() + else: + if binary: + binary_data = base64.b64decode( + data.encode() if isinstance(data, str) else data + ) + + sender = FileSender(self._managers[LegacyManager]) # type: ignore + sender.send_file(file, device, binary_data) + + @override + @action + def status( + self, + *_, + duration: Optional[float] = None, + devices: Optional[Collection[str]] = None, + service_uuids: Optional[Collection[RawServiceClass]] = None, + **__, + ) -> List[BluetoothDevice]: + """ + Retrieve the status of all the devices, or the matching + devices/services. + + If scanning is currently disabled, it will enable it and perform a + scan. + + The differences between this method and :meth:`.scan` are: + + 1. :meth:`.status` will return the status of all the devices known + to the application, while :meth:`.scan` will return the status + only of the devices discovered in the provided time window. + + 2. :meth:`.status` will not initiate a new scan if scanning is + already enabled (it will only returned the status of the known + devices), while :meth:`.scan` will initiate a new scan. + + :param duration: Scan duration in seconds, if scanning is disabled + (default: same as the plugin's `poll_interval` configuration + parameter) + :param devices: List of device addresses or names to filter for. + Default: all. + :param service_uuids: List of service UUIDs to filter for. Default: + all. + """ + if not self._scan_enabled.is_set(): + self.scan( + duration=duration, + devices=devices, + service_uuids=service_uuids, + ) + + with get_plugin(DbPlugin).get_session( + autoflush=False, autocommit=False, expire_on_commit=False + ) as session: + known_devices = [ + d.copy() + for d in session.query(BluetoothDevice).all() + if (not devices or d.address in devices or d.name in devices) + and ( + not service_uuids + or any(str(srv.uuid) in service_uuids for srv in d.services) + ) + ] + + # Send entity update events to keep any asynchronous clients in sync + get_entities_engine().notify(*known_devices) + return known_devices + + @override + def transform_entities( + self, entities: Collection[BluetoothDevice] + ) -> Collection[BluetoothDevice]: + return super().transform_entities(entities) + + @override + def main(self): + self._refresh_cache() + self._init_bluetooth_managers() + + for manager in self._managers.values(): + manager.start() + + try: + while not self.should_stop(): + try: + device = self._device_queue.get(timeout=1) + except Empty: + continue + + device = self._device_cache.add(device) + self.publish_entities([device], callback=self._device_cache.add) + finally: + self.stop() + + @override + def stop(self): + """ + Upon stop request, it stops any pending scans and closes all active + connections. + """ + super().stop() + + # Stop any pending scan controller timers + self._cancel_scan_controller_timer() + + # Set the stop events on the manager threads + for manager in self._managers.values(): + if manager and manager.is_alive(): + self.logger.info('Waiting for %s to stop', manager.name) + try: + manager.stop() + except Exception as e: + self.logger.exception( + 'Error while stopping %s: %s', manager.name, e + ) + + # Wait for the manager threads to stop + stop_timeout = 5 + wait_start = time.time() + + for manager in self._managers.values(): + if ( + manager + and manager.ident != threading.current_thread().ident + and manager.is_alive() + ): + manager.join(timeout=max(0, stop_timeout - (time.time() - wait_start))) + + if manager and manager.is_alive(): + self.logger.warning( + 'Timeout while waiting for %s to stop', manager.name + ) + + +__all__ = ["BluetoothPlugin"] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/_legacy/_model/_service/_types.py b/platypush/plugins/bluetooth/_types.py similarity index 100% rename from platypush/plugins/bluetooth/_legacy/_model/_service/_types.py rename to platypush/plugins/bluetooth/_types.py diff --git a/platypush/plugins/bluetooth/ble/__init__.py b/platypush/plugins/bluetooth/ble/__init__.py deleted file mode 100644 index 545d7e782..000000000 --- a/platypush/plugins/bluetooth/ble/__init__.py +++ /dev/null @@ -1,628 +0,0 @@ -import base64 -import asyncio -from contextlib import asynccontextmanager -from dataclasses import dataclass -from threading import RLock, Timer -import threading -from time import time -from typing import ( - Any, - AsyncGenerator, - Collection, - Final, - List, - Optional, - Dict, - Type, - Union, -) -from uuid import UUID - -from bleak import BleakClient, BleakScanner -from bleak.backends.device import BLEDevice -from bleak.backends.scanner import AdvertisementData -from typing_extensions import override - -from platypush.context import get_bus, get_or_create_event_loop -from platypush.entities import Entity, EntityManager -from platypush.entities.bluetooth import BluetoothDevice -from platypush.message.event.bluetooth import ( - BluetoothDeviceBlockedEvent, - BluetoothDeviceConnectedEvent, - BluetoothDeviceDisconnectedEvent, - BluetoothDeviceFoundEvent, - BluetoothDeviceLostEvent, - BluetoothDeviceNewDataEvent, - BluetoothDevicePairedEvent, - BluetoothDeviceSignalUpdateEvent, - BluetoothDeviceTrustedEvent, - BluetoothDeviceUnblockedEvent, - BluetoothDeviceUnpairedEvent, - BluetoothDeviceUntrustedEvent, - BluetoothDeviceEvent, - BluetoothScanPausedEvent, - BluetoothScanResumedEvent, -) -from platypush.plugins import AsyncRunnablePlugin, action - -from ._mappers import device_to_entity, parse_device_args - -UUIDType = Union[str, UUID] - - -@dataclass -class _BluetoothConnection: - """ - A class to store information and context about a Bluetooth connection. - """ - - client: BleakClient - device: BLEDevice - loop: asyncio.AbstractEventLoop - close_event: Optional[asyncio.Event] = None - thread: Optional[threading.Thread] = None - - -class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager): - """ - Plugin to interact with BLE (Bluetooth Low-Energy) devices. - - This plugin uses `_Bleak_ `_ to interact - with the Bluetooth stack and `_Theengs_ `_ - to map the services exposed by the devices into native entities. - - The full list of devices natively supported can be found - `here `_. - - Note that the support for Bluetooth low-energy devices requires a Bluetooth - adapter compatible with the Bluetooth 5.0 specification or higher. - - Requires: - - * **bleak** (``pip install bleak``) - * **bluetooth-numbers** (``pip install bluetooth-numbers``) - * **TheengsGateway** (``pip install git+https://github.com/BlackLight/TheengsGateway``) - - Triggers: - - * :class:`platypush.message.event.bluetooth.BluetoothDeviceBlockedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDevicePairedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceUnpairedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothScanPausedEvent` - * :class:`platypush.message.event.bluetooth.BluetoothScanResumedEvent` - * :class:`platypush.message.event.entities.EntityUpdateEvent` - - """ - - _default_connect_timeout: Final[int] = 10 - """ Default connection timeout (in seconds) """ - - _rssi_update_interval: Final[int] = 30 - """ - How long we should wait before triggering an update event upon a new - RSSI update, in seconds. - """ - - def __init__( - self, - interface: Optional[str] = None, - connect_timeout: float = _default_connect_timeout, - device_names: Optional[Dict[str, str]] = None, - uuids: Optional[Collection[UUIDType]] = None, - scan_paused_on_start: bool = False, - **kwargs, - ): - """ - :param interface: Name of the Bluetooth interface to use (e.g. ``hci0`` - on Linux). Default: first available interface. - :param connect_timeout: Timeout in seconds for the connection to a - Bluetooth device. Default: 10 seconds. - :param uuids: List of service/characteristic UUIDs to discover. - Default: all. - :param device_names: Bluetooth address -> device name mapping. If not - specified, the device's advertised name will be used, or its - Bluetooth address. Example: - - .. code-block:: json - - { - "00:11:22:33:44:55": "Switchbot", - "00:11:22:33:44:56": "Headphones", - "00:11:22:33:44:57": "Button" - } - - :param scan_paused_on_start: If ``True``, the plugin will not the - scanning thread until :meth:`.scan_resume` is called (default: - ``False``). - - """ - super().__init__(**kwargs) - - self._interface: Optional[str] = interface - self._connect_timeout: float = connect_timeout - self._uuids: Collection[Union[str, UUID]] = uuids or [] - self._scan_lock = RLock() - self._scan_enabled = asyncio.Event() - self._scan_controller_timer: Optional[Timer] = None - self._connections: Dict[str, _BluetoothConnection] = {} - self._connection_locks: Dict[str, asyncio.Lock] = {} - self._devices: Dict[str, BLEDevice] = {} - self._entities: Dict[str, BluetoothDevice] = {} - self._device_last_updated_at: Dict[str, float] = {} - self._device_name_by_addr = device_names or {} - self._device_addr_by_name = { - name: addr for addr, name in self._device_name_by_addr.items() - } - - if not scan_paused_on_start: - self._scan_enabled.set() - - async def _get_device(self, device: str) -> BLEDevice: - """ - Utility method to get a device by name or address. - """ - addr = ( - self._device_addr_by_name[device] - if device in self._device_addr_by_name - else device - ) - - if addr not in self._devices: - self.logger.info('Scanning for unknown device "%s"', device) - await self._scan() - - dev = self._devices.get(addr) - assert dev is not None, f'Unknown device: "{device}"' - return dev - - def _post_event( - self, event_type: Type[BluetoothDeviceEvent], device: BLEDevice, **kwargs - ): - if event_type == BluetoothDeviceNewDataEvent: - # Skip BluetoothDeviceNewDataEvent. They are only generated to - # infer if the device has sent some new data, but they can be very - # noisy and affect the latency - especially in case of connections - # that exchange a lot of data. - return - - get_bus().post( - event_type(address=device.address, **parse_device_args(device), **kwargs) - ) - - def _on_device_event(self, device: BLEDevice, data: AdvertisementData): - """ - Device advertisement packet callback handler. - - 1. It generates the relevant - :class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` if the - state of the device has changed. - - 2. It builds the relevant - :class:`platypush.entity.bluetooth.BluetoothDevice` entity object - populated with children entities that contain the supported - properties. - - :param device: The Bluetooth device. - :param data: The advertisement data. - """ - - event_types: List[Type[BluetoothDeviceEvent]] = [] - entity = device_to_entity(device, data) - existing_entity = self._entities.get(device.address) - existing_device = self._devices.get(device.address) - - if existing_entity and existing_device: - if existing_entity.paired != entity.paired: - event_types.append( - BluetoothDevicePairedEvent - if entity.paired - else BluetoothDeviceUnpairedEvent - ) - - if existing_entity.connected != entity.connected: - event_types.append( - BluetoothDeviceConnectedEvent - if entity.connected - else BluetoothDeviceDisconnectedEvent - ) - - if existing_entity.blocked != entity.blocked: - event_types.append( - BluetoothDeviceBlockedEvent - if entity.blocked - else BluetoothDeviceUnblockedEvent - ) - - if existing_entity.trusted != entity.trusted: - event_types.append( - BluetoothDeviceTrustedEvent - if entity.trusted - else BluetoothDeviceUntrustedEvent - ) - - if ( - time() - self._device_last_updated_at.get(device.address, 0) - ) >= self._rssi_update_interval and ( - existing_entity.rssi != device.rssi - or existing_entity.tx_power != entity.tx_power - ): - event_types.append(BluetoothDeviceSignalUpdateEvent) - - if ( - existing_device.metadata.get('manufacturer_data', {}) - != device.metadata.get('manufacturer_data', {}) - ) or ( - existing_device.details.get('props', {}).get('ServiceData', {}) - != device.details.get('props', {}).get('ServiceData', {}) - ): - event_types.append(BluetoothDeviceNewDataEvent) - else: - event_types.append(BluetoothDeviceFoundEvent) - - self._devices[device.address] = device - if device.name: - self._device_name_by_addr[device.address] = device.name - self._device_addr_by_name[device.name] = device.address - - if event_types: - for event_type in event_types: - self._post_event(event_type, device) - self._device_last_updated_at[device.address] = time() - - # Explicitly connect the child entities to their parent before - # flushing them - for child in entity.children: - child.parent = entity - - self.publish_entities([entity]) - - def _has_changed(self, entity: BluetoothDevice) -> bool: - existing_entity = self._entities.get(entity.id or entity.external_id) - - # If the entity didn't exist before, it's a new device. - if not existing_entity: - return True - - entity_dict = entity.to_json() - existing_entity_dict = entity.to_json() - - # Check if any of the root attributes changed, excluding those that are - # managed by the entities engine). - return any( - attr - for attr, value in entity_dict.items() - if value != existing_entity_dict.get(attr) - and attr not in {'id', 'external_id', 'plugin', 'updated_at'} - ) - - @asynccontextmanager - async def _connect( - self, - device: str, - interface: Optional[str] = None, - timeout: Optional[float] = None, - close_event: Optional[asyncio.Event] = None, - ) -> AsyncGenerator[_BluetoothConnection, None]: - dev = await self._get_device(device) - - async with self._connection_locks.get(dev.address, asyncio.Lock()) as lock: - self._connection_locks[dev.address] = lock or asyncio.Lock() - - async with BleakClient( - dev.address, - adapter=interface or self._interface, - timeout=timeout or self._connect_timeout, - ) as client: - assert client, f'Could not connect to the device {device}' - self.logger.info('Connected to device %s', device) - self._connections[dev.address] = _BluetoothConnection( - client=client, - device=dev, - loop=asyncio.get_event_loop(), - thread=threading.current_thread(), - close_event=close_event, - ) - yield self._connections[dev.address] - self._connections.pop(dev.address, None) - self.logger.info('Disconnected from device %s', device) - - async def _read( - self, - device: str, - service_uuid: UUIDType, - interface: Optional[str] = None, - connect_timeout: Optional[float] = None, - ) -> bytearray: - async with self._connect(device, interface, connect_timeout) as conn: - data = await conn.client.read_gatt_char(service_uuid) - - return data - - async def _write( - self, - device: str, - data: bytes, - service_uuid: UUIDType, - interface: Optional[str] = None, - connect_timeout: Optional[float] = None, - ): - async with self._connect(device, interface, connect_timeout) as conn: - await conn.client.write_gatt_char(service_uuid, data) - - async def _scan( - self, - duration: Optional[float] = None, - uuids: Optional[Collection[UUIDType]] = None, - ) -> Collection[Entity]: - with self._scan_lock: - timeout = duration or self.poll_interval or 5 - devices = await BleakScanner.discover( - adapter=self._interface, - timeout=timeout, - service_uuids=list(map(str, uuids or self._uuids or [])), - detection_callback=self._on_device_event, - ) - - self._devices.update({dev.address: dev for dev in devices}) - addresses = {dev.address.lower() for dev in devices} - return [ - dev - for addr, dev in self._entities.items() - if isinstance(dev, BluetoothDevice) - and addr.lower() in addresses - and dev.reachable - ] - - async def _scan_state_set(self, state: bool, duration: Optional[float] = None): - def timer_callback(): - if state: - self.scan_pause() - else: - self.scan_resume() - - self._scan_controller_timer = None - - with self._scan_lock: - if not state and self._scan_enabled.is_set(): - get_bus().post(BluetoothScanPausedEvent(duration=duration)) - elif state and not self._scan_enabled.is_set(): - get_bus().post(BluetoothScanResumedEvent(duration=duration)) - - if state: - self._scan_enabled.set() - else: - self._scan_enabled.clear() - - if duration and not self._scan_controller_timer: - self._scan_controller_timer = Timer(duration, timer_callback) - self._scan_controller_timer.start() - - @action - def connect( - self, - device: str, - interface: Optional[str] = None, - timeout: Optional[float] = None, - ): - """ - Pair and connect to a device by address or name. - - :param device: The device address or name. - :param interface: The Bluetooth interface to use (it overrides the - default ``interface``). - :param timeout: The connection timeout in seconds (it overrides the - default ``connect_timeout``). - """ - timeout = timeout or self._connect_timeout - connected_event = threading.Event() - close_event = asyncio.Event() - loop = asyncio.new_event_loop() - - def connect_thread(): - async def connect_wrapper(): - async with self._connect(device, interface, timeout, close_event): - connected_event.set() - await close_event.wait() - - asyncio.set_event_loop(loop) - loop.run_until_complete(connect_wrapper()) - - loop = get_or_create_event_loop() - connector = threading.Thread(target=connect_thread) - connector.start() - success = connected_event.wait(timeout=timeout) - assert success, f'Connection to {device} timed out' - - @action - def disconnect(self, device: str): - """ - Close an active connection to a device. - - :param device: The device address or name. - """ - address: Optional[str] = device - if not self._connections.get(device): - address = self._device_addr_by_name.get(device) - - assert address, f'Device {device} not found' - connection = self._connections.get(address, None) - assert connection, f'No active connections to the device {device} were found' - - if connection.close_event: - connection.close_event.set() - if connection.thread and connection.thread.is_alive(): - connection.thread.join(timeout=5) - assert not ( - connection.thread and connection.thread.is_alive() - ), f'Disconnection from {device} timed out' - - @action - def scan_pause(self, duration: Optional[float] = None): - """ - Pause the scanning thread. - - :param duration: For how long the scanning thread should be paused - (default: null = indefinitely). - """ - if self._loop: - asyncio.ensure_future( - self._scan_state_set(False, duration), loop=self._loop - ) - - @action - def scan_resume(self, duration: Optional[float] = None): - """ - Resume the scanning thread, if inactive. - - :param duration: For how long the scanning thread should be running - (default: null = indefinitely). - """ - if self._loop: - asyncio.ensure_future(self._scan_state_set(True, duration), loop=self._loop) - - @action - def scan( - self, - duration: Optional[float] = None, - uuids: Optional[Collection[UUIDType]] = None, - ): - """ - Scan for Bluetooth devices nearby and return the results as a list of - entities. - - :param duration: Scan duration in seconds (default: same as the plugin's - `poll_interval` configuration parameter) - :param uuids: List of characteristic UUIDs to discover. Default: all. - """ - loop = get_or_create_event_loop() - return loop.run_until_complete(self._scan(duration, uuids)) - - @action - def read( - self, - device: str, - service_uuid: UUIDType, - interface: Optional[str] = None, - connect_timeout: Optional[float] = None, - ) -> str: - """ - Read a message from a device. - - :param device: Name or address of the device to read from. - :param service_uuid: Service UUID. - :param interface: Bluetooth adapter name to use (default configured if None). - :param connect_timeout: Connection timeout in seconds (default: same as the - configured `connect_timeout`). - :return: The base64-encoded response received from the device. - """ - loop = get_or_create_event_loop() - data = loop.run_until_complete( - self._read(device, service_uuid, interface, connect_timeout) - ) - return base64.b64encode(data).decode() - - @action - def write( - self, - device: str, - data: Union[str, bytes], - service_uuid: UUIDType, - interface: Optional[str] = None, - connect_timeout: Optional[float] = None, - ): - """ - Writes data to a device - - :param device: Name or address of the device to read from. - :param data: Data to be written, either as bytes or as a base64-encoded string. - :param service_uuid: Service UUID. - :param interface: Bluetooth adapter name to use (default configured if None) - :param connect_timeout: Connection timeout in seconds (default: same as the - configured `connect_timeout`). - """ - loop = get_or_create_event_loop() - if isinstance(data, str): - data = base64.b64decode(data.encode()) - - loop.run_until_complete( - self._write(device, data, service_uuid, interface, connect_timeout) - ) - - @override - @action - def status(self, *_, **__) -> Collection[Entity]: - """ - Alias for :meth:`.scan`. - """ - return self.scan().output - - @override - def publish_entities( - self, entities: Optional[Collection[Any]] - ) -> Collection[Entity]: - self._entities.update({entity.id: entity for entity in (entities or [])}) - - return super().publish_entities(entities) - - @override - def transform_entities( - self, entities: Collection[Union[BLEDevice, BluetoothDevice]] - ) -> Collection[BluetoothDevice]: - return [ - BluetoothDevice( - id=dev.address, - **parse_device_args(dev), - ) - if isinstance(dev, BLEDevice) - else dev - for dev in entities - ] - - @override - async def listen(self): - device_addresses = set() - - while True: - await self._scan_enabled.wait() - entities = await self._scan(uuids=self._uuids) - - new_device_addresses = {e.external_id for e in entities} - missing_device_addresses = device_addresses - new_device_addresses - missing_devices = [ - dev - for addr, dev in self._devices.items() - if addr in missing_device_addresses - ] - - for dev in missing_devices: - self._post_event(BluetoothDeviceLostEvent, dev) - self._devices.pop(dev.address, None) - self._entities.pop(dev.address, None) - - device_addresses = new_device_addresses - - @override - def stop(self): - if self._scan_controller_timer: - self._scan_controller_timer.cancel() - - connections = list(self._connections.values()) - for conn in connections: - try: - self.disconnect(conn.device.address) - except Exception as e: - self.logger.warning( - 'Error while disconnecting from %s: %s', conn.device.address, e - ) - - super().stop() - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/ble/manifest.yaml b/platypush/plugins/bluetooth/ble/manifest.yaml deleted file mode 100644 index 35de98b33..000000000 --- a/platypush/plugins/bluetooth/ble/manifest.yaml +++ /dev/null @@ -1,22 +0,0 @@ -manifest: - events: - platypush.message.event.bluetooth.BluetoothDeviceBlockedEvent: - platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent: - platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent: - platypush.message.event.bluetooth.BluetoothDeviceFoundEvent: - platypush.message.event.bluetooth.BluetoothDeviceLostEvent: - platypush.message.event.bluetooth.BluetoothDeviceNewDataEvent: - platypush.message.event.bluetooth.BluetoothDevicePairedEvent: - platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent: - platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent: - platypush.message.event.bluetooth.BluetoothDeviceUnpairedEvent: - platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent: - platypush.message.event.bluetooth.BluetoothScanPausedEvent: - platypush.message.event.bluetooth.BluetoothScanResumedEvent: - install: - pip: - - bleak - - bluetooth-numbers - - git+https://github.com/BlackLight/TheengsGateway - package: platypush.plugins.bluetooth.ble - type: plugin diff --git a/platypush/plugins/bluetooth/manifest.yaml b/platypush/plugins/bluetooth/manifest.yaml index 75ab682b7..37014e30a 100644 --- a/platypush/plugins/bluetooth/manifest.yaml +++ b/platypush/plugins/bluetooth/manifest.yaml @@ -1,8 +1,23 @@ manifest: - events: {} + events: + platypush.message.event.bluetooth.BluetoothConnectionFailedEvent: + platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent: + platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent: + platypush.message.event.bluetooth.BluetoothDeviceFoundEvent: + platypush.message.event.bluetooth.BluetoothDeviceLostEvent: + platypush.message.event.bluetooth.BluetoothFileReceivedEvent: + platypush.message.event.bluetooth.BluetoothFileSentEvent: + platypush.message.event.bluetooth.BluetoothFileTransferCancelledEvent: + platypush.message.event.bluetooth.BluetoothFileTransferStartedEvent: + platypush.message.event.bluetooth.BluetoothScanPausedEvent: + platypush.message.event.bluetooth.BluetoothScanResumedEvent: + platypush.message.event.entities.EntityUpdateEvent: install: pip: - - pybluez - - pyobex + - bleak + - bluetooth-numbers + - git+https://github.com/pybluez/pybluez + - git+https://github.com/theengs/gateway + - git+https://github.com/BlackLight/PyOBEX package: platypush.plugins.bluetooth type: plugin diff --git a/platypush/plugins/bluetooth/model.py b/platypush/plugins/bluetooth/model.py new file mode 100644 index 000000000..32a12e476 --- /dev/null +++ b/platypush/plugins/bluetooth/model.py @@ -0,0 +1,17 @@ +from ._model import ( + MajorDeviceClass, + MajorServiceClass, + MinorDeviceClass, + Protocol, + ServiceClass, +) +from ._types import RawServiceClass + +__all__ = [ + "MajorDeviceClass", + "MajorServiceClass", + "MinorDeviceClass", + "Protocol", + "RawServiceClass", + "ServiceClass", +] diff --git a/setup.py b/setup.py index cdde64c17..e38afc126 100755 --- a/setup.py +++ b/setup.py @@ -177,8 +177,9 @@ setup( 'bluetooth': [ 'bleak', 'bluetooth-numbers', - 'pybluez', - 'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master', + 'pybluez @ https://github.com/pybluez/pybluez/tarball/master', + 'PyOBEX @ https://github.com/BlackLight/PyOBEX/tarball/master', + 'TheengsGateway @ https://github.com/theengs/gateway/tarball/development', ], # Support for TP-Link devices 'tplink': ['pyHS100'],