From e238fcb6e43401f6725f6db5540464df8220faca Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 11 Jun 2023 12:48:49 +0200 Subject: [PATCH] Refactoring the `sound` plugin to use ffmpeg as a stream converter. --- .../http/app/routes/plugins/sound/__init__.py | 74 -------- .../backend/http/app/streaming/__init__.py | 4 +- platypush/backend/http/app/streaming/_base.py | 67 +++++++- .../http/app/streaming/plugins/camera.py | 38 +--- .../http/app/streaming/plugins/sound.py | 97 +++++++++++ platypush/backend/http/app/utils/streaming.py | 5 +- platypush/plugins/sound/__init__.py | 117 +++++++------ platypush/plugins/sound/_converter.py | 162 ++++++++++++++++++ platypush/plugins/sound/manifest.yaml | 4 + 9 files changed, 401 insertions(+), 167 deletions(-) delete mode 100644 platypush/backend/http/app/routes/plugins/sound/__init__.py create mode 100644 platypush/backend/http/app/streaming/plugins/sound.py create mode 100644 platypush/plugins/sound/_converter.py diff --git a/platypush/backend/http/app/routes/plugins/sound/__init__.py b/platypush/backend/http/app/routes/plugins/sound/__init__.py deleted file mode 100644 index 8f4ae479..00000000 --- a/platypush/backend/http/app/routes/plugins/sound/__init__.py +++ /dev/null @@ -1,74 +0,0 @@ -import os -import tempfile - -from flask import Response, Blueprint, request - -from platypush.backend.http.app import template_folder -from platypush.backend.http.app.utils import authenticate, send_request - -sound = Blueprint('sound', __name__, template_folder=template_folder) - -# Declare routes list -__routes__ = [ - sound, -] - - -# Generates the .wav file header for a given set of samples and specs -# noinspection PyRedundantParentheses -def gen_header(sample_rate, sample_width, channels): - datasize = int(2000 * 1e6) # Arbitrary data size for streaming - o = bytes("RIFF", ' ascii') # (4byte) Marks file as RIFF - o += (datasize + 36).to_bytes(4, 'little') # (4byte) File size in bytes - o += bytes("WAVE", 'ascii') # (4byte) File type - o += bytes("fmt ", 'ascii') # (4byte) Format Chunk Marker - o += (16).to_bytes(4, 'little') # (4byte) Length of above format data - o += (1).to_bytes(2, 'little') # (2byte) Format type (1 - PCM) - o += channels.to_bytes(2, 'little') # (2byte) - o += sample_rate.to_bytes(4, 'little') # (4byte) - o += (sample_rate * channels * sample_width // 8).to_bytes(4, 'little') # (4byte) - o += (channels * sample_width // 8).to_bytes(2, 'little') # (2byte) - o += sample_width.to_bytes(2, 'little') # (2byte) - o += bytes("data", 'ascii') # (4byte) Data Chunk Marker - o += datasize.to_bytes(4, 'little') # (4byte) Data size in bytes - return o - - -def audio_feed(device, fifo, sample_rate, blocksize, latency, channels): - send_request(action='sound.stream_recording', device=device, sample_rate=sample_rate, - dtype='int16', fifo=fifo, blocksize=blocksize, latency=latency, - channels=channels) - - try: - with open(fifo, 'rb') as f: # lgtm [py/path-injection] - send_header = True - - while True: - audio = f.read(blocksize) - - if audio: - if send_header: - audio = gen_header(sample_rate=sample_rate, sample_width=16, channels=channels) + audio - send_header = False - - yield audio - finally: - send_request(action='sound.stop_recording') - - -@sound.route('/sound/stream', methods=['GET']) -@authenticate() -def get_sound_feed(): - device = request.args.get('device') - sample_rate = request.args.get('sample_rate', 44100) - blocksize = request.args.get('blocksize', 512) - latency = request.args.get('latency', 0) - channels = request.args.get('channels', 1) - fifo = request.args.get('fifo', os.path.join(tempfile.gettempdir(), 'inputstream')) - - return Response(audio_feed(device=device, fifo=fifo, sample_rate=sample_rate, - blocksize=blocksize, latency=latency, channels=channels), - mimetype='audio/x-wav;codec=pcm') - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/streaming/__init__.py b/platypush/backend/http/app/streaming/__init__.py index b174974e..eb0937e9 100644 --- a/platypush/backend/http/app/streaming/__init__.py +++ b/platypush/backend/http/app/streaming/__init__.py @@ -1,3 +1,3 @@ -from ._base import StreamingRoute, logger +from ._base import StreamingRoute -__all__ = ['StreamingRoute', 'logger'] +__all__ = ['StreamingRoute'] diff --git a/platypush/backend/http/app/streaming/_base.py b/platypush/backend/http/app/streaming/_base.py index e1a861fa..0e698617 100644 --- a/platypush/backend/http/app/streaming/_base.py +++ b/platypush/backend/http/app/streaming/_base.py @@ -11,8 +11,6 @@ from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status from ..mixins import PubSubMixin -logger = getLogger(__name__) - @stream_request_body class StreamingRoute(RequestHandler, PubSubMixin, ABC): @@ -20,6 +18,10 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC): Base class for Tornado streaming routes. """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.logger = getLogger(__name__) + @override def prepare(self): """ @@ -32,7 +34,7 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC): self.send_error(auth_status.value.code, error=auth_status.value.message) return - logger.info( + self.logger.info( 'Client %s connected to %s', self.request.remote_ip, self.request.path ) @@ -63,3 +65,62 @@ class StreamingRoute(RequestHandler, PubSubMixin, ABC): authentication and return 401 if authentication fails. """ return True + + @classmethod + def _get_redis_queue(cls, *_, **__) -> Optional[str]: + """ + Returns the Redis channel associated with a given set of arguments. + + This is None by default, and it should be implemented by subclasses if + required. + """ + return None + + def forward_stream(self, *args, **kwargs): + """ + Utility method that does the following: + + 1. It listens for new messages on the subscribed Redis channels; + 2. It applies a filter on the channel if :meth:`._get_redis_queue` + returns a non-null result given ``args`` and ``kwargs``; + 3. It forward the frames read from the Redis channel(s) to the HTTP client; + 4. It periodically invokes :meth:`._should_stop` to cleanly + terminate when the HTTP client socket is closed. + + """ + redis_queue = self._get_redis_queue( # pylint: disable=assignment-from-none + *args, **kwargs + ) + + if redis_queue: + self.subscribe(redis_queue) + + try: + for msg in self.listen(): + if self._should_stop(): + break + + if redis_queue and msg.channel != redis_queue: + continue + + frame = msg.data + if frame: + self.write(frame) + self.flush() + finally: + if redis_queue: + self.unsubscribe(redis_queue) + + def _should_stop(self): + """ + Utility method used by :meth:`._forward_stream` to automatically + terminate when the client connection is closed (it can be overridden by + the subclasses). + """ + if self._finished: + return True + + if self.request.connection and getattr(self.request.connection, 'stream', None): + return self.request.connection.stream.closed() # type: ignore + + return True diff --git a/platypush/backend/http/app/streaming/plugins/camera.py b/platypush/backend/http/app/streaming/plugins/camera.py index d3b128ba..cde01592 100644 --- a/platypush/backend/http/app/streaming/plugins/camera.py +++ b/platypush/backend/http/app/streaming/plugins/camera.py @@ -1,19 +1,17 @@ from enum import Enum import json -from logging import getLogger from typing import Optional from typing_extensions import override from tornado.web import stream_request_body from platypush.context import get_plugin +from platypush.config import Config from platypush.plugins.camera import Camera, CameraPlugin, StreamWriter from platypush.utils import get_plugin_name_by_class from .. import StreamingRoute -logger = getLogger(__name__) - class RequestType(Enum): """ @@ -31,10 +29,9 @@ class CameraRoute(StreamingRoute): Route for camera streams. """ - _redis_queue_prefix = '_platypush/camera' + _redis_queue_prefix = f'_platypush/{Config.get("device_id") or ""}/camera' def __init__(self, *args, **kwargs): - # TODO Support multiple concurrent requests super().__init__(*args, **kwargs) self._camera: Optional[Camera] = None self._request_type = RequestType.UNKNOWN @@ -61,29 +58,6 @@ class CameraRoute(StreamingRoute): return None - def _should_stop(self): - if self._finished: - return True - - if self.request.connection and getattr(self.request.connection, 'stream', None): - return self.request.connection.stream.closed() # type: ignore - - return True - - def send_feed(self, camera: CameraPlugin): - redis_queue = self._get_redis_queue_by_camera(camera) - for msg in self.listen(): - if self._should_stop(): - break - - if msg.channel != redis_queue: - continue - - frame = msg.data - if frame: - self.write(frame) - self.flush() - def send_frame(self, camera: Camera): frame = None for _ in range(camera.info.warmup_frames): @@ -121,8 +95,9 @@ class CameraRoute(StreamingRoute): return kwargs + @override @classmethod - def _get_redis_queue_by_camera(cls, camera: CameraPlugin) -> str: + def _get_redis_queue(cls, camera: CameraPlugin, *_, **__) -> str: plugin_name = get_plugin_name_by_class(camera.__class__) assert plugin_name, f'No such plugin: {plugin_name}' return '/'.join( @@ -144,9 +119,8 @@ class CameraRoute(StreamingRoute): stream_class = StreamWriter.get_class_by_name(self._extension) camera = self._get_camera(plugin) - redis_queue = self._get_redis_queue_by_camera(camera) + redis_queue = self._get_redis_queue(camera) self.set_header('Content-Type', stream_class.mimetype) - self.subscribe(redis_queue) with camera.open( stream=True, @@ -159,6 +133,6 @@ class CameraRoute(StreamingRoute): if self._request_type == RequestType.PHOTO: self.send_frame(session) elif self._request_type == RequestType.VIDEO: - self.send_feed(camera) + self.forward_stream(camera) self.finish() diff --git a/platypush/backend/http/app/streaming/plugins/sound.py b/platypush/backend/http/app/streaming/plugins/sound.py new file mode 100644 index 00000000..69e7b45a --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/sound.py @@ -0,0 +1,97 @@ +from contextlib import contextmanager +import json +from typing import Generator, Optional +from typing_extensions import override + +from tornado.web import stream_request_body + +from platypush.backend.http.app.utils import send_request +from platypush.config import Config + +from .. import StreamingRoute + + +@stream_request_body +class SoundRoute(StreamingRoute): + """ + Route for audio streams. + """ + + _redis_queue_prefix = f'_platypush/{Config.get("device_id") or ""}/sound' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._audio_headers_written: bool = False + """Send the audio file headers before we send the first audio frame.""" + + @override + @classmethod + def path(cls) -> str: + return r"/sound/stream\.?([a-zA-Z0-9_]+)?" + + @contextmanager + def _audio_stream(self, **kwargs) -> Generator[None, None, None]: + response = send_request( + 'sound.stream_recording', + dtype='int16', + **kwargs, + ) + + assert response and not response.is_error(), ( + 'Streaming error: ' + str(response.errors) if response else '(unknown)' + ) + + yield + send_request('sound.stop_recording') + + @override + @classmethod + def _get_redis_queue(cls, device: Optional[str] = None, *_, **__) -> str: + return '/'.join([cls._redis_queue_prefix, *([device] if device else [])]) + + def _get_args(self, **kwargs): + kwargs.update({k: v[0].decode() for k, v in self.request.arguments.items()}) + device = kwargs.get('device') + return { + 'device': device, + 'sample_rate': int(kwargs.get('sample_rate', 44100)), + 'blocksize': int(kwargs.get('blocksize', 512)), + 'latency': float(kwargs.get('latency', 0)), + 'channels': int(kwargs.get('channels', 1)), + 'format': kwargs.get('format', 'wav'), + 'redis_queue': kwargs.get('redis_queue', self._get_redis_queue(device)), + } + + @staticmethod + def _content_type_by_extension(extension: str) -> str: + if extension == 'mp3': + return 'audio/mpeg' + if extension == 'ogg': + return 'audio/ogg' + if extension == 'wav': + return 'audio/wav;codec=pcm' + if extension == 'flac': + return 'audio/flac' + if extension == 'aac': + return 'audio/aac' + return 'application/octet-stream' + + def get(self, extension: Optional[str] = None) -> None: + ext = extension or 'wav' + args = self._get_args(format=ext) + + try: + with self._audio_stream(**args): + self.set_header('Content-Type', self._content_type_by_extension(ext)) + self.forward_stream(**args) + + self.finish() + except AssertionError as e: + self.set_header("Content-Type", "application/json") + self.set_status(400, str(e)) + self.finish(json.dumps({"error": str(e)})) + except Exception as e: + self.set_header("Content-Type", "application/json") + self.logger.exception(e) + self.set_status(500, str(e)) + self.finish(json.dumps({"error": str(e)})) diff --git a/platypush/backend/http/app/utils/streaming.py b/platypush/backend/http/app/utils/streaming.py index 02a9dd99..16ec231d 100644 --- a/platypush/backend/http/app/utils/streaming.py +++ b/platypush/backend/http/app/utils/streaming.py @@ -1,3 +1,4 @@ +import logging import os import importlib import inspect @@ -5,7 +6,9 @@ from typing import List, Type import pkgutil -from ..streaming import StreamingRoute, logger +from ..streaming import StreamingRoute + +logger = logging.getLogger(__name__) def get_streaming_routes() -> List[Type[StreamingRoute]]: diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index 94745a84..4af83d62 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -6,7 +6,10 @@ import time from enum import Enum from threading import Thread, Event, RLock -from typing import Optional +from typing import Optional, Union + +import sounddevice as sd +import soundfile as sf from platypush.context import get_bus from platypush.message.event.sound import ( @@ -15,8 +18,10 @@ from platypush.message.event.sound import ( ) from platypush.plugins import Plugin, action +from platypush.utils import get_redis from .core import Sound, Mix +from ._converter import ConverterProcess class PlaybackState(Enum): @@ -49,10 +54,11 @@ class SoundPlugin(Plugin): * **sounddevice** (``pip install sounddevice``) * **soundfile** (``pip install soundfile``) * **numpy** (``pip install numpy``) + * **ffmpeg** package installed on the system (for streaming support) + """ _STREAM_NAME_PREFIX = 'platypush-stream-' - _default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream') def __init__( self, @@ -60,6 +66,7 @@ class SoundPlugin(Plugin): output_device=None, input_blocksize=Sound._DEFAULT_BLOCKSIZE, output_blocksize=Sound._DEFAULT_BLOCKSIZE, + ffmpeg_bin: str = 'ffmpeg', **kwargs, ): """ @@ -82,6 +89,9 @@ class SoundPlugin(Plugin): Try to increase this value if you get output underflow errors while playing. Default: 1024 :type output_blocksize: int + + :param ffmpeg_bin: Path of the ``ffmpeg`` binary (default: search for + the ``ffmpeg`` in the ``PATH``). """ super().__init__(**kwargs) @@ -102,6 +112,7 @@ class SoundPlugin(Plugin): self.stream_name_to_index = {} self.stream_index_to_name = {} self.completed_callback_events = {} + self.ffmpeg_bin = ffmpeg_bin @staticmethod def _get_default_device(category): @@ -111,9 +122,6 @@ class SoundPlugin(Plugin): :param category: Device category to query. Can be either input or output :type category: str """ - - import sounddevice as sd - return sd.query_hostapis()[0].get('default_' + category.lower() + '_device') @action @@ -155,8 +163,6 @@ class SoundPlugin(Plugin): """ - import sounddevice as sd - devs = sd.query_devices() if category == 'input': devs = [d for d in devs if d.get('max_input_channels') > 0] @@ -166,8 +172,6 @@ class SoundPlugin(Plugin): return devs def _play_audio_callback(self, q, blocksize, streamtype, stream_index): - import sounddevice as sd - is_raw_stream = streamtype == sd.RawOutputStream def audio_callback(outdata, frames, *, status): @@ -277,8 +281,6 @@ class SoundPlugin(Plugin): 'Please specify either a file to play or a ' + 'list of sound objects' ) - import sounddevice as sd - if blocksize is None: blocksize = self.output_blocksize @@ -301,8 +303,6 @@ class SoundPlugin(Plugin): device = self._get_default_device('output') if file: - import soundfile as sf - f = sf.SoundFile(file) if not samplerate: samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE @@ -444,10 +444,12 @@ class SoundPlugin(Plugin): fifo: Optional[str] = None, duration: Optional[float] = None, sample_rate: Optional[int] = None, - dtype: Optional[str] = 'float32', + dtype: str = 'float32', blocksize: Optional[int] = None, - latency: float = 0, + latency: Union[float, str] = 'high', channels: int = 1, + redis_queue: Optional[str] = None, + format: str = 'wav', ): """ Return audio data from an audio source @@ -464,12 +466,13 @@ class SoundPlugin(Plugin): float32 :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) - :param latency: Device latency in seconds (default: 0) + :param latency: Device latency in seconds (default: the device's default high latency) :param channels: Number of channels (default: 1) + :param redis_queue: If set, the audio chunks will also be published to + this Redis channel, so other consumers can process them downstream. + :param format: Audio format. Supported: wav, mp3, ogg, aac. Default: wav. """ - import sounddevice as sd - self.recording_paused_changed.clear() if device is None: @@ -485,30 +488,42 @@ class SoundPlugin(Plugin): blocksize = self.input_blocksize if not fifo: - fifo = self._default_input_stream_fifo + fifo = os.devnull - q = queue.Queue() + def audio_callback(audio_converter: ConverterProcess): + # _ = frames + # __ = time + def callback(indata, _, __, status): + while self._get_recording_state() == RecordingState.PAUSED: + self.recording_paused_changed.wait() - def audio_callback(indata, frames, time_duration, status): # noqa - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() + if status: + self.logger.warning('Recording callback status: %s', status) - if status: - self.logger.warning('Recording callback status: %s', status) + audio_converter.write(indata.tobytes()) - q.put(indata.copy()) + return callback def streaming_thread(): try: - with sd.InputStream( + with ConverterProcess( + ffmpeg_bin=self.ffmpeg_bin, + sample_rate=sample_rate, + channels=channels, + dtype=dtype, + chunk_size=self.input_blocksize, + output_format=format, + ) as converter, sd.InputStream( samplerate=sample_rate, device=device, channels=channels, - callback=audio_callback, + callback=audio_callback(converter), dtype=dtype, latency=latency, blocksize=blocksize, - ), open(fifo, 'wb') as audio_queue: + ), open( + fifo, 'wb' + ) as audio_queue: self.start_recording() get_bus().post(SoundRecordingStartedEvent()) self.logger.info('Started recording from device [%s]', device) @@ -521,23 +536,23 @@ class SoundPlugin(Plugin): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() - get_args = ( - { - 'block': True, - 'timeout': max( - 0, - duration - (time.time() - recording_started_time), - ), - } + timeout = ( + max( + 0, + duration - (time.time() - recording_started_time), + ) if duration is not None - else {} + else 1 ) - data = q.get(**get_args) - if not len(data): + data = converter.read(timeout=timeout) + if not data: continue audio_queue.write(data) + if redis_queue: + get_redis().publish(redis_queue, data) + except queue.Empty: self.logger.warning('Recording timeout: audio callback failed?') finally: @@ -548,12 +563,9 @@ class SoundPlugin(Plugin): if stat.S_ISFIFO(os.stat(fifo).st_mode): self.logger.info('Removing previous input stream FIFO %s', fifo) os.unlink(fifo) - else: - raise RuntimeError( - f'{fifo} exists and is not a FIFO. Please remove it or rename it' - ) + else: + os.mkfifo(fifo, 0o644) - os.mkfifo(fifo, 0o644) Thread(target=streaming_thread).start() @action @@ -565,7 +577,7 @@ class SoundPlugin(Plugin): sample_rate=None, format=None, blocksize=None, - latency=0, + latency='high', channels=1, subtype='PCM_24', ): @@ -590,7 +602,7 @@ class SoundPlugin(Plugin): :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) :type blocksize: int - :param latency: Device latency in seconds (default: 0) + :param latency: Device latency in seconds (default: the device's default high latency) :type latency: float :param channels: Number of channels (default: 1) @@ -613,8 +625,6 @@ class SoundPlugin(Plugin): channels, subtype, ): - import sounddevice as sd - self.recording_paused_changed.clear() if outfile: @@ -661,8 +671,6 @@ class SoundPlugin(Plugin): ) try: - import soundfile as sf - with sf.SoundFile( outfile, mode='w', @@ -785,8 +793,6 @@ class SoundPlugin(Plugin): :type dtype: str """ - import sounddevice as sd - self.recording_paused_changed.clear() if input_device is None: @@ -806,8 +812,9 @@ class SoundPlugin(Plugin): if blocksize is None: blocksize = self.output_blocksize - # noinspection PyUnusedLocal - def audio_callback(indata, outdata, frames, time, status): + # _ = frames + # __ = time + def audio_callback(indata, outdata, _, __, status): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() diff --git a/platypush/plugins/sound/_converter.py b/platypush/plugins/sound/_converter.py new file mode 100644 index 00000000..ee0f41a7 --- /dev/null +++ b/platypush/plugins/sound/_converter.py @@ -0,0 +1,162 @@ +import asyncio +from asyncio.subprocess import PIPE +from queue import Empty + +from queue import Queue +from threading import Thread +from typing import Optional, Self + +from platypush.context import get_or_create_event_loop + +_dtype_to_ffmpeg_format = { + 'int8': 's8', + 'uint8': 'u8', + 'int16': 's16le', + 'uint16': 'u16le', + 'int32': 's32le', + 'uint32': 'u32le', + 'float32': 'f32le', + 'float64': 'f64le', +} +""" +Supported input types: + 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'float32', 'float64' +""" + +_output_format_to_ffmpeg_args = { + 'wav': ('-f', 'wav'), + 'ogg': ('-f', 'ogg'), + 'mp3': ('-f', 'mp3'), + 'aac': ('-f', 'adts'), + 'flac': ('-f', 'flac'), +} + + +class ConverterProcess(Thread): + """ + Wrapper for an ffmpeg converter instance. + """ + + def __init__( + self, + ffmpeg_bin: str, + sample_rate: int, + channels: int, + dtype: str, + chunk_size: int, + output_format: str, + *args, + **kwargs, + ): + """ + :param ffmpeg_bin: Path to the ffmpeg binary. + :param sample_rate: The sample rate of the input audio. + :param channels: The number of channels of the input audio. + :param dtype: The (numpy) data type of the raw input audio. + :param chunk_size: Number of bytes that will be read at once from the + ffmpeg process. + :param output_format: Output audio format. + """ + super().__init__(*args, **kwargs) + + ffmpeg_format = _dtype_to_ffmpeg_format.get(dtype) + assert ffmpeg_format, ( + f'Unsupported data type: {dtype}. Supported data types: ' + f'{list(_dtype_to_ffmpeg_format.keys())}' + ) + + self._ffmpeg_bin = ffmpeg_bin + self._ffmpeg_format = ffmpeg_format + self._sample_rate = sample_rate + self._channels = channels + self._chunk_size = chunk_size + self._output_format = output_format + self._closed = False + self._out_queue = Queue() + self.ffmpeg = None + self._loop = None + + def __enter__(self) -> Self: + self.start() + return self + + def __exit__(self, *_, **__): + if self.ffmpeg and self._loop: + self._loop.call_soon_threadsafe(self.ffmpeg.kill) + + self.ffmpeg = None + + if self._loop: + self._loop = None + + def _check_ffmpeg(self): + assert ( + self.ffmpeg and self.ffmpeg.returncode is None + ), 'The ffmpeg process has already terminated' + + def _get_format_args(self): + ffmpeg_args = _output_format_to_ffmpeg_args.get(self._output_format) + assert ffmpeg_args, ( + f'Unsupported output format: {self._output_format}. Supported formats: ' + f'{list(_output_format_to_ffmpeg_args.keys())}' + ) + + return ffmpeg_args + + async def _audio_proxy(self, timeout: Optional[float] = None): + self.ffmpeg = await asyncio.create_subprocess_exec( + self._ffmpeg_bin, + '-f', + self._ffmpeg_format, + '-ar', + str(self._sample_rate), + '-ac', + str(self._channels), + '-i', + 'pipe:', + *self._get_format_args(), + 'pipe:', + stdin=PIPE, + stdout=PIPE, + ) + + try: + await asyncio.wait_for(self.ffmpeg.wait(), 0.1) + except asyncio.TimeoutError: + pass + + while self._loop and self.ffmpeg and self.ffmpeg.returncode is None: + self._check_ffmpeg() + assert ( + self.ffmpeg and self.ffmpeg.stdout + ), 'The stdout is closed for the ffmpeg process' + + try: + data = await asyncio.wait_for( + self.ffmpeg.stdout.read(self._chunk_size), timeout + ) + self._out_queue.put(data) + except asyncio.TimeoutError: + self._out_queue.put(b'') + + def write(self, data: bytes): + self._check_ffmpeg() + assert ( + self.ffmpeg and self._loop and self.ffmpeg.stdin + ), 'The stdin is closed for the ffmpeg process' + + self._loop.call_soon_threadsafe(self.ffmpeg.stdin.write, data) + + def read(self, timeout: Optional[float] = None) -> Optional[bytes]: + try: + return self._out_queue.get(timeout=timeout) + except Empty: + return None + + def run(self): + super().run() + self._loop = get_or_create_event_loop() + self._loop.run_until_complete(self._audio_proxy(timeout=1)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/manifest.yaml b/platypush/plugins/sound/manifest.yaml index c833f0ac..a2cddcf9 100644 --- a/platypush/plugins/sound/manifest.yaml +++ b/platypush/plugins/sound/manifest.yaml @@ -11,5 +11,9 @@ manifest: - sounddevice - soundfile - numpy + apt: + - ffmpeg + pacman: + - ffmpeg package: platypush.plugins.sound type: plugin