- Refactored the class sound out of the plugin module

- More robust stream naming and index assignment logic
This commit is contained in:
Fabio Manganiello 2018-12-24 17:12:11 +01:00
parent f53d1c06dc
commit 65465e3a18
2 changed files with 162 additions and 152 deletions

View file

@ -12,6 +12,7 @@ import time
from enum import Enum from enum import Enum
from threading import Thread, Event, RLock from threading import Thread, Event, RLock
from .core import Sound
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
@ -27,147 +28,6 @@ class RecordingState(Enum):
PAUSED='PAUSED' PAUSED='PAUSED'
class Sound(object):
"""
Class model a synthetic sound that can be played through the audio device
"""
STANDARD_A_FREQUENCY = 440.0
STANDARD_A_MIDI_NOTE = 69
_DEFAULT_BLOCKSIZE = 2048
_DEFAULT_BUFSIZE = 20
_DEFAULT_SAMPLERATE = 44100
midi_note = None
frequency = None
gain = 1.0
duration = None
def __init__(self, midi_note=midi_note, frequency=None, gain=gain,
duration=duration, A_frequency=STANDARD_A_FREQUENCY):
"""
You can construct a sound either from a MIDI note or a base frequency
:param midi_note: MIDI note code, see
https://newt.phys.unsw.edu.au/jw/graphics/notes.GIF
:type midi_note: int
:param frequency: Sound base frequency in Hz
:type frequency: float
:param gain: Note gain/volume between 0.0 and 1.0 (default: 1.0)
:type gain: float
:param duration: Note duration in seconds. Default: keep until
release/pause/stop
:type duration: float
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
if midi_note and frequency:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
if midi_note:
self.midi_note = midi_note
self.frequency = self.note_to_freq(midi_note=midi_note,
A_frequency=A_frequency)
elif frequency:
self.frequency = frequency
self.midi_note = self.freq_to_note(frequency=frequency,
A_frequency=A_frequency)
else:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
self.gain = gain
self.duration = duration
@classmethod
def note_to_freq(cls, midi_note, A_frequency=STANDARD_A_FREQUENCY):
"""
Converts a MIDI note to its frequency in Hz
:param midi_note: MIDI note to convert
:type midi_note: int
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) \
* A_frequency
@classmethod
def freq_to_note(cls, frequency, A_frequency=STANDARD_A_FREQUENCY):
"""
Converts a frequency in Hz to its closest MIDI note
:param frequency: Frequency in Hz
:type midi_note: float
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
# TODO return also the offset in % between the provided frequency
# and the standard MIDI note frequency
return int(12.0 * math.log(frequency/A_frequency, 2)
+ cls.STANDARD_A_MIDI_NOTE)
def get_wave(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE):
"""
Get the wave binary data associated to this sound
:param t_start: Start offset for the sine wave in seconds. Default: 0
:type t_start: float
:param t_end: End offset for the sine wave in seconds. Default: 0
:type t_end: float
:param samplerate: Audio sample rate. Default: 44100 Hz
:type samplerate: int
:returns: A numpy.ndarray[n,1] with the raw float values
"""
import numpy as np
x = np.linspace(t_start, t_end, int((t_end-t_start)*samplerate))
x = x.reshape(len(x), 1)
return self.gain * np.sin(2 * np.pi * self.frequency * x)
def __str__(self):
return json.dumps({
'midi_note': midi_note,
'frequency': frequency,
'gain': gain,
'duration': duration,
})
@classmethod
def build(cls, *args, **kwargs):
"""
Construct a sound object either from a JSON representation or a
key-value representation
"""
if args:
if isinstance(args[0], cls):
return args[0]
if isinstance(args[0], str):
kwargs = json.loads(args[0])
elif isinstance(args[0], dict):
kwargs = args[0]
if kwargs:
return Sound(**kwargs)
raise RuntimeError('Usage: {}'.format(__doc__))
class SoundPlugin(Plugin): class SoundPlugin(Plugin):
""" """
Plugin to interact with a sound device. Plugin to interact with a sound device.
@ -214,7 +74,7 @@ class SoundPlugin(Plugin):
self.recording_state = RecordingState.STOPPED self.recording_state = RecordingState.STOPPED
self.recording_state_lock = RLock() self.recording_state_lock = RLock()
self.recording_paused_changed = Event() self.recording_paused_changed = Event()
self.active_players = {} self.active_streams = {}
self.completed_callback_events = {} self.completed_callback_events = {}
def _get_default_device(self, category): def _get_default_device(self, category):
@ -678,7 +538,7 @@ class SoundPlugin(Plugin):
@action @action
def get_active_players(self): def get_active_streams(self):
""" """
:returns: A list of active players :returns: A list of active players
""" """
@ -689,7 +549,7 @@ class SoundPlugin(Plugin):
'channels', 'cpu_load', 'device', 'dtype', 'channels', 'cpu_load', 'device', 'dtype',
'latency', 'samplerate', 'samplesize'] 'latency', 'samplerate', 'samplesize']
if hasattr(stream, attr) if hasattr(stream, attr)
} for i, stream in self.active_players.items() } for i, stream in self.active_streams.items()
} }
@ -697,8 +557,13 @@ class SoundPlugin(Plugin):
stream_index = None stream_index = None
with self.playback_state_lock: with self.playback_state_lock:
self.playback_state = PlaybackState.PLAYING self.playback_state = PlaybackState.PLAYING
stream_index = len(self.active_players) stream_index = None
self.active_players[stream_index] = stream for i in range(len(self.active_streams)+1):
if i not in self.active_streams:
stream_index = i
break
self.active_streams[stream_index] = stream
self.completed_callback_events[stream_index] = \ self.completed_callback_events[stream_index] = \
completed_callback_event if completed_callback_event else Event() completed_callback_event if completed_callback_event else Event()
@ -710,15 +575,15 @@ class SoundPlugin(Plugin):
def stop_playback(self, *streams): def stop_playback(self, *streams):
with self.playback_state_lock: with self.playback_state_lock:
if not streams: if not streams:
streams = self.active_players.keys() streams = self.active_streams.keys()
updated_n_players = len(self.active_players) updated_n_players = len(self.active_streams)
completed_callback_events = {} completed_callback_events = {}
for i in streams: for i in streams:
if i is None or not (i in self.active_players): if i is None or not (i in self.active_streams):
continue continue
stream = self.active_players[i] stream = self.active_streams[i]
updated_n_players -= 1 updated_n_players -= 1
if self.completed_callback_events[i]: if self.completed_callback_events[i]:
completed_callback_events[i] = self.completed_callback_events[i] completed_callback_events[i] = self.completed_callback_events[i]
@ -729,7 +594,7 @@ class SoundPlugin(Plugin):
for i, event in completed_callback_events.items(): for i, event in completed_callback_events.items():
event.wait() event.wait()
del self.completed_callback_events[i] del self.completed_callback_events[i]
del self.active_players[i] del self.active_streams[i]
self.logger.info('Playback stopped') self.logger.info('Playback stopped')
@ -787,4 +652,3 @@ class SoundPlugin(Plugin):
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,146 @@
class Sound(object):
"""
Class model a synthetic sound that can be played through the audio device
"""
STANDARD_A_FREQUENCY = 440.0
STANDARD_A_MIDI_NOTE = 69
_DEFAULT_BLOCKSIZE = 2048
_DEFAULT_BUFSIZE = 20
_DEFAULT_SAMPLERATE = 44100
midi_note = None
frequency = None
gain = 1.0
duration = None
def __init__(self, midi_note=midi_note, frequency=None, gain=gain,
duration=duration, A_frequency=STANDARD_A_FREQUENCY):
"""
You can construct a sound either from a MIDI note or a base frequency
:param midi_note: MIDI note code, see
https://newt.phys.unsw.edu.au/jw/graphics/notes.GIF
:type midi_note: int
:param frequency: Sound base frequency in Hz
:type frequency: float
:param gain: Note gain/volume between 0.0 and 1.0 (default: 1.0)
:type gain: float
:param duration: Note duration in seconds. Default: keep until
release/pause/stop
:type duration: float
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
if midi_note and frequency:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
if midi_note:
self.midi_note = midi_note
self.frequency = self.note_to_freq(midi_note=midi_note,
A_frequency=A_frequency)
elif frequency:
self.frequency = frequency
self.midi_note = self.freq_to_note(frequency=frequency,
A_frequency=A_frequency)
else:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
self.gain = gain
self.duration = duration
@classmethod
def note_to_freq(cls, midi_note, A_frequency=STANDARD_A_FREQUENCY):
"""
Converts a MIDI note to its frequency in Hz
:param midi_note: MIDI note to convert
:type midi_note: int
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) \
* A_frequency
@classmethod
def freq_to_note(cls, frequency, A_frequency=STANDARD_A_FREQUENCY):
"""
Converts a frequency in Hz to its closest MIDI note
:param frequency: Frequency in Hz
:type midi_note: float
:param A_frequency: Reference A4 frequency (default: 440 Hz)
:type A_frequency: float
"""
# TODO return also the offset in % between the provided frequency
# and the standard MIDI note frequency
return int(12.0 * math.log(frequency/A_frequency, 2)
+ cls.STANDARD_A_MIDI_NOTE)
def get_wave(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE):
"""
Get the wave binary data associated to this sound
:param t_start: Start offset for the sine wave in seconds. Default: 0
:type t_start: float
:param t_end: End offset for the sine wave in seconds. Default: 0
:type t_end: float
:param samplerate: Audio sample rate. Default: 44100 Hz
:type samplerate: int
:returns: A numpy.ndarray[n,1] with the raw float values
"""
import numpy as np
x = np.linspace(t_start, t_end, int((t_end-t_start)*samplerate))
x = x.reshape(len(x), 1)
return self.gain * np.sin(2 * np.pi * self.frequency * x)
def __str__(self):
import json
return json.dumps({
'midi_note': midi_note,
'frequency': frequency,
'gain': gain,
'duration': duration,
})
@classmethod
def build(cls, *args, **kwargs):
"""
Construct a sound object either from a JSON representation or a
key-value representation
"""
import json
if args:
if isinstance(args[0], cls):
return args[0]
if isinstance(args[0], str):
kwargs = json.loads(args[0])
elif isinstance(args[0], dict):
kwargs = args[0]
if kwargs:
return Sound(**kwargs)
raise RuntimeError('Usage: {}'.format(__doc__))
# vim:sw=4:ts=4:et: