Added native support for switch entities to the zigbee.mqtt plugin.

This commit is contained in:
Fabio Manganiello 2022-04-05 00:07:55 +02:00
parent 9f2793118b
commit 28b3672432
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -9,7 +9,7 @@ from platypush.plugins.mqtt import MqttPlugin, action
from platypush.plugins.switch import SwitchPlugin from platypush.plugins.switch import SwitchPlugin
class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-init] class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-init]
""" """
This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
`zigbee2mqtt <https://www.zigbee2mqtt.io/>`_. `zigbee2mqtt <https://www.zigbee2mqtt.io/>`_.
@ -35,7 +35,8 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
.. code-block:: shell .. code-block:: shell
wget https://github.com/Koenkk/Z-Stack-firmware/raw/master/coordinator/Z-Stack_Home_1.2/bin/default/CC2531_DEFAULT_20201127.zip wget https://github.com/Koenkk/Z-Stack-firmware/raw/master\
/coordinator/Z-Stack_Home_1.2/bin/default/CC2531_DEFAULT_20201127.zip
unzip CC2531_DEFAULT_20201127.zip unzip CC2531_DEFAULT_20201127.zip
[sudo] cc-tool -e -w CC2531ZNP-Prod.hex [sudo] cc-tool -e -w CC2531ZNP-Prod.hex
@ -78,7 +79,8 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
configured your network, to prevent accidental/malignant joins from outer Zigbee devices. configured your network, to prevent accidental/malignant joins from outer Zigbee devices.
- Start the ``zigbee2mqtt`` daemon on your device (the - Start the ``zigbee2mqtt`` daemon on your device (the
`official documentation <https://www.zigbee2mqtt.io/getting_started/running_zigbee2mqtt.html#5-optional-running-as-a-daemon-with-systemctl>`_ `official documentation <https://www.zigbee2mqtt.io/getting_started
/running_zigbee2mqtt.html#5-optional-running-as-a-daemon-with-systemctl>`_
also contains instructions on how to configure it as a ``systemd`` service: also contains instructions on how to configure it as a ``systemd`` service:
.. code-block:: shell .. code-block:: shell
@ -103,10 +105,20 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
def __init__(self, host: str = 'localhost', port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 10, def __init__(
tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, self,
tls_version: Optional[str] = None, tls_ciphers: Optional[str] = None, host: str = 'localhost',
username: Optional[str] = None, password: Optional[str] = None, **kwargs): port: int = 1883,
base_topic: str = 'zigbee2mqtt',
timeout: int = 10,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs,
):
""" """
:param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages (default: ``localhost``). :param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages (default: ``localhost``).
:param port: Broker listen port (default: 1883). :param port: Broker listen port (default: 1883).
@ -124,9 +136,17 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param username: If the connection requires user authentication, specify the username (default: None) :param username: If the connection requires user authentication, specify the username (default: None)
:param password: If the connection requires user authentication, specify the password (default: None) :param password: If the connection requires user authentication, specify the password (default: None)
""" """
super().__init__(host=host, port=port, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, super().__init__(
tls_version=tls_version, tls_ciphers=tls_ciphers, username=username, host=host,
password=password, **kwargs) port=port,
tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
tls_version=tls_version,
tls_ciphers=tls_ciphers,
username=username,
password=password,
**kwargs,
)
self.base_topic = base_topic self.base_topic = base_topic
self.timeout = timeout self.timeout = timeout
@ -135,6 +155,38 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
'groups': {}, 'groups': {},
} }
def transform_entities(self, devices):
from platypush.entities.switches import Switch
compatible_entities = []
for dev in devices:
dev_info = {
"type": dev.get("type"),
"date_code": dev.get("date_code"),
"ieee_address": dev.get("ieee_address"),
"network_address": dev.get("network_address"),
"power_source": dev.get("power_source"),
"software_build_id": dev.get("software_build_id"),
"model_id": dev.get("model_id"),
"model": dev.get("definition", {}).get("model"),
"vendor": dev.get("definition", {}).get("vendor"),
"supported": dev.get("supported"),
"description": dev.get("definition", {}).get("description"),
}
switch_info = self._get_switch_info(dev)
if switch_info:
compatible_entities.append(
Switch(
id=dev['ieee_address'],
name=dev.get('friendly_name'),
state=switch_info['property'] == switch_info['value_on'],
data=dev_info,
)
)
return compatible_entities
def _get_network_info(self, **kwargs): def _get_network_info(self, **kwargs):
self.logger.info('Fetching Zigbee network information') self.logger.info('Fetching Zigbee network information')
client = None client = None
@ -157,7 +209,11 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
def callback(_, __, msg): def callback(_, __, msg):
topic = msg.topic.split('/')[-1] topic = msg.topic.split('/')[-1]
if topic in info: if topic in info:
info[topic] = msg.payload.decode() if topic == 'state' else json.loads(msg.payload.decode()) info[topic] = (
msg.payload.decode()
if topic == 'state'
else json.loads(msg.payload.decode())
)
info_ready_events[topic].set() info_ready_events[topic].set()
return callback return callback
@ -174,7 +230,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
for event in info_ready_events.values(): for event in info_ready_events.values():
info_ready = event.wait(timeout=timeout) info_ready = event.wait(timeout=timeout)
if not info_ready: if not info_ready:
raise TimeoutError('A timeout occurred while fetching the Zigbee network information') raise TimeoutError(
'A timeout occurred while fetching the Zigbee network information'
)
# Cache the new results # Cache the new results
self._info['devices'] = { self._info['devices'] = {
@ -183,10 +241,10 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
} }
self._info['groups'] = { self._info['groups'] = {
group.get('name'): group group.get('name'): group for group in info.get('groups', [])
for group in info.get('groups', [])
} }
self.publish_entities(self._info['devices'].values()) # type: ignore
self.logger.info('Zigbee network configuration updated') self.logger.info('Zigbee network configuration updated')
return info return info
finally: finally:
@ -194,7 +252,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
client.loop_stop() client.loop_stop()
client.disconnect() client.disconnect()
except Exception as e: except Exception as e:
self.logger.warning('Error on MQTT client disconnection: {}'.format(str(e))) self.logger.warning(
'Error on MQTT client disconnection: {}'.format(str(e))
)
def _topic(self, topic): def _topic(self, topic):
return self.base_topic + '/' + topic return self.base_topic + '/' + topic
@ -204,7 +264,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
if isinstance(response, Response): if isinstance(response, Response):
response = response.output response = response.output
assert response.get('status') != 'error', response.get('error', 'zigbee2mqtt error') assert response.get('status') != 'error', response.get(
'error', 'zigbee2mqtt error'
)
return response return response
@action @action
@ -291,7 +353,7 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
"value_min": 150 "value_min": 150
}, },
{ {
"description": "Color of this light in the CIE 1931 color space (x/y)", "description": "Color of this light in the XY space",
"features": [ "features": [
{ {
"access": 7, "access": 7,
@ -315,7 +377,7 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
}, },
{ {
"access": 2, "access": 2,
"description": "Triggers an effect on the light (e.g. make light blink for a few seconds)", "description": "Triggers an effect on the light",
"name": "effect", "name": "effect",
"property": "effect", "property": "effect",
"type": "enum", "type": "enum",
@ -382,7 +444,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
return self._get_network_info(**kwargs).get('devices') return self._get_network_info(**kwargs).get('devices')
@action @action
def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs): def permit_join(
self, permit: bool = True, timeout: Optional[float] = None, **kwargs
):
""" """
Enable/disable devices from joining the network. This is not persistent (will not be saved to Enable/disable devices from joining the network. This is not persistent (will not be saved to
``configuration.yaml``). ``configuration.yaml``).
@ -394,14 +458,19 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
if timeout: if timeout:
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/permit_join'), self.publish(
msg={'value': permit, 'time': timeout}, topic=self._topic('bridge/request/permit_join'),
reply_topic=self._topic('bridge/response/permit_join'), msg={'value': permit, 'time': timeout},
**self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/permit_join'),
**self._mqtt_args(**kwargs),
)
)
return self.publish(topic=self._topic('bridge/request/permit_join'), return self.publish(
msg={'value': permit}, topic=self._topic('bridge/request/permit_join'),
**self._mqtt_args(**kwargs)) msg={'value': permit},
**self._mqtt_args(**kwargs),
)
@action @action
def factory_reset(self, **kwargs): def factory_reset(self, **kwargs):
@ -413,7 +482,11 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/request/touchlink/factory_reset'), msg='', **self._mqtt_args(**kwargs)) self.publish(
topic=self._topic('bridge/request/touchlink/factory_reset'),
msg='',
**self._mqtt_args(**kwargs),
)
@action @action
def log_level(self, level: str, **kwargs): def log_level(self, level: str, **kwargs):
@ -425,9 +498,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/config/log_level'), msg={'value': level}, self.publish(
reply_topic=self._topic('bridge/response/config/log_level'), topic=self._topic('bridge/request/config/log_level'),
**self._mqtt_args(**kwargs))) msg={'value': level},
reply_topic=self._topic('bridge/response/config/log_level'),
**self._mqtt_args(**kwargs),
)
)
@action @action
def device_set_option(self, device: str, option: str, value: Any, **kwargs): def device_set_option(self, device: str, option: str, value: Any, **kwargs):
@ -441,14 +518,18 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/options'), self.publish(
reply_topic=self._topic('bridge/response/device/options'), topic=self._topic('bridge/request/device/options'),
msg={ reply_topic=self._topic('bridge/response/device/options'),
'id': device, msg={
'options': { 'id': device,
option: value, 'options': {
} option: value,
}, **self._mqtt_args(**kwargs))) },
},
**self._mqtt_args(**kwargs),
)
)
@action @action
def device_remove(self, device: str, force: bool = False, **kwargs): def device_remove(self, device: str, force: bool = False, **kwargs):
@ -463,10 +544,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/remove'), self.publish(
msg={'id': device, 'force': force}, topic=self._topic('bridge/request/device/remove'),
reply_topic=self._topic('bridge/response/device/remove'), msg={'id': device, 'force': force},
**self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/remove'),
**self._mqtt_args(**kwargs),
)
)
@action @action
def device_ban(self, device: str, **kwargs): def device_ban(self, device: str, **kwargs):
@ -478,10 +562,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/ban'), self.publish(
reply_topic=self._topic('bridge/response/device/ban'), topic=self._topic('bridge/request/device/ban'),
msg={'id': device}, reply_topic=self._topic('bridge/response/device/ban'),
**self._mqtt_args(**kwargs))) msg={'id': device},
**self._mqtt_args(**kwargs),
)
)
@action @action
def device_whitelist(self, device: str, **kwargs): def device_whitelist(self, device: str, **kwargs):
@ -494,10 +581,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/whitelist'), self.publish(
reply_topic=self._topic('bridge/response/device/whitelist'), topic=self._topic('bridge/request/device/whitelist'),
msg={'id': device}, reply_topic=self._topic('bridge/response/device/whitelist'),
**self._mqtt_args(**kwargs))) msg={'id': device},
**self._mqtt_args(**kwargs),
)
)
@action @action
def device_rename(self, name: str, device: Optional[str] = None, **kwargs): def device_rename(self, name: str, device: Optional[str] = None, **kwargs):
@ -516,8 +606,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
devices = self.devices().output devices = self.devices().output
assert not [dev for dev in devices if dev.get('friendly_name') == name], \ assert not [
'A device named {} already exists on the network'.format(name) dev for dev in devices if dev.get('friendly_name') == name
], 'A device named {} already exists on the network'.format(name)
if device: if device:
req = { req = {
@ -531,10 +622,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
} }
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/rename'), self.publish(
msg=req, topic=self._topic('bridge/request/device/rename'),
reply_topic=self._topic('bridge/response/device/rename'), msg=req,
**self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/rename'),
**self._mqtt_args(**kwargs),
)
)
@staticmethod @staticmethod
def build_device_get_request(values: List[Dict[str, Any]]) -> dict: def build_device_get_request(values: List[Dict[str, Any]]) -> dict:
@ -563,7 +657,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
# noinspection PyShadowingBuiltins # noinspection PyShadowingBuiltins
@action @action
def device_get(self, device: str, property: Optional[str] = None, **kwargs) -> Dict[str, Any]: def device_get(
self, device: str, property: Optional[str] = None, **kwargs
) -> Dict[str, Any]:
""" """
Get the properties of a device. The returned keys vary depending on the device. For example, a light bulb Get the properties of a device. The returned keys vary depending on the device. For example, a light bulb
may have the "``state``" and "``brightness``" properties, while an environment sensor may have the may have the "``state``" and "``brightness``" properties, while an environment sensor may have the
@ -578,26 +674,45 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
kwargs = self._mqtt_args(**kwargs) kwargs = self._mqtt_args(**kwargs)
if property: if property:
properties = self.publish(topic=self._topic(device) + '/get/' + property, reply_topic=self._topic(device), properties = self.publish(
msg={property: ''}, **kwargs).output topic=self._topic(device) + f'/get/{property}',
reply_topic=self._topic(device),
msg={property: ''},
**kwargs,
).output
assert property in properties, 'No such property: ' + property assert property in properties, f'No such property: {property}'
return {property: properties[property]} return {property: properties[property]}
refreshed = False
if device not in self._info.get('devices', {}): if device not in self._info.get('devices', {}):
# Refresh devices info # Refresh devices info
self._get_network_info(**kwargs) self._get_network_info(**kwargs)
refreshed = True
assert self._info.get('devices', {}).get(device), 'No such device: ' + device assert self._info.get('devices', {}).get(device), f'No such device: {device}'
exposes = (self._info.get('devices', {}).get(device, {}).get('definition', {}) or {}).get('exposes', []) exposes = (
self._info.get('devices', {}).get(device, {}).get('definition', {}) or {}
).get('exposes', [])
if not exposes: if not exposes:
return {} return {}
return self.publish(topic=self._topic(device) + '/get', reply_topic=self._topic(device), device_info = self.publish(
msg=self.build_device_get_request(exposes), **kwargs) topic=self._topic(device) + '/get',
reply_topic=self._topic(device),
msg=self.build_device_get_request(exposes),
**kwargs,
)
if not refreshed:
self.publish_entities([device_info]) # type: ignore
return device_info
@action @action
def devices_get(self, devices: Optional[List[str]] = None, **kwargs) -> Dict[str, dict]: def devices_get(
self, devices: Optional[List[str]] = None, **kwargs
) -> Dict[str, dict]:
""" """
Get the properties of the devices connected to the network. Get the properties of the devices connected to the network.
@ -622,14 +737,14 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
kwargs = self._mqtt_args(**kwargs) kwargs = self._mqtt_args(**kwargs)
if not devices: if not devices:
# noinspection PyUnresolvedReferences devices = {
devices = set([ [
device['friendly_name'] or device['ieee_address'] device['friendly_name'] or device['ieee_address']
for device in self.devices(**kwargs).output for device in self.devices(**kwargs).output
]) ]
}
def worker(device: str, q: Queue): def worker(device: str, q: Queue):
# noinspection PyUnresolvedReferences
q.put(self.device_get(device, **kwargs).output) q.put(self.device_get(device, **kwargs).output)
queues = {} queues = {}
@ -638,7 +753,9 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
for device in devices: for device in devices:
queues[device] = Queue() queues[device] = Queue()
workers[device] = threading.Thread(target=worker, args=(device, queues[device])) workers[device] = threading.Thread(
target=worker, args=(device, queues[device])
)
workers[device].start() workers[device].start()
for device in devices: for device in devices:
@ -646,8 +763,11 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
response[device] = queues[device].get(timeout=kwargs.get('timeout')) response[device] = queues[device].get(timeout=kwargs.get('timeout'))
workers[device].join(timeout=kwargs.get('timeout')) workers[device].join(timeout=kwargs.get('timeout'))
except Exception as e: except Exception as e:
self.logger.warning('An error while getting the status of the device {}: {}'.format( self.logger.warning(
device, str(e))) 'An error while getting the status of the device {}: {}'.format(
device, str(e)
)
)
return response return response
@ -658,7 +778,7 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param device: Device friendly name (default: get all devices). :param device: Device friendly name (default: get all devices).
""" """
return self.devices_get([device], *args, **kwargs) return self.devices_get([device] if device else None, *args, **kwargs)
# noinspection PyShadowingBuiltins,DuplicatedCode # noinspection PyShadowingBuiltins,DuplicatedCode
@action @action
@ -674,9 +794,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
properties = self.publish(topic=self._topic(device + '/set'), properties = self.publish(
reply_topic=self._topic(device), topic=self._topic(device + '/set'),
msg={property: value}, **self._mqtt_args(**kwargs)).output reply_topic=self._topic(device),
msg={property: value},
**self._mqtt_args(**kwargs),
).output
if property: if property:
assert property in properties, 'No such property: ' + property assert property in properties, 'No such property: ' + property
@ -705,9 +828,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
ret = self._parse_response( ret = self._parse_response(
self.publish(topic=self._topic('bridge/request/device/ota_update/check'), self.publish(
reply_topic=self._topic('bridge/response/device/ota_update/check'), topic=self._topic('bridge/request/device/ota_update/check'),
msg={'id': device}, **self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/ota_update/check'),
msg={'id': device},
**self._mqtt_args(**kwargs),
)
)
return { return {
'status': ret['status'], 'status': ret['status'],
@ -725,9 +852,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/ota_update/update'), self.publish(
reply_topic=self._topic('bridge/response/device/ota_update/update'), topic=self._topic('bridge/request/device/ota_update/update'),
msg={'id': device}, **self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/ota_update/update'),
msg={'id': device},
**self._mqtt_args(**kwargs),
)
)
@action @action
def groups(self, **kwargs) -> List[dict]: def groups(self, **kwargs) -> List[dict]:
@ -883,16 +1014,22 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
payload = name if id is None else { payload = (
'id': id, name
'friendly_name': name, if id is None
} else {
'id': id,
'friendly_name': name,
}
)
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/add'), self.publish(
reply_topic=self._topic('bridge/response/group/add'), topic=self._topic('bridge/request/group/add'),
msg=payload, reply_topic=self._topic('bridge/response/group/add'),
**self._mqtt_args(**kwargs)) msg=payload,
**self._mqtt_args(**kwargs),
)
) )
@action @action
@ -911,9 +1048,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
if property: if property:
msg = {property: ''} msg = {property: ''}
properties = self.publish(topic=self._topic(group + '/get'), properties = self.publish(
reply_topic=self._topic(group), topic=self._topic(group + '/get'),
msg=msg, **self._mqtt_args(**kwargs)).output reply_topic=self._topic(group),
msg=msg,
**self._mqtt_args(**kwargs),
).output
if property: if property:
assert property in properties, 'No such property: ' + property assert property in properties, 'No such property: ' + property
@ -935,9 +1075,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
properties = self.publish(topic=self._topic(group + '/set'), properties = self.publish(
reply_topic=self._topic(group), topic=self._topic(group + '/set'),
msg={property: value}, **self._mqtt_args(**kwargs)).output reply_topic=self._topic(group),
msg={property: value},
**self._mqtt_args(**kwargs),
).output
if property: if property:
assert property in properties, 'No such property: ' + property assert property in properties, 'No such property: ' + property
@ -961,13 +1104,18 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
groups = {group.get('friendly_name'): group for group in self.groups().output} groups = {group.get('friendly_name'): group for group in self.groups().output}
assert name not in groups, 'A group named {} already exists on the network'.format(name) assert (
name not in groups
), 'A group named {} already exists on the network'.format(name)
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/rename'), self.publish(
reply_topic=self._topic('bridge/response/group/rename'), topic=self._topic('bridge/request/group/rename'),
msg={'from': group, 'to': name} if group else name, reply_topic=self._topic('bridge/response/group/rename'),
**self._mqtt_args(**kwargs))) msg={'from': group, 'to': name} if group else name,
**self._mqtt_args(**kwargs),
)
)
@action @action
def group_remove(self, name: str, **kwargs): def group_remove(self, name: str, **kwargs):
@ -979,10 +1127,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/remove'), self.publish(
reply_topic=self._topic('bridge/response/group/remove'), topic=self._topic('bridge/request/group/remove'),
msg=name, reply_topic=self._topic('bridge/response/group/remove'),
**self._mqtt_args(**kwargs))) msg=name,
**self._mqtt_args(**kwargs),
)
)
@action @action
def group_add_device(self, group: str, device: str, **kwargs): def group_add_device(self, group: str, device: str, **kwargs):
@ -995,12 +1146,16 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/members/add'), self.publish(
reply_topic=self._topic('bridge/response/group/members/add'), topic=self._topic('bridge/request/group/members/add'),
msg={ reply_topic=self._topic('bridge/response/group/members/add'),
'group': group, msg={
'device': device, 'group': group,
}, **self._mqtt_args(**kwargs))) 'device': device,
},
**self._mqtt_args(**kwargs),
)
)
@action @action
def group_remove_device(self, group: str, device: Optional[str] = None, **kwargs): def group_remove_device(self, group: str, device: Optional[str] = None, **kwargs):
@ -1015,13 +1170,23 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
return self._parse_response( return self._parse_response(
self.publish( self.publish(
topic=self._topic('bridge/request/group/members/remove{}'.format('_all' if device is None else '')), topic=self._topic(
'bridge/request/group/members/remove{}'.format(
'_all' if device is None else ''
)
),
reply_topic=self._topic( reply_topic=self._topic(
'bridge/response/group/members/remove{}'.format('_all' if device is None else '')), 'bridge/response/group/members/remove{}'.format(
'_all' if device is None else ''
)
),
msg={ msg={
'group': group, 'group': group,
'device': device, 'device': device,
}, **self._mqtt_args(**kwargs))) },
**self._mqtt_args(**kwargs),
)
)
@action @action
def bind_devices(self, source: str, target: str, **kwargs): def bind_devices(self, source: str, target: str, **kwargs):
@ -1040,9 +1205,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/bind'), self.publish(
reply_topic=self._topic('bridge/response/device/bind'), topic=self._topic('bridge/request/device/bind'),
msg={'from': source, 'to': target}, **self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/bind'),
msg={'from': source, 'to': target},
**self._mqtt_args(**kwargs),
)
)
@action @action
def unbind_devices(self, source: str, target: str, **kwargs): def unbind_devices(self, source: str, target: str, **kwargs):
@ -1057,9 +1226,13 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
(default: query the default configured device). (default: query the default configured device).
""" """
return self._parse_response( return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/unbind'), self.publish(
reply_topic=self._topic('bridge/response/device/unbind'), topic=self._topic('bridge/request/device/unbind'),
msg={'from': source, 'to': target}, **self._mqtt_args(**kwargs))) reply_topic=self._topic('bridge/response/device/unbind'),
msg={'from': source, 'to': target},
**self._mqtt_args(**kwargs),
)
)
@action @action
def on(self, device, *args, **kwargs) -> dict: def on(self, device, *args, **kwargs) -> dict:
@ -1069,8 +1242,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
switch_info = self._get_switches_info().get(device) switch_info = self._get_switches_info().get(device)
assert switch_info, '{} is not a valid switch'.format(device) assert switch_info, '{} is not a valid switch'.format(device)
props = self.device_set(device, switch_info['property'], switch_info['value_on']).output props = self.device_set(
return self._properties_to_switch(device=device, props=props, switch_info=switch_info) device, switch_info['property'], switch_info['value_on']
).output
return self._properties_to_switch(
device=device, props=props, switch_info=switch_info
)
@action @action
def off(self, device, *args, **kwargs) -> dict: def off(self, device, *args, **kwargs) -> dict:
@ -1080,8 +1257,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
switch_info = self._get_switches_info().get(device) switch_info = self._get_switches_info().get(device)
assert switch_info, '{} is not a valid switch'.format(device) assert switch_info, '{} is not a valid switch'.format(device)
props = self.device_set(device, switch_info['property'], switch_info['value_off']).output props = self.device_set(
return self._properties_to_switch(device=device, props=props, switch_info=switch_info) device, switch_info['property'], switch_info['value_off']
).output
return self._properties_to_switch(
device=device, props=props, switch_info=switch_info
)
@action @action
def toggle(self, device, *args, **kwargs) -> dict: def toggle(self, device, *args, **kwargs) -> dict:
@ -1091,8 +1272,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
""" """
switch_info = self._get_switches_info().get(device) switch_info = self._get_switches_info().get(device)
assert switch_info, '{} is not a valid switch'.format(device) assert switch_info, '{} is not a valid switch'.format(device)
props = self.device_set(device, switch_info['property'], switch_info['value_toggle']).output props = self.device_set(
return self._properties_to_switch(device=device, props=props, switch_info=switch_info) device, switch_info['property'], switch_info['value_toggle']
).output
return self._properties_to_switch(
device=device, props=props, switch_info=switch_info
)
@staticmethod @staticmethod
def _properties_to_switch(device: str, props: dict, switch_info: dict) -> dict: def _properties_to_switch(device: str, props: dict, switch_info: dict) -> dict:
@ -1103,32 +1288,39 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
**props, **props,
} }
@staticmethod
def _get_switch_info(device_info: dict) -> dict:
exposes = (device_info.get('definition', {}) or {}).get('exposes', [])
for exposed in exposes:
for feature in exposed.get('features', []):
if (
feature.get('type') == 'binary'
and 'value_on' in feature
and 'value_off' in feature
and feature.get('access', 0) & 2
):
return {
'property': feature['property'],
'value_on': feature['value_on'],
'value_off': feature['value_off'],
'value_toggle': feature.get('value_toggle', None),
}
return {}
def _get_switches_info(self) -> dict: def _get_switches_info(self) -> dict:
def switch_info(device_info: dict) -> dict:
exposes = (device_info.get('definition', {}) or {}).get('exposes', [])
for exposed in exposes:
for feature in exposed.get('features', []):
if feature.get('type') == 'binary' and 'value_on' in feature and 'value_off' in feature and \
feature.get('access', 0) & 2:
return {
'property': feature['property'],
'value_on': feature['value_on'],
'value_off': feature['value_off'],
'value_toggle': feature.get('value_toggle', None),
}
return {}
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
devices = self.devices().output devices = self.devices().output
switches_info = {} switches_info = {}
for device in devices: for device in devices:
info = switch_info(device) info = self._get_switch_info(device)
if not info: if not info:
continue continue
switches_info[device.get('friendly_name', device.get('ieee_address'))] = info switches_info[
device.get('friendly_name', device.get('ieee_address'))
] = info
return switches_info return switches_info
@ -1142,8 +1334,12 @@ class ZigbeeMqttPlugin(MqttPlugin, SwitchPlugin): # lgtm [py/missing-call-to-i
switches_info = self._get_switches_info() switches_info = self._get_switches_info()
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
return [ return [
self._properties_to_switch(device=name, props=switch, switch_info=switches_info[name]) self._properties_to_switch(
for name, switch in self.devices_get(list(switches_info.keys())).output.items() device=name, props=switch, switch_info=switches_info[name]
)
for name, switch in self.devices_get(
list(switches_info.keys())
).output.items()
] ]