forked from platypush/platypush
Added HID plugin to support interaction with generic HID devices
This commit is contained in:
parent
dd3c4b10c7
commit
a77206800d
9 changed files with 413 additions and 0 deletions
13
CHANGELOG.md
13
CHANGELOG.md
|
@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.
|
|||
Given the high speed of development in the first phase, changes are being
|
||||
reported only starting from v0.20.2.
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added `hid` plugin to support discoverability and data interaction with
|
||||
generic HID devices - like Bluetooth/USB peripherals, joysticks, dongles and
|
||||
any other type of devices that supports the HID interface.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Running the Zeroconf registration logic in another thread in `backend.http`,
|
||||
so failures in the Zeroconf logic don't affect the startup of the web server.
|
||||
|
||||
## [0.24.4] - 2022-12-20
|
||||
|
||||
### Fixed
|
||||
|
|
|
@ -30,6 +30,7 @@ Events
|
|||
platypush/events/gotify.rst
|
||||
platypush/events/gpio.rst
|
||||
platypush/events/gps.rst
|
||||
platypush/events/hid.rst
|
||||
platypush/events/http.rst
|
||||
platypush/events/http.hook.rst
|
||||
platypush/events/http.rss.rst
|
||||
|
|
5
docs/source/platypush/events/hid.rst
Normal file
5
docs/source/platypush/events/hid.rst
Normal file
|
@ -0,0 +1,5 @@
|
|||
``hid``
|
||||
=======
|
||||
|
||||
.. automodule:: platypush.message.event.hid
|
||||
:members:
|
5
docs/source/platypush/plugins/hid.rst
Normal file
5
docs/source/platypush/plugins/hid.rst
Normal file
|
@ -0,0 +1,5 @@
|
|||
``hid``
|
||||
=======
|
||||
|
||||
.. automodule:: platypush.plugins.hid
|
||||
:members:
|
|
@ -57,6 +57,7 @@ Plugins
|
|||
platypush/plugins/gpio.sensor.motion.pmw3901.rst
|
||||
platypush/plugins/gpio.zeroborg.rst
|
||||
platypush/plugins/graphite.rst
|
||||
platypush/plugins/hid.rst
|
||||
platypush/plugins/http.request.rst
|
||||
platypush/plugins/http.request.rss.rst
|
||||
platypush/plugins/http.webpage.rst
|
||||
|
|
56
platypush/message/event/hid.py
Normal file
56
platypush/message/event/hid.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
from platypush.message.event import Event
|
||||
|
||||
|
||||
class HidBaseEvent(Event):
|
||||
"""
|
||||
Base class for HID events.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
path: str,
|
||||
serial_number: str,
|
||||
vendor_id: int,
|
||||
product_id: int,
|
||||
product_string: str,
|
||||
manufacturer_string: str,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__(
|
||||
*args,
|
||||
path=path,
|
||||
serial_number=serial_number,
|
||||
vendor_id=vendor_id,
|
||||
product_id=product_id,
|
||||
product_string=product_string,
|
||||
manufacturer_string=manufacturer_string,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
class HidDeviceConnectedEvent(HidBaseEvent):
|
||||
"""
|
||||
Event triggered when a device is discovered.
|
||||
"""
|
||||
|
||||
|
||||
class HidDeviceDisconnectedEvent(HidBaseEvent):
|
||||
"""
|
||||
Event triggered when a device is disconnected.
|
||||
"""
|
||||
|
||||
|
||||
class HidDeviceDataEvent(HidBaseEvent):
|
||||
"""
|
||||
Event triggered when a monitored device sends some data.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, data: str, **kwargs):
|
||||
"""
|
||||
:param data: Hex-encoded representation of the received data.
|
||||
"""
|
||||
super().__init__(*args, data=data, **kwargs)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
234
platypush/plugins/hid/__init__.py
Normal file
234
platypush/plugins/hid/__init__.py
Normal file
|
@ -0,0 +1,234 @@
|
|||
from collections.abc import Iterable
|
||||
from multiprocessing import Process
|
||||
from typing import Dict, List, Optional
|
||||
from time import sleep
|
||||
|
||||
import hid
|
||||
|
||||
from platypush.context import get_bus
|
||||
from platypush.message.event.hid import (
|
||||
HidDeviceConnectedEvent,
|
||||
HidDeviceDataEvent,
|
||||
HidDeviceDisconnectedEvent,
|
||||
)
|
||||
from platypush.plugins import RunnablePlugin, action
|
||||
from platypush.schemas.hid import HidDeviceSchema, HidMonitoredDeviceSchema
|
||||
|
||||
|
||||
class HidPlugin(RunnablePlugin):
|
||||
"""
|
||||
This plugin can be used to interact directly with HID devices (including
|
||||
Bluetooth, USB and several serial and wireless devices) over the raw
|
||||
interface.
|
||||
|
||||
This is the preferred way of communicating with joypads.
|
||||
|
||||
Note that on Linux reading from the devices requires the user running the
|
||||
Platypush service to have (at least) read access to the ``/dev/hidraw*``
|
||||
devices. However, it is still possible to get connected/disconnected events
|
||||
even without having to open a connection to the device. A way to make HID
|
||||
raw devices accessible to e.g. a particular user group is via udev rules.
|
||||
For example, create a file named ``/etc/udev/rules.d/99-hid.rules`` with
|
||||
the following content::
|
||||
|
||||
# Make all /dev/hidraw* devices accessible in read/write to users in
|
||||
# the group input
|
||||
KERNEL=="hidraw*", GROUP="input", MODE="0660"
|
||||
|
||||
A more granular solution is to provide read and/or write access only to a
|
||||
specific device that you want to access, and only to the specific user
|
||||
running the Platypush service::
|
||||
|
||||
KERNEL=="hidraw*", ATTRS{idVendor}=="1234", ATTRS{idProduct}=="5678", USER="user", MODE="0660"
|
||||
|
||||
If you don't want to reboot the device after adding the rules, then you can
|
||||
reload the rules for udev service and re-trigger them::
|
||||
|
||||
# udevadm control --reload && udevadm trigger
|
||||
|
||||
Triggers:
|
||||
|
||||
* :class:`platypush.message.event.hid.HidDeviceConnectedEvent` when a
|
||||
device is connected
|
||||
* :class:`platypush.message.event.hid.HidDeviceDisconnectedEvent` when
|
||||
a previously available device is disconnected
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
monitored_devices: Optional[Iterable[dict]] = None,
|
||||
poll_seconds: int = 1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
:param monitored_devices: Map of devices that should be monitored for
|
||||
new data. Format (note that all the device filtering attributes are
|
||||
optional):
|
||||
|
||||
.. schema:: hid.HidMonitoredDeviceSchema(many=True)
|
||||
|
||||
:param poll_seconds: How often the plugin should check for changes in
|
||||
the list of devices (default: 1 second).
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self._poll_seconds = poll_seconds
|
||||
self._filters = HidMonitoredDeviceSchema().load(
|
||||
monitored_devices or [], many=True
|
||||
)
|
||||
|
||||
self._device_monitors: Dict[str, Process] = {}
|
||||
self._available_devices: Dict[str, dict] = {}
|
||||
|
||||
def main(self):
|
||||
while not self.should_stop():
|
||||
scanned_devices = {
|
||||
dev['path']: dev for dev in self.get_devices().output # type: ignore
|
||||
}
|
||||
|
||||
self._handle_device_events(scanned_devices)
|
||||
self._available_devices = scanned_devices
|
||||
if self._poll_seconds and self._poll_seconds > 0:
|
||||
sleep(self._poll_seconds)
|
||||
|
||||
def stop(self):
|
||||
device_monitors = self._device_monitors.copy()
|
||||
for monitored_device in device_monitors:
|
||||
self._unregister_device_monitor(monitored_device)
|
||||
|
||||
super().stop()
|
||||
|
||||
@action
|
||||
def get_devices(self) -> List[dict]:
|
||||
"""
|
||||
Get the HID devices available on the host.
|
||||
|
||||
:return: .. schema:: hid.HidDeviceSchema(many=True)
|
||||
"""
|
||||
return list( # type: ignore
|
||||
{
|
||||
dev['path']: HidDeviceSchema().load(dev)
|
||||
for dev in hid.enumerate() # type: ignore
|
||||
}.values()
|
||||
)
|
||||
|
||||
def _get_monitor_rule(self, device: dict) -> Optional[dict]:
|
||||
"""
|
||||
:return: .. schema:: hid.HidMonitoredDeviceSchema
|
||||
"""
|
||||
matching_rules = [
|
||||
rule
|
||||
for rule in (self._filters or [])
|
||||
if all(
|
||||
rule[attr] == device.get(attr)
|
||||
for attr in (
|
||||
'path',
|
||||
'serial_number',
|
||||
'vendor_id',
|
||||
'product_id',
|
||||
'manufacturer_string',
|
||||
'product_string',
|
||||
)
|
||||
if rule.get(attr)
|
||||
)
|
||||
]
|
||||
|
||||
if matching_rules:
|
||||
return matching_rules[0]
|
||||
|
||||
def _device_monitor(self, dev_def: dict, rule: dict):
|
||||
path = dev_def['path']
|
||||
data_size = rule['data_size']
|
||||
poll_seconds = rule['poll_seconds']
|
||||
last_data = None
|
||||
self.logger.info(f'Starting monitor for device {path}')
|
||||
|
||||
def wait():
|
||||
if poll_seconds and poll_seconds > 0:
|
||||
sleep(poll_seconds)
|
||||
|
||||
while True:
|
||||
device = None
|
||||
|
||||
try:
|
||||
if not device:
|
||||
device = hid.Device(dev_def['vendor_id'], dev_def['product_id']) # type: ignore
|
||||
data = device.read(data_size)
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Read error from {path}: {e}')
|
||||
device = None
|
||||
wait()
|
||||
continue
|
||||
|
||||
if len(data) and data != last_data:
|
||||
data_dump = ''.join(f'{x:02x}' for x in data)
|
||||
get_bus().post(HidDeviceDataEvent(data=data_dump, **dev_def))
|
||||
last_data = data
|
||||
|
||||
wait()
|
||||
|
||||
def _register_device_monitor(self, device: dict, rule: dict):
|
||||
"""
|
||||
Register a monitor for a device.
|
||||
"""
|
||||
path = device['path']
|
||||
# Make sure that no other monitors are registered for this device
|
||||
self._unregister_device_monitor(path)
|
||||
monitor_proc = self._device_monitors[path] = Process(
|
||||
target=self._device_monitor, args=(device, rule)
|
||||
)
|
||||
|
||||
monitor_proc.start()
|
||||
|
||||
def _unregister_device_monitor(self, path: str):
|
||||
"""
|
||||
Unregister a monitor for a device.
|
||||
"""
|
||||
monitor_proc = self._device_monitors.get(path)
|
||||
if monitor_proc and monitor_proc.is_alive():
|
||||
self.logger.info(f'Terminating monitor for device {path}')
|
||||
|
||||
try:
|
||||
monitor_proc.terminate()
|
||||
monitor_proc.join(2)
|
||||
if monitor_proc.is_alive():
|
||||
monitor_proc.kill()
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Error while terminating monitor for {path}: {e}')
|
||||
|
||||
del self._device_monitors[path]
|
||||
|
||||
def _handle_new_devices(self, scanned_devices: dict):
|
||||
"""
|
||||
Handles connection events and monitor registering.
|
||||
"""
|
||||
scanned_device_paths = set(scanned_devices.keys())
|
||||
available_device_paths = set(self._available_devices)
|
||||
new_device_paths = scanned_device_paths.difference(available_device_paths)
|
||||
|
||||
for path in new_device_paths:
|
||||
device = scanned_devices[path]
|
||||
get_bus().post(HidDeviceConnectedEvent(**device))
|
||||
|
||||
monitor_rule = self._get_monitor_rule(device)
|
||||
if monitor_rule:
|
||||
self._register_device_monitor(device, monitor_rule)
|
||||
|
||||
def _handle_disconnected_devices(self, scanned_devices: dict):
|
||||
"""
|
||||
Handles disconnection events and monitor unregistering.
|
||||
"""
|
||||
scanned_device_paths = set(scanned_devices.keys())
|
||||
available_device_paths = set(self._available_devices)
|
||||
lost_device_paths = available_device_paths.difference(scanned_device_paths)
|
||||
|
||||
for path in lost_device_paths:
|
||||
get_bus().post(HidDeviceDisconnectedEvent(**self._available_devices[path]))
|
||||
|
||||
def _handle_device_events(self, scanned_devices: dict):
|
||||
"""
|
||||
Handles connected/disconnected device events and register/unregister the
|
||||
monitors based on the user-provided rules when required.
|
||||
"""
|
||||
self._handle_new_devices(scanned_devices)
|
||||
self._handle_disconnected_devices(scanned_devices)
|
10
platypush/plugins/hid/manifest.yaml
Normal file
10
platypush/plugins/hid/manifest.yaml
Normal file
|
@ -0,0 +1,10 @@
|
|||
manifest:
|
||||
events:
|
||||
platypush.message.event.hid.HidDeviceConnectedEvent: when a device is connected
|
||||
platypush.message.event.hid.HidDeviceDisconnectedEvent: when a previously available device is disconnected
|
||||
platypush.message.event.hid.HidDeviceDataEvent: when a monitored device sends some data
|
||||
install:
|
||||
pip:
|
||||
- hid
|
||||
package: platypush.plugins.hid
|
||||
type: plugin
|
88
platypush/schemas/hid.py
Normal file
88
platypush/schemas/hid.py
Normal file
|
@ -0,0 +1,88 @@
|
|||
from marshmallow import fields, Schema, INCLUDE
|
||||
|
||||
|
||||
class HidDeviceSchema(Schema):
|
||||
class Meta:
|
||||
unknown = INCLUDE
|
||||
|
||||
path = fields.String(
|
||||
metadata={
|
||||
'description': 'Path to the raw HID device',
|
||||
'example': '/dev/hidraw0',
|
||||
},
|
||||
)
|
||||
|
||||
serial_number = fields.String(
|
||||
metadata={
|
||||
'description': 'Serial number',
|
||||
'example': '00:11:22:33:44:55',
|
||||
},
|
||||
)
|
||||
|
||||
vendor_id = fields.Integer(
|
||||
metadata={
|
||||
'description': 'Vendor ID',
|
||||
'example': 1234,
|
||||
},
|
||||
)
|
||||
|
||||
product_id = fields.Integer(
|
||||
metadata={
|
||||
'description': 'Product ID',
|
||||
'example': 4321,
|
||||
},
|
||||
)
|
||||
|
||||
manufacturer_string = fields.String(
|
||||
metadata={
|
||||
'description': 'Manufacturer custom string',
|
||||
'example': 'foo',
|
||||
},
|
||||
)
|
||||
|
||||
product_string = fields.String(
|
||||
metadata={
|
||||
'description': 'Main name of the product',
|
||||
'example': 'My Device',
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class HidMonitoredDeviceSchema(HidDeviceSchema):
|
||||
notify_only_if_changed = fields.Boolean(
|
||||
missing=True,
|
||||
metadata={
|
||||
'description': 'If set to true (default), only changes in the '
|
||||
'values of the device will trigger events. So if you are e.g. '
|
||||
'monitoring the state of a joystick, only changes in the pressed '
|
||||
'buttons will trigger events.',
|
||||
},
|
||||
)
|
||||
|
||||
data_size = fields.Integer(
|
||||
missing=64,
|
||||
metadata={
|
||||
'description': 'How many bytes should be read from the device on '
|
||||
'each iteration (default: 64)',
|
||||
},
|
||||
)
|
||||
|
||||
poll_seconds = fields.Float(
|
||||
missing=0.1,
|
||||
metadata={
|
||||
'description': 'How often we should check this device for new data '
|
||||
'(default: 0.1 seconds)'
|
||||
},
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
for attr in (
|
||||
'path',
|
||||
'serial_number',
|
||||
'vendor_id',
|
||||
'product_id',
|
||||
'manufacturer_string',
|
||||
'product_string',
|
||||
):
|
||||
self._declared_fields[attr].metadata['description'] += ' (optional)'
|
Loading…
Add table
Reference in a new issue