From af24f36519e585e92b9093e2a4a0efde3647af18 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 13 Dec 2019 02:08:43 +0100 Subject: [PATCH] - Added bluetooth LTE plugin - Switchbot plugin rewritten as an implementation of bluetooth BLE see #89 --- platypush/message/response/bluetooth.py | 94 +++++++ platypush/plugins/bluetooth/ble.py | 259 ++++++++++++++++++ .../plugins/switch/switchbot/__init__.py | 182 ++++-------- .../plugins/switch/switchbot/__main__.py | 75 ----- 4 files changed, 409 insertions(+), 201 deletions(-) create mode 100644 platypush/plugins/bluetooth/ble.py delete mode 100644 platypush/plugins/switch/switchbot/__main__.py diff --git a/platypush/message/response/bluetooth.py b/platypush/message/response/bluetooth.py index 69e3cc75..bcc82fec 100644 --- a/platypush/message/response/bluetooth.py +++ b/platypush/message/response/bluetooth.py @@ -98,4 +98,98 @@ class BluetoothLookupServiceResponse(BluetoothResponse): super().__init__(output=self.services, *args, **kwargs) +class BluetoothDiscoverPrimaryResponse(BluetoothResponse): + """ + Example services response output:: + + [ + { + "uuid": "00001800-0000-1000-8000-00805f9b34fb", + "start": 1, + "end": 7 + }, + { + "uuid": "00001801-0000-1000-8000-00805f9b34fb", + "start": 8, + "end": 8 + }, + { + "uuid": "0000fee7-0000-1000-8000-00805f9b34fb", + "start": 9, + "end": 16 + }, + { + "uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b", + "start": 17, + "end": 65535 + } + ] + + """ + def __init__(self, services: list, *args, **kwargs): + self.services = services + super().__init__(output=self.services, *args, **kwargs) + + +class BluetoothDiscoverCharacteristicsResponse(BluetoothResponse): + """ + Example services response output:: + + [ + { + "uuid": "00002a00-0000-1000-8000-00805f9b34fb", + "handle": 2, + "properties": 10, + "value_handle": 3 + }, + { + "uuid": "00002a01-0000-1000-8000-00805f9b34fb", + "handle": 4, + "properties": 2, + "value_handle": 5 + }, + { + "uuid": "00002a04-0000-1000-8000-00805f9b34fb", + "handle": 6, + "properties": 2, + "value_handle": 7 + }, + { + "uuid": "0000fec8-0000-1000-8000-00805f9b34fb", + "handle": 10, + "properties": 32, + "value_handle": 11 + }, + { + "uuid": "0000fec7-0000-1000-8000-00805f9b34fb", + "handle": 13, + "properties": 8, + "value_handle": 14 + }, + { + "uuid": "0000fec9-0000-1000-8000-00805f9b34fb", + "handle": 15, + "properties": 2, + "value_handle": 16 + }, + { + "uuid": "cba20003-224d-11e6-9fb8-0002a5d5c51b", + "handle": 18, + "properties": 16, + "value_handle": 19 + }, + { + "uuid": "cba20002-224d-11e6-9fb8-0002a5d5c51b", + "handle": 21, + "properties": 12, + "value_handle": 22 + } + ] + + """ + def __init__(self, characteristics: list, *args, **kwargs): + self.characteristics = characteristics + super().__init__(output=self.characteristics, *args, **kwargs) + + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/bluetooth/ble.py b/platypush/plugins/bluetooth/ble.py new file mode 100644 index 00000000..0c8c70cb --- /dev/null +++ b/platypush/plugins/bluetooth/ble.py @@ -0,0 +1,259 @@ +import base64 +import os +import subprocess +import sys +import time + +from platypush.plugins import Plugin, action +from platypush.message.response.bluetooth import BluetoothScanResponse, BluetoothDiscoverPrimaryResponse, \ + BluetoothDiscoverCharacteristicsResponse + + +class BluetoothBlePlugin(Plugin): + """ + Bluetooth BLE (low-energy) plugin + + Requires: + + * **pybluez** (``pip install pybluez``) + * **gattlib** (``pip install gattlib``) + + Note that the support for bluetooth low-energy devices on Linux requires: + + * A bluetooth adapter compatible with the bluetooth 5.0 specification or higher; + * To run platypush with root privileges (which is usually a very bad idea), or to set the raw net + capabilities on the Python executable (which is also a bad idea, because any Python script will + be able to access the kernel raw network API, but it's probably better than running a network + server that can execute system commands as root user). If you don't want to set special permissions + on the main Python executable and you want to run the bluetooth LTE plugin then the advised approach + is to install platypush in a virtual environment and set the capabilities on the venv python executable, + or run your platypush instance in Docker. + + You can set the capabilities on the Python executable through the following shell command:: + + [sudo] setcap 'cap_net_raw,cap_net_admin+eip' /path/to/your/python + + """ + + def __init__(self, interface: str = 'hci0', **kwargs): + """ + :param interface: Default adapter device to be used (default: 'hci0') + """ + super().__init__(**kwargs) + self.interface = interface + self._req_by_addr = {} + + @staticmethod + def _get_python_interpreter() -> str: + exe = sys.executable + + while os.path.islink(exe): + target = os.readlink(exe) + if not os.path.isabs(target): + target = os.path.abspath(os.path.join(os.path.dirname(exe), target)) + exe = target + + return exe + + @staticmethod + def _python_has_ble_capabilities(exe: str) -> bool: + getcap = subprocess.Popen(['getcap', exe], stdout=subprocess.PIPE) + output = getcap.communicate()[0].decode().split('\n') + if not output: + return False + + caps = set(output.pop(0).split('=').pop().strip().split(',')) + return 'cap_net_raw+eip' in caps and 'cap_net_admin' in caps + + def _check_ble_support(self): + # Check if the script is running as root or if the Python executable + # has 'cap_net_admin,cap_net_raw+eip' capabilities + exe = self._get_python_interpreter() + if os.getuid() != 0 and not self._python_has_ble_capabilities(exe): + raise RuntimeError('You are not running platypush as root and the Python interpreter has no ' + + 'capabilities/permissions to access the BLE stack. Set the permissions on ' + + 'your Python interpreter through:\n' + + '\t[sudo] setcap "cap_net_raw,cap_net_admin+eip" {}'.format(exe)) + + @action + def scan(self, interface: str = None, duration: int = 10) -> BluetoothScanResponse: + """ + Scan for nearby bluetooth low-energy devices + + :param interface: Bluetooth adapter name to use (default configured if None) + :param duration: Scan duration in seconds + """ + from bluetooth.ble import DiscoveryService + + if interface is None: + interface = self.interface + + self._check_ble_support() + svc = DiscoveryService(interface) + devices = svc.discover(duration) + return BluetoothScanResponse(devices) + + # noinspection PyArgumentList + @action + def connect(self, device: str, interface: str = None, wait: bool = True, channel_type: str = 'public', + security_level: str = 'low', psm: int = 0, mtu: int = 0, timeout: float = 10.0): + """ + Connect to a bluetooth LE device + + :param device: Device address to connect to + :param interface: Bluetooth adapter name to use (default configured if None) + :param wait: If True then wait for the connection to be established before returning (no timeout) + :param channel_type: Channel type, usually 'public' or 'random' + :param security_level: Security level - possible values: ['low', 'medium', 'high'] + :param psm: PSM value (default: 0) + :param mtu: MTU value (default: 0) + :param timeout: Connection timeout if wait is not set (default: 10 seconds) + """ + from gattlib import GATTRequester + + req = self._req_by_addr.get(device) + if req: + if req.is_connected(): + self.logger.info('Device {} is already connected'.format(device)) + return + + self._req_by_addr[device] = None + + if not interface: + interface = self.interface + if interface: + req = GATTRequester(device, False, interface) + else: + req = GATTRequester(device, False) + + self.logger.info('Connecting to {}'.format(device)) + connect_start_time = time.time() + req.connect(wait, channel_type, security_level, psm, mtu) + + if not wait: + while not req.is_connected(): + if time.time() - connect_start_time > timeout: + raise TimeoutError('Connection to {} timed out'.format(device)) + time.sleep(0.1) + + self.logger.info('Connected to {}'.format(device)) + self._req_by_addr[device] = req + + @action + def read(self, device: str, interface: str = None, uuid: str = None, handle: int = None, + binary: bool = False, disconnect_on_recv: bool = True, **kwargs) -> str: + """ + Read a message from a device + + :param device: Device address to connect to + :param interface: Bluetooth adapter name to use (default configured if None) + :param uuid: Service UUID. Either the UUID or the device handle must be specified + :param handle: Device handle. Either the UUID or the device handle must be specified + :param binary: Set to true to return data as a base64-encoded binary string + :param disconnect_on_recv: If True (default) disconnect when the response is received + :param kwargs: Extra arguments to be passed to :meth:`connect` + """ + if interface is None: + interface = self.interface + if not (uuid or handle): + raise AttributeError('Specify either uuid or handle') + + self.connect(device, interface=interface, **kwargs) + req = self._req_by_addr[device] + + if uuid: + data = req.read_by_uuid(uuid)[0] + else: + data = req.read_by_handle(handle)[0] + + if binary: + data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip() + if disconnect_on_recv: + self.disconnect(device) + + return data + + @action + def write(self, device: str, data, handle: int = None, interface: str = None, binary: bool = False, + disconnect_on_recv: bool = True, **kwargs) -> str: + """ + Writes data to a device + + :param device: Device address to connect to + :param data: Data to be written (str or bytes) + :param interface: Bluetooth adapter name to use (default configured if None) + :param handle: Device handle. Either the UUID or the device handle must be specified + :param binary: Set to true if data is a base64-encoded binary string + :param disconnect_on_recv: If True (default) disconnect when the response is received + :param kwargs: Extra arguments to be passed to :meth:`connect` + """ + if interface is None: + interface = self.interface + if binary: + data = base64.decodebytes(data.encode() if isinstance(data, str) else data) + + self.connect(device, interface=interface, **kwargs) + req = self._req_by_addr[device] + + data = req.write_by_handle(handle, data)[0] + + if binary: + data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip() + if disconnect_on_recv: + self.disconnect(device) + + return data + + @action + def disconnect(self, device: str): + """ + Disconnect from a connected device + + :param device: Device address + """ + req = self._req_by_addr.get(device) + if not req: + self.logger.info('Device {} not connected'.format(device)) + + req.disconnect() + self.logger.info('Device {} disconnected'.format(device)) + + @action + def discover_primary(self, device: str, interface: str = None, **kwargs) -> BluetoothDiscoverPrimaryResponse: + """ + Discover the primary services advertised by a LE bluetooth device + + :param device: Device address to connect to + :param interface: Bluetooth adapter name to use (default configured if None) + :param kwargs: Extra arguments to be passed to :meth:`connect` + """ + if interface is None: + interface = self.interface + + self.connect(device, interface=interface, **kwargs) + req = self._req_by_addr[device] + services = req.discover_primary() + self.disconnect(device) + return BluetoothDiscoverPrimaryResponse(services=services) + + @action + def discover_characteristics(self, device: str, interface: str = None, **kwargs) \ + -> BluetoothDiscoverCharacteristicsResponse: + """ + Discover the characteristics of a LE bluetooth device + + :param device: Device address to connect to + :param interface: Bluetooth adapter name to use (default configured if None) + :param kwargs: Extra arguments to be passed to :meth:`connect` + """ + if interface is None: + interface = self.interface + + self.connect(device, interface=interface, **kwargs) + req = self._req_by_addr[device] + characteristics = req.discover_characteristics() + self.disconnect(device) + return BluetoothDiscoverCharacteristicsResponse(characteristics=characteristics) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/switch/switchbot/__init__.py b/platypush/plugins/switch/switchbot/__init__.py index 22c00ca2..233591d0 100644 --- a/platypush/plugins/switch/switchbot/__init__.py +++ b/platypush/plugins/switch/switchbot/__init__.py @@ -1,96 +1,18 @@ -import struct -import subprocess -import time +import enum +from platypush.message.response.bluetooth import BluetoothScanResponse from platypush.plugins import action +from platypush.plugins.bluetooth.ble import BluetoothBlePlugin from platypush.plugins.switch import SwitchPlugin -class Scanner(object): - """ - XXX The Scanner object doesn't work. Add your devices by address statically to the plugin configuration for now - instead of relying on scanning capabilities - """ - - service_uuid = '1bc5d5a5-0200b89f-e6114d22-000da2cb' - - def __init__(self, bt_interface=None, timeout_secs=None): - self.bt_interface = bt_interface - self.timeout_secs = timeout_secs if timeout_secs else 2 - - @classmethod - def _get_uuids(cls, device): - uuids = set() - - for uuid in device['uuids']: - if isinstance(uuid, tuple): - uuid = '' - for i in range(0, len(uuid)): - token = struct.pack('= self.timeout_secs: - raise RuntimeError('Connection to {} timed out after {} seconds' - .format(self.device, self.timeout_secs)) - - def run_command(self, command): - self.req.write_by_handle(self.handle, self.commands[command]) - data = self.req.read_by_handle(self.handle) - return data - - -class SwitchSwitchbotPlugin(SwitchPlugin): +class SwitchSwitchbotPlugin(SwitchPlugin, BluetoothBlePlugin): """ Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and programmatically control buttons. - NOTE: since the interaction with the Switchbot requires root privileges - (in order to scan on the bluetooth interface or setting gattlib in random), - this plugin just wraps the module into a `sudo` flavor, since running - Platypush with root privileges should be considered as a very bad idea. - Make sure that your user has sudo privileges for running this plugin. + See :class:`platypush.plugins.bluetooth.ble.BluetoothBlePlugin` for how to enable BLE permissions for + the platypush user (a simple solution may be to run it as root, but that's usually NOT a good idea). Requires: @@ -99,56 +21,52 @@ class SwitchSwitchbotPlugin(SwitchPlugin): * **libboost** (on Debian ```apt-get install libboost-python-dev libboost-thread-dev``) """ - def __init__(self, bt_interface=None, connect_timeout=None, + uuid = 'cba20002-224d-11e6-9fb8-0002a5d5c51b' + handle = 0x16 + + class Command(enum.Enum): + """ + Base64 encoded commands + """ + # \x57\x01\x00 + PRESS = 'VwEA' + # # \x57\x01\x01 + ON = 'VwEB' + # # \x57\x01\x02 + OFF = 'VwEC' + + def __init__(self, interface=None, connect_timeout=None, scan_timeout=None, devices=None, **kwargs): """ - :param bt_interface: Bluetooth interface to use (e.g. hci0) default: first available one - :type bt_interface: str + :param interface: Bluetooth interface to use (e.g. hci0) default: first available one + :type interface: str - :param connecct_timeout: Timeout for the conncection to the Switchbot device - default: None + :param connect_timeout: Timeout for the connection to the Switchbot device - default: None :type connect_timeout: float :param scan_timeout: Timeout for the scan operations - default: None :type scan_timeout: float - :param devices: Devices to control, as a BMAC address -> name map + :param devices: Devices to control, as a MAC address -> name map :type devices: dict """ - super().__init__(**kwargs) + SwitchPlugin.__init__(self, **kwargs) + BluetoothBlePlugin.__init__(self, interface=interface) - if devices is None: - devices = {} - - self.bt_interface = bt_interface self.connect_timeout = connect_timeout if connect_timeout else 5 self.scan_timeout = scan_timeout if scan_timeout else 2 - self.configured_devices = devices + self.configured_devices = devices or {} self.configured_devices_by_name = { name: addr for addr, name in self.configured_devices.items() } - def _run(self, device, command=None): + def _run(self, device: str, command: Command): if device in self.configured_devices_by_name: device = self.configured_devices_by_name[device] - try: - # XXX this requires sudo and it's executed in its own process - # because the Switchbot plugin requires root privileges to send - # raw bluetooth messages on the interface. Make sure that the user - # that runs platypush has the right permissions to run this with sudo - output = subprocess.check_output(( - 'sudo python3 -m platypush.plugins.switch.switchbot ' + - '--device {} ' + - ('--interface {} '.format(self.bt_interface) if self.bt_interface else '') + - ('--connect-timeout {} '.format(self.connect_timeout) if self.connect_timeout else '') + - ('--{} '.format(command) if command else '')).format(device), - stderr=subprocess.STDOUT, shell=True).decode('utf-8') - except subprocess.CalledProcessError as e: - raise RuntimeError(e.output.decode('utf-8')) - - self.logger.info('Output of switchbot command: {}'.format(output)) + self.write(device, command.value, handle=self.handle, channel_type='random', binary=True) return self.status(device) @action @@ -159,7 +77,7 @@ class SwitchSwitchbotPlugin(SwitchPlugin): :param device: Device name or address :type device: str """ - return self._run(device) + return self._run(device, self.Command.PRESS) @action def toggle(self, device, **kwargs): @@ -173,7 +91,7 @@ class SwitchSwitchbotPlugin(SwitchPlugin): :param device: Device name or address :type device: str """ - return self._run(device, 'on') + return self._run(device, self.Command.ON) @action def off(self, device, **kwargs): @@ -183,23 +101,35 @@ class SwitchSwitchbotPlugin(SwitchPlugin): :param device: Device name or address :type device: str """ - return self._run(device, 'off') + return self._run(device, self.Command.OFF) @action - def scan(self): + def scan(self, interface: str = None, duration: int = 10) -> BluetoothScanResponse: """ Scan for available Switchbot devices nearby. - XXX This action doesn't work for now. Configure your devices statically for now instead of - relying on the scanner + + :param interface: Bluetooth interface to scan (default: default configured interface) + :param duration: Scan duration in seconds """ - try: - return subprocess.check_output( - 'sudo python3 -m platypush.plugins.switch.switchbot --scan ' + - ('--interface {} '.format(self.bt_interface) if self.bt_interface else '') + - ('--scan-timeout {} '.format(self.scan_timeout) if self.scan_timeout else ''), - stderr=subprocess.STDOUT, shell=True).decode('utf-8') - except subprocess.CalledProcessError as e: - raise RuntimeError(e.output.decode('utf-8')) + + devices = super().scan(interface=interface, duration=duration).devices + compatible_devices = {} + + for dev in devices: + # noinspection PyBroadException + try: + characteristics = [ + chrc for chrc in self.discover_characteristics( + dev['addr'], channel_type='random', wait=False, timeout=2.0).characteristics + if chrc.get('uuid') == self.uuid + ] + + if characteristics: + compatible_devices[dev['addr']] = None + except: + pass + + return BluetoothScanResponse(devices=compatible_devices) @property def devices(self): diff --git a/platypush/plugins/switch/switchbot/__main__.py b/platypush/plugins/switch/switchbot/__main__.py deleted file mode 100644 index 62a3e28b..00000000 --- a/platypush/plugins/switch/switchbot/__main__.py +++ /dev/null @@ -1,75 +0,0 @@ -import argparse -import sys - -from . import Driver -from . import Scanner - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--scan', '-s', dest='scan', required=False, default=False, action='store_true', - help="Run Switchbot in scan mode - scan devices to control") - - parser.add_argument('--scan-timeout', dest='scan_timeout', required=False, default=None, - help="Device scan timeout (default: 2 seconds)") - - parser.add_argument('--connect-timeout', dest='connect_timeout', required=False, default=None, - help="Device connection timeout (default: 5 seconds)") - - parser.add_argument('--device', '-d', dest='device', required=False, default=None, - help="Specify the address of a device to control") - - parser.add_argument('--interface', '-i', dest='interface', required=False, default=None, - help="Name of the bluetooth adapter (default: hci0 or whichever is the default)") - - parser.add_argument('--press', '-p', dest='press', required=False, default=None, action='store_true', - help="Send a press command (default)") - - parser.add_argument('--on', dest='on', required=False, default=None, action='store_true', - help="Send a press on command") - - parser.add_argument('--off', dest='off', required=False, default=None, action='store_true', - help="Send a press on command") - - opts, args = parser.parse_known_args(sys.argv[1:]) - - if opts.scan: - scanner = Scanner(opts.interface, int(opts.scan_timeout)) - devices = scanner.scan() - - if not devices: - print('No Switchbots found') - sys.exit(1) - - print('Found {} devices: {}'.format(len(devices), devices)) - print('Enter the number of the device you want to control:') - - for i in range(0, len(devices)): - print('\t{}\t{}'.format(i, devices[i])) - - i = int(input()) - bt_addr = devices[i] - elif opts.device: - bt_addr = opts.device - else: - raise RuntimeError('Please specify at least one mode between --scan and --device') - - driver = Driver(device=bt_addr, bt_interface=opts.interface, timeout_secs=int(opts.connect_timeout)) - driver.connect() - print('Connected!') - - if opts.on: - driver.run_command('on') - elif opts.off: - driver.run_command('off') - else: - driver.run_command('press') - - print('Command execution successful') - - -if __name__ == '__main__': - main() - - -# vim:sw=4:ts=4:et: -