Removed last reference of SwitchPlugin

This commit is contained in:
Fabio Manganiello 2023-02-05 23:10:35 +01:00
parent 419a0cec61
commit 9d028af524
Signed by: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 51 additions and 100 deletions

View file

@ -1,72 +0,0 @@
from abc import ABC, abstractmethod
from typing import List, Union
from platypush.plugins import Plugin, action
# TODO REMOVE ME
class SwitchPlugin(Plugin, ABC):
"""
Abstract class for interacting with switch devices
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@action
@abstractmethod
def on(self, device, *args, **kwargs):
"""Turn the device on"""
raise NotImplementedError()
@action
@abstractmethod
def off(self, device, *args, **kwargs):
"""Turn the device off"""
raise NotImplementedError()
@action
@abstractmethod
def toggle(self, device, *args, **kwargs):
"""Toggle the device status (on/off)"""
raise NotImplementedError()
@action
def switch_status(self, device=None) -> Union[dict, List[dict]]:
"""
Get the status of a specified device or of all the configured devices (default).
:param device: Filter by device name or ID.
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
devices = self.switches
if device:
devices = [
d
for d in self.switches
if d.get('id') == device or d.get('name') == device
]
return devices[0] if devices else []
return devices
@action
def status(self, device=None, *_, **__) -> Union[dict, List[dict]]:
"""
Get the status of all the devices, or filter by device name or ID (alias for :meth:`.switch_status`).
:param device: Filter by device name or ID.
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
return self.switch_status(device)
@property
@abstractmethod
def switches(self) -> List[dict]:
"""
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
raise NotImplementedError()
# vim:sw=4:ts=4:et:

View file

@ -1,16 +1,16 @@
import enum import enum
import time import time
from typing import List, Optional from typing import Any, Collection, Dict, List, Optional
from platypush.entities import EnumSwitchEntityManager
from platypush.entities.switches import EnumSwitch
from platypush.message.response.bluetooth import BluetoothScanResponse from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action from platypush.plugins import action
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins.switch import SwitchPlugin
class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init] # pylint: disable=too-many-ancestors
SwitchPlugin, BluetoothBlePlugin class SwitchbotBluetoothPlugin(BluetoothBlePlugin, EnumSwitchEntityManager):
):
""" """
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
programmatically control switches over a Bluetooth interface. programmatically control switches over a Bluetooth interface.
@ -91,7 +91,7 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
raise e raise e
time.sleep(5) time.sleep(5)
return self.status(device) # type: ignore return self.status(device)
@action @action
def press(self, device): def press(self, device):
@ -127,6 +127,30 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
""" """
return self._run(device, self.Command.OFF) return self._run(device, self.Command.OFF)
@action
# pylint: disable=arguments-differ
def set_value(self, device: str, data: str, *_, **__):
"""
Entity-compatible ``set_value`` method to send a command to a device.
:param device: Device name or address
:param data: Command to send. Possible values are:
- ``on``: Press the button and remain in the pressed state.
- ``off``: Release a previously pressed button.
- ``press``: Press and release the button.
"""
if data == 'on':
return self.on(device)
if data == 'off':
return self.off(device)
if data == 'press':
return self.press(device)
self.logger.warning('Unknown command for SwitchBot "%s": "%s"', device, data)
return None
@action @action
def scan( def scan(
self, interface: Optional[str] = None, duration: int = 10 self, interface: Optional[str] = None, duration: int = 10
@ -138,14 +162,16 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
:param duration: Scan duration in seconds :param duration: Scan duration in seconds
""" """
devices = super().scan(interface=interface, duration=duration).devices compatible_devices: Dict[str, Any] = {}
compatible_devices = {} devices = (
super().scan(interface=interface, duration=duration).devices # type: ignore
)
for dev in devices: for dev in devices:
try: try:
characteristics = [ characteristics = [
chrc chrc
for chrc in self.discover_characteristics( for chrc in self.discover_characteristics( # type: ignore
dev['addr'], dev['addr'],
channel_type='random', channel_type='random',
wait=False, wait=False,
@ -157,14 +183,14 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
if characteristics: if characteristics:
compatible_devices[dev['addr']] = None compatible_devices[dev['addr']] = None
except Exception as e: except Exception as e:
self.logger.warning('Device scan error', e) self.logger.warning('Device scan error: %s', e)
self.publish_entities(compatible_devices) # type: ignore self.publish_entities(compatible_devices)
return BluetoothScanResponse(devices=compatible_devices) return BluetoothScanResponse(devices=compatible_devices)
@property @action
def switches(self) -> List[dict]: def status(self, *_, **__) -> List[dict]:
self.publish_entities(self.configured_devices) # type: ignore self.publish_entities(self.configured_devices)
return [ return [
{ {
'address': addr, 'address': addr,
@ -175,20 +201,17 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
for addr, name in self.configured_devices.items() for addr, name in self.configured_devices.items()
] ]
def transform_entities(self, devices: dict): def transform_entities(self, entities: Collection[dict]) -> Collection[EnumSwitch]:
from platypush.entities.switches import Switch return [
EnumSwitch(
return super().transform_entities( # type: ignore id=addr,
[ name=name,
Switch( value='on',
id=addr, values=['on', 'off', 'press'],
name=name, is_write_only=True,
state=False, )
is_write_only=True, for addr, name in entities
) ]
for addr, name in devices.items()
]
)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: