Major refactor for the `light.hue` plugin.

- Added support for lights as native platform entities.
- Improved performance by using the JSON API objects whenever possible
  to interact with the bridge instead of the native Python objects,
  which perform a bunch of lazy API calls under the hood resulting in
  degraded performance.
- Fixed lights animation attributes by setting only the ones actually
  supported by a light.
- Several LINT fixes.
This commit is contained in:
Fabio Manganiello 2022-04-30 01:07:00 +02:00
parent 975d37c562
commit 8d57cf06c2
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
2 changed files with 366 additions and 198 deletions

View File

@ -1,10 +1,12 @@
from abc import ABC, abstractmethod
from platypush.plugins import action
from platypush.plugins.switch import SwitchPlugin
from platypush.entities import manages
from platypush.entities.lights import Light
from platypush.plugins import Plugin, action
class LightPlugin(SwitchPlugin, ABC):
@manages(Light)
class LightPlugin(Plugin, ABC):
"""
Abstract plugin to interface your logic with lights/bulbs.
"""
@ -12,19 +14,27 @@ class LightPlugin(SwitchPlugin, ABC):
@action
@abstractmethod
def on(self):
""" Turn the light on """
"""Turn the light on"""
raise NotImplementedError()
@action
@abstractmethod
def off(self):
""" Turn the light off """
"""Turn the light off"""
raise NotImplementedError()
@action
@abstractmethod
def toggle(self):
""" Toggle the light status (on/off) """
"""Toggle the light status (on/off)"""
raise NotImplementedError()
@action
@abstractmethod
def status(self):
"""
Get the current status of the lights.
"""
raise NotImplementedError()

View File

