Added zigbee2mqtt integration (see #76)

This commit is contained in:
Fabio Manganiello 2020-01-22 18:34:28 +01:00
parent 976c4c4854
commit 8255f9af28
15 changed files with 1049 additions and 161 deletions

View file

@ -7,6 +7,7 @@ Backends
:caption: Backends:
platypush/backend/adafruit.io.rst
platypush/backend/assistant.rst
platypush/backend/assistant.google.rst
platypush/backend/assistant.snowboy.rst
platypush/backend/bluetooth.rst
@ -59,3 +60,4 @@ Backends
platypush/backend/weather.darksky.rst
platypush/backend/websocket.rst
platypush/backend/wiimote.rst
platypush/backend/zigbee.mqtt.rst

View file

@ -54,3 +54,4 @@ Events
platypush/events/web.widget.rst
platypush/events/wiimote.rst
platypush/events/zeroborg.rst
platypush/events/zigbee.mqtt.rst

View file

@ -0,0 +1,5 @@
``platypush.backend.assistant``
===============================
.. automodule:: platypush.backend.assistant
:members:

View file

@ -0,0 +1,5 @@
``platypush.backend.zigbee.mqtt``
=================================
.. automodule:: platypush.backend.zigbee.mqtt
:members:

View file

@ -0,0 +1,5 @@
``platypush.message.event.zigbee.mqtt``
=======================================
.. automodule:: platypush.message.event.zigbee.mqtt
:members:

View file

@ -0,0 +1,5 @@
``platypush.plugins.zigbee.mqtt``
=================================
.. automodule:: platypush.plugins.zigbee.mqtt
:members:

View file

@ -110,3 +110,4 @@ Plugins
platypush/plugins/weather.darksky.rst
platypush/plugins/websocket.rst
platypush/plugins/wiimote.rst
platypush/plugins/zigbee.mqtt.rst

View file

@ -1,6 +1,7 @@
import json
import os
import threading
from typing import Optional
from platypush.backend import Backend
from platypush.context import get_plugin
@ -27,53 +28,50 @@ class MqttBackend(Backend):
_default_mqtt_port = 1883
def __init__(self, host, port=_default_mqtt_port, topic='platypush_bus_mq',
tls_cafile=None, tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None,
password=None, listeners=None, *args, **kwargs):
def __init__(self, host: Optional[str] = None, port: int = _default_mqtt_port,
topic='platypush_bus_mq', subscribe_default_topic: bool = True,
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None, listeners=None, *args, **kwargs):
"""
:param host: MQTT broker host
:type host: str
:param port: MQTT broker port (default: 1883)
:type port: int
:param topic: Topic to read messages from (default: ``platypush_bus_mq/<device_id>``)
:type topic: str
:param subscribe_default_topic: Whether the backend should subscribe the default topic (default:
``platypush_bus_mq/<device_id>``) and execute the messages received there as action requests
(default: True).
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority
to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:type tls_cafile: str
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it
here (default: None)
:type tls_certfile: str
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required,
specify it here (default: None) :type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it
here (default: None)
:type tls_version: str
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is
required, specify it here (default: None)
:type tls_ciphers: str
:param username: Specify it if the MQTT server requires authentication (default: None)
:type username: str
:param password: Specify it if the MQTT server requires authentication (default: None)
:type password: str
:param listeners: If specified then the MQTT backend will also listen for
messages on the additional configured message queues. This parameter
is a list of maps where each item supports the same arguments passed
to the main backend configuration (host, port, topic, password etc.).
Note that the message queue configured on the main configuration
will expect valid Platypush messages that then can execute, while
message queues registered to the listeners will accept any message.
:type listeners: list[dict]
message queues registered to the listeners will accept any message. Example::
listeners:
- host: localhost
topics:
- topic1
- topic2
- topic3
- host: sensors
topics:
- topic4
- topic5
"""
super().__init__(*args, **kwargs)
@ -81,6 +79,7 @@ class MqttBackend(Backend):
self.host = host
self.port = port
self.topic = '{}/{}'.format(topic, self.device_id)
self.subscribe_default_topic = subscribe_default_topic
self.username = username
self.password = password
self._client = None
@ -99,38 +98,30 @@ class MqttBackend(Backend):
self.tls_ciphers = tls_ciphers
self.listeners_conf = listeners or []
def send_message(self, msg, **kwargs):
def send_message(self, msg, topic: Optional[str] = None, **kwargs):
try:
client = get_plugin('mqtt')
client.send_message(topic=self.topic, msg=msg, host=self.host,
client.send_message(topic=topic or self.topic, msg=msg, host=self.host,
port=self.port, username=self.username,
password=self.password, tls_cafile=self.tls_cafile,
tls_certfile=self.tls_certfile,
tls_keyfile=self.tls_keyfile,
tls_version=self.tls_version,
tls_ciphers=self.tls_ciphers)
tls_ciphers=self.tls_ciphers, **kwargs)
except Exception as e:
self.logger.exception(e)
def _initialize_listeners(self, listeners_conf):
import paho.mqtt.client as mqtt
# noinspection PyShadowingNames
def listener_thread(client, host, port):
client.connect(host, port, 60)
client.loop_forever()
# noinspection PyShadowingNames
def on_connect(topics):
# noinspection PyShadowingNames,PyUnusedLocal
@staticmethod
def on_connect(*topics):
# noinspection PyUnusedLocal
def handler(client, userdata, flags, rc):
for topic in topics:
client.subscribe(topic)
return handler
# noinspection PyShadowingNames,PyUnusedLocal
def on_message(client, userdata, msg):
def on_mqtt_message(self):
def handler(client, _, msg):
data = msg.payload
# noinspection PyBroadException
try:
@ -140,9 +131,19 @@ class MqttBackend(Backend):
pass
# noinspection PyProtectedMember
self.bus.post(MQTTMessageEvent(host=client._host, port=client._port,
topic=msg.topic, msg=data))
self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, topic=msg.topic, msg=data))
return handler
def _initialize_listeners(self, listeners_conf):
import paho.mqtt.client as mqtt
# noinspection PyShadowingNames
def listener_thread(client, host, port):
client.connect(host, port)
client.loop_forever()
# noinspection PyShadowingNames,PyUnusedLocal
for i, listener in enumerate(listeners_conf):
host = listener.get('host')
port = listener.get('port', self._default_mqtt_port)
@ -157,8 +158,8 @@ class MqttBackend(Backend):
continue
client = mqtt.Client()
client.on_connect = on_connect(topics)
client.on_message = on_message
client.on_connect = self.on_connect(*topics)
client.on_message = self.on_mqtt_message()
if username and password:
client.username_pw_set(username, password)
@ -173,13 +174,8 @@ class MqttBackend(Backend):
threading.Thread(target=listener_thread, kwargs={
'client': client, 'host': host, 'port': port}).start()
def run(self):
# noinspection PyUnusedLocal
def on_connect(client, userdata, flags, rc):
client.subscribe(self.topic)
# noinspection PyUnusedLocal
def on_message(client, userdata, msg):
def on_exec_message(self):
def handler(_, __, msg):
# noinspection PyShadowingNames
def response_thread(msg):
set_thread_name('MQTTProcessor')
@ -196,7 +192,8 @@ class MqttBackend(Backend):
msg = msg.payload.decode('utf-8')
# noinspection PyBroadException
try:
msg = Message.build(json.loads(msg))
msg = json.loads(msg)
msg = Message.build(msg)
except:
pass
@ -212,17 +209,22 @@ class MqttBackend(Backend):
return
if isinstance(msg, Request):
threading.Thread(target=response_thread,
name='MQTTProcessor',
args=(msg,)).start()
threading.Thread(target=response_thread, name='MQTTProcessor', args=(msg,)).start()
return handler
def run(self):
import paho.mqtt.client as mqtt
super().run()
self._client = mqtt.Client()
self._client.on_connect = on_connect
self._client.on_message = on_message
self._client = None
if self.host:
self._client = mqtt.Client()
if self.subscribe_default_topic:
self._client.on_connect = self.on_connect(self.topic)
self._client.on_message = self.on_exec_message()
if self.username and self.password:
self._client.username_pw_set(self.username, self.password)
@ -236,6 +238,7 @@ class MqttBackend(Backend):
format(self.host, self.port, self.topic))
self._initialize_listeners(self.listeners_conf)
if self._client:
self._client.loop_forever()
def stop(self):
@ -250,9 +253,9 @@ class MqttBackend(Backend):
listener.loop_stop()
except Exception as e:
# noinspection PyProtectedMember
self.logger.warning('Could not stop listener ' +
'{host}:{port}: {error}'.format(
self.logger.warning('Could not stop listener {host}:{port}: {error}'.format(
host=listener._host, port=listener._port,
error=str(e)))
# vim:sw=4:ts=4:et:

View file

View file

@ -0,0 +1,212 @@
import json
from typing import Optional
from platypush.backend.mqtt import MqttBackend
from platypush.context import get_plugin
from platypush.message.event.zigbee.mqtt import ZigbeeMqttOnlineEvent, ZigbeeMqttOfflineEvent, \
ZigbeeMqttDevicePropertySetEvent, ZigbeeMqttDevicePairingEvent, ZigbeeMqttDeviceConnectedEvent, \
ZigbeeMqttDeviceBannedEvent, ZigbeeMqttDeviceRemovedEvent, ZigbeeMqttDeviceRemovedFailedEvent, \
ZigbeeMqttDeviceWhitelistedEvent, ZigbeeMqttDeviceRenamedEvent, ZigbeeMqttDeviceBindEvent, \
ZigbeeMqttDeviceUnbindEvent, ZigbeeMqttGroupAddedEvent, ZigbeeMqttGroupAddedFailedEvent, \
ZigbeeMqttGroupRemovedEvent, ZigbeeMqttGroupRemovedFailedEvent, ZigbeeMqttGroupRemoveAllEvent, \
ZigbeeMqttGroupRemoveAllFailedEvent, ZigbeeMqttErrorEvent
class ZigbeeMqttBackend(MqttBackend):
"""
Listen for events on a zigbee2mqtt service.
Triggers:
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a
connected device change.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects
to the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed
from the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to
remove a device from the network fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is
whitelisted on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is
renamed on the network.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event
occurs.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to
add a new group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to
remove a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices
are removed from a group.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to
remove all the devices from a group fails.
* :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs
on the zigbee2mqtt service.
Requires:
* **paho-mqtt** (``pip install paho-mqtt``)
* The :class:`platypush.plugins.zigbee.mqtt.ZigbeeMqttPlugin` plugin configured.
"""
def __init__(self, host: Optional[str] = None, port: Optional[int] = None, base_topic='zigbee2mqtt',
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None, *args, **kwargs):
"""
:param host: MQTT broker host (default: host configured on the ``zigbee.mqtt`` plugin).
:param port: MQTT broker port (default: 1883).
:param base_topic: Prefix of the topics published by zigbee2mqtt (default: '``zigbee2mqtt``').
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority
to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it
here (default: None)
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required,
specify it here (default: None) :type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it
here (default: None)
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is
required, specify it here (default: None)
:param username: Specify it if the MQTT server requires authentication (default: None)
:param password: Specify it if the MQTT server requires authentication (default: None)
"""
if host:
self.base_topic = base_topic
listeners = [{
'host': host,
'port': port or self._default_mqtt_port,
'tls_cafile': tls_cafile,
'tls_certfile': tls_certfile,
'tls_ciphers': tls_ciphers,
'tls_keyfile': tls_keyfile,
'tls_version': tls_version,
'username': username,
'password': password,
'topics': [
base_topic + '/' + topic
for topic in ['bridge/state', 'bridge/log']
],
}]
else:
plugin = get_plugin('zigbee.mqtt')
self.base_topic = plugin.base_topic
listeners = [{
'host': plugin.host,
'port': plugin.port or self._default_mqtt_port,
'tls_cafile': plugin.tls_cafile,
'tls_certfile': plugin.tls_certfile,
'tls_ciphers': plugin.tls_ciphers,
'username': plugin.username,
'password': plugin.password,
'topics': [
plugin.base_topic + '/' + topic
for topic in ['bridge/state', 'bridge/log']
],
}]
super().__init__(subscribe_default_topic=False, listeners=listeners, *args, **kwargs)
self._devices = {}
def _process_state_message(self, client, msg):
if msg == 'online':
evt = ZigbeeMqttOnlineEvent
self._refresh_devices(client)
elif msg == 'offline':
evt = ZigbeeMqttOfflineEvent
self.logger.warning('zigbee2mqtt service is offline')
else:
return
# noinspection PyProtectedMember
self.bus.post(evt(host=client._host, port=client._port))
def _refresh_devices(self, client):
client.publish(self.base_topic + '/' + 'bridge/config/devices/get')
def _process_log_message(self, client, msg):
msg_type = msg.get('type')
msg = msg.get('message')
# noinspection PyProtectedMember
args = {'host': client._host, 'port': client._port}
if msg_type == 'devices':
devices = {}
for dev in (msg or []):
devices[dev['friendly_name']] = dev
client.subscribe(self.base_topic + '/' + dev['friendly_name'])
self._devices = devices
elif msg_type == 'pairing':
self.bus.post(ZigbeeMqttDevicePairingEvent(device=msg, **args))
elif msg_type == 'device_connected':
self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=msg, **args))
self._refresh_devices(client)
elif msg_type in ['device_ban', 'device_banned']:
self.bus.post(ZigbeeMqttDeviceBannedEvent(device=msg, **args))
elif msg_type in ['device_removed', 'device_force_removed']:
force = msg_type == 'device_force_removed'
self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=msg, force=force, **args))
elif msg_type in ['device_removed_failed', 'device_force_removed_failed']:
force = msg_type == 'device_force_removed_failed'
self.bus.post(ZigbeeMqttDeviceRemovedFailedEvent(device=msg, force=force, **args))
elif msg_type == 'device_whitelisted':
self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=msg, **args))
elif msg_type == 'device_renamed':
self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=msg, **args))
self._refresh_devices(client)
elif msg_type == 'device_bind':
self.bus.post(ZigbeeMqttDeviceBindEvent(device=msg, **args))
elif msg_type == 'device_unbind':
self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=msg, **args))
elif msg_type == 'device_group_add':
self.bus.post(ZigbeeMqttGroupAddedEvent(group=msg, **args))
elif msg_type == 'device_group_add_failed':
self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=msg, **args))
elif msg_type == 'device_group_remove':
self.bus.post(ZigbeeMqttGroupRemovedEvent(group=msg, **args))
elif msg_type == 'device_group_remove_failed':
self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=msg, **args))
elif msg_type == 'device_group_remove_all':
self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=msg, **args))
elif msg_type == 'device_group_remove_all_failed':
self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=msg, **args))
elif msg_type == 'zigbee_publish_error':
self.logger.warning('zigbee2mqtt internal error: {}'.format(msg))
self.bus.post(ZigbeeMqttErrorEvent(error=msg, **args))
def on_mqtt_message(self):
def handler(client, _, msg):
topic = msg.topic[len(self.base_topic)+1:]
data = msg.payload.decode()
# noinspection PyBroadException
try:
data = json.loads(data)
except:
pass
if topic == 'bridge/state':
self._process_state_message(client, data)
elif topic == 'bridge/log':
self._process_log_message(client, data)
else:
# noinspection PyProtectedMember
self.bus.post(ZigbeeMqttDevicePropertySetEvent(host=client._host, port=client._port,
device=topic, properties=data))
return handler
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,162 @@
from typing import Optional, Dict, Any
from platypush.message.event import Event
class ZigbeeMqttEvent(Event):
pass
class ZigbeeMqttOnlineEvent(ZigbeeMqttEvent):
"""
Triggered when a zigbee2mqtt service goes online.
"""
def __init__(self, host: str, port: int, *args, **kwargs):
super().__init__(*args, host=host, port=port, **kwargs)
class ZigbeeMqttOfflineEvent(ZigbeeMqttEvent):
"""
Triggered when a zigbee2mqtt service goes offline.
"""
def __init__(self, host: str, port: int, *args, **kwargs):
super().__init__(*args, host=host, port=port, **kwargs)
class ZigbeeMqttDevicePropertySetEvent(ZigbeeMqttEvent):
"""
Triggered when a the properties of a Zigbee connected devices (state, brightness, alert etc.) change.
"""
def __init__(self, host: str, port: int, device: str, properties: Dict[str, Any], *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, properties=properties, **kwargs)
class ZigbeeMqttDevicePairingEvent(ZigbeeMqttEvent):
"""
Triggered when a device is pairing to the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceConnectedEvent(ZigbeeMqttEvent):
"""
Triggered when a device connects to the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceBannedEvent(ZigbeeMqttEvent):
"""
Triggered when a device is banned from the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceRemovedEvent(ZigbeeMqttEvent):
"""
Triggered when a device is removed from the network.
"""
def __init__(self, host: str, port: int, device=None, force=False, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, force=force, **kwargs)
class ZigbeeMqttDeviceRemovedFailedEvent(ZigbeeMqttEvent):
"""
Triggered when the removal of a device from the network failed.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceWhitelistedEvent(ZigbeeMqttEvent):
"""
Triggered when a device is whitelisted on the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceRenamedEvent(ZigbeeMqttEvent):
"""
Triggered when a device is renamed on the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceBindEvent(ZigbeeMqttEvent):
"""
Triggered when a device bind occurs on the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttDeviceUnbindEvent(ZigbeeMqttEvent):
"""
Triggered when a device bind occurs on the network.
"""
def __init__(self, host: str, port: int, device=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupAddedEvent(ZigbeeMqttEvent):
"""
Triggered when a group is added.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupAddedFailedEvent(ZigbeeMqttEvent):
"""
Triggered when a request to add a group fails.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupRemovedEvent(ZigbeeMqttEvent):
"""
Triggered when a group is removed.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupRemovedFailedEvent(ZigbeeMqttEvent):
"""
Triggered when a request to remove a group fails.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupRemoveAllEvent(ZigbeeMqttEvent):
"""
Triggered when all the devices are removed from a group.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttGroupRemoveAllFailedEvent(ZigbeeMqttEvent):
"""
Triggered when a request to remove all the devices from a group fails.
"""
def __init__(self, host: str, port: int, group=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, device=device, **kwargs)
class ZigbeeMqttErrorEvent(ZigbeeMqttEvent):
"""
Triggered when an error happens on the zigbee2mqtt service.
"""
def __init__(self, host: str, port: int, error=None, *args, **kwargs):
super().__init__(*args, host=host, port=port, error=error, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,9 @@
import io
import json
import os
import threading
from typing import Any, Optional, IO
from platypush.message import Message
from platypush.plugins import Plugin, action
@ -9,12 +13,17 @@ class MqttPlugin(Plugin):
"""
This plugin allows you to send custom message to a message queue compatible
with the MQTT protocol, see http://mqtt.org/
Requires:
* **paho-mqtt** (``pip install paho-mqtt``)
"""
def __init__(self, host=None, port=1883, tls_cafile=None,
tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None,
password=None, *args, **kwargs):
password=None, **kwargs):
"""
:param host: If set, MQTT messages will by default routed to this host unless overridden in `send_message` (default: None)
:type host: str
@ -44,7 +53,7 @@ class MqttPlugin(Plugin):
:type password: str
"""
super().__init__(*args, **kwargs)
super().__init__(**kwargs)
self.host = host
self.port = port
@ -62,102 +71,117 @@ class MqttPlugin(Plugin):
self.tls_version = tls_version
self.tls_ciphers = tls_ciphers
@action
def send_message(self, topic, msg, host=None, port=1883, tls_cafile=None,
tls_certfile=None, tls_keyfile=None,
tls_version=None, tls_ciphers=None, username=None,
password=None, *args, **kwargs):
def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: int = 1883,
reply_topic: Optional[str] = None, timeout: int = 60,
tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None,
tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None,
tls_ciphers: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None):
"""
Sends a message to a topic/channel.
Sends a message to a topic.
:param topic: Topic/channel where the message will be delivered
:type topic: str
:param msg: Message to be sent. It can be a list, a dict, or a Message object
:param host: MQTT broker hostname/IP
:type host: str
:param port: MQTT broker port (default: 1883)
:type port: int
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None)
:type tls_cafile: str
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it here (default: None)
:type tls_certfile: str
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify it here (default: None)
:type tls_keyfile: str
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it here (default: None)
:type tls_version: str
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is required, specify it here (default: None)
:type tls_ciphers: str
:param username: Specify it if the MQTT server requires authentication (default: None)
:type username: str
:param password: Specify it if the MQTT server requires authentication (default: None)
:type password: str
:param msg: Message to be sent. It can be a list, a dict, or a Message object.
:param host: MQTT broker hostname/IP.
:param port: MQTT broker port (default: 1883).
:param reply_topic: If a ``reply_topic`` is specified, then the action will wait for a response on this topic.
:param timeout: If ``reply_topic`` is set, use this parameter to specify the maximum amount of time to
wait for a response (default: 60 seconds).
:param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority
to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None).
:param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it
here (default: None).
:param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, specify
it here (default: None).
:param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it
here (default: None).
:param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is
required, specify it here (default: None).
:param username: Specify it if the MQTT server requires authentication (default: None).
:param password: Specify it if the MQTT server requires authentication (default: None).
"""
import paho.mqtt.publish as publisher
from paho.mqtt.client import Client
if not host and not self.host:
raise RuntimeError('No host specified and no default host configured')
publisher_args = {
'hostname': host or self.host,
'port': port or self.port,
}
if not host:
tls_cafile = self.tls_cafile
tls_certfile = self.tls_certfile
tls_keyfile = self.tls_keyfile
tls_version = self.tls_version
tls_ciphers = self.tls_ciphers
username = self.username
password = self.password
client = Client()
if host:
if username and password:
publisher_args['auth'] = {
'username': username,
'password': password,
}
else:
if self.username and self.password:
publisher_args['auth'] = {
'username': username,
'password': password,
}
if host:
client.username_pw_set(username, password)
if tls_cafile:
publisher_args['tls'] = { 'ca_certs': tls_cafile }
if tls_certfile:
publisher_args['tls']['certfile'] = tls_certfile
if tls_keyfile:
publisher_args['tls']['keyfile'] = tls_keyfile
if tls_version:
publisher_args['tls']['tls_version'] = tls_version
if tls_ciphers:
publisher_args['tls']['ciphers'] = tls_ciphers
else:
if self.tls_cafile:
publisher_args['tls'] = { 'ca_certs': self.tls_cafile }
if self.tls_certfile:
publisher_args['tls']['certfile'] = self.tls_certfile
if self.tls_keyfile:
publisher_args['tls']['keyfile'] = self.tls_keyfile
if self.tls_version:
publisher_args['tls']['tls_version'] = self.tls_version
if self.tls_ciphers:
publisher_args['tls']['ciphers'] = self.tls_ciphers
client.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, tls_version=tls_version,
ciphers=tls_ciphers)
try: msg = json.dumps(msg)
except: pass
# Try to parse it as a platypush message or dump it to JSON from a dict/list
if isinstance(msg, dict) or isinstance(msg, list):
msg = json.dumps(msg)
try: msg = Message.build(json.loads(msg))
except: pass
# noinspection PyBroadException
try:
msg = Message.build(json.loads(msg))
except:
pass
publisher.single(topic, str(msg), **publisher_args)
client.connect(host, port, keepalive=timeout)
response_buffer = io.BytesIO()
try:
response_received = threading.Event()
if reply_topic:
client.on_message = self._response_callback(reply_topic=reply_topic,
event=response_received,
buffer=response_buffer)
client.subscribe(reply_topic)
client.publish(topic, str(msg))
if not reply_topic:
return
client.loop_start()
ok = response_received.wait(timeout=timeout)
if not ok:
raise TimeoutError('Response timed out')
return response_buffer.getvalue()
finally:
response_buffer.close()
# noinspection PyBroadException
try:
client.loop_stop()
except:
pass
client.disconnect()
@staticmethod
def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]):
def on_message(client, _, msg):
if msg.topic != reply_topic:
return
buffer.write(msg.payload)
client.loop_stop()
event.set()
return on_message
@action
def send_message(self, *args, **kwargs):
"""
Alias for :meth:`platypush.plugins.mqtt.MqttPlugin.publish`.
"""
return self.publish(*args, **kwargs)
# vim:sw=4:ts=4:et:

View file

View file

@ -0,0 +1,463 @@
import threading
from typing import Optional, List, Any, Dict
from platypush.plugins.mqtt import MqttPlugin, action
class ZigbeeMqttPlugin(MqttPlugin):
"""
This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
`zigbee2mqtt <https://www.zigbee2mqtt.io/>`_.
In order to get started you'll need:
- A Zigbee USB adapter/sniffer (in this example I'll use the `CC2531 <https://hackaday.io/project/163487-zigbee-cc2531-smart-home-usb-adapter>`_.
- A Zigbee debugger/emulator + downloader cable (only to flash the firmware).
Instructions:
- Install `cc-tool <https://github.com/dashesy/cc-tool>`_ either from sources or from a package manager.
- Connect the Zigbee to your PC/RaspberryPi in this way: USB -> CC debugger -> downloader cable -> CC2531 -> USB.
The debugger and the adapter should be connected _at the same time_. If the later ``cc-tool`` command throws up
an error, put the device in sync while connected by pressing the _Reset_ button on the debugger.
- Check where the device is mapped. On Linux it will usually be ``/dev/ttyACM0``.
- Download the latest `Z-Stack firmware <https://github.com/Koenkk/Z-Stack-firmware/tree/master/coordinator>`_ to your device.
Instructions for a CC2531 device:
.. code-block:: shell
wget https://github.com/Koenkk/Z-Stack-firmware/raw/master/coordinator/Z-Stack_Home_1.2/bin/default/CC2531_DEFAULT_20190608.zip
unzip CC2531_DEFAULT_20190608.zip
[sudo] cc-tool -e -w CC2531ZNP-Prod.hex
- You can disconnect your debugger and downloader cable once the firmware is flashed.
- Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or through your
package manager. Manual instructions:
.. code-block:: shell
# Clone zigbee2mqtt repository
[sudo] git clone https://github.com/Koenkk/zigbee2mqtt.git /opt/zigbee2mqtt
[sudo] chown -R pi:pi /opt/zigbee2mqtt # Or whichever is your user
# Install dependencies (as user "pi")
cd /opt/zigbee2mqtt
npm install
- You need to have an MQTT broker running somewhere. If not, you can install
`Mosquitto <https://mosquitto.org/>`_ through your package manager on any device in your network.
- Edit the ``/opt/zigbee2mqtt/data/configuration.yaml`` file to match the configuration of your MQTT broker:
.. code-block:: yaml
# MQTT settings
mqtt:
# MQTT base topic for zigbee2mqtt MQTT messages
base_topic: zigbee2mqtt
# MQTT server URL
server: 'mqtt://localhost'
# MQTT server authentication, uncomment if required:
# user: my_user
# password: my_password
- Also make sure that ``permit_join`` is set to ``True``, in order to allow Zigbee devices to join the network
while you're configuring it. It's equally important to set ``permit_join`` to ``False`` once you have
configured your network, to prevent accidental/malignant joins from outer Zigbee devices.
- Start the ``zigbee2mqtt`` daemon on your device (the
`official documentation <https://www.zigbee2mqtt.io/getting_started/running_zigbee2mqtt.html#5-optional-running-as-a-daemon-with-systemctl>`_
also contains instructions on how to configure it as a ``systemd`` service:
.. code-block:: shell
cd /opt/zigbee2mqtt
npm start
- If you have Zigbee devices that are paired to other bridges, unlink them or do a factory reset to pair them
to your new bridge.
- If it all goes fine, once the daemon is running and a new device is found you should see traces like this in
the output of ``zigbee2mqtt``::
zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', device has successfully been paired
- You are now ready to use this integration.
Requires:
* **paho-mqtt** (``pip install paho-mqtt``)
"""
def __init__(self, host: str, port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 60,
tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None,
tls_version: Optional[str] = None, tls_ciphers: Optional[str] = None,
username: Optional[str] = None, password: Optional[str] = None, **kwargs):
"""
:param host: Default MQTT broker where ``zigbee2mqtt`` publishes its messages.
:param port: Broker listen port (default: 1883).
:param base_topic: Topic prefix, as specified in ``/opt/zigbee2mqtt/data/configuration.yaml``
(default: '``base_topic``').
:param timeout: If the command expects from a response, then this timeout value will be used
(default: 60 seconds).
:param tls_cafile: If the connection requires TLS/SSL, specify the certificate authority file
(default: None)
:param tls_certfile: If the connection requires TLS/SSL, specify the certificate file (default: None)
:param tls_keyfile: If the connection requires TLS/SSL, specify the key file (default: None)
:param tls_version: If the connection requires TLS/SSL, specify the minimum TLS supported version
(default: None)
:param tls_ciphers: If the connection requires TLS/SSL, specify the supported ciphers (default: None)
:param username: If the connection requires user authentication, specify the username (default: None)
:param password: If the connection requires user authentication, specify the password (default: None)
"""
super().__init__(host=host, port=port, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile,
tls_version=tls_version, tls_ciphers=tls_ciphers, username=username,
password=password, **kwargs)
self.base_topic = base_topic
self.timeout = timeout
def _mqtt_args(self, host: Optional[str] = None, **kwargs):
if not host:
return {
'host': self.host,
'port': self.port,
'timeout': self.timeout,
'tls_certfile': self.tls_certfile,
'tls_keyfile': self.tls_keyfile,
'tls_version': self.tls_version,
'tls_ciphers': self.tls_ciphers,
'username': self.username,
'password': self.password,
}
return kwargs
def _topic(self, topic):
return self.base_topic + '/' + topic
@action
def devices(self, **kwargs) -> List[Dict[str, Any]]:
"""
Get the list of devices registered to the service.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
:return: List of paired devices. Example output:
.. code-block:: json
[
{
"dateCode": "20190608",
"friendly_name": "Coordinator",
"ieeeAddr": "0x00123456789abcde",
"lastSeen": 1579640601215,
"networkAddress": 0,
"softwareBuildID": "zStack12",
"type": "Coordinator"
},
{
"dateCode": "20160906",
"friendly_name": "My Lightbulb",
"hardwareVersion": 1,
"ieeeAddr": "0x00123456789abcdf",
"lastSeen": 1579595191623,
"manufacturerID": 4107,
"manufacturerName": "Philips",
"model": "8718696598283",
"modelID": "LTW013",
"networkAddress": 52715,
"powerSource": "Mains (single phase)",
"softwareBuildID": "1.15.2_r19181",
"type": "Router"
}
]
"""
return self.publish(
topic=self._topic('bridge/config/devices/get'), msg='',
reply_topic=self._topic('bridge/config/devices'),
**self._mqtt_args(**kwargs))
def _permit_join_timeout_callback(self, permit: bool, **kwargs):
def callback():
self.logger.info('Restoring permit_join state to {}'.format(permit))
self.permit_join(permit, **kwargs)
return callback
@action
def permit_join(self, permit: bool = True, timeout: Optional[float] = None, **kwargs):
"""
Enable/disable devices from joining the network. This is not persistent (will not be saved to
``configuration.yaml``).
:param permit: Set to True to allow joins, False otherwise.
:param timeout: Allow/disallow joins only for this amount of time.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/permit_join'), msg=permit, **self._mqtt_args(**kwargs))
if timeout:
threading.Timer(timeout, self._permit_join_timeout_callback(not permit, **kwargs)).start()
@action
def reset(self, **kwargs):
"""
Reset the adapter.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/reset'), msg='', **self._mqtt_args(**kwargs))
@action
def factory_reset(self, **kwargs):
"""
Perform a factory reset of the device. Of course, you should only do it if you know what you're doing,
as you will lose all the paired devices and may also lose the Z-Stack firmware.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/factory_reset'), msg='', **self._mqtt_args(**kwargs))
@action
def log_level(self, level: str, **kwargs):
"""
Change the log level at runtime. This change will not be persistent.
:param level: Possible values: 'debug', 'info', 'warn', 'error'.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/log_level'), msg=level, **self._mqtt_args(**kwargs))
@action
def device_set_option(self, device: str, option: str, value: Any, **kwargs):
"""
Change the options of a device. Options can only be changed, not added or deleted.
:param device: Display name of the device.
:param option: Option name.
:param value: New value.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/device_options'), msg={
'friendly_name': device,
'options': {
option: value,
}
}, **self._mqtt_args(**kwargs))
@action
def device_remove(self, device: str, force: bool = False, **kwargs):
"""
Remove a device from the network.
:param device: Display name of the device.
:param force: Force the remove also if the removal wasn't acknowledged by the device. Note: a forced remove
only removes the entry from the internal database, but the device is likely to connect again when
restarted unless it's factory reset (default: False).
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
topic = self._topic('bridge/config/{}remove'.format('force_' if force else ''))
self.publish(topic=topic, msg=device, **self._mqtt_args(**kwargs))
@action
def device_ban(self, device: str, **kwargs):
"""
Ban a device from the network.
:param device: Display name of the device.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/ban'), msg=device, **self._mqtt_args(**kwargs))
@action
def device_whitelist(self, device: str, **kwargs):
"""
Whitelist a device on the network. Note: once at least a device is whitelisted, all the other non-whitelisted
devices will be removed from the network.
:param device: Display name of the device.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/whitelist'), msg=device, **self._mqtt_args(**kwargs))
@action
def device_rename(self, name: str, device: Optional[str] = None, **kwargs):
"""
Rename a device on the network.
:param name: New name.
:param device: Current name of the device to rename. If no name is specified then the rename will
affect the last device that joined the network.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(
topic=self._topic('bridge/config/rename{}'.format('_last' if not device else '')),
msg={'old': device, 'new': name} if device else name,
**self._mqtt_args(**kwargs))
# noinspection PyShadowingBuiltins
@action
def device_get(self, device: str, property: Optional[str] = None, **kwargs) -> Dict[str, Any]:
"""
Get the properties of a device. The returned keys vary depending on the device. For example, a light bulb
may have the "``state``" and "``brightness``" properties, while an environment sensor may have the
"``temperature``" and "``humidity``" properties, and so on.
:param device: Display name of the device.
:param property: Name of the property that should be retrieved (default: all).
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
:return: Key->value map of the device properties.
"""
properties = self.publish(topic=self._topic(device + '/get'),
reply_topic=self._topic(device), msg='', **self._mqtt_args(**kwargs)).output
if property:
assert property in properties, 'No such property: ' + property
return {property: properties[property]}
return properties
# noinspection PyShadowingBuiltins
@action
def device_set(self, device: str, property: str, value: Any, **kwargs):
"""
Set a properties on a device. The compatible properties vary depending on the device. For example, a light bulb
may have the "``state``" and "``brightness``" properties, while an environment sensor may have the
"``temperature``" and "``humidity``" properties, and so on.
:param device: Display name of the device.
:param property: Name of the property that should be set.
:param value: New value of the property.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
properties = self.publish(topic=self._topic(device + '/set'),
reply_topic=self._topic(device),
msg={property: value}, **self._mqtt_args(**kwargs)).output
if property:
assert property in properties, 'No such property: ' + property
return {property: properties[property]}
return properties
@action
def device_groups(self, device: str, **kwargs) -> List[int]:
"""
List the groups a given device belongs to.
:param device: Display name of the device.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
:return: List of group IDs the device is linked to.
"""
return self.publish(topic=self._topic('bridge/device/{}/get_group_membership'.format(device)),
reply_topic=self._topic(device), msg=device, **self._mqtt_args(**kwargs)).\
output.get('group_list', [])
@action
def group_add(self, name: str, id: Optional[int] = None, **kwargs):
"""
Add a new group.
:param name: Display name of the group.
:param id: Optional numeric ID (default: auto-generated).
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
args = {'friendly_name': name}
if id is not None:
args['id'] = id
self.publish(topic=self._topic('bridge/config/add_group'), msg=args, **self._mqtt_args(**kwargs))
@action
def group_remove(self, name: str, **kwargs):
"""
Remove a group.
:param name: Display name of the group.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/config/remove_group'), msg=name,
**self._mqtt_args(**kwargs))
@action
def group_add_device(self, group: str, device: str, **kwargs):
"""
Add a device to a group.
:param group: Display name of the group.
:param device: Display name of the device to be added.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/group/{}/add'.format(group)),
msg=device, **self._mqtt_args(**kwargs))
@action
def group_remove_device(self, group: str, device: Optional[str] = None, **kwargs):
"""
Remove a device from a group.
:param group: Display name of the group.
:param device: Display name of the device to be removed. If none is specified then all the devices registered
to the specified group will be removed.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/group/{}/remove{}'.format(group, '_all' if not device else '')),
msg=device, **self._mqtt_args(**kwargs))
@action
def bind_devices(self, source: str, target: str, endpoint: Optional[str] = None, **kwargs):
"""
Bind two devices. Binding makes it possible that devices can directly control each other without the
intervention of zigbee2mqtt or any home automation software. You may want to use this feature to bind
for example an IKEA/Philips Hue dimmer switch to a light bulb, or a Zigbee remote to a thermostat.
Read more on the `zigbee2mqtt binding page <https://www.zigbee2mqtt.io/information/binding.html>`_.
:param source: Name of the source device. It can also be a group name, although the support is
`still experimental <https://www.zigbee2mqtt.io/information/binding.html#binding-a-group>`_.
:param target: Name of the target device.
:param endpoint: The target may support multiple endpoints (e.g. 'left', 'down', 'up' etc.). If so,
you can bind the source to a specific endpoint on the target device.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/bind/' + source + ('/' + endpoint if endpoint else '')),
msg=target, **self._mqtt_args(**kwargs))
@action
def unbind_devices(self, source: str, target: str, **kwargs):
"""
Un-bind two devices.
:param source: Name of the source device.
:param target: Name of the target device.
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
self.publish(topic=self._topic('bridge/unbind/' + source),
msg=target, **self._mqtt_args(**kwargs))
# vim:sw=4:ts=4:et: