Merge branch 'master' into 29-generic-entities-support

This commit is contained in:
Fabio Manganiello 2023-01-07 22:31:36 +01:00
commit b7f266cd92
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
9 changed files with 413 additions and 0 deletions

View file

@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.
Given the high speed of development in the first phase, changes are being
reported only starting from v0.20.2.
## [Unreleased]
### Added
- Added `hid` plugin to support discoverability and data interaction with
generic HID devices - like Bluetooth/USB peripherals, joysticks, dongles and
any other type of devices that supports the HID interface.
### Fixed
- Running the Zeroconf registration logic in another thread in `backend.http`,
so failures in the Zeroconf logic don't affect the startup of the web server.
## [0.24.4] - 2022-12-20
### Fixed

View file

@ -31,6 +31,7 @@ Events
platypush/events/gotify.rst
platypush/events/gpio.rst
platypush/events/gps.rst
platypush/events/hid.rst
platypush/events/http.rst
platypush/events/http.hook.rst
platypush/events/http.rss.rst

View file

@ -0,0 +1,5 @@
``hid``
=======
.. automodule:: platypush.message.event.hid
:members:

View file

@ -0,0 +1,5 @@
``hid``
=======
.. automodule:: platypush.plugins.hid
:members:

View file

@ -58,6 +58,7 @@ Plugins
platypush/plugins/gpio.sensor.motion.pmw3901.rst
platypush/plugins/gpio.zeroborg.rst
platypush/plugins/graphite.rst
platypush/plugins/hid.rst
platypush/plugins/http.request.rst
platypush/plugins/http.request.rss.rst
platypush/plugins/http.webpage.rst

View file

@ -0,0 +1,56 @@
from platypush.message.event import Event
class HidBaseEvent(Event):
"""
Base class for HID events.
"""
def __init__(
self,
*args,
path: str,
serial_number: str,
vendor_id: int,
product_id: int,
product_string: str,
manufacturer_string: str,
**kwargs
):
super().__init__(
*args,
path=path,
serial_number=serial_number,
vendor_id=vendor_id,
product_id=product_id,
product_string=product_string,
manufacturer_string=manufacturer_string,
**kwargs
)
class HidDeviceConnectedEvent(HidBaseEvent):
"""
Event triggered when a device is discovered.
"""
class HidDeviceDisconnectedEvent(HidBaseEvent):
"""
Event triggered when a device is disconnected.
"""
class HidDeviceDataEvent(HidBaseEvent):
"""
Event triggered when a monitored device sends some data.
"""
def __init__(self, *args, data: str, **kwargs):
"""
:param data: Hex-encoded representation of the received data.
"""
super().__init__(*args, data=data, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,234 @@
from collections.abc import Iterable
from multiprocessing import Process
from typing import Dict, List, Optional
from time import sleep
import hid
from platypush.context import get_bus
from platypush.message.event.hid import (
HidDeviceConnectedEvent,
HidDeviceDataEvent,
HidDeviceDisconnectedEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.schemas.hid import HidDeviceSchema, HidMonitoredDeviceSchema
class HidPlugin(RunnablePlugin):
"""
This plugin can be used to interact directly with HID devices (including
Bluetooth, USB and several serial and wireless devices) over the raw
interface.
This is the preferred way of communicating with joypads.
Note that on Linux reading from the devices requires the user running the
Platypush service to have (at least) read access to the ``/dev/hidraw*``
devices. However, it is still possible to get connected/disconnected events
even without having to open a connection to the device. A way to make HID
raw devices accessible to e.g. a particular user group is via udev rules.
For example, create a file named ``/etc/udev/rules.d/99-hid.rules`` with
the following content::
# Make all /dev/hidraw* devices accessible in read/write to users in
# the group input
KERNEL=="hidraw*", GROUP="input", MODE="0660"
A more granular solution is to provide read and/or write access only to a
specific device that you want to access, and only to the specific user
running the Platypush service::
KERNEL=="hidraw*", ATTRS{idVendor}=="1234", ATTRS{idProduct}=="5678", USER="user", MODE="0660"
If you don't want to reboot the device after adding the rules, then you can
reload the rules for udev service and re-trigger them::
# udevadm control --reload && udevadm trigger
Triggers:
* :class:`platypush.message.event.hid.HidDeviceConnectedEvent` when a
device is connected
* :class:`platypush.message.event.hid.HidDeviceDisconnectedEvent` when
a previously available device is disconnected
"""
def __init__(
self,
monitored_devices: Optional[Iterable[dict]] = None,
poll_seconds: int = 1,
**kwargs,
):
"""
:param monitored_devices: Map of devices that should be monitored for
new data. Format (note that all the device filtering attributes are
optional):
.. schema:: hid.HidMonitoredDeviceSchema(many=True)
:param poll_seconds: How often the plugin should check for changes in
the list of devices (default: 1 second).
"""
super().__init__(**kwargs)
self._poll_seconds = poll_seconds
self._filters = HidMonitoredDeviceSchema().load(
monitored_devices or [], many=True
)
self._device_monitors: Dict[str, Process] = {}
self._available_devices: Dict[str, dict] = {}
def main(self):
while not self.should_stop():
scanned_devices = {
dev['path']: dev for dev in self.get_devices().output # type: ignore
}
self._handle_device_events(scanned_devices)
self._available_devices = scanned_devices
if self._poll_seconds and self._poll_seconds > 0:
sleep(self._poll_seconds)
def stop(self):
device_monitors = self._device_monitors.copy()
for monitored_device in device_monitors:
self._unregister_device_monitor(monitored_device)
super().stop()
@action
def get_devices(self) -> List[dict]:
"""
Get the HID devices available on the host.
:return: .. schema:: hid.HidDeviceSchema(many=True)
"""
return list( # type: ignore
{
dev['path']: HidDeviceSchema().load(dev)
for dev in hid.enumerate() # type: ignore
}.values()
)
def _get_monitor_rule(self, device: dict) -> Optional[dict]:
"""
:return: .. schema:: hid.HidMonitoredDeviceSchema
"""
matching_rules = [
rule
for rule in (self._filters or [])
if all(
rule[attr] == device.get(attr)
for attr in (
'path',
'serial_number',
'vendor_id',
'product_id',
'manufacturer_string',
'product_string',
)
if rule.get(attr)
)
]
if matching_rules:
return matching_rules[0]
def _device_monitor(self, dev_def: dict, rule: dict):
path = dev_def['path']
data_size = rule['data_size']
poll_seconds = rule['poll_seconds']
last_data = None
self.logger.info(f'Starting monitor for device {path}')
def wait():
if poll_seconds and poll_seconds > 0:
sleep(poll_seconds)
while True:
device = None
try:
if not device:
device = hid.Device(dev_def['vendor_id'], dev_def['product_id']) # type: ignore
data = device.read(data_size)
except Exception as e:
self.logger.warning(f'Read error from {path}: {e}')
device = None
wait()
continue
if len(data) and data != last_data:
data_dump = ''.join(f'{x:02x}' for x in data)
get_bus().post(HidDeviceDataEvent(data=data_dump, **dev_def))
last_data = data
wait()
def _register_device_monitor(self, device: dict, rule: dict):
"""
Register a monitor for a device.
"""
path = device['path']
# Make sure that no other monitors are registered for this device
self._unregister_device_monitor(path)
monitor_proc = self._device_monitors[path] = Process(
target=self._device_monitor, args=(device, rule)
)
monitor_proc.start()
def _unregister_device_monitor(self, path: str):
"""
Unregister a monitor for a device.
"""
monitor_proc = self._device_monitors.get(path)
if monitor_proc and monitor_proc.is_alive():
self.logger.info(f'Terminating monitor for device {path}')
try:
monitor_proc.terminate()
monitor_proc.join(2)
if monitor_proc.is_alive():
monitor_proc.kill()
except Exception as e:
self.logger.warning(f'Error while terminating monitor for {path}: {e}')
del self._device_monitors[path]
def _handle_new_devices(self, scanned_devices: dict):
"""
Handles connection events and monitor registering.
"""
scanned_device_paths = set(scanned_devices.keys())
available_device_paths = set(self._available_devices)
new_device_paths = scanned_device_paths.difference(available_device_paths)
for path in new_device_paths:
device = scanned_devices[path]
get_bus().post(HidDeviceConnectedEvent(**device))
monitor_rule = self._get_monitor_rule(device)
if monitor_rule:
self._register_device_monitor(device, monitor_rule)
def _handle_disconnected_devices(self, scanned_devices: dict):
"""
Handles disconnection events and monitor unregistering.
"""
scanned_device_paths = set(scanned_devices.keys())
available_device_paths = set(self._available_devices)
lost_device_paths = available_device_paths.difference(scanned_device_paths)
for path in lost_device_paths:
get_bus().post(HidDeviceDisconnectedEvent(**self._available_devices[path]))
def _handle_device_events(self, scanned_devices: dict):
"""
Handles connected/disconnected device events and register/unregister the
monitors based on the user-provided rules when required.
"""
self._handle_new_devices(scanned_devices)
self._handle_disconnected_devices(scanned_devices)

View file

@ -0,0 +1,10 @@
manifest:
events:
platypush.message.event.hid.HidDeviceConnectedEvent: when a device is connected
platypush.message.event.hid.HidDeviceDisconnectedEvent: when a previously available device is disconnected
platypush.message.event.hid.HidDeviceDataEvent: when a monitored device sends some data
install:
pip:
- hid
package: platypush.plugins.hid
type: plugin

88
platypush/schemas/hid.py Normal file
View file

@ -0,0 +1,88 @@
from marshmallow import fields, Schema, INCLUDE
class HidDeviceSchema(Schema):
class Meta:
unknown = INCLUDE
path = fields.String(
metadata={
'description': 'Path to the raw HID device',
'example': '/dev/hidraw0',
},
)
serial_number = fields.String(
metadata={
'description': 'Serial number',
'example': '00:11:22:33:44:55',
},
)
vendor_id = fields.Integer(
metadata={
'description': 'Vendor ID',
'example': 1234,
},
)
product_id = fields.Integer(
metadata={
'description': 'Product ID',
'example': 4321,
},
)
manufacturer_string = fields.String(
metadata={
'description': 'Manufacturer custom string',
'example': 'foo',
},
)
product_string = fields.String(
metadata={
'description': 'Main name of the product',
'example': 'My Device',
},
)
class HidMonitoredDeviceSchema(HidDeviceSchema):
notify_only_if_changed = fields.Boolean(
missing=True,
metadata={
'description': 'If set to true (default), only changes in the '
'values of the device will trigger events. So if you are e.g. '
'monitoring the state of a joystick, only changes in the pressed '
'buttons will trigger events.',
},
)
data_size = fields.Integer(
missing=64,
metadata={
'description': 'How many bytes should be read from the device on '
'each iteration (default: 64)',
},
)
poll_seconds = fields.Float(
missing=0.1,
metadata={
'description': 'How often we should check this device for new data '
'(default: 0.1 seconds)'
},
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for attr in (
'path',
'serial_number',
'vendor_id',
'product_id',
'manufacturer_string',
'product_string',
):
self._declared_fields[attr].metadata['description'] += ' (optional)'