diff --git a/platypush/plugins/serial/__init__.py b/platypush/plugins/serial/__init__.py index fb1792472..d4811eec4 100644 --- a/platypush/plugins/serial/__init__.py +++ b/platypush/plugins/serial/__init__.py @@ -1,25 +1,23 @@ import base64 from collections import namedtuple -from collections.abc import Collection import json -from typing import Any, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Union import threading from typing_extensions import override from serial import Serial -from platypush.context import get_bus +from platypush.common.sensors import Numeric from platypush.entities.devices import Device from platypush.entities.sensors import RawSensor, NumericSensor -from platypush.entities.managers.sensors import SensorEntityManager -from platypush.message.event.sensor import SensorDataChangeEvent -from platypush.plugins import RunnablePlugin, action -from platypush.utils import get_lock, get_plugin_name_by_class +from platypush.plugins import action +from platypush.plugins.sensor import SensorPlugin +from platypush.utils import get_lock _DeviceAndRate = namedtuple('_DeviceAndRate', ['device', 'baud_rate']) -class SerialPlugin(RunnablePlugin, SensorEntityManager): +class SerialPlugin(SensorPlugin): """ The serial plugin can read data from a serial device. @@ -62,6 +60,8 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): Triggers: + * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` + * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` * :class:`platypush.message.event.sensor.SensorDataChangeEvent` """ @@ -75,6 +75,7 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): max_size: int = 1 << 19, timeout: float = _default_lock_timeout, enable_polling: bool = True, + poll_interval: float = 0.1, **kwargs, ): """ @@ -94,8 +95,12 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): programmatically interface with the device via the :meth:`.read` and :meth:`.write` methods instead of polling for updates in JSON format. + :param poll_interval: How often (in seconds) we should poll the device + for new data. Since we are reading JSON data from a serial + interface whenever it's ready, the default here can be quite low + (default: 0.1 seconds). """ - super().__init__(**kwargs) + super().__init__(poll_interval=poll_interval, **kwargs) self.device = device self.baud_rate = baud_rate @@ -255,21 +260,18 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): @override @action - def status( + def get_measurement( self, *_, device: Optional[str] = None, baud_rate: Optional[int] = None, - publish_entities: bool = True, **__, - ) -> dict: + ) -> Dict[str, Numeric]: """ Reads JSON data from the serial device and returns it as a message :param device: Device path (default: default configured device). :param baud_rate: Baud rate (default: default configured baud_rate). - :param publish_entities: Whether to publish an event with the newly - read values (default: True). """ device, baud_rate = self._get_device_and_baud_rate(device, baud_rate) @@ -292,20 +294,8 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): if data: self.last_data = data - if publish_entities: - self.publish_entities(self.last_data.items()) - return data - @action - def get_measurement( - self, device: Optional[str] = None, baud_rate: Optional[int] = None - ): - """ - (Deprecated) alias for :meth:`.status`. - """ - return self.status(device, baud_rate) - @action def read( self, @@ -397,10 +387,10 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): ser.write(data) @override - def transform_entities(self, entities: Tuple[str, Any]) -> List[Device]: + def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]: transformed_entities = [] - for k, v in entities: + for k, v in entities.items(): sensor_id = f'serial:{k}' try: value = float(v) @@ -425,18 +415,6 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): ) ] - @override - def publish_entities(self, entities: Collection[Tuple[str, Any]], *_, **__): - ret = super().publish_entities(entities) - get_bus().post( - SensorDataChangeEvent( - data=dict(entities), - source=get_plugin_name_by_class(self.__class__), - ) - ) - - return ret - @override def main(self): if not self._enable_polling: @@ -444,24 +422,7 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager): self.wait_stop() return - while not self.should_stop(): - last_data = self.last_data.copy() - try: - new_data: dict = self.status(publish_entities=False).output # type: ignore - except Exception as e: - self.logger.warning('Could not update the status: %s', e) - self.wait_stop(1) - continue - - updated_entries = { - k: v for k, v in new_data.items() if v != last_data.get(k) - } - - self.publish_entities(updated_entries.items()) - self.last_data = { - **last_data, - **new_data, - } + super().main() @override def stop(self):