Refactoring the `sound` plugin to use ffmpeg as a stream converter.

This commit is contained in:
Fabio Manganiello 2023-06-11 12:48:49 +02:00
parent 4587b262b0
commit e238fcb6e4
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
9 changed files with 401 additions and 167 deletions

View File

@ -1,74 +0,0 @@
import os
import tempfile
from flask import Response, Blueprint, request
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, send_request
sound = Blueprint('sound', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
sound,
]
# Generates the .wav file header for a given set of samples and specs
# noinspection PyRedundantParentheses
def gen_header(sample_rate, sample_width, channels):
datasize = int(2000 * 1e6) # Arbitrary data size for streaming
o = bytes("RIFF", ' ascii') # (4byte) Marks file as RIFF
o += (datasize + 36).to_bytes(4, 'little') # (4byte) File size in bytes
o += bytes("WAVE", 'ascii') # (4byte) File type
o += bytes("fmt ", 'ascii') # (4byte) Format Chunk Marker
o += (16).to_bytes(4, 'little') # (4byte) Length of above format data
o += (1).to_bytes(2, 'little') # (2byte) Format type (1 - PCM)
o += channels.to_bytes(2, 'little') # (2byte)
o += sample_rate.to_bytes(4, 'little') # (4byte)
o += (sample_rate * channels * sample_width // 8).to_bytes(4, 'little') # (4byte)
o += (channels * sample_width // 8).to_bytes(2, 'little') # (2byte)
o += sample_width.to_bytes(2, 'little') # (2byte)
o += bytes("data", 'ascii') # (4byte) Data Chunk Marker
o += datasize.to_bytes(4, 'little') # (4byte) Data size in bytes
return o
def audio_feed(device, fifo, sample_rate, blocksize, latency, channels):
send_request(action='sound.stream_recording', device=device, sample_rate=sample_rate,
dtype='int16', fifo=fifo, blocksize=blocksize, latency=latency,
channels=channels)
try:
with open(fifo, 'rb') as f: # lgtm [py/path-injection]
send_header = True
while True:
audio = f.read(blocksize)
if audio:
if send_header:
audio = gen_header(sample_rate=sample_rate, sample_width=16, channels=channels) + audio
send_header = False
yield audio
finally:
send_request(action='sound.stop_recording')
@sound.route('/sound/stream', methods=['GET'])
@authenticate()
def get_sound_feed():
device = request.args.get('device')
sample_rate = request.args.get('sample_rate', 44100)
blocksize = request.args.get('blocksize', 512)
latency = request.args.get('latency', 0)
channels = request.args.get('channels', 1)
fifo = request.args.get('fifo', os.path.join(tempfile.gettempdir(), 'inputstream'))
return Response(audio_feed(device=device, fifo=fifo, sample_rate=sample_rate,
blocksize=blocksize, latency=latency, channels=channels),
mimetype='audio/x-wav;codec=pcm')
# vim:sw=4:ts=4:et:

View File

@ -1,3 +1,3 @@
from ._base import StreamingRoute, logger
from ._base import StreamingRoute
__all__ = ['StreamingRoute', 'logger']
__all__ = ['StreamingRoute']

View File

@ -11,8 +11,6 @@ from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status
from ..mixins import PubSubMixin
logger = getLogger(__name__)
@stream_request_body
class StreamingRoute(RequestHandler, PubSubMixin, ABC):
@ -20,6 +18,10 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC):
Base class for Tornado streaming routes.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = getLogger(__name__)
@override
def prepare(self):
"""
@ -32,7 +34,7 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC):
self.send_error(auth_status.value.code, error=auth_status.value.message)
return
logger.info(
self.logger.info(
'Client %s connected to %s', self.request.remote_ip, self.request.path
)
@ -63,3 +65,62 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC):
authentication and return 401 if authentication fails.
"""
return True
@classmethod
def _get_redis_queue(cls, *_, **__) -> Optional[str]:
"""
Returns the Redis channel associated with a given set of arguments.
This is None by default, and it should be implemented by subclasses if
required.
"""
return None
def forward_stream(self, *args, **kwargs):
"""
Utility method that does the following:
1. It listens for new messages on the subscribed Redis channels;
2. It applies a filter on the channel if :meth:`._get_redis_queue`
returns a non-null result given ``args`` and ``kwargs``;
3. It forward the frames read from the Redis channel(s) to the HTTP client;
4. It periodically invokes :meth:`._should_stop` to cleanly
terminate when the HTTP client socket is closed.
"""
redis_queue = self._get_redis_queue( # pylint: disable=assignment-from-none
*args, **kwargs
)
if redis_queue:
self.subscribe(redis_queue)
try:
for msg in self.listen():
if self._should_stop():
break
if redis_queue and msg.channel != redis_queue:
continue
frame = msg.data
if frame:
self.write(frame)
self.flush()
finally:
if redis_queue:
self.unsubscribe(redis_queue)
def _should_stop(self):
"""
Utility method used by :meth:`._forward_stream` to automatically
terminate when the client connection is closed (it can be overridden by
the subclasses).
"""
if self._finished:
return True
if self.request.connection and getattr(self.request.connection, 'stream', None):
return self.request.connection.stream.closed() # type: ignore
return True

View File

@ -1,19 +1,17 @@
from enum import Enum
import json
from logging import getLogger
from typing import Optional
from typing_extensions import override
from tornado.web import stream_request_body
from platypush.context import get_plugin
from platypush.config import Config
from platypush.plugins.camera import Camera, CameraPlugin, StreamWriter
from platypush.utils import get_plugin_name_by_class
from .. import StreamingRoute
logger = getLogger(__name__)
class RequestType(Enum):
"""
@ -31,10 +29,9 @@ class CameraRoute(StreamingRoute):
Route for camera streams.
"""
_redis_queue_prefix = '_platypush/camera'
_redis_queue_prefix = f'_platypush/{Config.get("device_id") or ""}/camera'
def __init__(self, *args, **kwargs):
# TODO Support multiple concurrent requests
super().__init__(*args, **kwargs)
self._camera: Optional[Camera] = None
self._request_type = RequestType.UNKNOWN
@ -61,29 +58,6 @@ class CameraRoute(StreamingRoute):
return None
def _should_stop(self):
if self._finished:
return True
if self.request.connection and getattr(self.request.connection, 'stream', None):
return self.request.connection.stream.closed() # type: ignore
return True
def send_feed(self, camera: CameraPlugin):
redis_queue = self._get_redis_queue_by_camera(camera)
for msg in self.listen():
if self._should_stop():
break
if msg.channel != redis_queue:
continue
frame = msg.data
if frame:
self.write(frame)
self.flush()
def send_frame(self, camera: Camera):
frame = None
for _ in range(camera.info.warmup_frames):
@ -121,8 +95,9 @@ class CameraRoute(StreamingRoute):
return kwargs
@override
@classmethod
def _get_redis_queue_by_camera(cls, camera: CameraPlugin) -> str:
def _get_redis_queue(cls, camera: CameraPlugin, *_, **__) -> str:
plugin_name = get_plugin_name_by_class(camera.__class__)
assert plugin_name, f'No such plugin: {plugin_name}'
return '/'.join(
@ -144,9 +119,8 @@ class CameraRoute(StreamingRoute):
stream_class = StreamWriter.get_class_by_name(self._extension)
camera = self._get_camera(plugin)
redis_queue = self._get_redis_queue_by_camera(camera)
redis_queue = self._get_redis_queue(camera)
self.set_header('Content-Type', stream_class.mimetype)
self.subscribe(redis_queue)
with camera.open(
stream=True,
@ -159,6 +133,6 @@ class CameraRoute(StreamingRoute):
if self._request_type == RequestType.PHOTO:
self.send_frame(session)
elif self._request_type == RequestType.VIDEO:
self.send_feed(camera)
self.forward_stream(camera)
self.finish()

View File

@ -0,0 +1,97 @@
from contextlib import contextmanager
import json
from typing import Generator, Optional
from typing_extensions import override
from tornado.web import stream_request_body
from platypush.backend.http.app.utils import send_request
from platypush.config import Config
from .. import StreamingRoute
@stream_request_body
class SoundRoute(StreamingRoute):
"""
Route for audio streams.
"""
_redis_queue_prefix = f'_platypush/{Config.get("device_id") or ""}/sound'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._audio_headers_written: bool = False
"""Send the audio file headers before we send the first audio frame."""
@override
@classmethod
def path(cls) -> str:
return r"/sound/stream\.?([a-zA-Z0-9_]+)?"
@contextmanager
def _audio_stream(self, **kwargs) -> Generator[None, None, None]:
response = send_request(
'sound.stream_recording',
dtype='int16',
**kwargs,
)
assert response and not response.is_error(), (
'Streaming error: ' + str(response.errors) if response else '(unknown)'
)
yield
send_request('sound.stop_recording')
@override
@classmethod
def _get_redis_queue(cls, device: Optional[str] = None, *_, **__) -> str:
return '/'.join([cls._redis_queue_prefix, *([device] if device else [])])
def _get_args(self, **kwargs):
kwargs.update({k: v[0].decode() for k, v in self.request.arguments.items()})
device = kwargs.get('device')
return {
'device': device,
'sample_rate': int(kwargs.get('sample_rate', 44100)),
'blocksize': int(kwargs.get('blocksize', 512)),
'latency': float(kwargs.get('latency', 0)),
'channels': int(kwargs.get('channels', 1)),
'format': kwargs.get('format', 'wav'),
'redis_queue': kwargs.get('redis_queue', self._get_redis_queue(device)),
}
@staticmethod
def _content_type_by_extension(extension: str) -> str:
if extension == 'mp3':
return 'audio/mpeg'
if extension == 'ogg':
return 'audio/ogg'
if extension == 'wav':
return 'audio/wav;codec=pcm'
if extension == 'flac':
return 'audio/flac'
if extension == 'aac':
return 'audio/aac'
return 'application/octet-stream'
def get(self, extension: Optional[str] = None) -> None:
ext = extension or 'wav'
args = self._get_args(format=ext)
try:
with self._audio_stream(**args):
self.set_header('Content-Type', self._content_type_by_extension(ext))
self.forward_stream(**args)
self.finish()
except AssertionError as e:
self.set_header("Content-Type", "application/json")
self.set_status(400, str(e))
self.finish(json.dumps({"error": str(e)}))
except Exception as e:
self.set_header("Content-Type", "application/json")
self.logger.exception(e)
self.set_status(500, str(e))
self.finish(json.dumps({"error": str(e)}))

View File

@ -1,3 +1,4 @@
import logging
import os
import importlib
import inspect
@ -5,7 +6,9 @@ from typing import List, Type
import pkgutil
from ..streaming import StreamingRoute, logger
from ..streaming import StreamingRoute
logger = logging.getLogger(__name__)
def get_streaming_routes() -> List[Type[StreamingRoute]]:

View File

@ -6,7 +6,10 @@ import time
from enum import Enum
from threading import Thread, Event, RLock
from typing import Optional
from typing import Optional, Union
import sounddevice as sd
import soundfile as sf
from platypush.context import get_bus
from platypush.message.event.sound import (
@ -15,8 +18,10 @@ from platypush.message.event.sound import (
)
from platypush.plugins import Plugin, action
from platypush.utils import get_redis
from .core import Sound, Mix
from ._converter import ConverterProcess
class PlaybackState(Enum):
@ -49,10 +54,11 @@ class SoundPlugin(Plugin):
* **sounddevice** (``pip install sounddevice``)
* **soundfile** (``pip install soundfile``)
* **numpy** (``pip install numpy``)
* **ffmpeg** package installed on the system (for streaming support)
"""
_STREAM_NAME_PREFIX = 'platypush-stream-'
_default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream')
def __init__(
self,
@ -60,6 +66,7 @@ class SoundPlugin(Plugin):
output_device=None,
input_blocksize=Sound._DEFAULT_BLOCKSIZE,
output_blocksize=Sound._DEFAULT_BLOCKSIZE,
ffmpeg_bin: str = 'ffmpeg',
**kwargs,
):
"""
@ -82,6 +89,9 @@ class SoundPlugin(Plugin):
Try to increase this value if you get output underflow errors while
playing. Default: 1024
:type output_blocksize: int
:param ffmpeg_bin: Path of the ``ffmpeg`` binary (default: search for
the ``ffmpeg`` in the ``PATH``).
"""
super().__init__(**kwargs)
@ -102,6 +112,7 @@ class SoundPlugin(Plugin):
self.stream_name_to_index = {}
self.stream_index_to_name = {}
self.completed_callback_events = {}
self.ffmpeg_bin = ffmpeg_bin
@staticmethod
def _get_default_device(category):
@ -111,9 +122,6 @@ class SoundPlugin(Plugin):
:param category: Device category to query. Can be either input or output
:type category: str
"""
import sounddevice as sd
return sd.query_hostapis()[0].get('default_' + category.lower() + '_device')
@action
@ -155,8 +163,6 @@ class SoundPlugin(Plugin):
"""
import sounddevice as sd
devs = sd.query_devices()
if category == 'input':
devs = [d for d in devs if d.get('max_input_channels') > 0]
@ -166,8 +172,6 @@ class SoundPlugin(Plugin):
return devs
def _play_audio_callback(self, q, blocksize, streamtype, stream_index):
import sounddevice as sd
is_raw_stream = streamtype == sd.RawOutputStream
def audio_callback(outdata, frames, *, status):
@ -277,8 +281,6 @@ class SoundPlugin(Plugin):
'Please specify either a file to play or a ' + 'list of sound objects'
)
import sounddevice as sd
if blocksize is None:
blocksize = self.output_blocksize
@ -301,8 +303,6 @@ class SoundPlugin(Plugin):
device = self._get_default_device('output')
if file:
import soundfile as sf
f = sf.SoundFile(file)
if not samplerate:
samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE
@ -444,10 +444,12 @@ class SoundPlugin(Plugin):
fifo: Optional[str] = None,
duration: Optional[float] = None,
sample_rate: Optional[int] = None,
dtype: Optional[str] = 'float32',
dtype: str = 'float32',
blocksize: Optional[int] = None,
latency: float = 0,
latency: Union[float, str] = 'high',
channels: int = 1,
redis_queue: Optional[str] = None,
format: str = 'wav',
):
"""
Return audio data from an audio source
@ -464,12 +466,13 @@ class SoundPlugin(Plugin):
float32
:param blocksize: Audio block size (default: configured
`input_blocksize` or 2048)
:param latency: Device latency in seconds (default: 0)
:param latency: Device latency in seconds (default: the device's default high latency)
:param channels: Number of channels (default: 1)
:param redis_queue: If set, the audio chunks will also be published to
this Redis channel, so other consumers can process them downstream.
:param format: Audio format. Supported: wav, mp3, ogg, aac. Default: wav.
"""
import sounddevice as sd
self.recording_paused_changed.clear()
if device is None:
@ -485,30 +488,42 @@ class SoundPlugin(Plugin):
blocksize = self.input_blocksize
if not fifo:
fifo = self._default_input_stream_fifo
fifo = os.devnull
q = queue.Queue()
def audio_callback(audio_converter: ConverterProcess):
# _ = frames
# __ = time
def callback(indata, _, __, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
def audio_callback(indata, frames, time_duration, status): # noqa
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: %s', status)
if status:
self.logger.warning('Recording callback status: %s', status)
audio_converter.write(indata.tobytes())
q.put(indata.copy())
return callback
def streaming_thread():
try:
with sd.InputStream(
with ConverterProcess(
ffmpeg_bin=self.ffmpeg_bin,
sample_rate=sample_rate,
channels=channels,
dtype=dtype,
chunk_size=self.input_blocksize,
output_format=format,
) as converter, sd.InputStream(
samplerate=sample_rate,
device=device,
channels=channels,
callback=audio_callback,
callback=audio_callback(converter),
dtype=dtype,
latency=latency,
blocksize=blocksize,
), open(fifo, 'wb') as audio_queue:
), open(
fifo, 'wb'
) as audio_queue:
self.start_recording()
get_bus().post(SoundRecordingStartedEvent())
self.logger.info('Started recording from device [%s]', device)
@ -521,23 +536,23 @@ class SoundPlugin(Plugin):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
get_args = (
{
'block': True,
'timeout': max(
0,
duration - (time.time() - recording_started_time),
),
}
timeout = (
max(
0,
duration - (time.time() - recording_started_time),
)
if duration is not None
else {}
else 1
)
data = q.get(**get_args)
if not len(data):
data = converter.read(timeout=timeout)
if not data:
continue
audio_queue.write(data)
if redis_queue:
get_redis().publish(redis_queue, data)
except queue.Empty:
self.logger.warning('Recording timeout: audio callback failed?')
finally:
@ -548,12 +563,9 @@ class SoundPlugin(Plugin):
if stat.S_ISFIFO(os.stat(fifo).st_mode):
self.logger.info('Removing previous input stream FIFO %s', fifo)
os.unlink(fifo)
else:
raise RuntimeError(
f'{fifo} exists and is not a FIFO. Please remove it or rename it'
)
else:
os.mkfifo(fifo, 0o644)
os.mkfifo(fifo, 0o644)
Thread(target=streaming_thread).start()
@action
@ -565,7 +577,7 @@ class SoundPlugin(Plugin):
sample_rate=None,
format=None,
blocksize=None,
latency=0,
latency='high',
channels=1,
subtype='PCM_24',
):
@ -590,7 +602,7 @@ class SoundPlugin(Plugin):
:param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
:type blocksize: int
:param latency: Device latency in seconds (default: 0)
:param latency: Device latency in seconds (default: the device's default high latency)
:type latency: float
:param channels: Number of channels (default: 1)
@ -613,8 +625,6 @@ class SoundPlugin(Plugin):
channels,
subtype,
):
import sounddevice as sd
self.recording_paused_changed.clear()
if outfile:
@ -661,8 +671,6 @@ class SoundPlugin(Plugin):
)
try:
import soundfile as sf
with sf.SoundFile(
outfile,
mode='w',
@ -785,8 +793,6 @@ class SoundPlugin(Plugin):
:type dtype: str
"""
import sounddevice as sd
self.recording_paused_changed.clear()
if input_device is None:
@ -806,8 +812,9 @@ class SoundPlugin(Plugin):
if blocksize is None:
blocksize = self.output_blocksize
# noinspection PyUnusedLocal
def audio_callback(indata, outdata, frames, time, status):
# _ = frames
# __ = time
def audio_callback(indata, outdata, _, __, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()

View File

@ -0,0 +1,162 @@
import asyncio
from asyncio.subprocess import PIPE
from queue import Empty
from queue import Queue
from threading import Thread
from typing import Optional, Self
from platypush.context import get_or_create_event_loop
_dtype_to_ffmpeg_format = {
'int8': 's8',
'uint8': 'u8',
'int16': 's16le',
'uint16': 'u16le',
'int32': 's32le',
'uint32': 'u32le',
'float32': 'f32le',
'float64': 'f64le',
}
"""
Supported input types:
'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'float32', 'float64'
"""
_output_format_to_ffmpeg_args = {
'wav': ('-f', 'wav'),
'ogg': ('-f', 'ogg'),
'mp3': ('-f', 'mp3'),
'aac': ('-f', 'adts'),
'flac': ('-f', 'flac'),
}
class ConverterProcess(Thread):
"""
Wrapper for an ffmpeg converter instance.
"""
def __init__(
self,
ffmpeg_bin: str,
sample_rate: int,
channels: int,
dtype: str,
chunk_size: int,
output_format: str,
*args,
**kwargs,
):
"""
:param ffmpeg_bin: Path to the ffmpeg binary.
:param sample_rate: The sample rate of the input audio.
:param channels: The number of channels of the input audio.
:param dtype: The (numpy) data type of the raw input audio.
:param chunk_size: Number of bytes that will be read at once from the
ffmpeg process.
:param output_format: Output audio format.
"""
super().__init__(*args, **kwargs)
ffmpeg_format = _dtype_to_ffmpeg_format.get(dtype)
assert ffmpeg_format, (
f'Unsupported data type: {dtype}. Supported data types: '
f'{list(_dtype_to_ffmpeg_format.keys())}'
)
self._ffmpeg_bin = ffmpeg_bin
self._ffmpeg_format = ffmpeg_format
self._sample_rate = sample_rate
self._channels = channels
self._chunk_size = chunk_size
self._output_format = output_format
self._closed = False
self._out_queue = Queue()
self.ffmpeg = None
self._loop = None
def __enter__(self) -> Self:
self.start()
return self
def __exit__(self, *_, **__):
if self.ffmpeg and self._loop:
self._loop.call_soon_threadsafe(self.ffmpeg.kill)
self.ffmpeg = None
if self._loop:
self._loop = None
def _check_ffmpeg(self):
assert (
self.ffmpeg and self.ffmpeg.returncode is None
), 'The ffmpeg process has already terminated'
def _get_format_args(self):
ffmpeg_args = _output_format_to_ffmpeg_args.get(self._output_format)
assert ffmpeg_args, (
f'Unsupported output format: {self._output_format}. Supported formats: '
f'{list(_output_format_to_ffmpeg_args.keys())}'
)
return ffmpeg_args
async def _audio_proxy(self, timeout: Optional[float] = None):
self.ffmpeg = await asyncio.create_subprocess_exec(
self._ffmpeg_bin,
'-f',
self._ffmpeg_format,
'-ar',
str(self._sample_rate),
'-ac',
str(self._channels),
'-i',
'pipe:',
*self._get_format_args(),
'pipe:',
stdin=PIPE,
stdout=PIPE,
)
try:
await asyncio.wait_for(self.ffmpeg.wait(), 0.1)
except asyncio.TimeoutError:
pass
while self._loop and self.ffmpeg and self.ffmpeg.returncode is None:
self._check_ffmpeg()
assert (
self.ffmpeg and self.ffmpeg.stdout
), 'The stdout is closed for the ffmpeg process'
try:
data = await asyncio.wait_for(
self.ffmpeg.stdout.read(self._chunk_size), timeout
)
self._out_queue.put(data)
except asyncio.TimeoutError:
self._out_queue.put(b'')
def write(self, data: bytes):
self._check_ffmpeg()
assert (
self.ffmpeg and self._loop and self.ffmpeg.stdin
), 'The stdin is closed for the ffmpeg process'
self._loop.call_soon_threadsafe(self.ffmpeg.stdin.write, data)
def read(self, timeout: Optional[float] = None) -> Optional[bytes]:
try:
return self._out_queue.get(timeout=timeout)
except Empty:
return None
def run(self):
super().run()
self._loop = get_or_create_event_loop()
self._loop.run_until_complete(self._audio_proxy(timeout=1))
# vim:sw=4:ts=4:et:

View File

@ -11,5 +11,9 @@ manifest:
- sounddevice
- soundfile
- numpy
apt:
- ffmpeg
pacman:
- ffmpeg
package: platypush.plugins.sound
type: plugin