diff --git a/platypush/backend/http/webapp/src/assets/icons.json b/platypush/backend/http/webapp/src/assets/icons.json index 6cb81631a..70bf4457c 100644 --- a/platypush/backend/http/webapp/src/assets/icons.json +++ b/platypush/backend/http/webapp/src/assets/icons.json @@ -1,4 +1,7 @@ { + "arduino": { + "class": "fas fa-microchip" + }, "bluetooth": { "class": "fab fa-bluetooth" }, diff --git a/platypush/backend/sensor/arduino/__init__.py b/platypush/backend/sensor/arduino/__init__.py deleted file mode 100644 index ff37ebb61..000000000 --- a/platypush/backend/sensor/arduino/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -from platypush.backend.sensor import SensorBackend - - -class SensorArduinoBackend(SensorBackend): - """ - This backend listens for new events from an Arduino with a Firmata-compatible firmware. - - Requires: - - * The :class:`platypush.plugins.arduino.ArduinoPlugin` plugin configured. - - Triggers: - - * :class:`platypush.message.event.sensor.SensorDataChangeEvent` if the measurements of a sensor have changed - * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` if the measurements of a sensor have - gone above a configured threshold - * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` if the measurements of a sensor have - gone below a configured threshold - - """ - - def __init__(self, **kwargs): - super().__init__(plugin='arduino', **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/sensor/arduino/manifest.yaml b/platypush/backend/sensor/arduino/manifest.yaml deleted file mode 100644 index 539966d75..000000000 --- a/platypush/backend/sensor/arduino/manifest.yaml +++ /dev/null @@ -1,12 +0,0 @@ -manifest: - events: - platypush.message.event.sensor.SensorDataAboveThresholdEvent: if the measurements - of a sensor havegone above a configured threshold - platypush.message.event.sensor.SensorDataBelowThresholdEvent: if the measurements - of a sensor havegone below a configured threshold - platypush.message.event.sensor.SensorDataChangeEvent: if the measurements of a - sensor have changed - install: - pip: [] - package: platypush.backend.sensor.arduino - type: backend diff --git a/platypush/plugins/arduino/__init__.py b/platypush/plugins/arduino/__init__.py index 844aabf07..40fb2b172 100644 --- a/platypush/plugins/arduino/__init__.py +++ b/platypush/plugins/arduino/__init__.py @@ -1,26 +1,49 @@ import enum import time -from typing import Optional, Dict, Union, Callable, Tuple, Type +from typing import List, Optional, Dict, Union, Callable, Tuple, Type +from typing_extensions import override -from pyfirmata2 import Arduino, ArduinoMega, ArduinoDue, ArduinoNano, Pin, util, ANALOG, INPUT, PWM +from pyfirmata2 import ( + Arduino, + ArduinoMega, + ArduinoDue, + ArduinoNano, + Board, + Pin, + util, + ANALOG, + INPUT, + PWM, +) +from platypush.common.sensors import Numeric +from platypush.entities.devices import Device +from platypush.entities.sensors import NumericSensor from platypush.plugins import action from platypush.plugins.sensor import SensorPlugin class PinType(enum.IntEnum): + """ + PIN type enumeration (analog or digital). + """ + ANALOG = 1 DIGITAL = 2 class BoardType(enum.Enum): + """ + Board types. + """ + MEGA = 'mega' DUE = 'due' NANO = 'nano' -# noinspection PyBroadException +# pylint: disable=too-many-ancestors class ArduinoPlugin(SensorPlugin): """ Interact with an Arduino connected to the host machine over USB using the @@ -41,23 +64,32 @@ class ArduinoPlugin(SensorPlugin): * **pyfirmata2** (``pip install pyfirmata2``) + Triggers: + + * :class:`platypush.message.event.sensor.SensorDataAboveThresholdEvent` + * :class:`platypush.message.event.sensor.SensorDataBelowThresholdEvent` + * :class:`platypush.message.event.sensor.SensorDataChangeEvent` + """ - def __init__(self, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: int = 57600, - analog_pins: Optional[Dict[str, int]] = None, - digital_pins: Optional[Dict[str, int]] = None, - timeout: float = 20.0, - conv_functions: Optional[Dict[Union[str, int], Union[str, Callable]]] = None, - **kwargs): + def __init__( + self, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: int = 9600, + analog_pins: Optional[Dict[str, int]] = None, + digital_pins: Optional[Dict[str, int]] = None, + timeout: float = 20.0, + conv_functions: Optional[Dict[Union[str, int], Union[str, Callable]]] = None, + poll_interval: float = 1.0, + **kwargs, + ): """ :param board: Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set then the plugin will attempt an auto-discovery. :param board_type: Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection. - :param baud_rate: Default serial baud rate (default: 57600) + :param baud_rate: Default serial baud rate (default: 9600) :param analog_pins: Optional analog PINs map name->pin_number. :param digital_pins: Optional digital PINs map name->pin_number. :param timeout: Board communication timeout in seconds. @@ -77,7 +109,7 @@ class ArduinoPlugin(SensorPlugin): temperature: 'lambda t: t * 500.0' """ - super().__init__(**kwargs) + super().__init__(poll_interval=poll_interval, **kwargs) self.board = board self.board_type = self._get_board_type(board_type) @@ -90,20 +122,28 @@ class ArduinoPlugin(SensorPlugin): } self._pin_name_by_number = { - PinType.ANALOG: {number: name for name, number in self._pin_number_by_name[PinType.ANALOG].items()}, - PinType.DIGITAL: {number: name for name, number in self._pin_number_by_name[PinType.DIGITAL].items()}, + PinType.ANALOG: { + number: name + for name, number in self._pin_number_by_name[PinType.ANALOG].items() + }, + PinType.DIGITAL: { + number: name + for name, number in self._pin_number_by_name[PinType.DIGITAL].items() + }, } self.conv_functions: Dict[Union[str, int], Callable] = { - (self._pin_number_by_name[PinType.ANALOG].get(pin, pin)): (f if callable(f) else eval(f)) + (self._pin_number_by_name[PinType.ANALOG].get(str(pin), pin)): ( + f if callable(f) else eval(f) + ) for pin, f in (conv_functions or {}).items() } - self._boards = {} - self._board_iterators = {} + self._boards: Dict[str, Board] = {} + self._board_iterators: Dict[str, util.Iterator] = {} @staticmethod - def _get_board_type(board_type: Optional[str] = None) -> Type: + def _get_board_type(board_type: Optional[str] = None) -> Type[Board]: if not board_type: return Arduino @@ -115,13 +155,15 @@ class ArduinoPlugin(SensorPlugin): if board_type == BoardType.MEGA.value: return ArduinoMega - raise AssertionError('Invalid board_type: {}'.format(board_type)) + raise AssertionError(f'Invalid board_type: {board_type}') - def _get_board(self, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[float] = None): + def _get_board( + self, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[float] = None, + ) -> Board: board_name = board or self.board or Arduino.AUTODETECT baud_rate = baud_rate or self.baud_rate timeout = timeout or self.timeout @@ -129,48 +171,58 @@ class ArduinoPlugin(SensorPlugin): if board_name in self._boards: return self._boards[board_name] - board_type = self._get_board_type(board_type) if board_type else self.board_type - board = board_type(board_name, baudrate=baud_rate, timeout=timeout) - self.logger.info('Connected to board {}'.format(board.name)) - self._boards[board_name] = board + board_obj_type = ( + self._get_board_type(board_type) if board_type else self.board_type + ) + assert board_obj_type - self._board_iterators[board.name] = util.Iterator(board) - self._board_iterators[board.name].start() - return board + board_obj = board_obj_type(board_name, baudrate=baud_rate, timeout=timeout) + board_name = board_obj.name or '' + self.logger.info('Connected to board %s', board_name) + self._boards[board_name] = board_obj + self._board_iterators[board_name] = util.Iterator(board_obj) + self._board_iterators[board_name].start() + return board_obj - def _get_board_and_pin(self, - pin: Union[int, str], - pin_type: PinType, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None) -> Tuple[Arduino, int]: - board = self._get_board(board, board_type=board_type, baud_rate=baud_rate, timeout=timeout) + def _get_board_and_pin( + self, + pin: Union[int, str], + pin_type: PinType, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + ) -> Tuple[Board, int]: + board_ = self._get_board( + board, board_type=board_type, baud_rate=baud_rate, timeout=timeout + ) if pin in self._pin_number_by_name[pin_type]: - pin = self._pin_number_by_name[pin_type][pin] + pin = self._pin_number_by_name[pin_type][str(pin)] - assert isinstance(pin, int), 'Invalid PIN number/name: {}'.format(pin) - return board, pin + assert isinstance(pin, int), f'Invalid PIN number/name: {pin}' + return board_, pin @staticmethod - def _get_pin(pin: int, board: Arduino, pin_type: PinType) -> Pin: + def _get_pin(pin: int, board: Board, pin_type: PinType) -> Pin: pins = None if pin_type == PinType.ANALOG: pins = board.analog if pin_type == PinType.DIGITAL: pins = board.digital - assert pins, 'Invalid pin_type: {}'.format(pin_type) + assert pins, f'Invalid pin_type: {pin_type}' if pins[pin].mode in [ANALOG, INPUT]: pins[pin].enable_reporting() return pins[pin] - def _poll_value(self, - pin: int, - board: Arduino, - pin_type: PinType, - timeout: Optional[float] = None) -> Optional[Union[bool, float]]: + def _poll_value( + self, + pin: int, + board: Board, + pin_type: PinType, + timeout: Optional[float] = None, + ) -> Optional[Union[bool, float]]: value = None poll_start = time.time() @@ -180,8 +232,10 @@ class ArduinoPlugin(SensorPlugin): pin_ = self._get_pin(pin=pin, board=board, pin_type=pin_type) if pin_.mode not in [INPUT, ANALOG]: - self.logger.warning('PIN {} is not configured in input/analog mode'.format(pin)) - return + self.logger.warning( + 'PIN %d is not configured in input/analog mode', pin + ) + return None value = pin_.read() if value is None: @@ -193,13 +247,15 @@ class ArduinoPlugin(SensorPlugin): return value @action - def analog_read(self, - pin: Union[int, str], - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - conv_function: Optional[Union[str, Callable]] = None, - timeout: Optional[int] = None) -> float: + def analog_read( + self, + pin: Union[int, str], + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + conv_function: Optional[Union[str, Callable]] = None, + timeout: Optional[int] = None, + ) -> Optional[float]: """ Read an analog value from a PIN. @@ -212,28 +268,44 @@ class ArduinoPlugin(SensorPlugin): returns by default float values in the range ``[0.0, 1.0]``. :param timeout: Communication timeout in seconds (default: default configured ``timeout``). """ - conv_function = conv_function or self.conv_functions.get(pin) - board, pin = self._get_board_and_pin(pin=pin, - pin_type=PinType.ANALOG, - board=board, - board_type=board_type, - baud_rate=baud_rate, - timeout=timeout) + board_, pin = self._get_board_and_pin( + pin=pin, + pin_type=PinType.ANALOG, + board=board, + board_type=board_type, + baud_rate=baud_rate, + timeout=timeout, + ) conv_function = conv_function or self.conv_functions.get(pin) - value = self._poll_value(pin=pin, board=board, pin_type=PinType.ANALOG, timeout=timeout) + converter: Optional[Callable[[float], float]] = None + if isinstance(conv_function, str): + converter = eval(conv_function) + elif callable(conv_function): + converter = conv_function + elif conv_function is not None: + raise AssertionError( + 'Expected conv_function to be null, a string or a function, ' + f'got "{conv_function}" instead' + ) - if conv_function: - value = conv_function(value) + value = self._poll_value( + pin=pin, board=board_, pin_type=PinType.ANALOG, timeout=timeout + ) + + if converter and value is not None: + value = converter(value) return value @action - def digital_read(self, - pin: Union[int, str], - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None) -> bool: + def digital_read( + self, + pin: Union[int, str], + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + ) -> bool: """ Read a digital value from a PIN. @@ -243,22 +315,31 @@ class ArduinoPlugin(SensorPlugin): :param baud_rate: Baud rate (default: default configured ``baud_rate``). :param timeout: Communication timeout in seconds (default: default configured ``timeout``). """ - board, pin = self._get_board_and_pin(pin=pin, - pin_type=PinType.DIGITAL, - board=board, - board_type=board_type, - baud_rate=baud_rate, - timeout=timeout) + board_, pin = self._get_board_and_pin( + pin=pin, + pin_type=PinType.DIGITAL, + board=board, + board_type=board_type, + baud_rate=baud_rate, + timeout=timeout, + ) - return self._poll_value(pin=pin, board=board, pin_type=PinType.DIGITAL, timeout=timeout) + return bool( + self._poll_value( + pin=pin, board=board_, pin_type=PinType.DIGITAL, timeout=timeout + ) + ) @action - def analog_write(self, pin: Union[int, str], - value: float, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None): + def analog_write( + self, + pin: Union[int, str], + value: float, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + ): """ Write a value to an analog PIN. @@ -269,21 +350,26 @@ class ArduinoPlugin(SensorPlugin): :param baud_rate: Baud rate (default: default configured ``baud_rate``). :param timeout: Communication timeout in seconds (default: default configured ``timeout``). """ - board, pin = self._get_board_and_pin(pin=pin, - pin_type=PinType.ANALOG, - board=board, - board_type=board_type, - baud_rate=baud_rate, - timeout=timeout) - board.analog[pin].write(value) + board_, pin = self._get_board_and_pin( + pin=pin, + pin_type=PinType.ANALOG, + board=board, + board_type=board_type, + baud_rate=baud_rate, + timeout=timeout, + ) + board_.analog[pin].write(value) @action - def digital_write(self, pin: Union[int, str], - value: bool, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None): + def digital_write( + self, + pin: Union[int, str], + value: bool, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + ): """ Write a value to a digital PIN. @@ -294,21 +380,26 @@ class ArduinoPlugin(SensorPlugin): :param baud_rate: Baud rate (default: default configured ``baud_rate``). :param timeout: Communication timeout in seconds (default: default configured ``timeout``). """ - board, pin = self._get_board_and_pin(pin=pin, - pin_type=PinType.DIGITAL, - board=board, - board_type=board_type, - baud_rate=baud_rate, - timeout=timeout) - board.digital[pin].write(value) + board_, pin = self._get_board_and_pin( + pin=pin, + pin_type=PinType.DIGITAL, + board=board, + board_type=board_type, + baud_rate=baud_rate, + timeout=timeout, + ) + board_.digital[pin].write(value) @action - def pwm_write(self, pin: Union[int, str], - value: float, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None): + def pwm_write( + self, + pin: Union[int, str], + value: float, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + ): """ Write a PWM value to a digital PIN. @@ -319,26 +410,32 @@ class ArduinoPlugin(SensorPlugin): :param baud_rate: Baud rate (default: default configured ``baud_rate``). :param timeout: Communication timeout in seconds (default: default configured ``timeout``). """ - board, pin = self._get_board_and_pin(pin=pin, - pin_type=PinType.DIGITAL, - board=board, - board_type=board_type, - baud_rate=baud_rate, - timeout=timeout) + board_, pin = self._get_board_and_pin( + pin=pin, + pin_type=PinType.DIGITAL, + board=board, + board_type=board_type, + baud_rate=baud_rate, + timeout=timeout, + ) - assert board.digital[pin].PWM_CAPABLE, 'PIN {} is not PWM capable'.format(pin) - if board.digital[pin] != PWM: - board.digital[pin].mode = PWM - time.sleep(0.001) + assert board_.digital[pin].PWM_CAPABLE, f'PIN {pin} is not PWM capable' + if board_.digital[pin] != PWM: + board_.digital[pin].mode = PWM + time.sleep(0.001) # 1 μs spike to activate a PWM pin - board.digital[pin].write(value) + board_.digital[pin].write(value) @action - def get_measurement(self, - board: Optional[str] = None, - board_type: Optional[str] = None, - baud_rate: Optional[int] = None, - timeout: Optional[int] = None) -> Dict[str, float]: + def get_measurement( + self, + *_, + board: Optional[str] = None, + board_type: Optional[str] = None, + baud_rate: Optional[int] = None, + timeout: Optional[int] = None, + **__, + ) -> Dict[str, Optional[Union[float, bool]]]: """ Get a measurement from all the configured PINs. @@ -352,21 +449,35 @@ class ArduinoPlugin(SensorPlugin): transformed through the configured ``conv_functions``. """ ret = {} - board = self._get_board(board=board, board_type=board_type, baud_rate=baud_rate, timeout=timeout) + board_ = self._get_board( + board=board, board_type=board_type, baud_rate=baud_rate, timeout=timeout + ) - for pin in board.analog: - if self._pin_name_by_number[PinType.ANALOG] and \ - pin.pin_number not in self._pin_name_by_number[PinType.ANALOG]: + assert board_, f'No such board: board={board}, board_type={board_type}' + + for pin in board_.analog: + if ( + self._pin_name_by_number[PinType.ANALOG] + and pin.pin_number not in self._pin_name_by_number[PinType.ANALOG] + ): continue - name = self._pin_name_by_number[PinType.ANALOG].get(pin.pin_number, 'A{}'.format(pin.pin_number)) - value = self._poll_value(pin=pin.pin_number, board=board, pin_type=PinType.ANALOG, - timeout=timeout or self.timeout) + name = self._pin_name_by_number[PinType.ANALOG].get( + pin.pin_number, f'A{pin.pin_number}' + ) + value = self._poll_value( + pin=pin.pin_number, + board=board_, + pin_type=PinType.ANALOG, + timeout=timeout or self.timeout, + ) if value is None: continue - conv_function = self.conv_functions.get(name, self.conv_functions.get(pin.pin_number)) + conv_function = self.conv_functions.get( + name, self.conv_functions.get(pin.pin_number) + ) if conv_function: value = conv_function(value) @@ -374,8 +485,32 @@ class ArduinoPlugin(SensorPlugin): return ret + @override + def transform_entities(self, entities: Dict[str, Numeric]) -> List[Device]: # type: ignore + dev_id = 'arduino' + dev_name = 'Arduino' + if self.board: + dev_id += f':{self.board}' + dev_name += f' @ {self.board}' + + return [ + Device( + id=dev_id, + name=dev_name, + children=[ + NumericSensor( + id=f'{dev_id}:{key}', + name=key, + value=value, + ) + for key, value in entities.items() + ], + ) + ] + @action - def close(self): + def stop(self): + super().stop() for it in self._board_iterators.values(): it.stop() diff --git a/platypush/plugins/arduino/manifest.yaml b/platypush/plugins/arduino/manifest.yaml index dd644b38b..abe0839aa 100644 --- a/platypush/plugins/arduino/manifest.yaml +++ b/platypush/plugins/arduino/manifest.yaml @@ -1,5 +1,8 @@ manifest: - events: {} + events: + platypush.message.event.sensor.SensorDataAboveThresholdEvent: + platypush.message.event.sensor.SensorDataBelowThresholdEvent: + platypush.message.event.sensor.SensorDataChangeEvent: install: pip: - pyfirmata2