forked from platypush/platypush
More rational management of ZeroBorg auto pilot mode together with sensor - no longer limited to gpio.sensor.distance alone, but including any kind of GpioSensorPlugin. Thresholds, timeouts and actions are configured in config.yaml instead of being hardcoded
This commit is contained in:
parent
b597097be3
commit
b863213bd7
3 changed files with 51 additions and 34 deletions
|
@ -0,0 +1,13 @@
|
|||
from platypush.plugins import Plugin
|
||||
|
||||
|
||||
class GpioSensorPlugin(Plugin):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_measurement(self, *args, **kwargs):
|
||||
raise NotImplementedError('get_measurement should be implemented in a derived class')
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
@ -5,11 +5,13 @@ import time
|
|||
import RPi.GPIO as gpio
|
||||
|
||||
from platypush.message.response import Response
|
||||
from platypush.plugins import Plugin
|
||||
from platypush.plugins.gpio.sensor import GpioSensorPlugin
|
||||
|
||||
|
||||
class GpioSensorDistancePlugin(Plugin):
|
||||
def __init__(self, trigger_pin, echo_pin):
|
||||
class GpioSensorDistancePlugin(GpioSensorPlugin):
|
||||
def __init__(self, trigger_pin, echo_pin, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.trigger_pin = trigger_pin
|
||||
self.echo_pin = echo_pin
|
||||
self._is_probing = False
|
||||
|
@ -20,10 +22,9 @@ class GpioSensorDistancePlugin(Plugin):
|
|||
|
||||
logging.info('Resetting trigger sensor on GPIO pin {}'.format(self.trigger_pin))
|
||||
gpio.output(self.trigger_pin, False)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def get_distance(self):
|
||||
def get_measurement(self):
|
||||
gpio.setmode(gpio.BCM)
|
||||
gpio.setup(self.trigger_pin, gpio.OUT)
|
||||
gpio.setup(self.echo_pin, gpio.IN)
|
||||
|
|
|
@ -37,12 +37,38 @@ class GpioZeroborgPlugin(Plugin):
|
|||
self.zb.ResetEpo()
|
||||
|
||||
|
||||
def get_distance(self):
|
||||
distance_sensor = get_plugin('gpio.sensor.distance')
|
||||
if not distance_sensor:
|
||||
raise RuntimeError('No gpio.sensor.distance configuration found')
|
||||
def _get_measurement(self, plugin, timeout):
|
||||
measure_start_time = time.time()
|
||||
value = None
|
||||
|
||||
return distance_sensor.get_distance()
|
||||
while value is None:
|
||||
value = plugin.get_measurement()
|
||||
if time.time() - measure_start_time > timeout:
|
||||
return None
|
||||
|
||||
return value
|
||||
|
||||
def _get_direction_from_sensors(self):
|
||||
if Direction.DIR_AUTO.value not in self.directions:
|
||||
raise RuntimeError("Can't start auto pilot: " +
|
||||
"no sensor configured in gpio.zeroborg.directions.auto")
|
||||
|
||||
direction = None
|
||||
|
||||
for sensor in self.directions[Direction.DIR_AUTO.value]['sensors']:
|
||||
plugin = get_plugin(sensor['plugin'])
|
||||
if not sensor:
|
||||
raise RuntimeError('No such sensor: ' + sensor['plugin'])
|
||||
|
||||
value = self._get_measurement(plugin=plugin, timeout=sensor['timeout'])
|
||||
threshold = sensor['threshold']
|
||||
|
||||
if value >= threshold and 'above_threshold_direction' in sensor:
|
||||
direction = sensor['above_threshold_direction']
|
||||
elif 'below_threshold_direction' in sensor:
|
||||
direction = sensor['below_threshold_direction']
|
||||
|
||||
return direction
|
||||
|
||||
|
||||
def drive(self, direction):
|
||||
|
@ -69,30 +95,7 @@ class GpioZeroborgPlugin(Plugin):
|
|||
self.auto_mode = True
|
||||
|
||||
if self.auto_mode:
|
||||
distance = None
|
||||
last_recorded_distance_timestamp = None
|
||||
distance_record_timeout = 2.0
|
||||
|
||||
while distance is None:
|
||||
distance = self.get_distance()
|
||||
logging.info('Closest obstacle distance: {} mm'.format(distance))
|
||||
|
||||
if last_recorded_distance_timestamp and \
|
||||
time.time() - last_recorded_distance_timestamp > distance_record_timeout:
|
||||
# Stop the motors if we have been unable
|
||||
# to access the distance sensor data
|
||||
self._direction = None
|
||||
break
|
||||
|
||||
last_recorded_distance_timestamp = time.time()
|
||||
|
||||
if distance > 400.0: # distance in mm
|
||||
self._direction = Direction.DIR_UP.value
|
||||
else:
|
||||
logging.info('Physical obstacle detected at {} mm'.
|
||||
format(distance))
|
||||
self._direction = Direction.DIR_LEFT.value
|
||||
|
||||
self._direction = self._get_direction_from_sensors()
|
||||
time.sleep(0.1)
|
||||
|
||||
motor_1_power = motor_2_power = motor_3_power = motor_4_power = 0.0
|
||||
|
|
Loading…
Add table
Reference in a new issue