diff --git a/platypush/backend/http/app/streaming/plugins/sound.py b/platypush/backend/http/app/streaming/plugins/sound.py index 69e7b45ad..b1688f3d9 100644 --- a/platypush/backend/http/app/streaming/plugins/sound.py +++ b/platypush/backend/http/app/streaming/plugins/sound.py @@ -32,7 +32,7 @@ class SoundRoute(StreamingRoute): @contextmanager def _audio_stream(self, **kwargs) -> Generator[None, None, None]: response = send_request( - 'sound.stream_recording', + 'sound.record', dtype='int16', **kwargs, ) diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index 31dfa3130..55d382da9 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -1,8 +1,8 @@ import os import queue import stat -import tempfile import time +import warnings from enum import Enum from threading import Thread, Event, RLock @@ -438,7 +438,20 @@ class SoundPlugin(Plugin): self.stop_playback([stream_index]) @action - def stream_recording( + def stream_recording(self, *args, **kwargs): + """ + Deprecated alias for :meth:`.record`. + """ + warnings.warn( + 'sound.stream_recording is deprecated, use sound.record instead', + DeprecationWarning, + stacklevel=1, + ) + + return self.record(*args, **kwargs) + + @action + def record( self, device: Optional[str] = None, fifo: Optional[str] = None, @@ -451,6 +464,7 @@ class SoundPlugin(Plugin): channels: int = 1, redis_queue: Optional[str] = None, format: str = 'wav', + stream: bool = True, ): """ Return audio data from an audio source @@ -474,6 +488,8 @@ class SoundPlugin(Plugin): :param redis_queue: If set, the audio chunks will also be published to this Redis channel, so other consumers can process them downstream. :param format: Audio format. Supported: wav, mp3, ogg, aac. Default: wav. + :param stream: If True (default), then the audio will be streamed to an + HTTP endpoint too (default: ``/sound/stream<.format>``). """ self.recording_paused_changed.clear() @@ -563,7 +579,7 @@ class SoundPlugin(Plugin): continue f.write(data) - if redis_queue: + if redis_queue and stream: get_redis().publish(redis_queue, data) except queue.Empty: @@ -574,186 +590,6 @@ class SoundPlugin(Plugin): Thread(target=streaming_thread).start() - @action - def record( - self, - outfile=None, - duration=None, - device=None, - sample_rate=None, - format=None, - blocksize=None, - latency='high', - channels=1, - subtype='PCM_24', - ): - """ - Records audio to a sound file (support formats: wav, raw) - - :param outfile: Sound file (default: the method will create a temporary file with the recording) - :type outfile: str - - :param duration: Recording duration in seconds (default: record until stop event) - :type duration: float - - :param device: Input device (default: default configured device or system default audio input if not configured) - :type device: int or str - - :param sample_rate: Recording sample rate (default: device default rate) - :type sample_rate: int - - :param format: Audio format (default: WAV) - :type format: str - - :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) - :type blocksize: int - - :param latency: Device latency in seconds (default: the device's default high latency) - :type latency: float - - :param channels: Number of channels (default: 1) - :type channels: int - - :param subtype: Recording subtype - see `Soundfile docs - Subtypes - `_ - for a list of the available subtypes (default: PCM_24) - :type subtype: str - """ - - def recording_thread( - outfile, - duration, - device, - sample_rate, - format, - blocksize, - latency, - channels, - subtype, - ): - self.recording_paused_changed.clear() - - if outfile: - outfile = os.path.abspath(os.path.expanduser(outfile)) - if os.path.isfile(outfile): - self.logger.info('Removing existing audio file %s', outfile) - os.unlink(outfile) - else: - outfile = tempfile.NamedTemporaryFile( - prefix='recording_', - suffix='.wav', - delete=False, - dir=tempfile.gettempdir(), - ).name - - if device is None: - device = self.input_device - if device is None: - device = self._get_default_device('input') - - if sample_rate is None: - dev_info = sd.query_devices(device, 'input') - sample_rate = int(dev_info['default_samplerate']) - - if blocksize is None: - blocksize = self.input_blocksize - - q = queue.Queue() - - def audio_callback(indata, frames, duration, status): - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() - - if status: - self.logger.warning('Recording callback status: %s', status) - - q.put( - { - 'timestamp': time.time(), - 'frames': frames, - 'time': duration, - 'data': indata.copy(), - } - ) - - try: - with sf.SoundFile( - outfile, - mode='w', - samplerate=sample_rate, - format=format, - channels=channels, - subtype=subtype, - ) as f: - with sd.InputStream( - samplerate=sample_rate, - device=device, - channels=channels, - callback=audio_callback, - latency=latency, - blocksize=blocksize, - ): - self.start_recording() - get_bus().post(SoundRecordingStartedEvent(filename=outfile)) - self.logger.info( - 'Started recording from device [%s] to [%s]', - device, - outfile, - ) - - recording_started_time = time.time() - - while ( - self._get_recording_state() != RecordingState.STOPPED - and ( - duration is None - or time.time() - recording_started_time < duration - ) - ): - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() - - get_args = ( - { - 'block': True, - 'timeout': max( - 0, - duration - - (time.time() - recording_started_time), - ), - } - if duration is not None - else {} - ) - - data = q.get(**get_args) - if data and time.time() - data.get('timestamp') <= 1.0: - # Only write the block if the latency is still acceptable - f.write(data['data']) - - f.flush() - - except queue.Empty: - self.logger.warning('Recording timeout: audio callback failed?') - finally: - self.stop_recording() - get_bus().post(SoundRecordingStoppedEvent(filename=outfile)) - - Thread( - target=recording_thread, - args=( - outfile, - duration, - device, - sample_rate, - format, - blocksize, - latency, - channels, - subtype, - ), - ).start() - @action def recordplay( self,