platypush/platypush/plugins/gpio/__init__.py
Fabio Manganiello c3337ccc6c
[#311] Docs deps autogen sphinx plugin.
Added an `add_dependencies` plugin to the Sphinx build process that
parses the manifest files of the scanned backends and plugins and
automatically generates the documentation for the required dependencies
and triggered events.

This means that those dependencies are no longer required to be listed
in the docstring of the class itself.

Also in this commit:

- Black/LINT for some integrations that hadn't been touched in a long
  time.

- Deleted some leftovers from previous refactors (deprecated
  `backend.mqtt`, `backend.zwave.mqtt`, `backend.http.request.rss`).

- Deleted deprecated `inotify` backend - replaced by `file.monitor` (see
  #289).
2023-09-24 17:00:08 +02:00

225 lines
6.3 KiB
Python

import threading
from typing import Any, Optional, Dict, Union, Collection
from platypush.context import get_bus
from platypush.message.event.gpio import GPIOEvent
from platypush.plugins import RunnablePlugin, action
class GpioPlugin(RunnablePlugin):
"""
This plugin can be used to interact with custom electronic devices
connected to a Raspberry Pi (or compatible device) over GPIO pins.
"""
def __init__(
self,
pins: Optional[Dict[str, int]] = None,
monitored_pins: Optional[Collection[Union[str, int]]] = None,
mode: str = 'board',
**kwargs
):
"""
:param mode: Specify ``board`` if you want to use the board PIN numbers,
``bcm`` for Broadcom PIN numbers (default: ``board``)
:param pins: Custom `GPIO name` -> `PIN number` mapping. This can be
useful if you want to reference your GPIO ports by name instead
of PIN number.
Example:
.. code-block:: yaml
pins:
LED_1: 14,
LED_2: 15,
MOTOR: 16,
SENSOR_1: 17
SENSOR_2: 18
:param monitored_pins: List of PINs to monitor. If a new value is detected
on these pins then a :class:`platypush.message.event.gpio.GPIOEvent`
event will be triggered. GPIO PINS can be referenced either by number
or name, if a name is specified on the `pins` argument.
"""
super().__init__(**kwargs)
self.mode = self._get_mode(mode)
self._initialized = False
self._init_lock = threading.RLock()
self._initialized_pins = {}
self._monitored_pins = monitored_pins or []
self.pins_by_name = pins if pins else {}
self.pins_by_number = {
number: name for (name, number) in self.pins_by_name.items()
}
def _init_board(self):
import RPi.GPIO as GPIO
with self._init_lock:
if self._initialized and GPIO.getmode():
return
GPIO.setmode(self.mode)
self._initialized = True
def _get_pin_number(self, pin):
try:
pin = int(str(pin))
except ValueError:
pin = self.pins_by_name.get(pin)
if not pin:
raise RuntimeError('Unknown PIN name: "{}"'.format(pin))
return pin
@staticmethod
def _get_mode(mode_str: str) -> int:
import RPi.GPIO as GPIO
mode_str = mode_str.upper()
assert mode_str in ['BOARD', 'BCM'], 'Invalid mode: {}'.format(mode_str)
return getattr(GPIO, mode_str)
def on_gpio_event(self):
def callback(pin: int):
import RPi.GPIO as GPIO
value = GPIO.input(pin)
pin = self.pins_by_number.get(pin, pin)
get_bus().post(GPIOEvent(pin=pin, value=value))
return callback
def main(self):
import RPi.GPIO as GPIO
if not self._monitored_pins:
return # No need to start the monitor
self._init_board()
monitored_pins = [self._get_pin_number(pin) for pin in self._monitored_pins]
for pin in monitored_pins:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.on_gpio_event())
self.wait_stop()
@action
def write(
self, pin: Union[int, str], value: Union[int, bool], name: Optional[str] = None
) -> Dict[str, Any]:
"""
Write a byte value to a pin.
:param pin: PIN number or configured name
:param name: Optional name for the written value (e.g. "temperature" or "humidity")
:param value: Value to write
Response::
output = {
"name": <pin or metric name>,
"pin": <pin>,
"value": <value>,
"method": "write"
}
"""
import RPi.GPIO as GPIO
self._init_board()
name = name or pin
pin = self._get_pin_number(pin)
if pin not in self._initialized_pins or self._initialized_pins[pin] != GPIO.OUT:
GPIO.setup(pin, GPIO.OUT)
self._initialized_pins[pin] = GPIO.OUT
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, value)
return {
'name': name,
'pin': pin,
'value': value,
'method': 'write',
}
@action
def read(self, pin: Union[int, str], name: Optional[str] = None) -> Dict[str, Any]:
"""
Reads a value from a PIN.
:param pin: PIN number or configured name.
:param name: Optional name for the read value (e.g. "temperature" or "humidity")
Response::
output = {
"name": <pin number or pin/metric name>,
"pin": <pin>,
"value": <value>,
"method": "read"
}
"""
import RPi.GPIO as GPIO
self._init_board()
name = name or pin
pin = self._get_pin_number(pin)
if pin not in self._initialized_pins:
GPIO.setup(pin, GPIO.IN)
self._initialized_pins[pin] = GPIO.IN
val = GPIO.input(pin)
return {
'name': name,
'pin': pin,
'value': val,
'method': 'read',
}
@action
def get_measurement(self, pin=None):
if pin is None:
return self.read_all()
return self.read(pin)
@action
def read_all(self):
"""
Reads the values from all the configured PINs and returns them as a list. It will raise a RuntimeError if no
PIN mappings were configured.
"""
if not self.pins_by_number:
raise RuntimeError("No PIN mappings were provided/configured")
values = []
for pin, name in self.pins_by_number.items():
# noinspection PyUnresolvedReferences
values.append(self.read(pin=pin, name=name).output)
return values
@action
def cleanup(self):
"""
Cleanup the state of the GPIO and resets PIN values.
"""
import RPi.GPIO as GPIO
with self._init_lock:
if self._initialized:
GPIO.cleanup()
self._initialized_pins = {}
self._initialized = False
# vim:sw=4:ts=4:et: