Migrated/refactored switchbot.bluetooth integration.

- Out `gattlib` + `pybluez`, in `bleak`. It's not platform-dependent, it doesn't
  require libboost and other heavy build dependencies, and it doesn't require the
  user that runs the service from having special privileges to access raw
  Bluetooth sockets.

- Better integration with Platypush native entities. The devices are now mapped
  to write-only `EnumSwitch` entities, and the status returns the serialized
  representation of those entities instead of the previous intermediate
  representation.
This commit is contained in:
Fabio Manganiello 2023-02-08 22:42:00 +01:00
parent 35719b0da9
commit 8469a1027f
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 135 additions and 121 deletions

View file

@ -296,6 +296,7 @@ autodoc_mock_imports = [
'aiofiles',
'aiofiles.os',
'async_lru',
'bleak',
]
sys.path.insert(0, os.path.abspath('../..'))

View file

@ -1,131 +1,154 @@
import enum
import time
from typing import Any, Collection, Dict, List, Optional
from uuid import UUID
from threading import RLock
from typing import Collection, Dict, Optional, Union
from platypush.entities import EnumSwitchEntityManager
from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
from platypush.context import get_or_create_event_loop
from platypush.entities import Entity, EnumSwitchEntityManager
from platypush.entities.switches import EnumSwitch
from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins import AsyncRunnablePlugin, action
# pylint: disable=too-many-ancestors
class SwitchbotBluetoothPlugin(BluetoothBlePlugin, EnumSwitchEntityManager):
class SwitchbotBluetoothPlugin(AsyncRunnablePlugin, EnumSwitchEntityManager):
"""
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
programmatically control switches over a Bluetooth interface.
See :class:`platypush.plugins.bluetooth.ble.BluetoothBlePlugin` for how to enable BLE permissions for
the platypush user (a simple solution may be to run it as root, but that's usually NOT a good idea).
Note that this plugin currently only supports Switchbot "bot" devices
(mechanical switch pressers). For support for other devices, you may want
the :class:`platypush.plugins.switchbot.SwitchbotPlugin` integration
(which requires a Switchbot hub).
Requires:
* **pybluez** (``pip install pybluez``)
* **gattlib** (``pip install gattlib``)
* **libboost** (on Debian ```apt-get install libboost-python-dev libboost-thread-dev``)
* **bleak** (``pip install bleak``)
"""
uuid = 'cba20002-224d-11e6-9fb8-0002a5d5c51b'
handle = 0x16
# Bluetooth UUID prefixes exposed by SwitchBot devices
_uuid_prefixes = {
'tx': '002',
'rx': '003',
'service': 'd00',
}
# Static list of Bluetooth UUIDs commonly exposed by SwitchBot devices.
_uuids = {
service: UUID(f'cba20{prefix}-224d-11e6-9fb8-0002a5d5c51b')
for service, prefix in _uuid_prefixes.items()
}
class Command(enum.Enum):
"""
Base64 encoded commands
Supported commands.
"""
# \x57\x01\x00
PRESS = 'VwEA'
# # \x57\x01\x01
ON = 'VwEB'
# # \x57\x01\x02
OFF = 'VwEC'
PRESS = b'\x57\x01\x00'
ON = b'\x57\x01\x01'
OFF = b'\x57\x01\x02'
def __init__(
self,
interface=None,
connect_timeout=None,
scan_timeout=2,
devices=None,
**kwargs
connect_timeout: Optional[float] = 5,
device_names: Optional[Dict[str, str]] = None,
**kwargs,
):
"""
:param interface: Bluetooth interface to use (e.g. hci0) default: first available one
:type interface: str
:param connect_timeout: Timeout in seconds for the connection to the
Switchbot device. Default: 5 seconds
:param device_names: Bluetooth address -> device name mapping. If not
specified, the device's address will be used as a name as well.
:param connect_timeout: Timeout for the connection to the Switchbot device - default: None
:type connect_timeout: float
Example:
.. code-block:: json
:param scan_timeout: Timeout for the scan operations
:type scan_timeout: float
{
'00:11:22:33:44:55': 'My Switchbot',
'00:11:22:33:44:56': 'My Switchbot 2',
'00:11:22:33:44:57': 'My Switchbot 3'
}
:param devices: Devices to control, as a MAC address -> name map
:type devices: dict
"""
super().__init__(interface=interface, **kwargs)
super().__init__(**kwargs)
self.connect_timeout = connect_timeout if connect_timeout else 5
self.scan_timeout = scan_timeout if scan_timeout else 2
self.configured_devices = devices or {}
self.configured_devices_by_name = {
name: addr for addr, name in self.configured_devices.items()
self._connect_timeout = connect_timeout if connect_timeout else 5
self._scan_lock = RLock()
self._devices: Dict[str, BLEDevice] = {}
self._device_name_by_addr = device_names or {}
self._device_addr_by_name = {
name: addr for addr, name in self._device_name_by_addr.items()
}
def _run(self, device: str, command: Command):
device = self.configured_devices_by_name.get(device, '')
n_tries = 1
async def _run(
self, device: str, command: Command, uuid: Union[UUID, str] = _uuids['tx']
):
"""
Run a command on a Switchbot device.
try:
self.write(
device,
command.value,
handle=self.handle,
channel_type='random',
binary=True,
)
except Exception as e:
self.logger.exception(e)
n_tries -= 1
:param device: Device name or address.
:param command: Command to run.
:param uuid: On which UUID the command should be sent. Default: the
Switchbot registered ``tx`` service.
"""
dev = await self._get_device(device)
async with BleakClient(dev.address, timeout=self._connect_timeout) as client:
await client.write_gatt_char(str(uuid), command.value)
if n_tries == 0:
raise e
time.sleep(5)
async def _get_device(self, device: str) -> BLEDevice:
"""
Utility method to get a device by name or address.
"""
addr = (
self._device_addr_by_name[device]
if device in self._device_addr_by_name
else device
)
return self.status(device)
if addr not in self._devices:
self.logger.info('Scanning for unknown device "%s"', device)
await self._scan()
dev = self._devices.get(addr)
assert dev is not None, f'Unknown device: "{device}"'
return dev
@action
def press(self, device):
def press(self, device: str):
"""
Send a press button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.PRESS)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, self.Command.PRESS))
@action
def toggle(self, device, **_):
return self.press(device)
@action
def on(self, device, **_):
def on(self, device: str, **_):
"""
Send a press-on button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.ON)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, self.Command.ON))
@action
def off(self, device, **_):
def off(self, device: str, **_):
"""
Send a press-off button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.OFF)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, self.Command.OFF))
@action
# pylint: disable=arguments-differ
@ -152,66 +175,63 @@ class SwitchbotBluetoothPlugin(BluetoothBlePlugin, EnumSwitchEntityManager):
return None
@action
def scan(
self, interface: Optional[str] = None, duration: int = 10
) -> BluetoothScanResponse:
def scan(self, duration: Optional[float] = None) -> Collection[Entity]:
"""
Scan for available Switchbot devices nearby.
:param interface: Bluetooth interface to scan (default: default configured interface)
:param duration: Scan duration in seconds
:param duration: Scan duration in seconds (default: same as the plugin's
`poll_interval` configuration parameter)
:return: The list of discovered Switchbot devices.
"""
compatible_devices: Dict[str, Any] = {}
devices = (
super().scan(interface=interface, duration=duration).devices # type: ignore
)
for dev in devices:
try:
characteristics = [
chrc
for chrc in self.discover_characteristics( # type: ignore
dev['addr'],
channel_type='random',
wait=False,
timeout=self.scan_timeout,
).characteristics
if chrc.get('uuid') == self.uuid
]
if characteristics:
compatible_devices[dev['addr']] = None
except Exception as e:
self.logger.warning('Device scan error: %s', e)
self.publish_entities(compatible_devices)
return BluetoothScanResponse(devices=compatible_devices)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._scan(duration))
@action
def status(self, *_, **__) -> List[dict]:
self.publish_entities(self.configured_devices)
return [
{
'address': addr,
'id': addr,
'name': name,
'on': False,
}
for addr, name in self.configured_devices.items()
def status(self, *_, **__) -> Collection[Entity]:
"""
Alias for :meth:`.scan`.
"""
return self.scan().output
async def _scan(self, duration: Optional[float] = None) -> Collection[Entity]:
with self._scan_lock:
timeout = duration or self.poll_interval or 5
devices = await BleakScanner.discover(timeout=timeout)
compatible_devices = [
d
for d in devices
if set(d.metadata.get('uuids', [])).intersection(
map(str, self._uuids.values())
)
]
new_devices = [
dev for dev in compatible_devices if dev.address not in self._devices
]
def transform_entities(self, entities: Collection[dict]) -> Collection[EnumSwitch]:
self._devices.update({dev.address: dev for dev in compatible_devices})
entities = self.transform_entities(compatible_devices)
self.publish_entities(new_devices)
return entities
def transform_entities(
self, entities: Collection[BLEDevice]
) -> Collection[EnumSwitch]:
return [
EnumSwitch(
id=addr,
name=name,
id=dev.address,
name=self._device_name_by_addr.get(dev.address, dev.name),
value='on',
values=['on', 'off', 'press'],
is_write_only=True,
)
for addr, name in entities
for dev in entities
]
async def listen(self):
while True:
await self._scan()
# vim:sw=4:ts=4:et:

View file

@ -2,12 +2,6 @@ manifest:
events: {}
install:
pip:
- pybluez
- gattlib
apt:
- libboost-python-dev
- libboost-thread-dev
pacman:
- boost-libs
- bleak
package: platypush.plugins.switchbot.bluetooth
type: plugin

View file

@ -175,8 +175,7 @@ setup(
'alexa': ['avs @ https://github.com/BlackLight/avs/tarball/master'],
# Support for bluetooth devices
'bluetooth': [
'pybluez',
'gattlib',
'bleak',
'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master',
],
# Support for TP-Link devices