More refactors and fixes for `zigbee.mqtt`

This commit is contained in:
Fabio Manganiello 2023-01-13 02:58:47 +01:00
parent 38438230d7
commit 22a566a88b
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
3 changed files with 100 additions and 113 deletions

View File

@ -1,6 +1,6 @@
<template>
<div class="entity-container-wrapper"
:class="{'with-children': hasChildren, collapsed: isCollapsed}">
:class="{'with-children': hasChildren, collapsed: isCollapsed, hidden: !value?.name?.length}">
<div class="row item entity-container"
:class="{blink: justUpdated, 'with-children': hasChildren, collapsed: isCollapsed}">
<div class="adjuster" :class="{'col-12': !hasChildren, 'col-11': hasChildren}">

View File

@ -246,7 +246,7 @@ class ZigbeeMqttBackend(MqttBackend):
self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=name, **event_args))
exposes = (device.get('definition', {}) or {}).get('exposes', [])
payload = self._plugin.build_device_get_request(exposes)
payload = self._plugin._build_device_get_request(exposes)
if payload:
client.publish(
self.base_topic + '/' + name + '/get',

View File

@ -27,7 +27,6 @@ from platypush.entities.sensors import (
)
from platypush.entities.switches import Switch, EnumSwitch
from platypush.entities.temperature import TemperatureSensor
from platypush.message import Mapping
from platypush.message.response import Response
from platypush.plugins.mqtt import MqttPlugin, action
@ -175,9 +174,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
self.base_topic = base_topic
self.timeout = timeout
self._info = {
'devices': {},
'groups': {},
'devices_by_addr': {},
'devices_by_name': {},
'groups': {},
}
@staticmethod
@ -350,8 +349,8 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
)
# Cache the new results
self._info['devices'] = {
device.get('friendly_name', device['ieee_address']): device
self._info['devices_by_name'] = {
self._preferred_name(device): device
for device in info.get('devices', [])
}
@ -751,7 +750,7 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
)
@staticmethod
def build_device_get_request(values: List[Dict[str, Any]]) -> dict:
def _build_device_get_request(values: List[Dict[str, Any]]) -> dict:
def extract_value(value: dict, root: dict, depth: int = 0):
for feature in value.get('features', []):
new_root = root
@ -779,11 +778,37 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
return ret
def _get_device_info(self, device: str) -> Mapping:
return self._info['devices'].get(
device, self._info['devices_by_addr'].get(device, {})
def _get_device_info(self, device: str, **kwargs) -> dict:
device_info = self._info['devices_by_name'].get(
# First: check by friendly name
device,
# Second: check by address
self._info['devices_by_addr'].get(device, {}),
)
if not device_info:
# Third: try and get the device from upstream
network_info = self._get_network_info(**kwargs)
next(
iter(
d
for d in network_info.get('devices', [])
if self._device_name_matches(device, d)
),
{},
)
return device_info
@staticmethod
def _preferred_name(device: dict) -> str:
return device.get('friendly_name') or device.get('ieee_address') or ''
@classmethod
def _device_name_matches(cls, name: str, device: dict) -> bool:
name = str(cls._ieee_address(name))
return name == device.get('friendly_name') or name == device.get('ieee_address')
@action
def device_get(
self, device: str, property: Optional[str] = None, **kwargs
@ -800,9 +825,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
:return: Key->value map of the device properties.
"""
kwargs = self._mqtt_args(**kwargs)
device_info = self._get_device_info(device)
if device_info:
device = device_info.get('friendly_name') or self._ieee_address(device_info) # type: ignore
device_info = self._get_device_info(device, **kwargs)
assert device_info, f'No such device: {device}'
device = self._preferred_name(device_info)
if property:
properties = self.publish(
@ -815,40 +840,24 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
assert property in properties, f'No such property: {property}'
return {property: properties[property]}
if device not in self._info.get('devices', {}):
# Refresh devices info
self._get_network_info(**kwargs)
dev = self._info.get('devices', {}).get(
device, self._info.get('devices_by_addr', {}).get(device)
)
assert dev, f'No such device: {device}'
exposes = (
self._info.get('devices', {}).get(device, {}).get('definition', {}) or {}
).get('exposes', [])
exposes = (device_info.get('definition', {}) or {}).get('exposes', [])
if not exposes:
return {}
device_state = self.publish(
# If the device has no queriable properties, don't specify a reply
# topic to listen on
req = self._build_device_get_request(exposes)
reply_topic = self._topic(device)
if not req:
reply_topic = None
return self.publish(
topic=self._topic(device) + '/get',
reply_topic=self._topic(device),
msg=self.build_device_get_request(exposes),
reply_topic=reply_topic,
msg=req,
**kwargs,
).output # type: ignore[reportGeneralTypeIssues]
if device_info:
self.publish_entities( # type: ignore
[
{
**device_info,
'state': device_state,
}
]
)
return device_state
@action
def devices_get(
self, devices: Optional[List[str]] = None, **kwargs
@ -879,8 +888,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
if not devices:
devices = list(
{
device['friendly_name'] or device['ieee_address']
self._preferred_name(device)
for device in self.devices(**kwargs).output # type: ignore[reportGeneralTypeIssues]
if self._preferred_name(device)
}
)
@ -942,40 +952,27 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
(default: query the default configured device).
"""
msg = (values or {}).copy()
reply_topic = self._topic(device)
reply_topic = None
device_info = self._get_device_info(device, **kwargs)
assert device_info, f'No such device: {device}'
device = self._preferred_name(device_info)
if property:
dev_def = (
self._info.get('devices_by_addr', {}).get(device, {}).get('definition')
or {}
)
# Check if we're trying to set an option
stored_option = self._get_options(device_info).get(property)
if stored_option:
return self.device_set_option(device, property, value, **kwargs)
stored_property = next(
iter(
exposed
for exposed in dev_def.get('exposes', {})
if exposed.get('property') == property
),
None,
)
# Check if it's a property
reply_topic = self._topic(device)
stored_property = self._get_properties(device_info).get(property)
assert stored_property, f'No such property: {property}'
if stored_property:
msg[property] = value
else:
stored_property = next(
iter(
option
for option in dev_def.get('options', {})
if option.get('property') == property
),
None,
)
# Set the new value on the message
msg[property] = value
if stored_property:
return self.device_set_option(device, property, value, **kwargs)
if stored_property and self._is_write_only(stored_property):
# Don't wait for an update from a value that is not readable
# Don't wait for an update from a value that is not readable
if self._is_write_only(stored_property):
reply_topic = None
properties = self.publish(
@ -986,7 +983,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
).output # type: ignore[reportGeneralTypeIssues]
if property and reply_topic:
assert property in properties, 'No such property: ' + property
assert (
property in properties
), f'Could not retrieve the new state for {property}'
return {property: properties[property]}
return properties
@ -1448,10 +1447,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
"""
Turn on/set to true a switch, a binary property or an option.
"""
switch = self._get_switch(device)
address, prop = self._ieee_address(str(switch.id), with_property=True)
device, prop_info = self._get_switch_info(device)
self.device_set(
address, prop, switch.data.get('value_on', 'ON')
device, prop_info['property'], prop_info.get('value_on', 'ON')
).output # type: ignore[reportGeneralTypeIssues]
@action
@ -1459,10 +1457,9 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
"""
Turn off/set to false a switch, a binary property or an option.
"""
switch = self._get_switch(device)
address, prop = self._ieee_address(str(switch.id), with_property=True)
device, prop_info = self._get_switch_info(device)
self.device_set(
address, prop, switch.data.get('value_on', 'OFF')
device, prop_info['property'], prop_info.get('value_off', 'OFF')
).output # type: ignore[reportGeneralTypeIssues]
@action
@ -1470,40 +1467,36 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
"""
Toggles the state of a switch, a binary property or an option.
"""
switch = self._get_switch(device)
address, prop = self._ieee_address(str(switch.id), with_property=True)
device, prop_info = self._get_switch_info(device)
prop = prop_info['property']
device_state = self.device_get(device).output # type: ignore
self.device_set(
address,
device,
prop,
switch.data.get(
prop_info.get(
'value_toggle',
'OFF' if switch.state == switch.data.get('value_on', 'ON') else 'ON',
'OFF'
if device_state.get(prop) == prop_info.get('value_on', 'ON')
else 'ON',
),
).output # type: ignore[reportGeneralTypeIssues]
def _get_switch(self, name: str) -> Switch:
address, prop = self._ieee_address(name, with_property=True)
all_switches = self._get_all_switches()
entity_id = f'{address}:state'
if prop:
entity_id = f'{address}:{prop}'
def _get_switch_info(self, name: str) -> Tuple[str, dict]:
name, prop = self._ieee_address(name, with_property=True)
if not prop or prop == 'light':
prop = 'state'
switch = all_switches.get(entity_id)
assert switch, f'No such entity ID: {entity_id}'
return switch
device_info = self._get_device_info(name)
assert device_info, f'No such device: {name}'
name = self._preferred_name(device_info)
def _get_all_switches(self) -> Dict[str, Switch]:
devices = self.devices().output # type: ignore[reportGeneralTypeIssues]
all_switches = {}
property = self._get_properties(device_info).get(prop)
option = self._get_options(device_info).get(prop)
if option:
return name, option
for device in devices:
exposed = self._get_properties(device)
options = self._get_options(device)
switches = self._get_switches(device, exposed, options)
for switch in switches:
all_switches[switch.id] = switch
return all_switches
assert property, f'No such property on device {name}: {prop}'
return name, property
@staticmethod
def _is_read_only(feature: dict) -> bool:
@ -1821,14 +1814,10 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
Set the state for one or more Zigbee lights.
"""
lights = [lights] if isinstance(lights, str) else lights
lights = [self._ieee_address(t) for t in lights]
devices = [
dev
for dev in self._get_network_info().get('devices', [])
if self._ieee_address(dev) in lights or dev.get('friendly_name') in lights
]
devices = [self._get_device_info(light) for light in lights]
for dev in devices:
for i, dev in enumerate(devices):
assert dev, f'No such device: {lights[i]}'
light_meta = self._get_light_meta(dev)
assert light_meta, f'{dev["name"]} is not a light'
data = {}
@ -1869,9 +1858,7 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
else:
data[attr] = value
self.device_set(
dev.get('friendly_name', dev.get('ieee_address')), values=data
)
self.device_set(self._preferred_name(dev), values=data)
# vim:sw=4:ts=4:et: