Rewritten `serial` plugin.

`backend.serial` has been removed and the polling logic merged into the
`serial` plugin.

The `serial` plugin now supports the new entity engine as well.
This commit is contained in:
Fabio Manganiello 2023-03-28 15:26:45 +02:00
parent 4f15758de9
commit 1efaff878e
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
5 changed files with 371 additions and 188 deletions

View File

@ -1,27 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorSerialBackend(SensorBackend):
"""
This backend listens for new events from sensors connected through a serial
interface (like Arduino) acting as a wrapper for the ``serial`` plugin.
Requires:
* The :mod:`platypush.plugins.serial` plugin configured
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def __init__(self, **kwargs):
super().__init__(plugin='serial', **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -1,12 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip: []
package: platypush.backend.sensor.serial
type: backend

View File

@ -1,47 +1,137 @@
import base64
from collections import namedtuple
from collections.abc import Collection
import json
import serial
from typing import Any, List, Optional, Tuple, Union
import threading
import time
from typing_extensions import override
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
from serial import Serial
from platypush.context import get_bus
from platypush.entities.devices import Device
from platypush.entities.sensors import RawSensor, NumericSensor
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.message.event.sensor import SensorDataChangeEvent
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_lock, get_plugin_name_by_class
_DeviceAndRate = namedtuple('_DeviceAndRate', ['device', 'baud_rate'])
class SerialPlugin(SensorPlugin):
class SerialPlugin(RunnablePlugin, SensorEntityManager):
"""
The serial plugin can read data from a serial device, as long as the serial
device returns a JSON. You can use this plugin to interact for example with
some sensors connected through an Arduino. Just make sure that the code on
your serial device returns JSON values. If you're using an Arduino or any
ATMega compatible device, take a look at
https://github.com/bblanchon/ArduinoJson.
The serial plugin can read data from a serial device.
If the device returns a JSON string, then that string will be parsed for
individual values. For example:
.. code-block:: json
{"temperature": 25.0, "humidity": 15.0}
If the serial device returns such a value, then ``temperature`` and
``humidity`` will be parsed as separate entities with the same names as the
keys provided on the payload.
The JSON option is a good choice if you have an Arduino/ESP-like device
whose code you can control, as it allows to easily send data to Platypush
in a simple key-value format. The use-case would be that of an Arduino/ESP
device that pushes data on the wire, and this integration would then listen
for updates.
Alternatively, you can also use this integration in a more traditional way
through the :meth:`.read` and :meth:`.write` methods to read and write data
to the device. In such a case, you may want to disable the "smart polling"
by setting ``enable_polling`` to ``False`` in the configuration.
If you want an out-of-the-box solution with a Firmata-compatible firmware,
you may consider using the :class:`platypush.plugin.arduino.ArduinoPlugin`
instead.
Note that device paths on Linux may be subject to change. If you want to
create static naming associations for your devices (e.g. make sure that
your Arduino will always be symlinked to ``/dev/arduino`` instead of
``/dev/ttyUSB<n>``), you may consider creating `static mappings through
udev
<https://dev.to/enbis/how-udev-rules-can-help-us-to-recognize-a-usb-to-serial-device-over-dev-tty-interface-pbk>`_.
Requires:
* **pyserial** (``pip install pyserial``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
def __init__(self, device=None, baud_rate=9600, **kwargs):
_default_lock_timeout: float = 2.0
def __init__(
self,
device: Optional[str] = None,
baud_rate: int = 9600,
max_size: int = 1 << 19,
timeout: float = _default_lock_timeout,
enable_polling: bool = True,
**kwargs,
):
"""
:param device: Device path (e.g. ``/dev/ttyUSB0`` or ``/dev/ttyACM0``)
:type device: str
:param baud_rate: Serial baud rate (default: 9600)
:type baud_rate: int
:param max_size: Maximum size of a JSON payload (default: 512 KB). The
plugin will keep reading bytes from the wire until it can form a
valid JSON payload, so this upper limit is required to prevent the
integration from listening forever and dumping garbage in memory.
:param timeout: This integration will ensure that only one
reader/writer can access the serial device at the time, in order to
prevent mixing up bytes in the response. This value specifies how
long we should wait for a pending action to terminate when we try
to run a new action. Default: 2 seconds.
:param enable_polling: If ``False``, the plugin will not poll the
device for updates. This can be the case if you want to
programmatically interface with the device via the :meth:`.read`
and :meth:`.write` methods instead of polling for updates in JSON
format.
"""
super().__init__(**kwargs)
self.device = device
self.baud_rate = baud_rate
self.serial = None
self.serial_lock = threading.Lock()
self.last_measurement = None
self.serial_lock = threading.RLock()
self.last_data: dict = {}
self._max_size = max_size
self._timeout = timeout
self._enable_polling = enable_polling
def _read_json(self, serial_port):
def _read_json(
self,
serial_port: Serial,
max_size: Optional[int] = None,
) -> str:
"""
Reads a JSON payload from the wire. It counts the number of curly
brackets detected, ignoring everything before the first curly bracket,
and it stops when the processed payload has balanced curly brackets -
i.e. it can be mapped to JSON.
:param serial_port: Serial connection.
:param max_size: Default ``max_size`` override.
"""
n_brackets = 0
is_escaped_ch = False
parse_start = False
output = bytes()
max_size = max_size or self._max_size
while True:
assert len(output) <= max_size, (
'Maximum allowed size exceeded while reading from the device: '
f'read {len(output)} bytes'
)
ch = serial_port.read()
if not ch:
break
@ -70,15 +160,27 @@ class SerialPlugin(SensorPlugin):
return output.decode().strip()
def _get_serial(self, device=None, baud_rate=None, reset=False):
def __get_serial(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
reset: bool = False,
) -> Serial:
"""
Return a ``Serial`` connection object to the given device.
:param device: Default device path override.
:param baud_rate: Default baud rate override.
:param reset: By default, if a connection to the device is already open
then the current object will be returned. If ``reset=True``, the
connection will be reset and a new one will be created instead.
"""
if not device:
if not self.device:
raise RuntimeError('No device specified nor default device configured')
assert self.device, 'No device specified nor default device configured'
device = self.device
if baud_rate is None:
if self.baud_rate is None:
raise RuntimeError('No baud_rate specified nor default configured')
assert self.baud_rate, 'No baud_rate specified nor default configured'
baud_rate = self.baud_rate
if self.serial:
@ -87,192 +189,284 @@ class SerialPlugin(SensorPlugin):
self._close_serial()
self.serial = serial.Serial(device, baud_rate)
self.serial = Serial(device, baud_rate)
return self.serial
def _get_serial(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
) -> Serial:
"""
Return a ``Serial`` connection object to the given device.
:param device: Default device path override.
:param baud_rate: Default baud rate override.
:param reset: By default, if a connection to the device is already open
then the current object will be returned. If ``reset=True``, the
connection will be reset and a new one will be created instead.
"""
try:
return self.__get_serial(device, baud_rate)
except AssertionError as e:
raise e
except Exception as e:
self.logger.debug(e)
self.wait_stop(1)
return self.__get_serial(device=device, baud_rate=baud_rate, reset=True)
def _close_serial(self):
"""
Close the serial connection if it's currently open.
"""
if self.serial:
try:
self.serial.close()
self.serial = None
except Exception as e:
self.logger.warning('Error while closing serial communication: {}')
self.logger.warning('Error while closing serial communication: %s', e)
self.logger.exception(e)
@action
def get_measurement(self, device=None, baud_rate=None):
self.serial = None
def _get_device_and_baud_rate(
self, device: Optional[str] = None, baud_rate: Optional[int] = None
) -> _DeviceAndRate:
"""
Reads JSON data from the serial device and returns it as a message
Gets the device path and baud rate from the given device and baud rate
if set, or it falls back on the default configured ones.
:param device: Device path (default: default configured device)
:type device: str
:param baud_rate: Baud rate (default: default configured baud_rate)
:type baud_rate: int
:raise AssertionError: If neither ``device`` nor ``baud_rate`` is set
nor configured.
"""
if not device:
if not self.device:
raise RuntimeError('No device specified nor default device configured')
assert (
self.device
), 'No device specified and a default one is not configured'
device = self.device
if baud_rate is None:
if self.baud_rate is None:
raise RuntimeError('No baud_rate specified nor default configured')
assert (
self.baud_rate is not None
), 'No baud_rate specified nor a default value is configured'
baud_rate = self.baud_rate
return _DeviceAndRate(device, baud_rate)
@override
@action
def status(
self,
*_,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
publish_entities: bool = True,
**__,
) -> dict:
"""
Reads JSON data from the serial device and returns it as a message
:param device: Device path (default: default configured device).
:param baud_rate: Baud rate (default: default configured baud_rate).
:param publish_entities: Whether to publish an event with the newly
read values (default: True).
"""
device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
data = None
try:
serial_available = self.serial_lock.acquire(timeout=2)
with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
if serial_available:
try:
ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e:
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
ser = self._get_serial(device=device, baud_rate=baud_rate)
data = self._read_json(ser)
try:
data = json.loads(data)
except (ValueError, TypeError):
self.logger.warning('Invalid JSON message from {}: {}'.format(self.device, data))
data = dict(json.loads(data))
except (ValueError, TypeError) as e:
raise AssertionError(
f'Invalid JSON message from {device}: {e}. Message: {data}'
) from e
else:
data = self.last_measurement
finally:
try:
self.serial_lock.release()
except Exception as e:
self.logger.debug(e)
data = self.last_data
if data:
self.last_measurement = data
self.last_data = data
if publish_entities:
self.publish_entities(self.last_data.items())
return data
@action
def read(self, device=None, baud_rate=None, size=None, end=None):
def get_measurement(
self, device: Optional[str] = None, baud_rate: Optional[int] = None
):
"""
(Deprecated) alias for :meth:`.status`.
"""
return self.status(device, baud_rate)
@action
def read(
self,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
size: Optional[int] = None,
end: Optional[Union[int, str]] = None,
) -> str:
"""
Reads raw data from the serial device
:param device: Device to read (default: default configured device)
:type device: str
:param baud_rate: Baud rate (default: default configured baud_rate)
:type baud_rate: int
:param size: Number of bytes to read
:type size: int
:param end: End of message byte or character
:type end: int, bytes or str
:param end: End of message, as a character or bytecode
:return: The read message as a string if it's a valid UTF-8 string,
otherwise as a base64-encoded string.
"""
if not device:
if not self.device:
raise RuntimeError('No device specified nor default device configured')
device = self.device
device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
assert not (
(size is None and end is None) or (size is not None and end is not None)
), 'Either size or end must be specified'
if baud_rate is None:
if self.baud_rate is None:
raise RuntimeError('No baud_rate specified nor default configured')
baud_rate = self.baud_rate
if (size is None and end is None) or (size is not None and end is not None):
raise RuntimeError('Either size or end must be specified')
if end and isinstance(end, str) and len(end) > 1:
raise RuntimeError('The serial end must be a single character, not a string')
assert not (
end and isinstance(end, str) and len(end) > 1
), 'The serial end must be a single character, not a string'
data = bytes()
try:
serial_available = self.serial_lock.acquire(timeout=2)
if serial_available:
try:
ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e:
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
assert serial_available, 'Serial read timed out'
if size is not None:
for _ in range(0, size):
data += ser.read()
elif end is not None:
if isinstance(end, str):
end = end.encode()
ser = self._get_serial(device=device, baud_rate=baud_rate)
if size is not None:
for _ in range(0, size):
data += ser.read()
elif end is not None:
end_byte = end.encode() if isinstance(end, str) else bytes([end])
ch = None
ch = None
while ch != end:
ch = ser.read()
if ch != end:
data += ch
else:
self.logger.warning('Serial read timeout')
finally:
try:
self.serial_lock.release()
except Exception as e:
self.logger.debug(e)
while ch != end_byte:
ch = ser.read()
if ch != end:
data += ch
try:
data = data.decode()
data = data.decode('utf-8')
except (ValueError, TypeError):
data = base64.encodebytes(data)
data = base64.b64encode(data).decode()
return data
@action
def write(self, data, device=None, baud_rate=None):
def write(
self,
data: Union[str, dict, list, bytes, bytearray],
device: Optional[str] = None,
baud_rate: Optional[int] = None,
binary: bool = False,
):
"""
Writes data to the serial device.
:param device: Device to write (default: default configured device)
:type device: str
:param device: Device to write (default: default configured device).
:param baud_rate: Baud rate (default: default configured baud_rate).
:param data: Data to send to the serial device. It can be any of the following:
:param baud_rate: Baud rate (default: default configured baud_rate)
:type baud_rate: int
- A UTF-8 string
- A base64-encoded string (if ``binary=True``)
- A dictionary/list that will be encoded as JSON
- A bytes/bytearray sequence
:param data: Data to send to the serial device
:type data: str, bytes or dict. If dict, it will be serialized as JSON.
:param binary: If ``True``, then the message is either a
bytes/bytearray sequence or a base64-encoded string.
"""
if not device:
if not self.device:
raise RuntimeError('No device specified nor default device configured')
device = self.device
if baud_rate is None:
if self.baud_rate is None:
raise RuntimeError('No baud_rate specified nor default configured')
baud_rate = self.baud_rate
if isinstance(data, dict):
device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
if isinstance(data, (dict, list)):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode('utf-8')
data = base64.b64decode(data) if binary else data.encode('utf-8')
try:
serial_available = self.serial_lock.acquire(timeout=2)
if serial_available:
try:
ser = self._get_serial(device=device, baud_rate=baud_rate)
except Exception as e:
self.logger.debug(e)
time.sleep(1)
ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True)
data = bytes(data)
with get_lock(self.serial_lock, timeout=self._timeout) as serial_available:
assert serial_available, 'Could not acquire the device lock'
ser = self._get_serial(device=device, baud_rate=baud_rate)
self.logger.info('Writing %d bytes to %s', len(data), device)
ser.write(data)
self.logger.info('Writing {} to {}'.format(data, self.device))
ser.write(data)
finally:
@override
def transform_entities(self, entities: Tuple[str, Any]) -> List[Device]:
transformed_entities = []
for k, v in entities:
sensor_id = f'serial:{k}'
try:
self.serial_lock.release()
value = float(v)
entity_type = NumericSensor
except (TypeError, ValueError):
value = v
entity_type = RawSensor
transformed_entities.append(
entity_type(
id=sensor_id,
name=k,
value=value,
)
)
return [
Device(
id='serial',
name=self.device,
children=transformed_entities,
)
]
@override
def publish_entities(self, entities: Collection[Tuple[str, Any]], *_, **__):
ret = super().publish_entities(entities)
get_bus().post(
SensorDataChangeEvent(
data=dict(entities),
source=get_plugin_name_by_class(self.__class__),
)
)
return ret
@override
def main(self):
if not self._enable_polling:
# If the polling is disabled, we don't need to do anything here
self.wait_stop()
return
while not self.should_stop():
last_data = self.last_data.copy()
try:
new_data: dict = self.status(publish_entities=False).output # type: ignore
except Exception as e:
self.logger.debug(e)
self.logger.warning('Could not update the status: %s', e)
self.wait_stop(1)
continue
updated_entries = {
k: v for k, v in new_data.items() if v != last_data.get(k)
}
self.publish_entities(updated_entries.items())
self.last_data = {
**last_data,
**new_data,
}
@override
def stop(self):
super().stop()
self._close_serial()
# vim:sw=4:ts=4:et:

View File

@ -1,6 +1,8 @@
manifest:
events: {}
events:
- platypush.message.event.sensor.SensorDataChangeEvent:
install:
pip: []
pip:
- pyserial
package: platypush.plugins.serial
type: plugin

View File

@ -5,6 +5,7 @@ import hashlib
import importlib
import inspect
import logging
from multiprocessing import Lock as PLock
import os
import pathlib
import re
@ -12,13 +13,15 @@ import signal
import socket
import ssl
import urllib.request
from typing import Optional, Tuple, Union
from threading import Lock as TLock
from typing import Generator, Optional, Tuple, Union
from dateutil import parser, tz
from redis import Redis
from rsa.key import PublicKey, PrivateKey, newkeys
logger = logging.getLogger('utils')
Lock = Union[PLock, TLock] # type: ignore
def get_module_and_method_from_action(action):
@ -363,7 +366,7 @@ def get_mime_type(resource: str) -> Optional[str]:
return response.info().get_content_type()
else:
if hasattr(magic, 'detect_from_filename'):
mime = magic.detect_from_filename(resource)
mime = magic.detect_from_filename(resource) # type: ignore
elif hasattr(magic, 'from_file'):
mime = magic.from_file(resource, mime=True)
else:
@ -372,7 +375,7 @@ def get_mime_type(resource: str) -> Optional[str]:
)
if mime:
return mime.mime_type if hasattr(mime, 'mime_type') else mime
return mime.mime_type if hasattr(mime, 'mime_type') else mime # type: ignore
return None
@ -559,4 +562,27 @@ def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.dateti
return t
@contextlib.contextmanager
def get_lock(
lock: Lock, timeout: Optional[float] = None
) -> Generator[bool, None, None]:
"""
Get a lock with an optional timeout through a context manager construct:
>>> from threading import Lock
>>> lock = Lock()
>>> with get_lock(lock, timeout=2):
>>> ...
"""
kwargs = {'timeout': timeout} if timeout else {}
result = lock.acquire(**kwargs)
try:
yield result
finally:
if result:
lock.release()
# vim:sw=4:ts=4:et: