diff --git a/platypush/backend/zigbee/mqtt.py b/platypush/backend/zigbee/mqtt.py index aea36acd..22a1c311 100644 --- a/platypush/backend/zigbee/mqtt.py +++ b/platypush/backend/zigbee/mqtt.py @@ -82,47 +82,29 @@ class ZigbeeMqttBackend(MqttBackend): :param password: Specify it if the MQTT server requires authentication (default: None) """ - if host: - self.base_topic = base_topic - listeners = [{ - 'host': host, - 'port': port or self._default_mqtt_port, - 'tls_cafile': tls_cafile, - 'tls_certfile': tls_certfile, - 'tls_ciphers': tls_ciphers, - 'tls_keyfile': tls_keyfile, - 'tls_version': tls_version, - 'username': username, - 'password': password, - 'topics': [ - base_topic + '/' + topic - for topic in ['bridge/state', 'bridge/log'] - ], - }] - else: - plugin = get_plugin('zigbee.mqtt') - self.base_topic = plugin.base_topic - listeners = [{ - 'host': plugin.host, - 'port': plugin.port or self._default_mqtt_port, - 'tls_cafile': plugin.tls_cafile, - 'tls_certfile': plugin.tls_certfile, - 'tls_ciphers': plugin.tls_ciphers, - 'username': plugin.username, - 'password': plugin.password, - 'topics': [ - plugin.base_topic + '/' + topic - for topic in ['bridge/state', 'bridge/log'] - ], - }] + plugin = get_plugin('zigbee.mqtt') + self.base_topic = base_topic or plugin.base_topic + listeners = [{ + 'host': host or plugin.host, + 'port': port or plugin.port or self._default_mqtt_port, + 'tls_cafile': tls_cafile or plugin.tls_cafile, + 'tls_certfile': tls_certfile or plugin.tls_certfile, + 'tls_ciphers': tls_ciphers or plugin.tls_ciphers, + 'tls_keyfile': tls_keyfile or plugin.tls_keyfile, + 'tls_version': tls_version or plugin.tls_version, + 'username': username or plugin.username , + 'password': password or plugin.password, + 'topics': [ + self.base_topic + '/' + topic + for topic in ['bridge/state', 'bridge/log'] + ], + }] super().__init__(subscribe_default_topic=False, listeners=listeners, *args, **kwargs) - self._devices = {} def _process_state_message(self, client, msg): if msg == 'online': evt = ZigbeeMqttOnlineEvent - self._refresh_devices(client) elif msg == 'offline': evt = ZigbeeMqttOfflineEvent self.logger.warning('zigbee2mqtt service is offline') @@ -132,9 +114,6 @@ class ZigbeeMqttBackend(MqttBackend): # noinspection PyProtectedMember self.bus.post(evt(host=client._host, port=client._port)) - def _refresh_devices(self, client): - client.publish(self.base_topic + '/' + 'bridge/config/devices/get') - def _process_log_message(self, client, msg): msg_type = msg.get('type') msg = msg.get('message') @@ -146,13 +125,10 @@ class ZigbeeMqttBackend(MqttBackend): for dev in (msg or []): devices[dev['friendly_name']] = dev client.subscribe(self.base_topic + '/' + dev['friendly_name']) - - self._devices = devices elif msg_type == 'pairing': self.bus.post(ZigbeeMqttDevicePairingEvent(device=msg, **args)) elif msg_type == 'device_connected': self.bus.post(ZigbeeMqttDeviceConnectedEvent(device=msg, **args)) - self._refresh_devices(client) elif msg_type in ['device_ban', 'device_banned']: self.bus.post(ZigbeeMqttDeviceBannedEvent(device=msg, **args)) elif msg_type in ['device_removed', 'device_force_removed']: @@ -165,7 +141,6 @@ class ZigbeeMqttBackend(MqttBackend): self.bus.post(ZigbeeMqttDeviceWhitelistedEvent(device=msg, **args)) elif msg_type == 'device_renamed': self.bus.post(ZigbeeMqttDeviceRenamedEvent(device=msg, **args)) - self._refresh_devices(client) elif msg_type == 'device_bind': self.bus.post(ZigbeeMqttDeviceBindEvent(device=msg, **args)) elif msg_type == 'device_unbind': diff --git a/platypush/plugins/mqtt.py b/platypush/plugins/mqtt.py index 8ee393eb..fa6e7889 100644 --- a/platypush/plugins/mqtt.py +++ b/platypush/plugins/mqtt.py @@ -98,6 +98,37 @@ class MqttPlugin(Plugin): def _expandpath(path: Optional[str] = None) -> Optional[str]: return os.path.abspath(os.path.expanduser(path)) if path else None + def _get_client(self, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, + tls_keyfile: Optional[str] = None, tls_version: Optional[str] = None, + tls_ciphers: Optional[str] = None, tls_insecure: Optional[bool] = None, + username: Optional[str] = None, password: Optional[str] = None): + from paho.mqtt.client import Client + + tls_cafile = self._expandpath(tls_cafile or self.tls_cafile) + tls_certfile = self._expandpath(tls_certfile or self.tls_certfile) + tls_keyfile = self._expandpath(tls_keyfile or self.tls_keyfile) + tls_ciphers = tls_ciphers or self.tls_ciphers + username = username or self.username + password = password or self.password + + tls_version = tls_version or self.tls_version + if tls_version: + tls_version = self.get_tls_version(tls_version) + if tls_insecure is None: + tls_insecure = self.tls_insecure + + client = Client() + + if username and password: + client.username_pw_set(username, password) + if tls_cafile: + client.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, + tls_version=tls_version, ciphers=tls_ciphers) + + client.tls_insecure_set(tls_insecure) + + return client + @action def publish(self, topic: str, msg: Any, host: Optional[str] = None, port: int = 1883, reply_topic: Optional[str] = None, timeout: int = 60, @@ -129,55 +160,25 @@ class MqttPlugin(Plugin): :param username: Specify it if the MQTT server requires authentication (default: None). :param password: Specify it if the MQTT server requires authentication (default: None). """ - from paho.mqtt.client import Client - - if not host and not self.host: - raise RuntimeError('No host specified and no default host configured') - - if not host: - host = self.host - port = self.port - tls_cafile = self.tls_cafile - tls_certfile = self.tls_certfile - tls_keyfile = self.tls_keyfile - tls_version = self.tls_version - tls_ciphers = self.tls_ciphers - tls_insecure = self.tls_insecure - username = self.username - password = self.password - else: - tls_cafile = self._expandpath(tls_cafile) - tls_certfile = self._expandpath(tls_certfile) - tls_keyfile = self._expandpath(tls_keyfile) - if tls_version: - tls_version = self.get_tls_version(tls_version) - if tls_insecure is None: - tls_insecure = self.tls_insecure - - client = Client() - - if username and password: - client.username_pw_set(username, password) - if tls_cafile: - client.tls_set(ca_certs=tls_cafile, certfile=tls_certfile, keyfile=tls_keyfile, - tls_version=tls_version, ciphers=tls_ciphers) - - client.tls_insecure_set(tls_insecure) - - # Try to parse it as a platypush message or dump it to JSON from a dict/list - if isinstance(msg, (dict, list)): - msg = json.dumps(msg) - - # noinspection PyBroadException - try: - msg = Message.build(json.loads(msg)) - except: - pass - - client.connect(host, port, keepalive=timeout) response_buffer = io.BytesIO() + client = None try: + # Try to parse it as a platypush message or dump it to JSON from a dict/list + if isinstance(msg, (dict, list)): + msg = json.dumps(msg) + + # noinspection PyBroadException + try: + msg = Message.build(json.loads(msg)) + except: + pass + + client = self._get_client(tls_cafile=tls_cafile, tls_certfile=tls_certfile, tls_keyfile=tls_keyfile, + tls_version=tls_version, tls_ciphers=tls_ciphers, tls_insecure=tls_insecure, + username=username, password=password) + + client.connect(host, port, keepalive=timeout) response_received = threading.Event() if reply_topic: @@ -198,13 +199,14 @@ class MqttPlugin(Plugin): finally: response_buffer.close() - # noinspection PyBroadException - try: - client.loop_stop() - except: - pass + if client: + # noinspection PyBroadException + try: + client.loop_stop() + except: + pass - client.disconnect() + client.disconnect() @staticmethod def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]): diff --git a/platypush/plugins/zigbee/mqtt.py b/platypush/plugins/zigbee/mqtt.py index e90a8ee7..185e40fc 100644 --- a/platypush/plugins/zigbee/mqtt.py +++ b/platypush/plugins/zigbee/mqtt.py @@ -1,3 +1,4 @@ +import json import threading from typing import Optional, List, Any, Dict @@ -12,18 +13,20 @@ class ZigbeeMqttPlugin(MqttPlugin): In order to get started you'll need: - - A Zigbee USB adapter/sniffer (in this example I'll use the `CC2531 `_. + - A Zigbee USB adapter/sniffer (in this example I'll use the + `CC2531 `_. - A Zigbee debugger/emulator + downloader cable (only to flash the firmware). Instructions: - Install `cc-tool `_ either from sources or from a package manager. - - Connect the Zigbee to your PC/RaspberryPi in this way: USB -> CC debugger -> downloader cable -> CC2531 -> USB. - The debugger and the adapter should be connected *at the same time*. If the later ``cc-tool`` command throws up - an error, put the device in sync while connected by pressing the _Reset_ button on the debugger. + - Connect the Zigbee to your PC/RaspberryPi in this way: + USB -> CC debugger -> downloader cable -> CC2531 -> USB + The debugger and the adapter should be connected *at the same time*. If the later ``cc-tool`` command throws + up an error, put the device in sync while connected by pressing the _Reset_ button on the debugger. - Check where the device is mapped. On Linux it will usually be ``/dev/ttyACM0``. - - Download the latest `Z-Stack firmware `_ to your device. - Instructions for a CC2531 device: + - Download the latest `Z-Stack firmware `_ + to your device. Instructions for a CC2531 device: .. code-block:: shell @@ -33,8 +36,8 @@ class ZigbeeMqttPlugin(MqttPlugin): - You can disconnect your debugger and downloader cable once the firmware is flashed. - - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or through your - package manager. Manual instructions: + - Install ``zigbee2mqtt``. First install a node/npm environment, then either install ``zigbee2mqtt`` manually or + through your package manager. Manual instructions: .. code-block:: shell @@ -82,7 +85,8 @@ class ZigbeeMqttPlugin(MqttPlugin): - If it all goes fine, once the daemon is running and a new device is found you should see traces like this in the output of ``zigbee2mqtt``:: - zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', device has successfully been paired + zigbee2mqtt:info 2019-11-09T12:19:56: Successfully interviewed '0x00158d0001dc126a', device has + successfully been paired - You are now ready to use this integration. @@ -121,6 +125,55 @@ class ZigbeeMqttPlugin(MqttPlugin): self.base_topic = base_topic self.timeout = timeout + def _get_network_info(self, **kwargs): + self.logger.info('Fetching Zigbee network information') + client = None + mqtt_args = self._mqtt_args(**kwargs) + timeout = 30 + if 'timeout' in mqtt_args: + timeout = mqtt_args.pop('timeout') + + info = { + 'state': None, + 'info': {}, + 'config': {}, + 'devices': [], + 'groups': [], + } + + info_ready_events = {topic: threading.Event() for topic in info.keys()} + + def _on_message(): + def callback(_, __, msg): + topic = msg.topic.split('/')[-1] + if topic in info: + info[topic] = msg.payload.decode() if topic == 'state' else json.loads(msg.payload.decode()) + info_ready_events[topic].set() + + return callback + + try: + host = mqtt_args.pop('host') + port = mqtt_args.pop('port') + client = self._get_client(**mqtt_args) + client.on_message = _on_message() + client.connect(host, port, keepalive=timeout) + client.subscribe(self.base_topic + '/bridge/#') + client.loop_start() + + for event in info_ready_events.values(): + info_ready = event.wait(timeout=timeout) + if not info_ready: + raise TimeoutError('A timeout occurred while fetching the Zigbee network information') + + return info + finally: + try: + client.loop_stop() + client.disconnect() + except Exception as e: + self.logger.warning('Error on MQTT client disconnection: {}'.format(str(e))) + def _mqtt_args(self, host: Optional[str] = None, **kwargs): if not host: return { @@ -154,41 +207,171 @@ class ZigbeeMqttPlugin(MqttPlugin): [ { - "dateCode": "20190608", + "date_code": "20190608", "friendly_name": "Coordinator", - "ieeeAddr": "0x00123456789abcde", - "lastSeen": 1579640601215, - "networkAddress": 0, - "softwareBuildID": "zStack12", - "type": "Coordinator" + "ieee_address": "0x00123456789abcde", + "network_address": 0, + "supported": false, + "type": "Coordinator", + "interviewing": false, + "interviewing_completed": true, + "definition": null, + "endpoints": { + "13": { + "bindings": [], + "clusters": { + "input": ["genOta"], + "output": [] + }, + "output": [] + } + } }, + { - "dateCode": "20160906", + "date_code": "20180906", "friendly_name": "My Lightbulb", - "hardwareVersion": 1, - "ieeeAddr": "0x00123456789abcdf", - "lastSeen": 1579595191623, - "manufacturerID": 4107, - "manufacturerName": "Philips", - "model": "8718696598283", - "modelID": "LTW013", - "networkAddress": 52715, - "powerSource": "Mains (single phase)", - "softwareBuildID": "1.15.2_r19181", - "type": "Router" + "ieee_address": "0x00123456789abcdf", + "network_address": 52715, + "power_source": "Mains (single phase)", + "software_build_id": "5.127.1.26581", + "model_id": "LCT001", + "supported": true, + "interviewing": false, + "interviewing_completed": true, + "type": "Router", + "definition": { + "description": "Hue white and color ambiance E26/E27/E14", + "model": "9290012573A", + "vendor": "Philips", + "exposes": [ + { + "features": [ + { + "access": 7, + "description": "On/off state of this light", + "name": "state", + "property": "state", + "type": "binary", + "value_off": "OFF", + "value_on": "ON", + "value_toggle": "TOGGLE" + }, + { + "access": 7, + "description": "Brightness of this light", + "name": "brightness", + "property": "brightness", + "type": "numeric", + "value_max": 254, + "value_min": 0 + }, + { + "access": 7, + "description": "Color temperature of this light", + "name": "color_temp", + "property": "color_temp", + "type": "numeric", + "unit": "mired", + "value_max": 500, + "value_min": 150 + }, + { + "description": "Color of this light in the CIE 1931 color space (x/y)", + "features": [ + { + "access": 7, + "name": "x", + "property": "x", + "type": "numeric" + }, + { + "access": 7, + "name": "y", + "property": "y", + "type": "numeric" + } + ], + "name": "color_xy", + "property": "color", + "type": "composite" + } + ], + "type": "light" + }, + { + "access": 2, + "description": "Triggers an effect on the light (e.g. make light blink for a few seconds)", + "name": "effect", + "property": "effect", + "type": "enum", + "values": [ + "blink", + "breathe", + "okay", + "channel_change", + "finish_effect", + "stop_effect" + ] + }, + { + "access": 1, + "description": "Link quality (signal strength)", + "name": "linkquality", + "property": "linkquality", + "type": "numeric", + "unit": "lqi", + "value_max": 255, + "value_min": 0 + } + ] + }, + + "endpoints": { + "11": { + "bindings": [], + "clusters": { + "input": [ + "genBasic", + "genIdentify", + "genGroups", + "genScenes", + "genOnOff", + "genLevelCtrl", + "touchlink", + "lightingColorCtrl", + "manuSpecificUbisysDimmerSetup" + ], + "output": [ + "genOta" + ] + }, + "configured_reportings": [] + }, + "242": { + "bindings": [], + "clusters": { + "input": [ + "greenPower" + ], + "output": [ + "greenPower" + ] + }, + "configured_reportings": [] + } + } } ] """ - return self.publish( - topic=self._topic('bridge/config/devices/get'), msg='', - reply_topic=self._topic('bridge/config/devices'), - **self._mqtt_args(**kwargs)) + return self._get_network_info(**kwargs).get('devices') def _permit_join_timeout_callback(self, permit: bool, **kwargs): def callback(): self.logger.info('Restoring permit_join state to {}'.format(permit)) self.permit_join(permit, **kwargs) + return callback @action @@ -334,9 +517,9 @@ class ZigbeeMqttPlugin(MqttPlugin): :return: Key->value map of the device properties. """ properties = self.publish(topic=self._topic(device + ('/get' if property else '')), - reply_topic=self._topic(device), - msg={property: ''} if property else '', - **self._mqtt_args(**kwargs)).output + reply_topic=self._topic(device), + msg={property: ''} if property else '', + **self._mqtt_args(**kwargs)).output if property: assert property in properties, 'No such property: ' + property @@ -380,40 +563,153 @@ class ZigbeeMqttPlugin(MqttPlugin): """ return self.publish(topic=self._topic('bridge/device/{}/get_group_membership'.format(device)), - reply_topic=self._topic(device), msg=device, **self._mqtt_args(**kwargs)).\ + reply_topic=self._topic(device), msg=device, **self._mqtt_args(**kwargs)). \ output.get('group_list', []) @action - def groups(self, **kwargs): + def groups(self, **kwargs) -> List[dict]: """ Get the groups registered on the device. :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query the default configured device). """ - groups = self.publish(topic=self._topic('bridge/config/groups'), msg={}, - reply_topic=self._topic('bridge/log'), - **self._mqtt_args(**kwargs)).output.get('message', []) + return self._get_network_info(**kwargs).get('groups') - # noinspection PyUnresolvedReferences - devices = { - device['ieeeAddr']: device - for device in self.devices(**kwargs).output + @action + def info(self, **kwargs) -> dict: + """ + Get the information, configuration and state of the network. + + :param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish`` + (default: query the default configured device). + + :return: Example: + + .. code-block:: json + + { + "state": "online", + "commit": "07cdc9d", + "config": { + "advanced": { + "adapter_concurrent": null, + "adapter_delay": null, + "availability_blacklist": [], + "availability_blocklist": [], + "availability_passlist": [], + "availability_timeout": 0, + "availability_whitelist": [], + "cache_state": true, + "cache_state_persistent": true, + "cache_state_send_on_startup": true, + "channel": 11, + "elapsed": false, + "ext_pan_id": [ + 221, + 221, + 221, + 221, + 221, + 221, + 221, + 221 + ], + "homeassistant_discovery_topic": "homeassistant", + "homeassistant_legacy_triggers": true, + "homeassistant_status_topic": "hass/status", + "last_seen": "disable", + "legacy_api": true, + "log_directory": "/opt/zigbee2mqtt/data/log/%TIMESTAMP%", + "log_file": "log.txt", + "log_level": "debug", + "log_output": [ + "console", + "file" + ], + "log_rotation": true, + "log_syslog": {}, + "pan_id": 6754, + "report": false, + "soft_reset_timeout": 0, + "timestamp_format": "YYYY-MM-DD HH:mm:ss" + }, + "ban": [], + "blocklist": [], + "device_options": {}, + "devices": { + "0x00123456789abcdf": { + "friendly_name": "My Lightbulb" + } + }, + "experimental": { + "output": "json" + }, + "external_converters": [], + "groups": {}, + "homeassistant": false, + "map_options": { + "graphviz": { + "colors": { + "fill": { + "coordinator": "#e04e5d", + "enddevice": "#fff8ce", + "router": "#4ea3e0" + }, + "font": { + "coordinator": "#ffffff", + "enddevice": "#000000", + "router": "#ffffff" + }, + "line": { + "active": "#009900", + "inactive": "#994444" + } + } + } + }, + "mqtt": { + "base_topic": "zigbee2mqtt", + "force_disable_retain": false, + "include_device_information": false, + "server": "mqtt://localhost" + }, + "passlist": [], + "permit_join": true, + "serial": { + "disable_led": false, + "port": "/dev/ttyUSB0" + }, + "whitelist": [] + }, + "coordinator": { + "meta": { + "maintrel": 3, + "majorrel": 2, + "minorrel": 6, + "product": 0, + "revision": 20190608, + "transportrev": 2 + }, + "type": "zStack12" + }, + "log_level": "debug", + "network": { + "channel": 11, + "extended_pan_id": "0xdddddddddddddddd", + "pan_id": 6754 + }, + "permit_join": true, + "version": "1.17.0" + } + + """ + info = self._get_network_info(**kwargs) + return { + 'state': info.get('state'), + 'info': info.get('info'), } - return [ - { - 'id': group['ID'], - 'friendly_name': group['friendly_name'], - 'optimistic': group.get('optimistic', False), - 'devices': [ - devices[device.split('/')[0]] - for device in group.get('devices', []) - ] - } - for group in groups - ] - @action def group_add(self, name: str, id: Optional[int] = None, **kwargs): """ @@ -549,4 +845,4 @@ class ZigbeeMqttPlugin(MqttPlugin): msg=target, **self._mqtt_args(**kwargs)) -# vim:sw=4:ts=4:et: +# vim:sw=4:ts=4:et: \ No newline at end of file