Migrated pmw3901 integration.

Removed legacy `backend.sensor.motion.pmw3901` and
`gpio.sensor.motion.pmw3901`. They have been merged in the new
`sensor.pmw3901` integration, compatible with the new `SensorPlugin`
API.
This commit is contained in:
Fabio Manganiello 2023-04-02 12:36:08 +02:00
parent fcdda40c4a
commit 44cf25271c
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
7 changed files with 200 additions and 177 deletions

View file

@ -1,33 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorMotionPmw3901Backend(SensorBackend):
"""
Backend to poll an `PMW3901 <https://github.com/pimoroni/pmw3901-python>`_
optical flow and motion sensor
Requires:
* ``pmw3901`` (``pip install pmw3901``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
_default_poll_seconds = 0.01
def __init__(self, **kwargs):
if 'poll_seconds' not in kwargs:
# noinspection PyTypeChecker
kwargs['poll_seconds'] = self._default_poll_seconds
super().__init__(plugin='gpio.sensor.motion.pmw3901', **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,13 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip:
- pmw3901
package: platypush.backend.sensor.motion.pmw3901
type: backend

View file

@ -1,124 +0,0 @@
import enum
import math
import time
from platypush.plugins import action
from platypush.plugins.gpio.sensor import GpioSensorPlugin
class Rotation(enum.IntEnum):
ROTATE_0 = 0
ROTATE_90 = 90
ROTATE_180 = 180
ROTATE_270 = 270
class SPISlot(enum.Enum):
FRONT = 'front'
BACK = 'back'
class GpioSensorMotionPmw3901Plugin(GpioSensorPlugin):
"""
Plugin to interact with an `PMW3901 <https://github.com/pimoroni/pmw3901-python>`_
optical flow and motion sensor
Requires:
* ``pmw3901`` (``pip install pmw3901``)
"""
def __init__(self, rotation=Rotation.ROTATE_0.value, spi_slot=SPISlot.FRONT.value, spi_port=0, **kwargs):
"""
:param rotation: Rotation angle for the captured optical flow. Possible options: 0, 90, 180, 270 (default: 0)
:type rotation: int
:param spi_slot: SPI slot where the sensor is connected if you're using a Breakout Garden interface.
Possible options: 'front', 'back' (default: 'front')
:type spi_slot: str
:param spi_port: SPI port (default: 0)
:type spi_slot: int
"""
from pmw3901 import BG_CS_FRONT_BCM, BG_CS_BACK_BCM
super().__init__(**kwargs)
self.spi_port = spi_port
self._sensor = None
self._events_per_sec = {}
self.x, self.y = (0, 0)
try:
if isinstance(rotation, int):
rotation = [r for r in Rotation if r.value == rotation][0]
self.rotation = rotation
except IndexError:
raise ValueError('{} is not a valid value for rotation - possible values: {}'.format(
rotation, [r.value for r in Rotation]))
try:
if isinstance(spi_slot, str):
spi_slot = [s for s in SPISlot if s.value == spi_slot][0]
if spi_slot == SPISlot.FRONT:
self.spi_slot = BG_CS_FRONT_BCM
else:
self.spi_slot = BG_CS_BACK_BCM
except IndexError:
raise ValueError('{} is not a valid value for spi_slot - possible values: {}'.format(
spi_slot, [s.value for s in SPISlot]))
def _get_sensor(self):
from pmw3901 import PMW3901
if not self._sensor:
self._sensor = PMW3901(spi_port=self.spi_port,
spi_cs=1,
spi_cs_gpio=self.spi_slot)
self._sensor.set_rotation(self.rotation)
return self._sensor
@action
def get_measurement(self):
"""
:returns: dict. Example:
.. code-block:: python
output = {
"motion_x": 3, # Detected motion vector X-coord
"motion_y": 4, # Detected motion vector Y-coord
"motion_mod": 5 # Detected motion vector module
"motion_events_per_sec": 7 # Number of motion events detected in the last second
}
"""
sensor = self._get_sensor()
while True:
try:
x, y = sensor.get_motion()
break
except RuntimeError:
time.sleep(0.01)
secs = int(time.time())
if (x, y) != (self.x, self.y):
(self.x, self.y) = (x, y)
if secs not in self._events_per_sec:
self._events_per_sec = {secs: 1}
else:
self._events_per_sec[secs] += 1
return {
'motion_x': x,
'motion_y': y,
'motion_mod': math.sqrt(x * x + y * y),
'motion_events_per_sec': self._events_per_sec.get(secs, 0),
}
# vim:sw=4:ts=4:et:

View file

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- pmw3901
package: platypush.plugins.gpio.sensor.motion.pmw3901
type: plugin

View file

@ -0,0 +1,190 @@
import enum
import math
import time
from typing import Dict, List
from typing_extensions import override
from platypush.common.sensors import Numeric
from platypush.entities.devices import Device
from platypush.entities.distance import DistanceSensor
from platypush.entities.sensors import NumericSensor, RawSensor
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
class Rotation(enum.IntEnum):
"""
Enumerates the possible rotations of the sensor.
"""
ROTATE_0 = 0
ROTATE_90 = 90
ROTATE_180 = 180
ROTATE_270 = 270
class SPISlot(enum.Enum):
"""
Enumeration for the possible SPI slot positions of the sensor on the
Breakout Garden (front or back).
"""
FRONT = 'front'
BACK = 'back'
# pylint: disable=too-many-ancestors
class SensorPmw3901Plugin(SensorPlugin):
"""
Plugin to interact with an `PMW3901 <https://github.com/pimoroni/pmw3901-python>`_
optical flow and motion sensor
Requires:
* ``pmw3901`` (``pip install pmw3901``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
def __init__(
self,
rotation=Rotation.ROTATE_0.value,
spi_slot=SPISlot.FRONT.value,
spi_port=0,
poll_interval=0.01,
**kwargs,
):
"""
:param rotation: Rotation angle for the captured optical flow. Possible options: 0, 90, 180, 270 (default: 0)
:type rotation: int
:param spi_slot: SPI slot where the sensor is connected if you're using a Breakout Garden interface.
Possible options: 'front', 'back' (default: 'front')
:type spi_slot: str
:param spi_port: SPI port (default: 0)
:type spi_slot: int
"""
from pmw3901 import BG_CS_FRONT_BCM, BG_CS_BACK_BCM
super().__init__(poll_interval=poll_interval, **kwargs)
self.spi_port = spi_port
self._sensor = None
self._events_per_sec = {}
self.x, self.y = (0, 0)
try:
if isinstance(rotation, int):
rotation = [r for r in Rotation if r.value == rotation][0]
self.rotation = rotation
except IndexError as e:
raise AssertionError(
f'{rotation} is not a valid value for rotation - '
f'possible values: {[r.value for r in Rotation]}'
) from e
try:
if isinstance(spi_slot, str):
spi_slot = [s for s in SPISlot if s.value == spi_slot][0]
if spi_slot == SPISlot.FRONT:
self.spi_slot = BG_CS_FRONT_BCM
else:
self.spi_slot = BG_CS_BACK_BCM
except IndexError as e:
raise AssertionError(
f'{spi_slot} is not a valid value for spi_slot - '
f'possible values: {[s.value for s in SPISlot]}'
) from e
def _get_sensor(self):
from pmw3901 import PMW3901
if not self._sensor:
self._sensor = PMW3901(
spi_port=self.spi_port, spi_cs=1, spi_cs_gpio=self.spi_slot
)
self._sensor.set_rotation(self.rotation)
return self._sensor
@override
@action
def get_measurement(self, *_, **__):
"""
:returns: dict. Example:
.. code-block:: python
output = {
"motion_x": 3, # Detected motion vector X-coord
"motion_y": 4, # Detected motion vector Y-coord
"motion_mod": 5 # Detected motion vector module
"motion_events_per_sec": 7 # Number of motion events detected in the last second
}
"""
sensor = self._get_sensor()
while True:
try:
x, y = sensor.get_motion()
break
except RuntimeError:
self.wait_stop(0.01)
secs = int(time.time())
if (x, y) != (self.x, self.y):
(self.x, self.y) = (x, y)
if secs not in self._events_per_sec:
self._events_per_sec = {secs: 1}
else:
self._events_per_sec[secs] += 1
return {
'motion_x': x,
'motion_y': y,
'motion_mod': math.sqrt(x * x + y * y),
'motion_events_per_sec': self._events_per_sec.get(secs, 0),
}
@override
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]:
return [
Device(
id='pmw3901',
name='PMW3901',
children=[
RawSensor(
id='pmw3901:motion',
name='motion',
description='Motion vector',
value=[entities['motion_x'], entities['motion_y']],
is_json=True,
),
DistanceSensor(
id='pmw3901:module',
name='module',
description='Motion vector module',
value=entities['motion_mod'],
),
NumericSensor(
id='pmw3901:rate',
name='rate',
description='Events per second',
value=entities['motion_events_per_sec'],
),
],
)
]
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
platypush.message.event.sensor.SensorDataChangeEvent:
install:
pip:
- pmw3901
package: platypush.plugins.sensor.pmw3901
type: plugin