From 1efaff878e98116ccf09677715cecf2111ab8803 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 28 Mar 2023 15:26:45 +0200 Subject: [PATCH] Rewritten `serial` plugin. `backend.serial` has been removed and the polling logic merged into the `serial` plugin. The `serial` plugin now supports the new entity engine as well. --- platypush/backend/sensor/serial/__init__.py | 27 - platypush/backend/sensor/serial/manifest.yaml | 12 - platypush/plugins/serial/__init__.py | 482 ++++++++++++------ platypush/plugins/serial/manifest.yaml | 6 +- platypush/utils/__init__.py | 32 +- 5 files changed, 371 insertions(+), 188 deletions(-) delete mode 100644 platypush/backend/sensor/serial/__init__.py delete mode 100644 platypush/backend/sensor/serial/manifest.yaml diff --git a/platypush/backend/sensor/serial/__init__.py b/platypush/backend/sensor/serial/__init__.py deleted file mode 100644 index f6d1b3b5..00000000 --- a/platypush/backend/sensor/serial/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -from platypush.backend.sensor import SensorBackend - - -class SensorSerialBackend(SensorBackend): - """ - This backend listens for new events from sensors connected through a serial - interface (like Arduino) acting as a wrapper for the ``serial`` plugin. - - Requires: - - * The :mod:`platypush.plugins.serial` plugin configured - - Triggers: - - * :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed - * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have - gone above a configured threshold - * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have - gone below a configured threshold - - """ - - def __init__(self, **kwargs): - super().__init__(plugin='serial', **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/sensor/serial/manifest.yaml b/platypush/backend/sensor/serial/manifest.yaml deleted file mode 100644 index f0d7abf4..00000000 --- a/platypush/backend/sensor/serial/manifest.yaml +++ /dev/null @@ -1,12 +0,0 @@ -manifest: - events: - platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements - of a sensor havegone above a configured threshold - platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements - of a sensor havegone below a configured threshold - platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a - sensor have changed - install: - pip: [] - package: platypush.backend.sensor.serial - type: backend diff --git a/platypush/plugins/serial/__init__.py b/platypush/plugins/serial/__init__.py index 31e3bb87..ac578e9b 100644 --- a/platypush/plugins/serial/__init__.py +++ b/platypush/plugins/serial/__init__.py @@ -1,47 +1,137 @@ import base64 +from collections import namedtuple +from collections.abc import Collection import json -import serial +from typing import Any, List, Optional, Tuple, Union import threading -import time +from typing_extensions import override -from platypush.plugins import action -from platypush.plugins.sensor import SensorPlugin +from serial import Serial + +from platypush.context import get_bus +from platypush.entities.devices import Device +from platypush.entities.sensors import RawSensor, NumericSensor +from platypush.entities.managers.sensors import SensorEntityManager +from platypush.message.event.sensor import SensorDataChangeEvent +from platypush.plugins import RunnablePlugin, action +from platypush.utils import get_lock, get_plugin_name_by_class + +_DeviceAndRate = namedtuple('_DeviceAndRate', ['device', 'baud_rate']) -class SerialPlugin(SensorPlugin): +class SerialPlugin(RunnablePlugin, SensorEntityManager): """ - The serial plugin can read data from a serial device, as long as the serial - device returns a JSON. You can use this plugin to interact for example with - some sensors connected through an Arduino. Just make sure that the code on - your serial device returns JSON values. If you're using an Arduino or any - ATMega compatible device, take a look at - https://github.com/bblanchon/ArduinoJson. + The serial plugin can read data from a serial device. + + If the device returns a JSON string, then that string will be parsed for + individual values. For example: + + .. code-block:: json + + {"temperature": 25.0, "humidity": 15.0} + + If the serial device returns such a value, then ``temperature`` and + ``humidity`` will be parsed as separate entities with the same names as the + keys provided on the payload. + + The JSON option is a good choice if you have an Arduino/ESP-like device + whose code you can control, as it allows to easily send data to Platypush + in a simple key-value format. The use-case would be that of an Arduino/ESP + device that pushes data on the wire, and this integration would then listen + for updates. + + Alternatively, you can also use this integration in a more traditional way + through the :meth:`.read` and :meth:`.write` methods to read and write data + to the device. In such a case, you may want to disable the "smart polling" + by setting ``enable_polling`` to ``False`` in the configuration. + + If you want an out-of-the-box solution with a Firmata-compatible firmware, + you may consider using the :class:`platypush.plugin.arduino.ArduinoPlugin` + instead. + + Note that device paths on Linux may be subject to change. If you want to + create static naming associations for your devices (e.g. make sure that + your Arduino will always be symlinked to ``/dev/arduino`` instead of + ``/dev/ttyUSB``), you may consider creating `static mappings through + udev + `_. + + Requires: + + * **pyserial** (``pip install pyserial``) + + Triggers: + + * :class:`platypush.message.event.sensor.SensorDataChangeEvent` + """ - def __init__(self, device=None, baud_rate=9600, **kwargs): + _default_lock_timeout: float = 2.0 + + def __init__( + self, + device: Optional[str] = None, + baud_rate: int = 9600, + max_size: int = 1 << 19, + timeout: float = _default_lock_timeout, + enable_polling: bool = True, + **kwargs, + ): """ :param device: Device path (e.g. ``/dev/ttyUSB0`` or ``/dev/ttyACM0``) - :type device: str - :param baud_rate: Serial baud rate (default: 9600) - :type baud_rate: int + :param max_size: Maximum size of a JSON payload (default: 512 KB). The + plugin will keep reading bytes from the wire until it can form a + valid JSON payload, so this upper limit is required to prevent the + integration from listening forever and dumping garbage in memory. + :param timeout: This integration will ensure that only one + reader/writer can access the serial device at the time, in order to + prevent mixing up bytes in the response. This value specifies how + long we should wait for a pending action to terminate when we try + to run a new action. Default: 2 seconds. + :param enable_polling: If ``False``, the plugin will not poll the + device for updates. This can be the case if you want to + programmatically interface with the device via the :meth:`.read` + and :meth:`.write` methods instead of polling for updates in JSON + format. """ - super().__init__(**kwargs) self.device = device self.baud_rate = baud_rate self.serial = None - self.serial_lock = threading.Lock() - self.last_measurement = None + self.serial_lock = threading.RLock() + self.last_data: dict = {} + self._max_size = max_size + self._timeout = timeout + self._enable_polling = enable_polling - def _read_json(self, serial_port): + def _read_json( + self, + serial_port: Serial, + max_size: Optional[int] = None, + ) -> str: + """ + Reads a JSON payload from the wire. It counts the number of curly + brackets detected, ignoring everything before the first curly bracket, + and it stops when the processed payload has balanced curly brackets - + i.e. it can be mapped to JSON. + + :param serial_port: Serial connection. + :param max_size: Default ``max_size`` override. + """ n_brackets = 0 is_escaped_ch = False parse_start = False output = bytes() + max_size = max_size or self._max_size while True: + assert len(output) <= max_size, ( + 'Maximum allowed size exceeded while reading from the device: ' + f'read {len(output)} bytes' + ) + ch = serial_port.read() if not ch: break @@ -70,15 +160,27 @@ class SerialPlugin(SensorPlugin): return output.decode().strip() - def _get_serial(self, device=None, baud_rate=None, reset=False): + def __get_serial( + self, + device: Optional[str] = None, + baud_rate: Optional[int] = None, + reset: bool = False, + ) -> Serial: + """ + Return a ``Serial`` connection object to the given device. + + :param device: Default device path override. + :param baud_rate: Default baud rate override. + :param reset: By default, if a connection to the device is already open + then the current object will be returned. If ``reset=True``, the + connection will be reset and a new one will be created instead. + """ if not device: - if not self.device: - raise RuntimeError('No device specified nor default device configured') + assert self.device, 'No device specified nor default device configured' device = self.device if baud_rate is None: - if self.baud_rate is None: - raise RuntimeError('No baud_rate specified nor default configured') + assert self.baud_rate, 'No baud_rate specified nor default configured' baud_rate = self.baud_rate if self.serial: @@ -87,192 +189,284 @@ class SerialPlugin(SensorPlugin): self._close_serial() - self.serial = serial.Serial(device, baud_rate) + self.serial = Serial(device, baud_rate) return self.serial + def _get_serial( + self, + device: Optional[str] = None, + baud_rate: Optional[int] = None, + ) -> Serial: + """ + Return a ``Serial`` connection object to the given device. + + :param device: Default device path override. + :param baud_rate: Default baud rate override. + :param reset: By default, if a connection to the device is already open + then the current object will be returned. If ``reset=True``, the + connection will be reset and a new one will be created instead. + """ + try: + return self.__get_serial(device, baud_rate) + except AssertionError as e: + raise e + except Exception as e: + self.logger.debug(e) + self.wait_stop(1) + return self.__get_serial(device=device, baud_rate=baud_rate, reset=True) + def _close_serial(self): + """ + Close the serial connection if it's currently open. + """ if self.serial: try: self.serial.close() - self.serial = None except Exception as e: - self.logger.warning('Error while closing serial communication: {}') + self.logger.warning('Error while closing serial communication: %s', e) self.logger.exception(e) - @action - def get_measurement(self, device=None, baud_rate=None): + self.serial = None + + def _get_device_and_baud_rate( + self, device: Optional[str] = None, baud_rate: Optional[int] = None + ) -> _DeviceAndRate: """ - Reads JSON data from the serial device and returns it as a message + Gets the device path and baud rate from the given device and baud rate + if set, or it falls back on the default configured ones. - :param device: Device path (default: default configured device) - :type device: str - - :param baud_rate: Baud rate (default: default configured baud_rate) - :type baud_rate: int + :raise AssertionError: If neither ``device`` nor ``baud_rate`` is set + nor configured. """ if not device: - if not self.device: - raise RuntimeError('No device specified nor default device configured') + assert ( + self.device + ), 'No device specified and a default one is not configured' device = self.device if baud_rate is None: - if self.baud_rate is None: - raise RuntimeError('No baud_rate specified nor default configured') + assert ( + self.baud_rate is not None + ), 'No baud_rate specified nor a default value is configured' baud_rate = self.baud_rate + return _DeviceAndRate(device, baud_rate) + + @override + @action + def status( + self, + *_, + device: Optional[str] = None, + baud_rate: Optional[int] = None, + publish_entities: bool = True, + **__, + ) -> dict: + """ + Reads JSON data from the serial device and returns it as a message + + :param device: Device path (default: default configured device). + :param baud_rate: Baud rate (default: default configured baud_rate). + :param publish_entities: Whether to publish an event with the newly + read values (default: True). + """ + + device, baud_rate = self._get_device_and_baud_rate(device, baud_rate) data = None - try: - serial_available = self.serial_lock.acquire(timeout=2) + with get_lock(self.serial_lock, timeout=self._timeout) as serial_available: if serial_available: - try: - ser = self._get_serial(device=device, baud_rate=baud_rate) - except Exception as e: - self.logger.debug(e) - time.sleep(1) - ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True) - + ser = self._get_serial(device=device, baud_rate=baud_rate) data = self._read_json(ser) try: - data = json.loads(data) - except (ValueError, TypeError): - self.logger.warning('Invalid JSON message from {}: {}'.format(self.device, data)) + data = dict(json.loads(data)) + except (ValueError, TypeError) as e: + raise AssertionError( + f'Invalid JSON message from {device}: {e}. Message: {data}' + ) from e else: - data = self.last_measurement - finally: - try: - self.serial_lock.release() - except Exception as e: - self.logger.debug(e) + data = self.last_data if data: - self.last_measurement = data + self.last_data = data + + if publish_entities: + self.publish_entities(self.last_data.items()) return data @action - def read(self, device=None, baud_rate=None, size=None, end=None): + def get_measurement( + self, device: Optional[str] = None, baud_rate: Optional[int] = None + ): + """ + (Deprecated) alias for :meth:`.status`. + """ + return self.status(device, baud_rate) + + @action + def read( + self, + device: Optional[str] = None, + baud_rate: Optional[int] = None, + size: Optional[int] = None, + end: Optional[Union[int, str]] = None, + ) -> str: """ Reads raw data from the serial device :param device: Device to read (default: default configured device) - :type device: str - :param baud_rate: Baud rate (default: default configured baud_rate) - :type baud_rate: int - :param size: Number of bytes to read - :type size: int - - :param end: End of message byte or character - :type end: int, bytes or str + :param end: End of message, as a character or bytecode + :return: The read message as a string if it's a valid UTF-8 string, + otherwise as a base64-encoded string. """ - if not device: - if not self.device: - raise RuntimeError('No device specified nor default device configured') - device = self.device + device, baud_rate = self._get_device_and_baud_rate(device, baud_rate) + assert not ( + (size is None and end is None) or (size is not None and end is not None) + ), 'Either size or end must be specified' - if baud_rate is None: - if self.baud_rate is None: - raise RuntimeError('No baud_rate specified nor default configured') - baud_rate = self.baud_rate - - if (size is None and end is None) or (size is not None and end is not None): - raise RuntimeError('Either size or end must be specified') - - if end and isinstance(end, str) and len(end) > 1: - raise RuntimeError('The serial end must be a single character, not a string') + assert not ( + end and isinstance(end, str) and len(end) > 1 + ), 'The serial end must be a single character, not a string' data = bytes() - try: - serial_available = self.serial_lock.acquire(timeout=2) - if serial_available: - try: - ser = self._get_serial(device=device, baud_rate=baud_rate) - except Exception as e: - self.logger.debug(e) - time.sleep(1) - ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True) + with get_lock(self.serial_lock, timeout=self._timeout) as serial_available: + assert serial_available, 'Serial read timed out' - if size is not None: - for _ in range(0, size): - data += ser.read() - elif end is not None: - if isinstance(end, str): - end = end.encode() + ser = self._get_serial(device=device, baud_rate=baud_rate) + if size is not None: + for _ in range(0, size): + data += ser.read() + elif end is not None: + end_byte = end.encode() if isinstance(end, str) else bytes([end]) + ch = None - ch = None - while ch != end: - ch = ser.read() - - if ch != end: - data += ch - else: - self.logger.warning('Serial read timeout') - finally: - try: - self.serial_lock.release() - except Exception as e: - self.logger.debug(e) + while ch != end_byte: + ch = ser.read() + if ch != end: + data += ch try: - data = data.decode() + data = data.decode('utf-8') except (ValueError, TypeError): - data = base64.encodebytes(data) + data = base64.b64encode(data).decode() return data @action - def write(self, data, device=None, baud_rate=None): + def write( + self, + data: Union[str, dict, list, bytes, bytearray], + device: Optional[str] = None, + baud_rate: Optional[int] = None, + binary: bool = False, + ): """ Writes data to the serial device. - :param device: Device to write (default: default configured device) - :type device: str + :param device: Device to write (default: default configured device). + :param baud_rate: Baud rate (default: default configured baud_rate). + :param data: Data to send to the serial device. It can be any of the following: - :param baud_rate: Baud rate (default: default configured baud_rate) - :type baud_rate: int + - A UTF-8 string + - A base64-encoded string (if ``binary=True``) + - A dictionary/list that will be encoded as JSON + - A bytes/bytearray sequence - :param data: Data to send to the serial device - :type data: str, bytes or dict. If dict, it will be serialized as JSON. + :param binary: If ``True``, then the message is either a + bytes/bytearray sequence or a base64-encoded string. """ - if not device: - if not self.device: - raise RuntimeError('No device specified nor default device configured') - device = self.device - - if baud_rate is None: - if self.baud_rate is None: - raise RuntimeError('No baud_rate specified nor default configured') - baud_rate = self.baud_rate - - if isinstance(data, dict): + device, baud_rate = self._get_device_and_baud_rate(device, baud_rate) + if isinstance(data, (dict, list)): data = json.dumps(data) if isinstance(data, str): - data = data.encode('utf-8') + data = base64.b64decode(data) if binary else data.encode('utf-8') - try: - serial_available = self.serial_lock.acquire(timeout=2) - if serial_available: - try: - ser = self._get_serial(device=device, baud_rate=baud_rate) - except Exception as e: - self.logger.debug(e) - time.sleep(1) - ser = self._get_serial(device=device, baud_rate=baud_rate, reset=True) + data = bytes(data) + with get_lock(self.serial_lock, timeout=self._timeout) as serial_available: + assert serial_available, 'Could not acquire the device lock' + ser = self._get_serial(device=device, baud_rate=baud_rate) + self.logger.info('Writing %d bytes to %s', len(data), device) + ser.write(data) - self.logger.info('Writing {} to {}'.format(data, self.device)) - ser.write(data) - finally: + @override + def transform_entities(self, entities: Tuple[str, Any]) -> List[Device]: + transformed_entities = [] + + for k, v in entities: + sensor_id = f'serial:{k}' try: - self.serial_lock.release() + value = float(v) + entity_type = NumericSensor + except (TypeError, ValueError): + value = v + entity_type = RawSensor + + transformed_entities.append( + entity_type( + id=sensor_id, + name=k, + value=value, + ) + ) + + return [ + Device( + id='serial', + name=self.device, + children=transformed_entities, + ) + ] + + @override + def publish_entities(self, entities: Collection[Tuple[str, Any]], *_, **__): + ret = super().publish_entities(entities) + get_bus().post( + SensorDataChangeEvent( + data=dict(entities), + source=get_plugin_name_by_class(self.__class__), + ) + ) + + return ret + + @override + def main(self): + if not self._enable_polling: + # If the polling is disabled, we don't need to do anything here + self.wait_stop() + return + + while not self.should_stop(): + last_data = self.last_data.copy() + try: + new_data: dict = self.status(publish_entities=False).output # type: ignore except Exception as e: - self.logger.debug(e) + self.logger.warning('Could not update the status: %s', e) + self.wait_stop(1) + continue + + updated_entries = { + k: v for k, v in new_data.items() if v != last_data.get(k) + } + + self.publish_entities(updated_entries.items()) + self.last_data = { + **last_data, + **new_data, + } + + @override + def stop(self): + super().stop() + self._close_serial() # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/serial/manifest.yaml b/platypush/plugins/serial/manifest.yaml index e5978d23..6298d06b 100644 --- a/platypush/plugins/serial/manifest.yaml +++ b/platypush/plugins/serial/manifest.yaml @@ -1,6 +1,8 @@ manifest: - events: {} + events: + - platypush.message.event.sensor.SensorDataChangeEvent: install: - pip: [] + pip: + - pyserial package: platypush.plugins.serial type: plugin diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 7b46b096..2376b21d 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -5,6 +5,7 @@ import hashlib import importlib import inspect import logging +from multiprocessing import Lock as PLock import os import pathlib import re @@ -12,13 +13,15 @@ import signal import socket import ssl import urllib.request -from typing import Optional, Tuple, Union +from threading import Lock as TLock +from typing import Generator, Optional, Tuple, Union from dateutil import parser, tz from redis import Redis from rsa.key import PublicKey, PrivateKey, newkeys logger = logging.getLogger('utils') +Lock = Union[PLock, TLock] # type: ignore def get_module_and_method_from_action(action): @@ -363,7 +366,7 @@ def get_mime_type(resource: str) -> Optional[str]: return response.info().get_content_type() else: if hasattr(magic, 'detect_from_filename'): - mime = magic.detect_from_filename(resource) + mime = magic.detect_from_filename(resource) # type: ignore elif hasattr(magic, 'from_file'): mime = magic.from_file(resource, mime=True) else: @@ -372,7 +375,7 @@ def get_mime_type(resource: str) -> Optional[str]: ) if mime: - return mime.mime_type if hasattr(mime, 'mime_type') else mime + return mime.mime_type if hasattr(mime, 'mime_type') else mime # type: ignore return None @@ -559,4 +562,27 @@ def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.dateti return t +@contextlib.contextmanager +def get_lock( + lock: Lock, timeout: Optional[float] = None +) -> Generator[bool, None, None]: + """ + Get a lock with an optional timeout through a context manager construct: + + >>> from threading import Lock + >>> lock = Lock() + >>> with get_lock(lock, timeout=2): + >>> ... + + """ + kwargs = {'timeout': timeout} if timeout else {} + result = lock.acquire(**kwargs) + + try: + yield result + finally: + if result: + lock.release() + + # vim:sw=4:ts=4:et: