forked from platypush/platypush
parent
6a8c83f99b
commit
3c88593e9a
8 changed files with 480 additions and 236 deletions
|
@ -7,7 +7,6 @@ Backends
|
|||
:caption: Backends:
|
||||
|
||||
platypush/backend/http.rst
|
||||
platypush/backend/midi.rst
|
||||
platypush/backend/nodered.rst
|
||||
platypush/backend/redis.rst
|
||||
platypush/backend/tcp.rst
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
``midi``
|
||||
==========================
|
||||
|
||||
.. automodule:: platypush.backend.midi
|
||||
:members:
|
||||
|
|
@ -1,136 +0,0 @@
|
|||
import time
|
||||
|
||||
from threading import Timer
|
||||
|
||||
from platypush.backend import Backend
|
||||
from platypush.message.event.midi import MidiMessageEvent
|
||||
|
||||
|
||||
class MidiBackend(Backend):
|
||||
"""
|
||||
This backend will listen for events from a MIDI device and post a
|
||||
MidiMessageEvent whenever a new MIDI event happens.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device_name=None,
|
||||
port_number=None,
|
||||
midi_throttle_time=None,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
:param device_name: Name of the MIDI device. *N.B.* either
|
||||
`device_name` or `port_number` must be set.
|
||||
Use :meth:`platypush.plugins.midi.MidiPlugin.query_ports` to get the
|
||||
available ports indices and names
|
||||
:type device_name: str
|
||||
|
||||
:param port_number: MIDI port number
|
||||
:type port_number: int
|
||||
|
||||
:param midi_throttle_time: If set, the MIDI events will be throttled -
|
||||
max one per selected time frame (in seconds). Set this parameter if
|
||||
you want to synchronize MIDI events with plugins that normally
|
||||
operate with a lower throughput.
|
||||
:type midi_throttle_time: int
|
||||
"""
|
||||
|
||||
import rtmidi
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
if (device_name and port_number is not None) or (
|
||||
not device_name and port_number is None
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Either device_name or port_number (not both) '
|
||||
+ 'must be set in the MIDI backend configuration'
|
||||
)
|
||||
|
||||
self.midi_throttle_time = midi_throttle_time
|
||||
self.midi = rtmidi.MidiIn()
|
||||
self.last_trigger_event_time = None
|
||||
self.midi_flush_timeout = None
|
||||
ports = self.midi.get_ports()
|
||||
|
||||
if not ports:
|
||||
raise RuntimeError('No MIDI devices available')
|
||||
|
||||
if device_name:
|
||||
if device_name not in ports:
|
||||
raise RuntimeError('MIDI device "{}" not found'.format(device_name))
|
||||
|
||||
self.port_number = ports.index(device_name)
|
||||
self.device_name = device_name
|
||||
|
||||
if port_number:
|
||||
if port_number < 0 or port_number >= len(ports):
|
||||
raise RuntimeError('MIDI port {} not found')
|
||||
|
||||
self.port_number = port_number
|
||||
self.device_name = ports[port_number]
|
||||
|
||||
self.midi.set_callback(self._on_midi_message())
|
||||
|
||||
def _on_midi_message(self):
|
||||
def flush_midi_message(message):
|
||||
def _f():
|
||||
self.logger.info(
|
||||
'Flushing throttled MIDI message {} to the bus'.format(message)
|
||||
)
|
||||
delay = time.time() - self.last_trigger_event_time
|
||||
self.bus.post(MidiMessageEvent(message=message, delay=delay))
|
||||
|
||||
return _f
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
def callback(message, data):
|
||||
# rtmidi will provide a tuple in the format
|
||||
# (midi_message, time_since_last_event)
|
||||
delay = message[1]
|
||||
message = message[0]
|
||||
|
||||
if self.midi_throttle_time and self.last_trigger_event_time:
|
||||
event_delta = time.time() - self.last_trigger_event_time
|
||||
if event_delta < self.midi_throttle_time:
|
||||
self.logger.debug('Skipping throttled message {}'.format(message))
|
||||
if self.midi_flush_timeout:
|
||||
self.midi_flush_timeout.cancel()
|
||||
|
||||
self.midi_flush_timeout = Timer(
|
||||
self.midi_throttle_time - event_delta,
|
||||
flush_midi_message(message),
|
||||
)
|
||||
|
||||
self.midi_flush_timeout.start()
|
||||
return
|
||||
|
||||
self.last_trigger_event_time = time.time()
|
||||
self.bus.post(MidiMessageEvent(message=message, delay=delay))
|
||||
|
||||
return callback
|
||||
|
||||
def run(self):
|
||||
super().run()
|
||||
|
||||
self.midi.open_port(self.port_number)
|
||||
self.logger.info(
|
||||
'Initialized MIDI backend, listening for events on device {}'.format(
|
||||
self.device_name
|
||||
)
|
||||
)
|
||||
|
||||
while not self.should_stop():
|
||||
try:
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
self.logger.exception(e)
|
||||
|
||||
if self.midi:
|
||||
self.midi.close_port()
|
||||
self.midi = None
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -1,15 +0,0 @@
|
|||
manifest:
|
||||
events:
|
||||
platypush.message.event.midi.MidiMessageEvent: when a new MIDI event is received
|
||||
install:
|
||||
apt:
|
||||
- python3-rtmidi
|
||||
dnf:
|
||||
- python-rtmidi
|
||||
pacman:
|
||||
- rtmidi
|
||||
- python-rtmidi
|
||||
pip:
|
||||
- rtmidi
|
||||
package: platypush.backend.midi
|
||||
type: backend
|
|
@ -1,22 +1,57 @@
|
|||
from abc import ABC
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from platypush.message.event import Event
|
||||
|
||||
|
||||
class MidiMessageEvent(Event):
|
||||
class MidiEvent(Event, ABC):
|
||||
"""
|
||||
Event triggered upon received MIDI message
|
||||
Base class for MIDI events.
|
||||
"""
|
||||
|
||||
def __init__(self, message, delay=None, *args, **kwargs):
|
||||
def __init__(self, *args, device: Optional[str], port: Optional[int], **kwargs):
|
||||
"""
|
||||
:param message: Received MIDI message
|
||||
:type message: tuple[int]
|
||||
:param device: The MIDI device name.
|
||||
:param port: The MIDI device port number.
|
||||
"""
|
||||
super().__init__(*args, device=device, port=port, **kwargs)
|
||||
|
||||
:param delay: Time in seconds since the previous MIDI event (default: None)
|
||||
:type delay: float
|
||||
|
||||
class MidiMessageEvent(MidiEvent):
|
||||
"""
|
||||
Event triggered upon received MIDI message.
|
||||
"""
|
||||
|
||||
super().__init__(*args, message=message, delay=delay, **kwargs)
|
||||
def __init__(self, *args, message: List[int], data: Optional[Any] = None, **kwargs):
|
||||
"""
|
||||
:param message: The received MIDI message.
|
||||
:param data: Additional data associated to the event.
|
||||
"""
|
||||
super().__init__(*args, message=message, data=data, **kwargs)
|
||||
|
||||
|
||||
class MidiDeviceConnectedEvent(MidiEvent):
|
||||
"""
|
||||
Event triggered when a MIDI device is connected.
|
||||
"""
|
||||
|
||||
|
||||
class MidiDeviceDisconnectedEvent(MidiEvent):
|
||||
"""
|
||||
Event triggered when a MIDI device is disconnected.
|
||||
"""
|
||||
|
||||
|
||||
class MidiDeviceAddedEvent(MidiEvent):
|
||||
"""
|
||||
Event triggered when a MIDI device is added to the list of available devices.
|
||||
"""
|
||||
|
||||
|
||||
class MidiDeviceRemovedEvent(MidiEvent):
|
||||
"""
|
||||
Event triggered when a MIDI device is removed from the list of available devices.
|
||||
"""
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
||||
|
|
|
@ -1,46 +1,144 @@
|
|||
import time
|
||||
import re
|
||||
from threading import RLock, Timer
|
||||
from typing import Optional, Sequence, Union
|
||||
|
||||
from platypush.plugins import Plugin, action
|
||||
import rtmidi
|
||||
|
||||
from platypush.message.event.midi import MidiMessageEvent
|
||||
from platypush.plugins import RunnablePlugin, action
|
||||
|
||||
from ._model import MidiDevice, MidiDeviceType, MidiDevices
|
||||
|
||||
|
||||
class MidiPlugin(Plugin):
|
||||
class MidiPlugin(RunnablePlugin):
|
||||
"""
|
||||
Virtual MIDI controller plugin. It allows you to send custom MIDI messages
|
||||
to any connected devices.
|
||||
to any connected MIDI devices and listen for MIDI events.
|
||||
"""
|
||||
|
||||
_played_notes = set()
|
||||
|
||||
def __init__(self, device_name='Platypush virtual MIDI output', **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
device_name: str = 'Platypush MIDI plugin',
|
||||
midi_devices: Optional[Sequence[Union[str, int]]] = None,
|
||||
poll_interval: Optional[float] = 5.0,
|
||||
event_resolution: Optional[float] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
:param device_name: MIDI virtual device name (default: *Platypush virtual MIDI output*)
|
||||
:type device_name: str
|
||||
:param device_name: Name of the MIDI device associated to this plugin.
|
||||
:param midi_devices: List of MIDI devices to open and monitor for
|
||||
events, by name or by port number. If set, and ``poll_interval``
|
||||
is set, then the plugin will only listen for events from these
|
||||
devices. If not set, and ``poll_interval`` is set, then the plugin
|
||||
will listen for events from all available MIDI devices (default).
|
||||
:param poll_interval: How often the plugin should scan for new MIDI
|
||||
devices. Set this to 0 or null to disable polling.
|
||||
:param event_resolution: If set, then the plugin will throttle
|
||||
MIDI events to the specified time resolution. If an event is
|
||||
triggered within the specified time resolution, then the event will
|
||||
be throttled and the last event will be discarded. This is useful
|
||||
to avoid sending too many MIDI messages in a short time frame.
|
||||
Default: no throttling.
|
||||
"""
|
||||
import rtmidi
|
||||
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(poll_interval=poll_interval, **kwargs)
|
||||
|
||||
self._devices = MidiDevices()
|
||||
self.device_name = device_name
|
||||
self.midiout = rtmidi.MidiOut()
|
||||
available_ports = self.midiout.get_ports()
|
||||
|
||||
if available_ports:
|
||||
self.midiout.open_port(0)
|
||||
self.logger.info('Initialized MIDI plugin on port 0')
|
||||
else:
|
||||
self.midiout.open_virtual_port(self.device_name)
|
||||
self._devices_to_monitor = {
|
||||
self._to_device_id(dev) for dev in (midi_devices or [])
|
||||
}
|
||||
self._midi_out = None
|
||||
self._midi_in = None
|
||||
self._last_midi_event = None
|
||||
self._last_event_trigger_time = None
|
||||
self._event_timer = None
|
||||
self._event_timer_lock = RLock()
|
||||
self._event_resolution = event_resolution
|
||||
self.logger.info(
|
||||
'Initialized MIDI plugin on virtual device {}'.format(self.device_name)
|
||||
'Initialized MIDI plugin on virtual device %s', self.device_name
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _to_device_id(device: Union[str, int]) -> Union[str, int]:
|
||||
try:
|
||||
device = int(device)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return device
|
||||
|
||||
@property
|
||||
def midi_in(self) -> rtmidi.MidiIn: # type: ignore
|
||||
if not self._midi_in:
|
||||
self._midi_in = rtmidi.MidiIn() # type: ignore
|
||||
self._midi_in.open_port()
|
||||
|
||||
return self._midi_in
|
||||
|
||||
@property
|
||||
def midi_out(self) -> rtmidi.MidiOut: # type: ignore
|
||||
if not self._midi_out:
|
||||
self._midi_out = rtmidi.MidiOut() # type: ignore
|
||||
self._midi_out.open_port()
|
||||
|
||||
return self._midi_out
|
||||
|
||||
def start_event_timer(self):
|
||||
def flush_last_event():
|
||||
self._last_event_trigger_time = time.time()
|
||||
|
||||
if not self._last_midi_event:
|
||||
return
|
||||
|
||||
self._bus.post(self._last_midi_event)
|
||||
|
||||
with self._event_timer_lock:
|
||||
if not self._event_timer:
|
||||
self._event_timer = Timer(self._event_resolution or 0, flush_last_event)
|
||||
|
||||
self._event_timer.start()
|
||||
|
||||
return self._event_timer
|
||||
|
||||
def _on_midi_message(self, device: Optional[MidiDevice] = None):
|
||||
def callback(message, data):
|
||||
# rtmidi will provide a tuple in the format
|
||||
# (midi_message, time_since_last_event)
|
||||
# delay = message[1]
|
||||
message = message[0]
|
||||
self._last_midi_event = event = MidiMessageEvent(
|
||||
message=message,
|
||||
data=data,
|
||||
device=device.name if device else None,
|
||||
port=device.port if device else None,
|
||||
)
|
||||
|
||||
if self._event_resolution and self._last_event_trigger_time:
|
||||
event_delta = time.time() - self._last_event_trigger_time
|
||||
if event_delta < self._event_resolution:
|
||||
self.start_event_timer()
|
||||
self.logger.debug('Skipping throttled message: %s', message)
|
||||
return
|
||||
|
||||
self._last_event_trigger_time = time.time()
|
||||
self._bus.post(event)
|
||||
|
||||
return callback
|
||||
|
||||
@action
|
||||
def send_message(self, values):
|
||||
def send_message(
|
||||
self, values: Sequence[int], device: Optional[Union[int, str]] = None
|
||||
):
|
||||
"""
|
||||
:param values: Values is expected to be a list containing the MIDI command code and the command parameters -
|
||||
see reference at https://ccrma.stanford.edu/~craig/articles/linuxmidi/misc/essenmidi.html
|
||||
:type values: list[int]
|
||||
:param values: Values is expected to be a list containing the MIDI
|
||||
command code and the command parameters - `see reference
|
||||
<https://ccrma.stanford.edu/~craig/articles/linuxmidi/misc/essenmidi.html>`_.
|
||||
|
||||
Available MIDI commands:
|
||||
|
||||
* ``0x80`` Note Off
|
||||
* ``0x90`` Note On
|
||||
* ``0xA0`` Aftertouch
|
||||
|
@ -60,25 +158,40 @@ class MidiPlugin(Plugin):
|
|||
* ``0xFC`` Stop (Sys Realtime)
|
||||
* ``0xFE`` Active Sensing (Sys Realtime)
|
||||
* ``0xFF`` System Reset (Sys Realtime)
|
||||
"""
|
||||
|
||||
self.midiout.send_message(values)
|
||||
:param device: MIDI port to send the message to, by number or by name.
|
||||
If None then the message will be sent to the default port allocated
|
||||
for the plugin.
|
||||
"""
|
||||
if isinstance(device, (str, int)):
|
||||
if isinstance(device, str):
|
||||
dev = self._devices.by_type(MidiDeviceType.OUTPUT).get(device)
|
||||
else:
|
||||
dev = self._devices.by_port(device)
|
||||
|
||||
assert dev, f'Could not find device by name {device}'
|
||||
assert (
|
||||
dev.device_type == MidiDeviceType.OUTPUT
|
||||
), f'The device {device} is not an output device'
|
||||
|
||||
midi_out = dev.midi_out
|
||||
if not (midi_out and midi_out.is_port_open()):
|
||||
dev.open()
|
||||
else:
|
||||
midi_out = self.midi_out
|
||||
|
||||
assert midi_out and midi_out.is_port_open(), 'No MIDI output port available'
|
||||
midi_out.send_message(values)
|
||||
|
||||
@action
|
||||
def play_note(self, note, velocity, duration=0):
|
||||
def play(self, note: int, velocity: int, duration: float = 0):
|
||||
"""
|
||||
Play a note with selected velocity and duration.
|
||||
|
||||
:param note: MIDI note in range 0-127 with #60 = C4
|
||||
:type note: int
|
||||
|
||||
:param velocity: MIDI note velocity in range 0-127
|
||||
:type velocity: int
|
||||
|
||||
:param duration: Note duration in seconds. Pass 0 if you don't want the note to get off
|
||||
:type duration: float
|
||||
"""
|
||||
|
||||
self.send_message([0x90, note, velocity]) # Note on
|
||||
self._played_notes.add(note)
|
||||
|
||||
|
@ -88,42 +201,110 @@ class MidiPlugin(Plugin):
|
|||
self._played_notes.remove(note)
|
||||
|
||||
@action
|
||||
def release_note(self, note):
|
||||
def release(self, note: int):
|
||||
"""
|
||||
Release a played note.
|
||||
|
||||
:param note: MIDI note in range 0-127 with #60 = C4
|
||||
:type note: int
|
||||
"""
|
||||
|
||||
self.send_message([0x80, note, 0]) # Note off
|
||||
self._played_notes.remove(note)
|
||||
|
||||
@action
|
||||
def release_all_notes(self):
|
||||
def release_all(self):
|
||||
"""
|
||||
Release all the notes being played.
|
||||
"""
|
||||
|
||||
played_notes = self._played_notes.copy()
|
||||
for note in played_notes:
|
||||
self.release_note(note)
|
||||
self.release(note)
|
||||
|
||||
@action
|
||||
def query_ports(self):
|
||||
def query(self):
|
||||
"""
|
||||
:returns: dict: A list of the available MIDI ports with index and name
|
||||
:returns: dict: A list of the available MIDI ports with index and name.
|
||||
Format: ``port_index: device_name``.
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
"in": {
|
||||
0: "Midi Through:Midi Through Port-0 14:0",
|
||||
1: "MPK mini 3:MPK mini 3 MIDI 1 32:0",
|
||||
2: "X-TOUCH MINI:X-TOUCH MINI MIDI 1 36:0",
|
||||
3: "RtMidiOut Client:Platypush MIDI plugin 129:0"
|
||||
},
|
||||
"out": {
|
||||
0: "Midi Through:Midi Through Port-0 14:0",
|
||||
1: "MPK mini 3:MPK mini 3 MIDI 1 32:0",
|
||||
2: "X-TOUCH MINI:X-TOUCH MINI MIDI 1 36:0"
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
in_ports = {int(port): dev for port, dev in enumerate(self.midi_in.get_ports())}
|
||||
out_ports = {
|
||||
int(port): dev for port, dev in enumerate(self.midi_out.get_ports())
|
||||
}
|
||||
|
||||
import rtmidi
|
||||
for device_type, devices in (
|
||||
(MidiDeviceType.INPUT, in_ports),
|
||||
(MidiDeviceType.OUTPUT, out_ports),
|
||||
):
|
||||
for port, name in devices.items():
|
||||
dev = MidiDevice(
|
||||
name=name,
|
||||
port=port,
|
||||
device_type=device_type,
|
||||
callback=self._on_midi_message,
|
||||
)
|
||||
|
||||
in_ports = rtmidi.MidiIn().get_ports()
|
||||
out_ports = rtmidi.MidiOut().get_ports()
|
||||
self._devices.add(dev)
|
||||
dev = self._devices.by_port(port)
|
||||
assert dev, f'Could not find device by port {port}'
|
||||
|
||||
if not self._is_self(dev) and (
|
||||
not self._devices_to_monitor
|
||||
or port in self._devices_to_monitor
|
||||
or name in self._devices_to_monitor
|
||||
):
|
||||
dev.open()
|
||||
|
||||
return {
|
||||
'in': dict(enumerate(in_ports)),
|
||||
'out': dict(enumerate(out_ports)),
|
||||
'in': in_ports,
|
||||
'out': out_ports,
|
||||
}
|
||||
|
||||
def _is_self(self, device: MidiDevice):
|
||||
return device.name == self.device_name or re.search(
|
||||
r'^RtMidi(In|Out) Client:', device.name
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
self.release_all()
|
||||
if self._midi_in:
|
||||
self._midi_in.close_port()
|
||||
self._midi_in = None
|
||||
|
||||
if self._midi_out:
|
||||
self._midi_out.close_port()
|
||||
self._midi_out = None
|
||||
|
||||
self._devices.close()
|
||||
|
||||
def main(self):
|
||||
# Don't run the polling logic if the poll_interval is not set
|
||||
if not self.poll_interval:
|
||||
self.wait_stop()
|
||||
return
|
||||
|
||||
while not self.should_stop():
|
||||
self.query()
|
||||
self.wait_stop(self.poll_interval)
|
||||
|
||||
def stop(self):
|
||||
self.cleanup()
|
||||
super().stop()
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
180
platypush/plugins/midi/_model.py
Normal file
180
platypush/plugins/midi/_model.py
Normal file
|
@ -0,0 +1,180 @@
|
|||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Callable, Dict, Optional, Sequence, Union
|
||||
|
||||
import rtmidi
|
||||
|
||||
from platypush.context import get_bus
|
||||
from platypush.message.event.midi import (
|
||||
MidiDeviceAddedEvent,
|
||||
MidiDeviceConnectedEvent,
|
||||
MidiDeviceDisconnectedEvent,
|
||||
MidiDeviceRemovedEvent,
|
||||
)
|
||||
|
||||
|
||||
class MidiDeviceType(Enum):
|
||||
"""
|
||||
Enum for MIDI device types.
|
||||
"""
|
||||
|
||||
INPUT = 'input'
|
||||
OUTPUT = 'output'
|
||||
|
||||
|
||||
@dataclass
|
||||
class MidiDevice:
|
||||
"""
|
||||
Data class for MIDI devices.
|
||||
"""
|
||||
|
||||
name: str
|
||||
port: int
|
||||
device_type: MidiDeviceType
|
||||
midi_in: Optional[rtmidi.MidiIn] = None # type: ignore
|
||||
midi_out: Optional[rtmidi.MidiOut] = None # type: ignore
|
||||
callback: Optional[
|
||||
Callable[
|
||||
[Optional["MidiDevice"]],
|
||||
Callable[[Sequence[int], Optional[Any]], Optional[Any]],
|
||||
]
|
||||
] = None
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
Open the MIDI device.
|
||||
"""
|
||||
if self.device_type == MidiDeviceType.INPUT and not self.midi_in:
|
||||
get_bus().post(MidiDeviceConnectedEvent(device=self.name, port=self.port))
|
||||
self.midi_in = rtmidi.MidiIn() # type: ignore
|
||||
self.midi_in.open_port(self.port)
|
||||
if self.callback:
|
||||
self.midi_in.set_callback(self.callback(self))
|
||||
elif self.device_type == MidiDeviceType.OUTPUT and not self.midi_out:
|
||||
self.midi_out = rtmidi.MidiOut() # type: ignore
|
||||
self.midi_out.open_port(self.port)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the MIDI device.
|
||||
"""
|
||||
if self.midi_in:
|
||||
self.midi_in.close_port()
|
||||
self.midi_in = None
|
||||
get_bus().post(
|
||||
MidiDeviceDisconnectedEvent(device=self.name, port=self.port)
|
||||
)
|
||||
|
||||
if self.midi_out:
|
||||
self.midi_out.close_port()
|
||||
self.midi_out = None
|
||||
get_bus().post(
|
||||
MidiDeviceDisconnectedEvent(device=self.name, port=self.port)
|
||||
)
|
||||
|
||||
def is_open(self) -> bool:
|
||||
"""
|
||||
Check if the MIDI device is open.
|
||||
|
||||
:returns: True if the MIDI device is open, False otherwise.
|
||||
"""
|
||||
return (self.midi_in is not None and self.midi_in.is_port_open()) or (
|
||||
self.midi_out is not None and self.midi_out.is_port_open()
|
||||
)
|
||||
|
||||
|
||||
class MidiDevices:
|
||||
"""
|
||||
Class to manage MIDI devices.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._devices_by_port = {}
|
||||
self._devices_by_name = {}
|
||||
self._devices_by_type = {MidiDeviceType.INPUT: {}, MidiDeviceType.OUTPUT: {}}
|
||||
|
||||
def add(self, device: MidiDevice):
|
||||
"""
|
||||
Add a MIDI device to the list of available devices.
|
||||
|
||||
:param device: The MIDI device to add.
|
||||
"""
|
||||
if (
|
||||
device.port not in self._devices_by_port
|
||||
or self._devices_by_port[device.port].name != device.name
|
||||
):
|
||||
self._devices_by_port[device.port] = device
|
||||
|
||||
if device.name not in self._devices_by_name:
|
||||
get_bus().post(MidiDeviceAddedEvent(device=device.name, port=device.port))
|
||||
self._devices_by_name[device.name] = device
|
||||
|
||||
if device.port not in self._devices_by_type[device.device_type]:
|
||||
self._devices_by_type[device.device_type][device.name] = device
|
||||
|
||||
def remove(self, device: MidiDevice):
|
||||
"""
|
||||
Remove a MIDI device from the list of available devices.
|
||||
|
||||
:param device: The MIDI device to remove.
|
||||
"""
|
||||
device.close()
|
||||
del self._devices_by_port[device.port]
|
||||
del self._devices_by_name[device.name]
|
||||
del self._devices_by_type[device.device_type][device.name]
|
||||
get_bus().post(MidiDeviceRemovedEvent(device=device.name, port=device.port))
|
||||
|
||||
def by_port(self, port: int) -> Optional[MidiDevice]:
|
||||
"""
|
||||
Get a MIDI device by port.
|
||||
|
||||
:param port: The port of the MIDI device.
|
||||
:returns: The MIDI device if found, None otherwise.
|
||||
"""
|
||||
return self._devices_by_port.get(port)
|
||||
|
||||
def by_name(self, name: str) -> Optional[MidiDevice]:
|
||||
"""
|
||||
Get a MIDI device by name.
|
||||
|
||||
:param name: The name of the MIDI device.
|
||||
:returns: The MIDI device if found, None otherwise.
|
||||
"""
|
||||
return self._devices_by_name.get(name)
|
||||
|
||||
def by_type(self, device_type: MidiDeviceType) -> Dict[str, MidiDevice]:
|
||||
"""
|
||||
Get all MIDI devices of a certain type.
|
||||
|
||||
:param device_type: The type of the MIDI devices.
|
||||
:returns: A list of MIDI devices of the specified type.
|
||||
"""
|
||||
return self._devices_by_type[device_type]
|
||||
|
||||
def get(self, device: Union[str, int]) -> Optional[MidiDevice]:
|
||||
"""
|
||||
Get a MIDI device by name or port.
|
||||
|
||||
:param device: The name or port of the MIDI device.
|
||||
:returns: The MIDI device if found, None otherwise.
|
||||
"""
|
||||
if isinstance(device, str):
|
||||
return self.by_name(device)
|
||||
if isinstance(device, int):
|
||||
return self.by_port(device)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close and clear all MIDI devices.
|
||||
"""
|
||||
devices = list(self._devices_by_port.values())
|
||||
for dev in devices:
|
||||
self.remove(dev)
|
||||
|
||||
self._devices_by_port.clear()
|
||||
self._devices_by_name.clear()
|
||||
self._devices_by_type = {MidiDeviceType.INPUT: {}, MidiDeviceType.OUTPUT: {}}
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
|
@ -1,13 +1,19 @@
|
|||
manifest:
|
||||
events: {}
|
||||
events:
|
||||
- platypush.message.event.midi.MidiDeviceAddedEvent
|
||||
- platypush.message.event.midi.MidiDeviceConnectedEvent
|
||||
- platypush.message.event.midi.MidiDeviceDisconnectedEvent
|
||||
- platypush.message.event.midi.MidiDeviceRemovedEvent
|
||||
- platypush.message.event.midi.MidiMessageEvent
|
||||
install:
|
||||
apt:
|
||||
- python3-rtmidi
|
||||
dnf:
|
||||
- python-rtmidi
|
||||
pacman:
|
||||
- rtmidi
|
||||
- python-rtmidi
|
||||
pip:
|
||||
- python-rtmidi
|
||||
- rtmidi
|
||||
package: platypush.plugins.midi
|
||||
type: plugin
|
||||
|
|
Loading…
Reference in a new issue