Added Z-Wave integration (closes #76)

This commit is contained in:
Fabio Manganiello 2020-02-05 22:26:52 +01:00
parent 8d203723da
commit 02246a48ae
7 changed files with 1412 additions and 11 deletions

View File

@ -239,6 +239,7 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers',
'graphyte',
'cpuinfo',
'psutil',
'openzwave',
]
sys.path.insert(0, os.path.abspath('../..'))

View File

@ -220,22 +220,33 @@ class Backend(Thread, EventGenerator):
redis.send_message(msg, queue_name=queue_name)
def run(self):
""" Starts the backend thread. To be implemented in the derived classes """
""" Starts the backend thread. To be implemented in the derived classes if the loop method isn't defined. """
self.thread_id = threading.get_ident()
set_thread_name(self._thread_name)
if not callable(self.loop):
return
with self:
while not self.should_stop():
try:
self.loop()
except Exception as e:
self.logger.error(str(e))
self.logger.exception(e)
finally:
if self.poll_seconds:
time.sleep(self.poll_seconds)
while not self.should_stop():
try:
with self:
has_error = False
while not self.should_stop() and not has_error:
try:
self.loop()
except Exception as e:
has_error = True
self.logger.error(str(e))
self.logger.exception(e)
finally:
if self.poll_seconds:
time.sleep(self.poll_seconds)
elif has_error:
time.sleep(5)
except Exception as e:
self.logger.error('{} initialization error: {}'.format(self.__class__.__name__, str(e)))
self.logger.exception(e)
time.sleep(self.poll_seconds or 5)
def __enter__(self):
""" Invoked when the backend is initialized, if the main logic is within a ``loop()`` function """

View File

@ -0,0 +1,262 @@
import inspect
import logging
import queue
import os
import threading
from typing import Optional
from platypush.backend import Backend
from platypush.config import Config
from platypush.message.event.zwave import ZwaveNetworkReadyEvent, ZwaveNetworkStoppedEvent, ZwaveEvent, \
ZwaveNodeAddedEvent, ZwaveValueAddedEvent, ZwaveNodeQueryCompletedEvent, ZwaveValueChangedEvent, \
ZwaveValueRefreshedEvent, ZwaveValueRemovedEvent, ZwaveNetworkResetEvent, ZwaveCommandEvent, \
ZwaveCommandWaitingEvent, ZwaveNodeRemovedEvent, ZwaveNodeRenamedEvent, ZwaveNodeReadyEvent, \
ZwaveButtonRemovedEvent, ZwaveButtonCreatedEvent, ZwaveButtonOnEvent, ZwaveButtonOffEvent, ZwaveNetworkErrorEvent, \
ZwaveNodeGroupEvent, ZwaveNodePollingEnabledEvent, ZwaveNodePollingDisabledEvent, ZwaveNodeSceneEvent, \
ZwaveNodeEvent
event_queue = queue.Queue()
network_ready = threading.Event()
class _ZWEvent:
def __init__(self, signal: str, sender: str, network=None, **kwargs):
self.signal = signal
self.sender = sender
self.network = network
self.args = kwargs
def _zwcallback(signal, sender, network, **kwargs):
if signal == network.SIGNAL_NETWORK_AWAKED:
network_ready.set()
event_queue.put(_ZWEvent(signal=signal, sender=sender, network=network, **kwargs))
class ZwaveBackend(Backend):
"""
Start and manage a Z-Wave network.
If you are using a USB adapter and want a consistent naming for the device paths, you can use udev.
.. code-block:: shell
# Get the vendorID and productID of your device through lsusb.
# Then add a udev rule for it to link it e.g. to /dev/zwave.
cat <<EOF > /etc/udev/rules.d/92-zwave.rules
SUBSYSTEM=="tty", ATTRS{idVendor}=="0658", ATTRS{idProduct}=="0200", SYMLINK+="zwave"
EOF
# Restart the udev service
systemctl restart systemd-udevd.service
Triggers:
* :class:`platypush.message.event.zwave.ZwaveNetworkReadyEvent` when the network is up and running.
* :class:`platypush.message.event.zwave.ZwaveNetworkStoppedEvent` when the network goes down.
* :class:`platypush.message.event.zwave.ZwaveNetworkResetEvent` when the network is reset.
* :class:`platypush.message.event.zwave.ZwaveNetworkErrorEvent` when an error occurs on the network.
* :class:`platypush.message.event.zwave.ZwaveNodeQueryCompletedEvent` when all the nodes on the network
have been queried.
* :class:`platypush.message.event.zwave.ZwaveNodeEvent` when a node attribute changes.
* :class:`platypush.message.event.zwave.ZwaveNodeAddedEvent` when a node is added to the network.
* :class:`platypush.message.event.zwave.ZwaveNodeRemovedEvent` when a node is removed from the network.
* :class:`platypush.message.event.zwave.ZwaveNodeRenamedEvent` when a node is renamed.
* :class:`platypush.message.event.zwave.ZwaveNodeReadyEvent` when a node is ready.
* :class:`platypush.message.event.zwave.ZwaveNodeGroupEvent` when a node is associated/de-associated to a
group.
* :class:`platypush.message.event.zwave.ZwaveNodeSceneEvent` when a scene is set on a node.
* :class:`platypush.message.event.zwave.ZwaveNodePollingEnabledEvent` when the polling is successfully turned
on a node.
* :class:`platypush.message.event.zwave.ZwaveNodePollingDisabledEvent` when the polling is successfully turned
off a node.
* :class:`platypush.message.event.zwave.ZwaveButtonCreatedEvent` when a button is added to the network.
* :class:`platypush.message.event.zwave.ZwaveButtonRemovedEvent` when a button is removed from the network.
* :class:`platypush.message.event.zwave.ZwaveButtonOnEvent` when a button is pressed.
* :class:`platypush.message.event.zwave.ZwaveButtonOffEvent` when a button is released.
* :class:`platypush.message.event.zwave.ZwaveValueAddedEvent` when a value is added to a node on the network.
* :class:`platypush.message.event.zwave.ZwaveValueChangedEvent` when the value of a node on the network
changes.
* :class:`platypush.message.event.zwave.ZwaveValueRefreshedEvent` when the value of a node on the network
is refreshed.
* :class:`platypush.message.event.zwave.ZwaveValueRemovedEvent` when the value of a node on the network
is removed.
* :class:`platypush.message.event.zwave.ZwaveCommandEvent` when a command is received on the network.
* :class:`platypush.message.event.zwave.ZwaveCommandWaitingEvent` when a command is waiting for a message
to complete.
Requires:
* **python-openzwave** (``pip install python-openzwave``)
"""
def __init__(self, device: str, config_path: Optional[str] = None, user_path: Optional[str] = None,
ready_timeout: float = 10.0, *args, **kwargs):
"""
:param device: Path to the Z-Wave adapter (e.g. /dev/ttyUSB0 or /dev/ttyACM0).
:param config_path: Z-Wave configuration path (default: ``<OPENZWAVE_PATH>/ozw_config``).
:param user_path: Z-Wave user path where runtime and configuration files will be stored
(default: ``<PLATYPUSH_WORKDIR>/zwave``).
:param ready_timeout: Network ready timeout in seconds (default: 60).
"""
import python_openzwave
from openzwave.network import ZWaveNetwork
super().__init__(*args, **kwargs)
self.device = device
if not config_path:
config_path = os.path.join(os.path.dirname(inspect.getfile(python_openzwave)), 'ozw_config')
if not user_path:
user_path = os.path.join(Config.get('workdir'), 'zwave')
os.makedirs(user_path, mode=0o770, exist_ok=True)
self.config_path = config_path
self.user_path = user_path
self.ready_timeout = ready_timeout
self.network: Optional[ZWaveNetwork] = None
def start_network(self):
if self.network and self.network.state >= self.network.STATE_AWAKED:
self.logger.info('Z-Wave network already started')
return
from openzwave.network import ZWaveNetwork, dispatcher
from openzwave.option import ZWaveOption
network_ready.clear()
logging.getLogger('openzwave').addHandler(self.logger)
opts = ZWaveOption(self.device, config_path=self.config_path, user_path=self.user_path)
opts.set_console_output(False)
opts.lock()
self.network = ZWaveNetwork(opts, log=None)
dispatcher.connect(_zwcallback)
ready = network_ready.wait(self.ready_timeout)
if not ready:
self.logger.warning('Driver not ready after {} seconds: continuing anyway'.format(self.ready_timeout))
def stop_network(self):
if self.network:
self.network.stop()
network_ready.clear()
self.network = None
def _process_event(self, event: _ZWEvent):
from platypush.plugins.zwave import ZwavePlugin
network = event.network or self.network
if event.signal == network.SIGNAL_NETWORK_STOPPED or \
event.signal == network.SIGNAL_DRIVER_REMOVED:
event_queue.put(ZwaveNetworkStoppedEvent(device=self.device))
elif event.signal == network.SIGNAL_ALL_NODES_QUERIED or \
event.signal == network.SIGNAL_ALL_NODES_QUERIED_SOME_DEAD:
event = ZwaveNodeQueryCompletedEvent(device=self.device)
elif event.signal == network.SIGNAL_NETWORK_FAILED:
event = ZwaveNetworkErrorEvent(device=self.device)
self.logger.warning('Z-Wave network error')
elif event.signal == network.SIGNAL_NETWORK_RESETTED or \
event.signal == network.SIGNAL_DRIVER_RESET:
event = ZwaveNetworkResetEvent(device=self.device)
elif event.signal == network.SIGNAL_BUTTON_ON:
event = ZwaveButtonOnEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_BUTTON_OFF:
event = ZwaveButtonOffEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_CONTROLLER_COMMAND:
event = ZwaveCommandEvent(device=self.device,
state=event.args['state'],
state_description=event.args['state_full'],
error=event.args['error'] if event.args['error_int'] else None,
error_description=event.args['error_full'] if event.args['error_int'] else None,
node=ZwavePlugin.node_to_dict(event.args['node']) if event.args['node'] else None)
elif event.signal == network.SIGNAL_CONTROLLER_WAITING:
event = ZwaveCommandWaitingEvent(device=self.device,
state=event.args['state'],
state_description=event.args['state_full'])
elif event.signal == network.SIGNAL_CREATE_BUTTON:
event = ZwaveButtonCreatedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_DELETE_BUTTON:
event = ZwaveButtonRemovedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_GROUP:
event = ZwaveNodeGroupEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']),
group_index=event.args['groupidx'])
elif event.signal == network.SIGNAL_NETWORK_AWAKED:
event = ZwaveNetworkReadyEvent(
device=self.device,
ozw_library_version=self.network.controller.ozw_library_version,
python_library_version=self.network.controller.python_library_version,
zwave_library=self.network.controller.library_description,
home_id=self.network.controller.home_id,
node_id=self.network.controller.node_id,
node_version=self.network.controller.node.version,
nodes_count=self.network.nodes_count,
)
elif event.signal == network.SIGNAL_NODE_EVENT:
event = ZwaveNodeEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_NODE_ADDED:
event = ZwaveNodeAddedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_NODE_NAMING:
event = ZwaveNodeRenamedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_NODE_READY:
event = ZwaveNodeReadyEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_NODE_REMOVED:
event = ZwaveNodeRemovedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_POLLING_DISABLED:
event = ZwaveNodePollingEnabledEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_POLLING_ENABLED:
event = ZwaveNodePollingDisabledEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_SCENE_EVENT:
event = ZwaveNodeSceneEvent(device=self.device,
scene_id=event.args['scene_id'],
node=ZwavePlugin.node_to_dict(event.args['node']))
elif event.signal == network.SIGNAL_VALUE_ADDED:
event = ZwaveValueAddedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']),
value=ZwavePlugin.value_to_dict(event.args['value']))
elif event.signal == network.SIGNAL_VALUE_CHANGED:
event = ZwaveValueChangedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']),
value=ZwavePlugin.value_to_dict(event.args['value']))
elif event.signal == network.SIGNAL_VALUE_REFRESHED:
event = ZwaveValueRefreshedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']),
value=ZwavePlugin.value_to_dict(event.args['value']))
elif event.signal == network.SIGNAL_VALUE_REMOVED:
event = ZwaveValueRemovedEvent(device=self.device,
node=ZwavePlugin.node_to_dict(event.args['node']),
value=ZwavePlugin.value_to_dict(event.args['value']))
if isinstance(event, ZwaveEvent):
self.bus.post(event)
def __enter__(self):
self.start_network()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop_network()
def loop(self):
try:
event = event_queue.get(block=True, timeout=1.0)
self._process_event(event)
except queue.Empty:
pass
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,212 @@
from typing import Optional, Dict, Any
from platypush.message.event import Event
class ZwaveEvent(Event):
def __init__(self, device: Optional[str] = None, *args, **kwargs):
super().__init__(*args, device=device, **kwargs)
class ZwaveNetworkReadyEvent(ZwaveEvent):
"""
Triggered when the network started on a Z-Wave adapter becomes ready.
"""
def __init__(self,
ozw_library_version: str,
python_library_version: str,
zwave_library: str,
node_id: int,
node_version: str,
home_id: int,
nodes_count: int,
device: Optional[str] = None,
*args, **kwargs):
super().__init__(*args,
device=device,
ozw_library_version=ozw_library_version,
python_library_version=python_library_version,
zwave_library=zwave_library,
home_id=home_id,
node_id=node_id,
node_version=node_version,
nodes_count=nodes_count,
**kwargs)
class ZwaveNetworkStoppedEvent(ZwaveEvent):
"""
Triggered when a Z-Wave network is stopped.
"""
pass
class ZwaveNetworkErrorEvent(ZwaveEvent):
"""
Triggered when an error occurs on the Z-Wave network.
"""
pass
class ZwaveNetworkResetEvent(ZwaveEvent):
"""
Triggered when a Z-Wave network is reset.
"""
pass
class ZwaveNodeEvent(ZwaveEvent):
"""
Generic Z-Wave node event class.
"""
def __init__(self, node: Dict[str, Any], *args, **kwargs):
super().__init__(*args, node=node, **kwargs)
class ZwaveNodeAddedEvent(ZwaveNodeEvent):
"""
Triggered when a node is added to the network.
"""
pass
class ZwaveNodeRemovedEvent(ZwaveNodeEvent):
"""
Triggered when a node is removed from the network.
"""
pass
class ZwaveNodeRenamedEvent(ZwaveNodeEvent):
"""
Triggered when a node is renamed.
"""
pass
class ZwaveNodeReadyEvent(ZwaveNodeEvent):
"""
Triggered when a node is ready.
"""
pass
class ZwaveNodeGroupEvent(ZwaveNodeEvent):
"""
Triggered when a node is associated/de-associated to a group.
"""
def __init__(self, group_index: Optional[int] = None, *args, **kwargs):
super().__init__(*args, group_index=group_index, **kwargs)
class ZwaveNodeSceneEvent(ZwaveNodeEvent):
"""
Triggered when a scene is activated on a node.
"""
def __init__(self, scene_id: int, *args, **kwargs):
super().__init__(*args, scene_id=scene_id, **kwargs)
class ZwaveNodePollingEnabledEvent(ZwaveNodeEvent):
"""
Triggered when the polling of a node is successfully turned on.
"""
pass
class ZwaveNodePollingDisabledEvent(ZwaveNodeEvent):
"""
Triggered when the polling of a node is successfully turned off.
"""
pass
class ZwaveButtonCreatedEvent(ZwaveNodeEvent):
"""
Triggered when a button is added to the network.
"""
pass
class ZwaveButtonRemovedEvent(ZwaveNodeEvent):
"""
Triggered when a button is removed from the network.
"""
pass
class ZwaveButtonOnEvent(ZwaveNodeEvent):
"""
Triggered when a button is pressed.
"""
pass
class ZwaveButtonOffEvent(ZwaveNodeEvent):
"""
Triggered when a button is released.
"""
pass
class ZwaveValueEvent(ZwaveEvent):
"""
Abstract class for Z-Wave value events.
"""
def __init__(self, node: Dict[str, Any], value: Dict[str, Any], *args, **kwargs):
super().__init__(*args, node=node, value=value, **kwargs)
class ZwaveValueAddedEvent(ZwaveValueEvent):
"""
Triggered when a value is added to a node on the network.
"""
pass
class ZwaveValueChangedEvent(ZwaveValueEvent):
"""
Triggered when a value of a node on the network changes.
"""
pass
class ZwaveValueRefreshedEvent(ZwaveValueEvent):
"""
Triggered when a value of a node on the network is refreshed.
"""
pass
class ZwaveValueRemovedEvent(ZwaveValueEvent):
"""
Triggered when a value of a node on the network is removed.
"""
pass
class ZwaveNodeQueryCompletedEvent(ZwaveEvent):
"""
Triggered when all the nodes on the network have been queried.
"""
pass
class ZwaveCommandEvent(ZwaveEvent):
"""
Triggered when a command is received on the network.
"""
def __init__(self, state: str, state_description: str, error: Optional[str] = None,
error_description: Optional[str] = None, node: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(*args, state=state, state_description=state_description,
error=error, error_description=error_description, node=node, **kwargs)
class ZwaveCommandWaitingEvent(ZwaveCommandEvent):
"""
Triggered when a command is waiting for a message to proceed.
"""
pass
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,906 @@
from typing import Any, Dict, Optional, List, Union
from openzwave.group import ZWaveGroup
from openzwave.node import ZWaveNode
from openzwave.scene import ZWaveScene
from openzwave.value import ZWaveValue
from platypush.backend.zwave import ZwaveBackend
from platypush.context import get_backend
from platypush.plugins import Plugin, action
class ZwavePlugin(Plugin):
"""
This plugin interacts with the devices on a Z-Wave network started through the
:class:`platypush.backend.zwave.ZwaveBackend` backend.
Requires:
* **python-openzwave** (``pip install python-openzwave``)
* The :class:`platypush.backend.zwave.ZwaveBackend` backend configured and running.
"""
@staticmethod
def _get_backend() -> ZwaveBackend:
backend = get_backend('zwave')
if not backend:
raise AssertionError('Z-Wave backend not configured')
return backend
@classmethod
def _get_network(cls):
backend = cls._get_backend()
if not backend.network:
backend.start_network()
return backend.network
@classmethod
def _get_controller(cls):
return cls._get_network().controller
@action
def start_network(self):
backend = self._get_backend()
backend.start_network()
@action
def stop_network(self):
backend = self._get_backend()
backend.stop_network()
@action
def add_node(self, do_security=False):
"""
Start the inclusion process to add a node to the network.
:param do_security: Whether to initialize the Network Key on the device if it supports the Security CC
"""
controller = self._get_controller()
controller.add_node(do_security)
@action
def remove_node(self):
"""
Remove a node from the network.
"""
controller = self._get_controller()
controller.remove_node()
self.write_config()
@action
def remove_failed_node(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Remove a failed node from the network.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
controller = self._get_controller()
node = self._get_node(node_id=node_id, node_name=node_name)
controller.remove_failed_node(node.node_id)
self.write_config()
@action
def replace_failed_node(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Replace a failed node on the network.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
controller = self._get_controller()
node = self._get_node(node_id=node_id, node_name=node_name)
controller.replace_failed_node(node.node_id)
self.write_config()
@action
def replication_send(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Send node information from the primary to the secondary controller.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
controller = self._get_controller()
node = self._get_node(node_id=node_id, node_name=node_name)
controller.replication_send(node.node_id)
@action
def request_network_update(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Request a network update to a node.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
controller = self._get_controller()
node = self._get_node(node_id=node_id, node_name=node_name)
controller.request_network_update(node.node_id)
@action
def request_node_neighbour_update(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Request a neighbours list update to a node.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
controller = self._get_controller()
node = self._get_node(node_id=node_id, node_name=node_name)
controller.request_node_neighbor_update(node.node_id)
@staticmethod
def value_to_dict(value) -> Dict[str, Any]:
if not value:
return {}
return {
'command_class': value.command_class,
'data': value.data,
'data_items': list(value.data_items) if isinstance(value.data_items, set) else value.data_items,
'genre': value.genre,
'help': value.help,
'home_id': value.home_id,
'id_on_network': value.id_on_network if value.parent_id is not None and value.command_class is not None
and value.instance is not None and value.index is not None else None,
'index': value.index,
'instance': value.instance,
'is_polled': value.is_polled,
'is_read_only': value.is_read_only,
'is_set': value.is_set,
'is_write_only': value.is_write_only,
'label': value.label,
'last_update': value.last_update,
'min': value.min,
'max': value.max,
'object_id': value.object_id,
'outdated': value.outdated,
'parent_id': value.parent_id,
'poll_intensity': value.poll_intensity,
'precision': value.precision,
'type': value.type,
'units': value.units,
'use_cache': value.use_cache,
'value_id': value.value_id,
}
@staticmethod
def group_to_dict(group) -> Dict[str, Any]:
if not group:
return {}
return {
'index': group.index,
'label': group.label,
'max_associations': group.max_associations,
'associations': group.associations,
}
@classmethod
def node_to_dict(cls, node) -> Dict[str, Any]:
if not node:
return {}
return {
'node_id': node.node_id,
'home_id': node.home_id,
'capabilities': list(node.capabilities),
'command_classes': list(node.command_classes),
'device_type': node.device_type,
'groups': {
group_id: cls.group_to_dict(group)
for group_id, group in node.groups.items()
},
'is_awake': node.is_awake,
'is_failed': node.is_failed,
'is_beaming_device': node.is_beaming_device,
'is_frequent_listening_device': node.is_frequent_listening_device,
'is_info_received': node.is_info_received,
'is_listening_device': node.is_listening_device,
'is_locked': node.is_locked,
'is_ready': node.is_ready,
'is_routing_device': node.is_routing_device,
'is_security_device': node.is_security_device,
'is_sleeping': node.is_sleeping,
'last_update': node.last_update,
'location': node.location,
'manufacturer_id': node.manufacturer_id,
'manufacturer_name': node.manufacturer_name,
'max_baud_rate': node.max_baud_rate,
'name': node.name,
'outdated': node.outdated,
'product_id': node.product_id,
'product_name': node.product_name,
'product_type': node.product_type,
'query_stage': node.query_stage,
'role': node.role,
'security': node.security,
'specific': node.specific,
'type': node.type,
'use_cache': node.use_cache,
'version': node.version,
'values': {
value_id: cls.value_to_dict(value)
for value_id, value in (node.values or {}).items()
},
}
def _get_node(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> ZWaveNode:
assert node_id is not None or node_name is not None, 'Specify either node_id or name'
nodes = self._get_backend().network.nodes
if node_id is not None:
assert node_id in nodes, 'No such node_id: {}'.format(node_id)
return nodes[node_id]
nodes = [n for n in nodes.values() if n.name == node_name]
assert nodes, 'No such node name: {}'.format(node_name)
return nodes[0]
def _get_groups(self) -> Dict[int, ZWaveGroup]:
return {
group_index: group
for node in self._get_backend().network.nodes.values()
for group_index, group in node.groups.items()
}
def _get_group(self, group_index: Optional[int] = None, group_label: Optional[str] = None) -> ZWaveGroup:
assert group_index is not None or group_label is not None, 'Specify either group_index or label'
groups = self._get_groups()
if group_index is not None:
assert group_index in groups, 'No such group_index: {}'.format(group_index)
return groups[group_index]
groups = [g for g in groups.values() if g.label == group_label]
assert groups, 'No such group label: {}'.format(group_label)
return groups[0]
def _get_scene(self, scene_id: Optional[int] = None, scene_label: Optional[str] = None) -> ZWaveScene:
assert scene_id is not None or scene_label is not None, 'Specify either scene_id or label'
scenes = self._get_backend().network.get_scenes()
if scene_id is not None:
assert scene_id in scenes, 'No such scene_id: {}'.format(scene_id)
return scenes[scene_id]
scenes = [s for s in scenes.values() if s['label'] == scene_label]
assert scenes, 'No such scene label: {}'.format(scene_label)
return scenes[0]
@action
def get_nodes(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[str, Any]:
"""
Get the nodes associated to the network.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
if node_id is not None or node_name is not None:
return self.node_to_dict(self._get_node(node_id=node_id, node_name=node_name))
return {
node_id: self.node_to_dict(node)
for node_id, node in self._get_backend().network.nodes.items()
}
@action
def get_node_stats(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[str, Any]:
"""
Get the statistics of a node on the network.
:param node_id: Filter by node_id.
:param node_name: Filter by node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
return node.stats
@action
def set_node_name(self, new_name: str, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Rename a node on the network.
:param new_name: New name for the node.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.name = new_name
self.write_config()
@action
def set_node_product_name(self, product_name: str, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Set the product name of a node.
:param product_name: Product name.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.product_name = product_name
self.write_config()
@action
def set_node_manufacturer_name(self, manufacturer_name: str, node_id: Optional[int] = None,
node_name: Optional[str] = None):
"""
Set the manufacturer name of a node.
:param manufacturer_name: Manufacturer name.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.manufacturer_name = manufacturer_name
self.write_config()
@action
def set_node_location(self, location: str, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Set the location of a node.
:param location: Node location.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.location = location
self.write_config()
@action
def cancel_command(self):
"""
Cancel the current running command.
"""
self._get_controller().cancel_command()
@action
def kill_command(self):
"""
Immediately terminate any running command on the controller and release the lock.
"""
self._get_controller().kill_command()
@action
def set_controller_name(self, node_name: str):
"""
Set the name of the controller on the network.
:param node_name: New controller name.
"""
self._get_controller().name = node_name
self.write_config()
@action
def get_capabilities(self) -> List[str]:
"""
Get the capabilities of the controller.
"""
return list(self._get_controller().capabilities)
@action
def receive_configuration(self):
"""
Receive the configuration from the primary controller on the network. Requires a primary controller active.
"""
self._get_controller().receive_configuration()
@action
def transfer_primary_role(self):
"""
Add a new controller to the network and make it the primary.
The existing primary will become a secondary controller.
"""
self._get_controller().transfer_primary_role()
@action
def heal(self, refresh_routes: bool = False):
"""
Heal network by requesting nodes rediscover their neighbors.
:param refresh_routes: Whether to perform return routes initialization (default: ``False``).
"""
self._get_network().heal(refresh_routes)
@action
def switch_all(self, state: bool):
"""
Switch all the connected devices on/off.
:param state: True (switch on) or False (switch off).
"""
self._get_network().switch_all(state)
@action
def test(self, count: int = 1):
"""
Send a number of test messages to every node and record results.
:param count: The number of test messages to send.
"""
self._get_network().test(count)
def _get_value(self, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
node_id: Optional[int] = None, node_name: Optional[str] = None, value_label: Optional[str] = None) \
-> ZWaveValue:
assert (value_id is not None or id_on_network is not None) or \
(node_id is not None and node_name is not None and value_label is not None),\
'Specify either value_id, id_on_network, or [node_id/node_name, value_label]'
if value_id is not None:
return self._get_network().get_value(value_id)
if id_on_network is not None:
return self._get_network().get_value_from_id_on_network(id_on_network)
node = self._get_node(node_id=node_id, node_name=node_name)
values = [v for v in node.values if v.label == value_label]
assert values, 'No such value on node "{}": "{}"'.format(node.name, value_label)
return values[0]
@action
def get_value(self, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, node_id: Optional[int] = None, node_name: Optional[str] = None) \
-> Dict[str, Any]:
"""
Get a value on the network.
:param value_id: Select by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by [node_id/node_name, value_label]
:param node_id: Select value by [node_id/node_name, value_label]
:param node_name: Select value by [node_id/node_name, value_label]
"""
return self._get_value(value_id=value_id, id_on_network=id_on_network,
node_id=node_id, node_name=node_name, value_label=value_label).to_dict()
@action
def set_value(self, data, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Set a value.
:param data: Data to set for the value.
:param value_id: Select value by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by [node_id/node_name, value_label]
:param node_id: Select value by [node_id/node_name, value_label]
:param node_name: Select value by [node_id/node_name, value_label]
"""
value = self._get_value(value_id=value_id, id_on_network=id_on_network,
node_id=node_id, node_name=node_name, value_label=value_label)
value.data = data
@action
def node_add_value(self, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, node_id: Optional[int] = None,
node_name: Optional[str] = None):
"""
Add a value to a node.
:param value_id: Select value by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by label.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
value = self._get_value(value_id=value_id, id_on_network=id_on_network, node_id=node.node_id,
value_label=value_label)
node.add_value(value.value_id)
self.write_config()
@action
def node_remove_value(self, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, node_id: Optional[int] = None,
node_name: Optional[str] = None):
"""
Remove a value from a node.
:param value_id: Select value by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by [node_id/node_name, value_label]
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
value = self._get_value(value_id=value_id, id_on_network=id_on_network, node_id=node.node_id,
value_label=value_label)
node.remove_value(value.value_id)
self.write_config()
@action
def node_heal(self, node_id: Optional[int] = None, node_name: Optional[str] = None, refresh_routes: bool = False):
"""
Heal network node by requesting the node to rediscover their neighbours.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
:param refresh_routes: Whether to perform return routes initialization. (default: ``False``).
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.heal(refresh_routes)
@action
def node_update_neighbours(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Ask a node to update its neighbours table.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.neighbor_update()
@action
def node_network_update(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Update the controller with network information.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.network_update()
@action
def node_refresh_info(self, node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Fetch up-to-date information about the node.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
node.refresh_info()
def _get_values(self, item: str, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
nodes = [self._get_node(node_id=node_id, node_name=node_name)] if node_id or node_name \
else self._get_network().nodes.values()
return {
value_id: {
'node_id': node.node_id,
'node_name': node.name,
**self.value_to_dict(value)
}
for node in nodes
for value_id, value in getattr(node, 'get_' + item)().items()
}
@action
def get_dimmers(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the dimmers on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
return self._get_values('dimmers', node_id=node_id, node_name=node_name)
@action
def get_node_config(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the configuration values of a node or of all the nodes on the network.
:param node_id: Select node by node_id.
:param node_name: Select node by label.
"""
return self._get_values('configs', node_id=node_id, node_name=node_name)
@action
def get_battery_levels(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the battery levels of a node or of all the nodes on the network.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('battery_levels', node_id=node_id, node_name=node_name)
@action
def get_power_levels(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the power levels of this node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('power_levels', node_id=node_id, node_name=node_name)
@action
def get_rgb_bulbs(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the RGB bulbs/LEDs on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('rgbbulbs', node_id=node_id, node_name=node_name)
@action
def get_switches(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the switches on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('switches', node_id=node_id, node_name=node_name)
@action
def get_sensors(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the sensors on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('sensors', node_id=node_id, node_name=node_name)
@action
def get_doorlocks(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the doorlocks on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('doorlocks', node_id=node_id, node_name=node_name)
@action
def get_usercodes(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the usercodes on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('usercodes', node_id=node_id, node_name=node_name)
@action
def get_thermostats(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the thermostats on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('thermostats', node_id=node_id, node_name=node_name)
@action
def get_protections(self, node_id: Optional[int] = None, node_name: Optional[str] = None) -> Dict[int, Any]:
"""
Get the protection-compatible devices on the network or associated to a node.
:param node_id: Select node by node_id.
:param node_name: Select node by name.
"""
return self._get_values('protections', node_id=node_id, node_name=node_name)
@action
def get_groups(self) -> Dict[int, Any]:
"""
Get the groups on the network.
"""
return {
group_index: self.group_to_dict(group)
for group_index, group in self._get_groups().items()
}
@action
def get_scenes(self) -> Dict[str, Any]:
"""
Get the scenes configured on the network.
"""
return self._get_network().scenes_to_dict()
@action
def create_scene(self, label: str):
"""
Create a new scene.
:param label: Scene label.
"""
self._get_network().create_scene(label)
@action
def remove_scene(self, scene_id: Optional[int] = None, scene_label: Optional[str] = None):
"""
Remove a scene.
:param scene_id: Select by scene_id.
:param scene_label: Select by scene label.
"""
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
self._get_network().remove_scene(scene.scene_id)
@action
def activate_scene(self, scene_id: Optional[int] = None, scene_label: Optional[str] = None):
"""
Activate a scene.
:param scene_id: Select by scene_id.
:param scene_label: Select by scene label.
"""
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
scene.activate()
@action
def set_scene_label(self, new_label: str, scene_id: Optional[int] = None, scene_label: Optional[str] = None):
"""
Rename a scene/set the scene label.
:param new_label: New label.
:param scene_id: Select by scene_id.
:param scene_label: Select by current scene label.
"""
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
scene.label = new_label
self.write_config()
@action
def scene_add_value(self, data, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, scene_id: Optional[int] = None,
scene_label: Optional[str] = None, node_id: Optional[int] = None,
node_name: Optional[str] = None):
"""
Add a value to a scene.
:param data: Data to set for the value.
:param value_id: Select value by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by [node_id/node_name, value_label]
:param node_id: Select value by [node_id/node_name, value_label]
:param node_name: Select value by [node_id/node_name, value_label]
:param scene_id: Select scene by scene_id.
:param scene_label: Select scene by scene label.
"""
value = self._get_value(value_id=value_id, id_on_network=id_on_network, node_id=node_id, node_name=node_name,
value_label=value_label)
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
scene.add_value(value.value_id, data)
self.write_config()
@action
def scene_remove_value(self, value_id: Optional[int] = None, id_on_network: Optional[str] = None,
value_label: Optional[str] = None, scene_id: Optional[int] = None,
scene_label: Optional[str] = None, node_id: Optional[int] = None,
node_name: Optional[str] = None):
"""
Remove a value from a scene.
:param value_id: Select value by value_id.
:param id_on_network: Select value by id_on_network.
:param value_label: Select value by [node_id/node_name, value_label]
:param node_id: Select value by [node_id/node_name, value_label]
:param node_name: Select value by [node_id/node_name, value_label]
:param scene_id: Select scene by scene_id.
:param scene_label: Select scene by scene label.
"""
value = self._get_value(value_id=value_id, id_on_network=id_on_network, node_id=node_id, node_name=node_name,
value_label=value_label)
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
scene.remove_value(value.value_id)
self.write_config()
@action
def get_scene_values(self, scene_id: Optional[int] = None, scene_label: Optional[str] = None) -> dict:
"""
Get the values associated to a scene.
:param scene_id: Select by scene_id.
:param scene_label: Select by scene label.
:return: value_id -> value (as a dict) mapping.
"""
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
return {v.value_id: v.to_dict() for v in scene.get_values().items()}
@action
def get_scene_values(self, scene_id: Optional[int] = None, scene_label: Optional[str] = None) -> dict:
"""
Get the values associated to a scene.
:param scene_id: Select by scene_id.
:param scene_label: Select by scene label.
"""
scene = self._get_scene(scene_id=scene_id, scene_label=scene_label)
return {v.value_id: v.to_dict() for v in scene.get_values().items()}
@action
def create_button(self, button_id: Union[int, str], node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Create a handheld button on a device. Only intended for bridge firmware controllers.
:param button_id: The ID of the button.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
self._get_controller().create_button(node.node_id, button_id)
self.write_config()
@action
def delete_button(self, button_id: Union[int, str], node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Delete a button association from a device. Only intended for bridge firmware controllers.
:param button_id: The ID of the button.
:param node_id: Filter by node_id.
:param node_name: Filter by current node name.
"""
node = self._get_node(node_id=node_id, node_name=node_name)
self._get_controller().delete_button(node.node_id, button_id)
self.write_config()
@action
def add_node_to_group(self, group_index: Optional[int] = None, group_label: Optional[str] = None,
node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Add a node to a group.
:param group_index: Select group by group index.
:param group_label: Select group by group label.
:param node_id: Select node by node_id.
:param node_name: Select node by node name.
:return:
"""
node = self._get_node(node_id=node_id, node_name=node_name)
group = self._get_group(group_index=group_index, group_label=group_label)
group.add_association(node.node_id)
self.write_config()
@action
def remove_node_from_group(self, group_index: Optional[int] = None, group_label: Optional[str] = None,
node_id: Optional[int] = None, node_name: Optional[str] = None):
"""
Remove a node from a group.
:param group_index: Select group by group index.
:param group_label: Select group by group label.
:param node_id: Select node by node_id.
:param node_name: Select node by node name.
:return:
"""
node = self._get_node(node_id=node_id, node_name=node_name)
group = self._get_group(group_index=group_index, group_label=group_label)
group.remove_association(node.node_id)
self.write_config()
@action
def create_new_primary(self):
"""
Create a new primary controller on the network when the previous primary fails.
"""
self._get_controller().create_new_primary()
self.write_config()
@action
def hard_reset(self):
"""
Perform a hard reset of the controller. It erases its network configuration settings.
The controller becomes a primary controller ready to add devices to a new network.
"""
self._get_controller().hard_reset()
@action
def soft_reset(self):
"""
Perform a soft reset of the controller.
Resets a controller without erasing its network configuration settings.
"""
self._get_controller().soft_reset()
@action
def write_config(self):
"""
Store the current configuration of the network to the user directory.
"""
self._get_network().write_config()
# vim:sw=4:ts=4:et:

View File

@ -224,3 +224,8 @@ croniter
# Support for nmap integration
# python-nmap
# Support for zigbee2mqtt
# paho-mqtt
# Support for Z-Wave
# python-openzwave

View File

@ -279,5 +279,9 @@ setup(
'sys': ['py-cpuinfo', 'psutil'],
# Support for nmap integration
'nmap': ['python-nmap'],
# Support for zigbee2mqtt
'zigbee': ['paho-mqtt'],
# Support for Z-Wave
'zwave': ['python-openzwave'],
},
)