serial plugin migrated to the new SensorPlugin interface.

This commit is contained in:
Fabio Manganiello 2023-04-01 19:29:56 +02:00
parent bf4db76830
commit 6a5a5de03e
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,25 +1,23 @@
import base64
from collections import namedtuple
from collections.abc import Collection
import json
from typing import Any, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Union
import threading
from typing_extensions import override
from serial import Serial
from platypush.context import get_bus
from platypush.common.sensors import Numeric
from platypush.entities.devices import Device
from platypush.entities.sensors import RawSensor, NumericSensor
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.message.event.sensor import SensorDataChangeEvent
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_lock, get_plugin_name_by_class
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
from platypush.utils import get_lock
_DeviceAndRate = namedtuple('_DeviceAndRate', ['device', 'baud_rate'])
class SerialPlugin(RunnablePlugin, SensorEntityManager):
class SerialPlugin(SensorPlugin):
"""
The serial plugin can read data from a serial device.
@ -62,6 +60,8 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
@ -75,6 +75,7 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
max_size: int = 1 << 19,
timeout: float = _default_lock_timeout,
enable_polling: bool = True,
poll_interval: float = 0.1,
**kwargs,
):
"""
@ -94,8 +95,12 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
programmatically interface with the device via the :meth:`.read`
and :meth:`.write` methods instead of polling for updates in JSON
format.
:param poll_interval: How often (in seconds) we should poll the device
for new data. Since we are reading JSON data from a serial
interface whenever it's ready, the default here can be quite low
(default: 0.1 seconds).
"""
super().__init__(**kwargs)
super().__init__(poll_interval=poll_interval, **kwargs)
self.device = device
self.baud_rate = baud_rate
@ -255,21 +260,18 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
@override
@action
def status(
def get_measurement(
self,
*_,
device: Optional[str] = None,
baud_rate: Optional[int] = None,
publish_entities: bool = True,
**__,
) -> dict:
) -> Dict[str, Numeric]:
"""
Reads JSON data from the serial device and returns it as a message
:param device: Device path (default: default configured device).
:param baud_rate: Baud rate (default: default configured baud_rate).
:param publish_entities: Whether to publish an event with the newly
read values (default: True).
"""
device, baud_rate = self._get_device_and_baud_rate(device, baud_rate)
@ -292,20 +294,8 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
if data:
self.last_data = data
if publish_entities:
self.publish_entities(self.last_data.items())
return data
@action
def get_measurement(
self, device: Optional[str] = None, baud_rate: Optional[int] = None
):
"""
(Deprecated) alias for :meth:`.status`.
"""
return self.status(device, baud_rate)
@action
def read(
self,
@ -397,10 +387,10 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
ser.write(data)
@override
def transform_entities(self, entities: Tuple[str, Any]) -> List[Device]:
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]:
transformed_entities = []
for k, v in entities:
for k, v in entities.items():
sensor_id = f'serial:{k}'
try:
value = float(v)
@ -425,18 +415,6 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
)
]
@override
def publish_entities(self, entities: Collection[Tuple[str, Any]], *_, **__):
ret = super().publish_entities(entities)
get_bus().post(
SensorDataChangeEvent(
data=dict(entities),
source=get_plugin_name_by_class(self.__class__),
)
)
return ret
@override
def main(self):
if not self._enable_polling:
@ -444,24 +422,7 @@ class SerialPlugin(RunnablePlugin, SensorEntityManager):
self.wait_stop()
return
while not self.should_stop():
last_data = self.last_data.copy()
try:
new_data: dict = self.status(publish_entities=False).output # type: ignore
except Exception as e:
self.logger.warning('Could not update the status: %s', e)
self.wait_stop(1)
continue
updated_entries = {
k: v for k, v in new_data.items() if v != last_data.get(k)
}
self.publish_entities(updated_entries.items())
self.last_data = {
**last_data,
**new_data,
}
super().main()
@override
def stop(self):