Merged the zigbee.mqtt backend into the plugin.

- Deprecated the old `zigbee.mqtt` backend
- Black style for the `mqtt` backend
This commit is contained in:
Fabio Manganiello 2023-01-27 01:59:57 +01:00
parent afdeb91f66
commit 341e749d23
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
6 changed files with 541 additions and 441 deletions

View file

@ -17,11 +17,25 @@ from platypush.utils import set_thread_name
class MqttClient(mqtt.Client, threading.Thread): class MqttClient(mqtt.Client, threading.Thread):
def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None, def __init__(
on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None, self,
client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, *args,
tls_keyfile: Optional[str] = None, tls_version: Optional = None, tls_ciphers: Optional = None, host: str,
tls_insecure: bool = False, keepalive: Optional[int] = 60, **kwargs): port: int,
topics: Optional[List[str]] = None,
on_message: Optional[Callable] = None,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version=None,
tls_ciphers=None,
tls_insecure: bool = False,
keepalive: Optional[int] = 60,
**kwargs,
):
mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs) mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs)
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -38,8 +52,13 @@ class MqttClient(mqtt.Client, threading.Thread):
self.username_pw_set(username, password) self.username_pw_set(username, password)
if tls_cafile: if tls_cafile:
self.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, tls_version=tls_version, self.tls_set(
ciphers=tls_ciphers) ca_certs=tls_cafile,
certfile=tls_certfile,
keyfile=tls_keyfile,
tls_version=tls_version,
ciphers=tls_ciphers,
)
self.tls_insecure_set(tls_insecure) self.tls_insecure_set(tls_insecure)
@ -106,14 +125,25 @@ class MqttBackend(Backend):
_default_mqtt_port = 1883 _default_mqtt_port = 1883
def __init__(self, host: Optional[str] = None, port: int = _default_mqtt_port, def __init__(
topic='platypush_bus_mq', subscribe_default_topic: bool = True, self,
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, host: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, port: int = _default_mqtt_port,
tls_ciphers: Optional[str] = None, tls_insecure: bool = False, topic='platypush_bus_mq',
username: Optional[str] = None, password: Optional[str] = None, subscribe_default_topic: bool = True,
client_id: Optional[str] = None, listeners=None, tls_cafile: Optional[str] = None,
*args, **kwargs): tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
tls_insecure: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
listeners=None,
*args,
**kwargs,
):
""" """
:param host: MQTT broker host. If no host configuration is specified then :param host: MQTT broker host. If no host configuration is specified then
the backend will use the host configuration specified on the ``mqtt`` the backend will use the host configuration specified on the ``mqtt``
@ -172,10 +202,12 @@ class MqttBackend(Backend):
self.tls_insecure = tls_insecure self.tls_insecure = tls_insecure
self.username = username self.username = username
self.password = password self.password = password
self.client_id = client_id or Config.get('device_id') self.client_id: str = client_id or Config.get('device_id') # type: ignore
else: else:
client = get_plugin('mqtt') client = get_plugin('mqtt')
assert client.host, 'No host specified on backend.mqtt nor mqtt configuration' assert (
client.host
), 'No host specified on backend.mqtt nor mqtt configuration'
self.host = client.host self.host = client.host
self.port = client.port self.port = client.port
@ -187,7 +219,7 @@ class MqttBackend(Backend):
self.tls_insecure = client.tls_insecure self.tls_insecure = client.tls_insecure
self.username = client.username self.username = client.username
self.password = client.password self.password = client.password
self.client_id = client_id or client.client_id self.client_id: str = client_id or client.client_id # type: ignore
self.topic = '{}/{}'.format(topic, self.device_id) self.topic = '{}/{}'.format(topic, self.device_id)
self.subscribe_default_topic = subscribe_default_topic self.subscribe_default_topic = subscribe_default_topic
@ -197,12 +229,21 @@ class MqttBackend(Backend):
def send_message(self, msg, topic: Optional[str] = None, **kwargs): def send_message(self, msg, topic: Optional[str] = None, **kwargs):
try: try:
client = get_plugin('mqtt') client = get_plugin('mqtt')
client.send_message(topic=topic or self.topic, msg=msg, host=self.host, client.send_message(
port=self.port, username=self.username, topic=topic or self.topic,
password=self.password, tls_cafile=self.tls_cafile, msg=msg,
tls_certfile=self.tls_certfile, tls_keyfile=self.tls_keyfile, host=self.host,
tls_version=self.tls_version, tls_insecure=self.tls_insecure, port=self.port,
tls_ciphers=self.tls_ciphers, **kwargs) username=self.username,
password=self.password,
tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile,
tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version,
tls_insecure=self.tls_insecure,
tls_ciphers=self.tls_ciphers,
**kwargs,
)
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
@ -238,45 +279,92 @@ class MqttBackend(Backend):
topics = listener.get('topics') topics = listener.get('topics')
if not topics: if not topics:
self.logger.warning('No list of topics specified for listener n.{}'.format(i+1)) self.logger.warning(
'No list of topics specified for listener n.{}'.format(i + 1)
)
continue continue
client = self._get_client(host=host, port=port, topics=topics, username=username, password=password, client = self._get_client(
client_id=self.client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, host=host,
tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, port=port,
tls_insecure=tls_insecure) topics=topics,
username=username,
password=password,
client_id=self.client_id,
tls_cafile=tls_cafile,
tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
tls_version=tls_version,
tls_ciphers=tls_ciphers,
tls_insecure=tls_insecure,
)
if not client.is_alive(): if not client.is_alive():
client.start() client.start()
def _get_client_id( def _get_client_id(
self, host: str, port: int, topics: Optional[List[str]] = None, self,
client_id: Optional[str] = None, on_message: Optional[bool] = None, host: str,
port: int,
topics: Optional[List[str]] = None,
client_id: Optional[str] = None,
on_message: Optional[bool] = None,
) -> str: ) -> str:
return '{client_id}-{client_hash}'.format( return '{client_id}-{client_hash}'.format(
client_id=client_id or self.client_id, client_id=client_id or self.client_id,
client_hash=hashlib.sha1('|'.join([ client_hash=hashlib.sha1(
host, str(port), '|'.join(
json.dumps(sorted(topics or [])), [
str(id(on_message)) host,
]).encode()).hexdigest(), str(port),
json.dumps(sorted(topics or [])),
str(id(on_message)),
]
).encode()
).hexdigest(),
) )
def _get_client(self, host: str, port: int, topics: Optional[List[str]] = None, username: Optional[str] = None, def _get_client(
password: Optional[str] = None, client_id: Optional[str] = None, tls_cafile: Optional[str] = None, self,
tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional = None, host: str,
tls_ciphers: Optional = None, tls_insecure: bool = False, on_message: Optional[Callable] = None) \ port: int,
-> MqttClient: topics: Optional[List[str]] = None,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version=None,
tls_ciphers=None,
tls_insecure: bool = False,
on_message: Optional[Callable] = None,
) -> MqttClient:
on_message = on_message or self.on_mqtt_message() on_message = on_message or self.on_mqtt_message()
client_id = self._get_client_id(host=host, port=port, topics=topics, client_id=client_id, on_message=on_message) client_id = self._get_client_id(
host=host,
port=port,
topics=topics,
client_id=client_id,
on_message=on_message,
)
client = self._listeners.get(client_id) client = self._listeners.get(client_id)
if not (client and client.is_alive()): if not (client and client.is_alive()):
client = self._listeners[client_id] = MqttClient( client = self._listeners[client_id] = MqttClient(
host=host, port=port, topics=topics, username=username, password=password, host=host,
client_id=client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, port=port,
tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, topics=topics,
tls_insecure=tls_insecure, on_message=on_message username=username,
password=password,
client_id=client_id,
tls_cafile=tls_cafile,
tls_certfile=tls_certfile,
tls_keyfile=tls_keyfile,
tls_version=tls_version,
tls_ciphers=tls_ciphers,
tls_insecure=tls_insecure,
on_message=on_message,
) )
client.subscribe(*topics) client.subscribe(*topics)
@ -293,7 +381,11 @@ class MqttBackend(Backend):
self.logger.debug(str(e)) self.logger.debug(str(e))
# noinspection PyProtectedMember # noinspection PyProtectedMember
self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, topic=msg.topic, msg=data)) self.bus.post(
MQTTMessageEvent(
host=client._host, port=client._port, topic=msg.topic, msg=data
)
)
return handler return handler
@ -307,8 +399,11 @@ class MqttBackend(Backend):
return return
response_topic = '{}/responses/{}'.format(self.topic, msg.id) response_topic = '{}/responses/{}'.format(self.topic, msg.id)
self.logger.info('Processing response on the MQTT topic {}: {}'. self.logger.info(
format(response_topic, response)) 'Processing response on the MQTT topic {}: {}'.format(
response_topic, response
)
)
self.send_message(response, topic=response_topic) self.send_message(response, topic=response_topic)
@ -332,7 +427,9 @@ class MqttBackend(Backend):
return return
if isinstance(msg, Request): if isinstance(msg, Request):
threading.Thread(target=response_thread, name='MQTTProcessor', args=(msg,)).start() threading.Thread(
target=response_thread, name='MQTTProcessor', args=(msg,)
).start()
return handler return handler
@ -341,16 +438,28 @@ class MqttBackend(Backend):
if self.host and self.subscribe_default_topic: if self.host and self.subscribe_default_topic:
topics = [self.topic] topics = [self.topic]
client = self._get_client(host=self.host, port=self.port, topics=topics, username=self.username, client = self._get_client(
password=self.password, client_id=self.client_id, host=self.host,
tls_cafile=self.tls_cafile, tls_certfile=self.tls_certfile, port=self.port,
tls_keyfile=self.tls_keyfile, tls_version=self.tls_version, topics=topics,
tls_ciphers=self.tls_ciphers, tls_insecure=self.tls_insecure, username=self.username,
on_message=self.on_exec_message()) password=self.password,
client_id=self.client_id,
tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile,
tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version,
tls_ciphers=self.tls_ciphers,
tls_insecure=self.tls_insecure,
on_message=self.on_exec_message(),
)
client.start() client.start()
self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. self.logger.info(
format(self.host, self.port, self.topic)) 'Initialized MQTT backend on host {}:{}, topic {}'.format(
self.host, self.port, self.topic
)
)
self.add_listeners(*self.listeners_conf) self.add_listeners(*self.listeners_conf)
@ -365,4 +474,5 @@ class MqttBackend(Backend):
self.logger.info('MQTT backend terminated') self.logger.info('MQTT backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,351 +1,34 @@
import contextlib import warnings
import json
from typing import Optional, Mapping
from platypush.backend.mqtt import MqttBackend from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.zigbee.mqtt import (
ZigbeeMqttOnlineEvent,
ZigbeeMqttOfflineEvent,
ZigbeeMqttDevicePropertySetEvent,
ZigbeeMqttDevicePairingEvent,
ZigbeeMqttDeviceConnectedEvent,
ZigbeeMqttDeviceBannedEvent,
ZigbeeMqttDeviceRemovedEvent,
ZigbeeMqttDeviceRemovedFailedEvent,
ZigbeeMqttDeviceWhitelistedEvent,
ZigbeeMqttDeviceRenamedEvent,
ZigbeeMqttDeviceBindEvent,
ZigbeeMqttDeviceUnbindEvent,
ZigbeeMqttGroupAddedEvent,
ZigbeeMqttGroupAddedFailedEvent,
ZigbeeMqttGroupRemovedEvent,
ZigbeeMqttGroupRemovedFailedEvent,
ZigbeeMqttGroupRemoveAllEvent,
ZigbeeMqttGroupRemoveAllFailedEvent,
ZigbeeMqttErrorEvent,
)
class ZigbeeMqttBackend(MqttBackend): class ZigbeeMqttBackend(Backend):
""" """
Listen for events on a zigbee2mqtt service. Listen for events on a zigbee2mqtt service.
For historical reasons, this backend should be enabled together with the `zigbee.mqtt` plugin. **WARNING**: This backend is **DEPRECATED** and it will be removed in a
future version.
Triggers: It has been merged with
:class:`platypush.plugins.zigbee.mqtt.ZigbeeMqttPlugin`.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a
connected device change.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects
to the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to
remove a device from the network fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is
whitelisted on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is
renamed on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to
add a new group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to
remove a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices
are removed from a group.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to
remove all the devices from a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs
on the zigbee2mqtt service.
Requires:
* **paho-mqtt** (``pip install paho-mqtt``)
* The :class:`platypush.plugins.zigbee.mqtt.ZigbeeMqttPlugin` plugin configured.
Now you can simply configure the `zigbee.mqtt` plugin in order to enable
the Zigbee integration - no need to enable both the plugin and the backend.
""" """
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
base_topic='zigbee2mqtt',
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
*args,
**kwargs
):
"""
:param host: MQTT broker host (default: host configured on the ``zigbee.mqtt`` plugin).
:param port: MQTT broker port (default: 1883).
:param base_topic: Prefix of the topics published by zigbee2mqtt (default: '``zigbee2mqtt``').
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority
to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it
here (default: None)
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required,
specify it here (default: None) :type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it
here (default: None)
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is
required, specify it here (default: None)
:param username: Specify it if the MQTT server requires authentication (default: None)
:param password: Specify it if the MQTT server requires authentication (default: None)
:param client_id: MQTT client ID (default: ``<device_id>-zigbee-mqtt``, to prevent clashes with the
:class:`platypush.backend.mqtt.MqttBackend` ``client_id``.
"""
plugin = get_plugin('zigbee.mqtt')
self.base_topic = base_topic or plugin.base_topic
self._devices = {}
self._devices_info = {}
self._groups = {}
self._last_state = None
self.server_info = {
'host': host or plugin.host,
'port': port or plugin.port or self._default_mqtt_port,
'tls_cafile': tls_cafile or plugin.tls_cafile,
'tls_certfile': tls_certfile or plugin.tls_certfile,
'tls_ciphers': tls_ciphers or plugin.tls_ciphers,
'tls_keyfile': tls_keyfile or plugin.tls_keyfile,
'tls_version': tls_version or plugin.tls_version,
'username': username or plugin.username,
'password': password or plugin.password,
}
kwargs = {
**kwargs,
**self.server_info,
}
listeners = [
{
**self.server_info,
'topics': [
self.base_topic + '/' + topic
for topic in [
'bridge/state',
'bridge/log',
'bridge/logging',
'bridge/devices',
'bridge/groups',
]
],
}
]
super().__init__(
*args,
subscribe_default_topic=False,
listeners=listeners,
client_id=client_id,
**kwargs
)
if not client_id:
self.client_id += '-zigbee-mqtt'
def _process_state_message(self, client, msg):
if msg == self._last_state:
return
if msg == 'online':
evt = ZigbeeMqttOnlineEvent
elif msg == 'offline':
evt = ZigbeeMqttOfflineEvent
self.logger.warning('zigbee2mqtt service is offline')
else:
return
# noinspection PyProtectedMember
self.bus.post(evt(host=client._host, port=client._port))
self._last_state = msg
def _process_log_message(self, client, msg):
msg_type = msg.get('type')
text = msg.get('message')
# noinspection PyProtectedMember
args = {'host': client._host, 'port': client._port}
if msg_type == 'devices':
devices = {}
for dev in text or []:
devices[dev['friendly_name']] = dev
client.subscribe(self.base_topic + '/' + dev['friendly_name'])
elif msg_type == 'pairing':
self.bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args))
elif msg_type in ['device_ban', 'device_banned']:
self.bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args))
elif msg_type in ['device_removed_failed', 'device_force_removed_failed']:
force = msg_type == 'device_force_removed_failed'
self.bus.post(
ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)
)
elif msg_type == 'device_whitelisted':
self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args))
elif msg_type == 'device_renamed':
self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args))
elif msg_type == 'device_bind':
self.bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args))
elif msg_type == 'device_unbind':
self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args))
elif msg_type == 'device_group_add':
self.bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args))
elif msg_type == 'device_group_add_failed':
self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args))
elif msg_type == 'device_group_remove':
self.bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args))
elif msg_type == 'device_group_remove_failed':
self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args))
elif msg_type == 'device_group_remove_all':
self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args))
elif msg_type == 'device_group_remove_all_failed':
self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args))
elif msg_type == 'zigbee_publish_error':
self.logger.error('zigbee2mqtt error: {}'.format(text))
self.bus.post(ZigbeeMqttErrorEvent(error=text, **args))
elif msg.get('level') in ['warning', 'error']:
log = getattr(self.logger, msg['level'])
log(
'zigbee2mqtt {}: {}'.format(
msg['level'], text or msg.get('error', msg.get('warning'))
)
)
def _process_devices(self, client, msg):
devices_info = {
device.get('friendly_name', device.get('ieee_address')): device
for device in msg
}
# noinspection PyProtectedMember
event_args = {'host': client._host, 'port': client._port}
client.subscribe(
*[self.base_topic + '/' + device for device in devices_info.keys()]
)
for name, device in devices_info.items():
if name not in self._devices:
self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=name, **event_args))
exposes = (device.get('definition', {}) or {}).get('exposes', [])
payload = self._plugin._build_device_get_request(exposes)
if payload:
client.publish(
self.base_topic + '/' + name + '/get',
json.dumps(payload),
)
devices_copy = [*self._devices.keys()]
for name in devices_copy:
if name not in devices_info:
self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args))
del self._devices[name]
self._devices = {device: {} for device in devices_info.keys()}
self._devices_info = devices_info
def _process_groups(self, client, msg):
# noinspection PyProtectedMember
event_args = {'host': client._host, 'port': client._port}
groups_info = {
group.get('friendly_name', group.get('id')): group for group in msg
}
for name in groups_info.keys():
if name not in self._groups:
self.bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args))
groups_copy = [*self._groups.keys()]
for name in groups_copy:
if name not in groups_info:
self.bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args))
del self._groups[name]
self._groups = {group: {} for group in groups_info.keys()}
def on_mqtt_message(self):
def handler(client, _, msg):
topic = msg.topic[len(self.base_topic) + 1 :]
data = msg.payload.decode()
if not data:
return
with contextlib.suppress(ValueError, TypeError):
data = json.loads(data)
if topic == 'bridge/state':
self._process_state_message(client, data)
elif topic in ['bridge/log', 'bridge/logging']:
self._process_log_message(client, data)
elif topic == 'bridge/devices':
self._process_devices(client, data)
elif topic == 'bridge/groups':
self._process_groups(client, data)
else:
suffix = topic.split('/')[-1]
if suffix not in self._devices:
return
name = suffix
changed_props = {
k: v for k, v in data.items() if v != self._devices[name].get(k)
}
if changed_props:
self._process_property_update(name, data)
self.bus.post(
ZigbeeMqttDevicePropertySetEvent(
host=client._host,
port=client._port,
device=name,
properties=changed_props,
)
)
self._devices[name].update(data)
return handler
@property
def _plugin(self):
plugin = get_plugin('zigbee.mqtt')
assert plugin, 'The zigbee.mqtt plugin is not configured'
return plugin
def _process_property_update(self, device_name: str, properties: Mapping):
device_info = self._devices_info.get(device_name)
if not (device_info and properties):
return
self._plugin.publish_entities(
[
{
**device_info,
'state': properties,
}
]
)
def run(self): def run(self):
super().run() super().run()
warnings.warn(
'''
The zigbee.mqtt has been merged into the zigbee.mqtt plugin. It is
now deprecated and it will be removed in a future version. Remove
any references to it from your configuration.
''',
DeprecationWarning,
)
self.wait_stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,45 +0,0 @@
manifest:
events:
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent: when a device
is bannedfrom the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent: when a device bind
eventoccurs.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent: when a device
connectsto the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent: when a device
is pairing.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent: when the
properties of aconnected device change.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent: when a device
is removedfrom the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent: when a
request toremove a device from the network fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent: when a device
isrenamed on the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent: when a device
unbind eventoccurs.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent: when a device
iswhitelisted on the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent: when an internal error
occurson the zigbee2mqtt service.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent: when a group is
added.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent: when a request
toadd a new group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent: when all the
devicesare removed from a group.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent: when
a request toremove all the devices from a group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent: when a group
is removed.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent: when a
request toremove a group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent: when the service goes
offline.
platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent: when the service comes
online.
install:
pip:
- paho-mqtt
package: platypush.backend.zigbee.mqtt
type: backend

View file

@ -28,11 +28,12 @@ from platypush.entities.sensors import (
from platypush.entities.switches import Switch, EnumSwitch from platypush.entities.switches import Switch, EnumSwitch
from platypush.entities.temperature import TemperatureSensor from platypush.entities.temperature import TemperatureSensor
from platypush.message.response import Response from platypush.message.response import Response
from platypush.plugins import RunnablePlugin
from platypush.plugins.mqtt import MqttPlugin, action from platypush.plugins.mqtt import MqttPlugin, action
@manages(Battery, Device, Dimmer, Light, LinkQuality, Sensor, Switch) @manages(Battery, Device, Dimmer, Light, LinkQuality, Sensor, Switch)
class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init] class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-init]
""" """
This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
`zigbee2mqtt <https://www.zigbee2mqtt.io/>`_. `zigbee2mqtt <https://www.zigbee2mqtt.io/>`_.
@ -126,6 +127,42 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
* **paho-mqtt** (``pip install paho-mqtt``) * **paho-mqtt** (``pip install paho-mqtt``)
Triggers:
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a
connected device change.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects
to the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to
remove a device from the network fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is
whitelisted on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is
renamed on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to
add a new group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to
remove a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices
are removed from a group.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to
remove all the devices from a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs
on the zigbee2mqtt service.
""" """
def __init__( def __init__(
@ -1881,5 +1918,13 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init]
self.device_set(self._preferred_name(dev), values=data) self.device_set(self._preferred_name(dev), values=data)
def main(self):
from ._listener import ZigbeeMqttListener
listener = ZigbeeMqttListener()
listener.start()
self.wait_stop()
listener.join()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,269 @@
import contextlib
import json
from typing import Mapping
from platypush.backend.mqtt import MqttBackend
from platypush.bus import Bus
from platypush.context import get_bus, get_plugin
from platypush.message.event.zigbee.mqtt import (
ZigbeeMqttOnlineEvent,
ZigbeeMqttOfflineEvent,
ZigbeeMqttDevicePropertySetEvent,
ZigbeeMqttDevicePairingEvent,
ZigbeeMqttDeviceConnectedEvent,
ZigbeeMqttDeviceBannedEvent,
ZigbeeMqttDeviceRemovedEvent,
ZigbeeMqttDeviceRemovedFailedEvent,
ZigbeeMqttDeviceWhitelistedEvent,
ZigbeeMqttDeviceRenamedEvent,
ZigbeeMqttDeviceBindEvent,
ZigbeeMqttDeviceUnbindEvent,
ZigbeeMqttGroupAddedEvent,
ZigbeeMqttGroupAddedFailedEvent,
ZigbeeMqttGroupRemovedEvent,
ZigbeeMqttGroupRemovedFailedEvent,
ZigbeeMqttGroupRemoveAllEvent,
ZigbeeMqttGroupRemoveAllFailedEvent,
ZigbeeMqttErrorEvent,
)
from platypush.plugins.zigbee.mqtt import ZigbeeMqttPlugin
class ZigbeeMqttListener(MqttBackend):
"""
Listener for zigbee2mqtt events.
"""
def __init__(self):
plugin = self._plugin
self.base_topic = plugin.base_topic # type: ignore
self._devices = {}
self._devices_info = {}
self._groups = {}
self._last_state = None
self.server_info = {
'host': plugin.host, # type: ignore
'port': plugin.port or self._default_mqtt_port, # type: ignore
'tls_cafile': plugin.tls_cafile, # type: ignore
'tls_certfile': plugin.tls_certfile, # type: ignore
'tls_ciphers': plugin.tls_ciphers, # type: ignore
'tls_keyfile': plugin.tls_keyfile, # type: ignore
'tls_version': plugin.tls_version, # type: ignore
'username': plugin.username, # type: ignore
'password': plugin.password, # type: ignore
}
listeners = [
{
**self.server_info,
'topics': [
self.base_topic + '/' + topic
for topic in [
'bridge/state',
'bridge/log',
'bridge/logging',
'bridge/devices',
'bridge/groups',
]
],
}
]
super().__init__(
subscribe_default_topic=False, listeners=listeners, **self.server_info
)
assert self.client_id
self.client_id += '-zigbee-mqtt'
def _process_state_message(self, client, msg):
if msg == self._last_state:
return
if msg == 'online':
evt = ZigbeeMqttOnlineEvent
elif msg == 'offline':
evt = ZigbeeMqttOfflineEvent
self.logger.warning('zigbee2mqtt service is offline')
else:
return
self._bus.post(evt(host=client._host, port=client._port))
self._last_state = msg
def _process_log_message(self, client, msg):
msg_type = msg.get('type')
text = msg.get('message')
args = {'host': client._host, 'port': client._port}
if msg_type == 'devices':
devices = {}
for dev in text or []:
devices[dev['friendly_name']] = dev
client.subscribe(self.base_topic + '/' + dev['friendly_name'])
elif msg_type == 'pairing':
self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args))
elif msg_type in ['device_ban', 'device_banned']:
self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args))
elif msg_type in ['device_removed_failed', 'device_force_removed_failed']:
force = msg_type == 'device_force_removed_failed'
self._bus.post(
ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)
)
elif msg_type == 'device_whitelisted':
self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args))
elif msg_type == 'device_renamed':
self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args))
elif msg_type == 'device_bind':
self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args))
elif msg_type == 'device_unbind':
self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args))
elif msg_type == 'device_group_add':
self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args))
elif msg_type == 'device_group_add_failed':
self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args))
elif msg_type == 'device_group_remove':
self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args))
elif msg_type == 'device_group_remove_failed':
self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args))
elif msg_type == 'device_group_remove_all':
self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args))
elif msg_type == 'device_group_remove_all_failed':
self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args))
elif msg_type == 'zigbee_publish_error':
self.logger.error('zigbee2mqtt error: {}'.format(text))
self._bus.post(ZigbeeMqttErrorEvent(error=text, **args))
elif msg.get('level') in ['warning', 'error']:
log = getattr(self.logger, msg['level'])
log(
'zigbee2mqtt {}: {}'.format(
msg['level'], text or msg.get('error', msg.get('warning'))
)
)
def _process_devices(self, client, msg):
devices_info = {
device.get('friendly_name', device.get('ieee_address')): device
for device in msg
}
# noinspection PyProtectedMember
event_args = {'host': client._host, 'port': client._port}
client.subscribe(
*[self.base_topic + '/' + device for device in devices_info.keys()]
)
for name, device in devices_info.items():
if name not in self._devices:
self._bus.post(
ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)
)
exposes = (device.get('definition', {}) or {}).get('exposes', [])
payload = self._plugin._build_device_get_request(exposes) # type: ignore
if payload:
client.publish(
self.base_topic + '/' + name + '/get',
json.dumps(payload),
)
devices_copy = [*self._devices.keys()]
for name in devices_copy:
if name not in devices_info:
self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args))
del self._devices[name]
self._devices = {device: {} for device in devices_info.keys()}
self._devices_info = devices_info
def _process_groups(self, client, msg):
# noinspection PyProtectedMember
event_args = {'host': client._host, 'port': client._port}
groups_info = {
group.get('friendly_name', group.get('id')): group for group in msg
}
for name in groups_info.keys():
if name not in self._groups:
self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args))
groups_copy = [*self._groups.keys()]
for name in groups_copy:
if name not in groups_info:
self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args))
del self._groups[name]
self._groups = {group: {} for group in groups_info.keys()}
def on_mqtt_message(self):
def handler(client, _, msg):
topic = msg.topic[len(self.base_topic) + 1 :]
data = msg.payload.decode()
if not data:
return
with contextlib.suppress(ValueError, TypeError):
data = json.loads(data)
if topic == 'bridge/state':
self._process_state_message(client, data)
elif topic in ['bridge/log', 'bridge/logging']:
self._process_log_message(client, data)
elif topic == 'bridge/devices':
self._process_devices(client, data)
elif topic == 'bridge/groups':
self._process_groups(client, data)
else:
suffix = topic.split('/')[-1]
if suffix not in self._devices:
return
name = suffix
changed_props = {
k: v for k, v in data.items() if v != self._devices[name].get(k)
}
if changed_props:
self._process_property_update(name, data)
self._bus.post(
ZigbeeMqttDevicePropertySetEvent(
host=client._host,
port=client._port,
device=name,
properties=changed_props,
)
)
self._devices[name].update(data)
return handler
@property
def _plugin(self) -> ZigbeeMqttPlugin:
plugin = get_plugin('zigbee.mqtt')
assert plugin, 'The zigbee.mqtt plugin is not configured'
return plugin
@property
def _bus(self) -> Bus:
return get_bus()
def _process_property_update(self, device_name: str, properties: Mapping):
device_info = self._devices_info.get(device_name)
if not (device_info and properties):
return
self._plugin.publish_entities( # type: ignore
[
{
**device_info,
'state': properties,
}
]
)
def run(self):
super().run()
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,43 @@
manifest: manifest:
events: {} events:
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent: >
when a device is banned from the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent: >
when a device bind event occurs.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent: >
when a device connects to the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent: >
when a device is pairing.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent: >
when the properties of a connected device change.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent: >
when a device is removed from the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent: >
when a request to remove a device from the network fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent: >
when a device is renamed on the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent: >
when a device unbind event occurs.
platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent: >
when a device is whitelisted on the network.
platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent: >
when an internal error occurs on the zigbee2mqtt service.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent: >
when a group is added.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent: >
when a request to add a new group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent: >
when all the devices are removed from a group.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent: >
when a request to remove all the devices from a group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent: >
when a group is removed.
platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent: >
when a request to remove a group fails.
platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent: >
when the service goes offline.
platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent: >
when the service comes online.
install: install:
pip: pip:
- paho-mqtt - paho-mqtt