Migrated mcp3008 integration to the new SensorPlugin API.

This commit is contained in:
Fabio Manganiello 2023-04-02 17:31:03 +02:00
parent 45e5ca47e7
commit ac2ec58f89
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 72 additions and 61 deletions

View file

@ -1,28 +0,0 @@
from platypush.backend.sensor import SensorBackend
class SensorMcp3008Backend(SensorBackend):
"""
Backend to poll analog sensor values from an MCP3008 chipset
(https://learn.adafruit.com/raspberry-pi-analog-to-digital-converters/mcp3008)
Requires:
* ``adafruit-mcp3008`` (``pip install adafruit-mcp3008``)
* The :mod:`platypush.plugins.gpio.sensor.mcp3008` plugin configured
Triggers:
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
gone above a configured threshold
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
gone below a configured threshold
"""
def __init__(self, **kwargs):
super().__init__(plugin='gpio.sensor.mcp3008', **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,13 +0,0 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
of a sensor havegone above a configured threshold
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
of a sensor havegone below a configured threshold
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
sensor have changed
install:
pip:
- adafruit-mcp3008
package: platypush.backend.sensor.mcp3008
type: backend

View file

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- adafruit-mcp3008
package: platypush.plugins.gpio.sensor.mcp3008
type: plugin

View file

@ -1,15 +1,25 @@
import enum import enum
from typing import Dict, List
from typing_extensions import override
from platypush.common.sensors import Numeric
from platypush.entities.devices import Device
from platypush.entities.sensors import NumericSensor
from platypush.plugins import action from platypush.plugins import action
from platypush.plugins.gpio.sensor import GpioSensorPlugin from platypush.plugins.sensor import SensorPlugin
class MCP3008Mode(enum.Enum): class MCP3008Mode(enum.Enum):
"""
MPC3008 mode enum (``hardware`` or ``software``).
"""
SOFTWARE = 'software' SOFTWARE = 'software'
HARDWARE = 'hardware' HARDWARE = 'hardware'
class GpioSensorMcp3008Plugin(GpioSensorPlugin): # pylint: disable=too-many-ancestors
class GpioSensorMcp3008Plugin(SensorPlugin):
""" """
Plugin to read analog sensor values from an MCP3008 chipset. The MCP3008 Plugin to read analog sensor values from an MCP3008 chipset. The MCP3008
chipset is a circuit that allows you to read measurements from multiple chipset is a circuit that allows you to read measurements from multiple
@ -21,13 +31,30 @@ class GpioSensorMcp3008Plugin(GpioSensorPlugin):
Requires: Requires:
* ``adafruit-mcp3008`` (``pip install adafruit-mcp3008``) * ``adafruit-mcp3008`` (``pip install adafruit-mcp3008``)
Triggers:
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
""" """
N_CHANNELS = 8 N_CHANNELS = 8
# noinspection PyPep8Naming # noinspection PyPep8Naming
def __init__(self, CLK=None, MISO=None, MOSI=None, CS=None, spi_port=None, def __init__(
spi_device=None, channels=None, Vdd=3.3, **kwargs): self,
CLK=None,
MISO=None,
MOSI=None,
CS=None,
spi_port=None,
spi_device=None,
channels=None,
Vdd=3.3,
**kwargs,
):
""" """
The MCP3008 can be connected in two modes: The MCP3008 can be connected in two modes:
@ -97,9 +124,11 @@ class GpioSensorMcp3008Plugin(GpioSensorPlugin):
self.spi_device = spi_device self.spi_device = spi_device
self.mode = MCP3008Mode.HARDWARE self.mode = MCP3008Mode.HARDWARE
else: else:
raise RuntimeError("At least one mode must be specified.\n" + raise RuntimeError(
"Software SPI: Specify CLK, MISO, MOSI and CS pins\n" + "At least one mode must be specified.\n"
"Hardware SPI: Specify spi_port and spi_device\n") + "Software SPI: Specify CLK, MISO, MOSI and CS pins\n"
+ "Hardware SPI: Specify spi_port and spi_device\n"
)
self.Vdd = Vdd self.Vdd = Vdd
self.channels = channels if channels else {} self.channels = channels if channels else {}
@ -110,18 +139,22 @@ class GpioSensorMcp3008Plugin(GpioSensorPlugin):
import Adafruit_MCP3008 import Adafruit_MCP3008
if self.mode == MCP3008Mode.SOFTWARE: if self.mode == MCP3008Mode.SOFTWARE:
self.mcp = Adafruit_MCP3008.MCP3008(clk=self.CLK, cs=self.CS, self.mcp = Adafruit_MCP3008.MCP3008(
miso=self.MISO, mosi=self.MOSI) clk=self.CLK, cs=self.CS, miso=self.MISO, mosi=self.MOSI
)
elif self.mode == MCP3008Mode.HARDWARE: elif self.mode == MCP3008Mode.HARDWARE:
self.mcp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(self.spi_port, self.spi_device)) self.mcp = Adafruit_MCP3008.MCP3008(
spi=SPI.SpiDev(self.spi_port, self.spi_device)
)
else: else:
raise RuntimeError('Unsupported MCP3008 mode: {}'.format(self.mode)) raise RuntimeError(f'Unsupported MCP3008 mode: {self.mode}')
return self.mcp return self.mcp
def _convert_to_voltage(self, value): def _convert_to_voltage(self, value):
return (value * self.Vdd) / 1023.0 if value is not None else None return (value * self.Vdd) / 1023.0 if value is not None else None
@override
@action @action
def get_measurement(self): def get_measurement(self):
""" """
@ -155,8 +188,7 @@ class GpioSensorMcp3008Plugin(GpioSensorPlugin):
if i in self.channels: if i in self.channels:
channel = self.channels[i] channel = self.channels[i]
if 'conv_function' in channel: if 'conv_function' in channel:
# noinspection PyUnusedLocal x = value # noqa
x = value # lgtm [py/unused-local-variable]
value = eval(channel['conv_function']) value = eval(channel['conv_function'])
values[channel['name']] = value values[channel['name']] = value
@ -165,5 +197,22 @@ class GpioSensorMcp3008Plugin(GpioSensorPlugin):
return values return values
@override
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]:
return [
Device(
id='mcp3008',
name='MCP3008',
children=[
NumericSensor(
id=f'mcp3008:{key}',
name=key,
value=value,
)
for key, value in entities.items()
],
)
]
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
platypush.message.event.sensor.SensorDataChangeEvent:
install:
pip:
- adafruit-mcp3008
package: platypush.plugins.sensor.mcp3008
type: plugin