From 3c88593e9ae98a2abf9d8e7470032a7ad017eee9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 6 May 2024 02:26:27 +0200 Subject: [PATCH] [#293] Merged `midi` plugin and backend. Closes: #293 --- docs/source/backends.rst | 1 - docs/source/platypush/backend/midi.rst | 6 - platypush/backend/midi/__init__.py | 136 ----------- platypush/backend/midi/manifest.yaml | 15 -- platypush/message/event/midi.py | 55 ++++- platypush/plugins/midi/__init__.py | 313 +++++++++++++++++++------ platypush/plugins/midi/_model.py | 180 ++++++++++++++ platypush/plugins/midi/manifest.yaml | 10 +- 8 files changed, 480 insertions(+), 236 deletions(-) delete mode 100644 docs/source/platypush/backend/midi.rst delete mode 100644 platypush/backend/midi/__init__.py delete mode 100644 platypush/backend/midi/manifest.yaml create mode 100644 platypush/plugins/midi/_model.py diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 4171e882..7a0e0db3 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -7,7 +7,6 @@ Backends :caption: Backends: platypush/backend/http.rst - platypush/backend/midi.rst platypush/backend/nodered.rst platypush/backend/redis.rst platypush/backend/tcp.rst diff --git a/docs/source/platypush/backend/midi.rst b/docs/source/platypush/backend/midi.rst deleted file mode 100644 index 36f603d4..00000000 --- a/docs/source/platypush/backend/midi.rst +++ /dev/null @@ -1,6 +0,0 @@ -``midi`` -========================== - -.. automodule:: platypush.backend.midi - :members: - diff --git a/platypush/backend/midi/__init__.py b/platypush/backend/midi/__init__.py deleted file mode 100644 index 73aea343..00000000 --- a/platypush/backend/midi/__init__.py +++ /dev/null @@ -1,136 +0,0 @@ -import time - -from threading import Timer - -from platypush.backend import Backend -from platypush.message.event.midi import MidiMessageEvent - - -class MidiBackend(Backend): - """ - This backend will listen for events from a MIDI device and post a - MidiMessageEvent whenever a new MIDI event happens. - """ - - def __init__( - self, - device_name=None, - port_number=None, - midi_throttle_time=None, - *args, - **kwargs - ): - """ - :param device_name: Name of the MIDI device. *N.B.* either - `device_name` or `port_number` must be set. - Use :meth:`platypush.plugins.midi.MidiPlugin.query_ports` to get the - available ports indices and names - :type device_name: str - - :param port_number: MIDI port number - :type port_number: int - - :param midi_throttle_time: If set, the MIDI events will be throttled - - max one per selected time frame (in seconds). Set this parameter if - you want to synchronize MIDI events with plugins that normally - operate with a lower throughput. - :type midi_throttle_time: int - """ - - import rtmidi - - super().__init__(*args, **kwargs) - - if (device_name and port_number is not None) or ( - not device_name and port_number is None - ): - raise RuntimeError( - 'Either device_name or port_number (not both) ' - + 'must be set in the MIDI backend configuration' - ) - - self.midi_throttle_time = midi_throttle_time - self.midi = rtmidi.MidiIn() - self.last_trigger_event_time = None - self.midi_flush_timeout = None - ports = self.midi.get_ports() - - if not ports: - raise RuntimeError('No MIDI devices available') - - if device_name: - if device_name not in ports: - raise RuntimeError('MIDI device "{}" not found'.format(device_name)) - - self.port_number = ports.index(device_name) - self.device_name = device_name - - if port_number: - if port_number < 0 or port_number >= len(ports): - raise RuntimeError('MIDI port {} not found') - - self.port_number = port_number - self.device_name = ports[port_number] - - self.midi.set_callback(self._on_midi_message()) - - def _on_midi_message(self): - def flush_midi_message(message): - def _f(): - self.logger.info( - 'Flushing throttled MIDI message {} to the bus'.format(message) - ) - delay = time.time() - self.last_trigger_event_time - self.bus.post(MidiMessageEvent(message=message, delay=delay)) - - return _f - - # noinspection PyUnusedLocal - def callback(message, data): - # rtmidi will provide a tuple in the format - # (midi_message, time_since_last_event) - delay = message[1] - message = message[0] - - if self.midi_throttle_time and self.last_trigger_event_time: - event_delta = time.time() - self.last_trigger_event_time - if event_delta < self.midi_throttle_time: - self.logger.debug('Skipping throttled message {}'.format(message)) - if self.midi_flush_timeout: - self.midi_flush_timeout.cancel() - - self.midi_flush_timeout = Timer( - self.midi_throttle_time - event_delta, - flush_midi_message(message), - ) - - self.midi_flush_timeout.start() - return - - self.last_trigger_event_time = time.time() - self.bus.post(MidiMessageEvent(message=message, delay=delay)) - - return callback - - def run(self): - super().run() - - self.midi.open_port(self.port_number) - self.logger.info( - 'Initialized MIDI backend, listening for events on device {}'.format( - self.device_name - ) - ) - - while not self.should_stop(): - try: - time.sleep(1) - except Exception as e: - self.logger.exception(e) - - if self.midi: - self.midi.close_port() - self.midi = None - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/midi/manifest.yaml b/platypush/backend/midi/manifest.yaml deleted file mode 100644 index 990d0202..00000000 --- a/platypush/backend/midi/manifest.yaml +++ /dev/null @@ -1,15 +0,0 @@ -manifest: - events: - platypush.message.event.midi.MidiMessageEvent: when a new MIDI event is received - install: - apt: - - python3-rtmidi - dnf: - - python-rtmidi - pacman: - - rtmidi - - python-rtmidi - pip: - - rtmidi - package: platypush.backend.midi - type: backend diff --git a/platypush/message/event/midi.py b/platypush/message/event/midi.py index d2032baa..6dbddee6 100644 --- a/platypush/message/event/midi.py +++ b/platypush/message/event/midi.py @@ -1,22 +1,57 @@ +from abc import ABC +from typing import Any, List, Optional + from platypush.message.event import Event -class MidiMessageEvent(Event): +class MidiEvent(Event, ABC): """ - Event triggered upon received MIDI message + Base class for MIDI events. """ - def __init__(self, message, delay=None, *args, **kwargs): + def __init__(self, *args, device: Optional[str], port: Optional[int], **kwargs): """ - :param message: Received MIDI message - :type message: tuple[int] - - :param delay: Time in seconds since the previous MIDI event (default: None) - :type delay: float + :param device: The MIDI device name. + :param port: The MIDI device port number. """ + super().__init__(*args, device=device, port=port, **kwargs) - super().__init__(*args, message=message, delay=delay, **kwargs) + +class MidiMessageEvent(MidiEvent): + """ + Event triggered upon received MIDI message. + """ + + def __init__(self, *args, message: List[int], data: Optional[Any] = None, **kwargs): + """ + :param message: The received MIDI message. + :param data: Additional data associated to the event. + """ + super().__init__(*args, message=message, data=data, **kwargs) + + +class MidiDeviceConnectedEvent(MidiEvent): + """ + Event triggered when a MIDI device is connected. + """ + + +class MidiDeviceDisconnectedEvent(MidiEvent): + """ + Event triggered when a MIDI device is disconnected. + """ + + +class MidiDeviceAddedEvent(MidiEvent): + """ + Event triggered when a MIDI device is added to the list of available devices. + """ + + +class MidiDeviceRemovedEvent(MidiEvent): + """ + Event triggered when a MIDI device is removed from the list of available devices. + """ # vim:sw=4:ts=4:et: - diff --git a/platypush/plugins/midi/__init__.py b/platypush/plugins/midi/__init__.py index 894ca1c1..b8a4b334 100644 --- a/platypush/plugins/midi/__init__.py +++ b/platypush/plugins/midi/__init__.py @@ -1,84 +1,197 @@ import time +import re +from threading import RLock, Timer +from typing import Optional, Sequence, Union -from platypush.plugins import Plugin, action +import rtmidi + +from platypush.message.event.midi import MidiMessageEvent +from platypush.plugins import RunnablePlugin, action + +from ._model import MidiDevice, MidiDeviceType, MidiDevices -class MidiPlugin(Plugin): +class MidiPlugin(RunnablePlugin): """ Virtual MIDI controller plugin. It allows you to send custom MIDI messages - to any connected devices. + to any connected MIDI devices and listen for MIDI events. """ _played_notes = set() - def __init__(self, device_name='Platypush virtual MIDI output', **kwargs): + def __init__( + self, + device_name: str = 'Platypush MIDI plugin', + midi_devices: Optional[Sequence[Union[str, int]]] = None, + poll_interval: Optional[float] = 5.0, + event_resolution: Optional[float] = None, + **kwargs, + ): """ - :param device_name: MIDI virtual device name (default: *Platypush virtual MIDI output*) - :type device_name: str + :param device_name: Name of the MIDI device associated to this plugin. + :param midi_devices: List of MIDI devices to open and monitor for + events, by name or by port number. If set, and ``poll_interval`` + is set, then the plugin will only listen for events from these + devices. If not set, and ``poll_interval`` is set, then the plugin + will listen for events from all available MIDI devices (default). + :param poll_interval: How often the plugin should scan for new MIDI + devices. Set this to 0 or null to disable polling. + :param event_resolution: If set, then the plugin will throttle + MIDI events to the specified time resolution. If an event is + triggered within the specified time resolution, then the event will + be throttled and the last event will be discarded. This is useful + to avoid sending too many MIDI messages in a short time frame. + Default: no throttling. """ - import rtmidi - - super().__init__(**kwargs) + super().__init__(poll_interval=poll_interval, **kwargs) + self._devices = MidiDevices() self.device_name = device_name - self.midiout = rtmidi.MidiOut() - available_ports = self.midiout.get_ports() + self._devices_to_monitor = { + self._to_device_id(dev) for dev in (midi_devices or []) + } + self._midi_out = None + self._midi_in = None + self._last_midi_event = None + self._last_event_trigger_time = None + self._event_timer = None + self._event_timer_lock = RLock() + self._event_resolution = event_resolution + self.logger.info( + 'Initialized MIDI plugin on virtual device %s', self.device_name + ) - if available_ports: - self.midiout.open_port(0) - self.logger.info('Initialized MIDI plugin on port 0') - else: - self.midiout.open_virtual_port(self.device_name) - self.logger.info( - 'Initialized MIDI plugin on virtual device {}'.format(self.device_name) + @staticmethod + def _to_device_id(device: Union[str, int]) -> Union[str, int]: + try: + device = int(device) + except ValueError: + pass + + return device + + @property + def midi_in(self) -> rtmidi.MidiIn: # type: ignore + if not self._midi_in: + self._midi_in = rtmidi.MidiIn() # type: ignore + self._midi_in.open_port() + + return self._midi_in + + @property + def midi_out(self) -> rtmidi.MidiOut: # type: ignore + if not self._midi_out: + self._midi_out = rtmidi.MidiOut() # type: ignore + self._midi_out.open_port() + + return self._midi_out + + def start_event_timer(self): + def flush_last_event(): + self._last_event_trigger_time = time.time() + + if not self._last_midi_event: + return + + self._bus.post(self._last_midi_event) + + with self._event_timer_lock: + if not self._event_timer: + self._event_timer = Timer(self._event_resolution or 0, flush_last_event) + + self._event_timer.start() + + return self._event_timer + + def _on_midi_message(self, device: Optional[MidiDevice] = None): + def callback(message, data): + # rtmidi will provide a tuple in the format + # (midi_message, time_since_last_event) + # delay = message[1] + message = message[0] + self._last_midi_event = event = MidiMessageEvent( + message=message, + data=data, + device=device.name if device else None, + port=device.port if device else None, ) - @action - def send_message(self, values): - """ - :param values: Values is expected to be a list containing the MIDI command code and the command parameters - - see reference at https://ccrma.stanford.edu/~craig/articles/linuxmidi/misc/essenmidi.html - :type values: list[int] + if self._event_resolution and self._last_event_trigger_time: + event_delta = time.time() - self._last_event_trigger_time + if event_delta < self._event_resolution: + self.start_event_timer() + self.logger.debug('Skipping throttled message: %s', message) + return - Available MIDI commands: - * ``0x80`` Note Off - * ``0x90`` Note On - * ``0xA0`` Aftertouch - * ``0xB0`` Continuous controller - * ``0xC0`` Patch change - * ``0xD0`` Channel Pressure - * ``0xE0`` Pitch bend - * ``0xF0`` Start of system exclusive message - * ``0xF1`` MIDI Time Code Quarter Frame (Sys Common) - * ``0xF2`` Song Position Pointer (Sys Common) - * ``0xF3`` Song Select - * ``0xF6`` Tune Request (Sys Common) - * ``0xF7`` End of system exclusive message - * ``0xF8`` Timing Clock (Sys Realtime) - * ``0xFA`` Start (Sys Realtime) - * ``0xFB`` Continue (Sys Realtime) - * ``0xFC`` Stop (Sys Realtime) - * ``0xFE`` Active Sensing (Sys Realtime) - * ``0xFF`` System Reset (Sys Realtime) - """ + self._last_event_trigger_time = time.time() + self._bus.post(event) - self.midiout.send_message(values) + return callback @action - def play_note(self, note, velocity, duration=0): + def send_message( + self, values: Sequence[int], device: Optional[Union[int, str]] = None + ): + """ + :param values: Values is expected to be a list containing the MIDI + command code and the command parameters - `see reference + `_. + + Available MIDI commands: + + * ``0x80`` Note Off + * ``0x90`` Note On + * ``0xA0`` Aftertouch + * ``0xB0`` Continuous controller + * ``0xC0`` Patch change + * ``0xD0`` Channel Pressure + * ``0xE0`` Pitch bend + * ``0xF0`` Start of system exclusive message + * ``0xF1`` MIDI Time Code Quarter Frame (Sys Common) + * ``0xF2`` Song Position Pointer (Sys Common) + * ``0xF3`` Song Select + * ``0xF6`` Tune Request (Sys Common) + * ``0xF7`` End of system exclusive message + * ``0xF8`` Timing Clock (Sys Realtime) + * ``0xFA`` Start (Sys Realtime) + * ``0xFB`` Continue (Sys Realtime) + * ``0xFC`` Stop (Sys Realtime) + * ``0xFE`` Active Sensing (Sys Realtime) + * ``0xFF`` System Reset (Sys Realtime) + + :param device: MIDI port to send the message to, by number or by name. + If None then the message will be sent to the default port allocated + for the plugin. + """ + if isinstance(device, (str, int)): + if isinstance(device, str): + dev = self._devices.by_type(MidiDeviceType.OUTPUT).get(device) + else: + dev = self._devices.by_port(device) + + assert dev, f'Could not find device by name {device}' + assert ( + dev.device_type == MidiDeviceType.OUTPUT + ), f'The device {device} is not an output device' + + midi_out = dev.midi_out + if not (midi_out and midi_out.is_port_open()): + dev.open() + else: + midi_out = self.midi_out + + assert midi_out and midi_out.is_port_open(), 'No MIDI output port available' + midi_out.send_message(values) + + @action + def play(self, note: int, velocity: int, duration: float = 0): """ Play a note with selected velocity and duration. :param note: MIDI note in range 0-127 with #60 = C4 - :type note: int - :param velocity: MIDI note velocity in range 0-127 - :type velocity: int - :param duration: Note duration in seconds. Pass 0 if you don't want the note to get off - :type duration: float """ - self.send_message([0x90, note, velocity]) # Note on self._played_notes.add(note) @@ -88,42 +201,110 @@ class MidiPlugin(Plugin): self._played_notes.remove(note) @action - def release_note(self, note): + def release(self, note: int): """ Release a played note. :param note: MIDI note in range 0-127 with #60 = C4 - :type note: int """ - self.send_message([0x80, note, 0]) # Note off self._played_notes.remove(note) @action - def release_all_notes(self): + def release_all(self): """ Release all the notes being played. """ - played_notes = self._played_notes.copy() for note in played_notes: - self.release_note(note) + self.release(note) @action - def query_ports(self): + def query(self): """ - :returns: dict: A list of the available MIDI ports with index and name + :returns: dict: A list of the available MIDI ports with index and name. + Format: ``port_index: device_name``. + + .. code-block:: json + + { + "in": { + 0: "Midi Through:Midi Through Port-0 14:0", + 1: "MPK mini 3:MPK mini 3 MIDI 1 32:0", + 2: "X-TOUCH MINI:X-TOUCH MINI MIDI 1 36:0", + 3: "RtMidiOut Client:Platypush MIDI plugin 129:0" + }, + "out": { + 0: "Midi Through:Midi Through Port-0 14:0", + 1: "MPK mini 3:MPK mini 3 MIDI 1 32:0", + 2: "X-TOUCH MINI:X-TOUCH MINI MIDI 1 36:0" + } + } + """ + in_ports = {int(port): dev for port, dev in enumerate(self.midi_in.get_ports())} + out_ports = { + int(port): dev for port, dev in enumerate(self.midi_out.get_ports()) + } - import rtmidi + for device_type, devices in ( + (MidiDeviceType.INPUT, in_ports), + (MidiDeviceType.OUTPUT, out_ports), + ): + for port, name in devices.items(): + dev = MidiDevice( + name=name, + port=port, + device_type=device_type, + callback=self._on_midi_message, + ) - in_ports = rtmidi.MidiIn().get_ports() - out_ports = rtmidi.MidiOut().get_ports() + self._devices.add(dev) + dev = self._devices.by_port(port) + assert dev, f'Could not find device by port {port}' + + if not self._is_self(dev) and ( + not self._devices_to_monitor + or port in self._devices_to_monitor + or name in self._devices_to_monitor + ): + dev.open() return { - 'in': dict(enumerate(in_ports)), - 'out': dict(enumerate(out_ports)), + 'in': in_ports, + 'out': out_ports, } + def _is_self(self, device: MidiDevice): + return device.name == self.device_name or re.search( + r'^RtMidi(In|Out) Client:', device.name + ) + + def cleanup(self): + self.release_all() + if self._midi_in: + self._midi_in.close_port() + self._midi_in = None + + if self._midi_out: + self._midi_out.close_port() + self._midi_out = None + + self._devices.close() + + def main(self): + # Don't run the polling logic if the poll_interval is not set + if not self.poll_interval: + self.wait_stop() + return + + while not self.should_stop(): + self.query() + self.wait_stop(self.poll_interval) + + def stop(self): + self.cleanup() + super().stop() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/midi/_model.py b/platypush/plugins/midi/_model.py new file mode 100644 index 00000000..e90781c3 --- /dev/null +++ b/platypush/plugins/midi/_model.py @@ -0,0 +1,180 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Any, Callable, Dict, Optional, Sequence, Union + +import rtmidi + +from platypush.context import get_bus +from platypush.message.event.midi import ( + MidiDeviceAddedEvent, + MidiDeviceConnectedEvent, + MidiDeviceDisconnectedEvent, + MidiDeviceRemovedEvent, +) + + +class MidiDeviceType(Enum): + """ + Enum for MIDI device types. + """ + + INPUT = 'input' + OUTPUT = 'output' + + +@dataclass +class MidiDevice: + """ + Data class for MIDI devices. + """ + + name: str + port: int + device_type: MidiDeviceType + midi_in: Optional[rtmidi.MidiIn] = None # type: ignore + midi_out: Optional[rtmidi.MidiOut] = None # type: ignore + callback: Optional[ + Callable[ + [Optional["MidiDevice"]], + Callable[[Sequence[int], Optional[Any]], Optional[Any]], + ] + ] = None + + def open(self): + """ + Open the MIDI device. + """ + if self.device_type == MidiDeviceType.INPUT and not self.midi_in: + get_bus().post(MidiDeviceConnectedEvent(device=self.name, port=self.port)) + self.midi_in = rtmidi.MidiIn() # type: ignore + self.midi_in.open_port(self.port) + if self.callback: + self.midi_in.set_callback(self.callback(self)) + elif self.device_type == MidiDeviceType.OUTPUT and not self.midi_out: + self.midi_out = rtmidi.MidiOut() # type: ignore + self.midi_out.open_port(self.port) + + def close(self): + """ + Close the MIDI device. + """ + if self.midi_in: + self.midi_in.close_port() + self.midi_in = None + get_bus().post( + MidiDeviceDisconnectedEvent(device=self.name, port=self.port) + ) + + if self.midi_out: + self.midi_out.close_port() + self.midi_out = None + get_bus().post( + MidiDeviceDisconnectedEvent(device=self.name, port=self.port) + ) + + def is_open(self) -> bool: + """ + Check if the MIDI device is open. + + :returns: True if the MIDI device is open, False otherwise. + """ + return (self.midi_in is not None and self.midi_in.is_port_open()) or ( + self.midi_out is not None and self.midi_out.is_port_open() + ) + + +class MidiDevices: + """ + Class to manage MIDI devices. + """ + + def __init__(self): + self._devices_by_port = {} + self._devices_by_name = {} + self._devices_by_type = {MidiDeviceType.INPUT: {}, MidiDeviceType.OUTPUT: {}} + + def add(self, device: MidiDevice): + """ + Add a MIDI device to the list of available devices. + + :param device: The MIDI device to add. + """ + if ( + device.port not in self._devices_by_port + or self._devices_by_port[device.port].name != device.name + ): + self._devices_by_port[device.port] = device + + if device.name not in self._devices_by_name: + get_bus().post(MidiDeviceAddedEvent(device=device.name, port=device.port)) + self._devices_by_name[device.name] = device + + if device.port not in self._devices_by_type[device.device_type]: + self._devices_by_type[device.device_type][device.name] = device + + def remove(self, device: MidiDevice): + """ + Remove a MIDI device from the list of available devices. + + :param device: The MIDI device to remove. + """ + device.close() + del self._devices_by_port[device.port] + del self._devices_by_name[device.name] + del self._devices_by_type[device.device_type][device.name] + get_bus().post(MidiDeviceRemovedEvent(device=device.name, port=device.port)) + + def by_port(self, port: int) -> Optional[MidiDevice]: + """ + Get a MIDI device by port. + + :param port: The port of the MIDI device. + :returns: The MIDI device if found, None otherwise. + """ + return self._devices_by_port.get(port) + + def by_name(self, name: str) -> Optional[MidiDevice]: + """ + Get a MIDI device by name. + + :param name: The name of the MIDI device. + :returns: The MIDI device if found, None otherwise. + """ + return self._devices_by_name.get(name) + + def by_type(self, device_type: MidiDeviceType) -> Dict[str, MidiDevice]: + """ + Get all MIDI devices of a certain type. + + :param device_type: The type of the MIDI devices. + :returns: A list of MIDI devices of the specified type. + """ + return self._devices_by_type[device_type] + + def get(self, device: Union[str, int]) -> Optional[MidiDevice]: + """ + Get a MIDI device by name or port. + + :param device: The name or port of the MIDI device. + :returns: The MIDI device if found, None otherwise. + """ + if isinstance(device, str): + return self.by_name(device) + if isinstance(device, int): + return self.by_port(device) + return None + + def close(self): + """ + Close and clear all MIDI devices. + """ + devices = list(self._devices_by_port.values()) + for dev in devices: + self.remove(dev) + + self._devices_by_port.clear() + self._devices_by_name.clear() + self._devices_by_type = {MidiDeviceType.INPUT: {}, MidiDeviceType.OUTPUT: {}} + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/midi/manifest.yaml b/platypush/plugins/midi/manifest.yaml index f78ea806..3887fa22 100644 --- a/platypush/plugins/midi/manifest.yaml +++ b/platypush/plugins/midi/manifest.yaml @@ -1,13 +1,19 @@ manifest: - events: {} + events: + - platypush.message.event.midi.MidiDeviceAddedEvent + - platypush.message.event.midi.MidiDeviceConnectedEvent + - platypush.message.event.midi.MidiDeviceDisconnectedEvent + - platypush.message.event.midi.MidiDeviceRemovedEvent + - platypush.message.event.midi.MidiMessageEvent install: apt: - python3-rtmidi dnf: - python-rtmidi pacman: + - rtmidi - python-rtmidi pip: - - python-rtmidi + - rtmidi package: platypush.plugins.midi type: plugin