[joystick] Rewritten `joystick` integration as a plugin.
continuous-integration/drone/push Build is passing Details

And removed legacy `joystick*` backends and `inputs` plugin.

Closes: #290
This commit is contained in:
Fabio Manganiello 2023-11-16 21:42:57 +01:00
parent 5e629990e6
commit 7bb08bca07
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
15 changed files with 4230 additions and 812 deletions

View File

@ -1,42 +0,0 @@
import time
from platypush.backend import Backend
from platypush.message.event.joystick import JoystickEvent
class JoystickBackend(Backend):
"""
This backend will listen for events from a joystick device and post a
JoystickEvent whenever a new event is captured.
"""
def __init__(self, device, *args, **kwargs):
"""
:param device: Path to the joystick device (e.g. `/dev/input/js0`)
:type device_name: str
"""
super().__init__(*args, **kwargs)
self.device = device
def run(self):
import inputs
super().run()
self.logger.info(
'Initialized joystick backend on device {}'.format(self.device)
)
while not self.should_stop():
try:
events = inputs.get_gamepad()
for event in events:
if event.ev_type == 'Key' or event.ev_type == 'Absolute':
self.bus.post(JoystickEvent(code=event.code, state=event.state))
except Exception as e:
self.logger.exception(e)
time.sleep(1)
# vim:sw=4:ts=4:et:

View File

@ -1,283 +0,0 @@
import json
import os
import re
import subprocess
import time
from typing import Optional, List
from platypush.backend import Backend
from platypush.message.event.joystick import (
JoystickConnectedEvent,
JoystickDisconnectedEvent,
JoystickStateEvent,
JoystickButtonPressedEvent,
JoystickButtonReleasedEvent,
JoystickAxisEvent,
)
class JoystickState:
def __init__(self, axes: List[int], buttons: List[bool]):
self.axes = axes
self.buttons = buttons
def __str__(self):
return json.dumps(self.__dict__)
def __eq__(self, obj):
return obj.axes == self.axes and obj.buttons == self.buttons
def __sub__(self, obj) -> dict:
if len(obj.axes) < len(self.axes) or len(obj.buttons) < len(self.buttons):
return {}
diff = {
'axes': {
axis: obj.axes[axis]
for axis in range(len(self.axes))
if self.axes[axis] != obj.axes[axis]
},
'buttons': {
button: obj.buttons[button]
for button in range(len(self.buttons))
if self.buttons[button] != obj.buttons[button]
},
}
return {k: v for k, v in diff.items() if v}
class JoystickJstestBackend(Backend):
"""
This backend can be used to intercept events from a joystick device if the device does not work with the standard
:class:`platypush.backend.joystick.JoystickBackend` backend (this may especially happen with some Bluetooth
joysticks that don't support the ``ioctl`` requests used by ``inputs``).
This backend only works on Linux, and it requires the ``joystick`` package to be installed.
**NOTE**: This backend can be quite slow, since it has to run another program (``jstest``) and parse its output.
Consider it as a last resort if your joystick works with neither :class:`platypush.backend.joystick.JoystickBackend`
nor :class:`platypush.backend.joystick.JoystickLinuxBackend`.
To test if your joystick is compatible, connect it to your device, check for its path (usually under
``/dev/input/js*``) and run::
$ jstest /dev/input/js[n]
"""
js_axes_regex = re.compile(r'Axes:\s+(((\d+):\s*([\-\d]+)\s*)+)')
js_buttons_regex = re.compile(r'Buttons:\s+(((\d+):\s*(on|off)\s*)+)')
js_axis_regex = re.compile(r'^\s*(\d+):\s*([\-\d]+)\s*(.*)')
js_button_regex = re.compile(r'^\s*(\d+):\s*(on|off)\s*(.*)')
def __init__(
self,
device: str = '/dev/input/js0',
jstest_path: str = '/usr/bin/jstest',
**kwargs,
):
"""
:param device: Path to the joystick device (default: ``/dev/input/js0``).
:param jstest_path: Path to the ``jstest`` executable that comes with the ``joystick`` system package
(default: ``/usr/bin/jstest``).
"""
super().__init__(device=device, **kwargs)
self.device = device
self.jstest_path = jstest_path
self._process: Optional[subprocess.Popen] = None
self._state: Optional[JoystickState] = None
def _wait_ready(self):
self.logger.info(f'Waiting for joystick device on {self.device}')
while not self.should_stop():
if not os.path.exists(self.device):
time.sleep(1)
try:
with open(self.device, 'rb'):
break
except Exception as e:
self.logger.debug(e)
time.sleep(0.1)
continue
self.bus.post(JoystickConnectedEvent(device=self.device))
def _read_states(self):
while not self.should_stop():
yield self._get_state()
def _get_state(self) -> JoystickState:
axes = []
buttons = []
line = ''
while os.path.exists(self.device) and not self.should_stop():
ch = self._process.stdout.read(1).decode()
if not ch:
continue
if ch in ['\r', '\n']:
line = ''
continue
line += ch
if line.endswith('Axes: '):
break
while (
os.path.exists(self.device)
and not self.should_stop()
and len(axes) < len(self._state.axes)
):
ch = ' '
while ch == ' ':
ch = self._process.stdout.read(1).decode()
self._process.stdout.read(len(f'{len(axes)}'))
value = ''
while os.path.exists(self.device) and not self.should_stop():
ch = self._process.stdout.read(1).decode()
if ch == ' ':
if not value:
continue
break
if ch == ':':
break
value += ch
if value:
axes.append(int(value))
line = ''
while os.path.exists(self.device) and not self.should_stop():
ch = self._process.stdout.read(1).decode()
if not ch:
continue
line += ch
if line.endswith('Buttons: '):
break
while (
os.path.exists(self.device)
and not self.should_stop()
and len(buttons) < len(self._state.buttons)
):
ch = ' '
while ch == ' ':
ch = self._process.stdout.read(1).decode()
self._process.stdout.read(len(f'{len(buttons)}'))
value = ''
while os.path.exists(self.device) and not self.should_stop():
ch = self._process.stdout.read(1).decode()
if ch == ' ':
continue
value += ch
if value in ['on', 'off']:
buttons.append(value == 'on')
break
return JoystickState(axes=axes, buttons=buttons)
def _initialize(self):
while (
self._process.poll() is None
and os.path.exists(self.device)
and not self.should_stop()
and not self._state
):
line = b''
ch = None
while ch not in [b'\r', b'\n']:
ch = self._process.stdout.read(1)
line += ch
line = line.decode().strip()
if not (line and line.startswith('Axes:')):
continue
re_axes = self.js_axes_regex.search(line)
re_buttons = self.js_buttons_regex.search(line)
if not (re_axes and re_buttons):
return
state = {
'axes': [],
'buttons': [],
}
axes = re_axes.group(1)
while axes:
m = self.js_axis_regex.search(axes)
state['axes'].append(int(m.group(2)))
axes = m.group(3)
buttons = re_buttons.group(1)
while buttons:
m = self.js_button_regex.search(buttons)
state['buttons'].append(m.group(2) == 'on')
buttons = m.group(3)
self._state = JoystickState(**state)
def _process_state(self, state: JoystickState):
diff = self._state - state
if not diff:
return
self.bus.post(JoystickStateEvent(device=self.device, **state.__dict__))
for button, pressed in diff.get('buttons', {}).items():
evt_class = (
JoystickButtonPressedEvent if pressed else JoystickButtonReleasedEvent
)
self.bus.post(evt_class(device=self.device, button=button))
for axis, value in diff.get('axes', {}).items():
self.bus.post(JoystickAxisEvent(device=self.device, axis=axis, value=value))
self._state = state
def run(self):
super().run()
try:
while not self.should_stop():
self._wait_ready()
with subprocess.Popen(
[self.jstest_path, '--normal', self.device], stdout=subprocess.PIPE
) as self._process:
self.logger.info('Device opened')
self._initialize()
if self._process.poll() is not None:
break
for state in self._read_states():
if self._process.poll() is not None or not os.path.exists(
self.device
):
self.logger.warning(f'Connection to {self.device} lost')
self.bus.post(JoystickDisconnectedEvent(self.device))
break
self._process_state(state)
finally:
self._process = None
# vim:sw=4:ts=4:et:

View File

@ -1,19 +0,0 @@
manifest:
events:
- platypush.message.event.joystick.JoystickAxisEvent
- platypush.message.event.joystick.JoystickButtonPressedEvent
- platypush.message.event.joystick.JoystickButtonReleasedEvent
- platypush.message.event.joystick.JoystickConnectedEvent
- platypush.message.event.joystick.JoystickDisconnectedEvent
- platypush.message.event.joystick.JoystickStateEvent
install:
apk:
- linuxconsoletools
apt:
- joystick
dnf:
- joystick
pacman:
- joyutils
package: platypush.backend.joystick.jstest
type: backend

View File

@ -1,206 +0,0 @@
import array
import struct
import time
from fcntl import ioctl
from typing import IO
from platypush.backend import Backend
from platypush.message.event.joystick import (
JoystickConnectedEvent,
JoystickDisconnectedEvent,
JoystickButtonPressedEvent,
JoystickButtonReleasedEvent,
JoystickAxisEvent,
)
class JoystickLinuxBackend(Backend):
"""
This backend intercepts events from joystick devices through the native Linux API implementation.
It is loosely based on https://gist.github.com/rdb/8864666, which itself uses the
`Linux kernel joystick API <https://www.kernel.org/doc/Documentation/input/joystick-api.txt>`_ to interact with
the devices.
"""
# These constants were borrowed from linux/input.h
axis_names = {
0x00: 'x',
0x01: 'y',
0x02: 'z',
0x03: 'rx',
0x04: 'ry',
0x05: 'rz',
0x06: 'throttle',
0x07: 'rudder',
0x08: 'wheel',
0x09: 'gas',
0x0A: 'brake',
0x10: 'hat0x',
0x11: 'hat0y',
0x12: 'hat1x',
0x13: 'hat1y',
0x14: 'hat2x',
0x15: 'hat2y',
0x16: 'hat3x',
0x17: 'hat3y',
0x18: 'pressure',
0x19: 'distance',
0x1A: 'tilt_x',
0x1B: 'tilt_y',
0x1C: 'tool_width',
0x20: 'volume',
0x28: 'misc',
}
button_names = {
0x120: 'trigger',
0x121: 'thumb',
0x122: 'thumb2',
0x123: 'top',
0x124: 'top2',
0x125: 'pinkie',
0x126: 'base',
0x127: 'base2',
0x128: 'base3',
0x129: 'base4',
0x12A: 'base5',
0x12B: 'base6',
0x12F: 'dead',
0x130: 'a',
0x131: 'b',
0x132: 'c',
0x133: 'x',
0x134: 'y',
0x135: 'z',
0x136: 'tl',
0x137: 'tr',
0x138: 'tl2',
0x139: 'tr2',
0x13A: 'select',
0x13B: 'start',
0x13C: 'mode',
0x13D: 'thumbl',
0x13E: 'thumbr',
0x220: 'dpad_up',
0x221: 'dpad_down',
0x222: 'dpad_left',
0x223: 'dpad_right',
# XBox 360 controller uses these codes.
0x2C0: 'dpad_left',
0x2C1: 'dpad_right',
0x2C2: 'dpad_up',
0x2C3: 'dpad_down',
}
def __init__(self, device: str = '/dev/input/js0', *args, **kwargs):
"""
:param device: Joystick device to monitor (default: ``/dev/input/js0``).
"""
super().__init__(*args, **kwargs)
self.device = device
self._axis_states = {}
self._button_states = {}
self._axis_map = []
self._button_map = []
def _init_joystick(self, dev: IO):
# Get the device name.
buf = array.array('B', [0] * 64)
ioctl(dev, 0x80006A13 + (0x10000 * len(buf)), buf) # JSIOCGNAME(len)
js_name = buf.tobytes().rstrip(b'\x00').decode('utf-8')
# Get number of axes and buttons.
buf = array.array('B', [0])
ioctl(dev, 0x80016A11, buf) # JSIOCGAXES
num_axes = buf[0]
buf = array.array('B', [0])
ioctl(dev, 0x80016A12, buf) # JSIOCGBUTTONS
num_buttons = buf[0]
# Get the axis map.
buf = array.array('B', [0] * 0x40)
ioctl(dev, 0x80406A32, buf) # JSIOCGAXMAP
for axis in buf[:num_axes]:
axis_name = self.axis_names.get(axis, 'unknown(0x%02x)' % axis)
self._axis_map.append(axis_name)
self._axis_states[axis_name] = 0.0
# Get the button map.
buf = array.array('H', [0] * 200)
ioctl(dev, 0x80406A34, buf) # JSIOCGBTNMAP
for btn in buf[:num_buttons]:
btn_name = self.button_names.get(btn, 'unknown(0x%03x)' % btn)
self._button_map.append(btn_name)
self._button_states[btn_name] = 0
self.bus.post(
JoystickConnectedEvent(
device=self.device,
name=js_name,
axes=self._axis_map,
buttons=self._button_map,
)
)
def run(self):
super().run()
self.logger.info(f'Opening {self.device}...')
while not self.should_stop():
# Open the joystick device.
try:
jsdev = open(self.device, 'rb') # noqa
self._init_joystick(jsdev)
except Exception as e:
self.logger.debug(
'Joystick device on %s not available: %s', self.device, e
)
time.sleep(5)
continue
try:
# Joystick event loop
while not self.should_stop():
try:
evbuf = jsdev.read(8)
if evbuf:
_, value, evt_type, number = struct.unpack('IhBB', evbuf)
if evt_type & 0x80: # Initial state notification
continue
if evt_type & 0x01:
button = self._button_map[number]
if button:
self._button_states[button] = value
evt_class = (
JoystickButtonPressedEvent
if value
else JoystickButtonReleasedEvent
)
# noinspection PyTypeChecker
self.bus.post(
evt_class(device=self.device, button=button)
)
if evt_type & 0x02:
axis = self._axis_map[number]
if axis:
fvalue = value / 32767.0
self._axis_states[axis] = fvalue
# noinspection PyTypeChecker
self.bus.post(
JoystickAxisEvent(
device=self.device, axis=axis, value=fvalue
)
)
except OSError as e:
self.logger.warning(f'Connection to {self.device} lost: {e}')
self.bus.post(JoystickDisconnectedEvent(device=self.device))
break
finally:
jsdev.close()

View File

@ -1,16 +0,0 @@
manifest:
events:
platypush.message.event.joystick.JoystickAxisEvent: when an axis value of the
joystick changes.
platypush.message.event.joystick.JoystickButtonPressedEvent: when a joystick button
is pressed.
platypush.message.event.joystick.JoystickButtonReleasedEvent: when a joystick
button is released.
platypush.message.event.joystick.JoystickConnectedEvent: when the joystick is
connected.
platypush.message.event.joystick.JoystickDisconnectedEvent: when the joystick
is disconnected.
install:
pip: []
package: platypush.backend.joystick.linux
type: backend

View File

@ -1,108 +1,46 @@
from abc import ABCMeta
from typing import Optional, Iterable, Union
from abc import ABC
from platypush.message.event import Event
class JoystickEvent(Event):
"""
Generic joystick event.
"""
def __init__(self, code, state, *args, **kwargs):
"""
:param code: Event code, usually the code of the source key/handle
:type code: str
:param state: State of the triggering element. Can be 0/1 for a button, -1/0/1 for an axis, a discrete integer
for an analog input etc.
:type state: int
"""
super().__init__(*args, code=code, state=state, **kwargs)
class _JoystickEvent(Event, ABCMeta):
class JoystickEvent(Event, ABC):
"""
Base joystick event class.
"""
def __init__(self, device: str, **kwargs):
super().__init__(device=device, **kwargs)
def __init__(self, device: dict, *args, **kwargs):
"""
:param device: Joystick device info as a dictionary:
.. schema:: joystick.JoystickDeviceSchema
"""
super().__init__(*args, device=device, **kwargs)
class JoystickConnectedEvent(_JoystickEvent):
class JoystickConnectedEvent(JoystickEvent):
"""
Event triggered upon joystick connection.
"""
def __init__(self,
*args,
name: Optional[str] = None,
axes: Optional[Iterable[Union[int, str]]] = None,
buttons: Optional[Iterable[Union[int, str]]] = None,
**kwargs):
"""
:param name: Device name.
:param axes: List of the device axes, as indices or names.
:param buttons: List of the device buttons, as indices or names.
"""
super().__init__(*args, name=name, axes=axes, buttons=buttons, **kwargs)
class JoystickDisconnectedEvent(_JoystickEvent):
class JoystickDisconnectedEvent(JoystickEvent):
"""
Event triggered upon joystick disconnection.
"""
class JoystickStateEvent(_JoystickEvent):
class JoystickStateEvent(JoystickEvent):
"""
Event triggered when the state of the joystick changes.
Base joystick state event class.
"""
def __init__(self, *args, axes: Iterable[int], buttons: Iterable[bool], **kwargs):
def __init__(self, *args, state: dict, **kwargs):
"""
:param axes: Joystick axes values, as a list of integer values.
:param buttons: Joystick buttons values, as a list of boolean values (True for pressed, False for released).
:param state: Joystick state as a dictionary:
.. schema:: joystick.JoystickStateSchema
"""
super().__init__(*args, axes=axes, buttons=buttons, **kwargs)
class JoystickButtonPressedEvent(_JoystickEvent):
"""
Event triggered when a joystick button is pressed.
"""
def __init__(self, *args, button: Union[int, str], **kwargs):
"""
:param button: Button index or name.
"""
super().__init__(*args, button=button, **kwargs)
class JoystickButtonReleasedEvent(_JoystickEvent):
"""
Event triggered when a joystick button is released.
"""
def __init__(self, *args, button: Union[int, str], **kwargs):
"""
:param button: Button index or name.
"""
super().__init__(*args, button=button, **kwargs)
class JoystickAxisEvent(_JoystickEvent):
"""
Event triggered when an axis value of the joystick changes.
"""
def __init__(self, *args, axis: Union[int, str], value: int, **kwargs):
"""
:param axis: Axis index or name.
:param value: Axis value.
"""
super().__init__(*args, axis=axis, value=value, **kwargs)
super().__init__(*args, state=state, **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -1,156 +0,0 @@
from typing import List
from platypush.plugins import Plugin, action
class InputsPlugin(Plugin):
"""
This plugin emulates user input on a keyboard/mouse. It requires the a graphical server (X server or Mac/Win
interface) to be running - it won't work in console mode.
"""
@staticmethod
def _get_keyboard():
# noinspection PyPackageRequirements
from pykeyboard import PyKeyboard
return PyKeyboard()
@staticmethod
def _get_mouse():
# noinspection PyPackageRequirements
from pymouse import PyMouse
return PyMouse()
@classmethod
def _parse_key(cls, key: str):
k = cls._get_keyboard()
keymap = {
'ctrl': k.control_key,
'alt': k.alt_key,
'meta': k.windows_l_key,
'shift': k.shift_key,
'enter': k.enter_key,
'tab': k.tab_key,
'home': k.home_key,
'end': k.end_key,
'capslock': k.caps_lock_key,
'back': k.backspace_key,
'del': k.delete_key,
'up': k.up_key,
'down': k.down_key,
'left': k.left_key,
'right': k.right_key,
'pageup': k.page_up_key,
'pagedown': k.page_down_key,
'esc': k.escape_key,
'find': k.find_key,
'f1': k.function_keys[1],
'f2': k.function_keys[2],
'f3': k.function_keys[3],
'f4': k.function_keys[4],
'f5': k.function_keys[5],
'f6': k.function_keys[6],
'f7': k.function_keys[7],
'f8': k.function_keys[8],
'f9': k.function_keys[9],
'f10': k.function_keys[10],
'f11': k.function_keys[11],
'f12': k.function_keys[12],
'help': k.help_key,
'media_next': k.media_next_track_key,
'media_prev': k.media_prev_track_key,
'media_play': k.media_play_pause_key,
'menu': k.menu_key,
'numlock': k.num_lock_key,
'print': k.print_key,
'print_screen': k.print_screen_key,
'sleep': k.sleep_key,
'space': k.space_key,
'voldown': k.volume_down_key,
'volup': k.volume_up_key,
'volmute': k.volume_mute_key,
'zoom': k.zoom_key,
}
lkey = key.lower()
if lkey in keymap:
return keymap[lkey]
return key
@action
def press_key(self, key: str):
"""
Emulate the pressure of a key.
:param key: Key to be pressed
"""
kbd = self._get_keyboard()
key = self._parse_key(key)
kbd.press_key(key)
@action
def release_key(self, key: str):
"""
Release a pressed key.
:param key: Key to be released
"""
kbd = self._get_keyboard()
key = self._parse_key(key)
kbd.release_key(key)
@action
def press_keys(self, keys: List[str]):
"""
Emulate the pressure of multiple keys.
:param keys: List of keys to be pressed.
"""
kbd = self._get_keyboard()
keys = [self._parse_key(k) for k in keys]
kbd.press_keys(keys)
@action
def tap_key(self, key: str, repeat: int = 1, interval: float = 0):
"""
Emulate a key tap.
:param key: Key to be pressed
:param repeat: Number of iterations (default: 1)
:param interval: Repeat interval in seconds (default: 0)
"""
kbd = self._get_keyboard()
key = self._parse_key(key)
kbd.tap_key(key, n=repeat, interval=interval)
@action
def type_string(self, string: str, interval: float = 0):
"""
Type a string.
:param string: String to be typed
:param interval: Interval between key strokes in seconds (default: 0)
"""
kbd = self._get_keyboard()
kbd.type_string(string, interval=interval)
@action
def get_screen_size(self) -> List[int]:
"""
Get the size of the screen in pixels.
"""
m = self._get_mouse()
return [m.screen_size()]
@action
def mouse_click(self, x: int, y: int, btn: int, repeat: int = 1):
"""
Mouse click.
:param x: x screen position
:param y: y screen position
:param btn: Button number (1 for left, 2 for right, 3 for middle)
:param repeat: Number of clicks (default: 1)
"""
m = self._get_mouse()
m.click(x, y, btn, n=repeat)
# vim:sw=4:ts=4:et:

View File

@ -1,7 +0,0 @@
manifest:
events: {}
install:
pip:
- pyuserinput
package: platypush.plugins.inputs
type: plugin

View File

@ -0,0 +1,124 @@
import multiprocessing
import threading
from queue import Empty
from typing import Dict, List, Optional
from platypush.plugins import RunnablePlugin, action
from platypush.schemas.joystick import JoystickStatusSchema
from ._inputs import DeviceManager, GamePad
from ._manager import JoystickManager
class JoystickPlugin(RunnablePlugin):
"""
A plugin to monitor joystick events.
"""
def __init__(self, poll_interval: float = 0.025, **kwargs):
"""
:param poll_interval: Polling interval in seconds (default: 0.025)
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._dev_managers: Dict[str, JoystickManager] = {}
self._state_queue = multiprocessing.Queue()
self._state_monitor_thread: Optional[threading.Thread] = None
@staticmethod
def _key(device: GamePad) -> str:
return device.get_char_device_path()
def _start_manager(self, device: GamePad):
if device in self._dev_managers:
return
dev_manager = JoystickManager(
device=device,
poll_interval=self.poll_interval,
state_queue=self._state_queue,
)
dev_manager.start()
self._dev_managers[self._key(device)] = dev_manager
def _stop_manager(self, device: str):
dev_manager = self._dev_managers.get(device)
if not dev_manager:
return
dev_manager.stop()
dev_manager.join(1)
if dev_manager.is_alive():
dev_manager.kill()
del self._dev_managers[device]
def _state_monitor(self):
while not self.should_stop():
try:
state = self._state_queue.get(timeout=0.5)
device = state.device
if device not in self._dev_managers:
continue
self._dev_managers[device].state = state.state
except Empty:
pass
except Exception as e:
self.logger.exception(e)
def main(self):
if not self._state_monitor_thread:
self._state_monitor_thread = threading.Thread(
target=self._state_monitor, daemon=True
)
self._state_monitor_thread.start()
while not self.should_stop():
try:
devices = DeviceManager().gamepads
missing_devices_keys = set(self._dev_managers.keys()) - {
self._key(dev) for dev in devices
}
new_devices = {
dev for dev in devices if self._key(dev) not in self._dev_managers
}
# Stop managers for devices that are no longer connected
for dev in missing_devices_keys:
self._stop_manager(dev)
# Start managers for new devices
for dev in new_devices:
self._start_manager(dev)
except Exception as e:
self.logger.exception(e)
finally:
self.wait_stop(max(0.5, min(10, (self.poll_interval or 0) * 10)))
def stop(self):
for dev in list(self._dev_managers.keys()):
self._stop_manager(dev)
super().stop()
@action
def status(self) -> List[dict]:
"""
:return: .. schema:: joystick.JoystickStatusSchema(many=True)
"""
return JoystickStatusSchema().dump(
[
{
'device': dev.device,
'state': dev.state,
}
for dev in self._dev_managers.values()
],
many=True,
)
# vim:sw=4:ts=4:et:

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,175 @@
from dataclasses import asdict
import multiprocessing
from threading import Timer
from typing import Optional, Type
from time import time
from platypush.context import get_bus
from platypush.message.event.joystick import (
JoystickConnectedEvent,
JoystickDisconnectedEvent,
JoystickEvent,
JoystickStateEvent,
)
from platypush.schemas.joystick import JoystickDeviceSchema
from ._inputs import GamePad, InputEvent, UnpluggedError
from ._state import ConnectedState, JoystickDeviceState, JoystickState
class JoystickManager(multiprocessing.Process):
"""
A process that monitors and publishes joystick events.
"""
MAX_TRIG_VAL = 1 << 8
MAX_JOY_VAL = 1 << 15
THROTTLE_INTERVAL = 0.2
def __init__(
self,
device: GamePad,
poll_interval: Optional[float],
state_queue: multiprocessing.Queue,
):
super().__init__()
self.device = device
self.poll_interval = poll_interval
self.state = JoystickState()
self._state_queue = state_queue
self._connected_state = ConnectedState.UNKNOWN
self._stop_evt = multiprocessing.Event()
self._state_throttler: Optional[Timer] = None
self._state_timestamp: float = 0
@property
def should_stop(self):
return self._stop_evt.is_set()
def wait_stop(self, timeout: Optional[float] = None):
self._stop_evt.wait(timeout=timeout or self.poll_interval)
def _enqueue_state(self):
now = time()
self._state_queue.put_nowait(
JoystickDeviceState(
device=self.device.get_char_device_path(), state=self.state
)
)
if now - self._state_timestamp >= self.THROTTLE_INTERVAL:
self._post_state()
self._state_timestamp = now
return
self._state_timestamp = now
self._state_throttler = Timer(self.THROTTLE_INTERVAL, self._post_state)
self._state_throttler.start()
def _stop_state_throrttler(self):
if self._state_throttler:
self._state_throttler.cancel()
self._state_throttler = None
def _post_state(self):
self._post_event(JoystickStateEvent, state=asdict(self.state))
self._stop_state_throrttler()
def _parse_event(self, event: InputEvent): # pylint: disable=too-many-branches
"""
Solution adapted from https://stackoverflow.com/questions/46506850.
"""
if event.code == "ABS_Y":
# normalize between -1 and 1
self.state.left_joystick_y = event.state / self.MAX_JOY_VAL
elif event.code == "ABS_X":
# normalize between -1 and 1
self.state.left_joystick_x = event.state / self.MAX_JOY_VAL
elif event.code == "ABS_RY":
# normalize between -1 and 1
self.state.right_joystick_y = event.state / self.MAX_JOY_VAL
elif event.code == "ABS_RX":
# normalize between -1 and 1
self.state.right_joystick_x = event.state / self.MAX_JOY_VAL
elif event.code == "ABS_Z":
# normalize between 0 and 1
self.state.left_trigger = event.state / self.MAX_TRIG_VAL
elif event.code == "ABS_RZ":
# normalize between 0 and 1
self.state.right_trigger = event.state / self.MAX_TRIG_VAL
elif event.code == "BTN_TL":
self.state.left_bumper = event.state
elif event.code == "BTN_TR":
self.state.right_bumper = event.state
elif event.code == "BTN_SOUTH":
self.state.a = event.state
elif event.code == "BTN_NORTH":
# previously switched with X
self.state.y = event.state
elif event.code == "BTN_WEST":
# previously switched with Y
self.state.x = event.state
elif event.code == "BTN_EAST":
self.state.b = event.state
elif event.code == "BTN_THUMBL":
self.state.left_thumb = event.state
elif event.code == "BTN_THUMBR":
self.state.right_thumb = event.state
elif event.code == "BTN_SELECT":
self.state.back = event.state
elif event.code == "BTN_START":
self.state.start = event.state
elif event.code == "BTN_TRIGGER_HAPPY1":
self.state.left_dir_pad = event.state
elif event.code == "BTN_TRIGGER_HAPPY2":
self.state.right_dir_pad = event.state
elif event.code == "BTN_TRIGGER_HAPPY3":
self.state.up_dir_pad = event.state
elif event.code == "BTN_TRIGGER_HAPPY4":
self.state.down_dir_pad = event.state
def _post_event(
self, type: Type[JoystickEvent], **kwargs # pylint: disable=redefined-builtin
):
get_bus().post(
type(device=dict(JoystickDeviceSchema().dump(self.device)), **kwargs)
)
def _on_connect(self):
if self._connected_state != ConnectedState.CONNECTED:
self._connected_state = ConnectedState.CONNECTED
self._post_event(JoystickConnectedEvent)
def _on_disconnect(self):
if self._connected_state != ConnectedState.DISCONNECTED:
self._connected_state = ConnectedState.DISCONNECTED
self._post_event(JoystickDisconnectedEvent)
def _loop(self):
try:
for event in self.device.read():
self._on_connect()
prev_state = asdict(self.state)
self._parse_event(event)
new_state = asdict(self.state)
if prev_state != new_state:
self._enqueue_state()
except (UnpluggedError, OSError):
self._on_disconnect()
finally:
self.wait_stop(self.poll_interval)
def run(self):
try:
while not self.should_stop:
try:
self._loop()
except KeyboardInterrupt:
break
finally:
self._on_disconnect()
def stop(self):
self._stop_evt.set()
self._stop_state_throrttler()

View File

@ -0,0 +1,50 @@
from dataclasses import dataclass
from enum import Enum
class ConnectedState(Enum):
"""
Enum to represent the connection state of a joystick.
"""
UNKNOWN = 0
CONNECTED = 1
DISCONNECTED = 2
@dataclass
class JoystickState:
"""
Dataclass that holds the joystick state properties.
"""
left_joystick_y: float = 0
left_joystick_x: float = 0
right_joystick_y: float = 0
right_joystick_x: float = 0
left_trigger: float = 0
right_trigger: float = 0
left_bumper: int = 0
right_bumper: int = 0
a: int = 0
x: int = 0
y: int = 0
b: int = 0
left_thumb: int = 0
right_thumb: int = 0
back: int = 0
start: int = 0
left_dir_pad: int = 0
right_dir_pad: int = 0
up_dir_pad: int = 0
down_dir_pad: int = 0
@dataclass
class JoystickDeviceState:
"""
Dataclass that holds the joystick device state properties.
"""
device: str
state: JoystickState

View File

@ -1,8 +1,6 @@
manifest:
events:
platypush.message.event.joystick.JoystickEvent: when a new joystick event is received
install:
pip:
- inputs
install: {}
package: platypush.backend.joystick
type: backend

View File

@ -287,6 +287,9 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
@action
def stop(self, *_, chromecast: Optional[str] = None, **__):
chromecast = chromecast or self.chromecast
if not chromecast:
return
cast = self.get_chromecast(chromecast)
cast.media_controller.stop()
cast.wait()

View File

@ -0,0 +1,199 @@
from marshmallow import EXCLUDE, fields
from marshmallow.schema import Schema
class JoystickStateSchema(Schema):
"""
Joystick state schema.
"""
left_joystick_y = fields.Float(
metadata={
'description': 'Left joystick Y axis value',
'example': 0.5,
}
)
left_joystick_x = fields.Float(
metadata={
'description': 'Left joystick X axis value',
'example': 0.5,
}
)
right_joystick_y = fields.Float(
metadata={
'description': 'Right joystick Y axis value',
'example': 0.5,
}
)
right_joystick_x = fields.Float(
metadata={
'description': 'Right joystick X axis value',
'example': 0.5,
}
)
left_trigger = fields.Float(
metadata={
'description': 'Left trigger value',
'example': 0.5,
}
)
right_trigger = fields.Float(
metadata={
'description': 'Right trigger value',
'example': 0.5,
}
)
left_bumper = fields.Integer(
metadata={
'description': 'Left bumper state',
'example': 1,
}
)
right_bumper = fields.Integer(
metadata={
'description': 'Right bumper state',
'example': 1,
}
)
a = fields.Integer(
metadata={
'description': 'A button state',
'example': 1,
}
)
x = fields.Integer(
metadata={
'description': 'X button state',
'example': 1,
}
)
y = fields.Integer(
metadata={
'description': 'Y button state',
'example': 1,
}
)
b = fields.Integer(
metadata={
'description': 'B button state',
'example': 1,
}
)
left_thumb = fields.Integer(
metadata={
'description': 'Left thumb button state',
'example': 1,
}
)
right_thumb = fields.Integer(
metadata={
'description': 'Right thumb button state',
'example': 1,
}
)
back = fields.Integer(
metadata={
'description': 'Back button state',
'example': 1,
}
)
start = fields.Integer(
metadata={
'description': 'Start button state',
'example': 1,
}
)
left_dir_pad = fields.Integer(
metadata={
'description': 'Left direction pad button state',
'example': 1,
}
)
right_dir_pad = fields.Integer(
metadata={
'description': 'Right direction pad button state',
'example': 1,
}
)
up_dir_pad = fields.Integer(
metadata={
'description': 'Up direction pad button state',
'example': 1,
}
)
down_dir_pad = fields.Integer(
metadata={
'description': 'Down direction pad button state',
'example': 1,
}
)
class JoystickDeviceSchema(Schema):
"""
Joystick device schema.
"""
class Meta:
"""
Meta class.
"""
unknown = EXCLUDE
name = fields.String(
metadata={
'description': 'Joystick name',
'example': 'Xbox 360 Controller',
}
)
path = fields.Function(
lambda obj: obj.get_char_device_path(),
metadata={
'description': 'Joystick character device path',
'example': '/dev/input/event0',
},
)
number = fields.Integer(
metadata={
'description': 'Joystick number',
'example': 0,
}
)
protocol = fields.String(
metadata={
'description': 'Joystick protocol',
'example': 'usb',
}
)
class JoystickStatusSchema(Schema):
"""
Joystick status schema.
"""
device = fields.Nested(JoystickDeviceSchema, required=True)
state = fields.Nested(JoystickStateSchema, required=True)