diff --git a/platypush/backend/zigbee/mqtt.py b/platypush/backend/zigbee/mqtt.py index 22a1c311..2637bb75 100644 --- a/platypush/backend/zigbee/mqtt.py +++ b/platypush/backend/zigbee/mqtt.py @@ -84,7 +84,9 @@ class ZigbeeMqttBackend(MqttBackend): plugin = get_plugin('zigbee.mqtt') self.base_topic = base_topic or plugin.base_topic - listeners = [{ + self._devices = {} + self._groups = {} + self.server_info = { 'host': host or plugin.host, 'port': port or plugin.port or self._default_mqtt_port, 'tls_cafile': tls_cafile or plugin.tls_cafile, @@ -92,11 +94,15 @@ class ZigbeeMqttBackend(MqttBackend): 'tls_ciphers': tls_ciphers or plugin.tls_ciphers, 'tls_keyfile': tls_keyfile or plugin.tls_keyfile, 'tls_version': tls_version or plugin.tls_version, - 'username': username or plugin.username , + 'username': username or plugin.username, 'password': password or plugin.password, + } + + listeners = [{ + **self.server_info, 'topics': [ self.base_topic + '/' + topic - for topic in ['bridge/state', 'bridge/log'] + for topic in ['bridge/state', 'bridge/log', 'bridge/logging', 'bridge/devices', 'bridge/groups'] ], }] @@ -116,50 +122,99 @@ class ZigbeeMqttBackend(MqttBackend): def _process_log_message(self, client, msg): msg_type = msg.get('type') - msg = msg.get('message') + text = msg.get('message') # noinspection PyProtectedMember args = {'host': client._host, 'port': client._port} if msg_type == 'devices': devices = {} - for dev in (msg or []): + for dev in (text or []): devices[dev['friendly_name']] = dev client.subscribe(self.base_topic + '/' + dev['friendly_name']) elif msg_type == 'pairing': - self.bus.post(ZigbeeMqttDevicePairingEvent(device=msg, **args)) - elif msg_type == 'device_connected': - self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDevicePairingEvent(device=text, **args)) elif msg_type in ['device_ban', 'device_banned']: - self.bus.post(ZigbeeMqttDeviceBannedEvent(device=msg, **args)) - elif msg_type in ['device_removed', 'device_force_removed']: - force = msg_type == 'device_force_removed' - self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=msg, force=force, **args)) + self.bus.post(ZigbeeMqttDeviceBannedEvent(device=text, **args)) elif msg_type in ['device_removed_failed', 'device_force_removed_failed']: force = msg_type == 'device_force_removed_failed' - self.bus.post(ZigbeeMqttDeviceRemovedFailedEvent(device=msg, force=force, **args)) + self.bus.post(ZigbeeMqttDeviceRemovedFailedEvent(device=text, force=force, **args)) elif msg_type == 'device_whitelisted': - self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=text, **args)) elif msg_type == 'device_renamed': - self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=text, **args)) elif msg_type == 'device_bind': - self.bus.post(ZigbeeMqttDeviceBindEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceBindEvent(device=text, **args)) elif msg_type == 'device_unbind': - self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=msg, **args)) + self.bus.post(ZigbeeMqttDeviceUnbindEvent(device=text, **args)) elif msg_type == 'device_group_add': - self.bus.post(ZigbeeMqttGroupAddedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupAddedEvent(group=text, **args)) elif msg_type == 'device_group_add_failed': - self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupAddedFailedEvent(group=text, **args)) elif msg_type == 'device_group_remove': - self.bus.post(ZigbeeMqttGroupRemovedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemovedEvent(group=text, **args)) elif msg_type == 'device_group_remove_failed': - self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemovedFailedEvent(group=text, **args)) elif msg_type == 'device_group_remove_all': - self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemoveAllEvent(group=text, **args)) elif msg_type == 'device_group_remove_all_failed': - self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=msg, **args)) + self.bus.post(ZigbeeMqttGroupRemoveAllFailedEvent(group=text, **args)) elif msg_type == 'zigbee_publish_error': - self.logger.warning('zigbee2mqtt internal error: {}'.format(msg)) - self.bus.post(ZigbeeMqttErrorEvent(error=msg, **args)) + self.logger.error('zigbee2mqtt error: {}'.format(text)) + self.bus.post(ZigbeeMqttErrorEvent(error=text, **args)) + elif msg.get('level') in ['warning', 'error']: + log = getattr(self.logger, msg['level']) + log('zigbee2mqtt {}: {}'.format(msg['level'], text or msg.get('error', msg.get('warning')))) + + def _process_devices(self, client, msg): + devices_info = { + device.get('friendly_name', device.get('ieee_address')): device + for device in msg + } + + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + client.subscribe(*[ + self.base_topic + '/' + device + for device in devices_info.keys() + ]) + + for name, device in devices_info.items(): + if name not in self._devices: + self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=name, **event_args)) + + exposes = (device.get('definition', {}) or {}).get('exposes', []) + client.publish( + self.base_topic + '/' + name + '/get', + json.dumps(get_plugin('zigbee.mqtt').build_device_get_request(exposes)) + ) + + devices_copy = [*self._devices.keys()] + for name in devices_copy: + if name not in devices_info: + self.bus.post(ZigbeeMqttDeviceRemovedEvent(device=name, **event_args)) + del self._devices[name] + + self._devices = {device: {} for device in devices_info.keys()} + + def _process_groups(self, client, msg): + # noinspection PyProtectedMember + event_args = {'host': client._host, 'port': client._port} + groups_info = { + group.get('friendly_name', group.get('id')): group + for group in msg + } + + for name in groups_info.keys(): + if name not in self._groups: + self.bus.post(ZigbeeMqttGroupAddedEvent(group=name, **event_args)) + + groups_copy = [*self._groups.keys()] + for name in groups_copy: + if name not in groups_info: + self.bus.post(ZigbeeMqttGroupRemovedEvent(group=name, **event_args)) + del self._groups[name] + + self._groups = {group: {} for group in groups_info.keys()} def on_mqtt_message(self): def handler(client, _, msg): @@ -174,14 +229,31 @@ class ZigbeeMqttBackend(MqttBackend): if topic == 'bridge/state': self._process_state_message(client, data) - elif topic == 'bridge/log': + elif topic in ['bridge/log', 'bridge/logging']: self._process_log_message(client, data) + elif topic == 'bridge/devices': + self._process_devices(client, data) + elif topic == 'bridge/groups': + self._process_groups(client, data) else: - # noinspection PyProtectedMember - self.bus.post(ZigbeeMqttDevicePropertySetEvent(host=client._host, port=client._port, - device=topic, properties=data)) + suffix = topic.split('/')[-1] + if suffix not in self._devices: + return + + name = suffix + changed_props = {k: v for k, v in data.items() if v != self._devices[name].get(k)} + + if changed_props: + # noinspection PyProtectedMember + self.bus.post(ZigbeeMqttDevicePropertySetEvent(host=client._host, port=client._port, + device=name, properties=changed_props)) + + self._devices[name].update(data) return handler + def run(self): + super().run() + # vim:sw=4:ts=4:et: diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 0980ff06..690b7c34 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -242,9 +242,9 @@ class Request(Message): elif not response.disable_logging: logger.info('Processed response from action {}: {}'. format(action, str(response))) - except AssertionError as e: + except (AssertionError, TimeoutError) as e: plugin.logger.exception(e) - logger.warning('Assertion error from action [{}]: {}'.format(action, str(e))) + logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e))) response = Response(output=None, errors=[str(e)]) except Exception as e: # Retry mechanism diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index 8b964105..5f08eeaf 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -38,7 +38,9 @@ class ZigbeeMqttPlugin(MqttPlugin): - You can disconnect your debugger and downloader cable once the firmware is flashed. - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or - through your package manager. Manual instructions: + through your package manager. **NOTE**: many API breaking changes have occurred on Zigbee2MQTT 1.17.0, + therefore this integration will only be compatible with the version 1.17.0 of the service or higher versions. + Manual instructions: .. code-block:: shell @@ -97,7 +99,7 @@ class ZigbeeMqttPlugin(MqttPlugin): """ - def __init__(self, host: str = 'localhost', port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 60, + def __init__(self, host: str = 'localhost', port: int = 1883, base_topic: str = 'zigbee2mqtt', timeout: int = 10, tls_certfile: Optional[str] = None, tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, tls_ciphers: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, **kwargs): @@ -545,7 +547,7 @@ class ZigbeeMqttPlugin(MqttPlugin): **self._mqtt_args(**kwargs))) @staticmethod - def _build_device_get_request(values: List[Dict[str, Any]]) -> dict: + def build_device_get_request(values: List[Dict[str, Any]]) -> dict: def extract_value(value: dict, root: dict): if not value.get('access', 1) & 0x1: # Property not readable @@ -553,7 +555,7 @@ class ZigbeeMqttPlugin(MqttPlugin): if 'features' not in value: if 'property' in value: - root[value['property']] = '' + root[value['property']] = 0 if value['type'] == 'numeric' else '' return if 'property' in value: @@ -602,7 +604,7 @@ class ZigbeeMqttPlugin(MqttPlugin): return {} return self.publish(topic=self._topic(device) + '/get', reply_topic=self._topic(device), - msg=self._build_device_get_request(exposes), **kwargs) + msg=self.build_device_get_request(exposes), **kwargs) # noinspection PyShadowingBuiltins,DuplicatedCode @action