Added native support for switch entities to switchbot.bluetooth plugin

This commit is contained in:
Fabio Manganiello 2022-04-04 21:12:59 +02:00
parent 91ff8d811f
commit b9c78ad913
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,6 +1,6 @@
import enum
import time
from typing import List
from typing import List, Optional
from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action
@ -8,7 +8,9 @@ from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins.switch import SwitchPlugin
class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/missing-call-to-init]
class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
SwitchPlugin, BluetoothBlePlugin
):
"""
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
programmatically control switches over a Bluetooth interface.
@ -31,6 +33,7 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
"""
Base64 encoded commands
"""
# \x57\x01\x00
PRESS = 'VwEA'
# # \x57\x01\x01
@ -38,8 +41,14 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
# # \x57\x01\x02
OFF = 'VwEC'
def __init__(self, interface=None, connect_timeout=None,
scan_timeout=2, devices=None, **kwargs):
def __init__(
self,
interface=None,
connect_timeout=None,
scan_timeout=2,
devices=None,
**kwargs
):
"""
:param interface: Bluetooth interface to use (e.g. hci0) default: first available one
:type interface: str
@ -59,17 +68,21 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
self.scan_timeout = scan_timeout if scan_timeout else 2
self.configured_devices = devices or {}
self.configured_devices_by_name = {
name: addr
for addr, name in self.configured_devices.items()
name: addr for addr, name in self.configured_devices.items()
}
def _run(self, device: str, command: Command):
if device in self.configured_devices_by_name:
device = self.configured_devices_by_name[device]
device = self.configured_devices_by_name.get(device, '')
n_tries = 1
try:
self.write(device, command.value, handle=self.handle, channel_type='random', binary=True)
self.write(
device,
command.value,
handle=self.handle,
channel_type='random',
binary=True,
)
except Exception as e:
self.logger.exception(e)
n_tries -= 1
@ -78,7 +91,7 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
raise e
time.sleep(5)
return self.status(device)
return self.status(device) # type: ignore
@action
def press(self, device):
@ -91,11 +104,11 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
return self._run(device, self.Command.PRESS)
@action
def toggle(self, device, **kwargs):
def toggle(self, device, **_):
return self.press(device)
@action
def on(self, device, **kwargs):
def on(self, device, **_):
"""
Send a press-on button command to a device
@ -105,7 +118,7 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
return self._run(device, self.Command.ON)
@action
def off(self, device, **kwargs):
def off(self, device, **_):
"""
Send a press-off button command to a device
@ -115,7 +128,9 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
return self._run(device, self.Command.OFF)
@action
def scan(self, interface: str = None, duration: int = 10) -> BluetoothScanResponse:
def scan(
self, interface: Optional[str] = None, duration: int = 10
) -> BluetoothScanResponse:
"""
Scan for available Switchbot devices nearby.
@ -129,9 +144,13 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
for dev in devices:
try:
characteristics = [
chrc for chrc in self.discover_characteristics(
dev['addr'], channel_type='random', wait=False,
timeout=self.scan_timeout).characteristics
chrc
for chrc in self.discover_characteristics(
dev['addr'],
channel_type='random',
wait=False,
timeout=self.scan_timeout,
).characteristics
if chrc.get('uuid') == self.uuid
]
@ -140,10 +159,12 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
except Exception as e:
self.logger.warning('Device scan error', e)
self.publish_entities(compatible_devices) # type: ignore
return BluetoothScanResponse(devices=compatible_devices)
@property
def switches(self) -> List[dict]:
self.publish_entities(self.configured_devices) # type: ignore
return [
{
'address': addr,
@ -154,5 +175,19 @@ class SwitchbotBluetoothPlugin(SwitchPlugin, BluetoothBlePlugin): # lgtm [py/m
for addr, name in self.configured_devices.items()
]
def transform_entities(self, devices: dict):
from platypush.entities.switches import Switch
return super().transform_entities( # type: ignore
[
Switch(
id=addr,
name=name,
state=False,
)
for addr, name in devices.items()
]
)
# vim:sw=4:ts=4:et: