Commit c5395cc9 authored by Fabio Manganiello's avatar Fabio Manganiello
Browse files

Merge branch '212-starting-cronjob-with-high-pin' into 'master'

Resolve "Starting Cronjob with High Pin"

Closes #212

See merge request !11
parents 34e1e673 88846aa8
Pipeline #171 passed with stages
in 10 minutes and 12 seconds
......@@ -3,11 +3,33 @@
All notable changes to this project will be documented in this file.
Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2.
## [Unreleased]
## [0.23.2] - 2022-03-27
### Added
- Simplified script API to interact with platform variables (
- Support for asynchronous events over GPIO PINs. It is now possible to specify
a list of `monitored_pins` in the [`gpio`
configuration. A change in the value on those GPIO PINs (caused by e.g. a
button, a binary sensor or a probe) will trigger a
`platypush.message.event.gpio.GPIOEvent` that you can use in your automation
- Simplified script API to interact with platform variables
(closes [#206](
You can now read and write stored variables in your Platypush scripts through
a much more intuitive interface compared to explicitly using the `variable`
plugin explicitly:
from platypush.context import Variable
# ...
my_var = Variable.get('my_var')
my_var = int(my_var) + 1
## [0.23.0] - 2022-03-01
from typing import Union
from platypush.message.event import Event
class GPIOEvent(Event):
Event triggered when the value on a GPIO PIN changes.
def __init__(self, pin: Union[int, str], value: int, *args, **kwargs):
:param pin: PIN number or name.
:param value: Current value of the PIN.
super().__init__(*args, pin=pin, value=value, **kwargs)
......@@ -88,7 +88,8 @@ class RunnablePlugin(Plugin):
if self._thread and self._thread.is_alive():'Waiting for {self.__class__.__name__} to stop')
if self._thread:
except Exception as e:
self.logger.warning(f'Could not join thread on stop: {e}')
......@@ -3,35 +3,58 @@
import threading
from typing import Any, Optional, Dict, Union
from typing import Any, Optional, Dict, Union, Collection
from platypush.plugins import Plugin, action
from platypush.context import get_bus
from platypush.message.event.gpio import GPIOEvent
from platypush.plugins import RunnablePlugin, action
class GpioPlugin(Plugin):
class GpioPlugin(RunnablePlugin):
Plugin to handle raw read/write operation on the Raspberry Pi GPIO pins.
This plugin can be used to interact with custom electronic devices
connected to a Raspberry Pi (or compatible device) over GPIO pins.
* **RPi.GPIO** (``pip install RPi.GPIO``)
def __init__(self, pins: Optional[Dict[str, int]] = None, mode: str = 'board', **kwargs):
:param mode: Specify 'board' if you want to use the board PIN numbers,
'bcm' for Broadcom PIN numbers (default: 'board')
:param pins: Configuration for the GPIO PINs as a name -> pin_number map.
* :class:`platypush.message.event.gpio.GPIOEvent` when the value of a
monitored PIN changes.
"LED_1": 14,
"LED_2": 15,
"MOTOR": 16,
"SENSOR": 17
def __init__(
pins: Optional[Dict[str, int]] = None,
monitored_pins: Optional[Collection[Union[str, int]]] = None,
mode: str = 'board',
:param mode: Specify ``board`` if you want to use the board PIN numbers,
``bcm`` for Broadcom PIN numbers (default: ``board``)
:param pins: Custom `GPIO name` -> `PIN number` mapping. This can be
useful if you want to reference your GPIO ports by name instead
of PIN number.
.. code-block:: yaml
LED_1: 14,
LED_2: 15,
MOTOR: 16,
SENSOR_1: 17
SENSOR_2: 18
:param monitored_pins: List of PINs to monitor. If a new value is detected
on these pins then a :class:`platypush.message.event.gpio.GPIOEvent`
event will be triggered. GPIO PINS can be referenced either by number
or name, if a name is specified on the `pins` argument.
......@@ -39,6 +62,7 @@ class GpioPlugin(Plugin):
self._initialized = False
self._init_lock = threading.RLock()
self._initialized_pins = {}
self._monitored_pins = monitored_pins or []
self.pins_by_name = pins if pins else {}
self.pins_by_number = {number: name
for (name, number) in self.pins_by_name.items()}
......@@ -71,6 +95,31 @@ class GpioPlugin(Plugin):
assert mode_str in ['BOARD', 'BCM'], 'Invalid mode: {}'.format(mode_str)
return getattr(GPIO, mode_str)
def on_gpio_event(self):
def callback(pin: int):
import RPi.GPIO as GPIO
value = GPIO.input(pin)
pin = self.pins_by_number.get(pin, pin)
get_bus().post(GPIOEvent(pin=pin, value=value))
return callback
def main(self):
import RPi.GPIO as GPIO
if not self._monitored_pins:
return # No need to start the monitor
monitored_pins = [
self._get_pin_number(pin) for pin in self._monitored_pins
for pin in monitored_pins:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.on_gpio_event())
def write(self, pin: Union[int, str], value: Union[int, bool],
name: Optional[str] = None) -> Dict[str, Any]:
events: {}
- platypush.message.event.gpio.GPIOEvent:
When the value of a monitored PIN changes.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment