Moved more zigbee2mqtt methods to the new API

This commit is contained in:
Fabio Manganiello 2021-02-09 02:33:43 +01:00
parent 86e6ffd18d
commit 1eedcaf2be
1 changed files with 72 additions and 44 deletions

View File

@ -1,8 +1,9 @@
import json import json
import threading import threading
from typing import Optional, List, Any, Dict from typing import Optional, List, Any, Dict, Union
from platypush.message.response import Response
from platypush.plugins.mqtt import MqttPlugin, action from platypush.plugins.mqtt import MqttPlugin, action
@ -206,6 +207,14 @@ class ZigbeeMqttPlugin(MqttPlugin):
def _topic(self, topic): def _topic(self, topic):
return self.base_topic + '/' + topic return self.base_topic + '/' + topic
@staticmethod
def _parse_response(response: Union[dict, Response]) -> dict:
if isinstance(response, Response):
response = response.output
assert response.get('status') != 'error', response.get('error', 'zigbee2mqtt error')
return response
@action @action
def devices(self, **kwargs) -> List[Dict[str, Any]]: def devices(self, **kwargs) -> List[Dict[str, Any]]:
""" """
@ -380,13 +389,6 @@ class ZigbeeMqttPlugin(MqttPlugin):
""" """
return self._get_network_info(**kwargs).get('devices') return self._get_network_info(**kwargs).get('devices')
def _permit_join_timeout_callback(self, permit: bool, **kwargs):
def callback():
self.logger.info('Restoring permit_join state to {}'.format(permit))
self.permit_join(permit, **kwargs)
return callback
@action @action
def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs): def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs):
""" """
@ -398,19 +400,16 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/permit_join'), msg=permit, **self._mqtt_args(**kwargs))
if timeout: if timeout:
threading.Timer(timeout, self._permit_join_timeout_callback(not permit, **kwargs)).start() return self._parse_response(
self.publish(topic=self._topic('bridge/request/permit_join'),
msg={'value': permit, 'time': timeout},
reply_topic=self._topic('bridge/response/permit_join'),
**self._mqtt_args(**kwargs)))
@action return self.publish(topic=self._topic('bridge/request/permit_join'),
def reset(self, **kwargs): msg={'value': permit},
""" **self._mqtt_args(**kwargs))
Reset the adapter.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/reset'), msg='', **self._mqtt_args(**kwargs))
@action @action
def factory_reset(self, **kwargs): def factory_reset(self, **kwargs):
@ -421,7 +420,7 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/factory_reset'), msg='', **self._mqtt_args(**kwargs)) self.publish(topic=self._topic('bridge/request/touchlink/factory_reset'), msg='', **self._mqtt_args(**kwargs))
@action @action
def log_level(self, level: str, **kwargs): def log_level(self, level: str, **kwargs):
@ -432,7 +431,10 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/log_level'), msg=level, **self._mqtt_args(**kwargs)) return self._parse_response(
self.publish(topic=self._topic('bridge/request/config/log_level'), msg={'value': level},
reply_topic=self._topic('bridge/response/config/log_level'),
**self._mqtt_args(**kwargs)))
@action @action
def device_set_option(self, device: str, option: str, value: Any, **kwargs): def device_set_option(self, device: str, option: str, value: Any, **kwargs):
@ -445,12 +447,15 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/device_options'), msg={ return self._parse_response(
'friendly_name': device, self.publish(topic=self._topic('bridge/request/device/options'),
'options': { reply_topic=self._topic('bridge/response/device/options'),
option: value, msg={
} 'id': device,
}, **self._mqtt_args(**kwargs)) 'options': {
option: value,
}
}, **self._mqtt_args(**kwargs)))
@action @action
def device_remove(self, device: str, force: bool = False, **kwargs): def device_remove(self, device: str, force: bool = False, **kwargs):
@ -464,8 +469,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
topic = self._topic('bridge/config/{}remove'.format('force_' if force else '')) return self._parse_response(
self.publish(topic=topic, msg=device, **self._mqtt_args(**kwargs)) self.publish(topic=self._topic('bridge/request/device/remove'),
msg={'id': device, 'force': force},
reply_topic=self._topic('bridge/response/device/remove'),
**self._mqtt_args(**kwargs)))
@action @action
def device_ban(self, device: str, **kwargs): def device_ban(self, device: str, **kwargs):
@ -476,7 +484,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/ban'), msg=device, **self._mqtt_args(**kwargs)) return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/ban'),
reply_topic=self._topic('bridge/response/device/ban'),
msg={'id': device},
**self._mqtt_args(**kwargs)))
@action @action
def device_whitelist(self, device: str, **kwargs): def device_whitelist(self, device: str, **kwargs):
@ -488,7 +500,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/whitelist'), msg=device, **self._mqtt_args(**kwargs)) return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/whitelist'),
reply_topic=self._topic('bridge/response/device/whitelist'),
msg={'id': device},
**self._mqtt_args(**kwargs)))
@action @action
def device_rename(self, name: str, device: Optional[str] = None, **kwargs): def device_rename(self, name: str, device: Optional[str] = None, **kwargs):
@ -521,9 +537,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
'to': name, 'to': name,
} }
self.publish( return self._parse_response(
topic=self._topic('bridge/request/device/rename'), self.publish(topic=self._topic('bridge/request/device/rename'),
msg=req, **self._mqtt_args(**kwargs)) msg=req,
reply_topic=self._topic('bridge/response/device/rename'),
**self._mqtt_args(**kwargs)))
@staticmethod @staticmethod
def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: def _build_device_get_request(values: List[Dict[str, Any]]) -> dict:
@ -778,11 +796,17 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
args = {'friendly_name': name} payload = name if id is None else {
if id is not None: 'id': id,
args['id'] = id 'friendly_name': name,
}
self.publish(topic=self._topic('bridge/config/add_group'), msg=args, **self._mqtt_args(**kwargs)) return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/add'),
reply_topic=self._topic('bridge/response/group/add'),
msg=payload,
**self._mqtt_args(**kwargs))
)
# noinspection PyShadowingBuiltins,DuplicatedCode # noinspection PyShadowingBuiltins,DuplicatedCode
@action @action
@ -826,10 +850,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
groups = {group.get('friendly_name'): group for group in self.groups().output} groups = {group.get('friendly_name'): group for group in self.groups().output}
assert name not in groups, 'A group named {} already exists on the network'.format(name) assert name not in groups, 'A group named {} already exists on the network'.format(name)
self.publish( return self._parse_response(
topic=self._topic('bridge/config/rename'), self.publish(topic=self._topic('bridge/request/group/rename'),
msg={'old': group, 'new': name} if group else name, reply_topic=self._topic('bridge/response/group/rename'),
**self._mqtt_args(**kwargs)) msg={'from': group, 'to': name} if group else name,
**self._mqtt_args(**kwargs)))
@action @action
def group_remove(self, name: str, **kwargs): def group_remove(self, name: str, **kwargs):
@ -840,8 +865,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device). (default: query the default configured device).
""" """
self.publish(topic=self._topic('bridge/config/remove_group'), msg=name, return self._parse_response(
**self._mqtt_args(**kwargs)) self.publish(topic=self._topic('bridge/request/group/remove'),
reply_topic=self._topic('bridge/response/group/remove'),
msg=name,
**self._mqtt_args(**kwargs)))
@action @action
def group_add_device(self, group: str, device: str, **kwargs): def group_add_device(self, group: str, device: str, **kwargs):