diff --git a/platypush/plugins/switch/switchbot/__init__.py b/platypush/plugins/switch/switchbot/__init__.py new file mode 100644 index 0000000000..34b3a5c953 --- /dev/null +++ b/platypush/plugins/switch/switchbot/__init__.py @@ -0,0 +1,145 @@ +import logging +import struct +import subprocess +import time + +from bluetooth.ble import DiscoveryService, GATTRequester + +from platypush.message.response import Response + +from .. import SwitchPlugin + +class Scanner(object): + service_uuid = '1bc5d5a5-0200b89f-e6114d22-000da2cb' + + def __init__(self, bt_interface=None, timeout_secs=None): + self.bt_interface = bt_interface + self.timeout_secs = timeout_secs if timeout_secs else 2 + + + @classmethod + def _get_uuids(cls, device): + uuids = set() + + for id in device['uuids']: + if isinstance(id, tuple): + uuid = '' + for i in range(0, len(id)): + token = struct.pack('= self.timeout_secs: + raise RuntimeError('Connection to {} timed out after {} seconds' + .format(self.device, self.timeout_secs)) + + def run_command(self, command): + self.req.write_by_handle(self.handle, self.commands[command]) + data = self.req.read_by_handle(self.handle) + return data + + +class SwitchSwitchbotPlugin(SwitchPlugin): + """ + NOTE: since the interaction with the Switchbot requires root privileges + (in order to scan on the bluetooth interface or setting gattlib in random), + this plugin just wraps the module into a `sudo` flavor, since running + Platypush with root privileges should be considered as a very bad idea. + Make sure that your user has sudo privileges for running this bit of code. + """ + + def __init__(self, bt_interface=None, connect_timeout=None, + scan_timeout=None, *args, **kwargs): + self.bt_interface = bt_interface + self.connect_timeout = connect_timeout if connect_timeout else 5 + self.scan_timeout = scan_timeout if scan_timeout else 2 + + + def _run(self, device, command=None): + output = None + errors = [] + + try: + output = subprocess.check_output(( + 'sudo python3 -m platypush.plugins.switch.switchbot ' + + '--device {} ' + + ('--interface {} '.format(self.bt_interface) if self.bt_interface else '') + + ('--connect-timeout {} '.format(self.connect_timeout) if self.connect_timeout else '') + + ('--{} '.format(command) if command else '')).format(device), + stderr=subprocess.STDOUT, shell=True).decode('utf-8') + except subprocess.CalledProcessError as e: + errors = [e.output.decode('utf-8')] + + return Response(output=output, errors=errors) + + + def press(self, device): + return self._run(device) + + def on(self, device): + return self._run(device, 'on') + + def off(self, device): + return self._run(device, 'off') + + def scan(self): + output = None + errors = [] + + try: + print('sudo python3 -m platypush.plugins.switch.switchbot --scan ' + + ('--interface {} '.format(self.bt_interface) if self.bt_interface else '') + + ('--scan-timeout {} '.format(self.scan_timeout) if self.scan_timeout else '')) + + output = subprocess.check_output( + 'sudo python3 -m platypush.plugins.switch.switchbot --scan ' + + ('--interface {} '.format(self.bt_interface) if self.bt_interface else '') + + ('--scan-timeout {} '.format(self.scan_timeout) if self.scan_timeout else ''), + stderr=subprocess.STDOUT, shell=True).decode('utf-8') + except subprocess.CalledProcessError as e: + errors = [e.output.decode('utf-8')] + + return Response(output=output, errors=errors) + +# vim:sw=4:ts=4:et: + diff --git a/platypush/plugins/switch/switchbot/__main__.py b/platypush/plugins/switch/switchbot/__main__.py new file mode 100644 index 0000000000..62a3e28b2d --- /dev/null +++ b/platypush/plugins/switch/switchbot/__main__.py @@ -0,0 +1,75 @@ +import argparse +import sys + +from . import Driver +from . import Scanner + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--scan', '-s', dest='scan', required=False, default=False, action='store_true', + help="Run Switchbot in scan mode - scan devices to control") + + parser.add_argument('--scan-timeout', dest='scan_timeout', required=False, default=None, + help="Device scan timeout (default: 2 seconds)") + + parser.add_argument('--connect-timeout', dest='connect_timeout', required=False, default=None, + help="Device connection timeout (default: 5 seconds)") + + parser.add_argument('--device', '-d', dest='device', required=False, default=None, + help="Specify the address of a device to control") + + parser.add_argument('--interface', '-i', dest='interface', required=False, default=None, + help="Name of the bluetooth adapter (default: hci0 or whichever is the default)") + + parser.add_argument('--press', '-p', dest='press', required=False, default=None, action='store_true', + help="Send a press command (default)") + + parser.add_argument('--on', dest='on', required=False, default=None, action='store_true', + help="Send a press on command") + + parser.add_argument('--off', dest='off', required=False, default=None, action='store_true', + help="Send a press on command") + + opts, args = parser.parse_known_args(sys.argv[1:]) + + if opts.scan: + scanner = Scanner(opts.interface, int(opts.scan_timeout)) + devices = scanner.scan() + + if not devices: + print('No Switchbots found') + sys.exit(1) + + print('Found {} devices: {}'.format(len(devices), devices)) + print('Enter the number of the device you want to control:') + + for i in range(0, len(devices)): + print('\t{}\t{}'.format(i, devices[i])) + + i = int(input()) + bt_addr = devices[i] + elif opts.device: + bt_addr = opts.device + else: + raise RuntimeError('Please specify at least one mode between --scan and --device') + + driver = Driver(device=bt_addr, bt_interface=opts.interface, timeout_secs=int(opts.connect_timeout)) + driver.connect() + print('Connected!') + + if opts.on: + driver.run_command('on') + elif opts.off: + driver.run_command('off') + else: + driver.run_command('press') + + print('Command execution successful') + + +if __name__ == '__main__': + main() + + +# vim:sw=4:ts=4:et: +