- Added bluetooth LTE plugin
- Switchbot plugin rewritten as an implementation of bluetooth BLE see #89
This commit is contained in:
parent
e1ed7f681c
commit
af24f36519
4 changed files with 409 additions and 201 deletions
|
@ -98,4 +98,98 @@ class BluetoothLookupServiceResponse(BluetoothResponse):
|
|||
super().__init__(output=self.services, *args, **kwargs)
|
||||
|
||||
|
||||
class BluetoothDiscoverPrimaryResponse(BluetoothResponse):
|
||||
"""
|
||||
Example services response output::
|
||||
|
||||
[
|
||||
{
|
||||
"uuid": "00001800-0000-1000-8000-00805f9b34fb",
|
||||
"start": 1,
|
||||
"end": 7
|
||||
},
|
||||
{
|
||||
"uuid": "00001801-0000-1000-8000-00805f9b34fb",
|
||||
"start": 8,
|
||||
"end": 8
|
||||
},
|
||||
{
|
||||
"uuid": "0000fee7-0000-1000-8000-00805f9b34fb",
|
||||
"start": 9,
|
||||
"end": 16
|
||||
},
|
||||
{
|
||||
"uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
|
||||
"start": 17,
|
||||
"end": 65535
|
||||
}
|
||||
]
|
||||
|
||||
"""
|
||||
def __init__(self, services: list, *args, **kwargs):
|
||||
self.services = services
|
||||
super().__init__(output=self.services, *args, **kwargs)
|
||||
|
||||
|
||||
class BluetoothDiscoverCharacteristicsResponse(BluetoothResponse):
|
||||
"""
|
||||
Example services response output::
|
||||
|
||||
[
|
||||
{
|
||||
"uuid": "00002a00-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 2,
|
||||
"properties": 10,
|
||||
"value_handle": 3
|
||||
},
|
||||
{
|
||||
"uuid": "00002a01-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 4,
|
||||
"properties": 2,
|
||||
"value_handle": 5
|
||||
},
|
||||
{
|
||||
"uuid": "00002a04-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 6,
|
||||
"properties": 2,
|
||||
"value_handle": 7
|
||||
},
|
||||
{
|
||||
"uuid": "0000fec8-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 10,
|
||||
"properties": 32,
|
||||
"value_handle": 11
|
||||
},
|
||||
{
|
||||
"uuid": "0000fec7-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 13,
|
||||
"properties": 8,
|
||||
"value_handle": 14
|
||||
},
|
||||
{
|
||||
"uuid": "0000fec9-0000-1000-8000-00805f9b34fb",
|
||||
"handle": 15,
|
||||
"properties": 2,
|
||||
"value_handle": 16
|
||||
},
|
||||
{
|
||||
"uuid": "cba20003-224d-11e6-9fb8-0002a5d5c51b",
|
||||
"handle": 18,
|
||||
"properties": 16,
|
||||
"value_handle": 19
|
||||
},
|
||||
{
|
||||
"uuid": "cba20002-224d-11e6-9fb8-0002a5d5c51b",
|
||||
"handle": 21,
|
||||
"properties": 12,
|
||||
"value_handle": 22
|
||||
}
|
||||
]
|
||||
|
||||
"""
|
||||
def __init__(self, characteristics: list, *args, **kwargs):
|
||||
self.characteristics = characteristics
|
||||
super().__init__(output=self.characteristics, *args, **kwargs)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
259
platypush/plugins/bluetooth/ble.py
Normal file
259
platypush/plugins/bluetooth/ble.py
Normal file
|
@ -0,0 +1,259 @@
|
|||
import base64
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from platypush.plugins import Plugin, action
|
||||
from platypush.message.response.bluetooth import BluetoothScanResponse, BluetoothDiscoverPrimaryResponse, \
|
||||
BluetoothDiscoverCharacteristicsResponse
|
||||
|
||||
|
||||
class BluetoothBlePlugin(Plugin):
|
||||
"""
|
||||
Bluetooth BLE (low-energy) plugin
|
||||
|
||||
Requires:
|
||||
|
||||
* **pybluez** (``pip install pybluez``)
|
||||
* **gattlib** (``pip install gattlib``)
|
||||
|
||||
Note that the support for bluetooth low-energy devices on Linux requires:
|
||||
|
||||
* A bluetooth adapter compatible with the bluetooth 5.0 specification or higher;
|
||||
* To run platypush with root privileges (which is usually a very bad idea), or to set the raw net
|
||||
capabilities on the Python executable (which is also a bad idea, because any Python script will
|
||||
be able to access the kernel raw network API, but it's probably better than running a network
|
||||
server that can execute system commands as root user). If you don't want to set special permissions
|
||||
on the main Python executable and you want to run the bluetooth LTE plugin then the advised approach
|
||||
is to install platypush in a virtual environment and set the capabilities on the venv python executable,
|
||||
or run your platypush instance in Docker.
|
||||
|
||||
You can set the capabilities on the Python executable through the following shell command::
|
||||
|
||||
[sudo] setcap 'cap_net_raw,cap_net_admin+eip' /path/to/your/python
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, interface: str = 'hci0', **kwargs):
|
||||
"""
|
||||
:param interface: Default adapter device to be used (default: 'hci0')
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.interface = interface
|
||||
self._req_by_addr = {}
|
||||
|
||||
@staticmethod
|
||||
def _get_python_interpreter() -> str:
|
||||
exe = sys.executable
|
||||
|
||||
while os.path.islink(exe):
|
||||
target = os.readlink(exe)
|
||||
if not os.path.isabs(target):
|
||||
target = os.path.abspath(os.path.join(os.path.dirname(exe), target))
|
||||
exe = target
|
||||
|
||||
return exe
|
||||
|
||||
@staticmethod
|
||||
def _python_has_ble_capabilities(exe: str) -> bool:
|
||||
getcap = subprocess.Popen(['getcap', exe], stdout=subprocess.PIPE)
|
||||
output = getcap.communicate()[0].decode().split('\n')
|
||||
if not output:
|
||||
return False
|
||||
|
||||
caps = set(output.pop(0).split('=').pop().strip().split(','))
|
||||
return 'cap_net_raw+eip' in caps and 'cap_net_admin' in caps
|
||||
|
||||
def _check_ble_support(self):
|
||||
# Check if the script is running as root or if the Python executable
|
||||
# has 'cap_net_admin,cap_net_raw+eip' capabilities
|
||||
exe = self._get_python_interpreter()
|
||||
if os.getuid() != 0 and not self._python_has_ble_capabilities(exe):
|
||||
raise RuntimeError('You are not running platypush as root and the Python interpreter has no ' +
|
||||
'capabilities/permissions to access the BLE stack. Set the permissions on ' +
|
||||
'your Python interpreter through:\n' +
|
||||
'\t[sudo] setcap "cap_net_raw,cap_net_admin+eip" {}'.format(exe))
|
||||
|
||||
@action
|
||||
def scan(self, interface: str = None, duration: int = 10) -> BluetoothScanResponse:
|
||||
"""
|
||||
Scan for nearby bluetooth low-energy devices
|
||||
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param duration: Scan duration in seconds
|
||||
"""
|
||||
from bluetooth.ble import DiscoveryService
|
||||
|
||||
if interface is None:
|
||||
interface = self.interface
|
||||
|
||||
self._check_ble_support()
|
||||
svc = DiscoveryService(interface)
|
||||
devices = svc.discover(duration)
|
||||
return BluetoothScanResponse(devices)
|
||||
|
||||
# noinspection PyArgumentList
|
||||
@action
|
||||
def connect(self, device: str, interface: str = None, wait: bool = True, channel_type: str = 'public',
|
||||
security_level: str = 'low', psm: int = 0, mtu: int = 0, timeout: float = 10.0):
|
||||
"""
|
||||
Connect to a bluetooth LE device
|
||||
|
||||
:param device: Device address to connect to
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param wait: If True then wait for the connection to be established before returning (no timeout)
|
||||
:param channel_type: Channel type, usually 'public' or 'random'
|
||||
:param security_level: Security level - possible values: ['low', 'medium', 'high']
|
||||
:param psm: PSM value (default: 0)
|
||||
:param mtu: MTU value (default: 0)
|
||||
:param timeout: Connection timeout if wait is not set (default: 10 seconds)
|
||||
"""
|
||||
from gattlib import GATTRequester
|
||||
|
||||
req = self._req_by_addr.get(device)
|
||||
if req:
|
||||
if req.is_connected():
|
||||
self.logger.info('Device {} is already connected'.format(device))
|
||||
return
|
||||
|
||||
self._req_by_addr[device] = None
|
||||
|
||||
if not interface:
|
||||
interface = self.interface
|
||||
if interface:
|
||||
req = GATTRequester(device, False, interface)
|
||||
else:
|
||||
req = GATTRequester(device, False)
|
||||
|
||||
self.logger.info('Connecting to {}'.format(device))
|
||||
connect_start_time = time.time()
|
||||
req.connect(wait, channel_type, security_level, psm, mtu)
|
||||
|
||||
if not wait:
|
||||
while not req.is_connected():
|
||||
if time.time() - connect_start_time > timeout:
|
||||
raise TimeoutError('Connection to {} timed out'.format(device))
|
||||
time.sleep(0.1)
|
||||
|
||||
self.logger.info('Connected to {}'.format(device))
|
||||
self._req_by_addr[device] = req
|
||||
|
||||
@action
|
||||
def read(self, device: str, interface: str = None, uuid: str = None, handle: int = None,
|
||||
binary: bool = False, disconnect_on_recv: bool = True, **kwargs) -> str:
|
||||
"""
|
||||
Read a message from a device
|
||||
|
||||
:param device: Device address to connect to
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param uuid: Service UUID. Either the UUID or the device handle must be specified
|
||||
:param handle: Device handle. Either the UUID or the device handle must be specified
|
||||
:param binary: Set to true to return data as a base64-encoded binary string
|
||||
:param disconnect_on_recv: If True (default) disconnect when the response is received
|
||||
:param kwargs: Extra arguments to be passed to :meth:`connect`
|
||||
"""
|
||||
if interface is None:
|
||||
interface = self.interface
|
||||
if not (uuid or handle):
|
||||
raise AttributeError('Specify either uuid or handle')
|
||||
|
||||
self.connect(device, interface=interface, **kwargs)
|
||||
req = self._req_by_addr[device]
|
||||
|
||||
if uuid:
|
||||
data = req.read_by_uuid(uuid)[0]
|
||||
else:
|
||||
data = req.read_by_handle(handle)[0]
|
||||
|
||||
if binary:
|
||||
data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip()
|
||||
if disconnect_on_recv:
|
||||
self.disconnect(device)
|
||||
|
||||
return data
|
||||
|
||||
@action
|
||||
def write(self, device: str, data, handle: int = None, interface: str = None, binary: bool = False,
|
||||
disconnect_on_recv: bool = True, **kwargs) -> str:
|
||||
"""
|
||||
Writes data to a device
|
||||
|
||||
:param device: Device address to connect to
|
||||
:param data: Data to be written (str or bytes)
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param handle: Device handle. Either the UUID or the device handle must be specified
|
||||
:param binary: Set to true if data is a base64-encoded binary string
|
||||
:param disconnect_on_recv: If True (default) disconnect when the response is received
|
||||
:param kwargs: Extra arguments to be passed to :meth:`connect`
|
||||
"""
|
||||
if interface is None:
|
||||
interface = self.interface
|
||||
if binary:
|
||||
data = base64.decodebytes(data.encode() if isinstance(data, str) else data)
|
||||
|
||||
self.connect(device, interface=interface, **kwargs)
|
||||
req = self._req_by_addr[device]
|
||||
|
||||
data = req.write_by_handle(handle, data)[0]
|
||||
|
||||
if binary:
|
||||
data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip()
|
||||
if disconnect_on_recv:
|
||||
self.disconnect(device)
|
||||
|
||||
return data
|
||||
|
||||
@action
|
||||
def disconnect(self, device: str):
|
||||
"""
|
||||
Disconnect from a connected device
|
||||
|
||||
:param device: Device address
|
||||
"""
|
||||
req = self._req_by_addr.get(device)
|
||||
if not req:
|
||||
self.logger.info('Device {} not connected'.format(device))
|
||||
|
||||
req.disconnect()
|
||||
self.logger.info('Device {} disconnected'.format(device))
|
||||
|
||||
@action
|
||||
def discover_primary(self, device: str, interface: str = None, **kwargs) -> BluetoothDiscoverPrimaryResponse:
|
||||
"""
|
||||
Discover the primary services advertised by a LE bluetooth device
|
||||
|
||||
:param device: Device address to connect to
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param kwargs: Extra arguments to be passed to :meth:`connect`
|
||||
"""
|
||||
if interface is None:
|
||||
interface = self.interface
|
||||
|
||||
self.connect(device, interface=interface, **kwargs)
|
||||
req = self._req_by_addr[device]
|
||||
services = req.discover_primary()
|
||||
self.disconnect(device)
|
||||
return BluetoothDiscoverPrimaryResponse(services=services)
|
||||
|
||||
@action
|
||||
def discover_characteristics(self, device: str, interface: str = None, **kwargs) \
|
||||
-> BluetoothDiscoverCharacteristicsResponse:
|
||||
"""
|
||||
Discover the characteristics of a LE bluetooth device
|
||||
|
||||
:param device: Device address to connect to
|
||||
:param interface: Bluetooth adapter name to use (default configured if None)
|
||||
:param kwargs: Extra arguments to be passed to :meth:`connect`
|
||||
"""
|
||||
if interface is None:
|
||||
interface = self.interface
|
||||
|
||||
self.connect(device, interface=interface, **kwargs)
|
||||
req = self._req_by_addr[device]
|
||||
characteristics = req.discover_characteristics()
|
||||
self.disconnect(device)
|
||||
return BluetoothDiscoverCharacteristicsResponse(characteristics=characteristics)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -1,96 +1,18 @@
|
|||
import struct
|
||||
import subprocess
|
||||
import time
|
||||
import enum
|
||||
|
||||
from platypush.message.response.bluetooth import BluetoothScanResponse
|
||||
from platypush.plugins import action
|
||||
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
|
||||
from platypush.plugins.switch import SwitchPlugin
|
||||
|
||||
|
||||
class Scanner(object):
|
||||
"""
|
||||
XXX The Scanner object doesn't work. Add your devices by address statically to the plugin configuration for now
|
||||
instead of relying on scanning capabilities
|
||||
"""
|
||||
|
||||
service_uuid = '1bc5d5a5-0200b89f-e6114d22-000da2cb'
|
||||
|
||||
def __init__(self, bt_interface=None, timeout_secs=None):
|
||||
self.bt_interface = bt_interface
|
||||
self.timeout_secs = timeout_secs if timeout_secs else 2
|
||||
|
||||
@classmethod
|
||||
def _get_uuids(cls, device):
|
||||
uuids = set()
|
||||
|
||||
for uuid in device['uuids']:
|
||||
if isinstance(uuid, tuple):
|
||||
uuid = ''
|
||||
for i in range(0, len(uuid)):
|
||||
token = struct.pack('<I', uuid[i])
|
||||
for byte in token:
|
||||
uuid += hex(byte)[2:].zfill(2)
|
||||
uuid += ('-' if i < len(uuid)-1 else '')
|
||||
uuids.add(uuid)
|
||||
else:
|
||||
uuids.add(hex(uuid)[2:])
|
||||
|
||||
return uuids
|
||||
|
||||
def scan(self):
|
||||
from bluetooth.ble import DiscoveryService
|
||||
service = DiscoveryService(self.bt_interface) \
|
||||
if self.bt_interface else DiscoveryService()
|
||||
|
||||
devices = service.discover(self.timeout_secs)
|
||||
return sorted([addr for addr, device in devices.items()
|
||||
if self.service_uuid in self._get_uuids(device)])
|
||||
|
||||
|
||||
class Driver(object):
|
||||
handle = 0x16
|
||||
commands = {
|
||||
'press': '\x57\x01\x00',
|
||||
'on': '\x57\x01\x01',
|
||||
'off': '\x57\x01\x02',
|
||||
}
|
||||
|
||||
def __init__(self, device, bt_interface=None, timeout_secs=None):
|
||||
self.device = device
|
||||
self.bt_interface = bt_interface
|
||||
self.timeout_secs = timeout_secs if timeout_secs else 5
|
||||
self.req = None
|
||||
|
||||
def connect(self):
|
||||
from bluetooth.ble import GATTRequester
|
||||
if self.bt_interface:
|
||||
self.req = GATTRequester(self.device, False, self.bt_interface)
|
||||
else:
|
||||
self.req = GATTRequester(self.device, False)
|
||||
|
||||
self.req.connect(True, 'random')
|
||||
connect_start_time = time.time()
|
||||
|
||||
while not self.req.is_connected():
|
||||
if time.time() - connect_start_time >= self.timeout_secs:
|
||||
raise RuntimeError('Connection to {} timed out after {} seconds'
|
||||
.format(self.device, self.timeout_secs))
|
||||
|
||||
def run_command(self, command):
|
||||
self.req.write_by_handle(self.handle, self.commands[command])
|
||||
data = self.req.read_by_handle(self.handle)
|
||||
return data
|
||||
|
||||
|
||||
class SwitchSwitchbotPlugin(SwitchPlugin):
|
||||
class SwitchSwitchbotPlugin(SwitchPlugin, BluetoothBlePlugin):
|
||||
"""
|
||||
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
|
||||
programmatically control buttons.
|
||||
|
||||
NOTE: since the interaction with the Switchbot requires root privileges
|
||||
(in order to scan on the bluetooth interface or setting gattlib in random),
|
||||
this plugin just wraps the module into a `sudo` flavor, since running
|
||||
Platypush with root privileges should be considered as a very bad idea.
|
||||
Make sure that your user has sudo privileges for running this plugin.
|
||||
See :class:`platypush.plugins.bluetooth.ble.BluetoothBlePlugin` for how to enable BLE permissions for
|
||||
the platypush user (a simple solution may be to run it as root, but that's usually NOT a good idea).
|
||||
|
||||
Requires:
|
||||
|
||||
|
@ -99,56 +21,52 @@ class SwitchSwitchbotPlugin(SwitchPlugin):
|
|||
* **libboost** (on Debian ```apt-get install libboost-python-dev libboost-thread-dev``)
|
||||
"""
|
||||
|
||||
def __init__(self, bt_interface=None, connect_timeout=None,
|
||||
uuid = 'cba20002-224d-11e6-9fb8-0002a5d5c51b'
|
||||
handle = 0x16
|
||||
|
||||
class Command(enum.Enum):
|
||||
"""
|
||||
Base64 encoded commands
|
||||
"""
|
||||
# \x57\x01\x00
|
||||
PRESS = 'VwEA'
|
||||
# # \x57\x01\x01
|
||||
ON = 'VwEB'
|
||||
# # \x57\x01\x02
|
||||
OFF = 'VwEC'
|
||||
|
||||
def __init__(self, interface=None, connect_timeout=None,
|
||||
scan_timeout=None, devices=None, **kwargs):
|
||||
"""
|
||||
:param bt_interface: Bluetooth interface to use (e.g. hci0) default: first available one
|
||||
:type bt_interface: str
|
||||
:param interface: Bluetooth interface to use (e.g. hci0) default: first available one
|
||||
:type interface: str
|
||||
|
||||
:param connecct_timeout: Timeout for the conncection to the Switchbot device - default: None
|
||||
:param connect_timeout: Timeout for the connection to the Switchbot device - default: None
|
||||
:type connect_timeout: float
|
||||
|
||||
:param scan_timeout: Timeout for the scan operations - default: None
|
||||
:type scan_timeout: float
|
||||
|
||||
:param devices: Devices to control, as a BMAC address -> name map
|
||||
:param devices: Devices to control, as a MAC address -> name map
|
||||
:type devices: dict
|
||||
"""
|
||||
|
||||
super().__init__(**kwargs)
|
||||
SwitchPlugin.__init__(self, **kwargs)
|
||||
BluetoothBlePlugin.__init__(self, interface=interface)
|
||||
|
||||
if devices is None:
|
||||
devices = {}
|
||||
|
||||
self.bt_interface = bt_interface
|
||||
self.connect_timeout = connect_timeout if connect_timeout else 5
|
||||
self.scan_timeout = scan_timeout if scan_timeout else 2
|
||||
self.configured_devices = devices
|
||||
self.configured_devices = devices or {}
|
||||
self.configured_devices_by_name = {
|
||||
name: addr
|
||||
for addr, name in self.configured_devices.items()
|
||||
}
|
||||
|
||||
def _run(self, device, command=None):
|
||||
def _run(self, device: str, command: Command):
|
||||
if device in self.configured_devices_by_name:
|
||||
device = self.configured_devices_by_name[device]
|
||||
|
||||
try:
|
||||
# XXX this requires sudo and it's executed in its own process
|
||||
# because the Switchbot plugin requires root privileges to send
|
||||
# raw bluetooth messages on the interface. Make sure that the user
|
||||
# that runs platypush has the right permissions to run this with sudo
|
||||
output = subprocess.check_output((
|
||||
'sudo python3 -m platypush.plugins.switch.switchbot ' +
|
||||
'--device {} ' +
|
||||
('--interface {} '.format(self.bt_interface) if self.bt_interface else '') +
|
||||
('--connect-timeout {} '.format(self.connect_timeout) if self.connect_timeout else '') +
|
||||
('--{} '.format(command) if command else '')).format(device),
|
||||
stderr=subprocess.STDOUT, shell=True).decode('utf-8')
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise RuntimeError(e.output.decode('utf-8'))
|
||||
|
||||
self.logger.info('Output of switchbot command: {}'.format(output))
|
||||
self.write(device, command.value, handle=self.handle, channel_type='random', binary=True)
|
||||
return self.status(device)
|
||||
|
||||
@action
|
||||
|
@ -159,7 +77,7 @@ class SwitchSwitchbotPlugin(SwitchPlugin):
|
|||
:param device: Device name or address
|
||||
:type device: str
|
||||
"""
|
||||
return self._run(device)
|
||||
return self._run(device, self.Command.PRESS)
|
||||
|
||||
@action
|
||||
def toggle(self, device, **kwargs):
|
||||
|
@ -173,7 +91,7 @@ class SwitchSwitchbotPlugin(SwitchPlugin):
|
|||
:param device: Device name or address
|
||||
:type device: str
|
||||
"""
|
||||
return self._run(device, 'on')
|
||||
return self._run(device, self.Command.ON)
|
||||
|
||||
@action
|
||||
def off(self, device, **kwargs):
|
||||
|
@ -183,23 +101,35 @@ class SwitchSwitchbotPlugin(SwitchPlugin):
|
|||
:param device: Device name or address
|
||||
:type device: str
|
||||
"""
|
||||
return self._run(device, 'off')
|
||||
return self._run(device, self.Command.OFF)
|
||||
|
||||
@action
|
||||
def scan(self):
|
||||
def scan(self, interface: str = None, duration: int = 10) -> BluetoothScanResponse:
|
||||
"""
|
||||
Scan for available Switchbot devices nearby.
|
||||
XXX This action doesn't work for now. Configure your devices statically for now instead of
|
||||
relying on the scanner
|
||||
|
||||
:param interface: Bluetooth interface to scan (default: default configured interface)
|
||||
:param duration: Scan duration in seconds
|
||||
"""
|
||||
try:
|
||||
return subprocess.check_output(
|
||||
'sudo python3 -m platypush.plugins.switch.switchbot --scan ' +
|
||||
('--interface {} '.format(self.bt_interface) if self.bt_interface else '') +
|
||||
('--scan-timeout {} '.format(self.scan_timeout) if self.scan_timeout else ''),
|
||||
stderr=subprocess.STDOUT, shell=True).decode('utf-8')
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise RuntimeError(e.output.decode('utf-8'))
|
||||
|
||||
devices = super().scan(interface=interface, duration=duration).devices
|
||||
compatible_devices = {}
|
||||
|
||||
for dev in devices:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
characteristics = [
|
||||
chrc for chrc in self.discover_characteristics(
|
||||
dev['addr'], channel_type='random', wait=False, timeout=2.0).characteristics
|
||||
if chrc.get('uuid') == self.uuid
|
||||
]
|
||||
|
||||
if characteristics:
|
||||
compatible_devices[dev['addr']] = None
|
||||
except:
|
||||
pass
|
||||
|
||||
return BluetoothScanResponse(devices=compatible_devices)
|
||||
|
||||
@property
|
||||
def devices(self):
|
||||
|
|
|
@ -1,75 +0,0 @@
|
|||
import argparse
|
||||
import sys
|
||||
|
||||
from . import Driver
|
||||
from . import Scanner
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--scan', '-s', dest='scan', required=False, default=False, action='store_true',
|
||||
help="Run Switchbot in scan mode - scan devices to control")
|
||||
|
||||
parser.add_argument('--scan-timeout', dest='scan_timeout', required=False, default=None,
|
||||
help="Device scan timeout (default: 2 seconds)")
|
||||
|
||||
parser.add_argument('--connect-timeout', dest='connect_timeout', required=False, default=None,
|
||||
help="Device connection timeout (default: 5 seconds)")
|
||||
|
||||
parser.add_argument('--device', '-d', dest='device', required=False, default=None,
|
||||
help="Specify the address of a device to control")
|
||||
|
||||
parser.add_argument('--interface', '-i', dest='interface', required=False, default=None,
|
||||
help="Name of the bluetooth adapter (default: hci0 or whichever is the default)")
|
||||
|
||||
parser.add_argument('--press', '-p', dest='press', required=False, default=None, action='store_true',
|
||||
help="Send a press command (default)")
|
||||
|
||||
parser.add_argument('--on', dest='on', required=False, default=None, action='store_true',
|
||||
help="Send a press on command")
|
||||
|
||||
parser.add_argument('--off', dest='off', required=False, default=None, action='store_true',
|
||||
help="Send a press on command")
|
||||
|
||||
opts, args = parser.parse_known_args(sys.argv[1:])
|
||||
|
||||
if opts.scan:
|
||||
scanner = Scanner(opts.interface, int(opts.scan_timeout))
|
||||
devices = scanner.scan()
|
||||
|
||||
if not devices:
|
||||
print('No Switchbots found')
|
||||
sys.exit(1)
|
||||
|
||||
print('Found {} devices: {}'.format(len(devices), devices))
|
||||
print('Enter the number of the device you want to control:')
|
||||
|
||||
for i in range(0, len(devices)):
|
||||
print('\t{}\t{}'.format(i, devices[i]))
|
||||
|
||||
i = int(input())
|
||||
bt_addr = devices[i]
|
||||
elif opts.device:
|
||||
bt_addr = opts.device
|
||||
else:
|
||||
raise RuntimeError('Please specify at least one mode between --scan and --device')
|
||||
|
||||
driver = Driver(device=bt_addr, bt_interface=opts.interface, timeout_secs=int(opts.connect_timeout))
|
||||
driver.connect()
|
||||
print('Connected!')
|
||||
|
||||
if opts.on:
|
||||
driver.run_command('on')
|
||||
elif opts.off:
|
||||
driver.run_command('off')
|
||||
else:
|
||||
driver.run_command('press')
|
||||
|
||||
print('Command execution successful')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
Loading…
Reference in a new issue