Moved more zigbee2mqtt methods to the new API

This commit is contained in:
Fabio Manganiello 2021-02-09 02:33:43 +01:00
parent 15d2e1116b
commit 021dd32190
1 changed files with 72 additions and 44 deletions

View File

@ -1,8 +1,9 @@
import json
import threading
from typing import Optional, List, Any, Dict
from typing import Optional, List, Any, Dict, Union
from platypush.message.response import Response
from platypush.plugins.mqtt import MqttPlugin, action
@ -206,6 +207,14 @@ class ZigbeeMqttPlugin(MqttPlugin):
def _topic(self, topic):
return self.base_topic + '/' + topic
@staticmethod
def _parse_response(response: Union[dict, Response]) -> dict:
if isinstance(response, Response):
response = response.output
assert response.get('status') != 'error', response.get('error', 'zigbee2mqtt error')
return response
@action
def devices(self, **kwargs) -> List[Dict[str, Any]]:
"""
@ -380,13 +389,6 @@ class ZigbeeMqttPlugin(MqttPlugin):
"""
return self._get_network_info(**kwargs).get('devices')
def _permit_join_timeout_callback(self, permit: bool, **kwargs):
def callback():
self.logger.info('Restoring permit_join state to {}'.format(permit))
self.permit_join(permit, **kwargs)
return callback
@action
def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs):
"""
@ -398,19 +400,16 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/permit_join'), msg=permit, **self._mqtt_args(**kwargs))
if timeout:
threading.Timer(timeout, self._permit_join_timeout_callback(not permit, **kwargs)).start()
return self._parse_response(
self.publish(topic=self._topic('bridge/request/permit_join'),
msg={'value': permit, 'time': timeout},
reply_topic=self._topic('bridge/response/permit_join'),
**self._mqtt_args(**kwargs)))
@action
def reset(self, **kwargs):
"""
Reset the adapter.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/reset'), msg='', **self._mqtt_args(**kwargs))
return self.publish(topic=self._topic('bridge/request/permit_join'),
msg={'value': permit},
**self._mqtt_args(**kwargs))
@action
def factory_reset(self, **kwargs):
@ -421,7 +420,7 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/factory_reset'), msg='', **self._mqtt_args(**kwargs))
self.publish(topic=self._topic('bridge/request/touchlink/factory_reset'), msg='', **self._mqtt_args(**kwargs))
@action
def log_level(self, level: str, **kwargs):
@ -432,7 +431,10 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/log_level'), msg=level, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/config/log_level'), msg={'value': level},
reply_topic=self._topic('bridge/response/config/log_level'),
**self._mqtt_args(**kwargs)))
@action
def device_set_option(self, device: str, option: str, value: Any, **kwargs):
@ -445,12 +447,15 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/device_options'), msg={
'friendly_name': device,
'options': {
option: value,
}
}, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/options'),
reply_topic=self._topic('bridge/response/device/options'),
msg={
'id': device,
'options': {
option: value,
}
}, **self._mqtt_args(**kwargs)))
@action
def device_remove(self, device: str, force: bool = False, **kwargs):
@ -464,8 +469,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
topic = self._topic('bridge/config/{}remove'.format('force_' if force else ''))
self.publish(topic=topic, msg=device, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/remove'),
msg={'id': device, 'force': force},
reply_topic=self._topic('bridge/response/device/remove'),
**self._mqtt_args(**kwargs)))
@action
def device_ban(self, device: str, **kwargs):
@ -476,7 +484,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/ban'), msg=device, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/ban'),
reply_topic=self._topic('bridge/response/device/ban'),
msg={'id': device},
**self._mqtt_args(**kwargs)))
@action
def device_whitelist(self, device: str, **kwargs):
@ -488,7 +500,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/whitelist'), msg=device, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/whitelist'),
reply_topic=self._topic('bridge/response/device/whitelist'),
msg={'id': device},
**self._mqtt_args(**kwargs)))
@action
def device_rename(self, name: str, device: Optional[str] = None, **kwargs):
@ -521,9 +537,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
'to': name,
}
self.publish(
topic=self._topic('bridge/request/device/rename'),
msg=req, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/device/rename'),
msg=req,
reply_topic=self._topic('bridge/response/device/rename'),
**self._mqtt_args(**kwargs)))
@staticmethod
def _build_device_get_request(values: List[Dict[str, Any]]) -> dict:
@ -778,11 +796,17 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
args = {'friendly_name': name}
if id is not None:
args['id'] = id
payload = name if id is None else {
'id': id,
'friendly_name': name,
}
self.publish(topic=self._topic('bridge/config/add_group'), msg=args, **self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/add'),
reply_topic=self._topic('bridge/response/group/add'),
msg=payload,
**self._mqtt_args(**kwargs))
)
# noinspection PyShadowingBuiltins,DuplicatedCode
@action
@ -826,10 +850,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
groups = {group.get('friendly_name'): group for group in self.groups().output}
assert name not in groups, 'A group named {} already exists on the network'.format(name)
self.publish(
topic=self._topic('bridge/config/rename'),
msg={'old': group, 'new': name} if group else name,
**self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/rename'),
reply_topic=self._topic('bridge/response/group/rename'),
msg={'from': group, 'to': name} if group else name,
**self._mqtt_args(**kwargs)))
@action
def group_remove(self, name: str, **kwargs):
@ -840,8 +865,11 @@ class ZigbeeMqttPlugin(MqttPlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/remove_group'), msg=name,
**self._mqtt_args(**kwargs))
return self._parse_response(
self.publish(topic=self._topic('bridge/request/group/remove'),
reply_topic=self._topic('bridge/response/group/remove'),
msg=name,
**self._mqtt_args(**kwargs)))
@action
def group_add_device(self, group: str, device: str, **kwargs):