Compare commits

..

13 commits

Author SHA1 Message Date
5dabfed365
Migrated sensor.bme280 to the new SensorPlugin interface.
Removed the old `backend.sensor.bme280` and the old `gpio.sensor.bme280`
plugin. They have now been merged into the new `sensor.bme280` runnable
plugin, which extends the `SensorPlugin` API and supports entities.
2023-04-01 22:31:24 +02:00
6f237a1500
Support the deprecated poll_seconds option on RunnablePlugin 2023-04-01 22:02:59 +02:00
c23e8867e2
Added enabled_sensors to the sensor plugin 2023-04-01 21:56:56 +02:00
7912a59ff8
vl53l1x plugin migrated to the new SensorPlugin interface. 2023-04-01 19:31:13 +02:00
6a5a5de03e
serial plugin migrated to the new SensorPlugin interface. 2023-04-01 19:29:56 +02:00
bf4db76830
Legacy sensor backend replaced by an extended sensor runnable plugin. 2023-04-01 19:24:35 +02:00
bf75eb73ac
Added an abstract base SensorDataEvent for sensor events. 2023-03-31 22:51:35 +02:00
6a3ade3304
Added common.sensors package.
The package contains the base types and constants shared across
sensor-based integrations.
2023-03-31 22:50:47 +02:00
42d468c895
get_lock should raise a TimeoutError if lock.acquire is False 2023-03-31 22:31:32 +02:00
9693becb9e
Removed LGTM badges from the README.
LGTM is now merged into Github and the badges are no longer available.
2023-03-31 14:31:45 +02:00
7bdd877e49
Support the binary flag both on serial.read and serial.write. 2023-03-31 14:31:45 +02:00
1efaff878e
Rewritten serial plugin.
`backend.serial` has been removed and the polling logic merged into the
`serial` plugin.

The `serial` plugin now supports the new entity engine as well.
2023-03-31 14:31:45 +02:00
4f15758de9
black fixes 2023-03-31 14:31:38 +02:00
19 changed files with 953 additions and 647 deletions

View file

@ -8,8 +8,6 @@ Platypush
[![Last Commit](https://img.shields.io/github/last-commit/BlackLight/platypush.svg)](https://git.platypush.tech/platypush/platypush/-/commits/master/) [![Last Commit](https://img.shields.io/github/last-commit/BlackLight/platypush.svg)](https://git.platypush.tech/platypush/platypush/-/commits/master/)
[![Join chat on Matrix](https://img.shields.io/matrix/:platypush?server_fqdn=matrix.platypush.tech)](https://matrix.to/#/#platypush:matrix.platypush.tech) [![Join chat on Matrix](https://img.shields.io/matrix/:platypush?server_fqdn=matrix.platypush.tech)](https://matrix.to/#/#platypush:matrix.platypush.tech)
[![Contributions](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://git.platypush.tech/platypush/platypush/-/blob/master/CONTRIBUTING.md) [![Contributions](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://git.platypush.tech/platypush/platypush/-/blob/master/CONTRIBUTING.md)
[![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/BlackLight/platypush.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/BlackLight/platypush/context:python)
[![Language grade: JavaScript](https://img.shields.io/lgtm/grade/javascript/g/BlackLight/platypush.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/BlackLight/platypush/context:javascript)
<!-- toc --> <!-- toc -->

View file

@ -1,232 +0,0 @@
import time
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.sensor import SensorDataChangeEvent, \
SensorDataAboveThresholdEvent, SensorDataBelowThresholdEvent
class SensorBackend(Backend):
"""
Abstract backend for polling sensors.
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
default_tolerance = 1e-7
def __init__(self, plugin=None, plugin_args=None, thresholds=None, tolerance=default_tolerance, poll_seconds=None,
enabled_sensors=None, **kwargs):
"""
:param plugin: If set, then this plugin instance, referenced by plugin id, will be polled
through ``get_plugin()``. Example: ``'gpio.sensor.bme280'`` or ``'gpio.sensor.envirophat'``.
:type plugin: str
:param plugin_args: If plugin is set and its ``get_measurement()`` method accepts optional arguments, then you
can pass those arguments through ``plugin_args``.
:type plugin_args: dict
:param thresholds: Thresholds can be either a scalar value or a dictionary (e.g. ``{"temperature": 20.0}``).
Sensor threshold events will be fired when measurements get above or below these values.
Set it as a scalar if your get_measurement() code returns a scalar, as a dictionary if it returns a
dictionary of values. For instance, if your sensor code returns both humidity and temperature in a format
like ``{'humidity':60.0, 'temperature': 25.0}``, you'll want to set up a threshold on temperature with a
syntax like ``{'temperature':20.0}`` to trigger events when the temperature goes above/below 20 degrees.
:param tolerance: If set, then the sensor change events will be triggered only if the difference between
the new value and the previous value is higher than the specified tolerance. Example::
{
"temperature": 0.01, # Tolerance on the 2nd decimal digit
"humidity": 0.1 # Tolerance on the 1st decimal digit
}
:type tolerance: dict or float
:param poll_seconds: If set, the thread will wait for the specified number of seconds between a read and the
next one.
:type poll_seconds: float
:param enabled_sensors: If ``get_measurement()`` returns data in dict form, then ``enabled_sensors`` selects
which keys should be taken into account when monitoring for new events (e.g. "temperature" or "humidity").
:type enabled_sensors: dict (in the form ``name -> [True/False]``), set or list
"""
super().__init__(**kwargs)
self.data = None
self.plugin = plugin
self.plugin_args = plugin_args or {}
self.thresholds = thresholds
self.tolerance = tolerance
self.poll_seconds = poll_seconds
if isinstance(enabled_sensors, list):
enabled_sensors = set(enabled_sensors)
if isinstance(enabled_sensors, set):
enabled_sensors = {k: True for k in enabled_sensors}
self.enabled_sensors = enabled_sensors or {}
def get_measurement(self):
"""
Wrapper around ``plugin.get_measurement()`` that can filter events on specified enabled sensors data or on
specified tolerance values. It can be overridden by derived classes.
"""
if not self.plugin:
raise NotImplementedError('No plugin specified')
reload = False
success = False
data = None
while not success:
try:
plugin = get_plugin(self.plugin, reload=reload)
data = plugin.get_data(**self.plugin_args).output
if reload:
self.logger.info('Backend successfully restored')
success = True
except Exception as e:
self.logger.warning('Unexpected exception while getting data: {}'.format(str(e)))
self.logger.exception(e)
reload = True
time.sleep(5)
if self.enabled_sensors and data is not None:
data = {
sensor: data[sensor]
for sensor, enabled in self.enabled_sensors.items()
if enabled and sensor in data
}
return data
@staticmethod
def _get_value(value):
if isinstance(value, float) or isinstance(value, int) or isinstance(value, bool):
return value
return float(value)
def get_new_data(self, new_data):
if self.data is None or new_data is None:
return new_data
try:
# Scalar data case
new_data = self._get_value(new_data)
return new_data if abs(new_data - self.data) >= self.tolerance else None
except (ValueError, TypeError):
# If it's not a scalar then it should be a dict
assert isinstance(new_data, dict), 'Invalid type {} received for sensor data'.format(type(new_data))
ret = {}
for k, v in new_data.items():
if (v is None and self.data.get(k) is not None) \
or k not in self.data \
or self.tolerance is None:
ret[k] = v
continue
if v is None:
continue
tolerance = None
is_nan = False
old_v = None
try:
v = self._get_value(v)
old_v = self._get_value(self.data.get(k))
except (TypeError, ValueError):
is_nan = True
if not is_nan:
if isinstance(self.tolerance, dict):
tolerance = float(self.tolerance.get(k, self.default_tolerance))
else:
try:
tolerance = float(self.tolerance)
except (TypeError, ValueError):
pass
if tolerance is None or abs(v - old_v) >= tolerance:
ret[k] = v
elif k not in self.data or self.data[k] != v:
ret[k] = v
return ret
def on_stop(self):
super().on_stop()
if not self.plugin:
return
plugin = get_plugin(self.plugin)
if plugin and hasattr(plugin, 'close'):
plugin.close()
def process_data(self, data, new_data=None, **__):
if data is not None and data not in ({}, []):
self.bus.post(SensorDataChangeEvent(data=data, source=self.plugin or self.__class__.__name__))
def run(self):
super().run()
self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__))
while not self.should_stop():
try:
data = self.get_measurement()
new_data = self.get_new_data(data)
self.process_data(new_data)
data_below_threshold = {}
data_above_threshold = {}
if self.thresholds:
if isinstance(self.thresholds, dict) and isinstance(data, dict):
for (measure, thresholds) in self.thresholds.items():
if measure not in data:
continue
if not isinstance(thresholds, list):
thresholds = [thresholds]
for threshold in thresholds:
if data[measure] > threshold and (self.data is None or (
measure in self.data and self.data[measure] <= threshold)):
data_above_threshold[measure] = data[measure]
elif data[measure] < threshold and (self.data is None or (
measure in self.data and self.data[measure] >= threshold)):
data_below_threshold[measure] = data[measure]
if data_below_threshold:
self.bus.post(SensorDataBelowThresholdEvent(data=data_below_threshold))
if data_above_threshold:
self.bus.post(SensorDataAboveThresholdEvent(data=data_above_threshold))
self.data = data
if new_data:
if isinstance(new_data, dict):
for k, v in new_data.items():
self.data[k] = v
else:
self.data = new_data
except Exception as e:
self.logger.exception(e)
if self.poll_seconds:
time.sleep(self.poll_seconds)
# vim:sw=4:ts=4:et:

View file

@ -1,39 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorBme280Backend(SensorBackend):
"""
Backend to poll analog sensor values from a `BME280 <https://shop.pimoroni.com/products/bme280-breakout>`_
environment sensor
Requires:
* ``pimoroni-bme280`` (``pip install pimoroni-bme280``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def __init__(self, temperature=True, pressure=True, humidity=True, **kwargs):
"""
:param temperature: Enable temperature sensor polling
:param pressure: Enable pressure sensor polling
:param humidity: Enable humidity sensor polling
"""
enabled_sensors = {
'temperature': temperature,
'pressure': pressure,
'humidity': humidity,
}
super().__init__(plugin='gpio.sensor.bme280', enabled_sensors=enabled_sensors, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,13 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip:
- pimoroni-bme280
package: platypush.backend.sensor.bme280
type: backend

View file

@ -1,27 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorSerialBackend(SensorBackend):
"""
This backend listens for new events from sensors connected through a serial
interface (like Arduino) acting as a wrapper for the ``serial`` plugin.
Requires:
* The :mod:`platypush.plugins.serial` plugin configured
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def __init__(self, **kwargs):
super().__init__(plugin='serial', **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,12 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip: []
package: platypush.backend.sensor.serial
type: backend

View file

@ -0,0 +1,11 @@
from typing import Iterable, Mapping, Union
Numeric = Union[float, int]
SensorDataType = Union[Numeric, Mapping[str, Numeric], Iterable[Numeric]]
"""
Numeric sensor data published by integrations can be either of:
- ``int``/``float``
- Mapping of ``str -> int/float``
- List of ``int``/``float``
"""

View file

@ -1,49 +1,41 @@
from abc import ABC
from typing import Optional from typing import Optional
from platypush.common.sensors import SensorDataType
from platypush.message.event import Event from platypush.message.event import Event
class SensorDataChangeEvent(Event): class SensorDataEvent(Event, ABC):
"""
Sensor events base class.
"""
def __init__(
self, *args, data: SensorDataType, source: Optional[str] = None, **kwargs
):
"""
:param data: Sensor data.
:param source: Sensor source - usually the plugin qualified name.
"""
super().__init__(data=data, source=source, *args, **kwargs)
class SensorDataChangeEvent(SensorDataEvent):
""" """
Event triggered when a sensor has new data Event triggered when a sensor has new data
""" """
def __init__(self, data, source: Optional[str] = None, *args, **kwargs):
"""
:param data: Sensor data
"""
super().__init__(data=data, source=source, *args, **kwargs) class SensorDataAboveThresholdEvent(SensorDataEvent):
self.data = data
self.source = source
class SensorDataAboveThresholdEvent(Event):
""" """
Event triggered when a sensor's read goes above a configured threshold Event triggered when a sensor's read goes above a configured threshold
""" """
def __init__(self, data, *args, **kwargs):
"""
:param data: Sensor data
"""
super().__init__(data=data, *args, **kwargs) class SensorDataBelowThresholdEvent(SensorDataEvent):
self.data = data
class SensorDataBelowThresholdEvent(Event):
""" """
Event triggered when a sensor's read goes below a configured threshold Event triggered when a sensor's read goes below a configured threshold
""" """
def __init__(self, data, *args, **kwargs):
"""
:param data: Sensor data
"""
super().__init__(data=data, *args, **kwargs)
self.data = data
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import logging import logging
import threading import threading
import warnings
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from functools import wraps from functools import wraps
@ -86,7 +87,9 @@ class RunnablePlugin(Plugin):
): ):
""" """
:param poll_interval: How often the :meth:`.loop` function should be :param poll_interval: How often the :meth:`.loop` function should be
execute (default: 15 seconds). execute (default: 15 seconds). *NOTE*: For back-compatibility
reasons, the `poll_seconds` argument is also supported, but it's
deprecated.
:param stop_timeout: How long we should wait for any running :param stop_timeout: How long we should wait for any running
threads/processes to stop before exiting (default: 5 seconds). threads/processes to stop before exiting (default: 5 seconds).
""" """
@ -97,6 +100,16 @@ class RunnablePlugin(Plugin):
self._stop_timeout = stop_timeout self._stop_timeout = stop_timeout
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
if kwargs.get('poll_seconds') is not None:
warnings.warn(
'poll_seconds is deprecated, use poll_interval instead',
DeprecationWarning,
stacklevel=2,
)
if self.poll_interval is None:
self.poll_interval = kwargs['poll_seconds']
def main(self): def main(self):
""" """
Implementation of the main loop of the plugin. Implementation of the main loop of the plugin.

View file

@ -1,63 +0,0 @@
from platypush.plugins import action
from platypush.plugins.gpio.sensor import GpioSensorPlugin
class GpioSensorBme280Plugin(GpioSensorPlugin):
"""
Plugin to interact with a `BME280 <https://shop.pimoroni.com/products/bme280-breakout>`_ environment sensor for
temperature, humidity and pressure measurements over I2C interface
Requires:
* ``pimoroni-bme280`` (``pip install pimoroni-bme280``)
"""
def __init__(self, port=1, **kwargs):
"""
:param port: I2C port. 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1)
"""
super().__init__(**kwargs)
self.port = port
self._bus = None
self._device = None
# noinspection PyPackageRequirements
# noinspection PyUnresolvedReferences
def _get_device(self):
if self._device:
return self._device
from smbus import SMBus
from bme280 import BME280
self._bus = SMBus(self.port)
self._device = BME280(i2c_dev=self._bus)
return self._device
@action
def get_measurement(self):
"""
:returns: dict. Example:
.. code-block:: python
output = {
"temperature": 21.0, # Celsius
"pressure": 101555.08, # Pascals
"humidity": 23.543, # percentage
"altitude": 15.703 # meters
}
"""
device = self._get_device()
return {
'temperature': device.get_temperature(),
'pressure': device.get_pressure()*100,
'humidity': device.get_humidity(),
'altitude': device.get_altitude(),
}
# vim:sw=4:ts=4:et:

View file

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- pimoroni-bme280
package: platypush.plugins.gpio.sensor.bme280
type: plugin

View file

@ -1,26 +1,350 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from types import NoneType
from typing import Collection, Iterable, List, Mapping, Optional, Tuple, Type, Union
from typing_extensions import override
from platypush.plugins import Plugin, action from platypush.common.sensors import Numeric, SensorDataType
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.message.event.sensor import (
SensorDataAboveThresholdEvent,
SensorDataBelowThresholdEvent,
SensorDataChangeEvent,
SensorDataEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_plugin_name_by_class
ThresholdType = Union[Numeric, Tuple[Numeric, Numeric]]
ThresholdConfiguration = Union[ThresholdType, Mapping[str, ThresholdType]]
class SensorPlugin(Plugin, ABC): class SensorPlugin(RunnablePlugin, SensorEntityManager, ABC):
""" """
Sensor abstract plugin. Any plugin that interacts with sensors Sensor abstract plugin. Any plugin that interacts with sensors
should implement this class (and the get_measurement() method) should implement this class.
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
""" """
def __init__(self, **kwargs): _max_retry_secs = 60.0
"""
In case of failure, we apply an exponential back-off retry algorithm. This
is the maximum number of seconds that we should wait during these retries.
"""
def __init__(
self,
thresholds: Optional[ThresholdConfiguration] = None,
tolerance: SensorDataType = 0,
enabled_sensors: Optional[Iterable[str]] = None,
**kwargs,
):
"""
:param thresholds: A number, numeric pair or mapping of ``str`` to
number/numeric pair representing the thresholds for the sensor.
Examples:
.. code-block:: yaml
# Any value below 25 from any sensor will trigger a
# SensorDataBelowThresholdEvent, if the previous value was
# equal or above, and any value above 25 will trigger a
# SensorDataAboveThresholdEvent, if the previous value was
# equal or below
thresholds: 25.0
# Same as above, but the threshold is only applied to
# ``temperature`` readings
thresholds:
temperature: 25.0
# Any value below 20 from any sensor will trigger a
# SensorDataBelowThresholdEvent, if the previous value was
# equal or above, and any value above 25 will trigger a
# SensorDataAboveThresholdEvent, if the previous value was
# equal or below (hysteresis configuration with double
# threshold)
thresholds:
- 20.0
- 25.0
# Same as above, but the threshold is only applied to
# ``temperature`` readings
thresholds:
temperature:
- 20.0
- 25.0
:param tolerance: If set, then the sensor change events will be
triggered only if the difference between the new value and the
previous value is higher than the specified tolerance. For example,
if the sensor data is mapped to a dictionary::
{
"temperature": 0.01, # Tolerance on the 2nd decimal digit
"humidity": 0.1 # Tolerance on the 1st decimal digit
}
Or, if it's a raw scalar number::
0.1 # Tolerance on the 1st decimal digit
Or, if it's a list of values::
[
0.01, # Tolerance on the 2nd decimal digit for the first value
0.1 # Tolerance on the 1st decimal digit for the second value
]
:param enabled_sensors: If :meth:`.get_measurement` returns a key-value
mapping, and ``enabled_sensors`` is set, then only the reported
sensor keys will be returned.
"""
super().__init__(**kwargs) super().__init__(**kwargs)
self._tolerance = tolerance
self._thresholds = thresholds
self._enabled_sensors = set(enabled_sensors or [])
self._last_measurement: Optional[SensorDataType] = None
""" Latest measurement from the sensor. """
def _has_changes_scalar(
self,
old_data: Union[int, float],
new_data: Union[int, float],
attr: Optional[str] = None,
index: Optional[int] = None,
) -> bool:
"""
Returns ``True`` if the new data has changes compared to the old data -
limited to numeric scalar values.
"""
if isinstance(self._tolerance, (int, float)):
tolerance = self._tolerance
elif isinstance(self._tolerance, dict) and attr:
tolerance = self._tolerance.get(attr, 0) # type: ignore
elif isinstance(self._tolerance, (list, tuple)) and index:
tolerance = self._tolerance[index]
else:
tolerance = 0
return abs(old_data - new_data) > tolerance
def _has_changes(
self,
old_data: Optional[SensorDataType],
new_data: Optional[SensorDataType],
attr: Optional[str] = None,
index: Optional[int] = None,
) -> bool:
"""
Returns ``True`` if the new data has changes compared to the old data.
It also applies the configured tolerance thresholds.
"""
# If there is no previous data, then the new data will always be a change
if old_data is None:
return True
# If the new data is missing, then we have no new changes
if new_data is None:
return False
# If the data is scalar, then run the comparison logic
if isinstance(old_data, (int, float)) and isinstance(new_data, (int, float)):
return self._has_changes_scalar(old_data, new_data, attr, index)
# If the data is dict-like, recursively call _has_changes on its attributes
if isinstance(old_data, dict) and isinstance(new_data, dict):
return any(
self._has_changes(old_data.get(attr), value, attr=attr) # type: ignore
for attr, value in new_data.items()
)
# If the data is list-like, recursively call _has_changes on its values
if isinstance(old_data, (list, tuple)) and isinstance(new_data, (list, tuple)):
return any(
self._has_changes(old_data[i], value, index=i)
for i, value in enumerate(new_data)
)
raise AssertionError(
f'Mismatching types for old_data and new_data: "{type(old_data)}" '
f'and "{type(new_data)}"'
)
def _process_scalar_threshold_events(
self,
old_data: Optional[Numeric],
new_data: Numeric,
attr: Optional[str] = None,
) -> List[SensorDataEvent]:
"""
Inner scalar processing for sensor above/below threshold events.
"""
event_types: List[Type[SensorDataEvent]] = []
event_args = {
'source': get_plugin_name_by_class(self.__class__),
}
# If we're mapping against a dict attribute, extract its thresholds,
# otherwise use the default configured thresholds
thresholds = (
self._thresholds.get(attr)
if attr and isinstance(self._thresholds, dict)
else self._thresholds
)
# Normalize low/high thresholds
low_t, high_t = (
sorted(thresholds[:2])
if isinstance(thresholds, (list, tuple))
else (thresholds, thresholds)
)
if low_t is None or high_t is None:
return []
assert isinstance(low_t, Numeric) and isinstance(
high_t, Numeric
), f'Non-numeric thresholds detected: "{low_t}" and "{high_t}"'
# Above threshold case
if (old_data is None or old_data <= high_t) and new_data > high_t:
event_types.append(SensorDataAboveThresholdEvent)
# Below threshold case
elif (old_data is None or old_data >= low_t) and new_data < low_t:
event_types.append(SensorDataBelowThresholdEvent)
return [
event_type(
data={attr: new_data} if attr else new_data,
**event_args,
)
for event_type in event_types
]
def _process_threshold_events(
self,
old_data: Optional[SensorDataType],
new_data: SensorDataType,
attr: Optional[str] = None,
) -> List[SensorDataEvent]:
"""
Processes sensor above/below threshold events.
"""
events: List[SensorDataEvent] = []
# If there are no configured thresholds, there's nothing to do
if self._thresholds in (None, {}, (), []):
return events
# Scalar case
if isinstance(old_data, (Numeric, NoneType)) and isinstance(new_data, Numeric):
return self._process_scalar_threshold_events(
old_data, new_data, attr # type: ignore
)
# From here on, threshold comparison only applies if both the old and
# new data is a str -> number mapping
if not (isinstance(old_data, (dict, NoneType)) and isinstance(new_data, dict)):
return events
# Recursively call _process_threshold_events on the data attributes
for attr, value in new_data.items(): # type: ignore
events.extend(
self._process_threshold_events(
old_data=(
old_data.get(attr) # type: ignore
if isinstance(old_data, dict)
else old_data
),
new_data=value,
attr=str(attr),
)
)
return events
def _process_sensor_events(
self, old_data: Optional[SensorDataType], new_data: Optional[SensorDataType]
):
"""
Given the previous and new measurement, it runs the comparison logic
against the configured tolerance values and thresholds, and it
processes the required sensor data change and above/below threshold
events.
"""
# If the new data is missing or there are no changes, there are no
# events to process
if new_data is None or not self._has_changes(old_data, new_data):
return
events = [
SensorDataChangeEvent(
data=new_data, # type: ignore
source=get_plugin_name_by_class(self.__class__),
),
*self._process_threshold_events(old_data, new_data),
]
for event in events:
get_bus().post(event)
self.publish_entities(new_data)
def _update_last_measurement(self, new_data: SensorDataType):
"""
Update the ``_last_measurement`` attribute with the newly acquired data.
"""
# If there is no last measurement, or either the new or old
# measurements are not dictionaries, then overwrite the previous data
# with the new data
if not (
isinstance(self._last_measurement, dict) and isinstance(new_data, dict)
):
self._last_measurement = new_data
# Otherwise, merge the old data with the new
self._last_measurement.update(new_data) # type: ignore
def _filter_enabled_sensors(self, data: SensorDataType) -> SensorDataType:
"""
If ``data`` is a sensor mapping, and ``enabled_sensors`` is set, filter
out only the requested keys.
"""
if not (isinstance(data, dict) and self._enabled_sensors):
return data
return {k: v for k, v in data.items() if k in self._enabled_sensors}
@override
@abstractmethod
def transform_entities(self, entities: SensorDataType) -> Collection[Entity]:
raise NotImplementedError()
@override
def publish_entities(
self, entities: SensorDataType, *args, **kwargs
) -> Collection[Entity]:
entities_args = [entities] if isinstance(entities, Numeric) else entities
return super().publish_entities(entities_args, *args, **kwargs) # type: ignore
@abstractmethod @abstractmethod
@action @action
def get_measurement(self, *args, **kwargs): def get_measurement(self, *args, **kwargs) -> SensorDataType:
""" """
Implemented by the subclasses. Implemented by the subclasses.
:returns: Either a raw scalar: :returns: Either a raw scalar::
``output = 273.16`` output = 273.16
or a name-value dictionary with the values that have been read:: or a name-value dictionary with the values that have been read::
@ -31,7 +355,7 @@ class SensorPlugin(Plugin, ABC):
or a list of values:: or a list of values::
[ output = [
0.01, 0.01,
0.34, 0.34,
0.53, 0.53,
@ -39,18 +363,52 @@ class SensorPlugin(Plugin, ABC):
] ]
""" """
raise NotImplementedError('get_measurement should be implemented in a derived class') raise NotImplementedError()
@action @action
def get_data(self, *args, **kwargs): def get_data(self, *args, **kwargs):
""" """
Alias for ``get_measurement`` (Deprecated) alias for :meth:`.get_measurement``
""" """
return self.get_measurement(*args, **kwargs).output return self.get_measurement(*args, **kwargs)
@action @action
def close(self): def status(self, *_, **__) -> Optional[SensorDataType]:
pass """
Returns the latest read values and publishes the
:class:`platypush.message.event.entities.EntityUpdateEvent` events if
required.
"""
if self._last_measurement is not None:
self.publish_entities(self._last_measurement)
return self._last_measurement
@override
def main(self):
sleep_retry_secs = 1 # Exponential back-off
while not self.should_stop():
try:
new_data: SensorDataType = self.get_measurement().output # type: ignore
# Reset the exponential back-off retry counter in case of success
sleep_retry_secs = 1
except Exception as e:
self.logger.warning(
'Could not update the status: %s. Next retry in %d seconds',
e,
sleep_retry_secs,
)
self.wait_stop(sleep_retry_secs)
sleep_retry_secs = min(
sleep_retry_secs * 2,
self._max_retry_secs,
)
continue
new_data = self._filter_enabled_sensors(new_data)
self._process_sensor_events(self._last_measurement, new_data)
self._update_last_measurement(new_data)
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,155 @@
from dataclasses import dataclass
from typing import Dict, List, Type
from typing_extensions import override
from platypush.common.sensors import Numeric
from platypush.entities.devices import Device
from platypush.entities.distance import DistanceSensor
from platypush.entities.humidity import HumiditySensor
from platypush.entities.pressure import PressureSensor
from platypush.entities.sensors import NumericSensor
from platypush.entities.temperature import TemperatureSensor
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
@dataclass
class SensorEntityMapping:
"""
Maps the dict-like data returned by the plugin to corresponding sensor
entities.
"""
name: str
unit: str
entity_type: Type[NumericSensor]
@property
def id(self):
"""
The standardized external ID for the entity.
"""
return f'bme280:{self.name}'
_sensor_entity_mappings = {
mapping.name: mapping
for mapping in [
SensorEntityMapping(
name='temperature',
unit='°C',
entity_type=TemperatureSensor,
),
SensorEntityMapping(
name='humidity',
unit='%',
entity_type=HumiditySensor,
),
SensorEntityMapping(
name='pressure',
unit='Pa',
entity_type=PressureSensor,
),
SensorEntityMapping(
name='altitude',
unit='m',
entity_type=DistanceSensor,
),
]
}
# pylint: disable=too-many-ancestors
class SensorBme280Plugin(SensorPlugin):
"""
Plugin to interact with a `BME280 <https://shop.pimoroni.com/products/bme280-breakout>`_ environment sensor for
temperature, humidity and pressure measurements over I2C interface
Requires:
* ``pimoroni-bme280`` (``pip install pimoroni-bme280``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
def __init__(self, port: int = 1, **kwargs):
"""
:param port: I2C port. 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1)
"""
super().__init__(**kwargs)
self.port = port
self._bus = None
self._device = None
def _get_device(self):
from smbus import SMBus
from bme280 import BME280
if self._device:
return self._device
self._bus = SMBus(self.port)
self._device = BME280(i2c_dev=self._bus)
return self._device
@override
@action
def get_measurement(self, *_, **__):
"""
:returns: dict. Example:
.. code-block:: python
output = {
"temperature": 21.0, # Celsius
"pressure": 101555.08, # Pascals
"humidity": 23.543, # percentage
"altitude": 15.703 # meters
}
"""
device = self._get_device()
return {
'temperature': device.get_temperature(),
'pressure': device.get_pressure() * 100,
'humidity': device.get_humidity(),
'altitude': device.get_altitude(),
}
@override
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]:
sensors = []
for sensor, value in entities.items():
if value is None:
continue
mapping = _sensor_entity_mappings[sensor]
sensors.append(
mapping.entity_type(
id=mapping.id,
name=mapping.name,
value=value,
unit=mapping.unit,
)
)
if not sensors:
return []
return [
Device(
id='bme280',
name='BME280 Sensor',
children=sensors,
)
]
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
platypush.message.event.sensor.SensorDataChangeEvent:
install:
pip:
- pimoroni-bme280
package: platypush.plugins.sensor.bme280
type: plugin

View file

@ -3,16 +3,13 @@ from threading import RLock
from typing import List, Mapping from typing import List, Mapping
from typing_extensions import override from typing_extensions import override
from platypush.context import get_bus
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.entities.devices import Device from platypush.entities.devices import Device
from platypush.entities.distance import DistanceSensor from platypush.entities.distance import DistanceSensor
from platypush.message.event.sensor import SensorDataChangeEvent from platypush.plugins.sensor import SensorPlugin
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_plugin_name_by_class
class SensorDistanceVl53l1xPlugin(RunnablePlugin, SensorEntityManager): # pylint: disable=too-many-ancestors
class SensorDistanceVl53l1xPlugin(SensorPlugin):
""" """
Plugin to interact with an `VL53L1x Plugin to interact with an `VL53L1x
<https://www.st.com/en/imaging-and-photonics-solutions/vl53l1x.html>`_ <https://www.st.com/en/imaging-and-photonics-solutions/vl53l1x.html>`_
@ -68,13 +65,8 @@ class SensorDistanceVl53l1xPlugin(RunnablePlugin, SensorEntityManager):
self._device.close() self._device.close()
self._device = None self._device = None
@action
@override @override
def status(self, *_, **__): def get_measurement(self, *_, short=True, medium=True, long=True, **__):
self.publish_entities(self._last_data)
return self._last_data
def _status(self, *_, short=True, medium=True, long=True, **__):
""" """
:param short: Enable short range measurement (default: True) :param short: Enable short range measurement (default: True)
:param medium: Enable medium range measurement (default: True) :param medium: Enable medium range measurement (default: True)
@ -117,20 +109,6 @@ class SensorDistanceVl53l1xPlugin(RunnablePlugin, SensorEntityManager):
return ret return ret
@action
def get_data(self, *args, **kwargs):
"""
(Deprecated) alias for :meth:`.status`.
"""
return self.status(*args, **kwargs)
@action
def get_measurement(self, *args, **kwargs):
"""
(Deprecated) alias for :meth:`.status`.
"""
return self.status(*args, **kwargs)
@override @override
def transform_entities(self, entities: Mapping[str, int]) -> List[Device]: def transform_entities(self, entities: Mapping[str, int]) -> List[Device]:
return super().transform_entities( # type: ignore return super().transform_entities( # type: ignore
@ -151,25 +129,5 @@ class SensorDistanceVl53l1xPlugin(RunnablePlugin, SensorEntityManager):
] ]
) )
@override
def publish_entities(self, entities: Mapping[str, int], *_, **__) -> List[Device]:
get_bus().post(
SensorDataChangeEvent(
data=entities,
source=get_plugin_name_by_class(self.__class__),
)
)
return super().publish_entities(entities) # type: ignore
@override
def main(self):
while not self.should_stop():
status = self._status()
if status != self._last_data:
self.publish_entities(status)
self._last_data = status
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,47 +1,142 @@
import base64 import base64
from collections import namedtuple
import json import json
import serial from typing import Dict, List, Optional, Union
import threading import threading
import time from typing_extensions import override
from serial import Serial
from platypush.common.sensors import Numeric
from platypush.entities.devices import Device
from platypush.entities.sensors import RawSensor, NumericSensor
from platypush.plugins import action from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin from platypush.plugins.sensor import SensorPlugin
from platypush.utils import get_lock
_DeviceAndRate = namedtuple('_DeviceAndRate', ['device', 'baud_rate'])
class SerialPlugin(SensorPlugin): class SerialPlugin(SensorPlugin):
""" """
The serial plugin can read data from a serial device, as long as the serial The serial plugin can read data from a serial device.
device returns a JSON. You can use this plugin to interact for example with
some sensors connected through an Arduino. Just make sure that the code on If the device returns a JSON string, then that string will be parsed for
your serial device returns JSON values. If you're using an Arduino or any individual values. For example:
ATMega compatible device, take a look at
https://github.com/bblanchon/ArduinoJson. .. code-block:: json
{"temperature": 25.0, "humidity": 15.0}
If the serial device returns such a value, then ``temperature`` and
``humidity`` will be parsed as separate entities with the same names as the
keys provided on the payload.
The JSON option is a good choice if you have an Arduino/ESP-like device
whose code you can control, as it allows to easily send data to Platypush
in a simple key-value format. The use-case would be that of an Arduino/ESP
device that pushes data on the wire, and this integration would then listen
for updates.
Alternatively, you can also use this integration in a more traditional way
through the :meth:`.read` and :meth:`.write` methods to read and write data
to the device. In such a case, you may want to disable the "smart polling"
by setting ``enable_polling`` to ``False`` in the configuration.
If you want an out-of-the-box solution with a Firmata-compatible firmware,
you may consider using the :class:`platypush.plugin.arduino.ArduinoPlugin`
instead.
Note that device paths on Linux may be subject to change. If you want to
create static naming associations for your devices (e.g. make sure that
your Arduino will always be symlinked to ``/dev/arduino`` instead of
``/dev/ttyUSB<n>``), you may consider creating `static mappings through
udev
<https://dev.to/enbis/how-udev-rules-can-help-us-to-recognize-a-usb-to-serial-device-over-dev-tty-interface-pbk>`_.
Requires:
* **pyserial** (``pip install pyserial``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
""" """
def __init__(self, device=None, baud_rate=9600, **kwargs): _default_lock_timeout: float = 2.0
def __init__(
self,
device: Optional[str] = None,
baud_rate: int = 9600,
max_size: int = 1 << 19,
timeout: float = _default_lock_timeout,
enable_polling: bool = True,
poll_interval: float = 0.1,
**kwargs,
):
""" """
:param device: Device path (e.g. ``/dev/ttyUSB0`` or ``/dev/ttyACM0``) :param device: Device path (e.g. ``/dev/ttyUSB0`` or ``/dev/ttyACM0``)
:type device: str
:param baud_rate: Serial baud rate (default: 9600) :param baud_rate: Serial baud rate (default: 9600)
:type baud_rate: int :param max_size: Maximum size of a JSON payload (default: 512 KB). The
plugin will keep reading bytes from the wire until it can form a
valid JSON payload, so this upper limit is required to prevent the
integration from listening forever and dumping garbage in memory.
:param timeout: This integration will ensure that only one
reader/writer can access the serial device at the time, in order to
prevent mixing up bytes in the response. This value specifies how
long we should wait for a pending action to terminate when we try
to run a new action. Default: 2 seconds.
:param enable_polling: If ``False``, the plugin will not poll the
device for updates. This can be the case if you want to
programmatically interface with the device via the :meth:`.read`
and :meth:`.write` methods instead of polling for updates in JSON
format.
:param poll_interval: How often (in seconds) we should poll the device
for new data. Since we are reading JSON data from a serial
interface whenever it's ready, the default here can be quite low
(default: 0.1 seconds).
""" """
super().__init__(poll_interval=poll_interval, **kwargs)
super().__init__(**kwargs)
self.device = device self.device = device
self.baud_rate = baud_rate self.baud_rate = baud_rate
self.serial = None self.serial = None
self.serial_lock = threading.Lock() self.serial_lock = threading.RLock()
self.last_measurement = None self.last_data: dict = {}
self._max_size = max_size
self._timeout = timeout
self._enable_polling = enable_polling
def _read_json(self, serial_port): def _read_json(
self,
serial_port: Serial,
max_size: Optional[int] = None,
) -> str:
"""
Reads a JSON payload from the wire. It counts the number of curly
brackets detected, ignoring everything before the first curly bracket,
and it stops when the processed payload has balanced curly brackets -
i.e. it can be mapped to JSON.
:param serial_port: Serial connection.
:param max_size: Default ``max_size`` override.
"""
n_brackets = 0 n_brackets = 0
is_escaped_ch = False is_escaped_ch = False
parse_start = False parse_start = False
output = bytes() output = bytes()
max_size = max_size or self._max_size
while True: while True:
assert len(output) <= max_size, (
'Maximum allowed size exceeded while reading from the device: '
f'read {len(output)} bytes'
)
ch = serial_port.read() ch = serial_port.read()
if not ch: if not ch:
break break
@ -70,15 +165,27 @@ class SerialPlugin(SensorPlugin):
return output.decode().strip() return output.decode().strip()
def _get_serial(self, device=None, baud_rate=None, reset=False): def __get_serial(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
reset: bool = False,
) -> Serial:
"""
Return a ``Serial`` connection object to the given device.
:param device: Default device path override.
:param baud_rate: Default baud rate override.
:param reset: By default, if a connection to the device is already open
then the current object will be returned. If ``reset=True``, the
connection will be reset and a new one will be created instead.
"""
if not device: if not device:
if not self.device: assert self.device, 'No device specified nor default device configured'
raise RuntimeError('No device specified nor default device configured')
device = self.device device = self.device
if baud_rate is None: if baud_rate is None:
if self.baud_rate is None: assert self.baud_rate, 'No baud_rate specified nor default configured'
raise RuntimeError('No baud_rate specified nor default configured')
baud_rate = self.baud_rate baud_rate = self.baud_rate
if self.serial: if self.serial:
@ -87,192 +194,240 @@ class SerialPlugin(SensorPlugin):
self._close_serial() self._close_serial()
self.serial = serial.Serial(device, baud_rate) self.serial = Serial(device, baud_rate)
return self.serial return self.serial
def _get_serial(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
) -> Serial:
"""
Return a ``Serial`` connection object to the given device.
:param device: Default device path override.
:param baud_rate: Default baud rate override.
:param reset: By default, if a connection to the device is already open
then the current object will be returned. If ``reset=True``, the
connection will be reset and a new one will be created instead.
"""
try:
return self.__get_serial(device, baud_rate)
except AssertionError as e:
raise e
except Exception as e:
self.logger.debug(e)
self.wait_stop(1)
return self.__get_serial(device=device, baud_rate=baud_rate, reset=True)
def _close_serial(self): def _close_serial(self):
"""
Close the serial connection if it's currently open.
"""
if self.serial: if self.serial:
try: try:
self.serial.close() self.serial.close()
self.serial = None
except Exception as e: except Exception as e:
self.logger.warning('Error while closing serial communication: {}') self.logger.warning('Error while closing serial communication: %s', e)
self.logger.exception(e) self.logger.exception(e)
@action self.serial = None
def get_measurement(self, device=None, baud_rate=None):
def _get_device_and_baud_rate(
self, device: Optional[str] = None, baud_rate: Optional[int] = None
) -> _DeviceAndRate:
""" """
Reads JSON data from the serial device and returns it as a message Gets the device path and baud rate from the given device and baud rate
if set, or it falls back on the default configured ones.
:param device: Device path (default: default configured device) :raise AssertionError: If neither ``device`` nor ``baud_rate`` is set
:type device: str nor configured.
:param baud_rate: Baud rate (default: default configured baud_rate)
:type baud_rate: int
""" """
if not device: if not device:
if not self.device: assert (
raise RuntimeError('No device specified nor default device configured') self.device
), 'No device specified and a default one is not configured'
device = self.device device = self.device
if baud_rate is None: if baud_rate is None:
if self.baud_rate is None: assert (
raise RuntimeError('No baud_rate specified nor default configured') self.baud_rate is not None
), 'No baud_rate specified nor a default value is configured'
baud_rate = self.baud_rate baud_rate = self.baud_rate
return _DeviceAndRate(device, baud_rate)
@override
@action
def get_measurement(
self,
*_,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
**__,
) -> Dict[str, Numeric]:
"""
Reads JSON data from the serial device and returns it as a message
:param device: Device path (default: default configured device).
:param baud_rate: Baud rate (default: default configured baud_rate).
"""
device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
data = None data = None
try: with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
serial_available = self.serial_lock.acquire(timeout=2)
if serial_available: if serial_available:
try:
ser = self._get_serial(device=device, baud_rate=baud_rate) ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e:
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
data = self._read_json(ser) data = self._read_json(ser)
try: try:
data = json.loads(data) data = dict(json.loads(data))
except (ValueError, TypeError): except (ValueError, TypeError) as e:
self.logger.warning('Invalid JSON message from {}: {}'.format(self.device, data)) raise AssertionError(
f'Invalid JSON message from {device}: {e}. Message: {data}'
) from e
else: else:
data = self.last_measurement data = self.last_data
finally:
try:
self.serial_lock.release()
except Exception as e:
self.logger.debug(e)
if data: if data:
self.last_measurement = data self.last_data = data
return data return data
@action @action
def read(self, device=None, baud_rate=None, size=None, end=None): def read(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
size: Optional[int] = None,
end: Optional[Union[int, str]] = None,
binary: bool = False,
) -> str:
""" """
Reads raw data from the serial device Reads raw data from the serial device
:param device: Device to read (default: default configured device) :param device: Device to read (default: default configured device)
:type device: str
:param baud_rate: Baud rate (default: default configured baud_rate) :param baud_rate: Baud rate (default: default configured baud_rate)
:type baud_rate: int
:param size: Number of bytes to read :param size: Number of bytes to read
:type size: int :param end: End of message, as a character or bytecode
:param binary: If set to ``True``, then the serial output will be
:param end: End of message byte or character interpreted as binary data and a base64-encoded representation will
:type end: int, bytes or str be returned. Otherwise, the output will be interpreted as a UTF-8
encoded string.
:return: The read message as a UTF-8 string if ``binary=False``,
otherwise as a base64-encoded string.
""" """
if not device: device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
if not self.device: assert not (
raise RuntimeError('No device specified nor default device configured') (size is None and end is None) or (size is not None and end is not None)
device = self.device ), 'Either size or end must be specified'
if baud_rate is None: assert not (
if self.baud_rate is None: end and isinstance(end, str) and len(end) > 1
raise RuntimeError('No baud_rate specified nor default configured') ), 'The serial end must be a single character, not a string'
baud_rate = self.baud_rate
if (size is None and end is None) or (size is not None and end is not None):
raise RuntimeError('Either size or end must be specified')
if end and isinstance(end, str) and len(end) > 1:
raise RuntimeError('The serial end must be a single character, not a string')
data = bytes() data = bytes()
try: with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
serial_available = self.serial_lock.acquire(timeout=2) assert serial_available, 'Serial read timed out'
if serial_available:
try:
ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e:
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
ser = self._get_serial(device=device, baud_rate=baud_rate)
if size is not None: if size is not None:
for _ in range(0, size): for _ in range(0, size):
data += ser.read() data += ser.read()
elif end is not None: elif end is not None:
if isinstance(end, str): end_byte = end.encode() if isinstance(end, str) else bytes([end])
end = end.encode()
ch = None ch = None
while ch != end:
ch = ser.read()
while ch != end_byte:
ch = ser.read()
if ch != end: if ch != end:
data += ch data += ch
else:
self.logger.warning('Serial read timeout')
finally:
try:
self.serial_lock.release()
except Exception as e:
self.logger.debug(e)
try: return base64.b64encode(data).decode() if binary else data.decode('utf-8')
data = data.decode()
except (ValueError, TypeError):
data = base64.encodebytes(data)
return data
@action @action
def write(self, data, device=None, baud_rate=None): def write(
self,
data: Union[str, dict, list, bytes, bytearray],
device: Optional[str] = None,
baud_rate: Optional[int] = None,
binary: bool = False,
):
""" """
Writes data to the serial device. Writes data to the serial device.
:param device: Device to write (default: default configured device) :param device: Device to write (default: default configured device).
:type device: str :param baud_rate: Baud rate (default: default configured baud_rate).
:param data: Data to send to the serial device. It can be any of the following:
:param baud_rate: Baud rate (default: default configured baud_rate) - A UTF-8 string
:type baud_rate: int - A base64-encoded string (if ``binary=True``)
- A dictionary/list that will be encoded as JSON
- A bytes/bytearray sequence
:param data: Data to send to the serial device :param binary: If ``True``, then the message is either a
:type data: str, bytes or dict. If dict, it will be serialized as JSON. bytes/bytearray sequence or a base64-encoded string.
""" """
if not device: device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
if not self.device: if isinstance(data, (dict, list)):
raise RuntimeError('No device specified nor default device configured')
device = self.device
if baud_rate is None:
if self.baud_rate is None:
raise RuntimeError('No baud_rate specified nor default configured')
baud_rate = self.baud_rate
if isinstance(data, dict):
data = json.dumps(data) data = json.dumps(data)
if isinstance(data, str): if isinstance(data, str):
data = data.encode('utf-8') data = base64.b64decode(data) if binary else data.encode('utf-8')
try: data = bytes(data)
serial_available = self.serial_lock.acquire(timeout=2) with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
if serial_available: assert serial_available, 'Could not acquire the device lock'
try:
ser = self._get_serial(device=device, baud_rate=baud_rate) ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e: self.logger.info('Writing %d bytes to %s', len(data), device)
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
self.logger.info('Writing {} to {}'.format(data, self.device))
ser.write(data) ser.write(data)
finally:
@override
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]:
transformed_entities = []
for k, v in entities.items():
sensor_id = f'serial:{k}'
try: try:
self.serial_lock.release() value = float(v)
except Exception as e: entity_type = NumericSensor
self.logger.debug(e) except (TypeError, ValueError):
value = v
entity_type = RawSensor
transformed_entities.append(
entity_type(
id=sensor_id,
name=k,
value=value,
)
)
return [
Device(
id='serial',
name=self.device,
children=transformed_entities,
)
]
@override
def main(self):
if not self._enable_polling:
# If the polling is disabled, we don't need to do anything here
self.wait_stop()
return
super().main()
@override
def stop(self):
super().stop()
self._close_serial()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,6 +1,8 @@
manifest: manifest:
events: {} events:
- platypush.message.event.sensor.SensorDataChangeEvent:
install: install:
pip: [] pip:
- pyserial
package: platypush.plugins.serial package: platypush.plugins.serial
type: plugin type: plugin

View file

@ -5,6 +5,7 @@ import hashlib
import importlib import importlib
import inspect import inspect
import logging import logging
from multiprocessing import Lock as PLock
import os import os
import pathlib import pathlib
import re import re
@ -12,13 +13,15 @@ import signal
import socket import socket
import ssl import ssl
import urllib.request import urllib.request
from typing import Optional, Tuple, Union from threading import Lock as TLock
from typing import Generator, Optional, Tuple, Union
from dateutil import parser, tz from dateutil import parser, tz
from redis import Redis from redis import Redis
from rsa.key import PublicKey, PrivateKey, newkeys from rsa.key import PublicKey, PrivateKey, newkeys
logger = logging.getLogger('utils') logger = logging.getLogger('utils')
Lock = Union[PLock, TLock] # type: ignore
def get_module_and_method_from_action(action): def get_module_and_method_from_action(action):
@ -363,7 +366,7 @@ def get_mime_type(resource: str) -> Optional[str]:
return response.info().get_content_type() return response.info().get_content_type()
else: else:
if hasattr(magic, 'detect_from_filename'): if hasattr(magic, 'detect_from_filename'):
mime = magic.detect_from_filename(resource) mime = magic.detect_from_filename(resource) # type: ignore
elif hasattr(magic, 'from_file'): elif hasattr(magic, 'from_file'):
mime = magic.from_file(resource, mime=True) mime = magic.from_file(resource, mime=True)
else: else:
@ -372,7 +375,7 @@ def get_mime_type(resource: str) -> Optional[str]:
) )
if mime: if mime:
return mime.mime_type if hasattr(mime, 'mime_type') else mime return mime.mime_type if hasattr(mime, 'mime_type') else mime # type: ignore
return None return None
@ -559,4 +562,29 @@ def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.dateti
return t return t
@contextlib.contextmanager
def get_lock(
lock: Lock, timeout: Optional[float] = None
) -> Generator[bool, None, None]:
"""
Get a lock with an optional timeout through a context manager construct:
>>> from threading import Lock
>>> lock = Lock()
>>> with get_lock(lock, timeout=2):
>>> ...
"""
kwargs = {'timeout': timeout} if timeout else {}
result = lock.acquire(**kwargs)
try:
if not result:
raise TimeoutError()
yield result
finally:
if result:
lock.release()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,7 +1,8 @@
import os import os
import requests
from typing import Optional from typing import Optional
import requests
from platypush.message import Message from platypush.message import Message
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import set_timeout, clear_timeout from platypush.utils import set_timeout, clear_timeout
@ -36,12 +37,19 @@ class TimeoutException(RuntimeError):
""" """
Exception raised in case of timeout. Exception raised in case of timeout.
""" """
def __init__(self, msg: str = 'Timeout'): def __init__(self, msg: str = 'Timeout'):
self.msg = msg self.msg = msg
def send_request(action: str, timeout: Optional[float] = None, args: Optional[dict] = None, def send_request(
parse_json: bool = True, authenticate: bool = True, **kwargs): action: str,
timeout: Optional[float] = None,
args: Optional[dict] = None,
parse_json: bool = True,
authenticate: bool = True,
**kwargs
):
if not timeout: if not timeout:
timeout = request_timeout timeout = request_timeout
if not args: if not args:
@ -56,7 +64,8 @@ def send_request(action: str, timeout: Optional[float] = None, args: Optional[di
'type': 'request', 'type': 'request',
'action': action, 'action': action,
'args': args, 'args': args,
}, **kwargs },
**kwargs
) )
clear_timeout() clear_timeout()
@ -71,23 +80,33 @@ def register_user(username: Optional[str] = None, password: Optional[str] = None
username = test_user username = test_user
password = test_pass password = test_pass
set_timeout(seconds=request_timeout, on_timeout=on_timeout('User registration response timed out')) set_timeout(
response = requests.post('{base_url}/register?redirect={base_url}/'.format(base_url=base_url), data={ seconds=request_timeout,
on_timeout=on_timeout('User registration response timed out'),
)
response = requests.post(
'{base_url}/register?redirect={base_url}/'.format(base_url=base_url),
data={
'username': username, 'username': username,
'password': password, 'password': password,
'confirm_password': password, 'confirm_password': password,
}) },
)
clear_timeout() clear_timeout()
return response return response
def on_timeout(msg): def on_timeout(msg):
def _f(): raise TimeoutException(msg) def _f():
raise TimeoutException(msg)
return _f return _f
def parse_response(response): def parse_response(response):
response = Message.build(response.json()) response = Message.build(response.json())
assert isinstance(response, Response), 'Expected Response type, got {}'.format(response.__class__.__name__) assert isinstance(response, Response), 'Expected Response type, got {}'.format(
response.__class__.__name__
)
return response return response