From bf4db76830da38af2731d84dfa1837b471f5b6e2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 1 Apr 2023 19:24:35 +0200 Subject: [PATCH] Legacy `sensor` backend replaced by an extended `sensor` runnable plugin. --- platypush/backend/sensor/__init__.py | 232 ----------------- platypush/plugins/sensor/__init__.py | 368 ++++++++++++++++++++++++++- 2 files changed, 355 insertions(+), 245 deletions(-) delete mode 100644 platypush/backend/sensor/__init__.py diff --git a/platypush/backend/sensor/__init__.py b/platypush/backend/sensor/__init__.py deleted file mode 100644 index a0c0cd4c..00000000 --- a/platypush/backend/sensor/__init__.py +++ /dev/null @@ -1,232 +0,0 @@ -import time - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.sensor import SensorDataChangeEvent, \ - SensorDataAboveThresholdEvent, SensorDataBelowThresholdEvent - - -class SensorBackend(Backend): - """ - Abstract backend for polling sensors. - - Triggers: - - * :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed - * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have - gone above a configured threshold - * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have - gone below a configured threshold - - """ - - default_tolerance = 1e-7 - - def __init__(self, plugin=None, plugin_args=None, thresholds=None, tolerance=default_tolerance, poll_seconds=None, - enabled_sensors=None, **kwargs): - """ - :param plugin: If set, then this plugin instance, referenced by plugin id, will be polled - through ``get_plugin()``. Example: ``'gpio.sensor.bme280'`` or ``'gpio.sensor.envirophat'``. - :type plugin: str - - :param plugin_args: If plugin is set and its ``get_measurement()`` method accepts optional arguments, then you - can pass those arguments through ``plugin_args``. - :type plugin_args: dict - - :param thresholds: Thresholds can be either a scalar value or a dictionary (e.g. ``{"temperature": 20.0}``). - Sensor threshold events will be fired when measurements get above or below these values. - Set it as a scalar if your get_measurement() code returns a scalar, as a dictionary if it returns a - dictionary of values. For instance, if your sensor code returns both humidity and temperature in a format - like ``{'humidity':60.0, 'temperature': 25.0}``, you'll want to set up a threshold on temperature with a - syntax like ``{'temperature':20.0}`` to trigger events when the temperature goes above/below 20 degrees. - - :param tolerance: If set, then the sensor change events will be triggered only if the difference between - the new value and the previous value is higher than the specified tolerance. Example:: - - { - "temperature": 0.01, # Tolerance on the 2nd decimal digit - "humidity": 0.1 # Tolerance on the 1st decimal digit - } - - :type tolerance: dict or float - - :param poll_seconds: If set, the thread will wait for the specified number of seconds between a read and the - next one. - :type poll_seconds: float - - :param enabled_sensors: If ``get_measurement()`` returns data in dict form, then ``enabled_sensors`` selects - which keys should be taken into account when monitoring for new events (e.g. "temperature" or "humidity"). - :type enabled_sensors: dict (in the form ``name -> [True/False]``), set or list - """ - - super().__init__(**kwargs) - - self.data = None - self.plugin = plugin - self.plugin_args = plugin_args or {} - self.thresholds = thresholds - self.tolerance = tolerance - self.poll_seconds = poll_seconds - - if isinstance(enabled_sensors, list): - enabled_sensors = set(enabled_sensors) - if isinstance(enabled_sensors, set): - enabled_sensors = {k: True for k in enabled_sensors} - - self.enabled_sensors = enabled_sensors or {} - - def get_measurement(self): - """ - Wrapper around ``plugin.get_measurement()`` that can filter events on specified enabled sensors data or on - specified tolerance values. It can be overridden by derived classes. - """ - if not self.plugin: - raise NotImplementedError('No plugin specified') - - reload = False - success = False - data = None - - while not success: - try: - plugin = get_plugin(self.plugin, reload=reload) - data = plugin.get_data(**self.plugin_args).output - if reload: - self.logger.info('Backend successfully restored') - - success = True - except Exception as e: - self.logger.warning('Unexpected exception while getting data: {}'.format(str(e))) - self.logger.exception(e) - reload = True - time.sleep(5) - - if self.enabled_sensors and data is not None: - data = { - sensor: data[sensor] - for sensor, enabled in self.enabled_sensors.items() - if enabled and sensor in data - } - - return data - - @staticmethod - def _get_value(value): - if isinstance(value, float) or isinstance(value, int) or isinstance(value, bool): - return value - return float(value) - - def get_new_data(self, new_data): - if self.data is None or new_data is None: - return new_data - - try: - # Scalar data case - new_data = self._get_value(new_data) - return new_data if abs(new_data - self.data) >= self.tolerance else None - except (ValueError, TypeError): - # If it's not a scalar then it should be a dict - assert isinstance(new_data, dict), 'Invalid type {} received for sensor data'.format(type(new_data)) - - ret = {} - for k, v in new_data.items(): - if (v is None and self.data.get(k) is not None) \ - or k not in self.data \ - or self.tolerance is None: - ret[k] = v - continue - - if v is None: - continue - - tolerance = None - is_nan = False - old_v = None - - try: - v = self._get_value(v) - old_v = self._get_value(self.data.get(k)) - except (TypeError, ValueError): - is_nan = True - - if not is_nan: - if isinstance(self.tolerance, dict): - tolerance = float(self.tolerance.get(k, self.default_tolerance)) - else: - try: - tolerance = float(self.tolerance) - except (TypeError, ValueError): - pass - - if tolerance is None or abs(v - old_v) >= tolerance: - ret[k] = v - elif k not in self.data or self.data[k] != v: - ret[k] = v - - return ret - - def on_stop(self): - super().on_stop() - if not self.plugin: - return - - plugin = get_plugin(self.plugin) - if plugin and hasattr(plugin, 'close'): - plugin.close() - - def process_data(self, data, new_data=None, **__): - if data is not None and data not in ({}, []): - self.bus.post(SensorDataChangeEvent(data=data, source=self.plugin or self.__class__.__name__)) - - def run(self): - super().run() - self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__)) - - while not self.should_stop(): - try: - data = self.get_measurement() - new_data = self.get_new_data(data) - self.process_data(new_data) - - data_below_threshold = {} - data_above_threshold = {} - - if self.thresholds: - if isinstance(self.thresholds, dict) and isinstance(data, dict): - for (measure, thresholds) in self.thresholds.items(): - if measure not in data: - continue - - if not isinstance(thresholds, list): - thresholds = [thresholds] - - for threshold in thresholds: - if data[measure] > threshold and (self.data is None or ( - measure in self.data and self.data[measure] <= threshold)): - data_above_threshold[measure] = data[measure] - elif data[measure] < threshold and (self.data is None or ( - measure in self.data and self.data[measure] >= threshold)): - data_below_threshold[measure] = data[measure] - - if data_below_threshold: - self.bus.post(SensorDataBelowThresholdEvent(data=data_below_threshold)) - - if data_above_threshold: - self.bus.post(SensorDataAboveThresholdEvent(data=data_above_threshold)) - - self.data = data - - if new_data: - if isinstance(new_data, dict): - for k, v in new_data.items(): - self.data[k] = v - else: - self.data = new_data - except Exception as e: - self.logger.exception(e) - - if self.poll_seconds: - time.sleep(self.poll_seconds) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sensor/__init__.py b/platypush/plugins/sensor/__init__.py index a60a8a9e..de61e9a9 100644 --- a/platypush/plugins/sensor/__init__.py +++ b/platypush/plugins/sensor/__init__.py @@ -1,26 +1,335 @@ from abc import ABC, abstractmethod +from types import NoneType +from typing import Collection, List, Mapping, Optional, Tuple, Type, Union +from typing_extensions import override -from platypush.plugins import Plugin, action +from platypush.common.sensors import Numeric, SensorDataType +from platypush.context import get_bus +from platypush.entities import Entity +from platypush.entities.managers.sensors import SensorEntityManager +from platypush.message.event.sensor import ( + SensorDataAboveThresholdEvent, + SensorDataBelowThresholdEvent, + SensorDataChangeEvent, + SensorDataEvent, +) +from platypush.plugins import RunnablePlugin, action +from platypush.utils import get_plugin_name_by_class + +ThresholdType = Union[Numeric, Tuple[Numeric, Numeric]] +ThresholdConfiguration = Union[ThresholdType, Mapping[str, ThresholdType]] -class SensorPlugin(Plugin, ABC): +class SensorPlugin(RunnablePlugin, SensorEntityManager, ABC): """ Sensor abstract plugin. Any plugin that interacts with sensors - should implement this class (and the get_measurement() method) + should implement this class. + + Triggers: + + * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` + * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` + * :class:`platypush.message.event.sensor.SensorDataChangeEvent` + """ - def __init__(self, **kwargs): + _max_retry_secs = 60.0 + """ + In case of failure, we apply an exponential back-off retry algorithm. This + is the maximum number of seconds that we should wait during these retries. + """ + + def __init__( + self, + thresholds: Optional[ThresholdConfiguration] = None, + tolerance: SensorDataType = 0, + **kwargs, + ): + """ + :param thresholds: A number, numeric pair or mapping of ``str`` to + number/numeric pair representing the thresholds for the sensor. + + Examples: + + .. code-block:: yaml + + # Any value below 25 from any sensor will trigger a + # SensorDataBelowThresholdEvent, if the previous value was + # equal or above, and any value above 25 will trigger a + # SensorDataAboveThresholdEvent, if the previous value was + # equal or below + thresholds: 25.0 + + # Same as above, but the threshold is only applied to + # ``temperature`` readings + thresholds: + temperature: 25.0 + + # Any value below 20 from any sensor will trigger a + # SensorDataBelowThresholdEvent, if the previous value was + # equal or above, and any value above 25 will trigger a + # SensorDataAboveThresholdEvent, if the previous value was + # equal or below (hysteresis configuration with double + # threshold) + thresholds: + - 20.0 + - 25.0 + + # Same as above, but the threshold is only applied to + # ``temperature`` readings + thresholds: + temperature: + - 20.0 + - 25.0 + + :param tolerance: If set, then the sensor change events will be + triggered only if the difference between the new value and the + previous value is higher than the specified tolerance. For example, + if the sensor data is mapped to a dictionary:: + + { + "temperature": 0.01, # Tolerance on the 2nd decimal digit + "humidity": 0.1 # Tolerance on the 1st decimal digit + } + + Or, if it's a raw scalar number:: + + 0.1 # Tolerance on the 1st decimal digit + + Or, if it's a list of values:: + + [ + 0.01, # Tolerance on the 2nd decimal digit for the first value + 0.1 # Tolerance on the 1st decimal digit for the second value + ] + + """ super().__init__(**kwargs) + self._tolerance = tolerance + self._thresholds = thresholds + self._last_measurement: Optional[SensorDataType] = None + """ Latest measurement from the sensor. """ + + def _has_changes_scalar( + self, + old_data: Union[int, float], + new_data: Union[int, float], + attr: Optional[str] = None, + index: Optional[int] = None, + ) -> bool: + """ + Returns ``True`` if the new data has changes compared to the old data - + limited to numeric scalar values. + """ + if isinstance(self._tolerance, (int, float)): + tolerance = self._tolerance + elif isinstance(self._tolerance, dict) and attr: + tolerance = self._tolerance.get(attr, 0) # type: ignore + elif isinstance(self._tolerance, (list, tuple)) and index: + tolerance = self._tolerance[index] + else: + tolerance = 0 + + return abs(old_data - new_data) > tolerance + + def _has_changes( + self, + old_data: Optional[SensorDataType], + new_data: Optional[SensorDataType], + attr: Optional[str] = None, + index: Optional[int] = None, + ) -> bool: + """ + Returns ``True`` if the new data has changes compared to the old data. + It also applies the configured tolerance thresholds. + """ + # If there is no previous data, then the new data will always be a change + if old_data is None: + return True + + # If the new data is missing, then we have no new changes + if new_data is None: + return False + + # If the data is scalar, then run the comparison logic + if isinstance(old_data, (int, float)) and isinstance(new_data, (int, float)): + return self._has_changes_scalar(old_data, new_data, attr, index) + + # If the data is dict-like, recursively call _has_changes on its attributes + if isinstance(old_data, dict) and isinstance(new_data, dict): + return any( + self._has_changes(old_data.get(attr), value, attr=attr) # type: ignore + for attr, value in new_data.items() + ) + + # If the data is list-like, recursively call _has_changes on its values + if isinstance(old_data, (list, tuple)) and isinstance(new_data, (list, tuple)): + return any( + self._has_changes(old_data[i], value, index=i) + for i, value in enumerate(new_data) + ) + + raise AssertionError( + f'Mismatching types for old_data and new_data: "{type(old_data)}" ' + f'and "{type(new_data)}"' + ) + + def _process_scalar_threshold_events( + self, + old_data: Optional[Numeric], + new_data: Numeric, + attr: Optional[str] = None, + ) -> List[SensorDataEvent]: + """ + Inner scalar processing for sensor above/below threshold events. + """ + event_types: List[Type[SensorDataEvent]] = [] + event_args = { + 'source': get_plugin_name_by_class(self.__class__), + } + + # If we're mapping against a dict attribute, extract its thresholds, + # otherwise use the default configured thresholds + thresholds = ( + self._thresholds.get(attr) + if attr and isinstance(self._thresholds, dict) + else self._thresholds + ) + + # Normalize low/high thresholds + low_t, high_t = ( + sorted(thresholds[:2]) + if isinstance(thresholds, (list, tuple)) + else (thresholds, thresholds) + ) + + if low_t is None or high_t is None: + return [] + + assert isinstance(low_t, Numeric) and isinstance( + high_t, Numeric + ), f'Non-numeric thresholds detected: "{low_t}" and "{high_t}"' + + # Above threshold case + if (old_data is None or old_data <= high_t) and new_data > high_t: + event_types.append(SensorDataAboveThresholdEvent) + # Below threshold case + elif (old_data is None or old_data >= low_t) and new_data < low_t: + event_types.append(SensorDataBelowThresholdEvent) + + return [ + event_type( + data={attr: new_data} if attr else new_data, + **event_args, + ) + for event_type in event_types + ] + + def _process_threshold_events( + self, + old_data: Optional[SensorDataType], + new_data: SensorDataType, + attr: Optional[str] = None, + ) -> List[SensorDataEvent]: + """ + Processes sensor above/below threshold events. + """ + events: List[SensorDataEvent] = [] + + # If there are no configured thresholds, there's nothing to do + if self._thresholds in (None, {}, (), []): + return events + + # Scalar case + if isinstance(old_data, (Numeric, NoneType)) and isinstance(new_data, Numeric): + return self._process_scalar_threshold_events( + old_data, new_data, attr # type: ignore + ) + + # From here on, threshold comparison only applies if both the old and + # new data is a str -> number mapping + if not (isinstance(old_data, (dict, NoneType)) and isinstance(new_data, dict)): + return events + + # Recursively call _process_threshold_events on the data attributes + for attr, value in new_data.items(): # type: ignore + events.extend( + self._process_threshold_events( + old_data=( + old_data.get(attr) # type: ignore + if isinstance(old_data, dict) + else old_data + ), + new_data=value, + attr=str(attr), + ) + ) + + return events + + def _process_sensor_events( + self, old_data: Optional[SensorDataType], new_data: Optional[SensorDataType] + ): + """ + Given the previous and new measurement, it runs the comparison logic + against the configured tolerance values and thresholds, and it + processes the required sensor data change and above/below threshold + events. + """ + # If the new data is missing or there are no changes, there are no + # events to process + if new_data is None or not self._has_changes(old_data, new_data): + return + + events = [ + SensorDataChangeEvent( + data=new_data, # type: ignore + source=get_plugin_name_by_class(self.__class__), + ), + *self._process_threshold_events(old_data, new_data), + ] + + for event in events: + get_bus().post(event) + + self.publish_entities(new_data) + + def _update_last_measurement(self, new_data: SensorDataType): + """ + Update the ``_last_measurement`` attribute with the newly acquired data. + """ + # If there is no last measurement, or either the new or old + # measurements are not dictionaries, then overwrite the previous data + # with the new data + if not ( + isinstance(self._last_measurement, dict) and isinstance(new_data, dict) + ): + self._last_measurement = new_data + + # Otherwise, merge the old data with the new + self._last_measurement.update(new_data) # type: ignore + + @override + @abstractmethod + def transform_entities(self, entities: SensorDataType) -> Collection[Entity]: + raise NotImplementedError() + + @override + def publish_entities( + self, entities: SensorDataType, *args, **kwargs + ) -> Collection[Entity]: + entities_args = [entities] if isinstance(entities, Numeric) else entities + return super().publish_entities(entities_args, *args, **kwargs) # type: ignore @abstractmethod @action - def get_measurement(self, *args, **kwargs): + def get_measurement(self, *args, **kwargs) -> SensorDataType: """ Implemented by the subclasses. - :returns: Either a raw scalar: + :returns: Either a raw scalar:: - ``output = 273.16`` + output = 273.16 or a name-value dictionary with the values that have been read:: @@ -31,7 +340,7 @@ class SensorPlugin(Plugin, ABC): or a list of values:: - [ + output = [ 0.01, 0.34, 0.53, @@ -39,18 +348,51 @@ class SensorPlugin(Plugin, ABC): ] """ - raise NotImplementedError('get_measurement should be implemented in a derived class') + raise NotImplementedError() @action def get_data(self, *args, **kwargs): """ - Alias for ``get_measurement`` + (Deprecated) alias for :meth:`.get_measurement`` """ - return self.get_measurement(*args, **kwargs).output + return self.get_measurement(*args, **kwargs) @action - def close(self): - pass + def status(self, *_, **__) -> Optional[SensorDataType]: + """ + Returns the latest read values and publishes the + :class:`platypush.message.event.entities.EntityUpdateEvent` events if + required. + """ + if self._last_measurement is not None: + self.publish_entities(self._last_measurement) + return self._last_measurement + + @override + def main(self): + sleep_retry_secs = 1 # Exponential back-off + + while not self.should_stop(): + try: + new_data: SensorDataType = self.get_measurement().output # type: ignore + # Reset the exponential back-off retry counter in case of success + sleep_retry_secs = 1 + except Exception as e: + self.logger.warning( + 'Could not update the status: %s. Next retry in %d seconds', + e, + sleep_retry_secs, + ) + self.wait_stop(sleep_retry_secs) + sleep_retry_secs = min( + sleep_retry_secs * 2, + self._max_retry_secs, + ) + continue + + self._process_sensor_events(self._last_measurement, new_data) + self._update_last_measurement(new_data) + self.wait_stop(self.poll_interval) # vim:sw=4:ts=4:et: