Legacy `sensor` backend replaced by an extended `sensor` runnable plugin.

This commit is contained in:
Fabio Manganiello 2023-04-01 19:24:35 +02:00
parent bf75eb73ac
commit bf4db76830
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
2 changed files with 355 additions and 245 deletions

View File

@ -1,232 +0,0 @@
import time
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.sensor import SensorDataChangeEvent, \
SensorDataAboveThresholdEvent, SensorDataBelowThresholdEvent
class SensorBackend(Backend):
"""
Abstract backend for polling sensors.
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
default_tolerance = 1e-7
def __init__(self, plugin=None, plugin_args=None, thresholds=None, tolerance=default_tolerance, poll_seconds=None,
enabled_sensors=None, **kwargs):
"""
:param plugin: If set, then this plugin instance, referenced by plugin id, will be polled
through ``get_plugin()``. Example: ``'gpio.sensor.bme280'`` or ``'gpio.sensor.envirophat'``.
:type plugin: str
:param plugin_args: If plugin is set and its ``get_measurement()`` method accepts optional arguments, then you
can pass those arguments through ``plugin_args``.
:type plugin_args: dict
:param thresholds: Thresholds can be either a scalar value or a dictionary (e.g. ``{"temperature": 20.0}``).
Sensor threshold events will be fired when measurements get above or below these values.
Set it as a scalar if your get_measurement() code returns a scalar, as a dictionary if it returns a
dictionary of values. For instance, if your sensor code returns both humidity and temperature in a format
like ``{'humidity':60.0, 'temperature': 25.0}``, you'll want to set up a threshold on temperature with a
syntax like ``{'temperature':20.0}`` to trigger events when the temperature goes above/below 20 degrees.
:param tolerance: If set, then the sensor change events will be triggered only if the difference between
the new value and the previous value is higher than the specified tolerance. Example::
{
"temperature": 0.01, # Tolerance on the 2nd decimal digit
"humidity": 0.1 # Tolerance on the 1st decimal digit
}
:type tolerance: dict or float
:param poll_seconds: If set, the thread will wait for the specified number of seconds between a read and the
next one.
:type poll_seconds: float
:param enabled_sensors: If ``get_measurement()`` returns data in dict form, then ``enabled_sensors`` selects
which keys should be taken into account when monitoring for new events (e.g. "temperature" or "humidity").
:type enabled_sensors: dict (in the form ``name -> [True/False]``), set or list
"""
super().__init__(**kwargs)
self.data = None
self.plugin = plugin
self.plugin_args = plugin_args or {}
self.thresholds = thresholds
self.tolerance = tolerance
self.poll_seconds = poll_seconds
if isinstance(enabled_sensors, list):
enabled_sensors = set(enabled_sensors)
if isinstance(enabled_sensors, set):
enabled_sensors = {k: True for k in enabled_sensors}
self.enabled_sensors = enabled_sensors or {}
def get_measurement(self):
"""
Wrapper around ``plugin.get_measurement()`` that can filter events on specified enabled sensors data or on
specified tolerance values. It can be overridden by derived classes.
"""
if not self.plugin:
raise NotImplementedError('No plugin specified')
reload = False
success = False
data = None
while not success:
try:
plugin = get_plugin(self.plugin, reload=reload)
data = plugin.get_data(**self.plugin_args).output
if reload:
self.logger.info('Backend successfully restored')
success = True
except Exception as e:
self.logger.warning('Unexpected exception while getting data: {}'.format(str(e)))
self.logger.exception(e)
reload = True
time.sleep(5)
if self.enabled_sensors and data is not None:
data = {
sensor: data[sensor]
for sensor, enabled in self.enabled_sensors.items()
if enabled and sensor in data
}
return data
@staticmethod
def _get_value(value):
if isinstance(value, float) or isinstance(value, int) or isinstance(value, bool):
return value
return float(value)
def get_new_data(self, new_data):
if self.data is None or new_data is None:
return new_data
try:
# Scalar data case
new_data = self._get_value(new_data)
return new_data if abs(new_data - self.data) >= self.tolerance else None
except (ValueError, TypeError):
# If it's not a scalar then it should be a dict
assert isinstance(new_data, dict), 'Invalid type {} received for sensor data'.format(type(new_data))
ret = {}
for k, v in new_data.items():
if (v is None and self.data.get(k) is not None) \
or k not in self.data \
or self.tolerance is None:
ret[k] = v
continue
if v is None:
continue
tolerance = None
is_nan = False
old_v = None
try:
v = self._get_value(v)
old_v = self._get_value(self.data.get(k))
except (TypeError, ValueError):
is_nan = True
if not is_nan:
if isinstance(self.tolerance, dict):
tolerance = float(self.tolerance.get(k, self.default_tolerance))
else:
try:
tolerance = float(self.tolerance)
except (TypeError, ValueError):
pass
if tolerance is None or abs(v - old_v) >= tolerance:
ret[k] = v
elif k not in self.data or self.data[k] != v:
ret[k] = v
return ret
def on_stop(self):
super().on_stop()
if not self.plugin:
return
plugin = get_plugin(self.plugin)
if plugin and hasattr(plugin, 'close'):
plugin.close()
def process_data(self, data, new_data=None, **__):
if data is not None and data not in ({}, []):
self.bus.post(SensorDataChangeEvent(data=data, source=self.plugin or self.__class__.__name__))
def run(self):
super().run()
self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__))
while not self.should_stop():
try:
data = self.get_measurement()
new_data = self.get_new_data(data)
self.process_data(new_data)
data_below_threshold = {}
data_above_threshold = {}
if self.thresholds:
if isinstance(self.thresholds, dict) and isinstance(data, dict):
for (measure, thresholds) in self.thresholds.items():
if measure not in data:
continue
if not isinstance(thresholds, list):
thresholds = [thresholds]
for threshold in thresholds:
if data[measure] > threshold and (self.data is None or (
measure in self.data and self.data[measure] <= threshold)):
data_above_threshold[measure] = data[measure]
elif data[measure] < threshold and (self.data is None or (
measure in self.data and self.data[measure] >= threshold)):
data_below_threshold[measure] = data[measure]
if data_below_threshold:
self.bus.post(SensorDataBelowThresholdEvent(data=data_below_threshold))
if data_above_threshold:
self.bus.post(SensorDataAboveThresholdEvent(data=data_above_threshold))
self.data = data
if new_data:
if isinstance(new_data, dict):
for k, v in new_data.items():
self.data[k] = v
else:
self.data = new_data
except Exception as e:
self.logger.exception(e)
if self.poll_seconds:
time.sleep(self.poll_seconds)
# vim:sw=4:ts=4:et:

View File

@ -1,26 +1,335 @@
from abc import ABC, abstractmethod
from types import NoneType
from typing import Collection, List, Mapping, Optional, Tuple, Type, Union
from typing_extensions import override
from platypush.plugins import Plugin, action
from platypush.common.sensors import Numeric, SensorDataType
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.message.event.sensor import (
SensorDataAboveThresholdEvent,
SensorDataBelowThresholdEvent,
SensorDataChangeEvent,
SensorDataEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_plugin_name_by_class
ThresholdType = Union[Numeric, Tuple[Numeric, Numeric]]
ThresholdConfiguration = Union[ThresholdType, Mapping[str, ThresholdType]]
class SensorPlugin(Plugin, ABC):
class SensorPlugin(RunnablePlugin, SensorEntityManager, ABC):
"""
Sensor abstract plugin. Any plugin that interacts with sensors
should implement this class (and the get_measurement() method)
should implement this class.
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
def __init__(self, **kwargs):
_max_retry_secs = 60.0
"""
In case of failure, we apply an exponential back-off retry algorithm. This
is the maximum number of seconds that we should wait during these retries.
"""
def __init__(
self,
thresholds: Optional[ThresholdConfiguration] = None,
tolerance: SensorDataType = 0,
**kwargs,
):
"""
:param thresholds: A number, numeric pair or mapping of ``str`` to
number/numeric pair representing the thresholds for the sensor.
Examples:
.. code-block:: yaml
# Any value below 25 from any sensor will trigger a
# SensorDataBelowThresholdEvent, if the previous value was
# equal or above, and any value above 25 will trigger a
# SensorDataAboveThresholdEvent, if the previous value was
# equal or below
thresholds: 25.0
# Same as above, but the threshold is only applied to
# ``temperature`` readings
thresholds:
temperature: 25.0
# Any value below 20 from any sensor will trigger a
# SensorDataBelowThresholdEvent, if the previous value was
# equal or above, and any value above 25 will trigger a
# SensorDataAboveThresholdEvent, if the previous value was
# equal or below (hysteresis configuration with double
# threshold)
thresholds:
- 20.0
- 25.0
# Same as above, but the threshold is only applied to
# ``temperature`` readings
thresholds:
temperature:
- 20.0
- 25.0
:param tolerance: If set, then the sensor change events will be
triggered only if the difference between the new value and the
previous value is higher than the specified tolerance. For example,
if the sensor data is mapped to a dictionary::
{
"temperature": 0.01, # Tolerance on the 2nd decimal digit
"humidity": 0.1 # Tolerance on the 1st decimal digit
}
Or, if it's a raw scalar number::
0.1 # Tolerance on the 1st decimal digit
Or, if it's a list of values::
[
0.01, # Tolerance on the 2nd decimal digit for the first value
0.1 # Tolerance on the 1st decimal digit for the second value
]
"""
super().__init__(**kwargs)
self._tolerance = tolerance
self._thresholds = thresholds
self._last_measurement: Optional[SensorDataType] = None
""" Latest measurement from the sensor. """
def _has_changes_scalar(
self,
old_data: Union[int, float],
new_data: Union[int, float],
attr: Optional[str] = None,
index: Optional[int] = None,
) -> bool:
"""
Returns ``True`` if the new data has changes compared to the old data -
limited to numeric scalar values.
"""
if isinstance(self._tolerance, (int, float)):
tolerance = self._tolerance
elif isinstance(self._tolerance, dict) and attr:
tolerance = self._tolerance.get(attr, 0) # type: ignore
elif isinstance(self._tolerance, (list, tuple)) and index:
tolerance = self._tolerance[index]
else:
tolerance = 0
return abs(old_data - new_data) > tolerance
def _has_changes(
self,
old_data: Optional[SensorDataType],
new_data: Optional[SensorDataType],
attr: Optional[str] = None,
index: Optional[int] = None,
) -> bool:
"""
Returns ``True`` if the new data has changes compared to the old data.
It also applies the configured tolerance thresholds.
"""
# If there is no previous data, then the new data will always be a change
if old_data is None:
return True
# If the new data is missing, then we have no new changes
if new_data is None:
return False
# If the data is scalar, then run the comparison logic
if isinstance(old_data, (int, float)) and isinstance(new_data, (int, float)):
return self._has_changes_scalar(old_data, new_data, attr, index)
# If the data is dict-like, recursively call _has_changes on its attributes
if isinstance(old_data, dict) and isinstance(new_data, dict):
return any(
self._has_changes(old_data.get(attr), value, attr=attr) # type: ignore
for attr, value in new_data.items()
)
# If the data is list-like, recursively call _has_changes on its values
if isinstance(old_data, (list, tuple)) and isinstance(new_data, (list, tuple)):
return any(
self._has_changes(old_data[i], value, index=i)
for i, value in enumerate(new_data)
)
raise AssertionError(
f'Mismatching types for old_data and new_data: "{type(old_data)}" '
f'and "{type(new_data)}"'
)
def _process_scalar_threshold_events(
self,
old_data: Optional[Numeric],
new_data: Numeric,
attr: Optional[str] = None,
) -> List[SensorDataEvent]:
"""
Inner scalar processing for sensor above/below threshold events.
"""
event_types: List[Type[SensorDataEvent]] = []
event_args = {
'source': get_plugin_name_by_class(self.__class__),
}
# If we're mapping against a dict attribute, extract its thresholds,
# otherwise use the default configured thresholds
thresholds = (
self._thresholds.get(attr)
if attr and isinstance(self._thresholds, dict)
else self._thresholds
)
# Normalize low/high thresholds
low_t, high_t = (
sorted(thresholds[:2])
if isinstance(thresholds, (list, tuple))
else (thresholds, thresholds)
)
if low_t is None or high_t is None:
return []
assert isinstance(low_t, Numeric) and isinstance(
high_t, Numeric
), f'Non-numeric thresholds detected: "{low_t}" and "{high_t}"'
# Above threshold case
if (old_data is None or old_data <= high_t) and new_data > high_t:
event_types.append(SensorDataAboveThresholdEvent)
# Below threshold case
elif (old_data is None or old_data >= low_t) and new_data < low_t:
event_types.append(SensorDataBelowThresholdEvent)
return [
event_type(
data={attr: new_data} if attr else new_data,
**event_args,
)
for event_type in event_types
]
def _process_threshold_events(
self,
old_data: Optional[SensorDataType],
new_data: SensorDataType,
attr: Optional[str] = None,
) -> List[SensorDataEvent]:
"""
Processes sensor above/below threshold events.
"""
events: List[SensorDataEvent] = []
# If there are no configured thresholds, there's nothing to do
if self._thresholds in (None, {}, (), []):
return events
# Scalar case
if isinstance(old_data, (Numeric, NoneType)) and isinstance(new_data, Numeric):
return self._process_scalar_threshold_events(
old_data, new_data, attr # type: ignore
)
# From here on, threshold comparison only applies if both the old and
# new data is a str -> number mapping
if not (isinstance(old_data, (dict, NoneType)) and isinstance(new_data, dict)):
return events
# Recursively call _process_threshold_events on the data attributes
for attr, value in new_data.items(): # type: ignore
events.extend(
self._process_threshold_events(
old_data=(
old_data.get(attr) # type: ignore
if isinstance(old_data, dict)
else old_data
),
new_data=value,
attr=str(attr),
)
)
return events
def _process_sensor_events(
self, old_data: Optional[SensorDataType], new_data: Optional[SensorDataType]
):
"""
Given the previous and new measurement, it runs the comparison logic
against the configured tolerance values and thresholds, and it
processes the required sensor data change and above/below threshold
events.
"""
# If the new data is missing or there are no changes, there are no
# events to process
if new_data is None or not self._has_changes(old_data, new_data):
return
events = [
SensorDataChangeEvent(
data=new_data, # type: ignore
source=get_plugin_name_by_class(self.__class__),
),
*self._process_threshold_events(old_data, new_data),
]
for event in events:
get_bus().post(event)
self.publish_entities(new_data)
def _update_last_measurement(self, new_data: SensorDataType):
"""
Update the ``_last_measurement`` attribute with the newly acquired data.
"""
# If there is no last measurement, or either the new or old
# measurements are not dictionaries, then overwrite the previous data
# with the new data
if not (
isinstance(self._last_measurement, dict) and isinstance(new_data, dict)
):
self._last_measurement = new_data
# Otherwise, merge the old data with the new
self._last_measurement.update(new_data) # type: ignore
@override
@abstractmethod
def transform_entities(self, entities: SensorDataType) -> Collection[Entity]:
raise NotImplementedError()
@override
def publish_entities(
self, entities: SensorDataType, *args, **kwargs
) -> Collection[Entity]:
entities_args = [entities] if isinstance(entities, Numeric) else entities
return super().publish_entities(entities_args, *args, **kwargs) # type: ignore
@abstractmethod
@action
def get_measurement(self, *args, **kwargs):
def get_measurement(self, *args, **kwargs) -> SensorDataType:
"""
Implemented by the subclasses.
:returns: Either a raw scalar:
:returns: Either a raw scalar::
``output = 273.16``
output = 273.16
or a name-value dictionary with the values that have been read::
@ -31,7 +340,7 @@ class SensorPlugin(Plugin, ABC):
or a list of values::
[
output = [
0.01,
0.34,
0.53,
@ -39,18 +348,51 @@ class SensorPlugin(Plugin, ABC):
]
"""
raise NotImplementedError('get_measurement should be implemented in a derived class')
raise NotImplementedError()
@action
def get_data(self, *args, **kwargs):
"""
Alias for ``get_measurement``
(Deprecated) alias for :meth:`.get_measurement``
"""
return self.get_measurement(*args, **kwargs).output
return self.get_measurement(*args, **kwargs)
@action
def close(self):
pass
def status(self, *_, **__) -> Optional[SensorDataType]:
"""
Returns the latest read values and publishes the
:class:`platypush.message.event.entities.EntityUpdateEvent` events if
required.
"""
if self._last_measurement is not None:
self.publish_entities(self._last_measurement)
return self._last_measurement
@override
def main(self):
sleep_retry_secs = 1 # Exponential back-off
while not self.should_stop():
try:
new_data: SensorDataType = self.get_measurement().output # type: ignore
# Reset the exponential back-off retry counter in case of success
sleep_retry_secs = 1
except Exception as e:
self.logger.warning(
'Could not update the status: %s. Next retry in %d seconds',
e,
sleep_retry_secs,
)
self.wait_stop(sleep_retry_secs)
sleep_retry_secs = min(
sleep_retry_secs * 2,
self._max_retry_secs,
)
continue
self._process_sensor_events(self._last_measurement, new_data)
self._update_last_measurement(new_data)
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et: