From a39452124ddb286eee2707e92b8775bd27fc2a61 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 17 May 2021 15:32:43 +0200 Subject: [PATCH] Refactored PCA9685 backend --- platypush/backend/joystick/linux.py | 9 ++- platypush/plugins/pwm/pca9685.py | 106 ++++++++++++++-------------- 2 files changed, 59 insertions(+), 56 deletions(-) diff --git a/platypush/backend/joystick/linux.py b/platypush/backend/joystick/linux.py index f7155bb74..9130e2a60 100644 --- a/platypush/backend/joystick/linux.py +++ b/platypush/backend/joystick/linux.py @@ -146,10 +146,10 @@ class JoystickLinuxBackend(Backend): def run(self): super().run() + self.logger.info(f'Opening {self.device}...') while not self.should_stop(): # Open the joystick device. - self.logger.info(f'Opening {self.device}...') try: jsdev = open(self.device, 'rb') self._init_joystick(jsdev) @@ -165,13 +165,16 @@ class JoystickLinuxBackend(Backend): if evbuf: _, value, evt_type, number = struct.unpack('IhBB', evbuf) + if evt_type & 0x80: # Initial state notification + continue + if evt_type & 0x01: button = self._button_map[number] if button: self._button_states[button] = value evt_class = JoystickButtonPressedEvent if value else JoystickButtonReleasedEvent # noinspection PyTypeChecker - self.bus.post(evt_class(button=button)) + self.bus.post(evt_class(device=self.device, button=button)) if evt_type & 0x02: axis = self._axis_map[number] @@ -179,7 +182,7 @@ class JoystickLinuxBackend(Backend): fvalue = value / 32767.0 self._axis_states[axis] = fvalue # noinspection PyTypeChecker - self.bus.post(JoystickAxisEvent(axis=axis, value=fvalue)) + self.bus.post(JoystickAxisEvent(device=self.device, axis=axis, value=fvalue)) except OSError as e: self.logger.warning(f'Connection to {self.device} lost: {e}') self.bus.post(JoystickDisconnectedEvent(device=self.device)) diff --git a/platypush/plugins/pwm/pca9685.py b/platypush/plugins/pwm/pca9685.py index ce448a344..1d27efee0 100644 --- a/platypush/plugins/pwm/pca9685.py +++ b/platypush/plugins/pwm/pca9685.py @@ -1,5 +1,5 @@ import time -from typing import Optional, Dict +from typing import Optional, Dict, Iterable from platypush.plugins import Plugin, action @@ -30,83 +30,82 @@ class PwmPca9685Plugin(Plugin): This plugin works with a PCA9685 circuit connected to the Platypush host over I2C interface. """ - MIN_PWM_VALUE = 0 - MAX_PWM_VALUE = 0xffff - N_CHANNELS = 16 - - def __init__(self, frequency: float, step_value: float = 0.1, step_duration: float = 0.05, **kwargs): + def __init__(self, frequency: float, min_duty_cycle: int = 0, max_duty_cycle: int = 0xffff, channels: Iterable[int] = tuple(range(16)), **kwargs): """ :param frequency: Default PWM frequency to use for the driver, in Hz. - :param step_value: How much each channel value should be increased/decreased on each step. - :param step_duration: Length of each step transient when writing PWM values (default: 0.05 seconds). + :param min_duty_cycle: Minimum PWM duty cycle (you can often find it in the documentation of your device). + Default: 0. + :param max_duty_cycle: Maximum PWM duty cycle (you can often find it in the documentation of your device). + Default: 0xffff. + :param Indices of the default channels to be controlled (default: all channels, + i.e. ``[0-15]``). """ super().__init__(**kwargs) self.frequency = frequency - self.step_value = step_value - self.step_duration = step_duration + self.min_duty_cycle = min_duty_cycle + self.max_duty_cycle = max_duty_cycle + self.channels = channels self._pca = None - @staticmethod - def _convert_percent_to_duty_cycle(value: float) -> int: - """ - Convert a duty cycle percentage value to a PCA9685 value between 0 and ``0xffff``. - - :param value: Duty cycle value, between 0 and 1. - :return: Duty cycle 16-bit value, between 0 and ``0xffff``. - """ - return int(value * 65535) - - @staticmethod - def _convert_duty_cycle_to_percent(value: int) -> float: - """ - Convert a PCA9685 duty cycle value value to a percentage value. - - :param value: Duty cycle 16-bit value, between 0 and ``0xffff``. - :return: Duty cycle percentage, between 0 and 1. - """ - return value / 65535 - @action - def write(self, channels: Dict[int, float], frequency: Optional[float] = None, step_value: Optional[float] = None, + def write(self, value: Optional[int] = None, channels: Optional[Dict[int, float]] = None, + frequency: Optional[float] = None, step: Optional[int] = None, step_duration: Optional[float] = None): """ - Send PWM values to the specified channels. + Send PWM values to the channels. - :param channels: Map of the values to be written, as ``channel_index -> value``, where value is a real number - between 0.0 (minimum duty cycle) and 1.0 (maximum duty cycle). + :param value: Send the value to all the channels (or to all the configured default channels). + ``value`` and ``channels`` are mutually exclusive. + :param channels: Map of the values to be written, as a ``channel_index -> value``, where value is a real number. + ``value`` and ``channels`` are mutually exclusive. :param frequency: Override default frequency. - :param step_value: Override default step value. - :param step_duration: Override default step duration. + :param step: If set, then the PWM duty cycle will be increased/decreased by + this much per cycle (i.e. 1/frequency). This is useful when dealing with PWM + devices that require smooth transitions, arming sequences or ramping signals. + If None (default) then the new PWM values will be directly written with no + ramping logic. + :param step_duration: If step is configured, this parameter identifies how long + each step should last (default: ``1/frequency``). """ import busio from board import SCL, SDA from adafruit_pca9685 import PCA9685 - values = {k: self._convert_percent_to_duty_cycle(v) for k, v in channels.items()} - step_value = self._convert_percent_to_duty_cycle(step_value if step_value is not None else self.step_value) - step_duration = step_duration if step_duration is not None else self.step_duration + if value is not None: + assert self.channels, 'No default channels configured' + channels = {i: value for i in self.channels} + + assert channels, 'Both value and channels are missing' + i2c_bus = busio.I2C(SCL, SDA) - pca = self._pca or PCA9685(i2c_bus) + pca = self._pca = self._pca or PCA9685(i2c_bus) pca.frequency = frequency or self.frequency - self._pca = pca + step_duration = step_duration or 1/pca.frequency + + if not step: + for i, val in channels.items(): + pca.channels[i].duty_cycle = val + return + done = False + cur_values = { + i: channel.duty_cycle + for i, channel in enumerate(pca.channels) + } while not done: done = True - for channel, value in values.items(): - if abs(value - pca.channels[channel].duty_cycle) < 2: + for i, val in channels.items(): + if val == cur_values[i]: continue done = False - if value > pca.channels[channel].duty_cycle: - pca.channels[channel].duty_cycle = min(pca.channels[channel].duty_cycle + step_value, - value, - self.MAX_PWM_VALUE) - else: - pca.channels[channel].duty_cycle = max(pca.channels[channel].duty_cycle - step_value, - value, - self.MIN_PWM_VALUE) + val = min(cur_values[i] + step, val, self.max_duty_cycle) \ + if val > pca.channels[i].duty_cycle \ + else max(cur_values[i] - step, val, self.min_duty_cycle) + + pca.channels[i].duty_cycle = cur_values[i] = val time.sleep(step_duration) @@ -119,10 +118,10 @@ class PwmPca9685Plugin(Plugin): channel, as a percentage value between 0 and 1. """ if not self._pca: - return {i: 0 for i in range(self.N_CHANNELS)} + return {i: 0 for i in self.channels} return { - i: self._convert_duty_cycle_to_percent(channel.duty_cycle) + i: channel.duty_cycle for i, channel in enumerate(self._pca.channels) } @@ -148,3 +147,4 @@ class PwmPca9685Plugin(Plugin): return self._pca.reset() +