# Source: platypush, commit 3e8d6bb01e by Fabio Manganiello.
# Commit message: "The package isn't present on older versions of Debian."
# CI status at capture time: continuous-integration/drone/push — build failing.
from abc import ABC, abstractmethod
|
|
from contextlib import contextmanager
|
|
from datetime import datetime
|
|
from logging import getLogger
|
|
import os
|
|
import queue
|
|
from threading import Event, RLock, Thread
|
|
import time
|
|
from typing import IO, Callable, Final, Generator, Optional, Tuple, Type, Union
|
|
|
|
import sounddevice as sd
|
|
|
|
from platypush.context import get_bus
|
|
from platypush.message.event.sound import SoundEvent
|
|
from platypush.utils import get_redis
|
|
|
|
from .._converters import AudioConverter
|
|
from .._model import AudioState, StreamType
|
|
|
|
_StreamType = Union[sd.Stream, sd.OutputStream]
|
|
|
|
|
|
class AudioThread(Thread, ABC):
    """
    Base class for audio play/record stream threads.

    Concrete subclasses declare the stream direction, the converter type and
    the event types emitted on state changes through the abstract properties
    below.
    """

    _DEFAULT_FILE: Final[str] = os.devnull
    """Unless otherwise specified, the audio streams will be sent to /dev/null"""
    _DEFAULT_CONVERTER_TIMEOUT: Final[float] = 1

    def __init__(
        self,
        device: Union[str, Tuple[str, str]],
        channels: int,
        volume: float,
        sample_rate: int,
        dtype: str,
        blocksize: int,
        ffmpeg_bin: str = 'ffmpeg',
        stream: bool = False,
        audio_pass_through: bool = False,
        infile: Optional[str] = None,
        outfile: Optional[str] = None,
        duration: Optional[float] = None,
        latency: Union[float, str] = 'high',
        redis_queue: Optional[str] = None,
        should_stop: Optional[Event] = None,
        converter_timeout: Optional[float] = None,
        stream_name: Optional[str] = None,
        queue_size: Optional[int] = None,
        **kwargs,
    ):
        """
        :param device: Audio device to use.
        :param channels: Number of channels to use.
        :param volume: Input/output volume, between 0 and 100.
        :param sample_rate: Sample rate to use.
        :param dtype: Data type to use.
        :param blocksize: Block size to use.
        :param ffmpeg_bin: Path to the ffmpeg binary.
        :param stream: Whether to stream the audio to Redis consumers.
        :param audio_pass_through: Whether to pass the audio through to the
            application's output stream.
        :param infile: Path to the input file or URL, if this is an output
            stream.
        :param outfile: Path to the output file.
        :param duration: Duration of the audio stream.
        :param latency: Latency to use.
        :param redis_queue: Redis queue to use.
        :param should_stop: Synchronize with upstream stop events.
        :param converter_timeout: How long to wait for the converter to finish.
        :param stream_name: Name of the stream.
        :param queue_size: Maximum size of the audio queue.
        """
        super().__init__(**kwargs)

        self.device = device
        self.outfile = os.path.expanduser(outfile or self._DEFAULT_FILE)
        self.infile = os.path.expanduser(infile or self._DEFAULT_FILE)
        self.ffmpeg_bin = ffmpeg_bin
        self.channels = channels
        self.volume = volume
        self.sample_rate = sample_rate
        self.dtype = dtype
        self.stream = stream
        self.duration = duration
        # The configured block size is per-frame; scale it by the number of
        # channels to get the actual sample count per block.
        self.blocksize = blocksize * channels
        self.latency = latency
        self._redis_queue = redis_queue
        self.audio_pass_through = audio_pass_through
        self.queue_size = queue_size
        self._stream_name = stream_name
        self.logger = getLogger(__name__)

        self._state = AudioState.STOPPED
        self._state_lock = RLock()
        self._started_time: Optional[float] = None
        self._converter: Optional[AudioConverter] = None
        self._should_stop = should_stop or Event()
        self._converter_timeout = converter_timeout or self._DEFAULT_CONVERTER_TIMEOUT
        self.audio_stream: Optional[_StreamType] = None
        self.stream_index: Optional[int] = None
        self.paused_changed = Event()
        self._converter_terminated = Event()

    @property
    def should_stop(self) -> bool:
        """
        Proxy for `._should_stop.is_set()`.

        Also true once the stream has been started and transitioned back to
        the STOPPED state.
        """
        return self._should_stop.is_set() or bool(
            self.state == AudioState.STOPPED and self._started_time
        )

    @property
    def gain(self) -> float:
        """Volume (0-100) normalized to a [0, 1] multiplier."""
        return self.volume / 100

    def wait_stop(self, timeout: Optional[float] = None):
        """
        Wait for the stop signal to be received.

        :param timeout: Maximum time to wait, in seconds (default: forever).
        :return: True if the stop event was set, False on timeout.
        """
        return self._should_stop.wait(timeout=timeout)

    def _audio_callback(self) -> Callable:
        """
        Returns a callback to handle the raw frames captures from the audio device.

        The base implementation is a no-op; subclasses override it to actually
        process the captured frames.
        """

        def empty_callback(*_, **__):
            pass

        return empty_callback

    @property
    def stream_name(self) -> str:
        """
        Name of the stream - either the explicitly configured one, or a
        default derived from the stream direction and index.
        """
        if self._stream_name:
            return self._stream_name

        ret = f'platypush:audio:{self.direction.value}'
        if self.stream_index is not None:
            ret += f':{self.stream_index}'
        return ret

    @stream_name.setter
    def stream_name(self, value: Optional[str]):
        self._stream_name = value

    @property
    @abstractmethod
    def direction(self) -> StreamType:
        """
        The default direction for this stream - input or output.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def _audio_converter_type(self) -> Optional[Type[AudioConverter]]:
        """
        This property indicates the type that should be used for the audio
        converter. ``None`` means that no converter is required.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def _started_event_type(self) -> Type[SoundEvent]:
        """
        Event type that will be emitted when the audio starts.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def _stopped_event_type(self) -> Type[SoundEvent]:
        """
        Event type that will be emitted when the audio stops.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def _paused_event_type(self) -> Type[SoundEvent]:
        """
        Event type that will be emitted when the audio is paused.
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def _resumed_event_type(self) -> Type[SoundEvent]:
        """
        Event type that will be emitted when the audio is resumed.
        """
        raise NotImplementedError()

    @property
    def _stream_type(self) -> Union[Type[sd.Stream], Type[sd.OutputStream]]:
        """
        The type of stream this thread is mapped to.
        """
        return sd.Stream

    @property
    def _converter_args(self) -> dict:
        """
        Extra arguments to pass to the audio converter.
        """
        return {}

    @property
    def _stream_args(self) -> dict:
        """
        Extra arguments to pass to the stream constructor.
        """
        return {}

    @property
    def redis_queue(self) -> str:
        """
        Redis queue for audio streaming - either the configured one, or a
        default derived from the class name, device and stream index.
        """
        if self._redis_queue:
            return self._redis_queue

        dev = (
            self.device
            if isinstance(self.device, (str, int))
            else '-'.join(map(str, self.device))
        )

        name = f'platypush-audio-stream-{self.__class__.__name__}-{dev}'
        if self.stream_index is not None:
            name = f'{name}-{self.stream_index}'

        return name

    def _on_audio_converted(self, data: bytes, out_f: Optional[IO] = None):
        """
        This callback will be called when the audio data has been converted.

        :param data: Converted audio bytes.
        :param out_f: Optional file-like object the data is written to.
        """
        if out_f:
            out_f.write(data)

        if self.stream:
            get_redis().publish(self.redis_queue, data)

    def _wait_running(self):
        """
        If the stream is in paused state, wait for the state to change.
        """
        while self.state == AudioState.PAUSED:
            self.paused_changed.wait()

    def main(
        self,
        converter: Optional[AudioConverter] = None,
        out_f: Optional[IO] = None,
    ):
        """
        Main loop.

        :param converter: Optional converter process to read the audio from.
        :param out_f: Optional file-like object the converted audio is
            written to.
        """
        self.notify_start()

        self.logger.info(
            'Started %s on device [%s]', self.__class__.__name__, self.device
        )
        self._started_time = time.time()

        while not self.should_stop and (
            self.duration is None or time.time() - self._started_time < self.duration
        ):
            self._wait_running()
            if not converter:
                # No converter to consume from - just idle until stop/timeout
                self.wait_stop(0.1)
                continue

            if self.should_stop:
                break

            # Cap the converter timeout to the residual stream duration, if a
            # duration was set
            timeout = (
                max(
                    0,
                    min(
                        self.duration - (time.time() - self._started_time),
                        self._converter_timeout,
                    ),
                )
                if self.duration is not None
                else self._converter_timeout
            )

            should_continue = self._process_converted_audio(
                converter, timeout=timeout, out_f=out_f
            )

            if not should_continue:
                break

    def _process_converted_audio(
        self, converter: AudioConverter, timeout: float, out_f: Optional[IO]
    ) -> bool:
        """
        It reads the converted audio from the converter and passes it downstream.

        :return: True if the process should continue, False if it should terminate.
        """
        data = converter.read(timeout=timeout)
        if not data:
            return self._on_converter_timeout(converter)

        self._on_audio_converted(data, out_f)
        return True

    def _on_converter_timeout(self, converter: AudioConverter) -> bool:
        """
        Callback logic invoked if the converter times out.

        :return: ``True`` (default) if the thread is supposed to continue,
            ``False`` if it should terminate.
        """
        self.logger.debug('Timeout on converter %s', converter.__class__.__name__)
        # Continue only if the converter hasn't terminated
        return not self._converter_terminated.is_set()

    def run(self):
        """
        Wrapper for the main loop that initializes the converter and the stream.
        """
        super().run()
        self.paused_changed.clear()

        try:
            with self.open_converter() as converter, self._stream_type(
                samplerate=self.sample_rate,
                device=self.device,
                channels=self.channels,
                dtype=self.dtype,
                latency=self.latency,
                blocksize=self.blocksize,
                **self._stream_args,
            ) as self.audio_stream, open(
                self.outfile, 'wb'
            ) as out_f, self._audio_generator():
                self.main(converter=converter, out_f=out_f)
        except queue.Empty:
            self.logger.warning(
                'Audio callback timeout for %s', self.__class__.__name__
            )
        finally:
            self.notify_stop()

    @contextmanager
    def _audio_generator(self) -> Generator[Optional[Thread], None, None]:
        """
        :yield: A <Thread, Queue> pair where the thread generates raw audio
            frames (as numpy arrays) that are sent to the specified queue.

        The base implementation yields ``None``; subclasses override it to
        provide an actual generator thread.
        """
        yield None

    @contextmanager
    def open_converter(self) -> Generator[Optional[AudioConverter], None, None]:
        """
        Context manager for the converter process.

        Yields ``None`` if this stream type doesn't require a converter.
        Otherwise, it starts the converter process and guarantees that it is
        stopped, joined and cleared on exit - even if the ``with`` body
        raises.

        :raises AssertionError: If a converter process is already running.
        """
        if self._audio_converter_type is None:
            yield None
            return

        assert not self._converter, 'A converter process is already running'
        self._converter = self._audio_converter_type(
            ffmpeg_bin=self.ffmpeg_bin,
            sample_rate=self.sample_rate,
            channels=self.channels,
            volume=self.volume,
            dtype=self.dtype,
            chunk_size=self.blocksize,
            on_exit=self._converter_terminated.set,
            **self._converter_args,
        )

        self._converter.start()
        try:
            yield self._converter
        finally:
            # Always stop and reap the converter, even if the stream logic
            # raised - otherwise the ffmpeg process would leak and
            # self._converter would stay set, breaking the next run.
            self._converter.stop()
            self._converter.join(timeout=2)
            self._converter = None

    @contextmanager
    def _change_state(self, state: AudioState, event_type: Type[SoundEvent]):
        """
        Changes the state and it emits the specified event if the state has
        actually changed.

        It uses a context manager pattern, and everything in between will be
        executed before the events are dispatched.
        """
        with self._state_lock:
            prev_state = self.state
            self.state = state

        yield
        if prev_state != state:
            self._notify(event_type)

    def _notify(self, event_type: Type[SoundEvent], **kwargs):
        """
        Notifies the specified event.
        """
        get_bus().post(event_type(device=self.device, **kwargs))

    def notify_start(self):
        """
        Notifies the start event.
        """
        with self._change_state(AudioState.RUNNING, self._started_event_type):
            pass

    def notify_stop(self):
        """
        Notifies the stop event.
        """
        with self._change_state(AudioState.STOPPED, self._stopped_event_type):
            if self._converter:
                self._converter.stop()

            # Wake up any thread blocked on _wait_running()
            self.paused_changed.set()
            self.paused_changed.clear()

    def notify_pause(self):
        """
        Notifies a pause toggle event.
        """
        # Pause <-> running toggle map; any other state is a no-op
        states = {
            AudioState.PAUSED: AudioState.RUNNING,
            AudioState.RUNNING: AudioState.PAUSED,
        }

        with self._state_lock:
            new_state = states.get(self.state)
            if not new_state:
                return

            event_type = (
                self._paused_event_type
                if new_state == AudioState.PAUSED
                else self._resumed_event_type
            )

        with self._change_state(new_state, event_type):
            # Wake up any thread blocked on _wait_running()
            self.paused_changed.set()
            self.paused_changed.clear()

    @property
    def state(self):
        """
        Thread-safe wrapper for the stream state.
        """
        with self._state_lock:
            return self._state

    @state.setter
    def state(self, value: AudioState):
        """
        Thread-safe setter for the stream state.
        """
        with self._state_lock:
            self._state = value

    def asdict(self) -> dict:
        """
        Serialize the thread information.
        """
        return {
            'device': self.device,
            'outfile': self.outfile,
            'infile': self.infile,
            'direction': self.direction,
            'ffmpeg_bin': self.ffmpeg_bin,
            'channels': self.channels,
            'sample_rate': self.sample_rate,
            'dtype': self.dtype,
            'streaming': self.stream,
            'duration': self.duration,
            'blocksize': self.blocksize,
            'latency': self.latency,
            'redis_queue': self.redis_queue,
            'audio_pass_through': self.audio_pass_through,
            'state': self._state.value,
            'volume': self.volume,
            'started_time': datetime.fromtimestamp(self._started_time)
            if self._started_time
            else None,
            'stream_index': self.stream_index,
            'stream_name': self.stream_name,
        }
# vim:sw=4:ts=4:et:
|