platypush/platypush/plugins/switchbot/bluetooth/__init__.py

195 lines
5.6 KiB
Python
Raw Normal View History

import enum
import time
from typing import List, Optional
2018-05-08 15:51:47 +02:00
from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins.switch import SwitchPlugin
2018-05-08 15:51:47 +02:00
2019-07-02 12:02:28 +02:00
class SwitchbotBluetoothPlugin(  # lgtm [py/missing-call-to-init]
    SwitchPlugin, BluetoothBlePlugin
):
    """
    Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
    programmatically control switches over a Bluetooth interface.

    See :class:`platypush.plugins.bluetooth.ble.BluetoothBlePlugin` for how to
    enable BLE permissions for the platypush user (a simple solution may be to
    run it as root, but that's usually NOT a good idea).

    Requires:

        * **pybluez** (``pip install pybluez``)
        * **gattlib** (``pip install gattlib``)
        * **libboost** (on Debian ``apt-get install libboost-python-dev libboost-thread-dev``)

    """

    # UUID of the GATT characteristic used by :meth:`scan` to recognize a
    # compatible device.
    uuid = 'cba20002-224d-11e6-9fb8-0002a5d5c51b'
    # GATT handle the commands are written to.
    handle = 0x16

    class Command(enum.Enum):
        """
        Base64 encoded commands
        """

        # \x57\x01\x00
        PRESS = 'VwEA'
        # \x57\x01\x01
        ON = 'VwEB'
        # \x57\x01\x02
        OFF = 'VwEC'

    def __init__(
        self,
        interface=None,
        connect_timeout=None,
        scan_timeout=2,
        devices=None,
        **kwargs
    ):
        """
        :param interface: Bluetooth interface to use (e.g. hci0) default: first available one
        :type interface: str

        :param connect_timeout: Timeout for the connection to the Switchbot device.
            A missing or falsy value falls back to 5.
        :type connect_timeout: float

        :param scan_timeout: Timeout for the scan operations. A falsy value
            falls back to 2.
        :type scan_timeout: float

        :param devices: Devices to control, as a MAC address -> name map
        :type devices: dict
        """
        super().__init__(interface=interface, **kwargs)

        self.connect_timeout = connect_timeout or 5
        self.scan_timeout = scan_timeout or 2
        self.configured_devices = devices or {}
        # Reverse index (name -> address), so actions can be invoked by name.
        self.configured_devices_by_name = {
            name: addr for addr, name in self.configured_devices.items()
        }

    def _run(self, device: str, command: Command):
        """
        Write a command to a device and return its status.

        :param device: Device name or address. A configured name is resolved
            to its address; any other value is used as the address as it is.
        :param command: Command to be sent.
        :raises Exception: Any error raised by the write is logged and
            re-raised.
        """
        # Fall back to the given value when it is not a configured name, so
        # that raw addresses keep working (the previous fallback was an empty
        # string, which discarded the address).
        device = self.configured_devices_by_name.get(device, device)

        try:
            self.write(
                device,
                command.value,
                handle=self.handle,
                channel_type='random',
                binary=True,
            )
        except Exception as e:
            # Only a single attempt is made: log the failure and propagate it.
            self.logger.exception(e)
            raise

        return self.status(device)  # type: ignore

    @action
    def press(self, device):
        """
        Send a press button command to a device

        :param device: Device name or address
        :type device: str
        """
        return self._run(device, self.Command.PRESS)

    @action
    def toggle(self, device, **_):
        """
        Toggle a device. It is an alias for :meth:`press`.

        :param device: Device name or address
        :type device: str
        """
        return self.press(device)

    @action
    def on(self, device, **_):
        """
        Send a press-on button command to a device

        :param device: Device name or address
        :type device: str
        """
        return self._run(device, self.Command.ON)

    @action
    def off(self, device, **_):
        """
        Send a press-off button command to a device

        :param device: Device name or address
        :type device: str
        """
        return self._run(device, self.Command.OFF)

    @action
    def scan(
        self, interface: Optional[str] = None, duration: int = 10
    ) -> BluetoothScanResponse:
        """
        Scan for available Switchbot devices nearby.

        :param interface: Bluetooth interface to scan (default: default configured interface)
        :param duration: Scan duration in seconds
        :return: A response whose ``devices`` maps the address of each
            compatible device to ``None``.
        """
        devices = super().scan(interface=interface, duration=duration).devices
        compatible_devices = {}

        for dev in devices:
            try:
                # A device is compatible if it exposes the Switchbot
                # characteristic.
                characteristics = [
                    chrc
                    for chrc in self.discover_characteristics(
                        dev['addr'],
                        channel_type='random',
                        wait=False,
                        timeout=self.scan_timeout,
                    ).characteristics
                    if chrc.get('uuid') == self.uuid
                ]

                if characteristics:
                    compatible_devices[dev['addr']] = None
            except Exception as e:
                # Best effort: a failure on one device must not abort the
                # scan. The placeholder is required for the error to be
                # rendered in the log record.
                self.logger.warning('Device scan error: %s', e)

        self.publish_entities(compatible_devices)  # type: ignore
        return BluetoothScanResponse(devices=compatible_devices)

    @property
    def switches(self) -> List[dict]:
        """
        List the configured devices, one dictionary per device.

        Accessing the property also publishes the configured devices as
        entities. The ``on`` field is always reported as ``False``.
        """
        self.publish_entities(self.configured_devices)  # type: ignore
        return [
            {
                'address': addr,
                'id': addr,
                'name': name,
                'on': False,
            }
            for addr, name in self.configured_devices.items()
        ]

    def transform_entities(self, devices: dict):
        """
        Convert an address -> name map into write-only ``Switch`` entities and
        pass them to the parent implementation.

        :param devices: Devices, as an address -> name map.
        """
        # Imported here as in the original code (presumably to avoid an
        # import cycle at module load time - TODO confirm).
        from platypush.entities.switches import Switch

        return super().transform_entities(  # type: ignore
            [
                Switch(
                    id=addr,
                    name=name,
                    state=False,
                    is_write_only=True,
                )
                for addr, name in devices.items()
            ]
        )
2018-05-08 15:51:47 +02:00
2019-07-02 12:02:28 +02:00
# vim:sw=4:ts=4:et: