diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index 0979d35ee..c89f34658 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -1,42 +1,24 @@ +from collections import defaultdict import os import queue import stat -import time +from typing_extensions import override import warnings -from enum import Enum -from threading import Thread, Event, RLock -from typing import Optional, Union +from threading import Event, RLock +from typing import Dict, Optional, Union import sounddevice as sd import soundfile as sf -from platypush.context import get_bus -from platypush.message.event.sound import ( - SoundRecordingStartedEvent, - SoundRecordingStoppedEvent, -) - -from platypush.plugins import Plugin, action -from platypush.utils import get_redis +from platypush.plugins import RunnablePlugin, action from .core import Sound, Mix -from ._converter import ConverterProcess +from ._controllers import AudioRecorder +from ._model import AudioState -class PlaybackState(Enum): - STOPPED = 'STOPPED' - PLAYING = 'PLAYING' - PAUSED = 'PAUSED' - - -class RecordingState(Enum): - STOPPED = 'STOPPED' - RECORDING = 'RECORDING' - PAUSED = 'PAUSED' - - -class SoundPlugin(Plugin): +class SoundPlugin(RunnablePlugin): """ Plugin to interact with a sound device. @@ -105,24 +87,36 @@ class SoundPlugin(Plugin): self.playback_state_lock = RLock() self.playback_paused_changed = {} self.stream_mixes = {} - self.recording_state = RecordingState.STOPPED - self.recording_state_lock = RLock() - self.recording_paused_changed = Event() self.active_streams = {} self.stream_name_to_index = {} self.stream_index_to_name = {} self.completed_callback_events = {} self.ffmpeg_bin = ffmpeg_bin + self._recorders: Dict[str, AudioRecorder] = {} + self._recorder_locks: Dict[str, RLock] = defaultdict(RLock) + @staticmethod - def _get_default_device(category): + def _get_default_device(category: str) -> str: """ Query the default audio devices. :param category: Device category to query. Can be either input or output - :type category: str """ - return sd.query_hostapis()[0].get('default_' + category.lower() + '_device') # type: ignore + host_apis = sd.query_hostapis() + assert host_apis, 'No sound devices found' + available_devices = list( + filter( + lambda x: x is not None, + ( + host_api.get('default_' + category.lower() + '_device') # type: ignore + for host_api in host_apis + ), + ), + ) + + assert available_devices, f'No default "{category}" device found' + return available_devices[0] @action def query_devices(self, category=None): @@ -175,10 +169,10 @@ class SoundPlugin(Plugin): is_raw_stream = streamtype == sd.RawOutputStream def audio_callback(outdata, frames, *, status): - if self._get_playback_state(stream_index) == PlaybackState.STOPPED: + if self._get_playback_state(stream_index) == AudioState.STOPPED: raise sd.CallbackStop - while self._get_playback_state(stream_index) == PlaybackState.PAUSED: + while self._get_playback_state(stream_index) == AudioState.PAUSED: self.playback_paused_changed[stream_index].wait() if frames != blocksize: @@ -378,7 +372,7 @@ class SoundPlugin(Plugin): finished_callback=completed_callback_event.set, ) - self._start_playback(stream_index=stream_index, stream=stream) + self.start_playback(stream_index=stream_index, stream=stream) with stream: # Timeout set until we expect all the buffered blocks to @@ -386,9 +380,7 @@ class SoundPlugin(Plugin): timeout = blocksize * bufsize / samplerate while True: - while ( - self._get_playback_state(stream_index) == PlaybackState.PAUSED - ): + while self._get_playback_state(stream_index) == AudioState.PAUSED: self.playback_paused_changed[stream_index].wait() if f: @@ -412,23 +404,20 @@ class SoundPlugin(Plugin): if duration is not None and t >= duration: break - if self._get_playback_state(stream_index) == PlaybackState.STOPPED: + if self._get_playback_state(stream_index) == AudioState.STOPPED: break try: q.put(data, timeout=timeout) except queue.Full as e: - if ( - self._get_playback_state(stream_index) - != PlaybackState.PAUSED - ): + if self._get_playback_state(stream_index) != AudioState.PAUSED: raise e completed_callback_event.wait() except queue.Full: if ( stream_index is None - or self._get_playback_state(stream_index) != PlaybackState.STOPPED + or self._get_playback_state(stream_index) != AudioState.STOPPED ): self.logger.warning('Playback timeout: audio callback failed?') finally: @@ -450,6 +439,81 @@ class SoundPlugin(Plugin): return self.record(*args, **kwargs) + def create_recorder( + self, + device: str, + output_device: Optional[str] = None, + fifo: Optional[str] = None, + outfile: Optional[str] = None, + duration: Optional[float] = None, + sample_rate: Optional[int] = None, + dtype: str = 'int16', + blocksize: Optional[int] = None, + latency: Union[float, str] = 'high', + channels: int = 1, + redis_queue: Optional[str] = None, + format: str = 'wav', # pylint: disable=redefined-builtin + stream: bool = True, + play_audio: bool = False, + ) -> AudioRecorder: + with self._recorder_locks[device]: + assert self._recorders.get(device) is None, ( + f'Recording already in progress for device {device}', + ) + + if play_audio: + output_device = ( + output_device + or self.output_device + or self._get_default_device('output') + ) + + device = (device, output_device) # type: ignore + input_device = device[0] + else: + input_device = device + + if sample_rate is None: + dev_info = sd.query_devices(device, 'input') + sample_rate = int(dev_info['default_samplerate']) # type: ignore + + if blocksize is None: + blocksize = self.input_blocksize + + if fifo: + fifo = os.path.expanduser(fifo) + if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode): + self.logger.info('Removing previous input stream FIFO %s', fifo) + os.unlink(fifo) + + os.mkfifo(fifo, 0o644) + outfile = fifo + elif outfile: + outfile = os.path.expanduser(outfile) + + outfile = outfile or fifo or os.devnull + self._recorders[input_device] = AudioRecorder( + plugin=self, + device=device, + outfile=outfile, + duration=duration, + sample_rate=sample_rate, + dtype=dtype, + blocksize=blocksize, + latency=latency, + output_format=format, + channels=channels, + redis_queue=redis_queue, + stream=stream, + audio_pass_through=play_audio, + should_stop=self._should_stop, + ) + + return self._recorders[input_device] + + def _get_input_device(self, device: Optional[str] = None) -> str: + return device or self.input_device or self._get_default_device('input') + @action def record( # pylint: disable=too-many-statements self, @@ -499,121 +563,23 @@ class SoundPlugin(Plugin): HTTP endpoint too (default: ``/sound/stream<.format>``). """ - self.recording_paused_changed.clear() - - if device is None: - device = self.input_device - if device is None: - device = self._get_default_device('input') - - if play_audio: - output_device = ( - output_device - or self.output_device - or self._get_default_device('output') - ) - - device = (device, output_device) # type: ignore - - if sample_rate is None: - dev_info = sd.query_devices(device, 'input') - sample_rate = int(dev_info['default_samplerate']) # type: ignore - - if blocksize is None: - blocksize = self.input_blocksize - - if fifo: - fifo = os.path.expanduser(fifo) - if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode): - self.logger.info('Removing previous input stream FIFO %s', fifo) - os.unlink(fifo) - - os.mkfifo(fifo, 0o644) - outfile = fifo - elif outfile: - outfile = os.path.expanduser(outfile) - - outfile = outfile or fifo or os.devnull - - def audio_callback(audio_converter: ConverterProcess): - # _ = frames - # __ = time - def callback(indata, outdata, _, __, status): - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() - - if status: - self.logger.warning('Recording callback status: %s', status) - - audio_converter.write(indata.tobytes()) - if play_audio: - outdata[:] = indata - - return callback - - def streaming_thread(): - try: - stream_index = self._allocate_stream_index() if play_audio else None - - with ConverterProcess( - ffmpeg_bin=self.ffmpeg_bin, - sample_rate=sample_rate, - channels=channels, - dtype=dtype, - chunk_size=self.input_blocksize, - output_format=format, - ) as converter, sd.Stream( - samplerate=sample_rate, - device=device, - channels=channels, - callback=audio_callback(converter), - dtype=dtype, - latency=latency, - blocksize=blocksize, - ) as audio_stream, open( - outfile, 'wb' - ) as f: - self.start_recording() - if stream_index: - self._start_playback( - stream_index=stream_index, stream=audio_stream - ) - - get_bus().post(SoundRecordingStartedEvent()) - self.logger.info('Started recording from device [%s]', device) - recording_started_time = time.time() - - while self._get_recording_state() != RecordingState.STOPPED and ( - duration is None - or time.time() - recording_started_time < duration - ): - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() - - timeout = ( - max( - 0, - duration - (time.time() - recording_started_time), - ) - if duration is not None - else 1 - ) - - data = converter.read(timeout=timeout) - if not data: - continue - - f.write(data) - if redis_queue and stream: - get_redis().publish(redis_queue, data) - - except queue.Empty: - self.logger.warning('Recording timeout: audio callback failed?') - finally: - self.stop_recording() - get_bus().post(SoundRecordingStoppedEvent()) - - Thread(target=streaming_thread).start() + device = self._get_input_device(device) + self.create_recorder( + device, + output_device=output_device, + fifo=fifo, + outfile=outfile, + duration=duration, + sample_rate=sample_rate, + dtype=dtype, + blocksize=blocksize, + latency=latency, + channels=channels, + redis_queue=redis_queue, + format=format, + stream=stream, + play_audio=play_audio, + ).start() @action def recordplay(self, *args, **kwargs): @@ -626,6 +592,7 @@ class SoundPlugin(Plugin): stacklevel=1, ) + kwargs['play_audio'] = True return self.record(*args, **kwargs) @action @@ -718,9 +685,9 @@ class SoundPlugin(Plugin): return stream_index - def _start_playback(self, stream_index, stream): + def start_playback(self, stream_index, stream): with self.playback_state_lock: - self.playback_state[stream_index] = PlaybackState.PLAYING + self.playback_state[stream_index] = AudioState.RUNNING self.active_streams[stream_index] = stream if isinstance(self.playback_paused_changed.get(stream_index), Event): @@ -756,7 +723,7 @@ class SoundPlugin(Plugin): if self.completed_callback_events[i]: completed_callback_events[i] = self.completed_callback_events[i] - self.playback_state[i] = PlaybackState.STOPPED + self.playback_state[i] = AudioState.STOPPED for i, event in completed_callback_events.items(): event.wait() @@ -800,10 +767,10 @@ class SoundPlugin(Plugin): self.logger.info('No such stream index or name: %d', i) continue - if self.playback_state[i] == PlaybackState.PAUSED: - self.playback_state[i] = PlaybackState.PLAYING - elif self.playback_state[i] == PlaybackState.PLAYING: - self.playback_state[i] = PlaybackState.PAUSED + if self.playback_state[i] == AudioState.PAUSED: + self.playback_state[i] = AudioState.RUNNING + elif self.playback_state[i] == AudioState.RUNNING: + self.playback_state[i] = AudioState.PAUSED else: continue @@ -814,28 +781,41 @@ class SoundPlugin(Plugin): ', '.join([str(stream) for stream in streams]), ) - def start_recording(self): - with self.recording_state_lock: - self.recording_state = RecordingState.RECORDING - @action - def stop_recording(self): - with self.recording_state_lock: - self.recording_state = RecordingState.STOPPED - self.logger.info('Recording stopped') - - @action - def pause_recording(self): - with self.recording_state_lock: - if self.recording_state == RecordingState.PAUSED: - self.recording_state = RecordingState.RECORDING - elif self.recording_state == RecordingState.RECORDING: - self.recording_state = RecordingState.PAUSED - else: + def stop_recording( + self, device: Optional[str] = None, timeout: Optional[float] = 2 + ): + """ + Stop the current recording process on the selected device (default: + default input device), if it is running. + """ + device = self._get_input_device(device) + with self._recorder_locks[device]: + recorder = self._recorders.pop(device, None) + if not recorder: + self.logger.warning('No active recording session for device %s', device) return - self.logger.info('Recording paused state toggled') - self.recording_paused_changed.set() + recorder.notify_stop() + recorder.join(timeout=timeout) + + @action + def pause_recording(self, device: Optional[str] = None): + """ + Toggle the recording pause state on the selected device (default: + default input device), if it is running. + + If paused, the recording will be resumed. If running, it will be + paused. Otherwise, no action will be taken. + """ + device = self._get_input_device(device) + with self._recorder_locks[device]: + recorder = self._recorders.get(device) + if not recorder: + self.logger.warning('No active recording session for device %s', device) + return + + recorder.notify_pause() @action def release( @@ -905,9 +885,17 @@ class SoundPlugin(Plugin): with self.playback_state_lock: return self.playback_state[stream_index] - def _get_recording_state(self): - with self.recording_state_lock: - return self.recording_state + @override + def main(self): + self.wait_stop() + + @override + def stop(self): + super().stop() + devices = list(self._recorders.keys()) + + for device in devices: + self.stop_recording(device, timeout=0) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_controllers/__init__.py b/platypush/plugins/sound/_controllers/__init__.py new file mode 100644 index 000000000..01ce57283 --- /dev/null +++ b/platypush/plugins/sound/_controllers/__init__.py @@ -0,0 +1,5 @@ +from ._base import AudioThread +from ._recorder import AudioRecorder + + +__all__ = ['AudioRecorder', 'AudioThread'] diff --git a/platypush/plugins/sound/_controllers/_base.py b/platypush/plugins/sound/_controllers/_base.py new file mode 100644 index 000000000..e7060cd92 --- /dev/null +++ b/platypush/plugins/sound/_controllers/_base.py @@ -0,0 +1,227 @@ +from contextlib import contextmanager +import queue +import time + +from abc import ABC, abstractmethod +from logging import getLogger +from threading import Event, RLock, Thread +from typing import IO, Generator, Optional, Tuple, Union +from typing_extensions import override + +import sounddevice as sd + +from .._converter import ConverterProcess +from .._model import AudioState + + +class AudioThread(Thread, ABC): + """ + Base class for audio play/record threads. + """ + + _STREAM_NAME_PREFIX = 'platypush-stream-' + + def __init__( + self, + plugin, + device: Union[str, Tuple[str, str]], + outfile: str, + output_format: str, + channels: int, + sample_rate: int, + dtype: str, + stream: bool, + audio_pass_through: bool, + duration: Optional[float] = None, + blocksize: Optional[int] = None, + latency: Union[float, str] = 'high', + redis_queue: Optional[str] = None, + should_stop: Optional[Event] = None, + **kwargs, + ): + from .. import SoundPlugin + + super().__init__(**kwargs) + + self.plugin: SoundPlugin = plugin + self.device = device + self.outfile = outfile + self.output_format = output_format + self.channels = channels + self.sample_rate = sample_rate + self.dtype = dtype + self.stream = stream + self.duration = duration + self.blocksize = blocksize + self.latency = latency + self.redis_queue = redis_queue + self.audio_pass_through = audio_pass_through + self.logger = getLogger(__name__) + + self._state = AudioState.STOPPED + self._state_lock = RLock() + self._started_time: Optional[float] = None + self._converter: Optional[ConverterProcess] = None + self._should_stop = should_stop or Event() + self.paused_changed = Event() + + @property + def should_stop(self) -> bool: + """ + Proxy for `._should_stop.is_set()`. + """ + return self._should_stop.is_set() + + @abstractmethod + def _audio_callback(self, audio_converter: ConverterProcess): + """ + Returns a callback to handle the raw frames captures from the audio device. + """ + raise NotImplementedError() + + @abstractmethod + def _on_audio_converted(self, data: bytes, out_f: IO): + """ + This callback will be called when the audio data has been converted. + """ + raise NotImplementedError() + + def main( + self, + converter: ConverterProcess, + audio_stream: sd.Stream, + out_stream_index: Optional[int], + out_f: IO, + ): + """ + Main loop. + """ + self.notify_start() + if out_stream_index: + self.plugin.start_playback( + stream_index=out_stream_index, stream=audio_stream + ) + + self.logger.info( + 'Started %s on device [%s]', self.__class__.__name__, self.device + ) + self._started_time = time.time() + + while ( + self.state != AudioState.STOPPED + and not self.should_stop + and ( + self.duration is None + or time.time() - self._started_time < self.duration + ) + ): + while self.state == AudioState.PAUSED: + self.paused_changed.wait() + + if self.should_stop: + break + + timeout = ( + max( + 0, + self.duration - (time.time() - self._started_time), + ) + if self.duration is not None + else 1 + ) + + data = converter.read(timeout=timeout) + if not data: + continue + + self._on_audio_converted(data, out_f) + + @override + def run(self): + super().run() + self.paused_changed.clear() + + try: + stream_index = ( + self.plugin._allocate_stream_index() # pylint: disable=protected-access + if self.audio_pass_through + else None + ) + + with self.open_converter() as converter, sd.Stream( + samplerate=self.sample_rate, + device=self.device, + channels=self.channels, + callback=self._audio_callback(converter), + dtype=self.dtype, + latency=self.latency, + blocksize=self.blocksize, + ) as audio_stream, open(self.outfile, 'wb') as f: + self.main( + out_stream_index=stream_index, + converter=converter, + audio_stream=audio_stream, + out_f=f, + ) + except queue.Empty: + self.logger.warning( + 'Audio callback timeout for %s', self.__class__.__name__ + ) + finally: + self.notify_stop() + + @contextmanager + def open_converter(self) -> Generator[ConverterProcess, None, None]: + assert not self._converter, 'A converter process is already running' + self._converter = ConverterProcess( + ffmpeg_bin=self.plugin.ffmpeg_bin, + sample_rate=self.sample_rate, + channels=self.channels, + dtype=self.dtype, + chunk_size=self.plugin.input_blocksize, + output_format=self.output_format, + ) + + self._converter.start() + yield self._converter + + self._converter.stop() + self._converter.join(timeout=2) + self._converter = None + + def notify_start(self): + self.state = AudioState.RUNNING + + def notify_stop(self): + self.state = AudioState.STOPPED + if self._converter: + self._converter.stop() + + def notify_pause(self): + states = { + AudioState.PAUSED: AudioState.RUNNING, + AudioState.RUNNING: AudioState.PAUSED, + } + + with self._state_lock: + new_state = states.get(self.state) + if new_state: + self.state = new_state + else: + return + + self.logger.info('Paused state toggled for %s', self.__class__.__name__) + self.paused_changed.set() + + @property + def state(self): + with self._state_lock: + return self._state + + @state.setter + def state(self, value: AudioState): + with self._state_lock: + self._state = value + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_controllers/_recorder.py b/platypush/plugins/sound/_controllers/_recorder.py new file mode 100644 index 000000000..ef6adf26f --- /dev/null +++ b/platypush/plugins/sound/_controllers/_recorder.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass + +from typing import IO +from typing_extensions import override + +from platypush.context import get_bus +from platypush.message.event.sound import ( + SoundRecordingStartedEvent, + SoundRecordingStoppedEvent, +) + +from platypush.utils import get_redis + +from .._converter import ConverterProcess +from .._model import AudioState +from ._base import AudioThread + + +@dataclass +class AudioRecorder(AudioThread): + """ + The ``AudioRecorder`` thread is responsible for recording audio from the + input device, writing it to the converter process and dispatch the + converted audio to the registered consumers. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @override + def _audio_callback(self, audio_converter: ConverterProcess): + # _ = frames + # __ = time + def callback(indata, outdata, _, __, status): + if self.state == AudioState.PAUSED: + return + + if status: + self.logger.warning('Recording callback status: %s', status) + + try: + audio_converter.write(indata.tobytes()) + except AssertionError as e: + self.logger.warning('Audio recorder callback error: %s', e) + self.state = AudioState.STOPPED + return + + if self.audio_pass_through: + outdata[:] = indata + + return callback + + @override + def _on_audio_converted(self, data: bytes, out_f: IO): + out_f.write(data) + if self.redis_queue and self.stream: + get_redis().publish(self.redis_queue, data) + + @override + def notify_start(self): + super().notify_start() + get_bus().post(SoundRecordingStartedEvent()) + + @override + def notify_stop(self): + super().notify_stop() + get_bus().post(SoundRecordingStoppedEvent()) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_converter.py b/platypush/plugins/sound/_converter.py index ee0f41a76..3aa307f2b 100644 --- a/platypush/plugins/sound/_converter.py +++ b/platypush/plugins/sound/_converter.py @@ -3,7 +3,7 @@ from asyncio.subprocess import PIPE from queue import Empty from queue import Queue -from threading import Thread +from threading import Event, RLock, Thread from typing import Optional, Self from platypush.context import get_or_create_event_loop @@ -75,19 +75,15 @@ class ConverterProcess(Thread): self._out_queue = Queue() self.ffmpeg = None self._loop = None + self._should_stop = Event() + self._stop_lock = RLock() def __enter__(self) -> Self: self.start() return self def __exit__(self, *_, **__): - if self.ffmpeg and self._loop: - self._loop.call_soon_threadsafe(self.ffmpeg.kill) - - self.ffmpeg = None - - if self._loop: - self._loop = None + self.stop() def _check_ffmpeg(self): assert ( @@ -125,7 +121,12 @@ class ConverterProcess(Thread): except asyncio.TimeoutError: pass - while self._loop and self.ffmpeg and self.ffmpeg.returncode is None: + while ( + self._loop + and self.ffmpeg + and self.ffmpeg.returncode is None + and not self.should_stop + ): self._check_ffmpeg() assert ( self.ffmpeg and self.ffmpeg.stdout @@ -158,5 +159,23 @@ class ConverterProcess(Thread): self._loop = get_or_create_event_loop() self._loop.run_until_complete(self._audio_proxy(timeout=1)) + def stop(self): + with self._stop_lock: + self._should_stop.set() + if self.ffmpeg: + try: + self.ffmpeg.kill() + except ProcessLookupError: + pass + + self.ffmpeg = None + + if self._loop: + self._loop = None + + @property + def should_stop(self) -> bool: + return self._should_stop.is_set() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_model.py b/platypush/plugins/sound/_model.py new file mode 100644 index 000000000..2f1215133 --- /dev/null +++ b/platypush/plugins/sound/_model.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class AudioState(Enum): + """ + Audio states. + """ + + STOPPED = 'STOPPED' + RUNNING = 'RUNNING' + PAUSED = 'PAUSED'