# platypush/platypush/backend/joystick/jstest.py
#
# 263 lines
# 8.4 KiB
# Python

import json
import os
import re
import subprocess
import time
from typing import Optional, List
from platypush.backend.joystick import JoystickBackend
from platypush.message.event.joystick import JoystickConnectedEvent, JoystickDisconnectedEvent, JoystickStateEvent, \
JoystickButtonPressedEvent, JoystickButtonReleasedEvent, JoystickAxisEvent
class JoystickState:
    """
    Snapshot of a joystick: the value of each axis and the pressed/released flag of each button,
    both indexed by position.
    """

    # Instances are mutable and compare by value, so they are deliberately unhashable
    # (this is also what Python does implicitly when only ``__eq__`` is defined).
    __hash__ = None

    def __init__(self, axes: List[int], buttons: List[bool]):
        """
        :param axes: Value of each axis, indexed by axis number.
        :param buttons: ``True`` for each pressed button, indexed by button number.
        """
        self.axes = axes
        self.buttons = buttons

    def __str__(self):
        """Return the state serialized as a JSON object with ``axes`` and ``buttons`` keys."""
        return json.dumps(self.__dict__)

    def __eq__(self, obj):
        """Two states are equal when both their axes and their buttons match."""
        if not isinstance(obj, JoystickState):
            # Let Python fall back to its default handling instead of raising AttributeError
            # on comparisons such as ``state == None``.
            return NotImplemented

        return obj.axes == self.axes and obj.buttons == self.buttons

    def __sub__(self, obj) -> dict:
        """
        Compute what changed when moving from this state to ``obj``.

        :param obj: The newer :class:`JoystickState`.
        :return: A dictionary with an optional ``axes`` and an optional ``buttons`` key. Each maps the
            index of a changed axis/button to its value in ``obj``. Sections without changes are omitted,
            so the result is empty (falsy) if nothing changed. If the two states have a different number
            of axes or buttons then only the common indices are compared.
        """
        diff = {
            'axes': {
                axis: new
                for axis, (old, new) in enumerate(zip(self.axes, obj.axes))
                if old != new
            },
            'buttons': {
                button: new
                for button, (old, new) in enumerate(zip(self.buttons, obj.buttons))
                if old != new
            },
        }

        # Drop the empty sections, so that an unchanged state yields an empty dict
        return {k: v for k, v in diff.items() if v}
class JoystickJstestBackend(JoystickBackend):
    """
    This backend can be used to intercept events from a joystick device if the device does not work with the standard
    :class:`platypush.backend.joystick.JoystickBackend` backend (this may especially happen with some Bluetooth
    joysticks that don't support the ``ioctl`` requests used by ``inputs``).

    This backend only works on Linux and it requires the ``joystick`` package to be installed.

    Instructions on Debian-based distros::

        # apt-get install joystick

    Instructions on Arch-based distros::

        # pacman -S joyutils

    To test if your joystick is compatible, connect it to your device, check for its path (usually under
    ``/dev/input/js*``) and run::

        $ jstest /dev/input/js[n]

    Triggers:

        * :class:`platypush.message.event.joystick.JoystickConnectedEvent` when the joystick is connected.
        * :class:`platypush.message.event.joystick.JoystickDisconnectedEvent` when the joystick is disconnected.
        * :class:`platypush.message.event.joystick.JoystickStateEvent` when the state of the joystick (i.e. some of its
          axes or buttons values) changes.
        * :class:`platypush.message.event.joystick.JoystickButtonPressedEvent` when a joystick button is pressed.
        * :class:`platypush.message.event.joystick.JoystickButtonReleasedEvent` when a joystick button is released.
        * :class:`platypush.message.event.joystick.JoystickAxisEvent` when an axis value of the joystick changes.

    """

    # Whole "Axes:" / "Buttons:" sections of a jstest status line
    js_axes_regex = re.compile(r'Axes:\s+(((\d+):\s*([\-\d]+)\s*)+)')
    js_buttons_regex = re.compile(r'Buttons:\s+(((\d+):\s*(on|off)\s*)+)')
    # A single "<index>: <value>" entry at the start of a section, plus the remainder of the section
    js_axis_regex = re.compile(r'^\s*(\d+):\s*([\-\d]+)\s*(.*)')
    js_button_regex = re.compile(r'^\s*(\d+):\s*(on|off)\s*(.*)')

    def __init__(self,
                 device: str = '/dev/input/js0',
                 jstest_path: str = '/usr/bin/jstest',
                 **kwargs):
        """
        :param device: Path to the joystick device (default: ``/dev/input/js0``).
        :param jstest_path: Path to the ``jstest`` executable that comes with the ``joystick`` system package
            (default: ``/usr/bin/jstest``).
        """
        # NOTE(review): ``device`` is not forwarded to the parent constructor - confirm that
        # ``JoystickBackend.__init__`` does not require it.
        super().__init__(**kwargs)
        self.device_path = device
        self.jstest_path = jstest_path
        self._process: Optional[subprocess.Popen] = None
        self._state: Optional[JoystickState] = None

    def _wait_ready(self):
        """
        Block until the joystick device node exists or the backend is asked to stop.

        A :class:`JoystickConnectedEvent` is posted if the device showed up.
        """
        self.logger.info(f'Waiting for joystick device on {self.device_path}')

        # Keep polling only while we are still running AND the device is still missing
        while not self.should_stop() and not os.path.exists(self.device_path):
            time.sleep(1)

        if self.should_stop():
            return

        self.bus.post(JoystickConnectedEvent(device=self.device_path))

    def _read_char(self) -> str:
        """
        Read a single character from the output of ``jstest``.

        :raises EOFError: If the output stream has been closed (i.e. ``jstest`` has terminated).
        """
        ch = self._process.stdout.read(1)
        if not ch:
            # A blocking read only returns an empty buffer on EOF: report it rather than
            # letting the callers spin on empty reads.
            raise EOFError('The jstest output stream has been closed')

        # Undecodable bytes are replaced rather than raising
        return ch.decode(errors='replace')

    def _read_line(self) -> str:
        """
        Read the output of ``jstest`` up to the next ``\\r``/``\\n`` and return it stripped.

        :raises EOFError: If the output stream has been closed before a terminator was found.
        """
        line = bytearray()

        while True:
            ch = self._process.stdout.read(1)
            if not ch:
                raise EOFError('The jstest output stream has been closed')
            if ch in (b'\r', b'\n'):
                break

            line += ch

        return line.decode(errors='replace').strip()

    def _skip_to(self, marker: str):
        """Consume the output of ``jstest`` until ``marker`` has just been read (or a stop is requested)."""
        tail = ''
        while not self.should_stop():
            # Only the last len(marker) characters are relevant for the comparison
            tail = (tail + self._read_char())[-len(marker):]
            if tail == marker:
                return

    def _skip_index(self, position: int):
        """Consume the ``<index>:`` prefix of the entry number ``position`` of the current section."""
        ch = ' '
        while ch == ' ':
            ch = self._read_char()

        # The first digit of the index has been consumed by the loop above: the remaining digits
        # plus the colon amount to as many characters as the digits of the index.
        self._process.stdout.read(len(str(position)))

    def _read_states(self):
        """
        Generator that yields a :class:`JoystickState` for each status update printed by ``jstest``.

        It terminates when the backend is asked to stop, when ``jstest`` closes its output or if no initial
        state is available.
        """
        while not self.should_stop() and self._state is not None:
            try:
                state = self._get_state()
            except EOFError:
                return

            # A stop request may interrupt the parsing half-way: don't forward partial states
            if len(state.axes) == len(self._state.axes) and len(state.buttons) == len(self._state.buttons):
                yield state

    def _get_state(self) -> JoystickState:
        """
        Parse the next status update from the output of ``jstest``.

        The update is parsed character by character, using the number of axes and buttons of the current
        state to know when it is complete, instead of waiting for a line terminator (presumably because
        ``jstest`` only emits the terminator at the beginning of the *next* update - TODO confirm).

        :return: The parsed state. It may be incomplete if a stop was requested while parsing.
        :raises EOFError: If ``jstest`` closed its output.
        """
        axes = []
        buttons = []

        self._skip_to('Axes: ')
        while not self.should_stop() and len(axes) < len(self._state.axes):
            self._skip_index(len(axes))
            value = ''

            while not self.should_stop():
                ch = self._read_char()
                if ch == ' ':
                    if not value:
                        # Padding before the value
                        continue
                    break

                if ch == ':':
                    break

                value += ch

            if value:
                axes.append(int(value))

        self._skip_to('Buttons: ')
        while not self.should_stop() and len(buttons) < len(self._state.buttons):
            self._skip_index(len(buttons))
            value = ''

            while not self.should_stop():
                ch = self._read_char()
                if ch == ' ':
                    continue

                value += ch
                if value in ['on', 'off']:
                    buttons.append(value == 'on')
                    break

        return JoystickState(axes=axes, buttons=buttons)

    def _initialize(self):
        """
        Read the output of ``jstest`` until the first complete status line and use it to set the initial state
        (and therefore the number of axes and buttons of the device).

        ``self._state`` is left unset if ``jstest`` terminates, a stop is requested or the status line can't be
        parsed.
        """
        while not self.should_stop() and self._state is None:
            try:
                line = self._read_line()
            except EOFError:
                return

            if not line.startswith('Axes:'):
                continue

            re_axes = self.js_axes_regex.search(line)
            re_buttons = self.js_buttons_regex.search(line)
            if not (re_axes and re_buttons):
                self.logger.warning(f'Unexpected jstest output: {line}')
                return

            state = {
                'axes': [],
                'buttons': [],
            }

            # Consume the entries one by one: group 2 is the value, group 3 the rest of the section
            axes = re_axes.group(1)
            while axes:
                m = self.js_axis_regex.search(axes)
                if not m:
                    break

                state['axes'].append(int(m.group(2)))
                axes = m.group(3)

            buttons = re_buttons.group(1)
            while buttons:
                m = self.js_button_regex.search(buttons)
                if not m:
                    break

                state['buttons'].append(m.group(2) == 'on')
                buttons = m.group(3)

            self._state = JoystickState(**state)

    def _process_state(self, state: JoystickState):
        """
        Compare ``state`` against the current state, post the events for what has changed and store ``state``
        as the current state. Nothing happens if the two states are equivalent.
        """
        diff = self._state - state
        if not diff:
            return

        self.bus.post(JoystickStateEvent(device=self.device_path, axes=state.axes, buttons=state.buttons))

        for button, pressed in diff.get('buttons', {}).items():
            evt_class = JoystickButtonPressedEvent if pressed else JoystickButtonReleasedEvent
            self.bus.post(evt_class(device=self.device_path, button=button))

        for axis, value in diff.get('axes', {}).items():
            self.bus.post(JoystickAxisEvent(device=self.device_path, axis=axis, value=value))

        self._state = state

    def run(self):
        """
        Main loop: wait for the device, spawn ``jstest`` on it and translate its output into events until the
        device goes away, then start over. It returns when the backend is asked to stop.
        """
        super().run()

        try:
            while not self.should_stop():
                self._wait_ready()
                if self.should_stop():
                    break

                # The state of a previous session may not match the device that is connected now
                self._state = None

                with subprocess.Popen(
                        [self.jstest_path, '--normal', self.device_path],
                        stdout=subprocess.PIPE) as proc:
                    self._process = proc

                    try:
                        self.logger.info('Device opened')
                        self._initialize()

                        for state in self._read_states():
                            self._process_state(state)
                            if not os.path.exists(self.device_path):
                                break
                    finally:
                        # Leaving the ``with`` block waits for the process: make sure that it
                        # terminates instead of hanging on a jstest that is still running.
                        if proc.poll() is None:
                            proc.terminate()

                if self.should_stop():
                    break

                self.logger.warning(f'Connection to {self.device_path} lost')
                self.bus.post(JoystickDisconnectedEvent(device=self.device_path))

                if os.path.exists(self.device_path):
                    # jstest exited although the device node is still there: don't respawn it in a tight loop
                    time.sleep(1)
        finally:
            self._process = None
# vim:sw=4:ts=4:et: