Migrated old sensor.accelerometer integration.

Removed `backend.sensor.accelerometer` and `gpio.sensor.accelerometer`.
The logic has now been merged in the new `sensor.lis3dh` integration,
which is compatible with the new `SensorPlugin` API.
This commit is contained in:
Fabio Manganiello 2023-04-02 13:17:40 +02:00
parent 44cf25271c
commit 8f604445a2
Signed by: blacklight
GPG key ID: D90FBA7F76362774
8 changed files with 207 additions and 197 deletions

View file

@ -1,27 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorAccelerometerBackend(SensorBackend):
"""
Backend to poll position information from an accelerometer sensor.
Requires:
* ``Adafruit-GPIO`` (``pip install Adafruit-GPIO``)
* The :mod:`platypush.plugins.gpio.sensor.accelerometer` plugin configured
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def __init__(self, **kwargs):
super().__init__(plugin='gpio.sensor.accelerometer', **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,13 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip:
- Adafruit-GPIO
package: platypush.backend.sensor.accelerometer
type: backend

View file

@ -1,68 +0,0 @@
from platypush.plugins import action
from platypush.plugins.gpio.sensor import GpioSensorPlugin
class GpioSensorAccelerometerPlugin(GpioSensorPlugin):
"""
Plugin to interact with an accelerometer sensor and get X,Y,Z position.
Tested with Adafruit LIS3DH accelerometer (https://www.adafruit.com/product/2809)
with Raspberry Pi over I2C connection.
Requires:
* ``Adafruit-GPIO`` (``pip install Adafruit-GPIO``)
"""
def __init__(self, g=4, precision=None, **kwargs):
"""
Only LIS3DH in I2C mode is currently supported: https://learn.adafruit.com/assets/59080.
:param g: Accelerometer range as a multiple of G - can be 2G, 4G, 8G or 16G
:type g: int
:param precision: If set, the position values will be rounded to the specified number of decimal digits
(default: no rounding)
:type precision: int
"""
super().__init__(**kwargs)
from platypush.plugins.gpio.sensor.accelerometer.lib.LIS3DH import LIS3DH
if g == 2:
self.g = LIS3DH.RANGE_2G
elif g == 4:
self.g = LIS3DH.RANGE_4G
elif g == 8:
self.g = LIS3DH.RANGE_8G
elif g == 16:
self.g = LIS3DH.RANGE_16G
else:
raise RuntimeError('Invalid G range: {}'.format(g))
self.precision = precision
self.sensor = LIS3DH()
self.sensor.setRange(self.g)
@action
def get_measurement(self):
"""
Extends :func:`.GpioSensorPlugin.get_measurement`
:returns: The sensor's current position as a dictionary with the three components (x,y,z) in degrees, each
between -90 and 90
"""
values = [
(pos*100 if self.precision is None else round(pos*100, self.precision))
for pos in (self.sensor.getX(), self.sensor.getY(), self.sensor.getZ())
]
return {
'name': 'position',
'value': {
'x': values[0], 'y': values[1], 'z': values[2]
}
}
# vim:sw=4:ts=4:et:

View file

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- Adafruit-GPIO
package: platypush.plugins.gpio.sensor.accelerometer
type: plugin

View file

@ -0,0 +1,86 @@
from typing import Any, Dict, List
from typing_extensions import override
from platypush.entities.acceleration import Accelerometer
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
# pylint: disable=too-many-ancestors
class SensorLis3dhPlugin(SensorPlugin):
"""
Plugin to interact with an `Adafruit LIS3DH accelerometer
<https://www.adafruit.com/product/2809>`_ and get X,Y,Z measurement. Tested
with a Raspberry Pi over I2C connection.
Requires:
* ``Adafruit-GPIO`` (``pip install Adafruit-GPIO``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
"""
def __init__(self, g=4, precision=None, poll_interval=1, **kwargs):
"""
Only LIS3DH in I2C mode is currently supported: https://learn.adafruit.com/assets/59080.
:param g: Accelerometer range as a multiple of G - can be 2G, 4G, 8G or 16G
:type g: int
:param precision: If set, the position values will be rounded to the specified number of decimal digits
(default: no rounding)
:type precision: int
"""
from .lib.LIS3DH import LIS3DH
super().__init__(poll_interval=poll_interval, **kwargs)
if g == 2:
self.g = LIS3DH.RANGE_2G
elif g == 4:
self.g = LIS3DH.RANGE_4G
elif g == 8:
self.g = LIS3DH.RANGE_8G
elif g == 16:
self.g = LIS3DH.RANGE_16G
else:
raise RuntimeError(f'Invalid G range: {g}')
self.precision = precision
self.sensor = LIS3DH()
self.sensor.setRange(self.g)
@override
@action
def get_measurement(self, *_, **__):
"""
:returns: The sensor's current position as a dictionary with the three components (x,y,z) in degrees, each
between -90 and 90
"""
values = [
(pos * 100 if self.precision is None else round(pos * 100, self.precision))
for pos in (self.sensor.getX(), self.sensor.getY(), self.sensor.getZ())
]
return {
'name': 'position',
'value': {'x': values[0], 'y': values[1], 'z': values[2]},
}
@override
def transform_entities(self, entities: Dict[str, Any]) -> List[Accelerometer]:
return Accelerometer(
id='lis3dh',
name='LIS3DH Accelerometer',
value=entities['value'],
)
# vim:sw=4:ts=4:et:

View file

@ -25,7 +25,6 @@ import RPi.GPIO as GPIO # needed for Hardware interrupt
class LIS3DH:
# I2C
i2c = None
I2C_ADDRESS_1 = 0x18
@ -54,7 +53,9 @@ class LIS3DH:
DATARATE_1HZ = 0b0001 # 1Hz
DATARATE_POWERDOWN = 0 # Power down
DATARATE_LOWPOWER_1K6HZ = 0b1000 # Low power mode (1.6KHz)
DATARATE_LOWPOWER_5KHZ = 0b1001 # Low power mode (5KHz) / Normal power mode (1.25KHz)
DATARATE_LOWPOWER_5KHZ = (
0b1001 # Low power mode (5KHz) / Normal power mode (1.25KHz)
)
# default
DATARATE_DEFAULT = DATARATE_400HZ
@ -69,10 +70,14 @@ class LIS3DH:
REG_INTCOUNT = 0x0E
REG_WHOAMI = 0x0F # Device identification register
REG_TEMPCFG = 0x1F
REG_CTRL1 = 0x20 # Used for data rate selection, and enabling/disabling individual axis
REG_CTRL1 = (
0x20 # Used for data rate selection, and enabling/disabling individual axis
)
REG_CTRL2 = 0x21
REG_CTRL3 = 0x22
REG_CTRL4 = 0x23 # Used for BDU, scale selection, resolution selection and self-testing
REG_CTRL4 = (
0x23 # Used for BDU, scale selection, resolution selection and self-testing
)
REG_CTRL5 = 0x24
REG_CTRL6 = 0x25
REG_REFERENCE = 0x26
@ -109,9 +114,14 @@ class LIS3DH:
# changed busnumber to 1 (from -1)
# alternative i2c address=0x19
def __init__(self, address=I2C_DEFAULT, bus=BUS_NUMBER,
g_range=RANGE_DEFAULT, datarate=DATARATE_DEFAULT,
debug=False):
def __init__(
self,
address=I2C_DEFAULT,
bus=BUS_NUMBER,
g_range=RANGE_DEFAULT,
datarate=DATARATE_DEFAULT,
debug=False,
):
self.isDebug = debug
self.debug("Initialising LIS3DH")
@ -125,12 +135,18 @@ class LIS3DH:
# raise Exception(("Device ID incorrect - expected 0x%X, " +
# "got 0x%X at address 0x%X") % (
# self.DEVICE_ID, val, self.address))
raise Exception(("Device ID incorrect - expected 0x{:x}, " +
"got 0x{:x} at address 0x{:x}").format(
self.DEVICE_ID, val, self.address))
raise Exception(
(
"Device ID incorrect - expected 0x{:x}, "
+ "got 0x{:x} at address 0x{:x}"
).format(self.DEVICE_ID, val, self.address)
)
self.debug(("Successfully connected to LIS3DH " +
"at address 0x{:x}").format(self.address))
self.debug(
("Successfully connected to LIS3DH " + "at address 0x{:x}").format(
self.address
)
)
except Exception as e:
print("Error establishing connection with LIS3DH")
print(e)
@ -178,23 +194,31 @@ class LIS3DH:
# accurately calculate the result
g_range = self.getRange()
divisor = 1
if g_range == self.RANGE_2G: divisor = 16380
elif g_range == self.RANGE_4G: divisor = 8190
elif g_range == self.RANGE_8G: divisor = 4096
elif g_range == self.RANGE_16G: divisor = 1365.33
if g_range == self.RANGE_2G:
divisor = 16380
elif g_range == self.RANGE_4G:
divisor = 8190
elif g_range == self.RANGE_8G:
divisor = 4096
elif g_range == self.RANGE_16G:
divisor = 1365.33
return float(res) / divisor
# Get the range that the sensor is currently set to
def getRange(self):
val = self.i2c.readU8(self.REG_CTRL4) # Get value from register
val = (val >> 4) # Remove lowest 4 bits
val = val >> 4 # Remove lowest 4 bits
val &= 0b0011 # Mask off two highest bits
if val == self.RANGE_2G: return self.RANGE_2G
elif val == self.RANGE_4G: return self.RANGE_4G
elif val == self.RANGE_8G: return self.RANGE_8G
else: return self.RANGE_16G
if val == self.RANGE_2G:
return self.RANGE_2G
elif val == self.RANGE_4G:
return self.RANGE_4G
elif val == self.RANGE_8G:
return self.RANGE_8G
else:
return self.RANGE_16G
# Set the range of the sensor (2G, 4G, 8G, 16G)
def setRange(self, g_range):
@ -203,7 +227,7 @@ class LIS3DH:
val = self.i2c.readU8(self.REG_CTRL4) # Get value from register
val &= ~(0b110000) # Mask off lowest 4 bits
val |= (g_range << 4) # Write in our new range
val |= g_range << 4 # Write in our new range
self.writeRegister(self.REG_CTRL4, val) # Write back to register
# Enable or disable an individual axis
@ -223,10 +247,16 @@ class LIS3DH:
GPIO.setup(self.INT_IO, GPIO.IN)
GPIO.add_event_detect(self.INT_IO, GPIO.RISING, callback=mycallback)
def setClick(self, clickmode, clickthresh=80,
timelimit=10, timelatency=20, timewindow=100,
mycallback=None):
if (clickmode == self.CLK_NONE):
def setClick(
self,
clickmode,
clickthresh=80,
timelimit=10,
timelatency=20,
timewindow=100,
mycallback=None,
):
if clickmode == self.CLK_NONE:
val = self.i2c.readU8(self.REG_CTRL3) # Get value from register
val &= ~(0x80) # unset bit 8 to disable interrupt
self.writeRegister(self.REG_CTRL3, val) # Write back to register
@ -235,10 +265,10 @@ class LIS3DH:
self.writeRegister(self.REG_CTRL3, 0x80) # turn on int1 click
self.writeRegister(self.REG_CTRL5, 0x08) # latch interrupt on int1
if (clickmode == self.CLK_SINGLE):
if clickmode == self.CLK_SINGLE:
# turn on all axes & singletap
self.writeRegister(self.REG_CLICKCFG, 0x15)
if (clickmode == self.CLK_DOUBLE):
if clickmode == self.CLK_DOUBLE:
# turn on all axes & doubletap
self.writeRegister(self.REG_CLICKCFG, 0x2A)
@ -260,7 +290,7 @@ class LIS3DH:
def setDataRate(self, dataRate):
val = self.i2c.readU8(self.REG_CTRL1) # Get current value
val &= 0b1111 # Mask off lowest 4 bits
val |= (dataRate << 4) # Write in our new data rate to highest 4 bits
val |= dataRate << 4 # Write in our new data rate to highest 4 bits
self.writeRegister(self.REG_CTRL1, val) # Write back to register
# Set whether we want to use high resolution or not
@ -284,8 +314,7 @@ class LIS3DH:
# Write the given value to the given register
def writeRegister(self, register, value):
self.debug("WRT {} to register 0x{:x}".format(
bin(value), register))
self.debug("WRT {} to register 0x{:x}".format(bin(value), register))
self.i2c.write8(register, value)
# Set the bit at index 'bit' to 'value' on 'input' and return
@ -300,8 +329,8 @@ class LIS3DH:
# Thanks to http://stackoverflow.com/questions/16124059/trying-to-
# read-a-twos-complement-16bit-into-a-signed-decimal
def twosComp(self, x):
if (0x8000 & x):
x = - (0x010000 - x)
if 0x8000 & x:
x = -(0x010000 - x)
return x
# Print an output of all registers

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
platypush.message.event.sensor.SensorDataChangeEvent:
install:
pip:
- Adafruit-GPIO
package: platypush.plugins.sensor.lis3dh
type: plugin