Implemented auto pilot with distance sensors
This commit is contained in:
parent
a8254181b2
commit
cf83f07f79
3 changed files with 141 additions and 1 deletions
0
platypush/plugins/gpio/sensor/__init__.py
Normal file
0
platypush/plugins/gpio/sensor/__init__.py
Normal file
99
platypush/plugins/gpio/sensor/distance/__init__.py
Normal file
99
platypush/plugins/gpio/sensor/distance/__init__.py
Normal file
|
@ -0,0 +1,99 @@
|
|||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import RPi.GPIO as gpio
|
||||
|
||||
from platypush.message.response import Response
|
||||
from platypush.plugins import Plugin
|
||||
|
||||
|
||||
class GpioSensorDistancePlugin(Plugin):
|
||||
def __init__(self, trigger_pin, echo_pin):
|
||||
self.trigger_pin = trigger_pin
|
||||
self.echo_pin = echo_pin
|
||||
self.measured_distance_mm = None
|
||||
self._is_probing = False
|
||||
|
||||
gpio.setmode(gpio.BCM)
|
||||
gpio.setup(self.trigger_pin, gpio.OUT)
|
||||
gpio.setup(self.echo_pin, gpio.IN)
|
||||
|
||||
logging.info('Resetting trigger sensor on GPIO pin {}'.format(self.trigger_pin))
|
||||
gpio.output(self.trigger_pin, False)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def start(self):
|
||||
self._is_probing = True
|
||||
logging.info('Received distance sensor start command')
|
||||
|
||||
def _run():
|
||||
gpio.setmode(gpio.BCM)
|
||||
gpio.setup(self.trigger_pin, gpio.OUT)
|
||||
gpio.setup(self.echo_pin, gpio.IN)
|
||||
|
||||
while self._is_probing:
|
||||
gpio.output(self.trigger_pin, True)
|
||||
time.sleep(0.00001) # 1 us pulse to trigger echo measurement
|
||||
gpio.output(self.trigger_pin, False)
|
||||
read_timeout = False
|
||||
|
||||
pulse_start = time.time()
|
||||
pulse_on = pulse_start
|
||||
|
||||
while gpio.input(self.echo_pin) == 0:
|
||||
pulse_on = time.time()
|
||||
if pulse_on - pulse_start > 0.5:
|
||||
logging.warning('Distance sensor echo timeout: 0')
|
||||
read_timeout = True
|
||||
break
|
||||
|
||||
pulse_start = pulse_on
|
||||
pulse_end = time.time()
|
||||
pulse_off = pulse_end
|
||||
|
||||
while gpio.input(self.echo_pin) == 1:
|
||||
pulse_off = time.time()
|
||||
if pulse_off - pulse_end > 0.5:
|
||||
logging.warning('Distance sensor echo timeout: 1')
|
||||
read_timeout = True
|
||||
break
|
||||
|
||||
if read_timeout:
|
||||
continue
|
||||
|
||||
pulse_end = pulse_off
|
||||
pulse_duration = pulse_end - pulse_start
|
||||
|
||||
# s = vt where v = avg speed of sound
|
||||
self.measured_distance_mm = round(pulse_duration * 171500.0, 2)
|
||||
|
||||
gpio.output(self.trigger_pin, False)
|
||||
time.sleep(0.1)
|
||||
|
||||
gpio.cleanup()
|
||||
self.measured_distance_mm = None
|
||||
|
||||
self._probe_thread = threading.Thread(target=_run)
|
||||
self._probe_thread.start()
|
||||
|
||||
return Response(output={'status': 'probing'})
|
||||
|
||||
|
||||
def get_distance(self):
|
||||
return self.measured_distance_mm
|
||||
|
||||
|
||||
def stop(self):
|
||||
self._is_probing = False
|
||||
self.measured_distance_mm = None
|
||||
|
||||
if self._probe_thread and threading.get_ident() != self._probe_thread.ident:
|
||||
self._probe_thread.join()
|
||||
|
||||
return Response(output={'status':'stopped'})
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
@ -5,6 +5,8 @@ import time
|
|||
|
||||
from platypush.message.response import Response
|
||||
from platypush.plugins import Plugin
|
||||
from platypush.context import get_plugin
|
||||
from platypush.config import Config
|
||||
|
||||
|
||||
class Direction(enum.Enum):
|
||||
|
@ -12,6 +14,7 @@ class Direction(enum.Enum):
|
|||
DIR_DOWN = 'down'
|
||||
DIR_LEFT = 'left'
|
||||
DIR_RIGHT = 'right'
|
||||
DIR_AUTO = 'auto'
|
||||
|
||||
|
||||
class GpioZeroborgPlugin(Plugin):
|
||||
|
@ -37,6 +40,8 @@ class GpioZeroborgPlugin(Plugin):
|
|||
self.v_in = v_in
|
||||
self.v_out = v_out
|
||||
self.max_power = v_out / float(v_in)
|
||||
self.distance_sensor = None
|
||||
self.auto_mode = False
|
||||
|
||||
self.zb = ZeroBorg.ZeroBorg()
|
||||
self.zb.Init()
|
||||
|
@ -44,6 +49,19 @@ class GpioZeroborgPlugin(Plugin):
|
|||
self.zb.ResetEpo()
|
||||
|
||||
|
||||
def get_distance(self):
|
||||
if not self.distance_sensor:
|
||||
self.distance_sensor = get_plugin('gpio.sensor.distance')
|
||||
|
||||
if not self.distance_sensor:
|
||||
raise RuntimeError('No gpio.sensor.distance configuration found')
|
||||
|
||||
if self.distance_sensor.get_distance() is None:
|
||||
self.distance_sensor.start()
|
||||
|
||||
return self.distance_sensor.get_distance()
|
||||
|
||||
|
||||
def drive(self, direction):
|
||||
self._can_run = True
|
||||
self._direction = direction.lower()
|
||||
|
@ -54,6 +72,22 @@ class GpioZeroborgPlugin(Plugin):
|
|||
left = 0.0
|
||||
right = 0.0
|
||||
|
||||
if self._direction == Direction.DIR_AUTO.value:
|
||||
self.auto_mode = True
|
||||
|
||||
if self.auto_mode:
|
||||
distance = None
|
||||
while distance is None:
|
||||
distance = self.get_distance()
|
||||
|
||||
print(distance)
|
||||
if distance > 200.0: # distance in mm
|
||||
self._direction = Direction.DIR_UP.value
|
||||
else:
|
||||
self._direction = Direction.DIR_LEFT.value
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
if self._direction == Direction.DIR_UP.value:
|
||||
left = 1.0 + self._power_offsets[Direction.DIR_LEFT][Direction.DIR_UP]
|
||||
right = 1.0 + self._power_offsets[Direction.DIR_RIGHT][Direction.DIR_UP]
|
||||
|
@ -65,15 +99,18 @@ class GpioZeroborgPlugin(Plugin):
|
|||
right = -1.25 - self._power_offsets[Direction.DIR_RIGHT][Direction.DIR_DOWN]
|
||||
elif self._direction == Direction.DIR_RIGHT.value:
|
||||
left = -1.25 - self._power_offsets[Direction.DIR_LEFT][Direction.DIR_DOWN]
|
||||
right = 2 + self._power_offsets[Direction.DIR_RIGHT][Direction.DIR_UP]
|
||||
right = 2.0 + self._power_offsets[Direction.DIR_RIGHT][Direction.DIR_UP]
|
||||
elif self._direction is not None:
|
||||
logging.warning('Invalid direction: {}'.format(direction))
|
||||
self.stop()
|
||||
|
||||
self.zb.SetMotor1(left * self.max_power)
|
||||
self.zb.SetMotor2(left * self.max_power)
|
||||
self.zb.SetMotor3(-right * self.max_power)
|
||||
self.zb.SetMotor4(-right * self.max_power)
|
||||
|
||||
self.auto_mode = False
|
||||
|
||||
|
||||
self._drive_thread = threading.Thread(target=_run)
|
||||
self._drive_thread.start()
|
||||
|
@ -88,6 +125,10 @@ class GpioZeroborgPlugin(Plugin):
|
|||
|
||||
self.zb.MotorsOff()
|
||||
self.zb.ResetEpo()
|
||||
|
||||
if self.distance_sensor:
|
||||
self.distance_sensor.stop()
|
||||
|
||||
return Response(output={'status':'stopped'})
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue