import json import os import re import subprocess import time from typing import Optional, List from platypush.backend import Backend from platypush.message.event.joystick import JoystickConnectedEvent, JoystickDisconnectedEvent, JoystickStateEvent, \ JoystickButtonPressedEvent, JoystickButtonReleasedEvent, JoystickAxisEvent class JoystickState: def __init__(self, axes: List[int], buttons: List[bool]): self.axes = axes self.buttons = buttons def __str__(self): return json.dumps(self.__dict__) def __eq__(self, obj): return obj.axes == self.axes and obj.buttons == self.buttons def __sub__(self, obj) -> dict: if len(obj.axes) < len(self.axes) or len(obj.buttons) < len(self.buttons): return {} diff = { 'axes': { axis: obj.axes[axis] for axis in range(len(self.axes)) if self.axes[axis] != obj.axes[axis] }, 'buttons': { button: obj.buttons[button] for button in range(len(self.buttons)) if self.buttons[button] != obj.buttons[button] }, } return { k: v for k, v in diff.items() if v } class JoystickJstestBackend(Backend): """ This backend can be used to intercept events from a joystick device if the device does not work with the standard :class:`platypush.backend.joystick.JoystickBackend` backend (this may especially happen with some Bluetooth joysticks that don't support the ``ioctl`` requests used by ``inputs``). This backend only works on Linux and it requires the ``joystick`` package to be installed. Instructions on Debian-based distros:: # apt-get install joystick Instructions on Arch-based distros:: # pacman -S joyutils To test if your joystick is compatible, connect it to your device, check for its path (usually under ``/dev/input/js*``) and run:: $ jstest /dev/input/js[n] Triggers: * :class:`platypush.message.event.joystick.JoystickConnectedEvent` when the joystick is connected. * :class:`platypush.message.event.joystick.JoystickDisconnectedEvent` when the joystick is disconnected. * :class:`platypush.message.event.joystick.JoystickStateEvent` when the state of the joystick (i.e. some of its axes or buttons values) changes. * :class:`platypush.message.event.joystick.JoystickButtonPressedEvent` when a joystick button is pressed. * :class:`platypush.message.event.joystick.JoystickButtonReleasedEvent` when a joystick button is released. * :class:`platypush.message.event.joystick.JoystickAxisEvent` when an axis value of the joystick changes. """ js_axes_regex = re.compile(r'Axes:\s+(((\d+):\s*([\-\d]+)\s*)+)') js_buttons_regex = re.compile(r'Buttons:\s+(((\d+):\s*(on|off)\s*)+)') js_axis_regex = re.compile(r'^\s*(\d+):\s*([\-\d]+)\s*(.*)') js_button_regex = re.compile(r'^\s*(\d+):\s*(on|off)\s*(.*)') def __init__(self, device: str = '/dev/input/js0', jstest_path: str = '/usr/bin/jstest', **kwargs): """ :param device: Path to the joystick device (default: ``/dev/input/js0``). :param jstest_path: Path to the ``jstest`` executable that comes with the ``joystick`` system package (default: ``/usr/bin/jstest``). """ super().__init__(device=device, **kwargs) self.device = device self.jstest_path = jstest_path self._process: Optional[subprocess.Popen] = None self._state: Optional[JoystickState] = None def _wait_ready(self): self.logger.info(f'Waiting for joystick device on {self.device}') while not self.should_stop(): if not os.path.exists(self.device): time.sleep(1) try: with open(self.device, 'rb'): break except Exception as e: self.logger.debug(e) time.sleep(0.1) continue self.bus.post(JoystickConnectedEvent(device=self.device)) def _read_states(self): while not self.should_stop(): yield self._get_state() def _get_state(self) -> JoystickState: axes = [] buttons = [] line = '' while os.path.exists(self.device) and not self.should_stop(): ch = self._process.stdout.read(1).decode() if not ch: continue if ch in ['\r', '\n']: line = '' continue line += ch if line.endswith('Axes: '): break while os.path.exists(self.device) and not self.should_stop() and len(axes) < len(self._state.axes): ch = ' ' while ch == ' ': ch = self._process.stdout.read(1).decode() self._process.stdout.read(len(f'{len(axes)}')) value = '' while os.path.exists(self.device) and not self.should_stop(): ch = self._process.stdout.read(1).decode() if ch == ' ': if not value: continue break if ch == ':': break value += ch if value: axes.append(int(value)) line = '' while os.path.exists(self.device) and not self.should_stop(): ch = self._process.stdout.read(1).decode() if not ch: continue line += ch if line.endswith('Buttons: '): break while os.path.exists(self.device) and not self.should_stop() and len(buttons) < len(self._state.buttons): ch = ' ' while ch == ' ': ch = self._process.stdout.read(1).decode() self._process.stdout.read(len(f'{len(buttons)}')) value = '' while os.path.exists(self.device) and not self.should_stop(): ch = self._process.stdout.read(1).decode() if ch == ' ': continue value += ch if value in ['on', 'off']: buttons.append(value == 'on') break return JoystickState(axes=axes, buttons=buttons) def _initialize(self): while self._process.poll() is None and \ os.path.exists(self.device) and \ not self.should_stop() and \ not self._state: line = b'' ch = None while ch not in [b'\r', b'\n']: ch = self._process.stdout.read(1) line += ch line = line.decode().strip() if not (line and line.startswith('Axes:')): continue re_axes = self.js_axes_regex.search(line) re_buttons = self.js_buttons_regex.search(line) if not (re_axes and re_buttons): return state = { 'axes': [], 'buttons': [], } axes = re_axes.group(1) while axes: m = self.js_axis_regex.search(axes) state['axes'].append(int(m.group(2))) axes = m.group(3) buttons = re_buttons.group(1) while buttons: m = self.js_button_regex.search(buttons) state['buttons'].append(m.group(2) == 'on') buttons = m.group(3) self._state = JoystickState(**state) def _process_state(self, state: JoystickState): diff = self._state - state if not diff: return self.bus.post(JoystickStateEvent(device=self.device, **state.__dict__)) for button, pressed in diff.get('buttons', {}).items(): evt_class = JoystickButtonPressedEvent if pressed else JoystickButtonReleasedEvent self.bus.post(evt_class(device=self.device, button=button)) for axis, value in diff.get('axes', {}).items(): self.bus.post(JoystickAxisEvent(device=self.device, axis=axis, value=value)) self._state = state def run(self): super().run() try: while not self.should_stop(): self._wait_ready() with subprocess.Popen( [self.jstest_path, '--normal', self.device], stdout=subprocess.PIPE) as self._process: self.logger.info('Device opened') self._initialize() if self._process.poll() is not None: break for state in self._read_states(): if self._process.poll() is not None or not os.path.exists(self.device): self.logger.warning(f'Connection to {self.device} lost') self.bus.post(JoystickDisconnectedEvent(self.device)) break self._process_state(state) finally: self._process = None # vim:sw=4:ts=4:et: