Support for Zeroborg events
This commit is contained in:
parent
b36a8095ab
commit
7a7acbe6b9
3 changed files with 175 additions and 140 deletions
19
platypush/message/event/zeroborg.py
Normal file
19
platypush/message/event/zeroborg.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
from typing import Optional, Union
|
||||
|
||||
from platypush.message.event import Event
|
||||
|
||||
|
||||
class ZeroborgEvent(Event):
|
||||
pass
|
||||
|
||||
|
||||
class ZeroborgDriveEvent(ZeroborgEvent):
|
||||
def __init__(self, motors: Union[list, tuple], direction: Optional[str] = None, *args, **kwargs):
|
||||
super().__init__(*args, motors=motors, direction=direction, **kwargs)
|
||||
|
||||
|
||||
class ZeroborgStopEvent(ZeroborgEvent):
|
||||
pass
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -2,6 +2,10 @@ import enum
|
|||
import threading
|
||||
import time
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from platypush.context import get_bus
|
||||
from platypush.message.event.zeroborg import ZeroborgDriveEvent, ZeroborgStopEvent
|
||||
from platypush.plugins import Plugin, action
|
||||
from platypush.context import get_plugin
|
||||
|
||||
|
@ -21,12 +25,13 @@ class GpioZeroborgPlugin(Plugin):
|
|||
ZeroBorg plugin. It allows you to control a ZeroBorg
|
||||
(https://www.piborg.org/motor-control-1135/zeroborg) motor controller and
|
||||
infrared sensor circuitry for Raspberry Pi
|
||||
"""
|
||||
|
||||
_drive_thread = None
|
||||
_can_run = False
|
||||
_direction = None
|
||||
_init_in_progress = threading.Lock()
|
||||
Triggers:
|
||||
|
||||
* :class:`platypush.message.event.zeroborg.ZeroborgDriveEvent` when motors direction changes
|
||||
* :class:`platypush.message.event.zeroborg.ZeroborgStopEvent` upon motors stop
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, directions=None, **kwargs):
|
||||
"""
|
||||
|
@ -96,6 +101,8 @@ class GpioZeroborgPlugin(Plugin):
|
|||
self.directions = directions
|
||||
self.auto_mode = False
|
||||
self._direction = None
|
||||
self._drive_thread: Optional[threading.Thread] = None
|
||||
self._can_run: bool = False
|
||||
|
||||
self.zb = ZeroBorg.ZeroBorg()
|
||||
self.zb.Init()
|
||||
|
@ -155,10 +162,6 @@ class GpioZeroborgPlugin(Plugin):
|
|||
|
||||
"""
|
||||
|
||||
self._can_run = True
|
||||
self._direction = direction.lower()
|
||||
self.logger.info('Received ZeroBorg drive command: {}'.format(direction))
|
||||
|
||||
def _run():
|
||||
try:
|
||||
while self._can_run and self._direction:
|
||||
|
@ -173,7 +176,7 @@ class GpioZeroborgPlugin(Plugin):
|
|||
if self._direction == Direction.DIR_AUTO.value:
|
||||
self.auto_mode = True
|
||||
|
||||
motor_1_power = motor_2_power = motor_3_power = motor_4_power = 0.0
|
||||
motors = [0, 0, 0, 0]
|
||||
|
||||
try:
|
||||
if self.auto_mode:
|
||||
|
@ -181,25 +184,27 @@ class GpioZeroborgPlugin(Plugin):
|
|||
time.sleep(0.1)
|
||||
|
||||
if self._direction in self.directions and self._direction != Direction.DIR_AUTO.value:
|
||||
motor_1_power = self.directions[self._direction]['motor_1_power']
|
||||
motor_2_power = self.directions[self._direction]['motor_2_power']
|
||||
motor_3_power = self.directions[self._direction]['motor_3_power']
|
||||
motor_4_power = self.directions[self._direction]['motor_4_power']
|
||||
motors = self.directions[self._direction]
|
||||
else:
|
||||
self.logger.warning('Invalid direction {}: stopping motors'.format(self._direction))
|
||||
except Exception as e:
|
||||
self.logger.error('Error on _get_direction_from_sensors: {}'.format(str(e)))
|
||||
break
|
||||
|
||||
self.zb.SetMotor1(motor_1_power)
|
||||
self.zb.SetMotor2(motor_2_power)
|
||||
self.zb.SetMotor3(motor_3_power)
|
||||
self.zb.SetMotor4(motor_4_power)
|
||||
for i, power in enumerate(motors):
|
||||
method = getattr(self.zb, 'SetMotor{}'.format(i+1))
|
||||
method(power)
|
||||
finally:
|
||||
self.stop()
|
||||
self._drive_thread = None
|
||||
|
||||
self._drive_thread = threading.Thread(target=_run)
|
||||
self._drive_thread.start()
|
||||
self._can_run = True
|
||||
self._direction = direction.lower()
|
||||
get_bus().post(ZeroborgDriveEvent(direction=self._direction, motors=self.directions[self._direction]))
|
||||
|
||||
if not self._drive_thread:
|
||||
self._drive_thread = threading.Thread(target=_run)
|
||||
self._drive_thread.start()
|
||||
|
||||
return {'status': 'running', 'direction': direction}
|
||||
|
||||
|
@ -217,7 +222,7 @@ class GpioZeroborgPlugin(Plugin):
|
|||
|
||||
self.zb.MotorsOff()
|
||||
self.zb.ResetEpo()
|
||||
|
||||
get_bus().post(ZeroborgStopEvent())
|
||||
return {'status': 'stopped'}
|
||||
|
||||
|
||||
|
|
|
@ -33,64 +33,67 @@ import types
|
|||
import time
|
||||
|
||||
# Constant values
|
||||
I2C_SLAVE = 0x0703
|
||||
PWM_MAX = 255
|
||||
I2C_NORM_LEN = 4
|
||||
I2C_LONG_LEN = 24
|
||||
I2C_SLAVE = 0x0703
|
||||
PWM_MAX = 255
|
||||
I2C_NORM_LEN = 4
|
||||
I2C_LONG_LEN = 24
|
||||
|
||||
I2C_ID_ZEROBORG = 0x40
|
||||
I2C_ID_ZEROBORG = 0x40
|
||||
|
||||
COMMAND_SET_LED = 1 # Set the LED status
|
||||
COMMAND_GET_LED = 2 # Get the LED status
|
||||
COMMAND_SET_A_FWD = 3 # Set motor 1 PWM rate in a forwards direction
|
||||
COMMAND_SET_A_REV = 4 # Set motor 1 PWM rate in a reverse direction
|
||||
COMMAND_GET_A = 5 # Get motor 1 direction and PWM rate
|
||||
COMMAND_SET_B_FWD = 6 # Set motor 2 PWM rate in a forwards direction
|
||||
COMMAND_SET_B_REV = 7 # Set motor 2 PWM rate in a reverse direction
|
||||
COMMAND_GET_B = 8 # Get motor 2 direction and PWM rate
|
||||
COMMAND_SET_C_FWD = 9 # Set motor 3 PWM rate in a forwards direction
|
||||
COMMAND_SET_C_REV = 10 # Set motor 3 PWM rate in a reverse direction
|
||||
COMMAND_GET_C = 11 # Get motor 3 direction and PWM rate
|
||||
COMMAND_SET_D_FWD = 12 # Set motor 4 PWM rate in a forwards direction
|
||||
COMMAND_SET_D_REV = 13 # Set motor 4 PWM rate in a reverse direction
|
||||
COMMAND_GET_D = 14 # Get motor 4 direction and PWM rate
|
||||
COMMAND_ALL_OFF = 15 # Switch everything off
|
||||
COMMAND_SET_ALL_FWD = 16 # Set all motors PWM rate in a forwards direction
|
||||
COMMAND_SET_ALL_REV = 17 # Set all motors PWM rate in a reverse direction
|
||||
COMMAND_SET_FAILSAFE = 18 # Set the failsafe flag, turns the motors off if communication is interrupted
|
||||
COMMAND_GET_FAILSAFE = 19 # Get the failsafe flag
|
||||
COMMAND_RESET_EPO = 20 # Resets the EPO flag, use after EPO has been tripped and switch is now clear
|
||||
COMMAND_GET_EPO = 21 # Get the EPO latched flag
|
||||
COMMAND_SET_EPO_IGNORE = 22 # Set the EPO ignored flag, allows the system to run without an EPO
|
||||
COMMAND_GET_EPO_IGNORE = 23 # Get the EPO ignored flag
|
||||
COMMAND_GET_NEW_IR = 24 # Get the new IR message received flag
|
||||
COMMAND_GET_LAST_IR = 25 # Get the last IR message received (long message, resets new IR flag)
|
||||
COMMAND_SET_LED_IR = 26 # Set the LED for indicating IR messages
|
||||
COMMAND_GET_LED_IR = 27 # Get if the LED is being used to indicate IR messages
|
||||
COMMAND_GET_ANALOG_1 = 28 # Get the analog reading from port #1, pin 2
|
||||
COMMAND_GET_ANALOG_2 = 29 # Get the analog reading from port #2, pin 4
|
||||
COMMAND_GET_ID = 0x99 # Get the board identifier
|
||||
COMMAND_SET_I2C_ADD = 0xAA # Set a new I2C address
|
||||
COMMAND_SET_LED = 1 # Set the LED status
|
||||
COMMAND_GET_LED = 2 # Get the LED status
|
||||
COMMAND_SET_A_FWD = 3 # Set motor 1 PWM rate in a forwards direction
|
||||
COMMAND_SET_A_REV = 4 # Set motor 1 PWM rate in a reverse direction
|
||||
COMMAND_GET_A = 5 # Get motor 1 direction and PWM rate
|
||||
COMMAND_SET_B_FWD = 6 # Set motor 2 PWM rate in a forwards direction
|
||||
COMMAND_SET_B_REV = 7 # Set motor 2 PWM rate in a reverse direction
|
||||
COMMAND_GET_B = 8 # Get motor 2 direction and PWM rate
|
||||
COMMAND_SET_C_FWD = 9 # Set motor 3 PWM rate in a forwards direction
|
||||
COMMAND_SET_C_REV = 10 # Set motor 3 PWM rate in a reverse direction
|
||||
COMMAND_GET_C = 11 # Get motor 3 direction and PWM rate
|
||||
COMMAND_SET_D_FWD = 12 # Set motor 4 PWM rate in a forwards direction
|
||||
COMMAND_SET_D_REV = 13 # Set motor 4 PWM rate in a reverse direction
|
||||
COMMAND_GET_D = 14 # Get motor 4 direction and PWM rate
|
||||
COMMAND_ALL_OFF = 15 # Switch everything off
|
||||
COMMAND_SET_ALL_FWD = 16 # Set all motors PWM rate in a forwards direction
|
||||
COMMAND_SET_ALL_REV = 17 # Set all motors PWM rate in a reverse direction
|
||||
COMMAND_SET_FAILSAFE = 18 # Set the failsafe flag, turns the motors off if communication is interrupted
|
||||
COMMAND_GET_FAILSAFE = 19 # Get the failsafe flag
|
||||
COMMAND_RESET_EPO = 20 # Resets the EPO flag, use after EPO has been tripped and switch is now clear
|
||||
COMMAND_GET_EPO = 21 # Get the EPO latched flag
|
||||
COMMAND_SET_EPO_IGNORE = 22 # Set the EPO ignored flag, allows the system to run without an EPO
|
||||
COMMAND_GET_EPO_IGNORE = 23 # Get the EPO ignored flag
|
||||
COMMAND_GET_NEW_IR = 24 # Get the new IR message received flag
|
||||
COMMAND_GET_LAST_IR = 25 # Get the last IR message received (long message, resets new IR flag)
|
||||
COMMAND_SET_LED_IR = 26 # Set the LED for indicating IR messages
|
||||
COMMAND_GET_LED_IR = 27 # Get if the LED is being used to indicate IR messages
|
||||
COMMAND_GET_ANALOG_1 = 28 # Get the analog reading from port #1, pin 2
|
||||
COMMAND_GET_ANALOG_2 = 29 # Get the analog reading from port #2, pin 4
|
||||
COMMAND_GET_ID = 0x99 # Get the board identifier
|
||||
COMMAND_SET_I2C_ADD = 0xAA # Set a new I2C address
|
||||
|
||||
COMMAND_VALUE_FWD = 1 # I2C value representing forward
|
||||
COMMAND_VALUE_REV = 2 # I2C value representing reverse
|
||||
COMMAND_VALUE_FWD = 1 # I2C value representing forward
|
||||
COMMAND_VALUE_REV = 2 # I2C value representing reverse
|
||||
|
||||
COMMAND_VALUE_ON = 1 # I2C value representing on
|
||||
COMMAND_VALUE_OFF = 0 # I2C value representing off
|
||||
COMMAND_VALUE_ON = 1 # I2C value representing on
|
||||
COMMAND_VALUE_OFF = 0 # I2C value representing off
|
||||
|
||||
COMMAND_ANALOG_MAX = 0x3FF # Maximum value for analog readings
|
||||
COMMAND_ANALOG_MAX = 0x3FF # Maximum value for analog readings
|
||||
|
||||
IR_MAX_BYTES = I2C_LONG_LEN - 2
|
||||
IR_MAX_BYTES = I2C_LONG_LEN - 2
|
||||
|
||||
def ScanForZeroBorg(busNumber = 1):
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
def ScanForZeroBorg(busNumber=1):
|
||||
"""
|
||||
ScanForZeroBorg([busNumber])
|
||||
|
||||
Scans the I?C bus for a ZeroBorg boards and returns a list of all usable addresses
|
||||
The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Rev 2 boards, if not supplied the default is 1
|
||||
The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Rev 2 boards, if not supplied
|
||||
the default is 1
|
||||
"""
|
||||
found = []
|
||||
print('Scanning I?C bus #%d' % (busNumber))
|
||||
print('Scanning I?C bus #%d' % busNumber)
|
||||
bus = ZeroBorg()
|
||||
for address in range(0x03, 0x78, 1):
|
||||
try:
|
||||
|
@ -98,7 +101,7 @@ The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Re
|
|||
i2cRecv = bus.RawRead(COMMAND_GET_ID, I2C_NORM_LEN)
|
||||
if len(i2cRecv) == I2C_NORM_LEN:
|
||||
if i2cRecv[1] == I2C_ID_ZEROBORG:
|
||||
print('Found ZeroBorg at %02X' % (address))
|
||||
print('Found ZeroBorg at %02X' % address)
|
||||
found.append(address)
|
||||
else:
|
||||
pass
|
||||
|
@ -106,10 +109,11 @@ The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Re
|
|||
pass
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
pass
|
||||
except Exception as e:
|
||||
print('Error on ZeroBorg scan: {}: {}'.format(type(e), str(e)))
|
||||
|
||||
if len(found) == 0:
|
||||
print('No ZeroBorg boards found, is bus #%d correct (should be 0 for Rev 1, 1 for Rev 2)' % (busNumber))
|
||||
print('No ZeroBorg boards found, is bus #%d correct (should be 0 for Rev 1, 1 for Rev 2)' % busNumber)
|
||||
elif len(found) == 1:
|
||||
print('1 ZeroBorg board found')
|
||||
else:
|
||||
|
@ -117,13 +121,15 @@ The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Re
|
|||
return found
|
||||
|
||||
|
||||
def SetNewAddress(newAddress, oldAddress = -1, busNumber = 1):
|
||||
# noinspection PyPep8Naming
|
||||
def SetNewAddress(newAddress, oldAddress=-1, busNumber=1):
|
||||
"""
|
||||
SetNewAddress(newAddress, [oldAddress], [busNumber])
|
||||
|
||||
Scans the I?C bus for the first ZeroBorg and sets it to a new I2C address
|
||||
If oldAddress is supplied it will change the address of the board at that address rather than scanning the bus
|
||||
The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Rev 2 boards, if not supplied the default is 1
|
||||
The busNumber if supplied is which I?C bus to scan, 0 for Rev 1 boards, 1 for Rev 2 boards, if not supplied
|
||||
the default is 1.
|
||||
Warning, this new I?C address will still be used after resetting the power on the device
|
||||
"""
|
||||
if newAddress < 0x03:
|
||||
|
@ -142,52 +148,57 @@ Warning, this new I?C address will still be used after resetting the power on th
|
|||
print('Changing I2C address from %02X to %02X (bus #%d)' % (oldAddress, newAddress, busNumber))
|
||||
bus = ZeroBorg()
|
||||
bus.InitBusOnly(busNumber, oldAddress)
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = bus.RawRead(COMMAND_GET_ID, I2C_NORM_LEN)
|
||||
if len(i2cRecv) == I2C_NORM_LEN:
|
||||
if i2cRecv[1] == I2C_ID_ZEROBORG:
|
||||
foundChip = True
|
||||
print('Found ZeroBorg at %02X' % (oldAddress))
|
||||
print('Found ZeroBorg at %02X' % oldAddress)
|
||||
else:
|
||||
foundChip = False
|
||||
print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (oldAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (
|
||||
oldAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
else:
|
||||
foundChip = False
|
||||
print('Missing ZeroBorg at %02X' % (oldAddress))
|
||||
print('Missing ZeroBorg at %02X' % oldAddress)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
foundChip = False
|
||||
print('Missing ZeroBorg at %02X' % (oldAddress))
|
||||
print('Missing ZeroBorg at %02X' % oldAddress)
|
||||
if foundChip:
|
||||
bus.RawWrite(COMMAND_SET_I2C_ADD, [newAddress])
|
||||
time.sleep(0.1)
|
||||
print('Address changed to %02X, attempting to talk with the new address' % (newAddress))
|
||||
print('Address changed to %02X, attempting to talk with the new address' % newAddress)
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
bus.InitBusOnly(busNumber, newAddress)
|
||||
i2cRecv = bus.RawRead(COMMAND_GET_ID, I2C_NORM_LEN)
|
||||
if len(i2cRecv) == I2C_NORM_LEN:
|
||||
if i2cRecv[1] == I2C_ID_ZEROBORG:
|
||||
foundChip = True
|
||||
print('Found ZeroBorg at %02X' % (newAddress))
|
||||
print('Found ZeroBorg at %02X' % newAddress)
|
||||
else:
|
||||
foundChip = False
|
||||
print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (newAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (
|
||||
newAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
else:
|
||||
foundChip = False
|
||||
print('Missing ZeroBorg at %02X' % (newAddress))
|
||||
print('Missing ZeroBorg at %02X' % newAddress)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
foundChip = False
|
||||
print('Missing ZeroBorg at %02X' % (newAddress))
|
||||
print('Missing ZeroBorg at %02X' % newAddress)
|
||||
if foundChip:
|
||||
print('New I?C address of %02X set successfully' % (newAddress))
|
||||
print('New I?C address of %02X set successfully' % newAddress)
|
||||
else:
|
||||
print('Failed to set new I?C address...')
|
||||
|
||||
|
||||
# Class used to control ZeroBorg
|
||||
# noinspection PyPep8Naming
|
||||
class ZeroBorg:
|
||||
"""
|
||||
This module is designed to communicate with the ZeroBorg
|
||||
|
@ -200,13 +211,12 @@ printFunction Function reference to call when printing text, if None "
|
|||
"""
|
||||
|
||||
# Shared values used by this class
|
||||
busNumber = 1 # Check here for Rev 1 vs Rev 2 and select the correct bus
|
||||
i2cAddress = I2C_ID_ZEROBORG # I?C address, override for a different address
|
||||
foundChip = False
|
||||
printFunction = None
|
||||
i2cWrite = None
|
||||
i2cRead = None
|
||||
|
||||
busNumber = 1 # Check here for Rev 1 vs Rev 2 and select the correct bus
|
||||
i2cAddress = I2C_ID_ZEROBORG # I?C address, override for a different address
|
||||
foundChip = False
|
||||
printFunction = None
|
||||
i2cWrite = None
|
||||
i2cRead = None
|
||||
|
||||
def RawWrite(self, command, data):
|
||||
"""
|
||||
|
@ -218,11 +228,11 @@ Command codes can be found at the top of ZeroBorg.py, data is a list of 0 or mor
|
|||
Under most circumstances you should use the appropriate function instead of RawWrite
|
||||
"""
|
||||
rawOutput = bytes([command])
|
||||
if data: rawOutput += bytes(data)
|
||||
if data:
|
||||
rawOutput += bytes(data)
|
||||
self.i2cWrite.write(rawOutput)
|
||||
|
||||
|
||||
def RawRead(self, command, length, retryCount = 3):
|
||||
def RawRead(self, command, length, retryCount=3):
|
||||
"""
|
||||
RawRead(command, length, [retryCount])
|
||||
|
||||
|
@ -234,6 +244,8 @@ If it does not it will retry the request until retryCount is exhausted (default
|
|||
|
||||
Under most circumstances you should use the appropriate function instead of RawRead
|
||||
"""
|
||||
reply = []
|
||||
|
||||
while retryCount > 0:
|
||||
self.RawWrite(command, [])
|
||||
rawReply = self.i2cRead.read(length)
|
||||
|
@ -247,8 +259,7 @@ Under most circumstances you should use the appropriate function instead of RawR
|
|||
if retryCount > 0:
|
||||
return reply
|
||||
else:
|
||||
raise IOError('I2C read for command %d failed' % (command))
|
||||
|
||||
raise IOError('I2C read for command %d failed' % command)
|
||||
|
||||
def InitBusOnly(self, busNumber, address):
|
||||
"""
|
||||
|
@ -259,24 +270,22 @@ This call does not check the board is present or working, under most circumstanc
|
|||
"""
|
||||
self.busNumber = busNumber
|
||||
self.i2cAddress = address
|
||||
self.i2cRead = io.open("/dev/i2c-" + str(self.busNumber), "rb", buffering = 0)
|
||||
self.i2cRead = io.open("/dev/i2c-" + str(self.busNumber), "rb", buffering=0)
|
||||
fcntl.ioctl(self.i2cRead, I2C_SLAVE, self.i2cAddress)
|
||||
self.i2cWrite = io.open("/dev/i2c-" + str(self.busNumber), "wb", buffering = 0)
|
||||
self.i2cWrite = io.open("/dev/i2c-" + str(self.busNumber), "wb", buffering=0)
|
||||
fcntl.ioctl(self.i2cWrite, I2C_SLAVE, self.i2cAddress)
|
||||
|
||||
|
||||
def Print(self, message):
|
||||
"""
|
||||
Print(message)
|
||||
|
||||
Wrapper used by the ZeroBorg instance to print messages, will call printFunction if set, print otherwise
|
||||
"""
|
||||
if self.printFunction == None:
|
||||
if self.printFunction is None:
|
||||
print(message)
|
||||
else:
|
||||
self.printFunction(message)
|
||||
|
||||
|
||||
def NoPrint(self, message):
|
||||
"""
|
||||
NoPrint(message)
|
||||
|
@ -287,42 +296,45 @@ ZB.printFunction = ZB.NoPrint
|
|||
"""
|
||||
pass
|
||||
|
||||
|
||||
def Init(self, tryOtherBus = False):
|
||||
def Init(self, tryOtherBus=False):
|
||||
"""
|
||||
Init([tryOtherBus])
|
||||
|
||||
Prepare the I2C driver for talking to the ZeroBorg
|
||||
|
||||
If tryOtherBus is True, this function will attempt to use the other bus if the ThunderBorg devices can not be found on the current busNumber
|
||||
If tryOtherBus is True, this function will attempt to use the other bus if the ThunderBorg devices can not be found on
|
||||
the current busNumber.
|
||||
|
||||
This is only really useful for early Raspberry Pi models!
|
||||
"""
|
||||
self.Print('Loading ZeroBorg on bus %d, address %02X' % (self.busNumber, self.i2cAddress))
|
||||
|
||||
# Open the bus
|
||||
self.i2cRead = io.open("/dev/i2c-" + str(self.busNumber), "rb", buffering = 0)
|
||||
self.i2cRead = io.open("/dev/i2c-" + str(self.busNumber), "rb", buffering=0)
|
||||
fcntl.ioctl(self.i2cRead, I2C_SLAVE, self.i2cAddress)
|
||||
self.i2cWrite = io.open("/dev/i2c-" + str(self.busNumber), "wb", buffering = 0)
|
||||
self.i2cWrite = io.open("/dev/i2c-" + str(self.busNumber), "wb", buffering=0)
|
||||
fcntl.ioctl(self.i2cWrite, I2C_SLAVE, self.i2cAddress)
|
||||
|
||||
# Check for ZeroBorg
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_ID, I2C_NORM_LEN)
|
||||
if len(i2cRecv) == I2C_NORM_LEN:
|
||||
if i2cRecv[1] == I2C_ID_ZEROBORG:
|
||||
self.foundChip = True
|
||||
self.Print('Found ZeroBorg at %02X' % (self.i2cAddress))
|
||||
self.Print('Found ZeroBorg at %02X' % self.i2cAddress)
|
||||
else:
|
||||
self.foundChip = False
|
||||
self.Print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (self.i2cAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
self.Print('Found a device at %02X, but it is not a ZeroBorg (ID %02X instead of %02X)' % (
|
||||
self.i2cAddress, i2cRecv[1], I2C_ID_ZEROBORG))
|
||||
else:
|
||||
self.foundChip = False
|
||||
self.Print('Missing ZeroBorg at %02X' % (self.i2cAddress))
|
||||
self.Print('Missing ZeroBorg at %02X' % self.i2cAddress)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
self.foundChip = False
|
||||
self.Print('Missing ZeroBorg at %02X' % (self.i2cAddress))
|
||||
self.Print('Missing ZeroBorg at %02X' % self.i2cAddress)
|
||||
|
||||
# See if we are missing chips
|
||||
if not self.foundChip:
|
||||
|
@ -332,15 +344,16 @@ If tryOtherBus is True, this function will attempt to use the other bus if the T
|
|||
self.busNumber = 0
|
||||
else:
|
||||
self.busNumber = 1
|
||||
self.Print('Trying bus %d instead' % (self.busNumber))
|
||||
self.Print('Trying bus %d instead' % self.busNumber)
|
||||
self.Init(False)
|
||||
else:
|
||||
self.Print('Are you sure your ZeroBorg is properly attached, the correct address is used, and the I2C drivers are running?')
|
||||
self.Print(
|
||||
'Are you sure your ZeroBorg is properly attached, the correct address is used, and the I2C ' +
|
||||
'drivers are running?')
|
||||
self.bus = None
|
||||
else:
|
||||
self.Print('ZeroBorg loaded on bus %d' % (self.busNumber))
|
||||
|
||||
|
||||
def SetMotor1(self, power):
|
||||
"""
|
||||
SetMotor1(power)
|
||||
|
@ -365,6 +378,7 @@ SetMotor1(1) -> motor 1 moving forward at 100% power
|
|||
if pwm > PWM_MAX:
|
||||
pwm = PWM_MAX
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(command, [pwm])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -372,7 +386,6 @@ SetMotor1(1) -> motor 1 moving forward at 100% power
|
|||
except:
|
||||
self.Print('Failed sending motor 1 drive level!')
|
||||
|
||||
|
||||
def GetMotor1(self):
|
||||
"""
|
||||
power = GetMotor1()
|
||||
|
@ -384,6 +397,7 @@ e.g.
|
|||
-0.5 -> motor 1 moving reverse at 50% power
|
||||
1 -> motor 1 moving forward at 100% power
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_A, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -401,7 +415,6 @@ e.g.
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def SetMotor2(self, power):
|
||||
"""
|
||||
SetMotor1(power)
|
||||
|
@ -426,6 +439,7 @@ SetMotor2(1) -> motor 2 moving forward at 100% power
|
|||
if pwm > PWM_MAX:
|
||||
pwm = PWM_MAX
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(command, [pwm])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -433,7 +447,6 @@ SetMotor2(1) -> motor 2 moving forward at 100% power
|
|||
except:
|
||||
self.Print('Failed sending motor 2 drive level!')
|
||||
|
||||
|
||||
def GetMotor2(self):
|
||||
"""
|
||||
power = GetMotor2()
|
||||
|
@ -445,6 +458,7 @@ e.g.
|
|||
-0.5 -> motor 2 moving reverse at 50% power
|
||||
1 -> motor 2 moving forward at 100% power
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_B, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -462,7 +476,6 @@ e.g.
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def SetMotor3(self, power):
|
||||
"""
|
||||
SetMotor3(power)
|
||||
|
@ -487,6 +500,7 @@ SetMotor3(1) -> motor 3 moving forward at 100% power
|
|||
if pwm > PWM_MAX:
|
||||
pwm = PWM_MAX
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(command, [pwm])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -494,7 +508,6 @@ SetMotor3(1) -> motor 3 moving forward at 100% power
|
|||
except:
|
||||
self.Print('Failed sending motor 3 drive level!')
|
||||
|
||||
|
||||
def GetMotor3(self):
|
||||
"""
|
||||
power = GetMotor3()
|
||||
|
@ -506,6 +519,7 @@ e.g.
|
|||
-0.5 -> motor 3 moving reverse at 50% power
|
||||
1 -> motor 3 moving forward at 100% power
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_C, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -523,7 +537,6 @@ e.g.
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def SetMotor4(self, power):
|
||||
"""
|
||||
SetMotor4(power)
|
||||
|
@ -548,6 +561,7 @@ SetMotor4(1) -> motor 4 moving forward at 100% power
|
|||
if pwm > PWM_MAX:
|
||||
pwm = PWM_MAX
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(command, [pwm])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -555,7 +569,6 @@ SetMotor4(1) -> motor 4 moving forward at 100% power
|
|||
except:
|
||||
self.Print('Failed sending motor 4 drive level!')
|
||||
|
||||
|
||||
def GetMotor4(self):
|
||||
"""
|
||||
power = GetMotor4()
|
||||
|
@ -567,6 +580,7 @@ e.g.
|
|||
-0.5 -> motor 4 moving reverse at 50% power
|
||||
1 -> motor 4 moving forward at 100% power
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_D, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -584,7 +598,6 @@ e.g.
|
|||
else:
|
||||
return
|
||||
|
||||
|
||||
def SetMotors(self, power):
|
||||
"""
|
||||
SetMotors(power)
|
||||
|
@ -609,6 +622,7 @@ SetMotors(1) -> all motors are moving forward at 100% power
|
|||
if pwm > PWM_MAX:
|
||||
pwm = PWM_MAX
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(command, [pwm])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -616,13 +630,13 @@ SetMotors(1) -> all motors are moving forward at 100% power
|
|||
except:
|
||||
self.Print('Failed sending all motors drive level!')
|
||||
|
||||
|
||||
def MotorsOff(self):
|
||||
"""
|
||||
MotorsOff()
|
||||
|
||||
Sets all motors to stopped, useful when ending a program
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(COMMAND_ALL_OFF, [0])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -630,7 +644,6 @@ Sets all motors to stopped, useful when ending a program
|
|||
except:
|
||||
self.Print('Failed sending motors off command!')
|
||||
|
||||
|
||||
def SetLed(self, state):
|
||||
"""
|
||||
SetLed(state)
|
||||
|
@ -650,13 +663,13 @@ Sets the current state of the LED, False for off, True for on
|
|||
self.Print('Failed sending LED state!')
|
||||
self.Print(e)
|
||||
|
||||
|
||||
def GetLed(self):
|
||||
"""
|
||||
state = GetLed()
|
||||
|
||||
Reads the current state of the LED, False for off, True for on
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_LED, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -670,13 +683,13 @@ Reads the current state of the LED, False for off, True for on
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def ResetEpo(self):
|
||||
"""
|
||||
ResetEpo()
|
||||
|
||||
Resets the EPO latch state, use to allow movement again after the EPO has been tripped
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(COMMAND_RESET_EPO, [0])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -684,7 +697,6 @@ Resets the EPO latch state, use to allow movement again after the EPO has been t
|
|||
except:
|
||||
self.Print('Failed resetting EPO!')
|
||||
|
||||
|
||||
def GetEpo(self):
|
||||
"""
|
||||
state = GetEpo()
|
||||
|
@ -694,6 +706,7 @@ If False the EPO has not been tripped, and movement is allowed.
|
|||
If True the EPO has been tripped, movement is disabled if the EPO is not ignored (see SetEpoIgnore)
|
||||
Movement can be re-enabled by calling ResetEpo.
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_EPO, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -707,7 +720,6 @@ If True the EPO has been tripped, movement is disabled if the EPO is not ignored
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def SetEpoIgnore(self, state):
|
||||
"""
|
||||
SetEpoIgnore(state)
|
||||
|
@ -719,6 +731,7 @@ Sets the system to ignore or use the EPO latch, set to False if you have an EPO
|
|||
else:
|
||||
level = COMMAND_VALUE_OFF
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(COMMAND_SET_EPO_IGNORE, [level])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -726,13 +739,13 @@ Sets the system to ignore or use the EPO latch, set to False if you have an EPO
|
|||
except:
|
||||
self.Print('Failed sending EPO ignore state!')
|
||||
|
||||
|
||||
def GetEpoIgnore(self):
|
||||
"""
|
||||
state = GetEpoIgnore()
|
||||
|
||||
Reads the system EPO ignore state, False for using the EPO latch, True for ignoring the EPO latch
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_EPO_IGNORE, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -746,7 +759,6 @@ Reads the system EPO ignore state, False for using the EPO latch, True for ignor
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def HasNewIrMessage(self):
|
||||
"""
|
||||
state = HasNewIrMessage()
|
||||
|
@ -768,7 +780,6 @@ If True there has been a new IR message which can be read using GetIrMessage().
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def GetIrMessage(self):
|
||||
"""
|
||||
message = GetIrMessage()
|
||||
|
@ -777,6 +788,7 @@ Reads the last IR message which has been received and clears the new IR message
|
|||
Returns the bytes from the remote control as a hexadecimal string, e.g. 'F75AD5AA8000'
|
||||
Use HasNewIrMessage() to see if there has been a new IR message since the last call.
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_LAST_IR, I2C_LONG_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -787,10 +799,9 @@ Use HasNewIrMessage() to see if there has been a new IR message since the last c
|
|||
|
||||
message = ''
|
||||
for i in range(IR_MAX_BYTES):
|
||||
message += '%02X' % (i2cRecv[1+i])
|
||||
message += '%02X' % (i2cRecv[1 + i])
|
||||
return message.rstrip('0')
|
||||
|
||||
|
||||
def SetLedIr(self, state):
|
||||
"""
|
||||
SetLedIr(state)
|
||||
|
@ -802,6 +813,7 @@ Sets if IR messages control the state of the LED, False for no effect, True for
|
|||
else:
|
||||
level = COMMAND_VALUE_OFF
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(COMMAND_SET_LED_IR, [level])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -809,13 +821,13 @@ Sets if IR messages control the state of the LED, False for no effect, True for
|
|||
except:
|
||||
self.Print('Failed sending LED state!')
|
||||
|
||||
|
||||
def GetLedIr(self):
|
||||
"""
|
||||
state = GetLedIr()
|
||||
|
||||
Reads if IR messages control the state of the LED, False for no effect, True for incoming messages blink the LED
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_LED_IR, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -829,7 +841,6 @@ Reads if IR messages control the state of the LED, False for no effect, True for
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def GetAnalog1(self):
|
||||
"""
|
||||
voltage = GetAnalog1()
|
||||
|
@ -837,6 +848,7 @@ voltage = GetAnalog1()
|
|||
Reads the current analog level from port #1 (pin 2).
|
||||
Returns the value as a voltage based on the 3.3 V reference pin (pin 1).
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_ANALOG_1, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -849,7 +861,6 @@ Returns the value as a voltage based on the 3.3 V reference pin (pin 1).
|
|||
level = float(raw) / float(COMMAND_ANALOG_MAX)
|
||||
return level * 3.3
|
||||
|
||||
|
||||
def GetAnalog2(self):
|
||||
"""
|
||||
voltage = GetAnalog2()
|
||||
|
@ -857,6 +868,7 @@ voltage = GetAnalog2()
|
|||
Reads the current analog level from port #2 (pin 4).
|
||||
Returns the value as a voltage based on the 3.3 V reference pin (pin 1).
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_ANALOG_2, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -869,7 +881,6 @@ Returns the value as a voltage based on the 3.3 V reference pin (pin 1).
|
|||
level = float(raw) / float(COMMAND_ANALOG_MAX)
|
||||
return level * 3.3
|
||||
|
||||
|
||||
def SetCommsFailsafe(self, state):
|
||||
"""
|
||||
SetCommsFailsafe(state)
|
||||
|
@ -884,6 +895,7 @@ The failsafe is disabled at power on
|
|||
else:
|
||||
level = COMMAND_VALUE_OFF
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.RawWrite(COMMAND_SET_FAILSAFE, [level])
|
||||
except KeyboardInterrupt:
|
||||
|
@ -891,7 +903,6 @@ The failsafe is disabled at power on
|
|||
except:
|
||||
self.Print('Failed sending communications failsafe state!')
|
||||
|
||||
|
||||
def GetCommsFailsafe(self):
|
||||
"""
|
||||
state = GetCommsFailsafe()
|
||||
|
@ -899,6 +910,7 @@ state = GetCommsFailsafe()
|
|||
Read the current system state of the communications failsafe, True for enabled, False for disabled
|
||||
The failsafe will turn the motors off unless it is commanded at least once every 1/4 of a second
|
||||
"""
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
i2cRecv = self.RawRead(COMMAND_GET_FAILSAFE, I2C_NORM_LEN)
|
||||
except KeyboardInterrupt:
|
||||
|
@ -912,18 +924,17 @@ The failsafe will turn the motors off unless it is commanded at least once every
|
|||
else:
|
||||
return True
|
||||
|
||||
|
||||
def Help(self):
|
||||
"""
|
||||
Help()
|
||||
|
||||
Displays the names and descriptions of the various functions and settings provided
|
||||
"""
|
||||
funcList = [ZeroBorg.__dict__.get(a) for a in dir(ZeroBorg) if isinstance(ZeroBorg.__dict__.get(a), types.FunctionType)]
|
||||
funcListSorted = sorted(funcList, key = lambda x: x.func_code.co_firstlineno)
|
||||
funcList = [ZeroBorg.__dict__.get(a) for a in dir(ZeroBorg) if
|
||||
isinstance(ZeroBorg.__dict__.get(a), types.FunctionType)]
|
||||
funcListSorted = sorted(funcList, key=lambda x: x.func_code.co_firstlineno)
|
||||
|
||||
print(self.__doc__)
|
||||
print
|
||||
print()
|
||||
for func in funcListSorted:
|
||||
print('=== %s === %s' % (func.func_name, func.func_doc))
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue