2018-02-25 23:45:31 +01:00
|
|
|
import enum
|
|
|
|
import threading
|
|
|
|
import time
|
|
|
|
|
|
|
|
from platypush.message.response import Response
|
|
|
|
from platypush.plugins import Plugin
|
2018-02-26 11:31:06 +01:00
|
|
|
from platypush.context import get_plugin
|
|
|
|
from platypush.config import Config
|
2018-02-25 23:45:31 +01:00
|
|
|
|
|
|
|
|
|
|
|
class Direction(enum.Enum):
|
|
|
|
DIR_UP = 'up'
|
|
|
|
DIR_DOWN = 'down'
|
|
|
|
DIR_LEFT = 'left'
|
|
|
|
DIR_RIGHT = 'right'
|
2018-02-26 11:31:06 +01:00
|
|
|
DIR_AUTO = 'auto'
|
2018-03-05 23:27:20 +01:00
|
|
|
DIR_AUTO_TOGGLE = 'auto_toggle'
|
2018-02-25 23:45:31 +01:00
|
|
|
|
|
|
|
|
|
|
|
class GpioZeroborgPlugin(Plugin):
|
|
|
|
_drive_thread = None
|
|
|
|
_can_run = False
|
|
|
|
_direction = None
|
|
|
|
|
|
|
|
|
2018-04-13 15:12:31 +02:00
|
|
|
def __init__(self, directions = {}):
|
2018-02-25 23:45:31 +01:00
|
|
|
import platypush.plugins.gpio.zeroborg.lib as ZeroBorg
|
|
|
|
|
2018-04-13 15:12:31 +02:00
|
|
|
self.directions = directions
|
2018-02-26 11:31:06 +01:00
|
|
|
self.auto_mode = False
|
2018-03-03 03:24:08 +01:00
|
|
|
self._direction = None
|
2018-02-25 23:45:31 +01:00
|
|
|
|
|
|
|
self.zb = ZeroBorg.ZeroBorg()
|
|
|
|
self.zb.Init()
|
|
|
|
self.zb.SetCommsFailsafe(True)
|
|
|
|
self.zb.ResetEpo()
|
|
|
|
|
|
|
|
|
2018-04-13 15:49:10 +02:00
|
|
|
def _get_measurement(self, plugin, timeout):
|
|
|
|
measure_start_time = time.time()
|
|
|
|
value = None
|
2018-02-26 11:31:06 +01:00
|
|
|
|
2018-04-13 15:49:10 +02:00
|
|
|
while value is None:
|
|
|
|
value = plugin.get_measurement()
|
|
|
|
if time.time() - measure_start_time > timeout:
|
|
|
|
return None
|
|
|
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
def _get_direction_from_sensors(self):
|
|
|
|
if Direction.DIR_AUTO.value not in self.directions:
|
|
|
|
raise RuntimeError("Can't start auto pilot: " +
|
|
|
|
"no sensor configured in gpio.zeroborg.directions.auto")
|
|
|
|
|
|
|
|
direction = None
|
|
|
|
|
|
|
|
for sensor in self.directions[Direction.DIR_AUTO.value]['sensors']:
|
|
|
|
plugin = get_plugin(sensor['plugin'])
|
|
|
|
if not sensor:
|
|
|
|
raise RuntimeError('No such sensor: ' + sensor['plugin'])
|
|
|
|
|
|
|
|
value = self._get_measurement(plugin=plugin, timeout=sensor['timeout'])
|
|
|
|
threshold = sensor['threshold']
|
|
|
|
|
|
|
|
if value >= threshold and 'above_threshold_direction' in sensor:
|
|
|
|
direction = sensor['above_threshold_direction']
|
|
|
|
elif 'below_threshold_direction' in sensor:
|
|
|
|
direction = sensor['below_threshold_direction']
|
|
|
|
|
2018-06-06 20:09:18 +02:00
|
|
|
self.logger.info('Sensor: {}\tMeasurement: {}\tDirection: {}'
|
2018-04-13 23:45:29 +02:00
|
|
|
.format(sensor['plugin'], value, direction))
|
|
|
|
|
2018-04-13 15:49:10 +02:00
|
|
|
return direction
|
2018-02-26 11:31:06 +01:00
|
|
|
|
|
|
|
|
2018-02-25 23:45:31 +01:00
|
|
|
def drive(self, direction):
|
2018-03-03 03:24:08 +01:00
|
|
|
prev_direction = self._direction
|
|
|
|
|
2018-02-25 23:45:31 +01:00
|
|
|
self._can_run = True
|
|
|
|
self._direction = direction.lower()
|
2018-06-06 20:09:18 +02:00
|
|
|
self.logger.info('Received ZeroBorg drive command: {}'.format(direction))
|
2018-02-25 23:45:31 +01:00
|
|
|
|
|
|
|
def _run():
|
|
|
|
while self._can_run and self._direction:
|
|
|
|
left = 0.0
|
|
|
|
right = 0.0
|
|
|
|
|
2018-03-05 23:27:20 +01:00
|
|
|
if self._direction == Direction.DIR_AUTO_TOGGLE.value:
|
|
|
|
if self.auto_mode:
|
|
|
|
self._direction = None
|
|
|
|
self.auto_mode = False
|
|
|
|
else:
|
|
|
|
self._direction = Direction.DIR_AUTO
|
|
|
|
self.auto_mode = True
|
|
|
|
|
2018-02-26 11:31:06 +01:00
|
|
|
if self._direction == Direction.DIR_AUTO.value:
|
|
|
|
self.auto_mode = True
|
|
|
|
|
|
|
|
if self.auto_mode:
|
2018-04-13 15:49:10 +02:00
|
|
|
self._direction = self._get_direction_from_sensors()
|
2018-02-26 11:31:06 +01:00
|
|
|
time.sleep(0.1)
|
|
|
|
|
2018-04-13 15:12:31 +02:00
|
|
|
motor_1_power = motor_2_power = motor_3_power = motor_4_power = 0.0
|
|
|
|
if self._direction in self.directions:
|
|
|
|
motor_1_power = self.directions[self._direction]['motor_1_power']
|
|
|
|
motor_2_power = self.directions[self._direction]['motor_2_power']
|
|
|
|
motor_3_power = self.directions[self._direction]['motor_3_power']
|
|
|
|
motor_4_power = self.directions[self._direction]['motor_4_power']
|
|
|
|
elif self._direction:
|
2018-06-06 20:09:18 +02:00
|
|
|
self.logger.warning('Invalid direction {}, stopping motors'.format(self._direction))
|
2018-04-13 15:12:31 +02:00
|
|
|
|
|
|
|
self.zb.SetMotor1(motor_1_power)
|
|
|
|
self.zb.SetMotor2(motor_2_power)
|
|
|
|
self.zb.SetMotor3(motor_3_power)
|
|
|
|
self.zb.SetMotor4(motor_4_power)
|
2018-02-25 23:45:31 +01:00
|
|
|
|
2018-02-26 11:31:06 +01:00
|
|
|
self.auto_mode = False
|
|
|
|
|
2018-02-25 23:45:31 +01:00
|
|
|
|
|
|
|
self._drive_thread = threading.Thread(target=_run)
|
|
|
|
self._drive_thread.start()
|
|
|
|
|
|
|
|
return Response(output={'status': 'running', 'direction': direction})
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
self._can_run = False
|
|
|
|
if self._drive_thread and threading.get_ident() != self._drive_thread.ident:
|
|
|
|
self._drive_thread.join()
|
|
|
|
|
|
|
|
self.zb.MotorsOff()
|
|
|
|
self.zb.ResetEpo()
|
2018-02-26 11:31:06 +01:00
|
|
|
|
2018-02-25 23:45:31 +01:00
|
|
|
return Response(output={'status':'stopped'})
|
|
|
|
|
|
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|
|
|
|
|