"""Flic client library for python

Requires python 3.3 or higher.

For detailed documentation, see the protocol documentation.

Notes on the data types used in this python implementation compared to the
protocol documentation:

All kinds of integers are represented as python integers.
Booleans use the Boolean type.
Enums use the python enums defined below.
Bd addrs are represented as standard python strings, e.g. "aa:bb:cc:dd:ee:ff".
"""

from enum import Enum
from collections import namedtuple
import time
import socket
import select
import struct
import itertools
import queue
import threading


class CreateConnectionChannelError(Enum):
    NoError = 0
    MaxPendingConnectionsReached = 1


class ConnectionStatus(Enum):
    Disconnected = 0
    Connected = 1
    Ready = 2


class DisconnectReason(Enum):
    Unspecified = 0
    ConnectionEstablishmentFailed = 1
    TimedOut = 2
    BondingKeysMismatch = 3


class RemovedReason(Enum):
    RemovedByThisClient = 0
    ForceDisconnectedByThisClient = 1
    ForceDisconnectedByOtherClient = 2

    ButtonIsPrivate = 3
    VerifyTimeout = 4
    InternetBackendError = 5
    InvalidData = 6

    CouldntLoadDevice = 7


class ClickType(Enum):
    ButtonDown = 0
    ButtonUp = 1
    ButtonClick = 2
    ButtonSingleClick = 3
    ButtonDoubleClick = 4
    ButtonHold = 5


class BdAddrType(Enum):
    PublicBdAddrType = 0
    RandomBdAddrType = 1


class LatencyMode(Enum):
    NormalLatency = 0
    LowLatency = 1
    HighLatency = 2


class BluetoothControllerState(Enum):
    Detached = 0
    Resetting = 1
    Attached = 2


class ScanWizardResult(Enum):
    WizardSuccess = 0
    WizardCancelledByUser = 1
    WizardFailedTimeout = 2
    WizardButtonIsPrivate = 3
    WizardBluetoothUnavailable = 4
    WizardInternetBackendError = 5
    WizardInvalidData = 6


class ButtonScanner:
    """ButtonScanner class.

    Usage:
        scanner = ButtonScanner()
        scanner.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: ...
        client.add_scanner(scanner)
    """

    # Process-wide counter; every scanner instance takes the next scan id.
    _cnt = itertools.count()

    def __init__(self):
        self._scan_id = next(ButtonScanner._cnt)
        # Default callback is a no-op so events can be delivered unconditionally.
        self.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: None


class ScanWizard:
    """ScanWizard class.

    Usage:
        wizard = ScanWizard()
        wizard.on_found_private_button = lambda scan_wizard: ...
        wizard.on_found_public_button = lambda scan_wizard, bd_addr, name: ...
        wizard.on_button_connected = lambda scan_wizard, bd_addr, name: ...
        wizard.on_completed = lambda scan_wizard, result, bd_addr, name: ...
        client.add_scan_wizard(wizard)
    """

    # Process-wide counter; every wizard instance takes the next wizard id.
    _cnt = itertools.count()

    def __init__(self):
        self._scan_wizard_id = next(ScanWizard._cnt)
        # Filled in when a public button is found; stays None until then.
        self._bd_addr = None
        self._name = None
        self.on_found_private_button = lambda scan_wizard: None
        self.on_found_public_button = lambda scan_wizard, bd_addr, name: None
        self.on_button_connected = lambda scan_wizard, bd_addr, name: None
        self.on_completed = lambda scan_wizard, result, bd_addr, name: None


class ButtonConnectionChannel:
    """ButtonConnectionChannel class.

    This class represents a connection channel to a Flic button.
    Add this button connection channel to a FlicClient by executing
    client.add_connection_channel(connection_channel).
    You may only have this connection channel added to one FlicClient at a time.

    Before you add the connection channel to the client, you should set up your
    callback functions by assigning the corresponding properties to this object
    with a function. Each callback function has a channel parameter as the
    first one, referencing this object.

    Available properties and the function parameters are:
    on_create_connection_channel_response: channel, error, connection_status
    on_removed: channel, removed_reason
    on_connection_status_changed: channel, connection_status, disconnect_reason
    on_button_up_or_down / on_button_click_or_hold / on_button_single_or_double_click / on_button_single_or_double_click_or_hold: channel, click_type, was_queued, time_diff
    """

    # Process-wide counter; every channel instance takes the next connection id.
    _cnt = itertools.count()

    def __init__(self, bd_addr, latency_mode = LatencyMode.NormalLatency, auto_disconnect_time = 511):
        self._conn_id = next(ButtonConnectionChannel._cnt)
        self._bd_addr = bd_addr
        self._latency_mode = latency_mode
        self._auto_disconnect_time = auto_disconnect_time
        # No client is attached at construction time.
        self._client = None

        self.on_create_connection_channel_response = lambda channel, error, connection_status: None
        self.on_removed = lambda channel, removed_reason: None
        self.on_connection_status_changed = lambda channel, connection_status, disconnect_reason: None
        self.on_button_up_or_down = lambda channel, click_type, was_queued, time_diff: None
        self.on_button_click_or_hold = lambda channel, click_type, was_queued, time_diff: None
        self.on_button_single_or_double_click = lambda channel, click_type, was_queued, time_diff: None
        self.on_button_single_or_double_click_or_hold = lambda channel, click_type, was_queued, time_diff: None

    def _set_mode_parameter(self, attr_name, value):
        """Store one mode parameter and, if a client is attached, send the update.

        Without a client the value is only stored. With a client, the value is
        stored while holding the client's lock and, unless the client is
        closed, a CmdChangeModeParameters command carrying both current mode
        parameters is sent.
        """
        client = self._client
        if client is None:
            setattr(self, attr_name, value)
            return

        with client._lock:
            setattr(self, attr_name, value)
            if not client._closed:
                client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time})

    @property
    def bd_addr(self):
        """Bd addr given at construction (read-only)."""
        return self._bd_addr

    @property
    def latency_mode(self):
        """Current latency mode; assigning sends the change if a client is attached."""
        return self._latency_mode

    @latency_mode.setter
    def latency_mode(self, latency_mode):
        self._set_mode_parameter("_latency_mode", latency_mode)

    @property
    def auto_disconnect_time(self):
        """Current auto disconnect time; assigning sends the change if a client is attached."""
        return self._auto_disconnect_time

    @auto_disconnect_time.setter
    def auto_disconnect_time(self, auto_disconnect_time):
        self._set_mode_parameter("_auto_disconnect_time", auto_disconnect_time)


class FlicClient:
    """FlicClient class.

    When this class is constructed, a socket connection is established.
    You may then send commands to the server and set timers.
    Once you are ready with the initialization you must call the
    handle_events() method which is a main loop that never exits, unless the
    socket is closed.
    For a more detailed description of all commands, events and enums, check
    the protocol specification.

    All commands are wrapped in more high level functions and events are
    reported using callback functions.

    All methods called on this class will take effect only if you eventually
    call the handle_events() method.

    The ButtonScanner is used to set up a handler for advertisement packets.
    The ButtonConnectionChannel is used to interact with connections to flic
    buttons and receive their events.

    Other events are handled by the following callback functions that can be
    assigned to this object (and a list of the callback function parameters):
    on_new_verified_button: bd_addr
    on_no_space_for_new_connection: max_concurrently_connected_buttons
    on_got_space_for_new_connection: max_concurrently_connected_buttons
    on_bluetooth_controller_state_change: state
    """

    # NOTE(review): the text of this file is truncated at this point. The
    # methods below reference _EVENTS, _EVENT_STRUCTS, _EVENT_NAMED_TUPLES,
    # _bdaddr_bytes_to_string, _send_command and the instance attributes
    # _sock, _lock, _closed, _timers, _scanners, _connection_channels,
    # _scan_wizards, _get_info_response_queue and _get_button_uuid_queue, none
    # of which survive in the text. They are deliberately NOT reconstructed
    # here; restore them from the upstream file before using FlicClient.
    # The surviving fragment is kept verbatim for reference:
    #
    #   _EVENTS = [ ("EvtAdvertisementPacket", "> 8 bytes[2] = opcode
    #   bytes += data_bytes with self._lock: if not self._closed:
    #   self._sock.sendall(bytes)

    # Channel callback attribute invoked for each button event; all four share
    # the (channel, click_type, was_queued, time_diff) signature.
    _BUTTON_EVENT_CALLBACKS = {
        "EvtButtonUpOrDown": "on_button_up_or_down",
        "EvtButtonClickOrHold": "on_button_click_or_hold",
        "EvtButtonSingleOrDoubleClick": "on_button_single_or_double_click",
        "EvtButtonSingleOrDoubleClickOrHold": "on_button_single_or_double_click_or_hold",
    }

    def _dispatch_event(self, data):
        """Decode one event packet (opcode byte + payload) and run its callback.

        Empty packets, opcodes beyond the _EVENTS table and opcodes whose table
        entry is None are ignored.
        """
        if not data:
            return
        opcode = data[0]
        if opcode >= len(FlicClient._EVENTS) or FlicClient._EVENTS[opcode] is None:
            return

        event_name = FlicClient._EVENTS[opcode][0]
        event_struct = FlicClient._EVENT_STRUCTS[opcode]
        # Only the fixed-size part is unpacked; any trailing bytes are handled
        # per event during conversion.
        fields = event_struct.unpack(data[1 : 1 + event_struct.size])
        items = FlicClient._EVENT_NAMED_TUPLES[opcode]._make(fields)._asdict()

        self._convert_event_items(event_name, data, event_struct.size, items)
        self._deliver_event(event_name, items)

    def _convert_event_items(self, event_name, data, struct_size, items):
        """Convert, in place, items whose data type is not supported by struct.

        data is the full packet including the opcode byte and struct_size is
        the size of the fixed part that was unpacked into items.
        """
        if "bd_addr" in items:
            items["bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["bd_addr"])

        if "name" in items:
            items["name"] = items["name"].decode("utf-8")

        if event_name == "EvtCreateConnectionChannelResponse":
            items["error"] = CreateConnectionChannelError(items["error"])
            items["connection_status"] = ConnectionStatus(items["connection_status"])

        if event_name == "EvtConnectionStatusChanged":
            items["connection_status"] = ConnectionStatus(items["connection_status"])
            items["disconnect_reason"] = DisconnectReason(items["disconnect_reason"])

        if event_name == "EvtConnectionChannelRemoved":
            items["removed_reason"] = RemovedReason(items["removed_reason"])

        if event_name.startswith("EvtButton"):
            items["click_type"] = ClickType(items["click_type"])

        if event_name == "EvtGetInfoResponse":
            items["bluetooth_controller_state"] = BluetoothControllerState(items["bluetooth_controller_state"])
            items["my_bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["my_bd_addr"])
            items["my_bd_addr_type"] = BdAddrType(items["my_bd_addr_type"])
            # The verified-button addresses follow the fixed part of the
            # packet, 6 bytes each (offsets include the leading opcode byte).
            first = 1 + struct_size
            last = first + 6 * items["nb_verified_buttons"]
            items["bd_addr_of_verified_buttons"] = [
                FlicClient._bdaddr_bytes_to_string(data[offset : offset + 6])
                for offset in range(first, last, 6)
            ]

        if event_name == "EvtBluetoothControllerStateChange":
            items["state"] = BluetoothControllerState(items["state"])

        if event_name == "EvtGetButtonUUIDResponse":
            items["uuid"] = "".join("%02x" % byte for byte in items["uuid"])
            # An all-zero uuid is reported to the callback as None.
            if items["uuid"] == "00000000000000000000000000000000":
                items["uuid"] = None

        if event_name == "EvtScanWizardCompleted":
            items["result"] = ScanWizardResult(items["result"])

    def _deliver_event(self, event_name, items):
        """Invoke the callback registered for event_name with the converted items.

        Advertisement packets for an unknown scan id are dropped. Connection
        channels and scan wizards are looked up by plain indexing, so an id
        that is not registered raises from that lookup.
        """
        if event_name == "EvtAdvertisementPacket":
            scanner = self._scanners.get(items["scan_id"])
            if scanner is not None:
                scanner.on_advertisement_packet(scanner, items["bd_addr"], items["name"], items["rssi"], items["is_private"], items["already_verified"])

        elif event_name == "EvtCreateConnectionChannelResponse":
            channel = self._connection_channels[items["conn_id"]]
            # A failed creation unregisters the channel before the callback runs.
            if items["error"] != CreateConnectionChannelError.NoError:
                del self._connection_channels[items["conn_id"]]
            channel.on_create_connection_channel_response(channel, items["error"], items["connection_status"])

        elif event_name == "EvtConnectionStatusChanged":
            channel = self._connection_channels[items["conn_id"]]
            channel.on_connection_status_changed(channel, items["connection_status"], items["disconnect_reason"])

        elif event_name == "EvtConnectionChannelRemoved":
            channel = self._connection_channels[items["conn_id"]]
            del self._connection_channels[items["conn_id"]]
            channel.on_removed(channel, items["removed_reason"])

        elif event_name in FlicClient._BUTTON_EVENT_CALLBACKS:
            channel = self._connection_channels[items["conn_id"]]
            callback = getattr(channel, FlicClient._BUTTON_EVENT_CALLBACKS[event_name])
            callback(channel, items["click_type"], items["was_queued"], items["time_diff"])

        elif event_name == "EvtNewVerifiedButton":
            self.on_new_verified_button(items["bd_addr"])

        elif event_name == "EvtGetInfoResponse":
            # Responses are matched to pending callbacks in queue order.
            self._get_info_response_queue.get()(items)

        elif event_name == "EvtNoSpaceForNewConnection":
            self.on_no_space_for_new_connection(items["max_concurrently_connected_buttons"])

        elif event_name == "EvtGotSpaceForNewConnection":
            self.on_got_space_for_new_connection(items["max_concurrently_connected_buttons"])

        elif event_name == "EvtBluetoothControllerStateChange":
            self.on_bluetooth_controller_state_change(items["state"])

        elif event_name == "EvtGetButtonUUIDResponse":
            # Responses are matched to pending callbacks in queue order.
            self._get_button_uuid_queue.get()(items["bd_addr"], items["uuid"])

        elif event_name == "EvtScanWizardFoundPrivateButton":
            scan_wizard = self._scan_wizards[items["scan_wizard_id"]]
            scan_wizard.on_found_private_button(scan_wizard)

        elif event_name == "EvtScanWizardFoundPublicButton":
            scan_wizard = self._scan_wizards[items["scan_wizard_id"]]
            # Remembered on the wizard so later events can report them again.
            scan_wizard._bd_addr = items["bd_addr"]
            scan_wizard._name = items["name"]
            scan_wizard.on_found_public_button(scan_wizard, scan_wizard._bd_addr, scan_wizard._name)

        elif event_name == "EvtScanWizardButtonConnected":
            scan_wizard = self._scan_wizards[items["scan_wizard_id"]]
            scan_wizard.on_button_connected(scan_wizard, scan_wizard._bd_addr, scan_wizard._name)

        elif event_name == "EvtScanWizardCompleted":
            scan_wizard = self._scan_wizards[items["scan_wizard_id"]]
            del self._scan_wizards[items["scan_wizard_id"]]
            scan_wizard.on_completed(scan_wizard, items["result"], scan_wizard._bd_addr, scan_wizard._name)

    def _recv_exact(self, nbytes):
        """Read exactly nbytes from the socket.

        Returns the bytes read as a bytearray (empty when nbytes is 0), or
        None if the peer closed the connection before nbytes arrived.
        """
        buffer = bytearray(nbytes)
        view = memoryview(buffer)
        while len(view) > 0:
            received = self._sock.recv_into(view, len(view))
            if received == 0:
                return None
            view = view[received:]
        return buffer

    def _handle_one_event(self):
        """Run one step of the main loop.

        If a timer is pending, either fire it (when due) or wait for socket
        data until it is due; otherwise block reading the socket. At most one
        timer callback or one event packet is processed per call.

        Returns False when the socket was closed by the peer, True otherwise.
        """
        if self._timers.queue:
            # Peek at the earliest timer without removing it.
            current_timer = self._timers.queue[0]
            timeout = max(current_timer[0] - time.monotonic(), 0)
            if timeout == 0:
                self._timers.get()[1]()
                return True
            if not select.select([self._sock], [], [], timeout)[0]:
                # Nothing arrived before the timer became due; let the next
                # call fire it.
                return True

        # Each packet is prefixed with its length as a 16-bit little-endian
        # integer.
        header = self._recv_exact(2)
        if header is None:
            return False
        packet_len = header[0] | (header[1] << 8)

        data = self._recv_exact(packet_len)
        if data is None:
            return False

        self._dispatch_event(data)
        return True

    def handle_events(self):
        """Start the main loop for this client.

        This method will not return until the socket has been closed.
        Once it has returned, any use of this FlicClient is illegal.
        """
        # Remember which thread runs the main loop.
        self._handle_event_thread_ident = threading.get_ident()
        while not self._closed:
            if not self._handle_one_event():
                break
        self._sock.close()