diff --git a/platypush/backend/midi.py b/platypush/backend/midi.py new file mode 100644 index 00000000..2c8725cf --- /dev/null +++ b/platypush/backend/midi.py @@ -0,0 +1,122 @@ +import logging +import json +import time + +import rtmidi + +from threading import Timer + +from platypush.backend import Backend +from platypush.message.event.midi import MidiMessageEvent + +logger = logging.getLogger(__name__) + + +class MidiBackend(Backend): + """ + This backend will listen for events from a MIDI device and post a + MidiMessageEvent whenever a new MIDI event happens. + + It requires `rtmidi`, `pip install rtmidi` + """ + + def __init__(self, device_name=None, port_number=None, + midi_throttle_time=None, *args, **kwargs): + """ + Params: + device_name -- Name of the MIDI device. + *N.B.* either `device_name` or `port_number` must be set + port_number -- MIDI port number + *N.B.* either `device_name` or `port_number` must be set + midi_throttle_time -- If set, the MIDI events will be throttled - + max one per selected time frame (in seconds). Set this parameter + if you want to synchronize MIDI events with plugins that + normally operate with a lower throughput. + """ + super().__init__(*args, **kwargs) + + if (device_name and port_number is not None) or \ + (not device_name and port_number is None): + raise RuntimeError('Either device_name or port_number (not both) ' + + 'must be set in the MIDI backend configuration') + + self.midi_throttle_time = midi_throttle_time + self.midi = rtmidi.MidiIn() + self.last_trigger_event_time = None + self.midi_flush_timeout = None + ports = self.midi.get_ports() + + if not ports: + raise RuntimeError('No MIDI devices available') + + if device_name: + if device_name not in ports: + raise RuntimeError('MIDI device "{}" not found'.format(device_name)) + + self.port_number = ports.index(device_name) + self.device_name = device_name + + if port_number: + if port_number < 0 or port_number >= len(ports): + raise RuntimeError('MIDI port {} not found') + + self.port_number = port_number + self.device_name = ports[port_number] + + self.midi.set_callback(self._on_midi_message()) + + + def _on_midi_message(self): + def flush_midi_message(message): + def _f(): + logger.info('Flushing throttled MIDI message {} to the bus'.format(message)) + delay = time.time() - self.last_trigger_event_time + self.bus.post(MidiMessageEvent(message=message, delay=delay)) + return _f + + def callback(message, data): + # rtmidi will provide a tuple in the format + # (midi_message, time_since_last_event) + delay = message[1] + message = message[0] + + if self.midi_throttle_time and self.last_trigger_event_time: + event_delta = time.time() - self.last_trigger_event_time + if event_delta < self.midi_throttle_time: + logger.debug('Skipping throttled message {}'.format(message)) + if self.midi_flush_timeout: + self.midi_flush_timeout.cancel() + + self.midi_flush_timeout = Timer( + self.midi_throttle_time-event_delta, + flush_midi_message(message)) + + self.midi_flush_timeout.start() + return + + self.last_trigger_event_time = time.time() + self.bus.post(MidiMessageEvent(message=message, delay=delay)) + + return callback + + + def run(self): + super().run() + + self.midi.open_port(self.port_number) + logger.info('Initialized MIDI backend, listening for events on device {}'. + format(self.device_name)) + + while not self.should_stop(): + try: + time.sleep(1) + except Exception as e: + logging.exception(e) + + if self.midi: + self.midi.close_port(self.port_number) + self.midi = None + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/event/midi.py b/platypush/message/event/midi.py new file mode 100644 index 00000000..ca300c86 --- /dev/null +++ b/platypush/message/event/midi.py @@ -0,0 +1,10 @@ +from platypush.message.event import Event + + +class MidiMessageEvent(Event): + def __init__(self, message, delay=None, *args, **kwargs): + super().__init__(*args, message=message, delay=delay, **kwargs) + + +# vim:sw=4:ts=4:et: +