platypush/platypush/backend/midi.py

120 lines
4 KiB
Python
Raw Normal View History

import json
import time
import rtmidi
from threading import Timer
from platypush.backend import Backend
from platypush.message.event.midi import MidiMessageEvent
class MidiBackend(Backend):
    """
    This backend will listen for events from a MIDI device and post a
    MidiMessageEvent whenever a new MIDI event happens.

    It requires `rtmidi`, `pip install rtmidi`

    NOTE(review): the calls used here (`MidiIn`, `get_ports`, `open_port`,
    `set_callback`, `close_port`) look like the `python-rtmidi` API - confirm
    which distribution is actually expected to be installed.
    """

    def __init__(self, device_name=None, port_number=None,
                 midi_throttle_time=None, *args, **kwargs):
        """
        Params:
            device_name -- Name of the MIDI device.
                *N.B.* either `device_name` or `port_number` must be set
            port_number -- MIDI port number (0 is a valid port)
                *N.B.* either `device_name` or `port_number` must be set
            midi_throttle_time -- If set, the MIDI events will be throttled -
                max one per selected time frame (in seconds). Set this parameter
                if you want to synchronize MIDI events with plugins that
                normally operate with a lower throughput.

        Raises:
            RuntimeError -- if both or neither of `device_name` and
                `port_number` are set, if no MIDI port is available, or if
                the requested device/port is not among the available ports.
        """

        super().__init__(*args, **kwargs)

        if (device_name and port_number is not None) or \
                (not device_name and port_number is None):
            raise RuntimeError('Either device_name or port_number (not both) ' +
                               'must be set in the MIDI backend configuration')

        self.midi_throttle_time = midi_throttle_time
        self.midi = rtmidi.MidiIn()

        # Timestamp of the last event posted directly by the callback
        self.last_trigger_event_time = None

        # Pending Timer that will flush the latest throttled message, if any
        self.midi_flush_timeout = None

        ports = self.midi.get_ports()
        if not ports:
            raise RuntimeError('No MIDI devices available')

        if device_name:
            if device_name not in ports:
                raise RuntimeError('MIDI device "{}" not found'.format(device_name))

            self.port_number = ports.index(device_name)
            self.device_name = device_name

        # Explicit None check: port 0 is a legitimate port number and must
        # not be treated as "not set"
        if port_number is not None:
            if port_number < 0 or port_number >= len(ports):
                raise RuntimeError('MIDI port {} not found'.format(port_number))

            self.port_number = port_number
            self.device_name = ports[port_number]

        self.midi.set_callback(self._on_midi_message())

    def _on_midi_message(self):
        """
        Build the callback registered on the MIDI input.

        Returns:
            A `callback(message, data)` function that posts a
            MidiMessageEvent on the bus for each incoming message, applying
            `midi_throttle_time` if it is set.
        """

        def flush_midi_message(message):
            # Returns the function executed by the throttle Timer: it posts
            # the last throttled message once the throttle window is over
            def _f():
                self.logger.info('Flushing throttled MIDI message {} to the bus'.format(message))
                # Here the delay is the time elapsed since the last event
                # that was posted directly by the callback
                delay = time.time() - self.last_trigger_event_time
                self.bus.post(MidiMessageEvent(message=message, delay=delay))

            return _f

        def callback(message, data):
            # rtmidi will provide a tuple in the format
            # (midi_message, time_since_last_event)
            delay = message[1]
            message = message[0]

            if self.midi_throttle_time and self.last_trigger_event_time:
                event_delta = time.time() - self.last_trigger_event_time

                if event_delta < self.midi_throttle_time:
                    self.logger.debug('Skipping throttled message {}'.format(message))

                    # Only the most recent throttled message is kept: any
                    # previously scheduled flush is replaced by this one
                    if self.midi_flush_timeout:
                        self.midi_flush_timeout.cancel()

                    # Schedule the flush for the end of the current
                    # throttle window
                    self.midi_flush_timeout = Timer(
                        self.midi_throttle_time - event_delta,
                        flush_midi_message(message))

                    self.midi_flush_timeout.start()
                    return

            self.last_trigger_event_time = time.time()
            self.bus.post(MidiMessageEvent(message=message, delay=delay))

        return callback

    def run(self):
        """
        Open the configured MIDI port and keep the backend alive until
        `should_stop()` returns true, then close the port.

        Incoming messages are handled by the callback registered in
        `__init__`; this loop only waits for the stop condition.
        """

        super().run()

        self.midi.open_port(self.port_number)
        self.logger.info('Initialized MIDI backend, listening for events on device {}'.
                         format(self.device_name))

        try:
            while not self.should_stop():
                try:
                    time.sleep(1)
                except Exception as e:
                    self.logger.exception(e)
        finally:
            # Always release the port, even if the loop is interrupted
            if self.midi:
                # close_port() closes the port opened via open_port() and
                # takes no port number argument
                self.midi.close_port()
                self.midi = None
# vim:sw=4:ts=4:et: