platypush/platypush/plugins/light/hue/__init__.py

279 lines
9.3 KiB
Python
Raw Normal View History

import random
2017-12-11 04:45:55 +01:00
import time
2017-12-11 03:53:26 +01:00
from enum import Enum
from threading import Thread
from redis import Redis
from redis.exceptions import TimeoutError as QueueTimeoutError
2017-12-11 03:53:26 +01:00
from phue import Bridge
from platypush.message.response import Response
2017-12-11 03:53:26 +01:00
from .. import LightPlugin
class LightHuePlugin(LightPlugin):
    """ Philips Hue lights plugin """

    MAX_BRI = 255
    MAX_SAT = 255
    MAX_HUE = 65535
    ANIMATION_CTRL_QUEUE_NAME = 'platypush/light/hue/AnimationCtrl'

    class Animation(Enum):
        """
        Supported animation types.
        Members compare equal to their own string value, so callers can
        pass either the enum member or the plain string.
        """

        COLOR_TRANSITION = 'color_transition'
        BLINK = 'blink'

        def __eq__(self, other):
            if isinstance(other, str):
                return self.value == other
            if isinstance(other, self.__class__):
                # Enum members are singletons: identity is equality.
                # (Comparing with == here would recurse forever.)
                return self is other
            return NotImplemented

        def __hash__(self):
            # Overriding __eq__ drops the inherited __hash__; restore it and
            # keep it consistent with the string comparison above.
            return hash(self.value)

    def __init__(self, bridge, lights=None, groups=None):
        """
        Constructor
        Params:
            bridge -- Bridge address or hostname
            lights -- Lights to be controlled (default: all)
            groups -- Groups to be controlled (default: all)
        """

        super().__init__()

        self.bridge_address = bridge
        self.bridge = None
        self.logger.info('Initializing Hue lights plugin - bridge: "{}"'.
                         format(self.bridge_address))

        self.connect()
        self.lights = []
        self.groups = []

        if lights:
            self.lights = lights
        elif groups:
            self.groups = groups
            self._expand_groups()
        else:
            self.lights = [l.name for l in self.bridge.lights]

        # Set while an animation is running (see animate/stop_animation)
        self.redis = None
        self.animation_thread = None
        self.logger.info('Configured lights: "{}"'. format(self.lights))

    def _expand_groups(self):
        """ Append to self.lights the lights of each configured group """
        groups = [g for g in self.bridge.groups if g.name in self.groups]
        for g in groups:
            self.lights.extend([l.name for l in g.lights])

    @staticmethod
    def _parse_names(names):
        """
        Normalize a lights/groups selector.
        Params:
            names -- None/empty, a comma-separated string, or a collection
        Returns:
            A list of stripped names for a string, an empty list for a
            missing/empty selector, the object itself otherwise.
        """

        if isinstance(names, str):
            return [name.strip() for name in names.split(',') if name.strip()]
        return names or []

    def connect(self):
        """ Create the bridge object if it is not already there (lazy init) """
        # Lazy init
        if not self.bridge:
            self.bridge = Bridge(self.bridge_address)
            self.logger.info('Bridge connected')

            # NOTE(review): the result is discarded - presumably this is
            # only a connectivity check; confirm
            self.get_scenes()

            # uncomment these lines if you're running huectrl for
            # the first time and you need to pair it to the switch

            # self.bridge.connect()
            # self.bridge.get_api()
        else:
            self.logger.info('Bridge already connected')

    def get_scenes(self):
        """ Return the output of the bridge's get_scene() in a Response """
        return Response(output=self.bridge.get_scene())

    def get_lights(self):
        """ Return the output of the bridge's get_light() in a Response """
        return Response(output=self.bridge.get_light())

    def get_groups(self):
        """ Return the output of the bridge's get_group() in a Response """
        return Response(output=self.bridge.get_group())

    def _exec(self, attr, *args, **kwargs):
        """
        Set an attribute on lights/groups, or run a scene.
        Params:
            attr   -- Attribute to set ('on', 'bri', ...) or 'scene'
            lights -- (keyword) Lights to act on; list or comma-separated
                      string. Takes precedence over groups
            groups -- (keyword) Groups to act on; list or comma-separated
                      string
            name   -- (keyword) Scene name, required when attr is 'scene'
        Any other positional/keyword argument is forwarded to the bridge
        call. When neither lights nor groups are given, the configured ones
        are used. On a bridge error the connection is reset (so that the
        next call reconnects) and the error is re-raised.
        """

        try:
            self.connect()
        except Exception:
            # Reset bridge connection
            self.bridge = None
            raise

        # Both selectors are always removed from kwargs, otherwise they
        # would be forwarded to the bridge call as extra keyword arguments.
        req_lights = self._parse_names(kwargs.pop('lights', None))
        req_groups = self._parse_names(kwargs.pop('groups', None))

        scene_groups = []
        if attr == 'scene':
            scene_groups = req_groups or self.groups
            if not scene_groups:
                # Same exception type that the unguarded groups[0] lookup
                # raised, but with an explanation and without dropping a
                # working bridge connection for a caller mistake.
                raise IndexError('A group is required to run a scene')

        if req_lights:
            lights, groups = req_lights, []
        elif req_groups:
            lights, groups = [], req_groups
        else:
            lights, groups = self.lights, self.groups

        try:
            if attr == 'scene':
                self.bridge.run_scene(scene_groups[0], kwargs.pop('name'))
            elif groups:
                self.bridge.set_group(groups, attr, *args, **kwargs)
            elif lights:
                self.bridge.set_light(lights, attr, *args, **kwargs)
        except Exception:
            # Reset bridge connection
            self.bridge = None
            raise

        return Response(output='ok')

    def set_light(self, light, **kwargs):
        """ Forward the given keyword arguments to the bridge's set_light """
        self.connect()
        self.bridge.set_light(light, **kwargs)
        return Response(output='ok')

    def set_group(self, group, **kwargs):
        """ Forward the given keyword arguments to the bridge's set_group """
        self.connect()
        self.bridge.set_group(group, **kwargs)
        return Response(output='ok')

    def on(self, lights=None, groups=None):
        """ Turn on the given lights/groups (default: configured ones) """
        return self._exec('on', True, lights=lights, groups=groups)

    def off(self, lights=None, groups=None):
        """ Turn off the given lights/groups (default: configured ones) """
        return self._exec('on', False, lights=lights, groups=groups)

    def bri(self, value, lights=None, groups=None):
        """ Set the brightness; value is wrapped into [0, MAX_BRI] """
        return self._exec('bri', int(value) % (self.MAX_BRI+1),
                          lights=lights, groups=groups)

    def sat(self, value, lights=None, groups=None):
        """ Set the saturation; value is wrapped into [0, MAX_SAT] """
        return self._exec('sat', int(value) % (self.MAX_SAT+1),
                          lights=lights, groups=groups)

    def hue(self, value, lights=None, groups=None):
        """ Set the hue; value is wrapped into [0, MAX_HUE] """
        return self._exec('hue', int(value) % (self.MAX_HUE+1),
                          lights=lights, groups=groups)

    def scene(self, name, lights=None, groups=None):
        """ Run the scene with the given name on the first selected group """
        return self._exec('scene', name=name, lights=lights, groups=groups)

    def stop_animation(self):
        """ Ask the running animation (if any) to stop """
        # Read the attribute once: the animation thread sets it to None
        # when it terminates.
        redis = self.redis
        if not redis:
            self.logger.info('No animation is currently running')
            return

        redis.rpush(self.ANIMATION_CTRL_QUEUE_NAME, 'STOP')

    def animate(self, animation, duration=None,
                hue_range=(0, MAX_HUE), sat_range=(0, MAX_SAT),
                bri_range=(MAX_BRI-1, MAX_BRI), lights=None, groups=None,
                hue_step=1000, sat_step=2, bri_step=1, transition_seconds=1.0):
        """
        Run a lights animation in a background thread.
        Params:
            animation -- Animation member or its string value
                         ('color_transition' or 'blink')
            duration  -- Seconds after which the animation stops
                         (default: None, run until stop_animation is called)
            hue_range, sat_range, bri_range -- [min, max] inclusive bounds
                         of the attributes in a color transition
            hue_step, sat_step, bri_step -- Increment applied to each
                         attribute at every color transition step
            lights    -- Lights to animate; list or comma-separated string
            groups    -- Groups to animate; list or comma-separated string.
                         Their lights are added to `lights`
                         (default when both are empty: configured lights)
            transition_seconds -- Socket timeout of the Redis connection
                         used for the control queue; the blocking read on
                         that queue is what paces the animation steps
        Raises:
            ValueError -- if animation is not a supported animation
        A running animation is stopped before the new one starts.
        """

        # Raises ValueError on unknown values instead of failing later
        # inside the thread; afterwards `animation` is always a member.
        animation = self.Animation(animation)
        color_transition = self.Animation.COLOR_TRANSITION

        ranges = {'hue': hue_range, 'sat': sat_range, 'bri': bri_range}
        steps = {'hue': hue_step, 'sat': sat_step, 'bri': bri_step}

        def _blink_attrs(on):
            return {'on': on, 'bri': self.MAX_BRI, 'transitiontime': 0}

        def _initialize_light_attrs(lights):
            if animation is color_transition:
                return {
                    light: {
                        attr: random.randint(attr_range[0], attr_range[1])
                        for (attr, attr_range) in ranges.items()
                    } for light in lights
                }

            return {light: _blink_attrs(True) for light in lights}

        def _next_value(attr, value):
            # Advance by one step, wrapping around inside [min, max]
            low, high = ranges[attr][0], ranges[attr][1]
            return ((value - low + steps[attr]) % (high - low + 1)) + low

        def _next_light_attrs(lights):
            if animation is color_transition:
                return {
                    light: {
                        attr: _next_value(attr, value)
                        for (attr, value) in attrs.items()
                    } for (light, attrs) in lights.items()
                }

            return {
                light: _blink_attrs(not attrs['on'])
                for (light, attrs) in lights.items()
            }

        def _should_stop():
            # Blocks until a control message arrives or the socket timeout
            # (transition_seconds) expires: this is also the delay between
            # two animation steps.
            try:
                ctrl_queue.blpop(self.ANIMATION_CTRL_QUEUE_NAME)
                return True
            except QueueTimeoutError:
                return False

        def _animate_thread(lights):
            self.logger.info('Starting {} animation on {}'.format(
                animation.value, (lights or groups)))

            try:
                lights = _initialize_light_attrs(lights)
                animation_start_time = time.time()
                stop_animation = False

                while not stop_animation:
                    if duration and \
                            time.time() - animation_start_time > duration:
                        break

                    if animation is color_transition:
                        for (light, attrs) in lights.items():
                            self.logger.info('Setting {} to {}'.format(
                                light, attrs))
                            self.bridge.set_light(light, attrs)
                            stop_animation = _should_stop()
                            if stop_animation:
                                break
                    else:
                        # All the lights share the same blink state
                        conf = next(iter(lights.values()))
                        self.logger.info(
                            'Setting lights to {}'.format(conf))

                        if groups:
                            self.bridge.set_group(
                                [g.name for g in groups], conf)
                        else:
                            self.bridge.set_light(list(lights), conf)

                        stop_animation = _should_stop()

                    if not stop_animation:
                        lights = _next_light_attrs(lights)
            finally:
                # Also runs when the bridge/queue raised, so that a dead
                # animation is never reported as running.
                self.logger.info('Stopping animation')
                self.animation_thread = None
                self.redis = None

        # _exec resets the bridge to None on errors: make sure it is there
        self.connect()

        groups = self._parse_names(groups)
        lights = self._parse_names(lights)

        if groups:
            groups = [g for g in self.bridge.groups if g.name in groups]
            # Work on a copy: the caller's list must not be modified
            lights = list(lights)
            for g in groups:
                lights.extend([l.name for l in g.lights])
        elif not lights:
            lights = self.lights

        if not lights:
            # Nothing to animate; a thread would only spin on an empty set
            self.logger.info('No lights to animate')
            return Response(output='ok')

        # Stop the previous animation and wait for it to terminate, so that
        # two threads never read the same control queue nor reset each
        # other's state. The attribute is read once because the thread sets
        # it to None on exit.
        previous_thread = self.animation_thread
        if previous_thread and previous_thread.is_alive():
            self.stop_animation()
            previous_thread.join()

        ctrl_queue = Redis(socket_timeout=transition_seconds)
        # Drop control messages that nobody consumed (e.g. a STOP sent to
        # an animation that was already terminating), otherwise they would
        # stop the new animation at its first step.
        ctrl_queue.delete(self.ANIMATION_CTRL_QUEUE_NAME)
        self.redis = ctrl_queue

        self.animation_thread = Thread(target=_animate_thread, args=(lights,))
        self.animation_thread.start()
        return Response(output='ok')
2017-12-11 03:53:26 +01:00
# vim:sw=4:ts=4:et: