platypush/platypush/backend/button/flic/__init__.py

103 lines
2.9 KiB
Python

import importlib
import logging
from enum import Enum
from threading import Timer
from time import time
from platypush.backend import Backend
from platypush.message.event.button.flic import FlicButtonEvent
from .fliclib.fliclib import FlicClient, ButtonConnectionChannel, ClickType
class ButtonFlicBackend(Backend):
_long_press_timeout = 0.3
_btn_timeout = 0.5
ShortPressEvent = "ShortPressEvent"
LongPressEvent = "LongPressEvent"
def __init__(self, server, long_press_timeout=_long_press_timeout,
btn_timeout=_btn_timeout, **kwargs):
super().__init__(**kwargs)
self.server = server
self.client = FlicClient(server)
self.client.get_info(self._received_info())
self.client.on_new_verified_button = self._got_button()
self._long_press_timeout = long_press_timeout
self._btn_timeout = btn_timeout
self._btn_timer = None
self._btn_addr = None
self._down_pressed_time = None
self._cur_sequence = []
logging.info('Initialized Flic buttons backend on {}'.format(self.server))
def _got_button(self):
def _f(bd_addr):
cc = ButtonConnectionChannel(bd_addr)
cc.on_button_up_or_down = \
lambda channel, click_type, was_queued, time_diff: \
self._on_event()(bd_addr, channel, click_type, was_queued, time_diff)
self.client.add_connection_channel(cc)
return _f
def _received_info(self):
def _f(items):
for bd_addr in items["bd_addr_of_verified_buttons"]:
self._got_button()(bd_addr)
return _f
def _on_btn_timeout(self):
def _f():
logging.info('Flic event triggered from {}: {}'.format(
self._btn_addr, self._cur_sequence))
self.bus.post(FlicButtonEvent(
btn_addr=self._btn_addr, sequence=self._cur_sequence))
self._cur_sequence = []
return _f
def _on_event(self):
def _f(bd_addr, channel, click_type, was_queued, time_diff):
if was_queued:
return
if self._btn_timer:
self._btn_timer.cancel()
if click_type == ClickType.ButtonDown:
self._down_pressed_time = time()
return
btn_event = self.ShortPressEvent
if self._down_pressed_time:
if time() - self._down_pressed_time >= self._long_press_timeout:
btn_event = self.LongPressEvent
self._down_pressed_time = None
self._cur_sequence.append(btn_event)
self._btn_addr = bd_addr
self._btn_timer = Timer(self._btn_timeout, self._on_btn_timeout())
self._btn_timer.start()
return _f
def send_message(self, msg):
pass
def run(self):
super().run()
self.client.handle_events()
# vim:sw=4:ts=4:et: