[WIP] Refactoring @manages annotation into a proper EntityManager hierarchy

This commit is contained in:
Fabio Manganiello 2023-02-02 23:21:12 +01:00
parent 63d6920716
commit be3b99326f
Signed by: blacklight
GPG key ID: D90FBA7F76362774
13 changed files with 535 additions and 394 deletions

View file

@ -1,18 +1,26 @@
import logging
from typing import Collection, Optional
from ._base import Entity, get_entities_registry
from ._base import Entity, get_entities_registry, init_entities_db
from ._engine import EntitiesEngine
from ._registry import manages, register_entity_plugin, get_plugin_entity_registry
from ._managers import register_entity_manager, get_plugin_entity_registry
from ._managers.lights import LightEntityManager
from ._managers.sensors import SensorEntityManager
from ._managers.switches import (
SwitchEntityManager,
DimmerEntityManager,
EnumSwitchEntityManager,
)
_engine: Optional[EntitiesEngine] = None
logger = logging.getLogger(__name__)
def init_entities_engine() -> EntitiesEngine:
from ._base import init_entities_db
global _engine
"""
Initialize and start the entities engine.
"""
global _engine # pylint: disable=global-statement
init_entities_db()
_engine = EntitiesEngine()
_engine.start()
@ -20,6 +28,17 @@ def init_entities_engine() -> EntitiesEngine:
def publish_entities(entities: Collection[Entity]):
"""
Publish a collection of entities to be processed by the engine.
The engine will:
- Normalize and merge the provided entities.
- Trigger ``EntityUpdateEvent`` events.
- Persist the new state to the local database.
:param entities: Entities to be published.
"""
if not _engine:
logger.debug('No entities engine registered')
return
@ -28,12 +47,16 @@ def publish_entities(entities: Collection[Entity]):
__all__ = (
'Entity',
'DimmerEntityManager',
'EntitiesEngine',
'Entity',
'EnumSwitchEntityManager',
'LightEntityManager',
'SensorEntityManager',
'SwitchEntityManager',
'get_entities_registry',
'get_plugin_entity_registry',
'init_entities_engine',
'publish_entities',
'register_entity_plugin',
'get_plugin_entity_registry',
'get_entities_registry',
'manages',
'register_entity_manager',
)

View file

@ -0,0 +1,155 @@
import inspect
import json
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Optional, Dict, Collection, Type
from platypush.config import Config
from platypush.entities import Entity
from platypush.utils import get_plugin_name_by_class, get_redis
_entity_registry_varname = '_platypush/plugin_entity_registry'
class EntityManager(ABC):
"""
Base mixin for all the integrations that support entities mapping.
The classes that implement the entity manager need to implement the
:meth:`.transform_entities` method, which will convert the supported
entities from whichever format the integration supports to a collection of
:class:`platypush.entities.Entity` objects.
The converted entities will then be passed to the
:class:`platypush.entities.EntitiesEngine` whenever
:meth:`.publish_entities` is called
The implemented classes should also implement the :meth:`.status` method.
This method should retrieve the current state of the entities and call
:meth:`.publish_entities`.
"""
def __new__(cls, *_, **__) -> 'EntityManager':
register_entity_manager(cls)
return super().__new__(cls)
@abstractmethod
def transform_entities(self, entities: Collection[Any]) -> Collection[Entity]:
"""
This method takes a list of entities in any (plugin-specific)
format and converts them into a standardized collection of
`Entity` objects. Since this method is called by
:meth:`.publish_entities` before entity updates are published,
you may usually want to extend it to pre-process the entities
managed by your extension into the standard format before they
are stored and published to all the consumers.
"""
assert all(isinstance(e, Entity) for e in entities), (
'Expected all the instances to be entities, got '
f'{[e.__class__.__name__ for e in entities]}'
)
return entities
@abstractmethod
def status(self, *_, **__):
"""
All derived classes should implement this method.
At the very least, this method should refresh the current state of the
integration's entities and call :meth:`.publish_entities`.
It should also return the current state of the entities as a list of
serialized entities, if possible.
"""
raise NotImplementedError(
'The `status` method has not been implemented in '
f'{self.__class__.__name__}'
)
def _normalize_entities(self, entities: Collection[Entity]) -> Collection[Entity]:
for entity in entities:
if entity.id:
# Entity IDs can only refer to the internal primary key
entity.external_id = entity.id
entity.id = None # type: ignore
entity.plugin = get_plugin_name_by_class(self.__class__) # type: ignore
entity.updated_at = datetime.utcnow() # type: ignore
return entities
def publish_entities(self, entities: Optional[Collection[Any]]):
"""
Publishes a list of entities. The downstream consumers include:
- The entity persistence manager
- The web server
- Any consumer subscribed to
:class:`platypush.message.event.entities.EntityUpdateEvent`
events (e.g. web clients)
You usually don't need to override this class (but you may want to
extend :meth:`.transform_entities` instead if your extension doesn't
natively handle `Entity` objects).
"""
from platypush.entities import publish_entities
transformed_entities = self._normalize_entities(
self.transform_entities(entities or [])
)
publish_entities(transformed_entities)
def register_entity_manager(cls: Type[EntityManager]):
"""
Associates a plugin as a manager for a certain entity type.
You usually don't have to call this method directly.
"""
entity_managers = [
c
for c in inspect.getmro(cls)
if issubclass(c, EntityManager) and c not in {cls, EntityManager}
]
plugin_name = get_plugin_name_by_class(cls) or ''
redis = get_redis()
registry = get_plugin_entity_registry()
registry_by_plugin = set(registry['by_plugin'].get(plugin_name, []))
for manager in entity_managers:
entity_type_name = manager.__name__
registry_by_type = set(registry['by_type'].get(entity_type_name, []))
registry_by_plugin.add(entity_type_name)
registry_by_type.add(plugin_name)
registry['by_plugin'][plugin_name] = list(registry_by_plugin)
registry['by_type'][entity_type_name] = list(registry_by_type)
redis.mset({_entity_registry_varname: json.dumps(registry)})
def get_plugin_entity_registry() -> Dict[str, Dict[str, Collection[str]]]:
"""
Get the `plugin->entity_types` and `entity_type->plugin`
mappings supported by the current configuration.
"""
redis = get_redis()
registry = redis.mget([_entity_registry_varname])[0]
try:
registry = json.loads((registry or b'').decode())
except (TypeError, ValueError):
return {'by_plugin': {}, 'by_type': {}}
enabled_plugins = set(Config.get_plugins().keys())
return {
'by_plugin': {
plugin_name: entity_types
for plugin_name, entity_types in registry.get('by_plugin', {}).items()
if plugin_name in enabled_plugins
},
'by_type': {
entity_type: [p for p in plugins if p in enabled_plugins]
for entity_type, plugins in registry.get('by_type', {}).items()
},
}

View file

@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from .switches import SwitchEntityManager
class LightEntityManager(SwitchEntityManager, ABC):
"""
Base class for integrations that support light/bulbs entities.
"""
@abstractmethod
def set_lights(self, *args, lights=None, **kwargs):
"""
Set a set of properties on a set of lights.
:param light: List of lights to set. Each item can represent a light
name or ID.
:param kwargs: key-value list of the parameters to set.
"""
raise NotImplementedError()

View file

@ -0,0 +1,9 @@
from abc import ABC
from . import EntityManager
class SensorEntityManager(EntityManager, ABC):
"""
Base class for integrations that support sensor entities.
"""

View file

@ -0,0 +1,54 @@
from abc import ABC, abstractmethod
from . import EntityManager
class SwitchEntityManager(EntityManager, ABC):
"""
Base class for integrations that support binary switches.
"""
@abstractmethod
def on(self, *_, **__):
"""Turn on a device"""
raise NotImplementedError()
@abstractmethod
def off(self, *_, **__):
"""Turn off a device"""
raise NotImplementedError()
@abstractmethod
def toggle(self, *_, **__):
"""Toggle the state of a device (on->off or off->on)"""
raise NotImplementedError()
class MultiLevelSwitchEntityManager(EntityManager, ABC):
"""
Base class for integrations that support dimmers/multi-level/enum switches.
Don't extend this class directly. Instead, use on of the available
intermediate abstract classes - like ``DimmerEntityManager`` or
``EnumSwitchEntityManager``.
"""
@abstractmethod
def set_value( # pylint: disable=redefined-builtin
self, *entities, property=None, value=None, **__
):
"""Set a value"""
raise NotImplementedError()
class DimmerEntityManager(MultiLevelSwitchEntityManager, ABC):
"""
Base class for integrations that support dimmers/multi-level switches.
"""
class EnumSwitchEntityManager(MultiLevelSwitchEntityManager, ABC):
"""
Base class for integrations that support switches with a pre-defined,
enum-like set of possible values.
"""

View file

@ -1,135 +0,0 @@
import json
from datetime import datetime
from typing import Optional, Dict, Collection, Type
from platypush.config import Config
from platypush.plugins import Plugin
from platypush.utils import get_plugin_name_by_class, get_redis
from ._base import Entity
_entity_registry_varname = '_platypush/plugin_entity_registry'
def register_entity_plugin(entity_type: Type[Entity], plugin: Plugin):
"""
Associates a plugin as a manager for a certain entity type.
If you use the `@manages` decorator then you usually don't have
to call this method directly.
"""
plugin_name = get_plugin_name_by_class(plugin.__class__) or ''
entity_type_name = entity_type.__name__.lower()
redis = get_redis()
registry = get_plugin_entity_registry()
registry_by_plugin = set(registry['by_plugin'].get(plugin_name, []))
registry_by_entity_type = set(registry['by_entity_type'].get(entity_type_name, []))
registry_by_plugin.add(entity_type_name)
registry_by_entity_type.add(plugin_name)
registry['by_plugin'][plugin_name] = list(registry_by_plugin)
registry['by_entity_type'][entity_type_name] = list(registry_by_entity_type)
redis.mset({_entity_registry_varname: json.dumps(registry)})
def get_plugin_entity_registry() -> Dict[str, Dict[str, Collection[str]]]:
"""
Get the `plugin->entity_types` and `entity_type->plugin`
mappings supported by the current configuration.
"""
redis = get_redis()
registry = redis.mget([_entity_registry_varname])[0]
try:
registry = json.loads((registry or b'').decode())
except (TypeError, ValueError):
return {'by_plugin': {}, 'by_entity_type': {}}
enabled_plugins = set(Config.get_plugins().keys())
return {
'by_plugin': {
plugin_name: entity_types
for plugin_name, entity_types in registry['by_plugin'].items()
if plugin_name in enabled_plugins
},
'by_entity_type': {
entity_type: [p for p in plugins if p in enabled_plugins]
for entity_type, plugins in registry['by_entity_type'].items()
},
}
class EntityManagerMixin:
"""
This mixin is injected on the fly into any plugin class declared with
the @manages decorator. The class will therefore implement the
`publish_entities` and `transform_entities` methods, which can be
overridden if required.
"""
def transform_entities(self, entities):
"""
This method takes a list of entities in any (plugin-specific)
format and converts them into a standardized collection of
`Entity` objects. Since this method is called by
:meth:`.publish_entities` before entity updates are published,
you may usually want to extend it to pre-process the entities
managed by your extension into the standard format before they
are stored and published to all the consumers.
"""
entities = entities or []
for entity in entities:
if entity.id:
# Entity IDs can only refer to the internal primary key
entity.external_id = entity.id
entity.id = None # type: ignore
entity.plugin = get_plugin_name_by_class(self.__class__) # type: ignore
entity.updated_at = datetime.utcnow()
return entities
def publish_entities(self, entities: Optional[Collection[Entity]]):
"""
Publishes a list of entities. The downstream consumers include:
- The entity persistence manager
- The web server
- Any consumer subscribed to
:class:`platypush.message.event.entities.EntityUpdateEvent`
events (e.g. web clients)
If your extension class uses the `@manages` decorator then you usually
don't need to override this class (but you may want to extend
:meth:`.transform_entities` instead if your extension doesn't natively
handle `Entity` objects).
"""
from . import publish_entities
transformed_entities = self.transform_entities(entities)
publish_entities(transformed_entities)
def manages(*entities: Type[Entity]):
"""
This decorator is used to register a plugin/backend class as a
manager of one or more types of entities.
"""
def wrapper(plugin: Type[Plugin]):
init = plugin.__init__
def __init__(self, *args, **kwargs):
for entity_type in entities:
register_entity_plugin(entity_type, self)
init(self, *args, **kwargs)
plugin.__init__ = __init__ # type: ignore
# Inject the EntityManagerMixin
if EntityManagerMixin not in plugin.__bases__:
plugin.__bases__ = (EntityManagerMixin,) + plugin.__bases__
return plugin
return wrapper

View file

@ -1,53 +0,0 @@
from abc import ABC, abstractmethod
from platypush.entities import manages
from platypush.entities.lights import Light
from platypush.plugins import Plugin, action
@manages(Light)
class LightPlugin(Plugin, ABC):
"""
Abstract plugin to interface your logic with lights/bulbs.
"""
@action
@abstractmethod
def on(self, lights=None, *args, **kwargs):
"""Turn the light on"""
raise NotImplementedError()
@action
@abstractmethod
def off(self, lights=None, *args, **kwargs):
"""Turn the light off"""
raise NotImplementedError()
@action
@abstractmethod
def toggle(self, lights=None, *args, **kwargs):
"""Toggle the light status (on/off)"""
raise NotImplementedError()
@action
@abstractmethod
def set_lights(self, lights=None, *args, **kwargs):
"""
Set a set of properties on a set of lights.
:param light: List of lights to set. Each item can represent a light
name or ID.
:param kwargs: key-value list of the parameters to set.
"""
raise NotImplementedError()
@action
@abstractmethod
def status(self, *args, **kwargs):
"""
Get the current status of the lights.
"""
raise NotImplementedError()
# vim:sw=4:ts=4:et:

View file

@ -4,22 +4,29 @@ import time
from enum import Enum
from threading import Thread, Event
from typing import Iterable, Union, Mapping, Any, Set
from typing import (
Any,
Collection,
Dict,
Iterable,
Mapping,
Set,
Union,
)
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.entities import Entity, LightEntityManager
from platypush.entities.lights import Light as LightEntity
from platypush.message.event.light import (
LightAnimationStartedEvent,
LightAnimationStoppedEvent,
LightStatusChangeEvent,
)
from platypush.plugins import action, RunnablePlugin
from platypush.plugins.light import LightPlugin
from platypush.plugins import RunnablePlugin, action
from platypush.utils import set_thread_name
class LightHuePlugin(RunnablePlugin, LightPlugin):
class LightHuePlugin(RunnablePlugin, LightEntityManager):
"""
Philips Hue lights plugin.
@ -47,14 +54,22 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
_UNINITIALIZED_BRIDGE_ERR = 'The Hue bridge is not initialized'
class Animation(Enum):
"""
Inner class to model light animations.
"""
COLOR_TRANSITION = 'color_transition'
BLINK = 'blink'
def __eq__(self, other):
"""
Check if the configuration of two light animations matches.
"""
if isinstance(other, str):
return self.value == other
elif isinstance(other, self.__class__):
if isinstance(other, self.__class__):
return self == other
return False
def __init__(self, bridge, lights=None, groups=None, poll_seconds: float = 20.0):
"""
@ -76,14 +91,14 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
self.bridge_address = bridge
self.bridge = None
self.logger.info(
'Initializing Hue lights plugin - bridge: "{}"'.format(self.bridge_address)
'Initializing Hue lights plugin - bridge: "%s"', self.bridge_address
)
self.connect()
self.lights = set()
self.groups = set()
self.poll_seconds = poll_seconds
self._cached_lights = {}
self._cached_lights: Dict[str, dict] = {}
if lights:
self.lights = set(lights)
@ -94,10 +109,10 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
self.lights = {light['name'] for light in self._get_lights().values()}
self.animation_thread = None
self.animations = {}
self.animations: Dict[str, dict] = {}
self._animation_stop = Event()
self._init_animations()
self.logger.info(f'Configured lights: {self.lights}')
self.logger.info('Configured lights: %s', self.lights)
def _expand_groups(self, groups: Iterable[str]) -> Set[str]:
lights = set()
@ -147,7 +162,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
self.bridge = Bridge(self.bridge_address)
success = True
except PhueRegistrationException as e:
self.logger.warning('Bridge registration error: {}'.format(str(e)))
self.logger.warning('Bridge registration error: %s', e)
if n_tries >= self._MAX_RECONNECT_TRIES:
self.logger.error(
@ -392,7 +407,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
return self._get_lights(publish_entities=True)
@action
def set_lights(self, lights, **kwargs):
def set_lights(self, lights, *_, **kwargs): # pylint: disable=arguments-differ
"""
Set a set of properties on a set of lights.
@ -404,7 +419,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
{
"type": "request",
"action": "light.hue.set_light",
"action": "light.hue.set_lights",
"args": {
"lights": ["Bulb 1", "Bulb 2"],
"sat": 255
@ -434,7 +449,10 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
for arg, value in kwargs.items():
args += [arg, value]
self.bridge.set_light(lights, *args)
assert len(args) > 1, 'Not enough parameters passed to set_lights'
param = args.pop(0)
value = args.pop(0)
self.bridge.set_light(lights, param, value, *args)
return self._get_lights(publish_entities=True)
@action
@ -442,8 +460,9 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
"""
Set a group (or groups) property.
:param group: Group or groups to set. It can be a string representing the
group name, a group object, a list of strings, or a list of group objects.
:param group: Group or groups to set. Can be a string representing the
group name, a group object, a list of strings, or a list of group
objects.
:param kwargs: key-value list of parameters to set.
Example call::
@ -464,7 +483,9 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
self.bridge.set_group(group, **kwargs)
@action
def on(self, lights=None, groups=None, **kwargs):
def on( # pylint: disable=arguments-differ
self, lights=None, groups=None, **kwargs
):
"""
Turn lights/groups on.
@ -479,7 +500,9 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
return self._exec('on', True, lights=lights, groups=groups, **kwargs)
@action
def off(self, lights=None, groups=None, **kwargs):
def off( # pylint: disable=arguments-differ
self, lights=None, groups=None, **kwargs
):
"""
Turn lights/groups off.
@ -494,7 +517,9 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
return self._exec('on', False, lights=lights, groups=groups, **kwargs)
@action
def toggle(self, lights=None, groups=None, **kwargs):
def toggle( # pylint: disable=arguments-differ
self, lights=None, groups=None, **kwargs
):
"""
Toggle lights/groups on/off.
@ -857,34 +882,42 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
:type duration: float
:param hue_range: If you selected a ``color_transition``, this will
specify the hue range of your color ``color_transition``. Default: [0, 65535]
specify the hue range of your color ``color_transition``.
Default: [0, 65535]
:type hue_range: list[int]
:param sat_range: If you selected a color ``color_transition``, this
will specify the saturation range of your color ``color_transition``.
Default: [0, 255]
will specify the saturation range of your color
``color_transition``. Default: [0, 255]
:type sat_range: list[int]
:param bri_range: If you selected a color ``color_transition``, this
will specify the brightness range of your color ``color_transition``.
Default: [254, 255] :type bri_range: list[int]
will specify the brightness range of your color
``color_transition``. Default: [254, 255]
:type bri_range: list[int]
:param lights: Lights to control (names, IDs or light objects). Default: plugin default lights
:param groups: Groups to control (names, IDs or group objects). Default: plugin default groups
:param lights: Lights to control (names, IDs or light objects).
Default: plugin default lights
:param groups: Groups to control (names, IDs or group objects).
Default: plugin default groups
:param hue_step: If you selected a color ``color_transition``, this
will specify by how much the color hue will change between iterations.
Default: 1000 :type hue_step: int
will specify by how much the color hue will change between
iterations. Default: 1000
:type hue_step: int
:param sat_step: If you selected a color ``color_transition``, this
will specify by how much the saturation will change between iterations.
Default: 2 :type sat_step: int
will specify by how much the saturation will change
between iterations. Default: 2
:type sat_step: int
:param bri_step: If you selected a color ``color_transition``, this
will specify by how much the brightness will change between iterations.
Default: 1 :type bri_step: int
Default: 1
:type bri_step: int
:param transition_seconds: Time between two transitions or blinks in seconds. Default: 1.0
:param transition_seconds: Time between two transitions or blinks in
seconds. Default: 1.0
:type transition_seconds: float
"""
@ -1028,11 +1061,11 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
try:
if animation == self.Animation.COLOR_TRANSITION:
for (light, attrs) in lights.items():
self.logger.debug('Setting {} to {}'.format(light, attrs))
self.logger.debug('Setting %s to %s', lights, attrs)
self.bridge.set_light(light, attrs)
elif animation == self.Animation.BLINK:
conf = lights[list(lights.keys())[0]]
self.logger.debug('Setting lights to {}'.format(conf))
self.logger.debug('Setting lights to %s', conf)
if groups:
self.bridge.set_group(
@ -1074,7 +1107,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
def transform_entities(
self, entities: Union[Iterable[Union[dict, Entity]], Mapping[Any, dict]]
) -> Iterable[Entity]:
) -> Collection[Entity]:
new_entities = []
if isinstance(entities, dict):
entities = [{'id': id, **e} for id, e in entities.items()]
@ -1108,10 +1141,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
'hue_max': self.MAX_HUE,
}
if entity.get('state', {}).get('hue') is not None
else {
'hue_min': None,
'hue_max': None,
}
else {}
),
**(
{
@ -1119,10 +1149,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
'saturation_max': self.MAX_SAT,
}
if entity.get('state', {}).get('sat') is not None
else {
'saturation_min': None,
'saturation_max': None,
}
else {}
),
**(
{
@ -1130,10 +1157,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
'brightness_max': self.MAX_BRI,
}
if entity.get('state', {}).get('bri') is not None
else {
'brightness_min': None,
'brightness_max': None,
}
else {}
),
**(
{
@ -1141,15 +1165,12 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
'temperature_max': self.MAX_CT,
}
if entity.get('state', {}).get('ct') is not None
else {
'temperature_min': None,
'temperature_max': None,
}
else {}
),
)
)
return super().transform_entities(new_entities) # type: ignore
return new_entities
def _get_lights(self, publish_entities=False) -> dict:
assert self.bridge, self._UNINITIALIZED_BRIDGE_ERR
@ -1171,7 +1192,7 @@ class LightHuePlugin(RunnablePlugin, LightPlugin):
return {id: scene for id, scene in scenes.items() if not scene.get('recycle')}
@action
def status(self) -> Iterable[LightEntity]:
def status(self, *_, **__) -> Iterable[LightEntity]:
lights = self.transform_entities(self._get_lights(publish_entities=True))
for light in lights:
light.id = light.external_id

View file

@ -259,7 +259,7 @@ class MqttPlugin(Plugin):
try:
msg = Message.build(json.loads(msg))
except Exception as e:
self.logger.debug(f'Not a valid JSON: {str(e)}')
self.logger.debug('Not a valid JSON: %s', e)
host = host or self.host
port = port or self.port or 1883
@ -303,7 +303,7 @@ class MqttPlugin(Plugin):
try:
client.loop_stop()
except Exception as e:
self.logger.warning(f'Could not stop client loop: {e}')
self.logger.warning('Could not stop client loop: %s', e)
client.disconnect()

View file

@ -1,6 +1,6 @@
import asyncio
from threading import RLock
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
import aiohttp
from pysmartthings import (
@ -9,10 +9,19 @@ from pysmartthings import (
Command,
DeviceEntity,
DeviceStatus,
Location,
Room,
SmartThings,
)
from platypush.entities import Entity, manages
from platypush.entities import (
DimmerEntityManager,
Entity,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
)
from platypush.entities.devices import Device
from platypush.entities.dimmers import Dimmer
from platypush.entities.lights import Light
@ -24,8 +33,14 @@ from platypush.utils import camel_case_to_snake_case
from ._mappers import DeviceMapper, device_mappers
@manages(Device, Dimmer, EnumSwitch, Light, Sensor, Switch)
class SmartthingsPlugin(RunnablePlugin):
class SmartthingsPlugin(
RunnablePlugin,
DimmerEntityManager,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
):
"""
Plugin to interact with devices and locations registered to a Samsung SmartThings account.
@ -49,17 +64,17 @@ class SmartthingsPlugin(RunnablePlugin):
self._refresh_lock = RLock()
self._execute_lock = RLock()
self._locations = []
self._devices = []
self._rooms_by_location = {}
self._locations: List[Location] = []
self._devices: List[DeviceEntity] = []
self._rooms_by_location: Dict[str, Room] = {}
self._locations_by_id = {}
self._locations_by_name = {}
self._devices_by_id = {}
self._devices_by_name = {}
self._rooms_by_id = {}
self._rooms_by_location_and_id = {}
self._rooms_by_location_and_name = {}
self._locations_by_id: Dict[str, Location] = {}
self._locations_by_name: Dict[str, Location] = {}
self._devices_by_id: Dict[str, DeviceEntity] = {}
self._devices_by_name: Dict[str, DeviceEntity] = {}
self._rooms_by_id: Dict[str, Room] = {}
self._rooms_by_location_and_id: Dict[str, Dict[str, Room]] = {}
self._rooms_by_location_and_name: Dict[str, Dict[str, Room]] = {}
self._entities_by_id: Dict[str, Entity] = {}
async def _refresh_locations(self, api):
@ -308,10 +323,13 @@ class SmartthingsPlugin(RunnablePlugin):
):
self.refresh_info()
location = self._locations_by_id.get(
location_id, self._locations_by_name.get(name)
)
assert location, 'Location {} not found'.format(location_id or name)
location: Optional[Dict[str, Any]] = {}
if location_id:
location = self._locations_by_id.get(location_id)
elif name:
location = self._locations_by_name.get(name)
assert location, f'Location {location_id or name} not found'
return self._location_to_dict(location)
def _get_device(self, device: str) -> DeviceEntity:
@ -321,7 +339,7 @@ class SmartthingsPlugin(RunnablePlugin):
def _to_device_and_property(device: str) -> Tuple[str, Optional[str]]:
tokens = device.split(':')
if len(tokens) > 1:
return tuple(tokens[:2])
return (tokens[0], tokens[1])
return tokens[0], None
def _get_existing_and_missing_devices(
@ -397,9 +415,7 @@ class SmartthingsPlugin(RunnablePlugin):
assert (
ret
), 'The command {capability}={command} failed on device {device}'.format(
capability=capability, command=command, device=device_id
)
), f'The command {capability}={command} failed on device {device_id}'
await self._get_device_status(api, device_id, publish_entities=True)
@ -455,7 +471,9 @@ class SmartthingsPlugin(RunnablePlugin):
loop.stop()
@staticmethod
def _property_to_entity_name(property: str) -> str:
def _property_to_entity_name(
property: str,
) -> str: # pylint: disable=redefined-builtin
return ' '.join(
[
t[:1].upper() + t[1:]
@ -464,7 +482,7 @@ class SmartthingsPlugin(RunnablePlugin):
)
@classmethod
def _to_entity(
def _to_entity( # pylint: disable=redefined-builtin
cls, device: DeviceEntity, property: str, entity_type: Type[Entity], **kwargs
) -> Entity:
return entity_type(
@ -669,7 +687,7 @@ class SmartthingsPlugin(RunnablePlugin):
# Fail if some devices haven't been found after refreshing
assert (
not missing_device_ids
), 'Could not find the following devices: {}'.format(list(missing_device_ids))
), f'Could not find the following devices: {list(missing_device_ids)}'
async with aiohttp.ClientSession(timeout=self._timeout) as session:
api = SmartThings(session, self._access_token)
@ -682,12 +700,11 @@ class SmartthingsPlugin(RunnablePlugin):
for device_id in device_ids
]
# noinspection PyTypeChecker
return await asyncio.gather(*status_tasks)
@action
def status(
self, device: Optional[Union[str, List[str]]] = None, publish_entities=True
def status( # pylint: disable=arguments-differ
self, device: Optional[Union[str, List[str]]] = None, publish_entities=True, **_
) -> List[dict]:
"""
Refresh and return the status of one or more devices.
@ -715,7 +732,7 @@ class SmartthingsPlugin(RunnablePlugin):
if not device:
self.refresh_info()
devices = self._devices_by_id.keys()
devices = list(self._devices_by_id.keys())
elif isinstance(device, str):
devices = [device]
else:
@ -734,7 +751,13 @@ class SmartthingsPlugin(RunnablePlugin):
loop.stop()
def _set_switch(self, device: str, value: Optional[bool] = None):
device, property = self._to_device_and_property(device)
(
device,
property,
) = self._to_device_and_property( # pylint: disable=redefined-builtin
device
)
if not property:
property = Attribute.switch
@ -744,6 +767,7 @@ class SmartthingsPlugin(RunnablePlugin):
if property == 'light':
property = 'switch'
else:
assert property, 'No property specified'
assert hasattr(
dev.status, property
), f'No such property on device "{dev.label}": "{property}"'
@ -760,7 +784,7 @@ class SmartthingsPlugin(RunnablePlugin):
return self.set_value(device, property, value)
@action
def on(self, device: str, *_, **__):
def on(self, device: str, *_, **__): # pylint: disable=arguments-differ
"""
Turn on a device with ``switch`` capability.
@ -769,7 +793,7 @@ class SmartthingsPlugin(RunnablePlugin):
return self._set_switch(device, True)
@action
def off(self, device: str, *_, **__):
def off(self, device: str, *_, **__): # pylint: disable=arguments-differ
"""
Turn off a device with ``switch`` capability.
@ -778,7 +802,7 @@ class SmartthingsPlugin(RunnablePlugin):
return self._set_switch(device, False)
@action
def toggle(self, device: str, *_, **__):
def toggle(self, device: str, *_, **__): # pylint: disable=arguments-differ
"""
Toggle a device with ``switch`` capability.
@ -799,7 +823,7 @@ class SmartthingsPlugin(RunnablePlugin):
"""
return self.set_value(device, Capability.switch_level, level, **kwargs)
def _set_value(
def _set_value( # pylint: disable=redefined-builtin
self, device: str, property: Optional[str] = None, data=None, **kwargs
):
if not property:
@ -830,14 +854,14 @@ class SmartthingsPlugin(RunnablePlugin):
device,
mapper.capability,
command,
args=mapper.set_value_args(data),
args=mapper.set_value_args(data), # type: ignore
**kwargs,
)
return self.status(device)
@action
def set_value(
def set_value( # pylint: disable=arguments-differ,redefined-builtin
self, device: str, property: Optional[str] = None, data=None, **kwargs
):
"""
@ -854,10 +878,10 @@ class SmartthingsPlugin(RunnablePlugin):
return self._set_value(device, property, data, **kwargs)
except Exception as e:
self.logger.exception(e)
raise AssertionError(e)
raise AssertionError(e) from e
@action
def set_lights(
def set_lights( # pylint: disable=arguments-differ,redefined-builtin
self,
lights: Iterable[str],
on: Optional[bool] = None,
@ -961,7 +985,7 @@ class SmartthingsPlugin(RunnablePlugin):
return self.status(publish_entities=False)
except Exception as e:
self.logger.exception(e)
self.logger.error(f'Could not refresh the status: {e}')
self.logger.error('Could not refresh the status: %s', e)
self.wait_stop(3 * (self.poll_interval or 5))
while not self.should_stop():

View file

@ -5,6 +5,7 @@ import threading
from queue import Queue
from typing import (
Any,
Collection,
Dict,
List,
Optional,
@ -13,7 +14,14 @@ from typing import (
Union,
)
from platypush.entities import Entity, manages
from platypush.entities import (
DimmerEntityManager,
Entity,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
)
from platypush.entities.batteries import Battery
from platypush.entities.devices import Device
from platypush.entities.dimmers import Dimmer
@ -40,8 +48,15 @@ from platypush.plugins import RunnablePlugin
from platypush.plugins.mqtt import MqttPlugin, action
@manages(Battery, Device, Dimmer, Light, LinkQuality, Sensor, Switch)
class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-init]
class ZigbeeMqttPlugin(
RunnablePlugin,
MqttPlugin,
DimmerEntityManager,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
): # lgtm [py/missing-call-to-init]
"""
This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
`zigbee2mqtt <https://www.zigbee2mqtt.io/>`_.
@ -245,9 +260,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
if option.get('property')
}
def transform_entities(self, devices):
def transform_entities(self, entities: Collection[dict]) -> List[Entity]:
compatible_entities = []
for dev in devices:
for dev in entities:
if not dev:
continue
@ -275,7 +290,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
light_info = self._get_light_meta(dev)
dev_entities = [
dev_entities: List[Entity] = [
*self._get_sensors(dev, exposed, options),
*self._get_dimmers(dev, exposed, options),
*self._get_switches(dev, exposed, options),
@ -348,7 +363,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
compatible_entities += dev_entities
return super().transform_entities(compatible_entities) # type: ignore
return compatible_entities
@staticmethod
def _get_device_url(device_info: dict) -> Optional[str]:
@ -400,7 +415,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
try:
host = mqtt_args.pop('host')
port = mqtt_args.pop('port')
client = self._get_client(**mqtt_args)
client = self._get_client( # pylint: disable=unexpected-keyword-arg
**mqtt_args
)
client.on_message = _on_message()
client.connect(host, port, keepalive=timeout)
client.subscribe(self.base_topic + '/bridge/#')
@ -444,7 +461,8 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
@staticmethod
def _parse_response(response: Union[dict, Response]) -> dict:
if isinstance(response, Response):
response = dict(response.output)
rs: dict = response.output # type: ignore
response = rs
assert response.get('status') != 'error', response.get(
'error', 'zigbee2mqtt error'
@ -873,7 +891,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return name == device.get('friendly_name') or name == device.get('ieee_address')
@action
def device_get(
def device_get( # pylint: disable=redefined-builtin
self, device: str, property: Optional[str] = None, **kwargs
) -> Dict[str, Any]:
"""
@ -994,7 +1012,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return self.devices_get([device] if device else None, *args, **kwargs)
@action
def device_set(
def device_set( # pylint: disable=redefined-builtin
self,
device: str,
property: Optional[str] = None,
@ -1054,7 +1072,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return properties
@action
def set_value(
def set_value( # pylint: disable=redefined-builtin,arguments-differ
self, device: str, property: Optional[str] = None, data=None, **kwargs
):
"""
@ -1069,7 +1087,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
:meth:`platypush.plugins.mqtt.MqttPlugin.publish`` (default: query
the default configured device).
"""
dev, prop = self._ieee_address(device, with_property=True)
dev, prop = self._ieee_address_and_property(device)
if not property:
property = prop
@ -1273,7 +1291,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
}
@action
def group_add(self, name: str, id: Optional[int] = None, **kwargs):
def group_add( # pylint: disable=redefined-builtin
self, name: str, id: Optional[int] = None, **kwargs
):
"""
Add a new group.
@ -1301,7 +1321,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
@action
def group_get(self, group: str, property: Optional[str] = None, **kwargs) -> dict:
def group_get( # pylint: disable=redefined-builtin
self, group: str, property: Optional[str] = None, **kwargs
) -> dict:
"""
Get one or more properties of a group. The compatible properties vary depending on the devices on the group.
For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``"
@ -1329,9 +1351,10 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return properties
# noinspection PyShadowingBuiltins,DuplicatedCode
@action
def group_set(self, group: str, property: str, value: Any, **kwargs):
def group_set( # pylint: disable=redefined-builtin
self, group: str, property: str, value: Any, **kwargs
):
"""
Set a properties on a group. The compatible properties vary depending on the devices on the group.
For example, a light bulb may have the "``state``" (with values ``"ON"`` and ``"OFF"``) and "``brightness``"
@ -1501,7 +1524,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
@action
def on(self, device, *_, **__):
def on( # pylint: disable=redefined-builtin,arguments-differ
self, device, *_, **__
):
"""
Turn on/set to true a switch, a binary property or an option.
"""
@ -1511,7 +1536,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
@action
def off(self, device, *_, **__):
def off( # pylint: disable=redefined-builtin,arguments-differ
self, device, *_, **__
):
"""
Turn off/set to false a switch, a binary property or an option.
"""
@ -1521,7 +1548,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
@action
def toggle(self, device, *_, **__):
def toggle( # pylint: disable=redefined-builtin,arguments-differ
self, device, *_, **__
):
"""
Toggles the state of a switch, a binary property or an option.
"""
@ -1540,7 +1569,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
)
def _get_switch_info(self, name: str) -> Tuple[str, dict]:
name, prop = self._ieee_address(name, with_property=True)
name, prop = self._ieee_address_and_property(name)
if not prop or prop == 'light':
prop = 'state'
@ -1548,13 +1577,13 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
assert device_info, f'No such device: {name}'
name = self._preferred_name(device_info)
property = self._get_properties(device_info).get(prop)
prop = self._get_properties(device_info).get(prop)
option = self._get_options(device_info).get(prop)
if option:
return name, option
assert property, f'No such property on device {name}: {prop}'
return name, property
assert prop, f'No such property on device {name}: {prop}'
return name, prop
@staticmethod
def _is_read_only(feature: dict) -> bool:
@ -1575,9 +1604,9 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return bool(feature.get('access', 0) & 4) == 0
@staticmethod
def _ieee_address(
device: Union[dict, str], with_property=False
) -> Union[str, Tuple[str, Optional[str]]]:
def _ieee_address_and_property(
device: Union[dict, str]
) -> Tuple[str, Optional[str]]:
# Entity value IDs are stored in the `<address>:<property>`
# format. Therefore, we need to split by `:` if we want to
# retrieve the original address.
@ -1589,13 +1618,13 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
# IEEE address + property format
if re.search(r'^0x[0-9a-fA-F]{16}:', dev):
parts = dev.split(':')
return (
(parts[0], parts[1] if len(parts) > 1 else None)
if with_property
else parts[0]
)
return (parts[0], parts[1] if len(parts) > 1 else None)
return (dev, None) if with_property else dev
return (dev, None)
@classmethod
def _ieee_address(cls, device: Union[dict, str]) -> str:
return cls._ieee_address_and_property(device)[0]
@classmethod
def _get_switches(
@ -1733,7 +1762,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
]
@classmethod
def _to_entity(
def _to_entity( # pylint: disable=redefined-builtin
cls,
entity_type: Type[Entity],
device_info: dict,
@ -1867,7 +1896,7 @@ class ZigbeeMqttPlugin(RunnablePlugin, MqttPlugin): # lgtm [py/missing-call-to-
return {}
@action
def set_lights(self, lights, **kwargs):
def set_lights(self, *_, lights, **kwargs):
"""
Set the state for one or more Zigbee lights.
"""

View file

@ -1,16 +1,25 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, List, Union
from platypush.entities import manages
from platypush.entities.batteries import Battery
from platypush.entities.dimmers import Dimmer
from platypush.entities.lights import Light
from platypush.entities.switches import Switch
from platypush.entities import (
DimmerEntityManager,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
)
from platypush.plugins import Plugin, action
@manages(Battery, Dimmer, Light, Switch)
class ZwaveBasePlugin(Plugin, ABC):
class ZwaveBasePlugin(
DimmerEntityManager,
EnumSwitchEntityManager,
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
Plugin,
ABC,
):
"""
Base class for Z-Wave plugins.
"""
@ -27,7 +36,7 @@ class ZwaveBasePlugin(Plugin, ABC):
@abstractmethod
@action
def status(self) -> Dict[str, Any]:
def status(self) -> Dict[str, Any]: # pylint: disable=arguments-differ
"""
Get the status of the controller.
"""
@ -316,7 +325,7 @@ class ZwaveBasePlugin(Plugin, ABC):
@abstractmethod
@action
def set_value(
def set_value( # pylint: disable=arguments-differ
self,
data,
value_id: Optional[int] = None,
@ -864,7 +873,7 @@ class ZwaveBasePlugin(Plugin, ABC):
@abstractmethod
@action
def on(self, device: str, *args, **kwargs):
def on(self, device: str, *args, **kwargs): # pylint: disable=arguments-differ
"""
Turn on a switch on a device.
@ -874,7 +883,7 @@ class ZwaveBasePlugin(Plugin, ABC):
@abstractmethod
@action
def off(self, device: str, *args, **kwargs):
def off(self, device: str, *args, **kwargs): # pylint: disable=arguments-differ
"""
Turn off a switch on a device.
@ -884,7 +893,7 @@ class ZwaveBasePlugin(Plugin, ABC):
@abstractmethod
@action
def toggle(self, device: str, *args, **kwargs):
def toggle(self, device: str, *args, **kwargs): # pylint: disable=arguments-differ
"""
Toggle a switch on a device.

View file

@ -8,6 +8,7 @@ from threading import Timer
from typing import (
Any,
Callable,
Collection,
Dict,
Iterable,
List,
@ -204,13 +205,13 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
return rs
def _api_request(self, api: str, *args, **kwargs):
def _api_request(self, api: str, *args: Any, **kwargs):
if len(args) == 1 and isinstance(args[0], dict):
args = args[0]
args = args[0] # type: ignore
payload = json.dumps({'args': args})
ret = self._parse_response(
self.publish( # type: ignore[reportGeneralTypeIssues]
self.publish(
topic=self._api_topic(api) + '/set',
msg=payload,
reply_topic=self._api_topic(api),
@ -402,7 +403,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
'product_id': f'0x{node["productId"]:04x}'
if node.get('productId')
else None,
'product_type': '0x{:04x}'.format(node['productType'])
'product_type': f'0x{node["productType"]:04x}'
if node.get('productType')
else None,
'product_name': ' '.join(
@ -506,7 +507,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
if not nodes:
nodes = self.get_nodes().output # type: ignore[reportGeneralTypeIssues]
assert nodes, 'No nodes found on the network'
nodes = nodes.values()
nodes = nodes.values() # type: ignore
if value_id:
values = [
@ -528,7 +529,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
if value['label']:
self._values_cache['by_label'][value['label']] = value
self.publish_entities([self._to_current_value(value)]) # type: ignore
self.publish_entities([self._to_current_value(value)])
return value
@staticmethod
@ -683,10 +684,10 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
args['updated_at'] = value['last_update']
return args
def transform_entities(self, values: Iterable[dict]):
entities = []
def transform_entities(self, entities: Iterable[dict]) -> Collection[Entity]:
transformed_entities = []
for value in values:
for value in entities:
if not value or self._matches_classes(value, *self._ignored_entity_classes):
continue
@ -739,10 +740,10 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
entity_args.update(sensor_args)
if entity_type:
entities.append(entity_type(**entity_args))
transformed_entities.append(entity_type(**entity_args))
self._process_parent_entities(values, entities)
return super().transform_entities(entities) # type: ignore
self._process_parent_entities(entities, transformed_entities)
return transformed_entities
def _process_parent_entities(
self, values: Iterable[Mapping], entities: List[Entity]
@ -867,7 +868,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
else self.get_nodes(**kwargs).output.values() # type: ignore[reportGeneralTypeIssues]
)
command_classes: set = {
classes: set = {
command_class_by_name[command_name]
for command_name in (command_classes or [])
}
@ -879,10 +880,9 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
continue
for value in node.get('values', {}).values():
if (
command_classes
and value.get('command_class') not in command_classes
) or (filter_callback and not filter_callback(value)):
if (classes and value.get('command_class') not in classes) or (
filter_callback and not filter_callback(value)
):
continue
value = self._to_current_value(value)
@ -940,7 +940,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
:param kwargs: Extra arguments to be passed to :meth:`platypush.plugins.mqtt.MqttPlugin.publish``
(default: query the default configured device).
"""
msg_queue = queue.Queue()
msg_queue: queue.Queue = queue.Queue()
topic = f'{self.topic_prefix}/driver/status'
client = self._get_client(**kwargs)
@ -963,8 +963,8 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
status = msg_queue.get(
block=True, timeout=kwargs.get('timeout', self.timeout)
)
except queue.Empty:
raise TimeoutError('The request timed out')
except queue.Empty as e:
raise TimeoutError('The request timed out') from e
finally:
client.loop_stop()
@ -973,7 +973,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
}
@action
def add_node(
def add_node( # pylint: disable=arguments-differ
self,
name: str,
location: str = '',
@ -1056,7 +1056,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
self._api_request('replaceFailedNode', node_id, **kwargs)
@action
def replication_send(self, **_):
def replication_send(self, **_): # pylint: disable=arguments-differ
"""
Send node information from the primary to the secondary controller (not implemented by zwavejs2mqtt).
"""
@ -1312,14 +1312,14 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
)
@action
def set_node_product_name(self, **_):
def set_node_product_name(self, **_): # pylint: disable=arguments-differ
"""
Set the product name of a node (not implemented by zwavejs2mqtt).
"""
raise _NOT_IMPLEMENTED_ERR
@action
def set_node_manufacturer_name(self, **_):
def set_node_manufacturer_name(self, **_): # pylint: disable=arguments-differ
"""
Set the manufacturer name of a node (not implemented by zwavejs2mqtt).
"""
@ -1374,7 +1374,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR
@action
def set_controller_name(self, **_):
def set_controller_name(self, **_): # pylint: disable=arguments-differ
"""
Set the name of the controller on the network (not implemented: use
:meth:`platypush.plugin.zwave.mqtt.ZwaveMqttPlugin.set_node_name` instead).
@ -1406,7 +1406,9 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR
@action
def heal(self, timeout: Optional[int] = 60, **kwargs):
def heal(
self, *_, timeout: Optional[int] = 60, **kwargs
): # pylint: disable=arguments-differ
"""
Heal network by requesting nodes rediscover their neighbours.
@ -1421,7 +1423,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
).start()
@action
def switch_all(self, **_):
def switch_all(self, **_): # pylint: disable=arguments-differ
"""
Switch all the connected devices on/off (not implemented).
"""
@ -1524,7 +1526,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
)
@action
def set_lights(self, lights, **kwargs):
def set_lights(self, lights, **kwargs): # pylint: disable=arguments-differ
"""
Set the state for one or more Z-Wave lights.
"""
@ -1533,28 +1535,28 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
self.set_value(light, kwargs)
@action
def set_value_label(self, **_):
def set_value_label(self, **_): # pylint: disable=arguments-differ
"""
Change the label/name of a value (not implemented by zwavejs2mqtt).
"""
raise _NOT_IMPLEMENTED_ERR
@action
def node_add_value(self, **_):
def node_add_value(self, **_): # pylint: disable=arguments-differ
"""
Add a value to a node (not implemented by zwavejs2mqtt).
"""
raise _NOT_IMPLEMENTED_ERR
@action
def node_remove_value(self, **_):
def node_remove_value(self, **_): # pylint: disable=arguments-differ
"""
Remove a value from a node (not implemented by zwavejs2mqtt).
"""
raise _NOT_IMPLEMENTED_ERR
@action
def node_heal(
def node_heal( # pylint: disable=arguments-differ
self, node_id: Optional[int] = None, node_name: Optional[str] = None, **kwargs
):
"""
@ -2133,7 +2135,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
raise _NOT_IMPLEMENTED_ERR
@action
def add_node_to_group(
def add_node_to_group( # pylint: disable=arguments-differ
self,
group_id: Optional[str] = None,
node_id: Optional[int] = None,
@ -2157,7 +2159,7 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
self._api_request('addAssociations', group['node_id'], group['index'], [assoc])
@action
def remove_node_from_group(
def remove_node_from_group( # pylint: disable=arguments-differ
self,
group_id: Optional[str] = None,
node_id: Optional[int] = None,
@ -2261,32 +2263,15 @@ class ZwaveMqttPlugin(MqttPlugin, RunnablePlugin, ZwaveBasePlugin):
self.set_value(data=value['data'], id_on_network=device, **kwargs)
return {
'name': '{} - {}'.format(
self._nodes_cache['by_id'][value['node_id']]['name'],
value.get('label', '[No Label]'),
'name': (
self._nodes_cache['by_id'][value['node_id']]['name']
+ ' - '
+ value.get('label', '[No Label]')
),
'on': value['data'],
'id': value['value_id'],
}
@property
def switches(self) -> List[dict]:
# Repopulate the nodes cache
self.get_nodes()
# noinspection PyUnresolvedReferences
devices = self.get_switches().output.values() # type: ignore[reportGeneralTypeIssues]
return [
{
'name': '{} - {}'.format(
self._nodes_cache['by_id'][dev['node_id']]['name'],
dev.get('label', '[No Label]'),
),
'on': dev['data'],
'id': dev['value_id'],
}
for dev in devices
]
def main(self):
from ._listener import ZwaveMqttListener