forked from platypush/platypush
Migrated arduino
integration to the new SensorPlugin
API.
This commit is contained in:
parent
cf16076bce
commit
d5ddc0c65e
5 changed files with 282 additions and 179 deletions
|
@ -1,4 +1,7 @@
|
||||||
{
|
{
|
||||||
|
"arduino": {
|
||||||
|
"class": "fas fa-microchip"
|
||||||
|
},
|
||||||
"bluetooth": {
|
"bluetooth": {
|
||||||
"class": "fab fa-bluetooth"
|
"class": "fab fa-bluetooth"
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
from platypush.backend.sensor import SensorBackend
|
|
||||||
|
|
||||||
|
|
||||||
class SensorArduinoBackend(SensorBackend):
|
|
||||||
"""
|
|
||||||
This backend listens for new events from an Arduino with a Firmata-compatible firmware.
|
|
||||||
|
|
||||||
Requires:
|
|
||||||
|
|
||||||
* The :class:`platypush.plugins.arduino.ArduinoPlugin` plugin configured.
|
|
||||||
|
|
||||||
Triggers:
|
|
||||||
|
|
||||||
* :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed
|
|
||||||
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have
|
|
||||||
gone above a configured threshold
|
|
||||||
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have
|
|
||||||
gone below a configured threshold
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super().__init__(plugin='arduino', **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
|
|
@ -1,12 +0,0 @@
|
||||||
manifest:
|
|
||||||
events:
|
|
||||||
platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements
|
|
||||||
of a sensor havegone above a configured threshold
|
|
||||||
platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements
|
|
||||||
of a sensor havegone below a configured threshold
|
|
||||||
platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a
|
|
||||||
sensor have changed
|
|
||||||
install:
|
|
||||||
pip: []
|
|
||||||
package: platypush.backend.sensor.arduino
|
|
||||||
type: backend
|
|
|
@ -1,26 +1,49 @@
|
||||||
import enum
|
import enum
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from typing import Optional, Dict, Union, Callable, Tuple, Type
|
from typing import List, Optional, Dict, Union, Callable, Tuple, Type
|
||||||
|
from typing_extensions import override
|
||||||
|
|
||||||
from pyfirmata2 import Arduino, ArduinoMega, ArduinoDue, ArduinoNano, Pin, util, ANALOG, INPUT, PWM
|
from pyfirmata2 import (
|
||||||
|
Arduino,
|
||||||
|
ArduinoMega,
|
||||||
|
ArduinoDue,
|
||||||
|
ArduinoNano,
|
||||||
|
Board,
|
||||||
|
Pin,
|
||||||
|
util,
|
||||||
|
ANALOG,
|
||||||
|
INPUT,
|
||||||
|
PWM,
|
||||||
|
)
|
||||||
|
|
||||||
|
from platypush.common.sensors import Numeric
|
||||||
|
from platypush.entities.devices import Device
|
||||||
|
from platypush.entities.sensors import NumericSensor
|
||||||
from platypush.plugins import action
|
from platypush.plugins import action
|
||||||
from platypush.plugins.sensor import SensorPlugin
|
from platypush.plugins.sensor import SensorPlugin
|
||||||
|
|
||||||
|
|
||||||
class PinType(enum.IntEnum):
|
class PinType(enum.IntEnum):
|
||||||
|
"""
|
||||||
|
PIN type enumeration (analog or digital).
|
||||||
|
"""
|
||||||
|
|
||||||
ANALOG = 1
|
ANALOG = 1
|
||||||
DIGITAL = 2
|
DIGITAL = 2
|
||||||
|
|
||||||
|
|
||||||
class BoardType(enum.Enum):
|
class BoardType(enum.Enum):
|
||||||
|
"""
|
||||||
|
Board types.
|
||||||
|
"""
|
||||||
|
|
||||||
MEGA = 'mega'
|
MEGA = 'mega'
|
||||||
DUE = 'due'
|
DUE = 'due'
|
||||||
NANO = 'nano'
|
NANO = 'nano'
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyBroadException
|
# pylint: disable=too-many-ancestors
|
||||||
class ArduinoPlugin(SensorPlugin):
|
class ArduinoPlugin(SensorPlugin):
|
||||||
"""
|
"""
|
||||||
Interact with an Arduino connected to the host machine over USB using the
|
Interact with an Arduino connected to the host machine over USB using the
|
||||||
|
@ -41,23 +64,32 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
|
|
||||||
* **pyfirmata2** (``pip install pyfirmata2``)
|
* **pyfirmata2** (``pip install pyfirmata2``)
|
||||||
|
|
||||||
|
Triggers:
|
||||||
|
|
||||||
|
* :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent`
|
||||||
|
* :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent`
|
||||||
|
* :class:`platypush.message.event.sensor.SensorDataChangeEvent`
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self,
|
def __init__(
|
||||||
|
self,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: int = 57600,
|
baud_rate: int = 9600,
|
||||||
analog_pins: Optional[Dict[str, int]] = None,
|
analog_pins: Optional[Dict[str, int]] = None,
|
||||||
digital_pins: Optional[Dict[str, int]] = None,
|
digital_pins: Optional[Dict[str, int]] = None,
|
||||||
timeout: float = 20.0,
|
timeout: float = 20.0,
|
||||||
conv_functions: Optional[Dict[Union[str, int], Union[str, Callable]]] = None,
|
conv_functions: Optional[Dict[Union[str, int], Union[str, Callable]]] = None,
|
||||||
**kwargs):
|
poll_interval: float = 1.0,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
:param board: Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set
|
:param board: Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set
|
||||||
then the plugin will attempt an auto-discovery.
|
then the plugin will attempt an auto-discovery.
|
||||||
|
|
||||||
:param board_type: Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection.
|
:param board_type: Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection.
|
||||||
:param baud_rate: Default serial baud rate (default: 57600)
|
:param baud_rate: Default serial baud rate (default: 9600)
|
||||||
:param analog_pins: Optional analog PINs map name->pin_number.
|
:param analog_pins: Optional analog PINs map name->pin_number.
|
||||||
:param digital_pins: Optional digital PINs map name->pin_number.
|
:param digital_pins: Optional digital PINs map name->pin_number.
|
||||||
:param timeout: Board communication timeout in seconds.
|
:param timeout: Board communication timeout in seconds.
|
||||||
|
@ -77,7 +109,7 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
temperature: 'lambda t: t * 500.0'
|
temperature: 'lambda t: t * 500.0'
|
||||||
|
|
||||||
"""
|
"""
|
||||||
super().__init__(**kwargs)
|
super().__init__(poll_interval=poll_interval, **kwargs)
|
||||||
|
|
||||||
self.board = board
|
self.board = board
|
||||||
self.board_type = self._get_board_type(board_type)
|
self.board_type = self._get_board_type(board_type)
|
||||||
|
@ -90,20 +122,28 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
}
|
}
|
||||||
|
|
||||||
self._pin_name_by_number = {
|
self._pin_name_by_number = {
|
||||||
PinType.ANALOG: {number: name for name, number in self._pin_number_by_name[PinType.ANALOG].items()},
|
PinType.ANALOG: {
|
||||||
PinType.DIGITAL: {number: name for name, number in self._pin_number_by_name[PinType.DIGITAL].items()},
|
number: name
|
||||||
|
for name, number in self._pin_number_by_name[PinType.ANALOG].items()
|
||||||
|
},
|
||||||
|
PinType.DIGITAL: {
|
||||||
|
number: name
|
||||||
|
for name, number in self._pin_number_by_name[PinType.DIGITAL].items()
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
self.conv_functions: Dict[Union[str, int], Callable] = {
|
self.conv_functions: Dict[Union[str, int], Callable] = {
|
||||||
(self._pin_number_by_name[PinType.ANALOG].get(pin, pin)): (f if callable(f) else eval(f))
|
(self._pin_number_by_name[PinType.ANALOG].get(str(pin), pin)): (
|
||||||
|
f if callable(f) else eval(f)
|
||||||
|
)
|
||||||
for pin, f in (conv_functions or {}).items()
|
for pin, f in (conv_functions or {}).items()
|
||||||
}
|
}
|
||||||
|
|
||||||
self._boards = {}
|
self._boards: Dict[str, Board] = {}
|
||||||
self._board_iterators = {}
|
self._board_iterators: Dict[str, util.Iterator] = {}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_board_type(board_type: Optional[str] = None) -> Type:
|
def _get_board_type(board_type: Optional[str] = None) -> Type[Board]:
|
||||||
if not board_type:
|
if not board_type:
|
||||||
return Arduino
|
return Arduino
|
||||||
|
|
||||||
|
@ -115,13 +155,15 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
if board_type == BoardType.MEGA.value:
|
if board_type == BoardType.MEGA.value:
|
||||||
return ArduinoMega
|
return ArduinoMega
|
||||||
|
|
||||||
raise AssertionError('Invalid board_type: {}'.format(board_type))
|
raise AssertionError(f'Invalid board_type: {board_type}')
|
||||||
|
|
||||||
def _get_board(self,
|
def _get_board(
|
||||||
|
self,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[float] = None):
|
timeout: Optional[float] = None,
|
||||||
|
) -> Board:
|
||||||
board_name = board or self.board or Arduino.AUTODETECT
|
board_name = board or self.board or Arduino.AUTODETECT
|
||||||
baud_rate = baud_rate or self.baud_rate
|
baud_rate = baud_rate or self.baud_rate
|
||||||
timeout = timeout or self.timeout
|
timeout = timeout or self.timeout
|
||||||
|
@ -129,48 +171,58 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
if board_name in self._boards:
|
if board_name in self._boards:
|
||||||
return self._boards[board_name]
|
return self._boards[board_name]
|
||||||
|
|
||||||
board_type = self._get_board_type(board_type) if board_type else self.board_type
|
board_obj_type = (
|
||||||
board = board_type(board_name, baudrate=baud_rate, timeout=timeout)
|
self._get_board_type(board_type) if board_type else self.board_type
|
||||||
self.logger.info('Connected to board {}'.format(board.name))
|
)
|
||||||
self._boards[board_name] = board
|
assert board_obj_type
|
||||||
|
|
||||||
self._board_iterators[board.name] = util.Iterator(board)
|
board_obj = board_obj_type(board_name, baudrate=baud_rate, timeout=timeout)
|
||||||
self._board_iterators[board.name].start()
|
board_name = board_obj.name or ''
|
||||||
return board
|
self.logger.info('Connected to board %s', board_name)
|
||||||
|
self._boards[board_name] = board_obj
|
||||||
|
self._board_iterators[board_name] = util.Iterator(board_obj)
|
||||||
|
self._board_iterators[board_name].start()
|
||||||
|
return board_obj
|
||||||
|
|
||||||
def _get_board_and_pin(self,
|
def _get_board_and_pin(
|
||||||
|
self,
|
||||||
pin: Union[int, str],
|
pin: Union[int, str],
|
||||||
pin_type: PinType,
|
pin_type: PinType,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None) -> Tuple[Arduino, int]:
|
timeout: Optional[int] = None,
|
||||||
board = self._get_board(board, board_type=board_type, baud_rate=baud_rate, timeout=timeout)
|
) -> Tuple[Board, int]:
|
||||||
|
board_ = self._get_board(
|
||||||
|
board, board_type=board_type, baud_rate=baud_rate, timeout=timeout
|
||||||
|
)
|
||||||
if pin in self._pin_number_by_name[pin_type]:
|
if pin in self._pin_number_by_name[pin_type]:
|
||||||
pin = self._pin_number_by_name[pin_type][pin]
|
pin = self._pin_number_by_name[pin_type][str(pin)]
|
||||||
|
|
||||||
assert isinstance(pin, int), 'Invalid PIN number/name: {}'.format(pin)
|
assert isinstance(pin, int), f'Invalid PIN number/name: {pin}'
|
||||||
return board, pin
|
return board_, pin
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_pin(pin: int, board: Arduino, pin_type: PinType) -> Pin:
|
def _get_pin(pin: int, board: Board, pin_type: PinType) -> Pin:
|
||||||
pins = None
|
pins = None
|
||||||
if pin_type == PinType.ANALOG:
|
if pin_type == PinType.ANALOG:
|
||||||
pins = board.analog
|
pins = board.analog
|
||||||
if pin_type == PinType.DIGITAL:
|
if pin_type == PinType.DIGITAL:
|
||||||
pins = board.digital
|
pins = board.digital
|
||||||
|
|
||||||
assert pins, 'Invalid pin_type: {}'.format(pin_type)
|
assert pins, f'Invalid pin_type: {pin_type}'
|
||||||
|
|
||||||
if pins[pin].mode in [ANALOG, INPUT]:
|
if pins[pin].mode in [ANALOG, INPUT]:
|
||||||
pins[pin].enable_reporting()
|
pins[pin].enable_reporting()
|
||||||
return pins[pin]
|
return pins[pin]
|
||||||
|
|
||||||
def _poll_value(self,
|
def _poll_value(
|
||||||
|
self,
|
||||||
pin: int,
|
pin: int,
|
||||||
board: Arduino,
|
board: Board,
|
||||||
pin_type: PinType,
|
pin_type: PinType,
|
||||||
timeout: Optional[float] = None) -> Optional[Union[bool, float]]:
|
timeout: Optional[float] = None,
|
||||||
|
) -> Optional[Union[bool, float]]:
|
||||||
value = None
|
value = None
|
||||||
poll_start = time.time()
|
poll_start = time.time()
|
||||||
|
|
||||||
|
@ -180,8 +232,10 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
|
|
||||||
pin_ = self._get_pin(pin=pin, board=board, pin_type=pin_type)
|
pin_ = self._get_pin(pin=pin, board=board, pin_type=pin_type)
|
||||||
if pin_.mode not in [INPUT, ANALOG]:
|
if pin_.mode not in [INPUT, ANALOG]:
|
||||||
self.logger.warning('PIN {} is not configured in input/analog mode'.format(pin))
|
self.logger.warning(
|
||||||
return
|
'PIN %d is not configured in input/analog mode', pin
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
value = pin_.read()
|
value = pin_.read()
|
||||||
if value is None:
|
if value is None:
|
||||||
|
@ -193,13 +247,15 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def analog_read(self,
|
def analog_read(
|
||||||
|
self,
|
||||||
pin: Union[int, str],
|
pin: Union[int, str],
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
conv_function: Optional[Union[str, Callable]] = None,
|
conv_function: Optional[Union[str, Callable]] = None,
|
||||||
timeout: Optional[int] = None) -> float:
|
timeout: Optional[int] = None,
|
||||||
|
) -> Optional[float]:
|
||||||
"""
|
"""
|
||||||
Read an analog value from a PIN.
|
Read an analog value from a PIN.
|
||||||
|
|
||||||
|
@ -212,28 +268,44 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
returns by default float values in the range ``[0.0, 1.0]``.
|
returns by default float values in the range ``[0.0, 1.0]``.
|
||||||
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
"""
|
"""
|
||||||
conv_function = conv_function or self.conv_functions.get(pin)
|
board_, pin = self._get_board_and_pin(
|
||||||
board, pin = self._get_board_and_pin(pin=pin,
|
pin=pin,
|
||||||
pin_type=PinType.ANALOG,
|
pin_type=PinType.ANALOG,
|
||||||
board=board,
|
board=board,
|
||||||
board_type=board_type,
|
board_type=board_type,
|
||||||
baud_rate=baud_rate,
|
baud_rate=baud_rate,
|
||||||
timeout=timeout)
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
conv_function = conv_function or self.conv_functions.get(pin)
|
conv_function = conv_function or self.conv_functions.get(pin)
|
||||||
value = self._poll_value(pin=pin, board=board, pin_type=PinType.ANALOG, timeout=timeout)
|
converter: Optional[Callable[[float], float]] = None
|
||||||
|
if isinstance(conv_function, str):
|
||||||
|
converter = eval(conv_function)
|
||||||
|
elif callable(conv_function):
|
||||||
|
converter = conv_function
|
||||||
|
elif conv_function is not None:
|
||||||
|
raise AssertionError(
|
||||||
|
'Expected conv_function to be null, a string or a function, '
|
||||||
|
f'got "{conv_function}" instead'
|
||||||
|
)
|
||||||
|
|
||||||
if conv_function:
|
value = self._poll_value(
|
||||||
value = conv_function(value)
|
pin=pin, board=board_, pin_type=PinType.ANALOG, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
if converter and value is not None:
|
||||||
|
value = converter(value)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def digital_read(self,
|
def digital_read(
|
||||||
|
self,
|
||||||
pin: Union[int, str],
|
pin: Union[int, str],
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None) -> bool:
|
timeout: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
Read a digital value from a PIN.
|
Read a digital value from a PIN.
|
||||||
|
|
||||||
|
@ -243,22 +315,31 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
"""
|
"""
|
||||||
board, pin = self._get_board_and_pin(pin=pin,
|
board_, pin = self._get_board_and_pin(
|
||||||
|
pin=pin,
|
||||||
pin_type=PinType.DIGITAL,
|
pin_type=PinType.DIGITAL,
|
||||||
board=board,
|
board=board,
|
||||||
board_type=board_type,
|
board_type=board_type,
|
||||||
baud_rate=baud_rate,
|
baud_rate=baud_rate,
|
||||||
timeout=timeout)
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
return self._poll_value(pin=pin, board=board, pin_type=PinType.DIGITAL, timeout=timeout)
|
return bool(
|
||||||
|
self._poll_value(
|
||||||
|
pin=pin, board=board_, pin_type=PinType.DIGITAL, timeout=timeout
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def analog_write(self, pin: Union[int, str],
|
def analog_write(
|
||||||
|
self,
|
||||||
|
pin: Union[int, str],
|
||||||
value: float,
|
value: float,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None):
|
timeout: Optional[int] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Write a value to an analog PIN.
|
Write a value to an analog PIN.
|
||||||
|
|
||||||
|
@ -269,21 +350,26 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
"""
|
"""
|
||||||
board, pin = self._get_board_and_pin(pin=pin,
|
board_, pin = self._get_board_and_pin(
|
||||||
|
pin=pin,
|
||||||
pin_type=PinType.ANALOG,
|
pin_type=PinType.ANALOG,
|
||||||
board=board,
|
board=board,
|
||||||
board_type=board_type,
|
board_type=board_type,
|
||||||
baud_rate=baud_rate,
|
baud_rate=baud_rate,
|
||||||
timeout=timeout)
|
timeout=timeout,
|
||||||
board.analog[pin].write(value)
|
)
|
||||||
|
board_.analog[pin].write(value)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def digital_write(self, pin: Union[int, str],
|
def digital_write(
|
||||||
|
self,
|
||||||
|
pin: Union[int, str],
|
||||||
value: bool,
|
value: bool,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None):
|
timeout: Optional[int] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Write a value to a digital PIN.
|
Write a value to a digital PIN.
|
||||||
|
|
||||||
|
@ -294,21 +380,26 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
"""
|
"""
|
||||||
board, pin = self._get_board_and_pin(pin=pin,
|
board_, pin = self._get_board_and_pin(
|
||||||
|
pin=pin,
|
||||||
pin_type=PinType.DIGITAL,
|
pin_type=PinType.DIGITAL,
|
||||||
board=board,
|
board=board,
|
||||||
board_type=board_type,
|
board_type=board_type,
|
||||||
baud_rate=baud_rate,
|
baud_rate=baud_rate,
|
||||||
timeout=timeout)
|
timeout=timeout,
|
||||||
board.digital[pin].write(value)
|
)
|
||||||
|
board_.digital[pin].write(value)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def pwm_write(self, pin: Union[int, str],
|
def pwm_write(
|
||||||
|
self,
|
||||||
|
pin: Union[int, str],
|
||||||
value: float,
|
value: float,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None):
|
timeout: Optional[int] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Write a PWM value to a digital PIN.
|
Write a PWM value to a digital PIN.
|
||||||
|
|
||||||
|
@ -319,26 +410,32 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
"""
|
"""
|
||||||
board, pin = self._get_board_and_pin(pin=pin,
|
board_, pin = self._get_board_and_pin(
|
||||||
|
pin=pin,
|
||||||
pin_type=PinType.DIGITAL,
|
pin_type=PinType.DIGITAL,
|
||||||
board=board,
|
board=board,
|
||||||
board_type=board_type,
|
board_type=board_type,
|
||||||
baud_rate=baud_rate,
|
baud_rate=baud_rate,
|
||||||
timeout=timeout)
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
assert board.digital[pin].PWM_CAPABLE, 'PIN {} is not PWM capable'.format(pin)
|
assert board_.digital[pin].PWM_CAPABLE, f'PIN {pin} is not PWM capable'
|
||||||
if board.digital[pin] != PWM:
|
if board_.digital[pin] != PWM:
|
||||||
board.digital[pin].mode = PWM
|
board_.digital[pin].mode = PWM
|
||||||
time.sleep(0.001)
|
time.sleep(0.001) # 1 μs spike to activate a PWM pin
|
||||||
|
|
||||||
board.digital[pin].write(value)
|
board_.digital[pin].write(value)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def get_measurement(self,
|
def get_measurement(
|
||||||
|
self,
|
||||||
|
*_,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
board_type: Optional[str] = None,
|
board_type: Optional[str] = None,
|
||||||
baud_rate: Optional[int] = None,
|
baud_rate: Optional[int] = None,
|
||||||
timeout: Optional[int] = None) -> Dict[str, float]:
|
timeout: Optional[int] = None,
|
||||||
|
**__,
|
||||||
|
) -> Dict[str, Optional[Union[float, bool]]]:
|
||||||
"""
|
"""
|
||||||
Get a measurement from all the configured PINs.
|
Get a measurement from all the configured PINs.
|
||||||
|
|
||||||
|
@ -352,21 +449,35 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
transformed through the configured ``conv_functions``.
|
transformed through the configured ``conv_functions``.
|
||||||
"""
|
"""
|
||||||
ret = {}
|
ret = {}
|
||||||
board = self._get_board(board=board, board_type=board_type, baud_rate=baud_rate, timeout=timeout)
|
board_ = self._get_board(
|
||||||
|
board=board, board_type=board_type, baud_rate=baud_rate, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
for pin in board.analog:
|
assert board_, f'No such board: board={board}, board_type={board_type}'
|
||||||
if self._pin_name_by_number[PinType.ANALOG] and \
|
|
||||||
pin.pin_number not in self._pin_name_by_number[PinType.ANALOG]:
|
for pin in board_.analog:
|
||||||
|
if (
|
||||||
|
self._pin_name_by_number[PinType.ANALOG]
|
||||||
|
and pin.pin_number not in self._pin_name_by_number[PinType.ANALOG]
|
||||||
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
name = self._pin_name_by_number[PinType.ANALOG].get(pin.pin_number, 'A{}'.format(pin.pin_number))
|
name = self._pin_name_by_number[PinType.ANALOG].get(
|
||||||
value = self._poll_value(pin=pin.pin_number, board=board, pin_type=PinType.ANALOG,
|
pin.pin_number, f'A{pin.pin_number}'
|
||||||
timeout=timeout or self.timeout)
|
)
|
||||||
|
value = self._poll_value(
|
||||||
|
pin=pin.pin_number,
|
||||||
|
board=board_,
|
||||||
|
pin_type=PinType.ANALOG,
|
||||||
|
timeout=timeout or self.timeout,
|
||||||
|
)
|
||||||
|
|
||||||
if value is None:
|
if value is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
conv_function = self.conv_functions.get(name, self.conv_functions.get(pin.pin_number))
|
conv_function = self.conv_functions.get(
|
||||||
|
name, self.conv_functions.get(pin.pin_number)
|
||||||
|
)
|
||||||
if conv_function:
|
if conv_function:
|
||||||
value = conv_function(value)
|
value = conv_function(value)
|
||||||
|
|
||||||
|
@ -374,8 +485,32 @@ class ArduinoPlugin(SensorPlugin):
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
@override
|
||||||
|
def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]: # type: ignore
|
||||||
|
dev_id = 'arduino'
|
||||||
|
dev_name = 'Arduino'
|
||||||
|
if self.board:
|
||||||
|
dev_id += f':{self.board}'
|
||||||
|
dev_name += f' @ {self.board}'
|
||||||
|
|
||||||
|
return [
|
||||||
|
Device(
|
||||||
|
id=dev_id,
|
||||||
|
name=dev_name,
|
||||||
|
children=[
|
||||||
|
NumericSensor(
|
||||||
|
id=f'{dev_id}:{key}',
|
||||||
|
name=key,
|
||||||
|
value=value,
|
||||||
|
)
|
||||||
|
for key, value in entities.items()
|
||||||
|
],
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def close(self):
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
for it in self._board_iterators.values():
|
for it in self._board_iterators.values():
|
||||||
it.stop()
|
it.stop()
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
manifest:
|
manifest:
|
||||||
events: {}
|
events:
|
||||||
|
platypush.message.event.sensor.SensorDataAboveThresholdEvent:
|
||||||
|
platypush.message.event.sensor.SensorDataBelowThresholdEvent:
|
||||||
|
platypush.message.event.sensor.SensorDataChangeEvent:
|
||||||
install:
|
install:
|
||||||
pip:
|
pip:
|
||||||
- pyfirmata2
|
- pyfirmata2
|
||||||
|
|
Loading…
Reference in a new issue