diff --git a/platypush/backend/mqtt/__init__.py b/platypush/backend/mqtt/__init__.py index 31d083798..2dbc89b67 100644 --- a/platypush/backend/mqtt/__init__.py +++ b/platypush/backend/mqtt/__init__.py @@ -17,11 +17,25 @@ from platypush.utils import set_thread_name class MqttClient(mqtt.Client, threading.Thread): - def __init__(self, *args, host: str, port: int, topics: Optional[List[str]] = None, - on_message: Optional[Callable] = None, username: Optional[str] = None, password: Optional[str] = None, - client_id: Optional[str] = None, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, tls_version: Optional = None, tls_ciphers: Optional = None, - tls_insecure: bool = False, keepalive: Optional[int] = 60, **kwargs): + def __init__( + self, + *args, + host: str, + port: int, + topics: Optional[List[str]] = None, + on_message: Optional[Callable] = None, + username: Optional[str] = None, + password: Optional[str] = None, + client_id: Optional[str] = None, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version=None, + tls_ciphers=None, + tls_insecure: bool = False, + keepalive: Optional[int] = 60, + **kwargs, + ): mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs) threading.Thread.__init__(self) @@ -38,8 +52,13 @@ class MqttClient(mqtt.Client, threading.Thread): self.username_pw_set(username, password) if tls_cafile: - self.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, tls_version=tls_version, - ciphers=tls_ciphers) + self.tls_set( + ca_certs=tls_cafile, + certfile=tls_certfile, + keyfile=tls_keyfile, + tls_version=tls_version, + ciphers=tls_ciphers, + ) self.tls_insecure_set(tls_insecure) @@ -106,14 +125,25 @@ class MqttBackend(Backend): _default_mqtt_port = 1883 - def __init__(self, host: Optional[str] = None, port: int = _default_mqtt_port, - topic='platypush_bus_mq', subscribe_default_topic: bool = True, - tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, tls_insecure: bool = False, - username: Optional[str] = None, password: Optional[str] = None, - client_id: Optional[str] = None, listeners=None, - *args, **kwargs): + def __init__( + self, + host: Optional[str] = None, + port: int = _default_mqtt_port, + topic='platypush_bus_mq', + subscribe_default_topic: bool = True, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version: Optional[str] = None, + tls_ciphers: Optional[str] = None, + tls_insecure: bool = False, + username: Optional[str] = None, + password: Optional[str] = None, + client_id: Optional[str] = None, + listeners=None, + *args, + **kwargs, + ): """ :param host: MQTT broker host. If no host configuration is specified then the backend will use the host configuration specified on the ``mqtt`` @@ -172,10 +202,12 @@ class MqttBackend(Backend): self.tls_insecure = tls_insecure self.username = username self.password = password - self.client_id = client_id or Config.get('device_id') + self.client_id: str = client_id or Config.get('device_id') # type: ignore else: client = get_plugin('mqtt') - assert client.host, 'No host specified on backend.mqtt nor mqtt configuration' + assert ( + client.host + ), 'No host specified on backend.mqtt nor mqtt configuration' self.host = client.host self.port = client.port @@ -187,7 +219,7 @@ class MqttBackend(Backend): self.tls_insecure = client.tls_insecure self.username = client.username self.password = client.password - self.client_id = client_id or client.client_id + self.client_id: str = client_id or client.client_id # type: ignore self.topic = '{}/{}'.format(topic, self.device_id) self.subscribe_default_topic = subscribe_default_topic @@ -197,12 +229,21 @@ class MqttBackend(Backend): def send_message(self, msg, topic: Optional[str] = None, **kwargs): try: client = get_plugin('mqtt') - client.send_message(topic=topic or self.topic, msg=msg, host=self.host, - port=self.port, username=self.username, - password=self.password, tls_cafile=self.tls_cafile, - tls_certfile=self.tls_certfile, tls_keyfile=self.tls_keyfile, - tls_version=self.tls_version, tls_insecure=self.tls_insecure, - tls_ciphers=self.tls_ciphers, **kwargs) + client.send_message( + topic=topic or self.topic, + msg=msg, + host=self.host, + port=self.port, + username=self.username, + password=self.password, + tls_cafile=self.tls_cafile, + tls_certfile=self.tls_certfile, + tls_keyfile=self.tls_keyfile, + tls_version=self.tls_version, + tls_insecure=self.tls_insecure, + tls_ciphers=self.tls_ciphers, + **kwargs, + ) except Exception as e: self.logger.exception(e) @@ -238,45 +279,92 @@ class MqttBackend(Backend): topics = listener.get('topics') if not topics: - self.logger.warning('No list of topics specified for listener n.{}'.format(i+1)) + self.logger.warning( + 'No list of topics specified for listener n.{}'.format(i + 1) + ) continue - client = self._get_client(host=host, port=port, topics=topics, username=username, password=password, - client_id=self.client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, - tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, - tls_insecure=tls_insecure) + client = self._get_client( + host=host, + port=port, + topics=topics, + username=username, + password=password, + client_id=self.client_id, + tls_cafile=tls_cafile, + tls_certfile=tls_certfile, + tls_keyfile=tls_keyfile, + tls_version=tls_version, + tls_ciphers=tls_ciphers, + tls_insecure=tls_insecure, + ) if not client.is_alive(): client.start() def _get_client_id( - self, host: str, port: int, topics: Optional[List[str]] = None, - client_id: Optional[str] = None, on_message: Optional[bool] = None, + self, + host: str, + port: int, + topics: Optional[List[str]] = None, + client_id: Optional[str] = None, + on_message: Optional[bool] = None, ) -> str: return '{client_id}-{client_hash}'.format( client_id=client_id or self.client_id, - client_hash=hashlib.sha1('|'.join([ - host, str(port), - json.dumps(sorted(topics or [])), - str(id(on_message)) - ]).encode()).hexdigest(), + client_hash=hashlib.sha1( + '|'.join( + [ + host, + str(port), + json.dumps(sorted(topics or [])), + str(id(on_message)), + ] + ).encode() + ).hexdigest(), ) - def _get_client(self, host: str, port: int, topics: Optional[List[str]] = None, username: Optional[str] = None, - password: Optional[str] = None, client_id: Optional[str] = None, tls_cafile: Optional[str] = None, - tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional = None, - tls_ciphers: Optional = None, tls_insecure: bool = False, on_message: Optional[Callable] = None) \ - -> MqttClient: + def _get_client( + self, + host: str, + port: int, + topics: Optional[List[str]] = None, + username: Optional[str] = None, + password: Optional[str] = None, + client_id: Optional[str] = None, + tls_cafile: Optional[str] = None, + tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, + tls_version=None, + tls_ciphers=None, + tls_insecure: bool = False, + on_message: Optional[Callable] = None, + ) -> MqttClient: on_message = on_message or self.on_mqtt_message() - client_id = self._get_client_id(host=host, port=port, topics=topics, client_id=client_id, on_message=on_message) + client_id = self._get_client_id( + host=host, + port=port, + topics=topics, + client_id=client_id, + on_message=on_message, + ) client = self._listeners.get(client_id) if not (client and client.is_alive()): client = self._listeners[client_id] = MqttClient( - host=host, port=port, topics=topics, username=username, password=password, - client_id=client_id, tls_cafile=tls_cafile, tls_certfile=tls_certfile, - tls_keyfile=tls_keyfile, tls_version=tls_version, tls_ciphers=tls_ciphers, - tls_insecure=tls_insecure, on_message=on_message + host=host, + port=port, + topics=topics, + username=username, + password=password, + client_id=client_id, + tls_cafile=tls_cafile, + tls_certfile=tls_certfile, + tls_keyfile=tls_keyfile, + tls_version=tls_version, + tls_ciphers=tls_ciphers, + tls_insecure=tls_insecure, + on_message=on_message, ) client.subscribe(*topics) @@ -293,7 +381,11 @@ class MqttBackend(Backend): self.logger.debug(str(e)) # noinspection PyProtectedMember - self.bus.post(MQTTMessageEvent(host=client._host, port=client._port, topic=msg.topic, msg=data)) + self.bus.post( + MQTTMessageEvent( + host=client._host, port=client._port, topic=msg.topic, msg=data + ) + ) return handler @@ -307,8 +399,11 @@ class MqttBackend(Backend): return response_topic = '{}/responses/{}'.format(self.topic, msg.id) - self.logger.info('Processing response on the MQTT topic {}: {}'. - format(response_topic, response)) + self.logger.info( + 'Processing response on the MQTT topic {}: {}'.format( + response_topic, response + ) + ) self.send_message(response, topic=response_topic) @@ -332,7 +427,9 @@ class MqttBackend(Backend): return if isinstance(msg, Request): - threading.Thread(target=response_thread, name='MQTTProcessor', args=(msg,)).start() + threading.Thread( + target=response_thread, name='MQTTProcessor', args=(msg,) + ).start() return handler @@ -341,16 +438,28 @@ class MqttBackend(Backend): if self.host and self.subscribe_default_topic: topics = [self.topic] - client = self._get_client(host=self.host, port=self.port, topics=topics, username=self.username, - password=self.password, client_id=self.client_id, - tls_cafile=self.tls_cafile, tls_certfile=self.tls_certfile, - tls_keyfile=self.tls_keyfile, tls_version=self.tls_version, - tls_ciphers=self.tls_ciphers, tls_insecure=self.tls_insecure, - on_message=self.on_exec_message()) + client = self._get_client( + host=self.host, + port=self.port, + topics=topics, + username=self.username, + password=self.password, + client_id=self.client_id, + tls_cafile=self.tls_cafile, + tls_certfile=self.tls_certfile, + tls_keyfile=self.tls_keyfile, + tls_version=self.tls_version, + tls_ciphers=self.tls_ciphers, + tls_insecure=self.tls_insecure, + on_message=self.on_exec_message(), + ) client.start() - self.logger.info('Initialized MQTT backend on host {}:{}, topic {}'. - format(self.host, self.port, self.topic)) + self.logger.info( + 'Initialized MQTT backend on host {}:{}, topic {}'.format( + self.host, self.port, self.topic + ) + ) self.add_listeners(*self.listeners_conf) @@ -365,4 +474,5 @@ class MqttBackend(Backend): self.logger.info('MQTT backend terminated') + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/zigbee/mqtt/__init__.py b/platypush/backend/zigbee/mqtt/__init__.py index fac191a5f..983fbe65e 100644 --- a/platypush/backend/zigbee/mqtt/__init__.py +++ b/platypush/backend/zigbee/mqtt/__init__.py @@ -1,351 +1,34 @@ -import contextlib -import json -from typing import Optional, Mapping +import warnings -from platypush.backend.mqtt import MqttBackend -from platypush.context import get_plugin -from platypush.message.event.zigbee.mqtt import ( - ZigbeeMqttOnlineEvent, - ZigbeeMqttOfflineEvent, - ZigbeeMqttDevicePropertySetEvent, - ZigbeeMqttDevicePairingEvent, - ZigbeeMqttDeviceConnectedEvent, - ZigbeeMqttDeviceBannedEvent, - ZigbeeMqttDeviceRemovedEvent, - ZigbeeMqttDeviceRemovedFailedEvent, - ZigbeeMqttDeviceWhitelistedEvent, - ZigbeeMqttDeviceRenamedEvent, - ZigbeeMqttDeviceBindEvent, - ZigbeeMqttDeviceUnbindEvent, - ZigbeeMqttGroupAddedEvent, - ZigbeeMqttGroupAddedFailedEvent, - ZigbeeMqttGroupRemovedEvent, - ZigbeeMqttGroupRemovedFailedEvent, - ZigbeeMqttGroupRemoveAllEvent, - ZigbeeMqttGroupRemoveAllFailedEvent, - ZigbeeMqttErrorEvent, -) +from platypush.backend import Backend -class ZigbeeMqttBackend(MqttBackend): +class ZigbeeMqttBackend(Backend): """ Listen for events on a zigbee2mqtt service. - For historical reasons, this backend should be enabled together with the `zigbee.mqtt` plugin. + **WARNING**: This backend is **DEPRECATED** and it will be removed in a + future version. - Triggers: - - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a - connected device change. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects - to the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned - from the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed - from the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to - remove a device from the network fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is - whitelisted on the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is - renamed on the network. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event - occurs. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event - occurs. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to - add a new group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to - remove a group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices - are removed from a group. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to - remove all the devices from a group fails. - * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs - on the zigbee2mqtt service. - - Requires: - - * **paho-mqtt** (``pip install paho-mqtt``) - * The :class:`platypush.plugins.zigbee.mqtt.ZigbeeMqttPlugin` plugin configured. + It has been merged with + :class:`platypush.plugins.zigbee.mqtt.ZigbeeMqttPlugin`. + Now you can simply configure the `zigbee.mqtt` plugin in order to enable + the Zigbee integration - no need to enable both the plugin and the backend. """ - def __init__( - self, - host: Optional[str] = None, - port: Optional[int] = None, - base_topic='zigbee2mqtt', - tls_cafile: Optional[str] = None, - tls_certfile: Optional[str] = None, - tls_keyfile: Optional[str] = None, - tls_version: Optional[str] = None, - tls_ciphers: Optional[str] = None, - username: Optional[str] = None, - password: Optional[str] = None, - client_id: Optional[str] = None, - *args, - **kwargs - ): - """ - :param host: MQTT broker host (default: host configured on the ``zigbee.mqtt`` plugin). - :param port: MQTT broker port (default: 1883). - :param base_topic: Prefix of the topics published by zigbee2mqtt (default: '``zigbee2mqtt``'). - :param tls_cafile: If TLS/SSL is enabled on the MQTT server and the certificate requires a certificate authority - to authenticate it, `ssl_cafile` will point to the provided ca.crt file (default: None) - :param tls_certfile: If TLS/SSL is enabled on the MQTT server and a client certificate it required, specify it - here (default: None) - :param tls_keyfile: If TLS/SSL is enabled on the MQTT server and a client certificate key it required, - specify it here (default: None) :type tls_keyfile: str - :param tls_version: If TLS/SSL is enabled on the MQTT server and it requires a certain TLS version, specify it - here (default: None) - :param tls_ciphers: If TLS/SSL is enabled on the MQTT server and an explicit list of supported ciphers is - required, specify it here (default: None) - :param username: Specify it if the MQTT server requires authentication (default: None) - :param password: Specify it if the MQTT server requires authentication (default: None) - :param client_id: MQTT client ID (default: ``-zigbee-mqtt``, to prevent clashes with the - :class:`platypush.backend.mqtt.MqttBackend` ``client_id``. - """ - - plugin = get_plugin('zigbee.mqtt') - self.base_topic = base_topic or plugin.base_topic - self._devices = {} - self._devices_info = {} - self._groups = {} - self._last_state = None - self.server_info = { - 'host': host or plugin.host, - 'port': port or plugin.port or self._default_mqtt_port, - 'tls_cafile': tls_cafile or plugin.tls_cafile, - 'tls_certfile': tls_certfile or plugin.tls_certfile, - 'tls_ciphers': tls_ciphers or plugin.tls_ciphers, - 'tls_keyfile': tls_keyfile or plugin.tls_keyfile, - 'tls_version': tls_version or plugin.tls_version, - 'username': username or plugin.username, - 'password': password or plugin.password, - } - - kwargs = { - **kwargs, - **self.server_info, - } - - listeners = [ - { - **self.server_info, - 'topics': [ - self.base_topic + '/' + topic - for topic in [ - 'bridge/state', - 'bridge/log', - 'bridge/logging', - 'bridge/devices', - 'bridge/groups', - ] - ], - } - ] - - super().__init__( - *args, - subscribe_default_topic=False, - listeners=listeners, - client_id=client_id, - **kwargs - ) - - if not client_id: - self.client_id += '-zigbee-mqtt' - - def _process_state_message(self, client, msg): - if msg == self._last_state: - return - - if msg == 'online': - evt = ZigbeeMqttOnlineEvent - elif msg == 'offline': - evt = ZigbeeMqttOfflineEvent - self.logger.warning('zigbee2mqtt service is offline') - else: - return - - # noinspection PyProtectedMember - self.bus.post(evt(host=client._host, port=client._port)) - self._last_state = msg - - def _process_log_message(self, client, msg): - msg_type = msg.get('type') - text = msg.get('message') - # noinspection PyProtectedMember - args = {'host': client._host, 'port': client._port} - - if msg_type == 'devices': - devices = {} - for dev in text or []: - devices[dev['friendly_name']] = dev - client.subscribe(self.base_topic + '/' + dev['friendly_name']) - elif msg_type == 'pairing': - self.bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) - elif msg_type in ['device_ban', 'device_banned']: - self.bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) - elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: - force = msg_type == 'device_force_removed_failed' - self.bus.post( - ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args) - ) - elif msg_type == 'device_whitelisted': - self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) - elif msg_type == 'device_renamed': - self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) - elif msg_type == 'device_bind': - self.bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) - elif msg_type == 'device_unbind': - self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) - elif msg_type == 'device_group_add': - self.bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) - elif msg_type == 'device_group_add_failed': - self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) - elif msg_type == 'device_group_remove': - self.bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) - elif msg_type == 'device_group_remove_failed': - self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) - elif msg_type == 'device_group_remove_all': - self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) - elif msg_type == 'device_group_remove_all_failed': - self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) - elif msg_type == 'zigbee_publish_error': - self.logger.error('zigbee2mqtt error: {}'.format(text)) - self.bus.post(ZigbeeMqttErrorEvent(error=text, **args)) - elif msg.get('level') in ['warning', 'error']: - log = getattr(self.logger, msg['level']) - log( - 'zigbee2mqtt {}: {}'.format( - msg['level'], text or msg.get('error', msg.get('warning')) - ) - ) - - def _process_devices(self, client, msg): - devices_info = { - device.get('friendly_name', device.get('ieee_address')): device - for device in msg - } - - # noinspection PyProtectedMember - event_args = {'host': client._host, 'port': client._port} - client.subscribe( - *[self.base_topic + '/' + device for device in devices_info.keys()] - ) - - for name, device in devices_info.items(): - if name not in self._devices: - self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)) - - exposes = (device.get('definition', {}) or {}).get('exposes', []) - payload = self._plugin._build_device_get_request(exposes) - if payload: - client.publish( - self.base_topic + '/' + name + '/get', - json.dumps(payload), - ) - - devices_copy = [*self._devices.keys()] - for name in devices_copy: - if name not in devices_info: - self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) - del self._devices[name] - - self._devices = {device: {} for device in devices_info.keys()} - self._devices_info = devices_info - - def _process_groups(self, client, msg): - # noinspection PyProtectedMember - event_args = {'host': client._host, 'port': client._port} - groups_info = { - group.get('friendly_name', group.get('id')): group for group in msg - } - - for name in groups_info.keys(): - if name not in self._groups: - self.bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) - - groups_copy = [*self._groups.keys()] - for name in groups_copy: - if name not in groups_info: - self.bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) - del self._groups[name] - - self._groups = {group: {} for group in groups_info.keys()} - - def on_mqtt_message(self): - def handler(client, _, msg): - topic = msg.topic[len(self.base_topic) + 1 :] - data = msg.payload.decode() - if not data: - return - - with contextlib.suppress(ValueError, TypeError): - data = json.loads(data) - - if topic == 'bridge/state': - self._process_state_message(client, data) - elif topic in ['bridge/log', 'bridge/logging']: - self._process_log_message(client, data) - elif topic == 'bridge/devices': - self._process_devices(client, data) - elif topic == 'bridge/groups': - self._process_groups(client, data) - else: - suffix = topic.split('/')[-1] - if suffix not in self._devices: - return - - name = suffix - changed_props = { - k: v for k, v in data.items() if v != self._devices[name].get(k) - } - - if changed_props: - self._process_property_update(name, data) - self.bus.post( - ZigbeeMqttDevicePropertySetEvent( - host=client._host, - port=client._port, - device=name, - properties=changed_props, - ) - ) - - self._devices[name].update(data) - - return handler - - @property - def _plugin(self): - plugin = get_plugin('zigbee.mqtt') - assert plugin, 'The zigbee.mqtt plugin is not configured' - return plugin - - def _process_property_update(self, device_name: str, properties: Mapping): - device_info = self._devices_info.get(device_name) - if not (device_info and properties): - return - - self._plugin.publish_entities( - [ - { - **device_info, - 'state': properties, - } - ] - ) - def run(self): super().run() + warnings.warn( + ''' + The zigbee.mqtt has been merged into the zigbee.mqtt plugin. It is + now deprecated and it will be removed in a future version. Remove + any references to it from your configuration. + ''', + DeprecationWarning, + ) + + self.wait_stop() # vim:sw=4:ts=4:et: diff --git a/platypush/backend/zigbee/mqtt/manifest.yaml b/platypush/backend/zigbee/mqtt/manifest.yaml deleted file mode 100644 index f1d8ca0fb..000000000 --- a/platypush/backend/zigbee/mqtt/manifest.yaml +++ /dev/null @@ -1,45 +0,0 @@ -manifest: - events: - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent: when a device - is bannedfrom the network. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent: when a device bind - eventoccurs. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent: when a device - connectsto the network. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent: when a device - is pairing. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent: when the - properties of aconnected device change. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent: when a device - is removedfrom the network. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent: when a - request toremove a device from the network fails. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent: when a device - isrenamed on the network. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent: when a device - unbind eventoccurs. - platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent: when a device - iswhitelisted on the network. - platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent: when an internal error - occurson the zigbee2mqtt service. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent: when a group is - added. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent: when a request - toadd a new group fails. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent: when all the - devicesare removed from a group. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent: when - a request toremove all the devices from a group fails. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent: when a group - is removed. - platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent: when a - request toremove a group fails. - platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent: when the service goes - offline. - platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent: when the service comes - online. - install: - pip: - - paho-mqtt - package: platypush.backend.zigbee.mqtt - type: backend diff --git a/platypush/plugins/zigbee/mqtt/__init__.py b/platypush/plugins/zigbee/mqtt/__init__.py index 862bf396a..c00747c39 100644 --- a/platypush/plugins/zigbee/mqtt/__init__.py +++ b/platypush/plugins/zigbee/mqtt/__init__.py @@ -28,11 +28,12 @@ from platypush.entities.sensors import ( from platypush.entities.switches import Switch, EnumSwitch from platypush.entities.temperature import TemperatureSensor from platypush.message.response import Response +from platypush.plugins import RunnablePlugin from platypush.plugins.mqtt import MqttPlugin, action @manages(Battery, Device, Dimmer, Light, LinkQuality, Sensor, Switch) -class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init] +class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-init] """ This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and `zigbee2mqtt `_. @@ -126,6 +127,42 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init] * **paho-mqtt** (``pip install paho-mqtt``) + Triggers: + + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent` when the service comes online. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent` when the service goes offline. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent` when the properties of a + connected device change. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent` when a device is pairing. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent` when a device connects + to the network. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent` when a device is banned + from the network. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent` when a device is removed + from the network. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent` when a request to + remove a device from the network fails. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent` when a device is + whitelisted on the network. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent` when a device is + renamed on the network. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent` when a device bind event + occurs. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent` when a device unbind event + occurs. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent` when a group is added. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent` when a request to + add a new group fails. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent` when a group is removed. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent` when a request to + remove a group fails. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent` when all the devices + are removed from a group. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent` when a request to + remove all the devices from a group fails. + * :class:`platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent` when an internal error occurs + on the zigbee2mqtt service. + """ def __init__( @@ -1881,5 +1918,13 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init] self.device_set(self._preferred_name(dev), values=data) + def main(self): + from ._listener import ZigbeeMqttListener + + listener = ZigbeeMqttListener() + listener.start() + self.wait_stop() + listener.join() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zigbee/mqtt/_listener.py b/platypush/plugins/zigbee/mqtt/_listener.py new file mode 100644 index 000000000..015bbc057 --- /dev/null +++ b/platypush/plugins/zigbee/mqtt/_listener.py @@ -0,0 +1,269 @@ +import contextlib +import json +from typing import Mapping + +from platypush.backend.mqtt import MqttBackend +from platypush.bus import Bus +from platypush.context import get_bus, get_plugin +from platypush.message.event.zigbee.mqtt import ( + ZigbeeMqttOnlineEvent, + ZigbeeMqttOfflineEvent, + ZigbeeMqttDevicePropertySetEvent, + ZigbeeMqttDevicePairingEvent, + ZigbeeMqttDeviceConnectedEvent, + ZigbeeMqttDeviceBannedEvent, + ZigbeeMqttDeviceRemovedEvent, + ZigbeeMqttDeviceRemovedFailedEvent, + ZigbeeMqttDeviceWhitelistedEvent, + ZigbeeMqttDeviceRenamedEvent, + ZigbeeMqttDeviceBindEvent, + ZigbeeMqttDeviceUnbindEvent, + ZigbeeMqttGroupAddedEvent, + ZigbeeMqttGroupAddedFailedEvent, + ZigbeeMqttGroupRemovedEvent, + ZigbeeMqttGroupRemovedFailedEvent, + ZigbeeMqttGroupRemoveAllEvent, + ZigbeeMqttGroupRemoveAllFailedEvent, + ZigbeeMqttErrorEvent, +) +from platypush.plugins.zigbee.mqtt import ZigbeeMqttPlugin + + +class ZigbeeMqttListener(MqttBackend): + """ + Listener for zigbee2mqtt events. + """ + + def __init__(self): + plugin = self._plugin + self.base_topic = plugin.base_topic # type: ignore + self._devices = {} + self._devices_info = {} + self._groups = {} + self._last_state = None + self.server_info = { + 'host': plugin.host, # type: ignore + 'port': plugin.port or self._default_mqtt_port, # type: ignore + 'tls_cafile': plugin.tls_cafile, # type: ignore + 'tls_certfile': plugin.tls_certfile, # type: ignore + 'tls_ciphers': plugin.tls_ciphers, # type: ignore + 'tls_keyfile': plugin.tls_keyfile, # type: ignore + 'tls_version': plugin.tls_version, # type: ignore + 'username': plugin.username, # type: ignore + 'password': plugin.password, # type: ignore + } + + listeners = [ + { + **self.server_info, + 'topics': [ + self.base_topic + '/' + topic + for topic in [ + 'bridge/state', + 'bridge/log', + 'bridge/logging', + 'bridge/devices', + 'bridge/groups', + ] + ], + } + ] + + super().__init__( + subscribe_default_topic=False, listeners=listeners, **self.server_info + ) + + assert self.client_id + self.client_id += '-zigbee-mqtt' + + def _process_state_message(self, client, msg): + if msg == self._last_state: + return + + if msg == 'online': + evt = ZigbeeMqttOnlineEvent + elif msg == 'offline': + evt = ZigbeeMqttOfflineEvent + self.logger.warning('zigbee2mqtt service is offline') + else: + return + + self._bus.post(evt(host=client._host, port=client._port)) + self._last_state = msg + + def _process_log_message(self, client, msg): + msg_type = msg.get('type') + text = msg.get('message') + args = {'host': client._host, 'port': client._port} + + if msg_type == 'devices': + devices = {} + for dev in text or []: + devices[dev['friendly_name']] = dev + client.subscribe(self.base_topic + '/' + dev['friendly_name']) + elif msg_type == 'pairing': + self._bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) + elif msg_type in ['device_ban', 'device_banned']: + self._bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) + elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: + force = msg_type == 'device_force_removed_failed' + self._bus.post( + ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args) + ) + elif msg_type == 'device_whitelisted': + self._bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) + elif msg_type == 'device_renamed': + self._bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) + elif msg_type == 'device_bind': + self._bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) + elif msg_type == 'device_unbind': + self._bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) + elif msg_type == 'device_group_add': + self._bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) + elif msg_type == 'device_group_add_failed': + self._bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) + elif msg_type == 'device_group_remove': + self._bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) + elif msg_type == 'device_group_remove_failed': + self._bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) + elif msg_type == 'device_group_remove_all': + self._bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) + elif msg_type == 'device_group_remove_all_failed': + self._bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) + elif msg_type == 'zigbee_publish_error': + self.logger.error('zigbee2mqtt error: {}'.format(text)) + self._bus.post(ZigbeeMqttErrorEvent(error=text, **args)) + elif msg.get('level') in ['warning', 'error']: + log = getattr(self.logger, msg['level']) + log( + 'zigbee2mqtt {}: {}'.format( + msg['level'], text or msg.get('error', msg.get('warning')) + ) + ) + + def _process_devices(self, client, msg): + devices_info = { + device.get('friendly_name', device.get('ieee_address')): device + for device in msg + } + + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + client.subscribe( + *[self.base_topic + '/' + device for device in devices_info.keys()] + ) + + for name, device in devices_info.items(): + if name not in self._devices: + self._bus.post( + ZigbeeMqttDeviceConnectedEvent(device=name, **event_args) + ) + + exposes = (device.get('definition', {}) or {}).get('exposes', []) + payload = self._plugin._build_device_get_request(exposes) # type: ignore + if payload: + client.publish( + self.base_topic + '/' + name + '/get', + json.dumps(payload), + ) + + devices_copy = [*self._devices.keys()] + for name in devices_copy: + if name not in devices_info: + self._bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) + del self._devices[name] + + self._devices = {device: {} for device in devices_info.keys()} + self._devices_info = devices_info + + def _process_groups(self, client, msg): + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + groups_info = { + group.get('friendly_name', group.get('id')): group for group in msg + } + + for name in groups_info.keys(): + if name not in self._groups: + self._bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) + + groups_copy = [*self._groups.keys()] + for name in groups_copy: + if name not in groups_info: + self._bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) + del self._groups[name] + + self._groups = {group: {} for group in groups_info.keys()} + + def on_mqtt_message(self): + def handler(client, _, msg): + topic = msg.topic[len(self.base_topic) + 1 :] + data = msg.payload.decode() + if not data: + return + + with contextlib.suppress(ValueError, TypeError): + data = json.loads(data) + + if topic == 'bridge/state': + self._process_state_message(client, data) + elif topic in ['bridge/log', 'bridge/logging']: + self._process_log_message(client, data) + elif topic == 'bridge/devices': + self._process_devices(client, data) + elif topic == 'bridge/groups': + self._process_groups(client, data) + else: + suffix = topic.split('/')[-1] + if suffix not in self._devices: + return + + name = suffix + changed_props = { + k: v for k, v in data.items() if v != self._devices[name].get(k) + } + + if changed_props: + self._process_property_update(name, data) + self._bus.post( + ZigbeeMqttDevicePropertySetEvent( + host=client._host, + port=client._port, + device=name, + properties=changed_props, + ) + ) + + self._devices[name].update(data) + + return handler + + @property + def _plugin(self) -> ZigbeeMqttPlugin: + plugin = get_plugin('zigbee.mqtt') + assert plugin, 'The zigbee.mqtt plugin is not configured' + return plugin + + @property + def _bus(self) -> Bus: + return get_bus() + + def _process_property_update(self, device_name: str, properties: Mapping): + device_info = self._devices_info.get(device_name) + if not (device_info and properties): + return + + self._plugin.publish_entities( # type: ignore + [ + { + **device_info, + 'state': properties, + } + ] + ) + + def run(self): + super().run() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/zigbee/mqtt/manifest.yaml b/platypush/plugins/zigbee/mqtt/manifest.yaml index 3c57ec8d1..d8ee99a7f 100644 --- a/platypush/plugins/zigbee/mqtt/manifest.yaml +++ b/platypush/plugins/zigbee/mqtt/manifest.yaml @@ -1,5 +1,43 @@ manifest: - events: {} + events: + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBannedEvent: > + when a device is banned from the network. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceBindEvent: > + when a device bind event occurs. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceConnectedEvent: > + when a device connects to the network. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePairingEvent: > + when a device is pairing. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDevicePropertySetEvent: > + when the properties of a connected device change. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedEvent: > + when a device is removed from the network. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRemovedFailedEvent: > + when a request to remove a device from the network fails. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceRenamedEvent: > + when a device is renamed on the network. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceUnbindEvent: > + when a device unbind event occurs. + platypush.message.event.zigbee.mqtt.ZigbeeMqttDeviceWhitelistedEvent: > + when a device is whitelisted on the network. + platypush.message.event.zigbee.mqtt.ZigbeeMqttErrorEvent: > + when an internal error occurs on the zigbee2mqtt service. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedEvent: > + when a group is added. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupAddedFailedEvent: > + when a request to add a new group fails. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllEvent: > + when all the devices are removed from a group. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemoveAllFailedEvent: > + when a request to remove all the devices from a group fails. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedEvent: > + when a group is removed. + platypush.message.event.zigbee.mqtt.ZigbeeMqttGroupRemovedFailedEvent: > + when a request to remove a group fails. + platypush.message.event.zigbee.mqtt.ZigbeeMqttOfflineEvent: > + when the service goes offline. + platypush.message.event.zigbee.mqtt.ZigbeeMqttOnlineEvent: > + when the service comes online. install: pip: - paho-mqtt