Extracted AudioRecorder out of SoundPlugin.

This commit is contained in:
Fabio Manganiello 2023-06-16 03:12:55 +02:00
parent da93f1b3b0
commit a6351dddd4
Signed by: blacklight
GPG key ID: D90FBA7F76362774
6 changed files with 517 additions and 197 deletions

View file

@ -1,42 +1,24 @@
from collections import defaultdict
import os import os
import queue import queue
import stat import stat
import time from typing_extensions import override
import warnings import warnings
from enum import Enum from threading import Event, RLock
from threading import Thread, Event, RLock from typing import Dict, Optional, Union
from typing import Optional, Union
import sounddevice as sd import sounddevice as sd
import soundfile as sf import soundfile as sf
from platypush.context import get_bus from platypush.plugins import RunnablePlugin, action
from platypush.message.event.sound import (
SoundRecordingStartedEvent,
SoundRecordingStoppedEvent,
)
from platypush.plugins import Plugin, action
from platypush.utils import get_redis
from .core import Sound, Mix from .core import Sound, Mix
from ._converter import ConverterProcess from ._controllers import AudioRecorder
from ._model import AudioState
class PlaybackState(Enum): class SoundPlugin(RunnablePlugin):
STOPPED = 'STOPPED'
PLAYING = 'PLAYING'
PAUSED = 'PAUSED'
class RecordingState(Enum):
STOPPED = 'STOPPED'
RECORDING = 'RECORDING'
PAUSED = 'PAUSED'
class SoundPlugin(Plugin):
""" """
Plugin to interact with a sound device. Plugin to interact with a sound device.
@ -105,24 +87,36 @@ class SoundPlugin(Plugin):
self.playback_state_lock = RLock() self.playback_state_lock = RLock()
self.playback_paused_changed = {} self.playback_paused_changed = {}
self.stream_mixes = {} self.stream_mixes = {}
self.recording_state = RecordingState.STOPPED
self.recording_state_lock = RLock()
self.recording_paused_changed = Event()
self.active_streams = {} self.active_streams = {}
self.stream_name_to_index = {} self.stream_name_to_index = {}
self.stream_index_to_name = {} self.stream_index_to_name = {}
self.completed_callback_events = {} self.completed_callback_events = {}
self.ffmpeg_bin = ffmpeg_bin self.ffmpeg_bin = ffmpeg_bin
self._recorders: Dict[str, AudioRecorder] = {}
self._recorder_locks: Dict[str, RLock] = defaultdict(RLock)
@staticmethod @staticmethod
def _get_default_device(category): def _get_default_device(category: str) -> str:
""" """
Query the default audio devices. Query the default audio devices.
:param category: Device category to query. Can be either input or output :param category: Device category to query. Can be either input or output
:type category: str
""" """
return sd.query_hostapis()[0].get('default_' + category.lower() + '_device') # type: ignore host_apis = sd.query_hostapis()
assert host_apis, 'No sound devices found'
available_devices = list(
filter(
lambda x: x is not None,
(
host_api.get('default_' + category.lower() + '_device') # type: ignore
for host_api in host_apis
),
),
)
assert available_devices, f'No default "{category}" device found'
return available_devices[0]
@action @action
def query_devices(self, category=None): def query_devices(self, category=None):
@ -175,10 +169,10 @@ class SoundPlugin(Plugin):
is_raw_stream = streamtype == sd.RawOutputStream is_raw_stream = streamtype == sd.RawOutputStream
def audio_callback(outdata, frames, *, status): def audio_callback(outdata, frames, *, status):
if self._get_playback_state(stream_index) == PlaybackState.STOPPED: if self._get_playback_state(stream_index) == AudioState.STOPPED:
raise sd.CallbackStop raise sd.CallbackStop
while self._get_playback_state(stream_index) == PlaybackState.PAUSED: while self._get_playback_state(stream_index) == AudioState.PAUSED:
self.playback_paused_changed[stream_index].wait() self.playback_paused_changed[stream_index].wait()
if frames != blocksize: if frames != blocksize:
@ -378,7 +372,7 @@ class SoundPlugin(Plugin):
finished_callback=completed_callback_event.set, finished_callback=completed_callback_event.set,
) )
self._start_playback(stream_index=stream_index, stream=stream) self.start_playback(stream_index=stream_index, stream=stream)
with stream: with stream:
# Timeout set until we expect all the buffered blocks to # Timeout set until we expect all the buffered blocks to
@ -386,9 +380,7 @@ class SoundPlugin(Plugin):
timeout = blocksize * bufsize / samplerate timeout = blocksize * bufsize / samplerate
while True: while True:
while ( while self._get_playback_state(stream_index) == AudioState.PAUSED:
self._get_playback_state(stream_index) == PlaybackState.PAUSED
):
self.playback_paused_changed[stream_index].wait() self.playback_paused_changed[stream_index].wait()
if f: if f:
@ -412,23 +404,20 @@ class SoundPlugin(Plugin):
if duration is not None and t >= duration: if duration is not None and t >= duration:
break break
if self._get_playback_state(stream_index) == PlaybackState.STOPPED: if self._get_playback_state(stream_index) == AudioState.STOPPED:
break break
try: try:
q.put(data, timeout=timeout) q.put(data, timeout=timeout)
except queue.Full as e: except queue.Full as e:
if ( if self._get_playback_state(stream_index) != AudioState.PAUSED:
self._get_playback_state(stream_index)
!= PlaybackState.PAUSED
):
raise e raise e
completed_callback_event.wait() completed_callback_event.wait()
except queue.Full: except queue.Full:
if ( if (
stream_index is None stream_index is None
or self._get_playback_state(stream_index) != PlaybackState.STOPPED or self._get_playback_state(stream_index) != AudioState.STOPPED
): ):
self.logger.warning('Playback timeout: audio callback failed?') self.logger.warning('Playback timeout: audio callback failed?')
finally: finally:
@ -450,6 +439,81 @@ class SoundPlugin(Plugin):
return self.record(*args, **kwargs) return self.record(*args, **kwargs)
def create_recorder(
self,
device: str,
output_device: Optional[str] = None,
fifo: Optional[str] = None,
outfile: Optional[str] = None,
duration: Optional[float] = None,
sample_rate: Optional[int] = None,
dtype: str = 'int16',
blocksize: Optional[int] = None,
latency: Union[float, str] = 'high',
channels: int = 1,
redis_queue: Optional[str] = None,
format: str = 'wav', # pylint: disable=redefined-builtin
stream: bool = True,
play_audio: bool = False,
) -> AudioRecorder:
with self._recorder_locks[device]:
assert self._recorders.get(device) is None, (
f'Recording already in progress for device {device}',
)
if play_audio:
output_device = (
output_device
or self.output_device
or self._get_default_device('output')
)
device = (device, output_device) # type: ignore
input_device = device[0]
else:
input_device = device
if sample_rate is None:
dev_info = sd.query_devices(device, 'input')
sample_rate = int(dev_info['default_samplerate']) # type: ignore
if blocksize is None:
blocksize = self.input_blocksize
if fifo:
fifo = os.path.expanduser(fifo)
if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode):
self.logger.info('Removing previous input stream FIFO %s', fifo)
os.unlink(fifo)
os.mkfifo(fifo, 0o644)
outfile = fifo
elif outfile:
outfile = os.path.expanduser(outfile)
outfile = outfile or fifo or os.devnull
self._recorders[input_device] = AudioRecorder(
plugin=self,
device=device,
outfile=outfile,
duration=duration,
sample_rate=sample_rate,
dtype=dtype,
blocksize=blocksize,
latency=latency,
output_format=format,
channels=channels,
redis_queue=redis_queue,
stream=stream,
audio_pass_through=play_audio,
should_stop=self._should_stop,
)
return self._recorders[input_device]
def _get_input_device(self, device: Optional[str] = None) -> str:
return device or self.input_device or self._get_default_device('input')
@action @action
def record( # pylint: disable=too-many-statements def record( # pylint: disable=too-many-statements
self, self,
@ -499,121 +563,23 @@ class SoundPlugin(Plugin):
HTTP endpoint too (default: ``/sound/stream<.format>``). HTTP endpoint too (default: ``/sound/stream<.format>``).
""" """
self.recording_paused_changed.clear() device = self._get_input_device(device)
self.create_recorder(
if device is None: device,
device = self.input_device output_device=output_device,
if device is None: fifo=fifo,
device = self._get_default_device('input') outfile=outfile,
duration=duration,
if play_audio:
output_device = (
output_device
or self.output_device
or self._get_default_device('output')
)
device = (device, output_device) # type: ignore
if sample_rate is None:
dev_info = sd.query_devices(device, 'input')
sample_rate = int(dev_info['default_samplerate']) # type: ignore
if blocksize is None:
blocksize = self.input_blocksize
if fifo:
fifo = os.path.expanduser(fifo)
if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode):
self.logger.info('Removing previous input stream FIFO %s', fifo)
os.unlink(fifo)
os.mkfifo(fifo, 0o644)
outfile = fifo
elif outfile:
outfile = os.path.expanduser(outfile)
outfile = outfile or fifo or os.devnull
def audio_callback(audio_converter: ConverterProcess):
# _ = frames
# __ = time
def callback(indata, outdata, _, __, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: %s', status)
audio_converter.write(indata.tobytes())
if play_audio:
outdata[:] = indata
return callback
def streaming_thread():
try:
stream_index = self._allocate_stream_index() if play_audio else None
with ConverterProcess(
ffmpeg_bin=self.ffmpeg_bin,
sample_rate=sample_rate, sample_rate=sample_rate,
channels=channels,
dtype=dtype, dtype=dtype,
chunk_size=self.input_blocksize,
output_format=format,
) as converter, sd.Stream(
samplerate=sample_rate,
device=device,
channels=channels,
callback=audio_callback(converter),
dtype=dtype,
latency=latency,
blocksize=blocksize, blocksize=blocksize,
) as audio_stream, open( latency=latency,
outfile, 'wb' channels=channels,
) as f: redis_queue=redis_queue,
self.start_recording() format=format,
if stream_index: stream=stream,
self._start_playback( play_audio=play_audio,
stream_index=stream_index, stream=audio_stream ).start()
)
get_bus().post(SoundRecordingStartedEvent())
self.logger.info('Started recording from device [%s]', device)
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED and (
duration is None
or time.time() - recording_started_time < duration
):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
timeout = (
max(
0,
duration - (time.time() - recording_started_time),
)
if duration is not None
else 1
)
data = converter.read(timeout=timeout)
if not data:
continue
f.write(data)
if redis_queue and stream:
get_redis().publish(redis_queue, data)
except queue.Empty:
self.logger.warning('Recording timeout: audio callback failed?')
finally:
self.stop_recording()
get_bus().post(SoundRecordingStoppedEvent())
Thread(target=streaming_thread).start()
@action @action
def recordplay(self, *args, **kwargs): def recordplay(self, *args, **kwargs):
@ -626,6 +592,7 @@ class SoundPlugin(Plugin):
stacklevel=1, stacklevel=1,
) )
kwargs['play_audio'] = True
return self.record(*args, **kwargs) return self.record(*args, **kwargs)
@action @action
@ -718,9 +685,9 @@ class SoundPlugin(Plugin):
return stream_index return stream_index
def _start_playback(self, stream_index, stream): def start_playback(self, stream_index, stream):
with self.playback_state_lock: with self.playback_state_lock:
self.playback_state[stream_index] = PlaybackState.PLAYING self.playback_state[stream_index] = AudioState.RUNNING
self.active_streams[stream_index] = stream self.active_streams[stream_index] = stream
if isinstance(self.playback_paused_changed.get(stream_index), Event): if isinstance(self.playback_paused_changed.get(stream_index), Event):
@ -756,7 +723,7 @@ class SoundPlugin(Plugin):
if self.completed_callback_events[i]: if self.completed_callback_events[i]:
completed_callback_events[i] = self.completed_callback_events[i] completed_callback_events[i] = self.completed_callback_events[i]
self.playback_state[i] = PlaybackState.STOPPED self.playback_state[i] = AudioState.STOPPED
for i, event in completed_callback_events.items(): for i, event in completed_callback_events.items():
event.wait() event.wait()
@ -800,10 +767,10 @@ class SoundPlugin(Plugin):
self.logger.info('No such stream index or name: %d', i) self.logger.info('No such stream index or name: %d', i)
continue continue
if self.playback_state[i] == PlaybackState.PAUSED: if self.playback_state[i] == AudioState.PAUSED:
self.playback_state[i] = PlaybackState.PLAYING self.playback_state[i] = AudioState.RUNNING
elif self.playback_state[i] == PlaybackState.PLAYING: elif self.playback_state[i] == AudioState.RUNNING:
self.playback_state[i] = PlaybackState.PAUSED self.playback_state[i] = AudioState.PAUSED
else: else:
continue continue
@ -814,28 +781,41 @@ class SoundPlugin(Plugin):
', '.join([str(stream) for stream in streams]), ', '.join([str(stream) for stream in streams]),
) )
def start_recording(self):
with self.recording_state_lock:
self.recording_state = RecordingState.RECORDING
@action @action
def stop_recording(self): def stop_recording(
with self.recording_state_lock: self, device: Optional[str] = None, timeout: Optional[float] = 2
self.recording_state = RecordingState.STOPPED ):
self.logger.info('Recording stopped') """
Stop the current recording process on the selected device (default:
@action default input device), if it is running.
def pause_recording(self): """
with self.recording_state_lock: device = self._get_input_device(device)
if self.recording_state == RecordingState.PAUSED: with self._recorder_locks[device]:
self.recording_state = RecordingState.RECORDING recorder = self._recorders.pop(device, None)
elif self.recording_state == RecordingState.RECORDING: if not recorder:
self.recording_state = RecordingState.PAUSED self.logger.warning('No active recording session for device %s', device)
else:
return return
self.logger.info('Recording paused state toggled') recorder.notify_stop()
self.recording_paused_changed.set() recorder.join(timeout=timeout)
@action
def pause_recording(self, device: Optional[str] = None):
"""
Toggle the recording pause state on the selected device (default:
default input device), if it is running.
If paused, the recording will be resumed. If running, it will be
paused. Otherwise, no action will be taken.
"""
device = self._get_input_device(device)
with self._recorder_locks[device]:
recorder = self._recorders.get(device)
if not recorder:
self.logger.warning('No active recording session for device %s', device)
return
recorder.notify_pause()
@action @action
def release( def release(
@ -905,9 +885,17 @@ class SoundPlugin(Plugin):
with self.playback_state_lock: with self.playback_state_lock:
return self.playback_state[stream_index] return self.playback_state[stream_index]
def _get_recording_state(self): @override
with self.recording_state_lock: def main(self):
return self.recording_state self.wait_stop()
@override
def stop(self):
super().stop()
devices = list(self._recorders.keys())
for device in devices:
self.stop_recording(device, timeout=0)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,5 @@
from ._base import AudioThread
from ._recorder import AudioRecorder
__all__ = ['AudioRecorder', 'AudioThread']

View file

@ -0,0 +1,227 @@
from contextlib import contextmanager
import queue
import time
from abc import ABC, abstractmethod
from logging import getLogger
from threading import Event, RLock, Thread
from typing import IO, Generator, Optional, Tuple, Union
from typing_extensions import override
import sounddevice as sd
from .._converter import ConverterProcess
from .._model import AudioState
class AudioThread(Thread, ABC):
"""
Base class for audio play/record threads.
"""
_STREAM_NAME_PREFIX = 'platypush-stream-'
def __init__(
self,
plugin,
device: Union[str, Tuple[str, str]],
outfile: str,
output_format: str,
channels: int,
sample_rate: int,
dtype: str,
stream: bool,
audio_pass_through: bool,
duration: Optional[float] = None,
blocksize: Optional[int] = None,
latency: Union[float, str] = 'high',
redis_queue: Optional[str] = None,
should_stop: Optional[Event] = None,
**kwargs,
):
from .. import SoundPlugin
super().__init__(**kwargs)
self.plugin: SoundPlugin = plugin
self.device = device
self.outfile = outfile
self.output_format = output_format
self.channels = channels
self.sample_rate = sample_rate
self.dtype = dtype
self.stream = stream
self.duration = duration
self.blocksize = blocksize
self.latency = latency
self.redis_queue = redis_queue
self.audio_pass_through = audio_pass_through
self.logger = getLogger(__name__)
self._state = AudioState.STOPPED
self._state_lock = RLock()
self._started_time: Optional[float] = None
self._converter: Optional[ConverterProcess] = None
self._should_stop = should_stop or Event()
self.paused_changed = Event()
@property
def should_stop(self) -> bool:
"""
Proxy for `._should_stop.is_set()`.
"""
return self._should_stop.is_set()
@abstractmethod
def _audio_callback(self, audio_converter: ConverterProcess):
"""
Returns a callback to handle the raw frames captures from the audio device.
"""
raise NotImplementedError()
@abstractmethod
def _on_audio_converted(self, data: bytes, out_f: IO):
"""
This callback will be called when the audio data has been converted.
"""
raise NotImplementedError()
def main(
self,
converter: ConverterProcess,
audio_stream: sd.Stream,
out_stream_index: Optional[int],
out_f: IO,
):
"""
Main loop.
"""
self.notify_start()
if out_stream_index:
self.plugin.start_playback(
stream_index=out_stream_index, stream=audio_stream
)
self.logger.info(
'Started %s on device [%s]', self.__class__.__name__, self.device
)
self._started_time = time.time()
while (
self.state != AudioState.STOPPED
and not self.should_stop
and (
self.duration is None
or time.time() - self._started_time < self.duration
)
):
while self.state == AudioState.PAUSED:
self.paused_changed.wait()
if self.should_stop:
break
timeout = (
max(
0,
self.duration - (time.time() - self._started_time),
)
if self.duration is not None
else 1
)
data = converter.read(timeout=timeout)
if not data:
continue
self._on_audio_converted(data, out_f)
@override
def run(self):
super().run()
self.paused_changed.clear()
try:
stream_index = (
self.plugin._allocate_stream_index() # pylint: disable=protected-access
if self.audio_pass_through
else None
)
with self.open_converter() as converter, sd.Stream(
samplerate=self.sample_rate,
device=self.device,
channels=self.channels,
callback=self._audio_callback(converter),
dtype=self.dtype,
latency=self.latency,
blocksize=self.blocksize,
) as audio_stream, open(self.outfile, 'wb') as f:
self.main(
out_stream_index=stream_index,
converter=converter,
audio_stream=audio_stream,
out_f=f,
)
except queue.Empty:
self.logger.warning(
'Audio callback timeout for %s', self.__class__.__name__
)
finally:
self.notify_stop()
@contextmanager
def open_converter(self) -> Generator[ConverterProcess, None, None]:
assert not self._converter, 'A converter process is already running'
self._converter = ConverterProcess(
ffmpeg_bin=self.plugin.ffmpeg_bin,
sample_rate=self.sample_rate,
channels=self.channels,
dtype=self.dtype,
chunk_size=self.plugin.input_blocksize,
output_format=self.output_format,
)
self._converter.start()
yield self._converter
self._converter.stop()
self._converter.join(timeout=2)
self._converter = None
def notify_start(self):
self.state = AudioState.RUNNING
def notify_stop(self):
self.state = AudioState.STOPPED
if self._converter:
self._converter.stop()
def notify_pause(self):
states = {
AudioState.PAUSED: AudioState.RUNNING,
AudioState.RUNNING: AudioState.PAUSED,
}
with self._state_lock:
new_state = states.get(self.state)
if new_state:
self.state = new_state
else:
return
self.logger.info('Paused state toggled for %s', self.__class__.__name__)
self.paused_changed.set()
@property
def state(self):
with self._state_lock:
return self._state
@state.setter
def state(self, value: AudioState):
with self._state_lock:
self._state = value
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,70 @@
from dataclasses import dataclass
from typing import IO
from typing_extensions import override
from platypush.context import get_bus
from platypush.message.event.sound import (
SoundRecordingStartedEvent,
SoundRecordingStoppedEvent,
)
from platypush.utils import get_redis
from .._converter import ConverterProcess
from .._model import AudioState
from ._base import AudioThread
@dataclass
class AudioRecorder(AudioThread):
"""
The ``AudioRecorder`` thread is responsible for recording audio from the
input device, writing it to the converter process and dispatch the
converted audio to the registered consumers.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@override
def _audio_callback(self, audio_converter: ConverterProcess):
# _ = frames
# __ = time
def callback(indata, outdata, _, __, status):
if self.state == AudioState.PAUSED:
return
if status:
self.logger.warning('Recording callback status: %s', status)
try:
audio_converter.write(indata.tobytes())
except AssertionError as e:
self.logger.warning('Audio recorder callback error: %s', e)
self.state = AudioState.STOPPED
return
if self.audio_pass_through:
outdata[:] = indata
return callback
@override
def _on_audio_converted(self, data: bytes, out_f: IO):
out_f.write(data)
if self.redis_queue and self.stream:
get_redis().publish(self.redis_queue, data)
@override
def notify_start(self):
super().notify_start()
get_bus().post(SoundRecordingStartedEvent())
@override
def notify_stop(self):
super().notify_stop()
get_bus().post(SoundRecordingStoppedEvent())
# vim:sw=4:ts=4:et:

View file

@ -3,7 +3,7 @@ from asyncio.subprocess import PIPE
from queue import Empty from queue import Empty
from queue import Queue from queue import Queue
from threading import Thread from threading import Event, RLock, Thread
from typing import Optional, Self from typing import Optional, Self
from platypush.context import get_or_create_event_loop from platypush.context import get_or_create_event_loop
@ -75,19 +75,15 @@ class ConverterProcess(Thread):
self._out_queue = Queue() self._out_queue = Queue()
self.ffmpeg = None self.ffmpeg = None
self._loop = None self._loop = None
self._should_stop = Event()
self._stop_lock = RLock()
def __enter__(self) -> Self: def __enter__(self) -> Self:
self.start() self.start()
return self return self
def __exit__(self, *_, **__): def __exit__(self, *_, **__):
if self.ffmpeg and self._loop: self.stop()
self._loop.call_soon_threadsafe(self.ffmpeg.kill)
self.ffmpeg = None
if self._loop:
self._loop = None
def _check_ffmpeg(self): def _check_ffmpeg(self):
assert ( assert (
@ -125,7 +121,12 @@ class ConverterProcess(Thread):
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
while self._loop and self.ffmpeg and self.ffmpeg.returncode is None: while (
self._loop
and self.ffmpeg
and self.ffmpeg.returncode is None
and not self.should_stop
):
self._check_ffmpeg() self._check_ffmpeg()
assert ( assert (
self.ffmpeg and self.ffmpeg.stdout self.ffmpeg and self.ffmpeg.stdout
@ -158,5 +159,23 @@ class ConverterProcess(Thread):
self._loop = get_or_create_event_loop() self._loop = get_or_create_event_loop()
self._loop.run_until_complete(self._audio_proxy(timeout=1)) self._loop.run_until_complete(self._audio_proxy(timeout=1))
def stop(self):
with self._stop_lock:
self._should_stop.set()
if self.ffmpeg:
try:
self.ffmpeg.kill()
except ProcessLookupError:
pass
self.ffmpeg = None
if self._loop:
self._loop = None
@property
def should_stop(self) -> bool:
return self._should_stop.is_set()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,11 @@
from enum import Enum
class AudioState(Enum):
"""
Audio states.
"""
STOPPED = 'STOPPED'
RUNNING = 'RUNNING'
PAUSED = 'PAUSED'