forked from platypush/platypush
Added Arduino over Firmata integration - closes #92
This commit is contained in:
parent
42686f229e
commit
d8a7c9c6e0
8 changed files with 438 additions and 15 deletions
|
@ -233,6 +233,7 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers',
|
||||||
'trello',
|
'trello',
|
||||||
'telegram',
|
'telegram',
|
||||||
'telegram.ext',
|
'telegram.ext',
|
||||||
|
'pyfirmata2'
|
||||||
]
|
]
|
||||||
|
|
||||||
sys.path.insert(0, os.path.abspath('../..'))
|
sys.path.insert(0, os.path.abspath('../..'))
|
||||||
|
|
|
@ -119,15 +119,16 @@ class SensorBackend(Backend):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
tolerance = None
|
tolerance = None
|
||||||
isNaN = False
|
is_nan = False
|
||||||
|
old_v = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
v = float(v)
|
v = float(v)
|
||||||
old_v = float(self.data.get(k))
|
old_v = float(self.data.get(k))
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
isNaN = True
|
is_nan = True
|
||||||
|
|
||||||
if not isNaN:
|
if not is_nan:
|
||||||
if isinstance(self.tolerance, dict):
|
if isinstance(self.tolerance, dict):
|
||||||
tolerance = float(self.tolerance.get(k, self.default_tolerance))
|
tolerance = float(self.tolerance.get(k, self.default_tolerance))
|
||||||
else:
|
else:
|
||||||
|
@ -136,12 +137,19 @@ class SensorBackend(Backend):
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if tolerance is None or isNaN or \
|
if tolerance is None or is_nan or abs(v - old_v) >= tolerance:
|
||||||
abs(v - old_v) >= tolerance:
|
|
||||||
ret[k] = v
|
ret[k] = v
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def on_stop(self):
|
||||||
|
if not self.plugin:
|
||||||
|
return
|
||||||
|
|
||||||
|
plugin = get_plugin(self.plugin)
|
||||||
|
if plugin and hasattr(plugin, 'close'):
|
||||||
|
plugin.close()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
super().run()
|
super().run()
|
||||||
self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__))
|
self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__))
|
||||||
|
|
18
platypush/backend/sensor/arduino.py
Normal file
18
platypush/backend/sensor/arduino.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
from platypush.backend.sensor import SensorBackend
|
||||||
|
|
||||||
|
|
||||||
|
class SensorArduinoBackend(SensorBackend):
|
||||||
|
"""
|
||||||
|
This backend listens for new events from an Arduino with a Firmata-compatible firmware.
|
||||||
|
|
||||||
|
Requires:
|
||||||
|
|
||||||
|
* The :class:`platypush.plugins.arduino.ArduinoPlugin` plugin configured.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__(plugin='arduino', **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
|
@ -51,8 +51,10 @@ class Request(Message):
|
||||||
'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(),
|
'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(),
|
||||||
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()}
|
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()}
|
||||||
|
|
||||||
if 'origin' in msg: args['origin'] = msg['origin']
|
if 'origin' in msg:
|
||||||
if 'token' in msg: args['token'] = msg['token']
|
args['origin'] = msg['origin']
|
||||||
|
if 'token' in msg:
|
||||||
|
args['token'] = msg['token']
|
||||||
return cls(**args)
|
return cls(**args)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -78,11 +80,12 @@ class Request(Message):
|
||||||
def _expand_context(self, event_args=None, **context):
|
def _expand_context(self, event_args=None, **context):
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
|
|
||||||
if event_args is None: event_args = copy.deepcopy(self.args)
|
if event_args is None:
|
||||||
|
event_args = copy.deepcopy(self.args)
|
||||||
|
|
||||||
constants = Config.get_constants()
|
constants = Config.get_constants()
|
||||||
context['constants'] = {}
|
context['constants'] = {}
|
||||||
for (name,value) in constants.items():
|
for (name, value) in constants.items():
|
||||||
context['constants'][name] = value
|
context['constants'][name] = value
|
||||||
|
|
||||||
keys = []
|
keys = []
|
||||||
|
@ -103,6 +106,7 @@ class Request(Message):
|
||||||
|
|
||||||
return event_args
|
return event_args
|
||||||
|
|
||||||
|
# noinspection PyBroadException
|
||||||
@classmethod
|
@classmethod
|
||||||
def expand_value_from_context(cls, _value, **context):
|
def expand_value_from_context(cls, _value, **context):
|
||||||
for (k, v) in context.items():
|
for (k, v) in context.items():
|
||||||
|
@ -142,9 +146,9 @@ class Request(Message):
|
||||||
|
|
||||||
parsed_value += prefix + (
|
parsed_value += prefix + (
|
||||||
json.dumps(context_value)
|
json.dumps(context_value)
|
||||||
if isinstance(context_value, list)
|
if isinstance(context_value, list) or isinstance(context_value, dict)
|
||||||
or isinstance(context_value, dict)
|
else str(context_value)
|
||||||
else str(context_value))
|
)
|
||||||
else:
|
else:
|
||||||
parsed_value += _value
|
parsed_value += _value
|
||||||
_value = ''
|
_value = ''
|
||||||
|
@ -219,6 +223,10 @@ class Request(Message):
|
||||||
elif not response.disable_logging:
|
elif not response.disable_logging:
|
||||||
logger.info('Processed response from action {}: {}'.
|
logger.info('Processed response from action {}: {}'.
|
||||||
format(action, str(response)))
|
format(action, str(response)))
|
||||||
|
except AssertionError as e:
|
||||||
|
plugin.logger.exception(e)
|
||||||
|
logger.warning('Assertion error from action [{}]: {}'.format(action, str(e)))
|
||||||
|
response = Response(output=None, errors=[str(e)])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Retry mechanism
|
# Retry mechanism
|
||||||
plugin.logger.exception(e)
|
plugin.logger.exception(e)
|
||||||
|
@ -230,10 +238,10 @@ class Request(Message):
|
||||||
errors.append(str(e))
|
errors.append(str(e))
|
||||||
|
|
||||||
response = Response(output=None, errors=errors)
|
response = Response(output=None, errors=errors)
|
||||||
if _n_tries-1 > 0:
|
if _n_tries - 1 > 0:
|
||||||
logger.info('Reloading plugin {} and retrying'.format(module_name))
|
logger.info('Reloading plugin {} and retrying'.format(module_name))
|
||||||
get_plugin(module_name, reload=True)
|
get_plugin(module_name, reload=True)
|
||||||
response = _thread_func(_n_tries=_n_tries - 1, errors=errors)
|
response = _thread_func(_n_tries=_n_tries-1, errors=errors)
|
||||||
finally:
|
finally:
|
||||||
self._send_response(response)
|
self._send_response(response)
|
||||||
return response
|
return response
|
||||||
|
@ -266,5 +274,4 @@ class Request(Message):
|
||||||
'_timestamp': self.timestamp,
|
'_timestamp': self.timestamp,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
382
platypush/plugins/arduino.py
Normal file
382
platypush/plugins/arduino.py
Normal file
|
@ -0,0 +1,382 @@
|
||||||
|
import enum
|
||||||
|
import time
|
||||||
|
|
||||||
|
from typing import Optional, Dict, Union, Callable, Tuple, Type
|
||||||
|
|
||||||
|
from pyfirmata2 import Arduino, ArduinoMega, ArduinoDue, ArduinoNano, Pin, util, ANALOG, INPUT, PWM
|
||||||
|
|
||||||
|
from platypush.plugins import action
|
||||||
|
from platypush.plugins.gpio.sensor import GpioSensorPlugin
|
||||||
|
|
||||||
|
|
||||||
|
class PinType(enum.IntEnum):
|
||||||
|
ANALOG = 1
|
||||||
|
DIGITAL = 2
|
||||||
|
|
||||||
|
|
||||||
|
class BoardType(enum.Enum):
|
||||||
|
MEGA = 'mega'
|
||||||
|
DUE = 'due'
|
||||||
|
NANO = 'nano'
|
||||||
|
|
||||||
|
|
||||||
|
# noinspection PyBroadException
|
||||||
|
class ArduinoPlugin(GpioSensorPlugin):
|
||||||
|
"""
|
||||||
|
Interact with an Arduino connected to the host machine over USB using the
|
||||||
|
`Firmata <https://www.arduino.cc/en/reference/firmata>`_ protocol.
|
||||||
|
|
||||||
|
You have two options to communicate with an Arduino-compatible board over USB:
|
||||||
|
|
||||||
|
- Use this plugin if you want to use the general-purpose Firmata protocol - in this case most of your
|
||||||
|
processing logic will be on the host side and you can read/write data to the Arduino transparently.
|
||||||
|
- Use the :class:`platypush.plugins.serial.SerialPlugin` if instead you want to run more custom logic
|
||||||
|
on the Arduino and communicate back with the host computer through JSON formatted messages.
|
||||||
|
|
||||||
|
Download and flash the
|
||||||
|
`Standard Firmata <https://github.com/firmata/arduino/blob/master/examples/StandardFirmata/StandardFirmata.ino>`_
|
||||||
|
firmware to the Arduino in order to use this plugin.
|
||||||
|
|
||||||
|
Requires:
|
||||||
|
|
||||||
|
* **pyfirmata2** (``pip install pyfirmata2``)
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: int = 57600,
|
||||||
|
analog_pins: Optional[Dict[str, int]] = None,
|
||||||
|
digital_pins: Optional[Dict[str, int]] = None,
|
||||||
|
timeout: float = 20.0,
|
||||||
|
conv_functions: Optional[Dict[Union[str, int], Union[str, Callable]]] = None,
|
||||||
|
**kwargs):
|
||||||
|
"""
|
||||||
|
:param board: Default board name or path (e.g. ``COM3`` on Windows or ``/dev/ttyUSB0`` on Unix). If not set
|
||||||
|
then the plugin will attempt an auto-discovery.
|
||||||
|
|
||||||
|
:param board_type: Default board type. It can be 'mega', 'due' or 'nano'. Leave empty for auto-detection.
|
||||||
|
:param baud_rate: Default serial baud rate (default: 57600)
|
||||||
|
:param analog_pins: Optional analog PINs map name->pin_number.
|
||||||
|
:param digital_pins: Optional digital PINs map name->pin_number.
|
||||||
|
:param timeout: Board communication timeout in seconds.
|
||||||
|
:param conv_functions: Optional mapping of conversion functions to apply to the analog values read from a
|
||||||
|
certain PIN. The key can either be the PIN number or the name as specified in ``analog_pins``, the value
|
||||||
|
can be a function that takes an argument and transforms it or its lambda string representation.
|
||||||
|
Note that ``analog_read`` returns by default float values in the range [0.0, 1.0]. Example:
|
||||||
|
|
||||||
|
..code-block:: yaml
|
||||||
|
|
||||||
|
arduino:
|
||||||
|
board: /dev/ttyUSB0
|
||||||
|
analog_pins:
|
||||||
|
temperature: 1 # Analog PIN 1
|
||||||
|
|
||||||
|
conv_functions:
|
||||||
|
temperature: 'lambda t: t * 500.0'
|
||||||
|
|
||||||
|
"""
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
self.board = board
|
||||||
|
self.board_type = self._get_board_type(board_type)
|
||||||
|
self.baud_rate = baud_rate
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
self._pin_number_by_name = {
|
||||||
|
PinType.ANALOG: analog_pins or {},
|
||||||
|
PinType.DIGITAL: digital_pins or {},
|
||||||
|
}
|
||||||
|
|
||||||
|
self._pin_name_by_number = {
|
||||||
|
PinType.ANALOG: {number: name for name, number in self._pin_number_by_name[PinType.ANALOG].items()},
|
||||||
|
PinType.DIGITAL: {number: name for name, number in self._pin_number_by_name[PinType.DIGITAL].items()},
|
||||||
|
}
|
||||||
|
|
||||||
|
self.conv_functions: Dict[Union[str, int], Callable] = {
|
||||||
|
(self._pin_number_by_name[PinType.ANALOG].get(pin, pin)): (f if callable(f) else eval(f))
|
||||||
|
for pin, f in (conv_functions or {}).items()
|
||||||
|
}
|
||||||
|
|
||||||
|
self._boards: Dict[str, Arduino] = {}
|
||||||
|
self._board_iterators: Dict[str, util.Iterator] = {}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_board_type(board_type: Optional[str] = None) -> Type:
|
||||||
|
if not board_type:
|
||||||
|
return Arduino
|
||||||
|
|
||||||
|
board_type = board_type.lower()
|
||||||
|
if board_type == BoardType.DUE.value:
|
||||||
|
return ArduinoDue
|
||||||
|
if board_type == BoardType.NANO.value:
|
||||||
|
return ArduinoNano
|
||||||
|
if board_type == BoardType.MEGA.value:
|
||||||
|
return ArduinoMega
|
||||||
|
|
||||||
|
raise AssertionError('Invalid board_type: {}'.format(board_type))
|
||||||
|
|
||||||
|
def _get_board(self,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[float] = None):
|
||||||
|
board_name = board or self.board or Arduino.AUTODETECT
|
||||||
|
baud_rate = baud_rate or self.baud_rate
|
||||||
|
timeout = timeout or self.timeout
|
||||||
|
|
||||||
|
if board_name in self._boards:
|
||||||
|
return self._boards[board_name]
|
||||||
|
|
||||||
|
board_type = self._get_board_type(board_type) if board_type else self.board_type
|
||||||
|
board = board_type(board_name, baudrate=baud_rate, timeout=timeout)
|
||||||
|
self.logger.info('Connected to board {}'.format(board.name))
|
||||||
|
self._boards[board_name] = board
|
||||||
|
|
||||||
|
self._board_iterators[board.name] = util.Iterator(board)
|
||||||
|
self._board_iterators[board.name].start()
|
||||||
|
return board
|
||||||
|
|
||||||
|
def _get_board_and_pin(self,
|
||||||
|
pin: Union[int, str],
|
||||||
|
pin_type: PinType,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None) -> Tuple[Arduino, int]:
|
||||||
|
board = self._get_board(board, board_type=board_type, baud_rate=baud_rate, timeout=timeout)
|
||||||
|
if pin in self._pin_number_by_name[pin_type]:
|
||||||
|
pin = self._pin_number_by_name[pin_type][pin]
|
||||||
|
|
||||||
|
assert isinstance(pin, int), 'Invalid PIN number/name: {}'.format(pin)
|
||||||
|
return board, pin
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_pin(pin: int, board: Arduino, pin_type: PinType) -> Pin:
|
||||||
|
pins = None
|
||||||
|
if pin_type == PinType.ANALOG:
|
||||||
|
pins = board.analog
|
||||||
|
if pin_type == PinType.DIGITAL:
|
||||||
|
pins = board.digital
|
||||||
|
|
||||||
|
assert pins, 'Invalid pin_type: {}'.format(pin_type)
|
||||||
|
|
||||||
|
if pins[pin].mode in [ANALOG, INPUT]:
|
||||||
|
pins[pin].enable_reporting()
|
||||||
|
return pins[pin]
|
||||||
|
|
||||||
|
def _poll_value(self,
|
||||||
|
pin: int,
|
||||||
|
board: Arduino,
|
||||||
|
pin_type: PinType,
|
||||||
|
timeout: Optional[float] = None) -> Optional[Union[bool, float]]:
|
||||||
|
value = None
|
||||||
|
poll_start = time.time()
|
||||||
|
|
||||||
|
while value is None:
|
||||||
|
if timeout and time.time() - poll_start >= timeout:
|
||||||
|
raise RuntimeError('Read timeout')
|
||||||
|
|
||||||
|
pin_ = self._get_pin(pin=pin, board=board, pin_type=pin_type)
|
||||||
|
if pin_.mode not in [INPUT, ANALOG]:
|
||||||
|
self.logger.warning('PIN {} is not configured in input/analog mode'.format(pin))
|
||||||
|
return
|
||||||
|
|
||||||
|
value = pin_.read()
|
||||||
|
if value is None:
|
||||||
|
time.sleep(0.001)
|
||||||
|
|
||||||
|
if pin_type == PinType.DIGITAL:
|
||||||
|
value = bool(value)
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
@action
|
||||||
|
def analog_read(self,
|
||||||
|
pin: Union[int, str],
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
conv_function: Optional[Union[str, Callable]] = None,
|
||||||
|
timeout: Optional[int] = None) -> float:
|
||||||
|
"""
|
||||||
|
Read an analog value from a PIN.
|
||||||
|
|
||||||
|
:param pin: PIN number or configured name.
|
||||||
|
:param board: Board path or name (default: default configured ``board``).
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
|
:param conv_function: Optional conversion function override to apply to the output. It can be either a function
|
||||||
|
object or its lambda string representation (e.g. ``lambda x: x*x``). Keep in mind that ``analog_read``
|
||||||
|
returns by default float values in the range ``[0.0, 1.0]``.
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
"""
|
||||||
|
conv_function = conv_function or self.conv_functions.get(pin)
|
||||||
|
board, pin = self._get_board_and_pin(pin=pin,
|
||||||
|
pin_type=PinType.ANALOG,
|
||||||
|
board=board,
|
||||||
|
board_type=board_type,
|
||||||
|
baud_rate=baud_rate,
|
||||||
|
timeout=timeout)
|
||||||
|
|
||||||
|
conv_function = conv_function or self.conv_functions.get(pin)
|
||||||
|
value = self._poll_value(pin=pin, board=board, pin_type=PinType.ANALOG, timeout=timeout)
|
||||||
|
|
||||||
|
if conv_function:
|
||||||
|
value = conv_function(value)
|
||||||
|
return value
|
||||||
|
|
||||||
|
@action
|
||||||
|
def digital_read(self,
|
||||||
|
pin: Union[int, str],
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None) -> bool:
|
||||||
|
"""
|
||||||
|
Read a digital value from a PIN.
|
||||||
|
|
||||||
|
:param pin: PIN number or configured name.
|
||||||
|
:param board: Board path or name (default: default configured ``board``).
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
"""
|
||||||
|
board, pin = self._get_board_and_pin(pin=pin,
|
||||||
|
pin_type=PinType.DIGITAL,
|
||||||
|
board=board,
|
||||||
|
board_type=board_type,
|
||||||
|
baud_rate=baud_rate,
|
||||||
|
timeout=timeout)
|
||||||
|
|
||||||
|
return self._poll_value(pin=pin, board=board, pin_type=PinType.DIGITAL, timeout=timeout)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def analog_write(self, pin: Union[int, str],
|
||||||
|
value: float,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None):
|
||||||
|
"""
|
||||||
|
Write a value to an analog PIN.
|
||||||
|
|
||||||
|
:param pin: PIN number or configured name.
|
||||||
|
:param value: Voltage to be sent, a real number normalized between 0 and 1.
|
||||||
|
:param board: Board path or name (default: default configured ``board``).
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
"""
|
||||||
|
board, pin = self._get_board_and_pin(pin=pin,
|
||||||
|
pin_type=PinType.ANALOG,
|
||||||
|
board=board,
|
||||||
|
board_type=board_type,
|
||||||
|
baud_rate=baud_rate,
|
||||||
|
timeout=timeout)
|
||||||
|
board.analog[pin].write(value)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def digital_write(self, pin: Union[int, str],
|
||||||
|
value: bool,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None):
|
||||||
|
"""
|
||||||
|
Write a value to a digital PIN.
|
||||||
|
|
||||||
|
:param pin: PIN number or configured name.
|
||||||
|
:param value: True (HIGH) or False (LOW).
|
||||||
|
:param board: Board path or name (default: default configured ``board``).
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
"""
|
||||||
|
board, pin = self._get_board_and_pin(pin=pin,
|
||||||
|
pin_type=PinType.DIGITAL,
|
||||||
|
board=board,
|
||||||
|
board_type=board_type,
|
||||||
|
baud_rate=baud_rate,
|
||||||
|
timeout=timeout)
|
||||||
|
board.digital[pin].write(value)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def pwm_write(self, pin: Union[int, str],
|
||||||
|
value: float,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None):
|
||||||
|
"""
|
||||||
|
Write a PWM value to a digital PIN.
|
||||||
|
|
||||||
|
:param pin: PIN number or configured name.
|
||||||
|
:param value: PWM real value normalized between 0 and 1.
|
||||||
|
:param board: Board path or name (default: default configured ``board``).
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``).
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
"""
|
||||||
|
board, pin = self._get_board_and_pin(pin=pin,
|
||||||
|
pin_type=PinType.DIGITAL,
|
||||||
|
board=board,
|
||||||
|
board_type=board_type,
|
||||||
|
baud_rate=baud_rate,
|
||||||
|
timeout=timeout)
|
||||||
|
|
||||||
|
assert board.digital[pin].PWM_CAPABLE, 'PIN {} is not PWM capable'.format(pin)
|
||||||
|
if board.digital[pin] != PWM:
|
||||||
|
board.digital[pin].mode = PWM
|
||||||
|
time.sleep(0.001)
|
||||||
|
|
||||||
|
board.digital[pin].write(value)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def get_measurement(self,
|
||||||
|
board: Optional[str] = None,
|
||||||
|
board_type: Optional[str] = None,
|
||||||
|
baud_rate: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None) -> Dict[str, float]:
|
||||||
|
"""
|
||||||
|
Get a measurement from all the configured PINs.
|
||||||
|
|
||||||
|
:param board: Board path or name (default: default configured ``board``)
|
||||||
|
:param board_type: Board type. It can be 'mega', 'due' or 'nano' (default: configured ``board_type``).
|
||||||
|
:param baud_rate: Baud rate (default: default configured ``baud_rate``)
|
||||||
|
:param timeout: Communication timeout in seconds (default: default configured ``timeout``).
|
||||||
|
:return: dict, where the keys are either the configured names of the PINs (see ``analog_pins`` configuration)
|
||||||
|
or all the analog PINs (names will be in the format 'A0..A7' in that case), and the values will be the
|
||||||
|
real values measured, either normalized between 0 and 1 if no conversion functions were provided, or
|
||||||
|
transformed through the configured ``conv_functions``.
|
||||||
|
"""
|
||||||
|
ret = {}
|
||||||
|
board = self._get_board(board=board, board_type=board_type, baud_rate=baud_rate, timeout=timeout)
|
||||||
|
|
||||||
|
for pin in board.analog:
|
||||||
|
if self._pin_name_by_number[PinType.ANALOG] and \
|
||||||
|
pin.pin_number not in self._pin_name_by_number[PinType.ANALOG]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
name = self._pin_name_by_number[PinType.ANALOG].get(pin.pin_number, 'A{}'.format(pin.pin_number))
|
||||||
|
value = self._poll_value(pin=pin.pin_number, board=board, pin_type=PinType.ANALOG,
|
||||||
|
timeout=timeout or self.timeout)
|
||||||
|
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
conv_function = self.conv_functions.get(name, self.conv_functions.get(pin.pin_number))
|
||||||
|
if conv_function:
|
||||||
|
value = conv_function(value)
|
||||||
|
|
||||||
|
ret[name] = value
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
@action
|
||||||
|
def close(self):
|
||||||
|
for board in self._boards.values():
|
||||||
|
board.exit()
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
|
@ -14,6 +14,7 @@ class SwitchTplinkPlugin(SwitchPlugin):
|
||||||
Requires:
|
Requires:
|
||||||
|
|
||||||
* **pyHS100** (``pip install pyHS100``)
|
* **pyHS100** (``pip install pyHS100``)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_ip_to_dev = {}
|
_ip_to_dev = {}
|
||||||
|
|
|
@ -206,3 +206,7 @@ croniter
|
||||||
|
|
||||||
# Support for Telegram integration
|
# Support for Telegram integration
|
||||||
# python-telegram-bot
|
# python-telegram-bot
|
||||||
|
|
||||||
|
# Support for Arduino integration
|
||||||
|
# pyserial
|
||||||
|
# pyfirmata2
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -267,5 +267,7 @@ setup(
|
||||||
'buienradar': ['buienradar'],
|
'buienradar': ['buienradar'],
|
||||||
# Support for Telegram integration
|
# Support for Telegram integration
|
||||||
'telegram': ['python-telegram-bot'],
|
'telegram': ['python-telegram-bot'],
|
||||||
|
# Support for Arduino integration
|
||||||
|
'arduino': ['pyserial', 'pyfirmata2'],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue