[#212] Support for asynchronous event monitoring on the GPIO plugin

This commit is contained in:
Fabio Manganiello 2022-03-27 16:14:30 +02:00
parent 34e1e673e8
commit ffd23cf04d
4 changed files with 86 additions and 17 deletions

View file

@ -0,0 +1,17 @@
from typing import Union
from platypush.message.event import Event
class GPIOEvent(Event):
"""
Event triggered when the value on a GPIO PIN changes.
"""
def __init__(self, pin: Union[int, str], value: int, *args, **kwargs):
"""
:param pin: PIN number or name.
:param value: Current value of the PIN.
"""
super().__init__(*args, pin=pin, value=value, **kwargs)

View file

@ -88,7 +88,8 @@ class RunnablePlugin(Plugin):
if self._thread and self._thread.is_alive(): if self._thread and self._thread.is_alive():
self.logger.info(f'Waiting for {self.__class__.__name__} to stop') self.logger.info(f'Waiting for {self.__class__.__name__} to stop')
try: try:
self._thread.join() if self._thread:
self._thread.join()
except Exception as e: except Exception as e:
self.logger.warning(f'Could not join thread on stop: {e}') self.logger.warning(f'Could not join thread on stop: {e}')

View file

@ -3,35 +3,58 @@
""" """
import threading import threading
from typing import Any, Optional, Dict, Union from typing import Any, Optional, Dict, Union, Collection
from platypush.plugins import Plugin, action from platypush.context import get_bus
from platypush.message.event.gpio import GPIOEvent
from platypush.plugins import RunnablePlugin, action
class GpioPlugin(Plugin): class GpioPlugin(RunnablePlugin):
""" """
Plugin to handle raw read/write operation on the Raspberry Pi GPIO pins. This plugin can be used to interact with custom electronic devices
connected to a Raspberry Pi (or compatible device) over GPIO pins.
Requires: Requires:
* **RPi.GPIO** (``pip install RPi.GPIO``) * **RPi.GPIO** (``pip install RPi.GPIO``)
Triggers:
* :class:`platypush.message.event.gpio.GPIOEvent` when the value of a
monitored PIN changes.
""" """
def __init__(self, pins: Optional[Dict[str, int]] = None, mode: str = 'board', **kwargs): def __init__(
self,
pins: Optional[Dict[str, int]] = None,
monitored_pins: Optional[Collection[Union[str, int]]] = None,
mode: str = 'board',
**kwargs
):
""" """
:param mode: Specify 'board' if you want to use the board PIN numbers, :param mode: Specify ``board`` if you want to use the board PIN numbers,
'bcm' for Broadcom PIN numbers (default: 'board') ``bcm`` for Broadcom PIN numbers (default: ``board``)
:param pins: Configuration for the GPIO PINs as a name -> pin_number map. :param pins: Custom `GPIO name` -> `PIN number` mapping. This can be
useful if you want to reference your GPIO ports by name instead
of PIN number.
Example:: Example:
{ .. code-block:: yaml
"LED_1": 14,
"LED_2": 15,
"MOTOR": 16,
"SENSOR": 17
}
pins:
LED_1: 14,
LED_2: 15,
MOTOR: 16,
SENSOR_1: 17
SENSOR_2: 18
:param monitored_pins: List of PINs to monitor. If a new value is detected
on these pins then a :class:`platypush.message.event.gpio.GPIOEvent`
event will be triggered. GPIO PINS can be referenced either by number
or name, if a name is specified on the `pins` argument.
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
@ -39,6 +62,7 @@ class GpioPlugin(Plugin):
self._initialized = False self._initialized = False
self._init_lock = threading.RLock() self._init_lock = threading.RLock()
self._initialized_pins = {} self._initialized_pins = {}
self._monitored_pins = monitored_pins or []
self.pins_by_name = pins if pins else {} self.pins_by_name = pins if pins else {}
self.pins_by_number = {number: name self.pins_by_number = {number: name
for (name, number) in self.pins_by_name.items()} for (name, number) in self.pins_by_name.items()}
@ -71,6 +95,31 @@ class GpioPlugin(Plugin):
assert mode_str in ['BOARD', 'BCM'], 'Invalid mode: {}'.format(mode_str) assert mode_str in ['BOARD', 'BCM'], 'Invalid mode: {}'.format(mode_str)
return getattr(GPIO, mode_str) return getattr(GPIO, mode_str)
def on_gpio_event(self):
def callback(pin: int):
import RPi.GPIO as GPIO
value = GPIO.input(pin)
pin = self.pins_by_number.get(pin, pin)
get_bus().post(GPIOEvent(pin=pin, value=value))
return callback
def main(self):
import RPi.GPIO as GPIO
if not self._monitored_pins:
return # No need to start the monitor
self._init_board()
monitored_pins = [
self._get_pin_number(pin) for pin in self._monitored_pins
]
for pin in monitored_pins:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.on_gpio_event())
self._should_stop.wait()
@action @action
def write(self, pin: Union[int, str], value: Union[int, bool], def write(self, pin: Union[int, str], value: Union[int, bool],
name: Optional[str] = None) -> Dict[str, Any]: name: Optional[str] = None) -> Dict[str, Any]:

View file

@ -1,5 +1,7 @@
manifest: manifest:
events: {} events:
- platypush.message.event.gpio.GPIOEvent:
When the value of a monitored PIN changes.
install: install:
pip: pip:
- RPi.GPIO - RPi.GPIO