The switch.wemo integration now extends SwitchEntityManager

This commit is contained in:
Fabio Manganiello 2023-02-04 00:58:28 +01:00
parent de2849546a
commit 0311d87bc3
Signed by: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 115 additions and 73 deletions

View file

@ -1,16 +1,17 @@
import contextlib
import ipaddress
from typing import List, Optional
from typing import Collection, Dict, List, Mapping, Optional, Union
from platypush.plugins import action
from platypush.plugins.switch import SwitchPlugin
from platypush.entities import Entity, SwitchEntityManager
from platypush.entities.switches import Switch
from platypush.plugins import RunnablePlugin, action
from platypush.utils.workers import Workers
from .lib import WemoRunner
from .scanner import Scanner
class SwitchWemoPlugin(SwitchPlugin):
class SwitchWemoPlugin(RunnablePlugin, SwitchEntityManager):
"""
Plugin to control a Belkin WeMo smart switches
(https://www.belkin.com/us/Products/home-automation/c/wemo-home-automation/)
@ -20,27 +21,46 @@ class SwitchWemoPlugin(SwitchPlugin):
def __init__(
self,
devices=None,
devices: Optional[Union[Collection[str], Mapping[str, str]]] = None,
netmask: Optional[str] = None,
port: int = _default_port,
**kwargs
):
"""
:param devices: List of IP addresses or name->address map containing the WeMo Switch devices to control.
This plugin previously used ouimeaux for auto-discovery but it's been dropped because
1. too slow 2. too heavy 3. auto-discovery failed too often.
This plugin previously used ``ouimeaux`` for auto-discovery, but it's
been dropped because:
1. Too slow
2. Too heavy
3. Auto-discovery failed too often
However, this also means that you now have to specify either:
- ``devices``: The devices you want to control, as a static list/map
- ``netmask``: The IP netmask that should be scanned for WeMo devices
:param devices: List of IP addresses or name->address map containing
the WeMo Switch devices to control.
:type devices: list or dict
:param netmask: Alternatively to a list of static IP->name pairs, you can specify the network mask where
the devices should be scanned (e.g. '192.168.1.0/24')
:param netmask: Alternatively to a list of static IP->name pairs, you
can specify the network mask where the devices should be scanned
(e.g. '192.168.1.0/24')
:param port: Port where the WeMo devices are expected to expose the RPC/XML over HTTP service (default: 49153)
:param port: Port where the WeMo devices are expected to expose the
RPC/XML over HTTP service (default: 49153)
"""
super().__init__(**kwargs)
assert devices or netmask, (
'Please specify either a static list of devices (either a list of '
'IP addresses or a name->address map) or an IP netmask to scan for '
'devices'
)
self.port = port
self.netmask = netmask
self._devices = {}
self._devices: Dict[str, str] = {}
self._init_devices(devices)
def _init_devices(self, devices):
@ -55,34 +75,6 @@ class SwitchWemoPlugin(SwitchPlugin):
self._addresses = set(self._devices.values())
@property
def switches(self) -> List[dict]:
"""
Get the list of available devices
:returns: The list of devices.
.. code-block:: json
[
{
"ip": "192.168.1.123",
"name": "Switch 1",
"on": true
},
{
"ip": "192.168.1.124",
"name": "Switch 2",
"on": false
}
]
"""
return [
self.status(device).output # type: ignore
for device in self._devices.values()
]
def _get_address(self, device: str) -> str:
if device not in self._addresses:
with contextlib.suppress(KeyError):
@ -91,8 +83,20 @@ class SwitchWemoPlugin(SwitchPlugin):
return device
@action
def status(self, device: Optional[str] = None, *_, **__):
devices = {device: device} if device else self._devices.copy()
# pylint: disable=arguments-differ
def status(
self,
device: Optional[Union[str, Collection[str]]] = None,
publish_entities: bool = True,
**__
) -> List[dict]:
if device:
if isinstance(device, str):
devices = {device: device}
else:
devices = {d: d for d in device}
else:
devices = self._devices.copy()
ret = [
{
@ -104,14 +108,12 @@ class SwitchWemoPlugin(SwitchPlugin):
for (name, addr) in devices.items()
]
self.publish_entities(ret) # type: ignore
return ret[0] if device else ret
if publish_entities:
self.publish_entities(ret)
return ret
def transform_entities(self, devices: List[dict]):
from platypush.entities.switches import Switch
return super().transform_entities( # type: ignore
[
def transform_entities(self, entities: Collection[dict]) -> List[Entity]:
return [
Switch(
id=dev["id"],
name=dev["name"],
@ -120,12 +122,11 @@ class SwitchWemoPlugin(SwitchPlugin):
"ip": dev["ip"],
},
)
for dev in (devices or [])
for dev in (entities or [])
]
)
@action
def on(self, device: str, **_):
def on(self, device: str, **_): # pylint: disable=arguments-differ
"""
Turn a switch on
@ -136,7 +137,7 @@ class SwitchWemoPlugin(SwitchPlugin):
return self.status(device)
@action
def off(self, device: str, **_):
def off(self, device: str, **_): # pylint: disable=arguments-differ
"""
Turn a switch off
@ -147,7 +148,7 @@ class SwitchWemoPlugin(SwitchPlugin):
return self.status(device)
@action
def toggle(self, device: str, *_, **__):
def toggle(self, device: str, *_, **__): # pylint: disable=arguments-differ
"""
Toggle a device on/off state
@ -178,7 +179,9 @@ class SwitchWemoPlugin(SwitchPlugin):
return WemoRunner.get_name(device)
@action
def scan(self, netmask: Optional[str] = None):
def scan(
self, netmask: Optional[str] = None, publish_entities: bool = True
) -> List[dict]:
netmask = netmask or self.netmask
assert netmask, "Scan not supported: No netmask specified"
@ -190,7 +193,33 @@ class SwitchWemoPlugin(SwitchPlugin):
devices = {dev.name: dev.addr for dev in workers.responses}
self._init_devices(devices)
return self.status()
return self.status(publish_entities=publish_entities).output
def main(self):
def scan():
status = (
self.scan(publish_entities=False).output
if not self._devices
else self.status(self._devices.values(), publish_entities=False).output
)
return {dev['ip']: dev for dev in status}
devices = {}
while not self.should_stop():
new_devices = scan()
updated_devices = {
ip: new_devices[ip]
for ip, dev in new_devices.items()
if any(v != devices.get(ip, {}).get(k) for k, v in dev.items())
}
if updated_devices:
self.publish_entities(updated_devices.values())
devices = new_devices
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et:

View file

@ -1,35 +1,48 @@
import socket
from dataclasses import dataclass
from typing import Optional
from platypush.utils.workers import Worker
from .lib import WemoRunner
@dataclass
class ScanResult:
def __init__(self, addr: str, name: str, on: bool):
self.addr = addr
self.name = name
self.on = on
"""
Models a scan result.
"""
addr: str
name: str
on: bool
class Scanner(Worker):
"""
Worker class used to scan WeMo devices on the network.
"""
timeout = 1.5
def __init__(self, port: int = WemoRunner.default_port, *args, **kwargs):
def __init__(self, *args, port: int = WemoRunner.default_port, **kwargs):
super().__init__(*args, **kwargs)
self.port = port
def process(self, addr: str) -> Optional[ScanResult]:
def process(self, msg: str) -> Optional[ScanResult]:
addr = msg
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((addr, self.port))
sock.close()
return ScanResult(addr=addr, name=WemoRunner.get_name(addr), on=WemoRunner.get_state(addr))
return ScanResult(
addr=addr, name=WemoRunner.get_name(addr), on=WemoRunner.get_state(addr)
)
except OSError:
pass
return None
# vim:sw=4:ts=4:et: