From d89184358a5921bd3e49c389c3c6610fca4170fb Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 25 Dec 2018 17:31:20 +0100 Subject: [PATCH] Support for playback control for individual sound streams --- platypush/plugins/sound/plugin.py | 153 ++++++++++++++++++------------ 1 file changed, 92 insertions(+), 61 deletions(-) diff --git a/platypush/plugins/sound/plugin.py b/platypush/plugins/sound/plugin.py index 962e31e5f..5922c68c1 100644 --- a/platypush/plugins/sound/plugin.py +++ b/platypush/plugins/sound/plugin.py @@ -68,9 +68,9 @@ class SoundPlugin(Plugin): self.output_blocksize = output_blocksize self.playback_bufsize = playback_bufsize - self.playback_state = PlaybackState.STOPPED + self.playback_state = {} self.playback_state_lock = RLock() - self.playback_paused_changed = Event() + self.playback_paused_changed = {} self.recording_state = RecordingState.STOPPED self.recording_state_lock = RLock() self.recording_paused_changed = Event() @@ -135,16 +135,17 @@ class SoundPlugin(Plugin): return devs - def _play_audio_callback(self, q, blocksize, streamtype): + def _play_audio_callback(self, q, blocksize, streamtype, stream_index): import sounddevice as sd is_raw_stream = streamtype == sd.RawOutputStream - def audio_callback(outdata, frames, time, status): - if self._get_playback_state() == PlaybackState.STOPPED: - raise sd.CallbackAbort - while self._get_playback_state() == PlaybackState.PAUSED: - self.playback_paused_changed.wait() + def audio_callback(outdata, frames, time, status): + if self._get_playback_state(stream_index) == PlaybackState.STOPPED: + raise sd.CallbackStop + + while self._get_playback_state(stream_index) == PlaybackState.PAUSED: + self.playback_paused_changed[stream_index].wait() assert frames == blocksize if status.output_underflow: @@ -213,7 +214,7 @@ class SoundPlugin(Plugin): :param stream_index: If specified, play to an already active stream index (you can get them through - :method:`platypush.plugins.sound.query_active_streams`). Default: + :method:`platypush.plugins.sound.query_streams`). Default: creates a new audio stream through PortAudio. """ @@ -226,8 +227,6 @@ class SoundPlugin(Plugin): if blocksize is None: blocksize = self.output_blocksize - self.playback_paused_changed.clear() - q = queue.Queue(maxsize=bufsize) f = None t = 0. @@ -249,7 +248,7 @@ class SoundPlugin(Plugin): if not channels: channels = f.channels if f else 1 - self.logger.info('Starting playback of {} to device [{}]'. + self.logger.info('Starting playback of {} to sound device [{}]'. format(file or sound, device)) if sound: @@ -273,25 +272,23 @@ class SoundPlugin(Plugin): if sound.duration is not None and t >= sound.duration: break - while self._get_playback_state() == PlaybackState.PAUSED: - self.playback_paused_changed.wait() - q.put_nowait(data) # Pre-fill the audio queue if stream_index is None: - completed_callback_event = Event() streamtype = sd.RawOutputStream if file else sd.OutputStream + stream_index = self._allocate_stream_index() + completed_callback_event = self.completed_callback_events[stream_index] stream = streamtype(samplerate=samplerate, blocksize=blocksize, device=device, channels=channels, dtype='float32', callback=self._play_audio_callback( q=q, blocksize=blocksize, - streamtype=streamtype), + streamtype=streamtype, + stream_index=stream_index), finished_callback=completed_callback_event.set) - stream_index = self.start_playback(stream, - completed_callback_event) + self._start_playback(stream_index=stream_index, stream=stream) else: stream = self.active_streams[stream_index] completed_callback_event = self.completed_callback_events[stream_index] @@ -302,8 +299,9 @@ class SoundPlugin(Plugin): timeout = blocksize * bufsize / samplerate while True: - while self._get_playback_state() == PlaybackState.PAUSED: - self.playback_paused_changed.wait() + while self._get_playback_state(stream_index) == \ + PlaybackState.PAUSED: + self.playback_paused_changed[stream_index].wait() if f: data = f.buffer_read(blocksize, dtype='float32') @@ -321,18 +319,22 @@ class SoundPlugin(Plugin): if sound.duration is not None and t >= sound.duration: break - if self._get_playback_state() == PlaybackState.STOPPED: - raise sd.CallbackAbort + if self._get_playback_state(stream_index) == \ + PlaybackState.STOPPED: + break try: q.put(data, timeout=timeout) except queue.Full as e: - if self._get_playback_state() != PlaybackState.PAUSED: + if self._get_playback_state(stream_index) != \ + PlaybackState.PAUSED: raise e completed_callback_event.wait() except queue.Full as e: - self.logger.warning('Playback timeout: audio callback failed?') + if stream_index is None or \ + self._get_playback_state(stream_index) != PlaybackState.STOPPED: + self.logger.warning('Playback timeout: audio callback failed?') finally: if f and not f.closed: f.close() @@ -483,7 +485,6 @@ class SoundPlugin(Plugin): import sounddevice as sd - self.playback_paused_changed.clear() self.recording_paused_changed.clear() if input_device is None: @@ -520,15 +521,17 @@ class SoundPlugin(Plugin): import soundfile as sf import numpy + stream_index = self._allocate_stream_index() stream = sd.Stream(samplerate=sample_rate, channels=channels, blocksize=blocksize, latency=latency, device=(input_device, output_device), dtype=dtype, callback=audio_callback) self.start_recording() - stream_index = self.start_playback(stream) + self._start_playback(stream_index=stream_index, + stream=stream) self.logger.info('Started recording pass-through from device ' + - '[{}] to device [{}]'. + '[{}] to sound device [{}]'. format(input_device, output_device)) recording_started_time = time.time() @@ -549,11 +552,12 @@ class SoundPlugin(Plugin): @action - def query_active_streams(self): + def query_streams(self): """ - :returns: A list of active players + :returns: A list of active audio streams """ - return { + + streams = { i: { attr: getattr(stream, attr) for attr in ['active', 'closed', 'stopped', 'blocksize', @@ -563,36 +567,56 @@ class SoundPlugin(Plugin): } for i, stream in self.active_streams.items() } + for i, stream in streams.items(): + stream['playback_state'] = self.playback_state[i].name - def start_playback(self, stream, completed_callback_event=None): + return streams + + + def _allocate_stream_index(self, completed_callback_event=None): stream_index = None + with self.playback_state_lock: - self.playback_state = PlaybackState.PLAYING - stream_index = None for i in range(len(self.active_streams)+1): if i not in self.active_streams: stream_index = i break - self.active_streams[stream_index] = stream + if stream_index is None: + raise RuntimeError('No stream index available') + + self.active_streams[stream_index] = None self.completed_callback_events[stream_index] = \ completed_callback_event if completed_callback_event else Event() + return stream_index + + def _start_playback(self, stream_index, stream): + with self.playback_state_lock: + self.playback_state[stream_index] = PlaybackState.PLAYING + self.active_streams[stream_index] = stream + + if isinstance(self.playback_paused_changed.get(stream_index), Event): + self.playback_paused_changed[stream_index].clear() + else: + self.playback_paused_changed[stream_index] = Event() + self.logger.info('Playback started on stream index {}'. format(stream_index)) + return stream_index @action def stop_playback(self, streams=None): """ - :param streams: Stream to stop by index (default: all) + :param streams: Streams to stop by index (default: all) :type streams: list[int] """ with self.playback_state_lock: + streams = streams or self.active_streams.keys() if not streams: - streams = self.active_streams.keys() - updated_n_players = len(self.active_streams) + return completed_callback_events = {} for i in streams: @@ -600,33 +624,47 @@ class SoundPlugin(Plugin): continue stream = self.active_streams[i] - updated_n_players -= 1 if self.completed_callback_events[i]: completed_callback_events[i] = self.completed_callback_events[i] - - if not updated_n_players: - self.playback_state = PlaybackState.STOPPED + self.playback_state[i] = PlaybackState.STOPPED for i, event in completed_callback_events.items(): event.wait() del self.completed_callback_events[i] del self.active_streams[i] - self.logger.info('Playback stopped') + self.logger.info('Playback stopped on streams [{}]'.format( + ', '.join([str(stream) for stream in + completed_callback_events.keys()]))) @action - def pause_playback(self): + def pause_playback(self, streams=None): + """ + :param streams: Streams to pause by index (default: all) + :type streams: list[int] + """ + with self.playback_state_lock: - if self.playback_state == PlaybackState.PAUSED: - self.playback_state = PlaybackState.PLAYING - elif self.playback_state == PlaybackState.PLAYING: - self.playback_state = PlaybackState.PAUSED - else: + streams = streams or self.active_streams.keys() + if not streams: return - self.playback_paused_changed.set() - self.logger.info('Playback ' + ('paused' if self.playback_state == - PlaybackState.PAUSED else 'playing')) + for i in streams: + if i is None or not (i in self.active_streams): + continue + + stream = self.active_streams[i] + if self.playback_state[i] == PlaybackState.PAUSED: + self.playback_state[i] = PlaybackState.PLAYING + elif self.playback_state[i] == PlaybackState.PLAYING: + self.playback_state[i] = PlaybackState.PAUSED + else: + continue + + self.playback_paused_changed[i].set() + + self.logger.info('Playback pause toggled on streams [{}]'.format( + ', '.join([str(stream) for stream in streams]))) def start_recording(self): with self.recording_state_lock: @@ -651,20 +689,13 @@ class SoundPlugin(Plugin): self.logger.info('Recording paused state toggled') self.recording_paused_changed.set() - def _get_playback_state(self): + def _get_playback_state(self, stream_index): with self.playback_state_lock: - return self.playback_state + return self.playback_state[stream_index] def _get_recording_state(self): with self.recording_state_lock: return self.recording_state - @action - def get_state(self): - return { - 'playback_state': self._get_playback_state().name, - 'recording_state': self._get_recording_state().name, - } - # vim:sw=4:ts=4:et: