Migrated `sensor.distance` integration.

Remove `backend.sensor.distance` and `gpio.sensor.distance`. They are
now replaced by the `sensor.hcsr04` integration, which is compatible
with the new `SensorPlugin` API.
This commit is contained in:
Fabio Manganiello 2023-04-02 14:20:12 +02:00
parent 92578a17c9
commit 962c55937d
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
7 changed files with 204 additions and 256 deletions

View File

@ -1,32 +0,0 @@
from platypush.backend.sensor import SensorBackend
from platypush.context import get_plugin
class SensorDistanceBackend(SensorBackend):
"""
Backend to poll a distance sensor.
Requires:
* ``RPi.GPIO`` (``pip install RPi.GPIO``)
* The :mod:`platypush.plugins.gpio.sensor.distance` plugin configured
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def get_measurement(self):
""" get_measurement implementation """
plugin = get_plugin('gpio.sensor.distance')
return {"distance": plugin.get_data().output }
# vim:sw=4:ts=4:et:

View File

@ -1,13 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip:
- RPi.GPIO
package: platypush.backend.sensor.distance
type: backend

View File

@ -1,7 +1,3 @@
"""
.. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com>
"""
import threading
from typing import Any, Optional, Dict, Union, Collection
@ -216,7 +212,7 @@ class GpioPlugin(RunnablePlugin):
raise RuntimeError("No PIN mappings were provided/configured")
values = []
for (pin, name) in self.pins_by_number.items():
for pin, name in self.pins_by_number.items():
# noinspection PyUnresolvedReferences
values.append(self.read(pin=pin, name=name).output)

View File

@ -1,197 +0,0 @@
import threading
import time
from typing import Optional
from platypush.context import get_bus
from platypush.message.event.distance import DistanceSensorEvent
from platypush.plugins import action
from platypush.plugins.gpio import GpioPlugin
from platypush.plugins.gpio.sensor import GpioSensorPlugin
class GpioSensorDistancePlugin(GpioPlugin, GpioSensorPlugin):
"""
You can use this plugin to interact with a distance sensor on your Raspberry
Pi (tested with a HC-SR04 ultrasound sensor).
Requires:
* ``RPi.GPIO`` (``pip install RPi.GPIO``)
Triggers:
* :class:`platypush.message.event.distance.DistanceSensorEvent` when a new distance measurement is available
"""
def __init__(self, trigger_pin: int, echo_pin: int, measurement_interval: float = 0.15,
timeout: float = 2.0, warmup_time: float = 2.0, *args, **kwargs):
"""
:param trigger_pin: GPIO PIN where you connected your sensor trigger PIN (the one that triggers the
sensor to perform a measurement).
:param echo_pin: GPIO PIN where you connected your sensor echo PIN (the one that will listen for the
signal to bounce back and therefore trigger the distance calculation).
:param measurement_interval: When running in continuous mode (see
:func:`platypush.plugins.gpio.sensor.distance.GpioSensorDistancePlugin.start_measurement`) this parameter
indicates how long should be waited between two measurements (default: 0.15 seconds)
:param timeout: The echo-wait will terminate and the plugin will return null if no echo has been
received after this time (default: 1 second).
:param warmup_time: Number of seconds that should be waited on plugin instantiation
for the sensor to be ready (default: 2 seconds).
"""
GpioPlugin.__init__(self, pins={'trigger': trigger_pin, 'echo': echo_pin, }, *args, **kwargs)
self.trigger_pin = trigger_pin
self.echo_pin = echo_pin
self.measurement_interval = measurement_interval
self.timeout = timeout
self.warmup_time = warmup_time
self._measurement_thread = None
self._measurement_thread_lock = threading.RLock()
self._measurement_thread_can_run = False
self._init_board()
def _init_board(self):
import RPi.GPIO as GPIO
with self._init_lock:
if self._initialized:
return
GpioPlugin._init_board(self)
self._initialized = False
GPIO.setup(self.trigger_pin, GPIO.OUT)
GPIO.setup(self.echo_pin, GPIO.IN)
GPIO.output(self.trigger_pin, GPIO.LOW)
self.logger.info('Waiting {} seconds for the sensor to be ready'.format(self.warmup_time))
time.sleep(self.warmup_time)
self.logger.info('Sensor ready')
self._initialized = True
def _get_data(self):
import RPi.GPIO as GPIO
pulse_start = pulse_on = time.time()
self._init_board()
GPIO.output(self.trigger_pin, GPIO.HIGH)
time.sleep(0.00001) # 1 us pulse to trigger echo measurement
GPIO.output(self.trigger_pin, GPIO.LOW)
while GPIO.input(self.echo_pin) == 0:
pulse_on = time.time()
if pulse_on - pulse_start > self.timeout:
raise TimeoutError('Distance sensor echo timeout after {} seconds: 0'.
format(self.timeout))
pulse_start = pulse_on
pulse_end = pulse_off = time.time()
while GPIO.input(self.echo_pin) == 1:
pulse_off = time.time()
if pulse_off - pulse_end > self.timeout:
raise TimeoutError('Distance sensor echo timeout after {} seconds: 1'.
format(self.timeout))
pulse_end = pulse_off
pulse_duration = pulse_end - pulse_start
# s = vt where v = 1/2 * avg speed of sound in mm/s
return round(pulse_duration * 171500.0, 2)
@action
def get_measurement(self):
"""
Extends :func:`.GpioSensorPlugin.get_measurement`
:returns: Distance measurement as a scalar (in mm):
"""
try:
distance = self._get_data()
bus = get_bus()
bus.post(DistanceSensorEvent(distance=distance, unit='mm'))
return distance
except TimeoutError as e:
self.logger.warning(str(e))
return
except Exception as e:
self.close()
raise e
@action
def close(self):
return self.cleanup()
def __enter__(self):
self._init_board()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _get_measurement_thread(self, duration: float):
def _thread():
with self:
self.logger.info('Started distance measurement thread')
start_time = time.time()
try:
while self._measurement_thread_can_run and (
not duration or time.time() - start_time <= duration):
self.get_measurement()
if self.measurement_interval:
time.sleep(self.measurement_interval)
finally:
self._measurement_thread = None
return _thread
def _is_measurement_thread_running(self):
with self._measurement_thread_lock:
return self._measurement_thread is not None
@action
def start_measurement(self, duration: Optional[float] = None):
"""
Start the measurement thread. It will trigger :class:`platypush.message.event.distance.DistanceSensorEvent`
events when new measurements are available.
:param duration: If set, then the thread will run for the specified amount of seconds (default: None)
"""
with self._measurement_thread_lock:
if self._is_measurement_thread_running():
self.logger.warning('A measurement thread is already running')
return
thread_func = self._get_measurement_thread(duration=duration)
self._measurement_thread = threading.Thread(target=thread_func)
self._measurement_thread_can_run = True
self._measurement_thread.start()
@action
def stop_measurement(self):
"""
Stop the running measurement thread.
"""
with self._measurement_thread_lock:
if not self._is_measurement_thread_running():
self.logger.warning('No measurement thread is running')
return
self._measurement_thread_can_run = False
self.logger.info('Waiting for the measurement thread to end')
if self._measurement_thread:
self._measurement_thread.join(timeout=self.timeout)
self.logger.info('Measurement thread terminated')
# vim:sw=4:ts=4:et:

View File

@ -1,9 +0,0 @@
manifest:
events:
platypush.message.event.distance.DistanceSensorEvent: when a new distance measurement
is available
install:
pip:
- RPi.GPIO
package: platypush.plugins.gpio.sensor.distance
type: plugin

View File

@ -0,0 +1,192 @@
from collections.abc import Collection
import time
from typing import List, Optional, Union
from typing_extensions import override
import warnings
from platypush.context import get_bus
from platypush.entities.distance import DistanceSensor
from platypush.message.event.distance import DistanceSensorEvent
from platypush.plugins import action
from platypush.plugins.gpio import GpioPlugin
from platypush.plugins.sensor import SensorPlugin
class SensorHcsr04Plugin(GpioPlugin, SensorPlugin):
"""
You can use this plugin to interact with a distance sensor on your
Raspberry Pi. It's been tested with a `HC-SR04 ultrasound sensor
<https://www.sparkfun.com/products/15569>`_, but it should be compatible
with any GPIO-compatible sensor that relies on the same trigger-and-echo
principle.
Requires:
* ``RPi.GPIO`` (``pip install RPi.GPIO``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
* :class:`platypush.message.event.distance.DistanceSensorEvent` when a
new distance measurement is available (legacy event)
"""
def __init__(
self,
trigger_pin: int,
echo_pin: int,
poll_interval: float = 0.25,
timeout: float = 2.0,
warmup_time: float = 2.0,
*args,
**kwargs,
):
"""
:param trigger_pin: GPIO PIN where you connected your sensor trigger
PIN (the one that triggers the sensor to perform a measurement).
:param echo_pin: GPIO PIN where you connected your sensor echo PIN (the
one that will listen for the signal to bounce back and therefore
trigger the distance calculation).
:param timeout: The echo-wait will terminate and the plugin will return
null if no echo has been received after this time (default: 1
second).
:param warmup_time: Number of seconds that should be waited on plugin
instantiation for the sensor to be ready (default: 2 seconds).
"""
measurement_interval = kwargs.pop('measurement_interval', None)
if measurement_interval is not None:
warnings.warn(
'measurement_interval is deprecated, use poll_interval instead',
DeprecationWarning,
stacklevel=2,
)
poll_interval = measurement_interval
SensorPlugin.__init__(self, poll_interval=poll_interval, *args, **kwargs)
GpioPlugin.__init__(
self,
pins={
'trigger': trigger_pin,
'echo': echo_pin,
},
*args,
**kwargs,
)
self.trigger_pin = trigger_pin
self.echo_pin = echo_pin
self.timeout = timeout
self.warmup_time = warmup_time
self._init_board()
def _init_board(self):
import RPi.GPIO as GPIO
with self._init_lock:
if self._initialized:
return
GpioPlugin._init_board(self)
self._initialized = False
GPIO.setup(self.trigger_pin, GPIO.OUT)
GPIO.setup(self.echo_pin, GPIO.IN)
GPIO.output(self.trigger_pin, GPIO.LOW)
self.logger.info(
'Waiting {} seconds for the sensor to be ready'.format(self.warmup_time)
)
self.wait_stop(self.warmup_time)
self.logger.info('Sensor ready')
self._initialized = True
def _get_data(self):
import RPi.GPIO as GPIO
pulse_start = pulse_on = time.time()
self._init_board()
GPIO.output(self.trigger_pin, GPIO.HIGH)
time.sleep(0.00001) # 1 us pulse to trigger echo measurement
GPIO.output(self.trigger_pin, GPIO.LOW)
while GPIO.input(self.echo_pin) == 0:
pulse_on = time.time()
if pulse_on - pulse_start > self.timeout:
raise TimeoutError(
'Distance sensor echo timeout after {} seconds: 0'.format(
self.timeout
)
)
pulse_start = pulse_on
pulse_end = pulse_off = time.time()
while GPIO.input(self.echo_pin) == 1:
pulse_off = time.time()
if pulse_off - pulse_end > self.timeout:
raise TimeoutError(
'Distance sensor echo timeout after {} seconds: 1'.format(
self.timeout
)
)
pulse_end = pulse_off
pulse_duration = pulse_end - pulse_start
# s = vt where v = 1/2 * avg speed of sound in mm/s
return round(pulse_duration * 171500.0, 2)
@override
@action
def get_measurement(self, *_, **__) -> Optional[float]:
"""
:returns: Distance measurement as a scalar (in mm):
"""
try:
distance = self._get_data()
bus = get_bus()
bus.post(DistanceSensorEvent(distance=distance, unit='mm'))
return distance
except TimeoutError as e:
self.logger.warning(f'Read timeout: {e}')
return None
except Exception as e:
self.cleanup()
raise e
@override
def transform_entities(
self, entities: Union[Optional[float], Collection[Optional[float]]]
) -> List[DistanceSensor]:
value = (
entities[0]
if isinstance(entities, (list, tuple)) and len(entities)
else entities
)
if value is None:
return []
return [
DistanceSensor(
id='hcsr04',
name='HC-SR04 Distance Sensor',
value=value,
unit='mm',
)
]
@action
def stop(self):
super().stop()
self.cleanup()
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,11 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
platypush.message.event.sensor.SensorDataChangeEvent:
platypush.message.event.distance.DistanceSensorEvent:
install:
pip:
- RPi.GPIO
package: platypush.plugins.sensor.hcsr04
type: plugin