Support for playback control for individual sound streams

This commit is contained in:
Fabio Manganiello 2018-12-25 17:31:20 +01:00
parent 3baf0b1589
commit d89184358a

View file

@ -68,9 +68,9 @@ class SoundPlugin(Plugin):
self.output_blocksize = output_blocksize self.output_blocksize = output_blocksize
self.playback_bufsize = playback_bufsize self.playback_bufsize = playback_bufsize
self.playback_state = PlaybackState.STOPPED self.playback_state = {}
self.playback_state_lock = RLock() self.playback_state_lock = RLock()
self.playback_paused_changed = Event() self.playback_paused_changed = {}
self.recording_state = RecordingState.STOPPED self.recording_state = RecordingState.STOPPED
self.recording_state_lock = RLock() self.recording_state_lock = RLock()
self.recording_paused_changed = Event() self.recording_paused_changed = Event()
@ -135,16 +135,17 @@ class SoundPlugin(Plugin):
return devs return devs
def _play_audio_callback(self, q, blocksize, streamtype): def _play_audio_callback(self, q, blocksize, streamtype, stream_index):
import sounddevice as sd import sounddevice as sd
is_raw_stream = streamtype == sd.RawOutputStream is_raw_stream = streamtype == sd.RawOutputStream
def audio_callback(outdata, frames, time, status):
if self._get_playback_state() == PlaybackState.STOPPED:
raise sd.CallbackAbort
while self._get_playback_state() == PlaybackState.PAUSED: def audio_callback(outdata, frames, time, status):
self.playback_paused_changed.wait() if self._get_playback_state(stream_index) == PlaybackState.STOPPED:
raise sd.CallbackStop
while self._get_playback_state(stream_index) == PlaybackState.PAUSED:
self.playback_paused_changed[stream_index].wait()
assert frames == blocksize assert frames == blocksize
if status.output_underflow: if status.output_underflow:
@ -213,7 +214,7 @@ class SoundPlugin(Plugin):
:param stream_index: If specified, play to an already active stream :param stream_index: If specified, play to an already active stream
index (you can get them through index (you can get them through
:method:`platypush.plugins.sound.query_active_streams`). Default: :method:`platypush.plugins.sound.query_streams`). Default:
creates a new audio stream through PortAudio. creates a new audio stream through PortAudio.
""" """
@ -226,8 +227,6 @@ class SoundPlugin(Plugin):
if blocksize is None: if blocksize is None:
blocksize = self.output_blocksize blocksize = self.output_blocksize
self.playback_paused_changed.clear()
q = queue.Queue(maxsize=bufsize) q = queue.Queue(maxsize=bufsize)
f = None f = None
t = 0. t = 0.
@ -249,7 +248,7 @@ class SoundPlugin(Plugin):
if not channels: if not channels:
channels = f.channels if f else 1 channels = f.channels if f else 1
self.logger.info('Starting playback of {} to device [{}]'. self.logger.info('Starting playback of {} to sound device [{}]'.
format(file or sound, device)) format(file or sound, device))
if sound: if sound:
@ -273,25 +272,23 @@ class SoundPlugin(Plugin):
if sound.duration is not None and t >= sound.duration: if sound.duration is not None and t >= sound.duration:
break break
while self._get_playback_state() == PlaybackState.PAUSED:
self.playback_paused_changed.wait()
q.put_nowait(data) # Pre-fill the audio queue q.put_nowait(data) # Pre-fill the audio queue
if stream_index is None: if stream_index is None:
completed_callback_event = Event()
streamtype = sd.RawOutputStream if file else sd.OutputStream streamtype = sd.RawOutputStream if file else sd.OutputStream
stream_index = self._allocate_stream_index()
completed_callback_event = self.completed_callback_events[stream_index]
stream = streamtype(samplerate=samplerate, blocksize=blocksize, stream = streamtype(samplerate=samplerate, blocksize=blocksize,
device=device, channels=channels, device=device, channels=channels,
dtype='float32', dtype='float32',
callback=self._play_audio_callback( callback=self._play_audio_callback(
q=q, blocksize=blocksize, q=q, blocksize=blocksize,
streamtype=streamtype), streamtype=streamtype,
stream_index=stream_index),
finished_callback=completed_callback_event.set) finished_callback=completed_callback_event.set)
stream_index = self.start_playback(stream, self._start_playback(stream_index=stream_index, stream=stream)
completed_callback_event)
else: else:
stream = self.active_streams[stream_index] stream = self.active_streams[stream_index]
completed_callback_event = self.completed_callback_events[stream_index] completed_callback_event = self.completed_callback_events[stream_index]
@ -302,8 +299,9 @@ class SoundPlugin(Plugin):
timeout = blocksize * bufsize / samplerate timeout = blocksize * bufsize / samplerate
while True: while True:
while self._get_playback_state() == PlaybackState.PAUSED: while self._get_playback_state(stream_index) == \
self.playback_paused_changed.wait() PlaybackState.PAUSED:
self.playback_paused_changed[stream_index].wait()
if f: if f:
data = f.buffer_read(blocksize, dtype='float32') data = f.buffer_read(blocksize, dtype='float32')
@ -321,17 +319,21 @@ class SoundPlugin(Plugin):
if sound.duration is not None and t >= sound.duration: if sound.duration is not None and t >= sound.duration:
break break
if self._get_playback_state() == PlaybackState.STOPPED: if self._get_playback_state(stream_index) == \
raise sd.CallbackAbort PlaybackState.STOPPED:
break
try: try:
q.put(data, timeout=timeout) q.put(data, timeout=timeout)
except queue.Full as e: except queue.Full as e:
if self._get_playback_state() != PlaybackState.PAUSED: if self._get_playback_state(stream_index) != \
PlaybackState.PAUSED:
raise e raise e
completed_callback_event.wait() completed_callback_event.wait()
except queue.Full as e: except queue.Full as e:
if stream_index is None or \
self._get_playback_state(stream_index) != PlaybackState.STOPPED:
self.logger.warning('Playback timeout: audio callback failed?') self.logger.warning('Playback timeout: audio callback failed?')
finally: finally:
if f and not f.closed: if f and not f.closed:
@ -483,7 +485,6 @@ class SoundPlugin(Plugin):
import sounddevice as sd import sounddevice as sd
self.playback_paused_changed.clear()
self.recording_paused_changed.clear() self.recording_paused_changed.clear()
if input_device is None: if input_device is None:
@ -520,15 +521,17 @@ class SoundPlugin(Plugin):
import soundfile as sf import soundfile as sf
import numpy import numpy
stream_index = self._allocate_stream_index()
stream = sd.Stream(samplerate=sample_rate, channels=channels, stream = sd.Stream(samplerate=sample_rate, channels=channels,
blocksize=blocksize, latency=latency, blocksize=blocksize, latency=latency,
device=(input_device, output_device), device=(input_device, output_device),
dtype=dtype, callback=audio_callback) dtype=dtype, callback=audio_callback)
self.start_recording() self.start_recording()
stream_index = self.start_playback(stream) self._start_playback(stream_index=stream_index,
stream=stream)
self.logger.info('Started recording pass-through from device ' + self.logger.info('Started recording pass-through from device ' +
'[{}] to device [{}]'. '[{}] to sound device [{}]'.
format(input_device, output_device)) format(input_device, output_device))
recording_started_time = time.time() recording_started_time = time.time()
@ -549,11 +552,12 @@ class SoundPlugin(Plugin):
@action @action
def query_active_streams(self): def query_streams(self):
""" """
:returns: A list of active players :returns: A list of active audio streams
""" """
return {
streams = {
i: { i: {
attr: getattr(stream, attr) attr: getattr(stream, attr)
for attr in ['active', 'closed', 'stopped', 'blocksize', for attr in ['active', 'closed', 'stopped', 'blocksize',
@ -563,36 +567,56 @@ class SoundPlugin(Plugin):
} for i, stream in self.active_streams.items() } for i, stream in self.active_streams.items()
} }
for i, stream in streams.items():
stream['playback_state'] = self.playback_state[i].name
def start_playback(self, stream, completed_callback_event=None): return streams
def _allocate_stream_index(self, completed_callback_event=None):
stream_index = None stream_index = None
with self.playback_state_lock: with self.playback_state_lock:
self.playback_state = PlaybackState.PLAYING
stream_index = None
for i in range(len(self.active_streams)+1): for i in range(len(self.active_streams)+1):
if i not in self.active_streams: if i not in self.active_streams:
stream_index = i stream_index = i
break break
self.active_streams[stream_index] = stream if stream_index is None:
raise RuntimeError('No stream index available')
self.active_streams[stream_index] = None
self.completed_callback_events[stream_index] = \ self.completed_callback_events[stream_index] = \
completed_callback_event if completed_callback_event else Event() completed_callback_event if completed_callback_event else Event()
return stream_index
def _start_playback(self, stream_index, stream):
with self.playback_state_lock:
self.playback_state[stream_index] = PlaybackState.PLAYING
self.active_streams[stream_index] = stream
if isinstance(self.playback_paused_changed.get(stream_index), Event):
self.playback_paused_changed[stream_index].clear()
else:
self.playback_paused_changed[stream_index] = Event()
self.logger.info('Playback started on stream index {}'. self.logger.info('Playback started on stream index {}'.
format(stream_index)) format(stream_index))
return stream_index return stream_index
@action @action
def stop_playback(self, streams=None): def stop_playback(self, streams=None):
""" """
:param streams: Stream to stop by index (default: all) :param streams: Streams to stop by index (default: all)
:type streams: list[int] :type streams: list[int]
""" """
with self.playback_state_lock: with self.playback_state_lock:
streams = streams or self.active_streams.keys()
if not streams: if not streams:
streams = self.active_streams.keys() return
updated_n_players = len(self.active_streams)
completed_callback_events = {} completed_callback_events = {}
for i in streams: for i in streams:
@ -600,33 +624,47 @@ class SoundPlugin(Plugin):
continue continue
stream = self.active_streams[i] stream = self.active_streams[i]
updated_n_players -= 1
if self.completed_callback_events[i]: if self.completed_callback_events[i]:
completed_callback_events[i] = self.completed_callback_events[i] completed_callback_events[i] = self.completed_callback_events[i]
self.playback_state[i] = PlaybackState.STOPPED
if not updated_n_players:
self.playback_state = PlaybackState.STOPPED
for i, event in completed_callback_events.items(): for i, event in completed_callback_events.items():
event.wait() event.wait()
del self.completed_callback_events[i] del self.completed_callback_events[i]
del self.active_streams[i] del self.active_streams[i]
self.logger.info('Playback stopped') self.logger.info('Playback stopped on streams [{}]'.format(
', '.join([str(stream) for stream in
completed_callback_events.keys()])))
@action @action
def pause_playback(self): def pause_playback(self, streams=None):
"""
:param streams: Streams to pause by index (default: all)
:type streams: list[int]
"""
with self.playback_state_lock: with self.playback_state_lock:
if self.playback_state == PlaybackState.PAUSED: streams = streams or self.active_streams.keys()
self.playback_state = PlaybackState.PLAYING if not streams:
elif self.playback_state == PlaybackState.PLAYING:
self.playback_state = PlaybackState.PAUSED
else:
return return
self.playback_paused_changed.set() for i in streams:
self.logger.info('Playback ' + ('paused' if self.playback_state == if i is None or not (i in self.active_streams):
PlaybackState.PAUSED else 'playing')) continue
stream = self.active_streams[i]
if self.playback_state[i] == PlaybackState.PAUSED:
self.playback_state[i] = PlaybackState.PLAYING
elif self.playback_state[i] == PlaybackState.PLAYING:
self.playback_state[i] = PlaybackState.PAUSED
else:
continue
self.playback_paused_changed[i].set()
self.logger.info('Playback pause toggled on streams [{}]'.format(
', '.join([str(stream) for stream in streams])))
def start_recording(self): def start_recording(self):
with self.recording_state_lock: with self.recording_state_lock:
@ -651,20 +689,13 @@ class SoundPlugin(Plugin):
self.logger.info('Recording paused state toggled') self.logger.info('Recording paused state toggled')
self.recording_paused_changed.set() self.recording_paused_changed.set()
def _get_playback_state(self): def _get_playback_state(self, stream_index):
with self.playback_state_lock: with self.playback_state_lock:
return self.playback_state return self.playback_state[stream_index]
def _get_recording_state(self): def _get_recording_state(self):
with self.recording_state_lock: with self.recording_state_lock:
return self.recording_state return self.recording_state
@action
def get_state(self):
return {
'playback_state': self._get_playback_state().name,
'recording_state': self._get_recording_state().name,
}
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: