diff --git a/platypush/plugins/gpio/zeroborg/__init__.py b/platypush/plugins/gpio/zeroborg/__init__.py index 97ebd51ea0..87e328eec1 100644 --- a/platypush/plugins/gpio/zeroborg/__init__.py +++ b/platypush/plugins/gpio/zeroborg/__init__.py @@ -2,12 +2,11 @@ import enum import threading import time -from typing import Optional +from typing import Optional, Dict, List from platypush.context import get_bus from platypush.message.event.zeroborg import ZeroborgDriveEvent, ZeroborgStopEvent from platypush.plugins import Plugin, action -from platypush.context import get_plugin class Direction(enum.Enum): @@ -15,8 +14,6 @@ class Direction(enum.Enum): DIR_DOWN = 'down' DIR_LEFT = 'left' DIR_RIGHT = 'right' - DIR_AUTO = 'auto' - DIR_AUTO_TOGGLE = 'auto_toggle' # noinspection PyPep8Naming @@ -33,7 +30,7 @@ class GpioZeroborgPlugin(Plugin): """ - def __init__(self, directions=None, **kwargs): + def __init__(self, directions: Dict[str, List[float]] = None, **kwargs): """ :param directions: Configuration for the motor directions. A direction is basically a configuration of the power delivered to each motor to allow whichever object you're controlling (wheels, robotic arms etc.) to @@ -43,53 +40,32 @@ class GpioZeroborgPlugin(Plugin): Example configuration that I use to control a simple 4WD robot:: directions = { - "up": { - "motor_1_power": 0.4821428571428572, - "motor_2_power": 0.4821428571428572, - "motor_3_power": -0.6707142857142858, - "motor_4_power": -0.6707142857142858 - }, - "down": { - "motor_1_power": -0.4821428571428572, - "motor_2_power": -0.4821428571428572, - "motor_3_power": 0.825, - "motor_4_power": 0.825 - }, - "left": { - "motor_1_power": -0.1392857142857143, - "motor_2_power": -0.1392857142857143, - "motor_3_power": -1.0553571428571429, - "motor_4_power": -1.0553571428571429 - }, - "right": { - "motor_1_power": 1.0017857142857143, - "motor_2_power": 1.0017857142857143, - "motor_3_power": 0.6214285714285713, - "motor_4_power": 0.6214285714285713 - }, - "auto": { - "sensors": [ - { - "plugin": "gpio.sensor.distance", - "threshold": 400.0, - "timeout": 2.0, - "above_threshold_direction": "up", - "below_threshold_direction": "left" - } - ] - } + "up": [ + 0.4821428571428572, # Motor 1 power + 0.4821428571428572, # Motor 2 power + -0.6707142857142858, # Motor 3 power + -0.6707142857142858 # Motor 4 power + ], + "down": [ + -0.4821428571428572, + -0.4821428571428572, + 0.825, + 0.825 + ], + "left": [ + -0.1392857142857143, + -0.1392857142857143, + -1.0553571428571429, + -1.0553571428571429 + ], + "right": [ + 1.0017857142857143, + 1.0017857142857143, + 0.6214285714285713, + 0.6214285714285713 + ] } - Note that the special direction "auto" can contain a configuration that allows your device to move autonomously - based on the inputs it gets from some sensors. In this case, I set the sensors configuration (a list) to - periodically poll a GPIO-based ultrasound distance sensor plugin. ``timeout`` says after how long a poll - attempt should fail. The plugin package is specified through ``plugin`` (``gpio.sensor.distance``) in this - case, note that the plugin must be configured as well in order to work). The ``threshold`` value says - around which value your logic should trigger. In this case, threshold=400 (40 cm). When the distance value - is above that threshold (``above_threshold_direction``), then go "up" (no obstacles ahead). Otherwise - (``below_threshold_direction``), turn "left" (avoid the obstacle). - - :type directions: dict """ if directions is None: @@ -99,7 +75,6 @@ class GpioZeroborgPlugin(Plugin): super().__init__(**kwargs) self.directions = directions - self.auto_mode = False self._direction = None self._drive_thread: Optional[threading.Thread] = None self._motors = [0, 0, 0, 0] @@ -121,67 +96,17 @@ class GpioZeroborgPlugin(Plugin): return value - def _get_direction_from_sensors(self): - if Direction.DIR_AUTO.value not in self.directions: - raise RuntimeError("Can't start auto pilot: " + - "no sensor configured in gpio.zeroborg.directions.auto") - - direction = None - - for sensor in self.directions[Direction.DIR_AUTO.value]['sensors']: - plugin = get_plugin(sensor['plugin']) - if not sensor: - raise RuntimeError('No such sensor: ' + sensor['plugin']) - - value = self._get_measurement(plugin=plugin, timeout=sensor['timeout']) - threshold = sensor['threshold'] - - if value is None: - self.logger.warning('Read timeout from sensor {}'.format(sensor['plugin'])) - return Direction.DIR_AUTO.value - - if value >= threshold and 'above_threshold_direction' in sensor: - direction = sensor['above_threshold_direction'] - elif 'below_threshold_direction' in sensor: - direction = sensor['below_threshold_direction'] - - self.logger.info('Sensor: {}\tMeasurement: {}\tDirection: {}' - .format(sensor['plugin'], value, direction)) - - return direction - @action def drive(self, direction): """ Drive the motors in a certain direction. - - :param direction: Direction name (note: it must be a configured direction). Special directions: - * ``auto`` - Enter automatic drive mode - * ``auto_toggle`` - Toggle automatic drive mode (on or off) - * ``stop`` - Turn off the motors - """ def _run(): try: while self._direction: - if self._direction == Direction.DIR_AUTO_TOGGLE.value: - if self.auto_mode: - self._direction = None - self.auto_mode = False - else: - self._direction = Direction.DIR_AUTO - self.auto_mode = True - - if self._direction == Direction.DIR_AUTO.value: - self.auto_mode = True - try: - if self.auto_mode: - self._direction = self._get_direction_from_sensors() - time.sleep(0.1) - - if self._direction in self.directions and self._direction != Direction.DIR_AUTO.value: + if self._direction in self.directions: self._motors = self.directions[self._direction] else: self.logger.warning('Invalid direction {}: stopping motors'.format(self._direction)) @@ -212,9 +137,7 @@ class GpioZeroborgPlugin(Plugin): Turns off the motors """ - self.auto_mode = False self._direction = None - if self._drive_thread: self._drive_thread.join()