@ -4,10 +4,15 @@ import time
from enum import Enum
from threading import Thread, Event
from typing import List
from typing import Iterable, Union, Mapping, Any, Set
from platypush.context import get_bus
from platypush.message.event.light import LightAnimationStartedEvent, LightAnimationStoppedEvent
from platypush.entities import Entity
from platypush.entities.lights import Light as LightEntity
from platypush.message.event.light import (
LightAnimationStartedEvent,
LightAnimationStoppedEvent,
)
from platypush.plugins import action
from platypush.plugins.light import LightPlugin
from platypush.utils import set_thread_name
@ -34,6 +39,7 @@ class LightHuePlugin(LightPlugin):
ANIMATION_CTRL_QUEUE_NAME = 'platypush/light/hue/AnimationCtrl'
_BRIDGE_RECONNECT_SECONDS = 5
_MAX_RECONNECT_TRIES = 5
_UNINITIALIZED_BRIDGE_ERR = 'The Hue bridge is not initialized'
class Animation(Enum):
COLOR_TRANSITION = 'color_transition'
@ -61,32 +67,43 @@ class LightHuePlugin(LightPlugin):
self.bridge_address = bridge
self.bridge = None
self.logger.info('Initializing Hue lights plugin - bridge: "{}"'.format(self.bridge_address))
self.logger.info(
'Initializing Hue lights plugin - bridge: "{}"'.format(self.bridge_address)
)
self.connect()
self.lights = []
self.groups = []
self.lights = set()
self.groups = set()
if lights:
self.lights = lights
self.lights = set(lights)
elif groups:
self.groups = groups
self._expand_groups()
self.groups = set(groups)
self.lights.update(self._expand_groups(self.groups))
else:
# noinspection PyUnresolvedReferences
self.lights = [light.name for light in self.bridge.lights]
self.lights = {light['name'] for light in self._get_lights().values()}
self.animation_thread = None
self.animations = {}
self._animation_stop = Event()
self._init_animations()
self.logger.info('Configured lights: "{}"'.format(self.lights))
self.logger.info(f'Configured lights: {self.lights}')
def _expand_groups(self):
groups = [g for g in self.bridge.groups if g.name in self.groups]
for group in groups:
for light in group.lights:
self.lights += [light.name]
def _expand_groups(self, groups: Iterable[str]) -> Set[str]:
lights = set()
light_id_to_name = {
light_id: light['name'] for light_id, light in self._get_lights().items()
}
groups_ = [g for g in self._get_groups().values() if g.get('name') in groups]
for group in groups_:
for light_id in group.get('lights', []):
light_name = light_id_to_name.get(light_id)
if light_name:
lights.add(light_name)
return lights
def _init_animations(self):
self.animations = {
@ -94,10 +111,10 @@ class LightHuePlugin(LightPlugin):
'lights': {},
}
for group in self.bridge.groups:
self.animations['groups'][group.group_id] = None
for light in self.bridge.lights:
self.animations['lights'][light.light_id] = None
for group_id in self._get_groups():
self.animations['groups'][group_id] = None
for light_id in self._get_lights():
self.animations['lights'][light_id] = None
@action
def connect(self):
@ -110,6 +127,7 @@ class LightHuePlugin(LightPlugin):
# Lazy init
if not self.bridge:
from phue import Bridge, PhueRegistrationException
success = False
n_tries = 0
@ -119,12 +137,14 @@ class LightHuePlugin(LightPlugin):
self.bridge = Bridge(self.bridge_address)
success = True
except PhueRegistrationException as e:
self.logger.warning('Bridge registration error: {}'.
format(str(e)))
self.logger.warning('Bridge registration error: {}'.format(str(e)))
if n_tries >= self._MAX_RECONNECT_TRIES:
self.logger.error(('Bridge registration failed after ' +
'{} attempts').format(n_tries))
self.logger.error(
(
'Bridge registration failed after ' + '{} attempts'
).format(n_tries)
)
break
time.sleep(self._BRIDGE_RECONNECT_SECONDS)
@ -168,7 +188,7 @@ class LightHuePlugin(LightPlugin):
'id': id,
**scene,
}
for id, scene in self.bridge.get_scene().items()
for id, scene in self._get_scenes().items()
}
@action
@ -215,7 +235,7 @@ class LightHuePlugin(LightPlugin):
'id': id,
**light,
}
for id, light in self.bridge.get_light().items()
for id, light in self._get_lights().items()
}
@action
@ -273,7 +293,7 @@ class LightHuePlugin(LightPlugin):
'id': id,
**group,
}
for id, group in self.bridge.get_group().items()
for id, group in self._get_groups().items()
}
@action
@ -321,15 +341,22 @@ class LightHuePlugin(LightPlugin):
self.bridge = None
raise e
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
lights = []
groups = []
if 'lights' in kwargs:
lights = kwargs.pop('lights').split(',').strip() \
if isinstance(lights, str) else kwargs.pop('lights')
lights = (
kwargs.pop('lights').split(',').strip()
if isinstance(lights, str)
else kwargs.pop('lights')
)
if 'groups' in kwargs:
groups = kwargs.pop('groups').split(',').strip() \
if isinstance(groups, str) else kwargs.pop('groups')
groups = (
kwargs.pop('groups').split(',').strip()
if isinstance(groups, str)
else kwargs.pop('groups')
)
if not lights and not groups:
lights = self.lights
@ -340,12 +367,13 @@ class LightHuePlugin(LightPlugin):
try:
if attr == 'scene':
self.bridge.run_scene(groups[0], kwargs.pop('name'))
assert groups, 'No groups specified'
self.bridge.run_scene(list(groups)[0], kwargs.pop('name'))
else:
if groups:
self.bridge.set_group(groups, attr, *args, **kwargs)
self.bridge.set_group(list(groups), attr, *args, **kwargs)
if lights:
self.bridge.set_light(lights, attr, *args, **kwargs)
self.bridge.set_light(list(lights), attr, *args, **kwargs)
except Exception as e:
# Reset bridge connection
self.bridge = None
@ -375,6 +403,7 @@ class LightHuePlugin(LightPlugin):
"""
self.connect()
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
self.bridge.set_light(light, **kwargs)
@action
@ -382,7 +411,8 @@ class LightHuePlugin(LightPlugin):
"""
Set a group (or groups) property.
:param group: Group or groups to set. Can be a string representing the group name, a group object, a list of strings, or a list of group objects.
:param group: Group or groups to set. It can be a string representing the
group name, a group object, a list of strings, or a list of group objects.
:param kwargs: key-value list of parameters to set.
Example call::
@ -400,6 +430,7 @@ class LightHuePlugin(LightPlugin):
"""
self.connect()
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
self.bridge.set_group(group, **kwargs)
@action
@ -451,15 +482,16 @@ class LightHuePlugin(LightPlugin):
groups_off = []
if groups:
all_groups = self.bridge.get_group().values()
all_groups = self._get_groups().values()
groups_on = [
group['name'] for group in all_groups
group['name']
for group in all_groups
if group['name'] in groups and group['state']['any_on'] is True
]
groups_off = [
group['name'] for group in all_groups
group['name']
for group in all_groups
if group['name'] in groups and group['state']['any_on'] is False
]
@ -467,15 +499,17 @@ class LightHuePlugin(LightPlugin):
lights = self.lights
if lights:
all_lights = self.bridge.get_light().values()
all_lights = self._get_lights().values()
lights_on = [
light['name'] for light in all_lights
light['name']
for light in all_lights
if light['name'] in lights and light['state']['on'] is True
]
lights_off = [
light['name'] for light in all_lights
light['name']
for light in all_lights
if light['name'] in lights and light['state']['on'] is False
]
@ -499,8 +533,13 @@ class LightHuePlugin(LightPlugin):
groups = []
if lights is None:
lights = []
return self._exec('bri', int(value) % (self.MAX_BRI + 1),
lights=lights, groups=groups, **kwargs)
return self._exec(
'bri',
int(value) % (self.MAX_BRI + 1),
lights=lights,
groups=groups,
**kwargs,
)
@action
def sat(self, value, lights=None, groups=None, **kwargs):
@ -516,8 +555,13 @@ class LightHuePlugin(LightPlugin):
groups = []
if lights is None:
lights = []
return self._exec('sat', int(value) % (self.MAX_SAT + 1),
lights=lights, groups=groups, **kwargs)
return self._exec(
'sat',
int(value) % (self.MAX_SAT + 1),
lights=lights,
groups=groups,
**kwargs,
)
@action
def hue(self, value, lights=None, groups=None, **kwargs):
@ -533,8 +577,13 @@ class LightHuePlugin(LightPlugin):
groups = []
if lights is None:
lights = []
return self._exec('hue', int(value) % (self.MAX_HUE + 1),
lights=lights, groups=groups, **kwargs)
return self._exec(
'hue',
int(value) % (self.MAX_HUE + 1),
lights=lights,
groups=groups,
**kwargs,
)
@action
def xy(self, value, lights=None, groups=None, **kwargs):
@ -584,25 +633,31 @@ class LightHuePlugin(LightPlugin):
lights = []
if lights:
bri = statistics.mean([
light['state']['bri']
for light in self.bridge.get_light().values()
if light['name'] in lights
])
bri = statistics.mean(
[
light['state']['bri']
for light in self._get_lights().values()
if light['name'] in lights
]
)
elif groups:
bri = statistics.mean([
group['action']['bri']
for group in self.bridge.get_group().values()
if group['name'] in groups
])
bri = statistics.mean(
[
group['action']['bri']
for group in self._get_groups().values()
if group['name'] in groups
]
)
else:
bri = statistics.mean([
light['state']['bri']
for light in self.bridge.get_light().values()
if light['name'] in self.lights
])
bri = statistics.mean(
[
light['state']['bri']
for light in self._get_lights().values()
if light['name'] in self.lights
]
)
delta *= (self.MAX_BRI / 100)
delta *= self.MAX_BRI / 100
if bri + delta < 0:
bri = 0
elif bri + delta > self.MAX_BRI:
@ -628,25 +683,31 @@ class LightHuePlugin(LightPlugin):
lights = []
if lights:
sat = statistics.mean([
light['state']['sat']
for light in self.bridge.get_light().values()
if light['name'] in lights
])
sat = statistics.mean(
[
light['state']['sat']
for light in self._get_lights().values()
if light['name'] in lights
]
)
elif groups:
sat = statistics.mean([
group['action']['sat']
for group in self.bridge.get_group().values()
if group['name'] in groups
])
sat = statistics.mean(
[
group['action']['sat']
for group in self._get_groups().values()
if group['name'] in groups
]
)
else:
sat = statistics.mean([
light['state']['sat']
for light in self.bridge.get_light().values()
if light['name'] in self.lights
])
sat = statistics.mean(
[
light['state']['sat']
for light in self._get_lights().values()
if light['name'] in self.lights
]
)
delta *= (self.MAX_SAT / 100)
delta *= self.MAX_SAT / 100
if sat + delta < 0:
sat = 0
elif sat + delta > self.MAX_SAT:
@ -672,25 +733,31 @@ class LightHuePlugin(LightPlugin):
lights = []
if lights:
hue = statistics.mean([
light['state']['hue']
for light in self.bridge.get_light().values()
if light['name'] in lights
])
hue = statistics.mean(
[
light['state']['hue']
for light in self._get_lights().values()
if light['name'] in lights
]
)
elif groups:
hue = statistics.mean([
group['action']['hue']
for group in self.bridge.get_group().values()
if group['name'] in groups
])
hue = statistics.mean(
[
group['action']['hue']
for group in self._get_groups().values()
if group['name'] in groups
]
)
else:
hue = statistics.mean([
light['state']['hue']
for light in self.bridge.get_light().values()
if light['name'] in self.lights
])
hue = statistics.mean(
[
light['state']['hue']
for light in self._get_lights().values()
if light['name'] in self.lights
]
)
delta *= (self.MAX_HUE / 100)
delta *= self.MAX_HUE / 100
if hue + delta < 0:
hue = 0
elif hue + delta > self.MAX_HUE:
@ -734,10 +801,20 @@ class LightHuePlugin(LightPlugin):
self._init_animations()
@action
def animate(self, animation, duration=None,
hue_range=None, sat_range=None,
bri_range=None, lights=None, groups=None,
hue_step=1000, sat_step=2, bri_step=1, transition_seconds=1.0):
def animate(
self,
animation,
duration=None,
hue_range=None,
sat_range=None,
bri_range=None,
lights=None,
groups=None,
hue_step=1000,
sat_step=2,
bri_step=1,
transition_seconds=1.0,
):
"""
Run a lights animation.
@ -747,28 +824,33 @@ class LightHuePlugin(LightPlugin):
:param duration: Animation duration in seconds (default: None, i.e. continue until stop)
:type duration: float
:param hue_range: If you selected a ``color_transition``, this will specify the hue range of your color ``color_transition``.
Default: [0, 65535]
:param hue_range: If you selected a ``color_transition``, this will
specify the hue range of your color ``color_transition``. Default: [0, 65535]
:type hue_range: list[int]
:param sat_range: If you selected a color ``color_transition``, this will specify the saturation range of your color
``color_transition``. Default: [0, 255]
:param sat_range: If you selected a color ``color_transition``, this
will specify the saturation range of your color ``color_transition``.
Default: [0, 255]
:type sat_range: list[int]
:param bri_range: If you selected a color ``color_transition``, this will specify the brightness range of your color
``color_transition``. Default: [254, 255] :type bri_range: list[int]
:param bri_range: If you selected a color ``color_transition``, this
will specify the brightness range of your color ``color_transition``.
Default: [254, 255] :type bri_range: list[int]
:param lights: Lights to control (names, IDs or light objects). Default: plugin default lights
:param groups: Groups to control (names, IDs or group objects). Default: plugin default groups
:param hue_step: If you selected a color ``color_transition``, this will specify by how much the color hue will change
between iterations. Default: 1000 :type hue_step: int
:param hue_step: If you selected a color ``color_transition``, this
will specify by how much the color hue will change between iterations.
Default: 1000 :type hue_step: int
:param sat_step: If you selected a color ``color_transition``, this will specify by how much the saturation will change
between iterations. Default: 2 :type sat_step: int
:param sat_step: If you selected a color ``color_transition``, this
will specify by how much the saturation will change between iterations.
Default: 2 :type sat_step: int
:param bri_step: If you selected a color ``color_transition``, this will specify by how much the brightness will change
between iterations. Default: 1 :type bri_step: int
:param bri_step: If you selected a color ``color_transition``, this
will specify by how much the brightness will change between iterations.
Default: 1 :type bri_step: int
:param transition_seconds: Time between two transitions or blinks in seconds. Default: 1.0
:type transition_seconds: float
@ -776,20 +858,26 @@ class LightHuePlugin(LightPlugin):
self.stop_animation()
self._animation_stop.clear()
all_lights = self._get_lights()
bri_range = bri_range or [self.MAX_BRI - 1, self.MAX_BRI]
sat_range = sat_range or [0, self.MAX_SAT]
hue_range = hue_range or [0, self.MAX_HUE]
if bri_range is None:
bri_range = [self.MAX_BRI - 1, self.MAX_BRI]
if sat_range is None:
sat_range = [0, self.MAX_SAT]
if hue_range is None:
hue_range = [0, self.MAX_HUE]
if groups:
groups = [g for g in self.bridge.groups if g.name in groups or g.group_id in groups]
lights = lights or []
for group in groups:
lights.extend([light.name for light in group.lights])
groups = {
group_id: group
for group_id, group in self._get_groups().items()
if group.get('name') in groups or group_id in groups
}
lights = set(lights or [])
lights.update(self._expand_groups([g['name'] for g in groups.values()]))
elif lights:
lights = [light.name for light in self.bridge.lights if light.name in lights or light.light_id in lights]
lights = {
light['name']
for light_id, light in all_lights.items()
if light['name'] in lights or int(light_id) in lights
}
else:
lights = self.lights
@ -806,26 +894,50 @@ class LightHuePlugin(LightPlugin):
}
if groups:
for group in groups:
self.animations['groups'][group.group_id] = info
for group_id in groups:
self.animations['groups'][group_id] = info
for light in self.bridge.lights:
if light.name in lights:
self.animations['lights'][light.light_id] = info
for light_id, light in all_lights.items():
if light['name'] in lights:
self.animations['lights'][light_id] = info
def _initialize_light_attrs(lights):
lights_by_name = {
light['name']: light for light in self._get_lights().values()
}
if animation == self.Animation.COLOR_TRANSITION:
return {light: {
'hue': random.randint(hue_range[0], hue_range[1]),
'sat': random.randint(sat_range[0], sat_range[1]),
'bri': random.randint(bri_range[0], bri_range[1]),
} for light in lights}
return {
light: {
**(
{'hue': random.randint(hue_range[0], hue_range[1])} # type: ignore
if 'hue' in lights_by_name.get(light, {}).get('state', {})
else {}
),
**(
{'sat': random.randint(sat_range[0], sat_range[1])} # type: ignore
if 'sat' in lights_by_name.get(light, {}).get('state', {})
else {}
),
**(
{'bri': random.randint(bri_range[0], bri_range[1])} # type: ignore
if 'bri' in lights_by_name.get(light, {}).get('state', {})
else {}
),
}
for light in lights
}
elif animation == self.Animation.BLINK:
return {light: {
'on': True,
'bri': self.MAX_BRI,
'transitiontime': 0,
} for light in lights}
return {
light: {
'on': True,
**({'bri': self.MAX_BRI} if 'bri' in light else {}),
'transitiontime': 0,
}
for light in lights
}
raise AssertionError(f'Unknown animation type: {animation}')
def _next_light_attrs(lights):
if animation == self.Animation.COLOR_TRANSITION:
@ -843,15 +955,19 @@ class LightHuePlugin(LightPlugin):
else:
continue
lights[light][attr] = ((value - attr_range[0] + attr_step) %
(attr_range[1] - attr_range[0] + 1)) + \
attr_range[0]
lights[light][attr] = (
(value - attr_range[0] + attr_step)
% (attr_range[1] - attr_range[0] + 1)
) + attr_range[0]
elif animation == self.Animation.BLINK:
lights = {light: {
'on': False if attrs['on'] else True,
'bri': self.MAX_BRI,
'transitiontime': 0,
} for (light, attrs) in lights.items()}
lights = {
light: {
'on': not attrs['on'],
'bri': self.MAX_BRI,
'transitiontime': 0,
}
for (light, attrs) in lights.items()
}
return lights
@ -860,13 +976,23 @@ class LightHuePlugin(LightPlugin):
def _animate_thread(lights):
set_thread_name('HueAnimate')
get_bus().post(LightAnimationStartedEvent(lights=lights, groups=groups, animation=animation))
get_bus().post(
LightAnimationStartedEvent(
lights=lights,
groups=list((groups or {}).keys()),
animation=animation,
)
)
lights = _initialize_light_attrs(lights)
animation_start_time = time.time()
stop_animation = False
while not stop_animation and not (duration and time.time() - animation_start_time > duration):
while not stop_animation and not (
duration and time.time() - animation_start_time > duration
):
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
try:
if animation == self.Animation.COLOR_TRANSITION:
for (light, attrs) in lights.items():
@ -877,7 +1003,9 @@ class LightHuePlugin(LightPlugin):
self.logger.debug('Setting lights to {}'.format(conf))
if groups:
self.bridge.set_group([g.name for g in groups], conf)
self.bridge.set_group(
[g['name'] for g in groups.values()], conf
)
else:
self.bridge.set_light(lights.keys(), conf)
@ -891,57 +1019,87 @@ class LightHuePlugin(LightPlugin):
lights = _next_light_attrs(lights)
get_bus().post(LightAnimationStoppedEvent(lights=lights, groups=groups, animation=animation))
get_bus().post(
LightAnimationStoppedEvent(
lights=list(lights.keys()),
groups=list((groups or {}).keys()),
animation=animation,
)
)
self.animation_thread = None
self.animation_thread = Thread(target=_animate_thread,
name='HueAnimate',
args=(lights,))
self.animation_thread = Thread(
target=_animate_thread, name='HueAnimate', args=(lights,)
)
self.animation_thread.start()
@property
def switches(self) -> List[dict]:
"""
:returns: Implements :meth:`platypush.plugins.switch.SwitchPlugin.switches` and returns the status of the
configured lights. Example:
def _get_light_attr(self, light, attr: str):
try:
return getattr(light, attr, None)
except KeyError:
return None
.. code-block:: json
def transform_entities(
self, entities: Union[Iterable[Union[dict, Entity]], Mapping[Any, dict]]
) -> Iterable[Entity]:
new_entities = []
if isinstance(entities, dict):
entities = [{'id': id, **e} for id, e in entities.items()]
[
{
"id": "3",
"name": "Lightbulb 1",
"on": true,
"bri": 254,
"hue": 1532,
"sat": 215,
"effect": "none",
"xy": [
0.6163,
0.3403
],
"ct": 153,
"alert": "none",
"colormode": "hs",
"reachable": true
"type": "Extended color light",
"modelid": "LCT001",
"manufacturername": "Philips",
"uniqueid": "00:11:22:33:44:55:66:77-88",
"swversion": "5.105.0.21169"
}
]
for entity in entities:
if isinstance(entity, Entity):
new_entities.append(entity)
elif isinstance(entity, dict):
new_entities.append(
LightEntity(
id=entity['id'],
name=entity['name'],
description=entity['type'],
on=entity.get('state', {}).get('on', False),
brightness=entity.get('state', {}).get('bri'),
saturation=entity.get('state', {}).get('sat'),
hue=entity.get('state', {}).get('hue'),
temperature=entity.get('state', {}).get('ct'),
colormode=entity.get('colormode'),
reachable=entity.get('reachable'),
data={
'effect': entity.get('state', {}).get('effect'),
'xy': entity.get('state', {}).get('xy'),
},
)
)
"""
return super().transform_entities(new_entities) # type: ignore
return [
{
'id': id,
**light.pop('state', {}),
**light,
}
for id, light in self.bridge.get_light().items()
]
def _get_lights(self) -> dict:
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
lights = self.bridge.get_light()
self.publish_entities(lights) # type: ignore
return lights
def _get_groups(self) -> dict:
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
groups = self.bridge.get_group() or {}
return groups
def _get_scenes(self) -> dict:
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
scenes = self.bridge.get_scene() or {}
return scenes
@action
def status(self) -> Iterable[LightEntity]:
lights = self.transform_entities(self._get_lights())
for light in lights:
light.id = light.external_id
for attr, value in (light.data or {}).items():
setattr(light, attr, value)
del light.external_id
del light.data
return lights
# vim:sw=4:ts=4:et: