diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index b1d80534..d53db81f 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -78,9 +78,13 @@ class RedisBackend(Backend): format(self.queue, self.redis_args)) while not self.should_stop(): - msg = self.get_message() - self.logger.info('Received message on the Redis backend: {}'.format(msg)) - self.on_message(msg) + try: + msg = self.get_message() + self.logger.info('Received message on the Redis backend: {}'. + format(msg)) + self.on_message(msg) + except Exception as e: + self.logger.exception(e) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/core.py b/platypush/plugins/sound/core.py index ef72028e..0378fc9d 100644 --- a/platypush/plugins/sound/core.py +++ b/platypush/plugins/sound/core.py @@ -13,8 +13,9 @@ class Sound(object): STANDARD_A_FREQUENCY = 440.0 STANDARD_A_MIDI_NOTE = 69 - _DEFAULT_BLOCKSIZE = 2048 - _DEFAULT_BUFSIZE = 20 + _DEFAULT_BLOCKSIZE = 1024 + _DEFAULT_SYNTH_BUFSIZE = 2 + _DEFAULT_FILE_BUFSIZE = 20 _DEFAULT_SAMPLERATE = 44100 midi_note = None @@ -159,9 +160,11 @@ class Mix(object): through an audio stream to an audio device """ - _sounds = [] + _sounds = None def __init__(self, *sounds): + self._sounds = [] + for sound in sounds: self.add(sound) @@ -235,4 +238,21 @@ class Mix(object): return wave + def duration(self): + """ + :returns: The duration of the mix in seconds as duration of its longest + sample, or None if the mixed sample have no duration set + """ + + duration = 0 + + for sound in self._sounds: + if sound.duration is None: + return None + + duration = max(duration, sound.duration) + + return duration + + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/plugin.py b/platypush/plugins/sound/plugin.py index 5922c68c..930465ba 100644 --- a/platypush/plugins/sound/plugin.py +++ b/platypush/plugins/sound/plugin.py @@ -12,7 +12,7 @@ import time from enum import Enum from threading import Thread, Event, RLock -from .core import Sound +from .core import Sound, Mix from platypush.plugins import Plugin, action @@ -41,8 +41,7 @@ class SoundPlugin(Plugin): def __init__(self, input_device=None, output_device=None, input_blocksize=Sound._DEFAULT_BLOCKSIZE, - output_blocksize=Sound._DEFAULT_BLOCKSIZE, - playback_bufsize=Sound._DEFAULT_BUFSIZE, *args, **kwargs): + output_blocksize=Sound._DEFAULT_BLOCKSIZE, *args, **kwargs): """ :param input_device: Index or name of the default input device. Use :method:`platypush.plugins.sound.query_devices` to get the available devices. Default: system default :type input_device: int or str @@ -55,9 +54,6 @@ class SoundPlugin(Plugin): :param output_blocksize: Blocksize to be applied to the output device. Try to increase this value if you get output underflow errors while playing. Default: 2048 :type output_blocksize: int - - :param playback_bufsize: Number of audio blocks that will be cached while playing (default: 20) - :type playback_bufsize: int """ super().__init__(*args, **kwargs) @@ -66,11 +62,11 @@ class SoundPlugin(Plugin): self.output_device = output_device self.input_blocksize = input_blocksize self.output_blocksize = output_blocksize - self.playback_bufsize = playback_bufsize self.playback_state = {} self.playback_state_lock = RLock() self.playback_paused_changed = {} + self.stream_mixes = {} self.recording_state = RecordingState.STOPPED self.recording_state_lock = RLock() self.recording_paused_changed = Event() @@ -173,7 +169,7 @@ class SoundPlugin(Plugin): @action def play(self, file=None, sound=None, device=None, blocksize=None, - bufsize=Sound._DEFAULT_BUFSIZE, samplerate=None, channels=None, + bufsize=None, samplerate=None, channels=None, stream_index=None): """ Plays a sound file (support formats: wav, raw) or a synthetic sound. @@ -201,7 +197,8 @@ class SoundPlugin(Plugin): `output_blocksize` or 2048) :type blocksize: int - :param bufsize: Size of the audio buffer (default: 20) + :param bufsize: Size of the audio buffer (default: 20 frames for audio + files, 2 frames for synth sounds) :type bufsize: int :param samplerate: Audio samplerate. Default: audio file samplerate if @@ -227,9 +224,17 @@ class SoundPlugin(Plugin): if blocksize is None: blocksize = self.output_blocksize + if bufsize is None: + if file: + bufsize = Sound._DEFAULT_FILE_BUFSIZE + else: + bufsize = Sound._DEFAULT_SYNTH_BUFSIZE + q = queue.Queue(maxsize=bufsize) f = None t = 0. + is_new_stream = stream_index is None or \ + self.active_streams.get(stream_index) is None if file: file = os.path.abspath(os.path.expanduser(file)) @@ -239,21 +244,28 @@ class SoundPlugin(Plugin): if device is None: device = self._get_default_device('output') + if file: + import soundfile as sf + f = sf.SoundFile(file) + if not samplerate: + samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE + if not channels: + channels = f.channels if f else 1 + + self.logger.info('Starting playback of {} to sound device [{}]'. + format(file or sound, device)) + + if is_new_stream: + stream_index = self._allocate_stream_index() + + if sound: + mix = self.stream_mixes[stream_index] + mix.add(sound) + + if not is_new_stream: + return # Let the existing callback handle the new mix + try: - if file: - import soundfile as sf - f = sf.SoundFile(file) - if not samplerate: - samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE - if not channels: - channels = f.channels if f else 1 - - self.logger.info('Starting playback of {} to sound device [{}]'. - format(file or sound, device)) - - if sound: - sound = Sound.build(sound) - # Audio queue pre-fill loop for _ in range(bufsize): if f: @@ -261,24 +273,26 @@ class SoundPlugin(Plugin): if not data: break else: + duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t+blocktime, sound.duration) \ - if sound.duration is not None else t+blocktime + next_t = min(t+blocktime, duration) \ + if duration is not None else t+blocktime - data = sound.get_wave(t_start=t, t_end=next_t, + data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) t = next_t - if sound.duration is not None and t >= sound.duration: + if duration is not None and t >= duration: break q.put_nowait(data) # Pre-fill the audio queue - if stream_index is None: + stream = self.active_streams[stream_index] + completed_callback_event = self.completed_callback_events[stream_index] + + if stream is None: streamtype = sd.RawOutputStream if file else sd.OutputStream - stream_index = self._allocate_stream_index() - completed_callback_event = self.completed_callback_events[stream_index] stream = streamtype(samplerate=samplerate, blocksize=blocksize, device=device, channels=channels, dtype='float32', @@ -289,9 +303,6 @@ class SoundPlugin(Plugin): finished_callback=completed_callback_event.set) self._start_playback(stream_index=stream_index, stream=stream) - else: - stream = self.active_streams[stream_index] - completed_callback_event = self.completed_callback_events[stream_index] with stream: # Timeout set until we expect all the buffered blocks to @@ -308,15 +319,16 @@ class SoundPlugin(Plugin): if not data: break else: + duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t+blocktime, sound.duration) \ - if sound.duration is not None else t+blocktime + next_t = min(t+blocktime, duration) \ + if duration is not None else t+blocktime - data = sound.get_wave(t_start=t, t_end=next_t, + data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) t = next_t - if sound.duration is not None and t >= sound.duration: + if duration is not None and t >= duration: break if self._get_playback_state(stream_index) == \ @@ -586,6 +598,7 @@ class SoundPlugin(Plugin): raise RuntimeError('No stream index available') self.active_streams[stream_index] = None + self.stream_mixes[stream_index] = Mix() self.completed_callback_events[stream_index] = \ completed_callback_event if completed_callback_event else Event() @@ -632,6 +645,7 @@ class SoundPlugin(Plugin): event.wait() del self.completed_callback_events[i] del self.active_streams[i] + del self.stream_mixes[i] self.logger.info('Playback stopped on streams [{}]'.format( ', '.join([str(stream) for stream in