From 43ca3a6f9487080b64dbb21fcd2a8d722513ff16 Mon Sep 17 00:00:00 2001
From: Fabio Manganiello
Date: Tue, 23 Jul 2019 18:22:00 +0200
Subject: [PATCH] Added support for streaming audio from an input source over HTTP

---
 .../http/app/routes/plugins/sound/__init__.py | 106 ++++++++
 platypush/backend/http/app/utils.py           |  14 +-
 .../source/webpanel/plugins/sound/index.scss  |  15 ++
 .../http/static/js/plugins/camera/index.js    |   2 +-
 .../http/static/js/plugins/sound/index.js     |  22 ++
 platypush/backend/http/templates/nav.html     |   1 +
 .../http/templates/plugins/sound/index.html   |  21 ++
 platypush/plugins/sound/__init__.py           | 244 +++++++++++++-----
 8 files changed, 348 insertions(+), 77 deletions(-)
 create mode 100644 platypush/backend/http/app/routes/plugins/sound/__init__.py
 create mode 100644 platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss
 create mode 100644 platypush/backend/http/static/js/plugins/sound/index.js
 create mode 100644 platypush/backend/http/templates/plugins/sound/index.html

diff --git a/platypush/backend/http/app/routes/plugins/sound/__init__.py b/platypush/backend/http/app/routes/plugins/sound/__init__.py
new file mode 100644
index 000000000..b73e37910
--- /dev/null
+++ b/platypush/backend/http/app/routes/plugins/sound/__init__.py
@@ -0,0 +1,106 @@
+import os
+import tempfile
+
+from flask import Response, Blueprint, request
+
+from platypush.backend.http.app import template_folder
+from platypush.backend.http.app.utils import authenticate, send_request
+
+sound = Blueprint('sound', __name__, template_folder=template_folder)
+
+# Declare routes list
+__routes__ = [
+    sound,
+]
+
+
+# noinspection PyRedundantParentheses
+def gen_header(sample_rate, sample_width, channels):
+    """
+    Generate the .wav file header for a given set of sample specs.
+
+    :param sample_rate: Sample rate, in Hz
+    :type sample_rate: int
+
+    :param sample_width: Sample width, in bits (e.g. 16 for int16 samples)
+    :type sample_width: int
+
+    :param channels: Number of audio channels
+    :type channels: int
+
+    :return: The 44-byte RIFF/WAVE header
+    :rtype: bytes
+    """
+    datasize = int(2000 * 1e6)  # Arbitrary data size for streaming
+    o = bytes("RIFF", 'ascii')  # (4byte) Marks file as RIFF
+    o += (datasize + 36).to_bytes(4, 'little')  # (4byte) File size in bytes
+    o += bytes("WAVE", 'ascii')  # (4byte) File type
+    o += bytes("fmt ", 'ascii')  # (4byte) Format Chunk Marker
+    o += (16).to_bytes(4, 'little')  # (4byte) Length of above format data
+    o += (1).to_bytes(2, 'little')  # (2byte) Format type (1 - PCM)
+    o += channels.to_bytes(2, 'little')  # (2byte) Number of channels
+    o += sample_rate.to_bytes(4, 'little')  # (4byte) Sample rate
+    o += (sample_rate * channels * sample_width // 8).to_bytes(4, 'little')  # (4byte) Byte rate
+    o += (channels * sample_width // 8).to_bytes(2, 'little')  # (2byte) Block align
+    o += sample_width.to_bytes(2, 'little')  # (2byte) Bits per sample
+    o += bytes("data", 'ascii')  # (4byte) Data Chunk Marker
+    o += datasize.to_bytes(4, 'little')  # (4byte) Data size in bytes
+    return o
+
+
+def audio_feed(device, fifo, sample_rate, blocksize, latency, channels):
+    """
+    Start a recording through the sound plugin and yield the audio blocks
+    read from the FIFO, prefixing the first block with the .wav header.
+
+    The recording is stopped when the stream ends or the generator is closed.
+    """
+    send_request(action='sound.stream_recording', device=device, sample_rate=sample_rate,
+                 dtype='int16', fifo=fifo, blocksize=blocksize, latency=latency,
+                 channels=channels)
+
+    try:
+        with open(fifo, 'rb') as f:
+            send_header = True
+
+            while True:
+                audio = f.read(blocksize)
+                if not audio:
+                    # EOF: every writer has closed the FIFO. Stop here instead
+                    # of spinning forever on empty reads without ever yielding.
+                    break
+
+                if send_header:
+                    audio = gen_header(sample_rate=sample_rate, sample_width=16, channels=channels) + audio
+                    send_header = False
+
+                yield audio
+    finally:
+        send_request(action='sound.stop_recording')
+
+
+@sound.route('/sound/stream', methods=['GET'])
+@authenticate()
+def get_sound_feed():
+    """
+    Stream the audio captured from an input device as a .wav feed.
+
+    Supported query string arguments: ``device``, ``sample_rate`` (default:
+    44100), ``blocksize`` (default: 512), ``latency`` (default: 0),
+    ``channels`` (default: 1) and ``fifo`` (default: ``<tmpdir>/inputstream``).
+    """
+    device = request.args.get('device')
+    # Query string values are strings: convert the numeric ones up front
+    # (the default is used when a value is missing or can't be parsed).
+    sample_rate = request.args.get('sample_rate', 44100, type=int)
+    blocksize = request.args.get('blocksize', 512, type=int)
+    latency = request.args.get('latency', 0, type=float)
+    channels = request.args.get('channels', 1, type=int)
+    fifo = request.args.get('fifo', os.path.join(tempfile.gettempdir(), 'inputstream'))
+
+    return Response(audio_feed(device=device, fifo=fifo, sample_rate=sample_rate,
+                               blocksize=blocksize, latency=latency, channels=channels),
+                    mimetype='audio/x-wav;codec=pcm')
+
+
+# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/http/app/utils.py b/platypush/backend/http/app/utils.py
index 3566412bb..5dd8d8332 100644
--- 
a/platypush/backend/http/app/utils.py +++ b/platypush/backend/http/app/utils.py @@ -1,10 +1,9 @@ import importlib -import json import logging import os from functools import wraps -from flask import request, redirect, Response +from flask import abort, request, redirect, Response from redis import Redis # NOTE: The HTTP service will *only* work on top of a Redis bus. The default @@ -77,7 +76,7 @@ def get_websocket_port(): return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT) -def send_message(msg): +def send_message(msg, wait_for_response=True): msg = Message.build(msg) if isinstance(msg, Request): @@ -88,7 +87,7 @@ def send_message(msg): bus().post(msg) - if isinstance(msg, Request): + if isinstance(msg, Request) and wait_for_response: response = get_message_response(msg) logger().debug('Processing response on the HTTP backend: {}'. format(response)) @@ -96,7 +95,7 @@ def send_message(msg): return response -def send_request(action, **kwargs): +def send_request(action, wait_for_response=True, **kwargs): msg = { 'type': 'request', 'action': action @@ -105,12 +104,11 @@ def send_request(action, **kwargs): if kwargs: msg['args'] = kwargs - return send_message(msg) + return send_message(msg, wait_for_response=wait_for_response) def _authenticate_token(): token = Config.get('token') - user_token = None if 'X-Token' in request.headers: user_token = request.headers['X-Token'] @@ -154,7 +152,6 @@ def _authenticate_session(): def _authenticate_csrf_token(): user_manager = UserManager() user_session_token = None - user = None if 'X-Session-Token' in request.headers: user_session_token = request.headers['X-Session-Token'] @@ -207,7 +204,6 @@ def authenticate(redirect_page='', skip_auth_methods=None, check_csrf_token=Fals return redirect('/login?redirect=' + redirect_page, 307) # CSRF token check - csrf_check_ok = True if check_csrf_token: csrf_check_ok = _authenticate_csrf_token() if not csrf_check_ok: diff --git 
a/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss b/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss new file mode 100644 index 000000000..2c8f9def8 --- /dev/null +++ b/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss @@ -0,0 +1,15 @@ +@import 'common/vars'; + +.sound { + height: 90%; + margin-top: 7%; + overflow: hidden; + display: flex; + flex-direction: column; + align-items: center; + + .sound-container { + margin-bottom: 1em; + } +} + diff --git a/platypush/backend/http/static/js/plugins/camera/index.js b/platypush/backend/http/static/js/plugins/camera/index.js index 6ce0edce2..4f34e9a56 100644 --- a/platypush/backend/http/static/js/plugins/camera/index.js +++ b/platypush/backend/http/static/js/plugins/camera/index.js @@ -41,7 +41,7 @@ Vue.component('camera', { this.streaming = false; this.capturing = true; - this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame'); + this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame?t=' + (new Date()).getTime()); }, onFrameLoaded: function(event) { diff --git a/platypush/backend/http/static/js/plugins/sound/index.js b/platypush/backend/http/static/js/plugins/sound/index.js new file mode 100644 index 000000000..2a5b8e961 --- /dev/null +++ b/platypush/backend/http/static/js/plugins/sound/index.js @@ -0,0 +1,22 @@ +Vue.component('sound', { + template: '#tmpl-sound', + props: ['config'], + + data: function() { + return { + bus: new Vue({}), + recording: false, + }; + }, + + methods: { + startRecording: function() { + this.recording = true; + }, + + stopRecording: function() { + this.recording = false; + }, + }, +}); + diff --git a/platypush/backend/http/templates/nav.html b/platypush/backend/http/templates/nav.html index 950505205..2a4fc26f9 100644 --- a/platypush/backend/http/templates/nav.html +++ b/platypush/backend/http/templates/nav.html @@ -9,6 +9,7 @@ 'music.mpd': 'fa fa-music', 'music.snapcast': 'fa 
fa-volume-up', 'sensors': 'fas fa-thermometer-half', + 'sound': 'fas fa-microphone', 'switches': 'fa fa-toggle-on', 'tts': 'fa fa-comment', 'tts.google': 'fa fa-comment', diff --git a/platypush/backend/http/templates/plugins/sound/index.html b/platypush/backend/http/templates/plugins/sound/index.html new file mode 100644 index 000000000..e80d86aba --- /dev/null +++ b/platypush/backend/http/templates/plugins/sound/index.html @@ -0,0 +1,21 @@ + + diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index f5c672f56..cd5804d38 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -2,10 +2,9 @@ .. moduleauthor:: Fabio Manganiello """ -import json -import math import os import queue +import stat import tempfile import time @@ -15,24 +14,22 @@ from threading import Thread, Event, RLock from .core import Sound, Mix from platypush.context import get_bus -from platypush.message.event.sound import SoundPlaybackStartedEvent, \ - SoundPlaybackPausedEvent, SoundPlaybackStoppedEvent, \ - SoundRecordingStartedEvent, SoundRecordingPausedEvent, \ - SoundRecordingStoppedEvent +from platypush.message.event.sound import \ + SoundRecordingStartedEvent, SoundRecordingStoppedEvent from platypush.plugins import Plugin, action class PlaybackState(Enum): - STOPPED='STOPPED', - PLAYING='PLAYING', - PAUSED='PAUSED' + STOPPED = 'STOPPED', + PLAYING = 'PLAYING', + PAUSED = 'PAUSED' class RecordingState(Enum): - STOPPED='STOPPED', - RECORDING='RECORDING', - PAUSED='PAUSED' + STOPPED = 'STOPPED', + RECORDING = 'RECORDING', + PAUSED = 'PAUSED' class SoundPlugin(Plugin): @@ -56,10 +53,12 @@ class SoundPlugin(Plugin): """ _STREAM_NAME_PREFIX = 'platypush-stream-' + _default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream') + # noinspection PyProtectedMember def __init__(self, input_device=None, output_device=None, input_blocksize=Sound._DEFAULT_BLOCKSIZE, - output_blocksize=Sound._DEFAULT_BLOCKSIZE, *args, 
**kwargs): + output_blocksize=Sound._DEFAULT_BLOCKSIZE, **kwargs): """ :param input_device: Index or name of the default input device. Use :meth:`platypush.plugins.sound.query_devices` to get the @@ -82,7 +81,7 @@ class SoundPlugin(Plugin): :type output_blocksize: int """ - super().__init__(*args, **kwargs) + super().__init__(**kwargs) self.input_device = input_device self.output_device = output_device @@ -101,7 +100,8 @@ class SoundPlugin(Plugin): self.stream_index_to_name = {} self.completed_callback_events = {} - def _get_default_device(self, category): + @staticmethod + def _get_default_device(category): """ Query the default audio devices. @@ -110,8 +110,7 @@ class SoundPlugin(Plugin): """ import sounddevice as sd - return sd.query_hostapis()[0].get('default_' + category.lower() + - '_device') + return sd.query_hostapis()[0].get('default_' + category.lower() + '_device') @action def query_devices(self, category=None): @@ -167,7 +166,8 @@ class SoundPlugin(Plugin): is_raw_stream = streamtype == sd.RawOutputStream - def audio_callback(outdata, frames, time, status): + # noinspection PyUnusedLocal + def audio_callback(outdata, frames, frame_time, status): if self._get_playback_state(stream_index) == PlaybackState.STOPPED: raise sd.CallbackStop @@ -196,13 +196,12 @@ class SoundPlugin(Plugin): if len(data) < len(outdata): outdata[:len(data)] = data outdata[len(data):] = (b'\x00' if is_raw_stream else 0.) 
* \ - (len(outdata) - len(data)) + (len(outdata) - len(data)) else: outdata[:] = data return audio_callback - @action def play(self, file=None, sound=None, device=None, blocksize=None, bufsize=None, samplerate=None, channels=None, stream_name=None, @@ -294,22 +293,26 @@ class SoundPlugin(Plugin): if not channels: channels = f.channels if f else 1 + mix = None with self.playback_state_lock: stream_index, is_new_stream = self._get_or_allocate_stream_index( stream_index=stream_index, stream_name=stream_name) - if sound: + if sound and stream_index in self.stream_mixes: mix = self.stream_mixes[stream_index] mix.add(sound) + if not mix: + return None, "Unable to allocate the stream" + self.logger.info(('Starting playback of {} to sound device [{}] ' + 'on stream [{}]').format( - file or sound, device, stream_index)) + file or sound, device, stream_index)) if not is_new_stream: - return # Let the existing callback handle the new mix - # TODO Potential support also for mixed streams with - # multiple sound files and synth sounds? + return # Let the existing callback handle the new mix + # TODO Potential support also for mixed streams with + # multiple sound files and synth sounds? 
try: # Audio queue pre-fill loop @@ -321,11 +324,10 @@ class SoundPlugin(Plugin): else: duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t+blocktime, duration) \ - if duration is not None else t+blocktime + next_t = min(t + blocktime, duration) \ + if duration is not None else t + blocktime - data = mix.get_wave(t_start=t, t_end=next_t, - samplerate=samplerate) + data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) t = next_t if duration is not None and t >= duration: @@ -333,7 +335,6 @@ class SoundPlugin(Plugin): q.put_nowait(data) # Pre-fill the audio queue - stream = self.active_streams[stream_index] completed_callback_event = self.completed_callback_events[stream_index] @@ -367,8 +368,8 @@ class SoundPlugin(Plugin): else: duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t+blocktime, duration) \ - if duration is not None else t+blocktime + next_t = min(t + blocktime, duration) \ + if duration is not None else t + blocktime data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) @@ -400,10 +401,115 @@ class SoundPlugin(Plugin): self.stop_playback([stream_index]) + @action + def stream_recording(self, device=None, fifo=None, duration=None, sample_rate=None, + dtype='float32', blocksize=None, latency=0, channels=1): + """ + Return audio data from an audio source + + :param device: Input device (default: default configured device or system default audio input if not configured) + :type device: int or str + + :param fifo: Path of the FIFO that will be used to exchange audio samples (default: /tmp/inputstream) + :type fifo: str + + :param duration: Recording duration in seconds (default: record until stop event) + :type duration: float + + :param sample_rate: Recording sample rate (default: device default rate) + :type sample_rate: int + + :param dtype: Data type for the audio samples. Supported types: + 'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. 
Default: float32 + :type dtype: str + + :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) + :type blocksize: int + + :param latency: Device latency in seconds (default: 0) + :type latency: float + + :param channels: Number of channels (default: 1) + :type channels: int + """ + + import sounddevice as sd + + self.recording_paused_changed.clear() + + if device is None: + device = self.input_device + if device is None: + device = self._get_default_device('input') + + if sample_rate is None: + dev_info = sd.query_devices(device, 'input') + sample_rate = int(dev_info['default_samplerate']) + + if blocksize is None: + blocksize = self.input_blocksize + + if not fifo: + fifo = self._default_input_stream_fifo + + q = queue.Queue() + + # noinspection PyUnusedLocal + def audio_callback(indata, frames, time_duration, status): + while self._get_recording_state() == RecordingState.PAUSED: + self.recording_paused_changed.wait() + + if status: + self.logger.warning('Recording callback status: {}'.format(str(status))) + + q.put(indata.copy()) + + def streaming_thread(): + try: + with sd.InputStream(samplerate=sample_rate, device=device, + channels=channels, callback=audio_callback, + dtype=dtype, latency=latency, blocksize=blocksize): + with open(fifo, 'wb') as audio_queue: + self.start_recording() + get_bus().post(SoundRecordingStartedEvent()) + self.logger.info('Started recording from device [{}]'.format(device)) + recording_started_time = time.time() + + while self._get_recording_state() != RecordingState.STOPPED \ + and (duration is None or + time.time() - recording_started_time < duration): + while self._get_recording_state() == RecordingState.PAUSED: + self.recording_paused_changed.wait() + + get_args = { + 'block': True, + 'timeout': max(0, duration - (time.time() - recording_started_time)), + } if duration is not None else {} + + data = q.get(**get_args) + if not len(data): + continue + + audio_queue.write(data) + except queue.Empty: + 
self.logger.warning('Recording timeout: audio callback failed?') + finally: + self.stop_recording() + get_bus().post(SoundRecordingStoppedEvent()) + + if os.path.exists(fifo): + if stat.S_ISFIFO(os.stat(fifo).st_mode): + self.logger.info('Removing previous input stream FIFO {}'.format(fifo)) + os.unlink(fifo) + else: + raise RuntimeError('{} exists and is not a FIFO. Please remove it or rename it'.format(fifo)) + + os.mkfifo(fifo, 0o644) + Thread(target=streaming_thread).start() @action def record(self, outfile=None, duration=None, device=None, sample_rate=None, - blocksize=None, latency=0, channels=1, subtype='PCM_24'): + format=None, blocksize=None, latency=0, channels=1, subtype='PCM_24'): """ Records audio to a sound file (support formats: wav, raw) @@ -419,6 +525,9 @@ class SoundPlugin(Plugin): :param sample_rate: Recording sample rate (default: device default rate) :type sample_rate: int + :param format: Audio format (default: WAV) + :type format: str + :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) :type blocksize: int @@ -432,23 +541,22 @@ class SoundPlugin(Plugin): :type subtype: str """ - def recording_thread(outfile, duration, device, sample_rate, blocksize, - latency, channels, subtype): + def recording_thread(outfile, duration, device, sample_rate, format, + blocksize, latency, channels, subtype): import sounddevice as sd self.recording_paused_changed.clear() if outfile: outfile = os.path.abspath(os.path.expanduser(outfile)) + if os.path.isfile(outfile): + self.logger.info('Removing existing audio file {}'.format(outfile)) + os.unlink(outfile) else: outfile = tempfile.NamedTemporaryFile( prefix='recording_', suffix='.wav', delete=False, dir=tempfile.gettempdir()).name - if os.path.isfile(outfile): - self.logger.info('Removing existing audio file {}'.format(outfile)) - os.unlink(outfile) - if device is None: device = self.input_device if device is None: @@ -463,7 +571,7 @@ class SoundPlugin(Plugin): q = queue.Queue() 
- def audio_callback(indata, frames, time, status): + def audio_callback(indata, frames, duration, status): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() @@ -471,57 +579,61 @@ class SoundPlugin(Plugin): self.logger.warning('Recording callback status: {}'.format( str(status))) - q.put(indata.copy()) - + q.put({ + 'timestamp': time.time(), + 'frames': frames, + 'time': duration, + 'data': indata.copy() + }) try: import soundfile as sf import numpy - with sf.SoundFile(outfile, mode='x', samplerate=sample_rate, - channels=channels, subtype=subtype) as f: + with sf.SoundFile(outfile, mode='w', samplerate=sample_rate, + format=format, channels=channels, subtype=subtype) as f: with sd.InputStream(samplerate=sample_rate, device=device, channels=channels, callback=audio_callback, latency=latency, blocksize=blocksize): self.start_recording() get_bus().post(SoundRecordingStartedEvent(filename=outfile)) self.logger.info('Started recording from device [{}] to [{}]'. 
- format(device, outfile)) + format(device, outfile)) recording_started_time = time.time() while self._get_recording_state() != RecordingState.STOPPED \ and (duration is None or - time.time() - recording_started_time < duration): + time.time() - recording_started_time < duration): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() get_args = { 'block': True, - 'timeout': max(0, duration - (time.time() - - recording_started_time)) + 'timeout': max(0, duration - (time.time() - recording_started_time)), } if duration is not None else {} data = q.get(**get_args) - f.write(data) + if data and time.time() - data.get('timestamp') <= 1.0: + # Only write the block if the latency is still acceptable + f.write(data['data']) f.flush() - except queue.Empty as e: + except queue.Empty: self.logger.warning('Recording timeout: audio callback failed?') finally: self.stop_recording() get_bus().post(SoundRecordingStoppedEvent(filename=outfile)) - Thread(target=recording_thread, args=( - outfile, duration, device, sample_rate, blocksize, latency, channels, - subtype)).start() - + Thread(target=recording_thread, + args=( + outfile, duration, device, sample_rate, format, blocksize, latency, channels, subtype) + ).start() @action def recordplay(self, duration=None, input_device=None, output_device=None, - sample_rate=None, blocksize=None, latency=0, channels=1, - dtype=None): + sample_rate=None, blocksize=None, latency=0, channels=1, dtype=None): """ Records audio and plays it on an output sound device (audio pass-through) @@ -571,6 +683,7 @@ class SoundPlugin(Plugin): if blocksize is None: blocksize = self.output_blocksize + # noinspection PyUnusedLocal def audio_callback(indata, outdata, frames, time, status): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() @@ -581,7 +694,6 @@ class SoundPlugin(Plugin): outdata[:] = indata - stream_index = None try: @@ -598,14 +710,14 @@ class 
SoundPlugin(Plugin): stream=stream) self.logger.info('Started recording pass-through from device ' + - '[{}] to sound device [{}]'. - format(input_device, output_device)) + '[{}] to sound device [{}]'. + format(input_device, output_device)) recording_started_time = time.time() while self._get_recording_state() != RecordingState.STOPPED \ and (duration is None or - time.time() - recording_started_time < duration): + time.time() - recording_started_time < duration): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() @@ -617,7 +729,6 @@ class SoundPlugin(Plugin): self.stop_playback([stream_index]) self.stop_recording() - @action def query_streams(self): """ @@ -638,12 +749,11 @@ class SoundPlugin(Plugin): stream['playback_state'] = self.playback_state[i].name stream['name'] = self.stream_index_to_name.get(i) if i in self.stream_mixes: - stream['mix'] = { j: sound for j, sound in - enumerate(list(self.stream_mixes[i])) } + stream['mix'] = {j: sound for j, sound in + enumerate(list(self.stream_mixes[i]))} return streams - def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None, completed_callback_event=None): stream = None @@ -662,8 +772,8 @@ class SoundPlugin(Plugin): if not stream: return (self._allocate_stream_index(stream_name=stream_name, - completed_callback_event= - completed_callback_event), + completed_callback_event= + completed_callback_event), True) return (stream_index, False) @@ -673,7 +783,7 @@ class SoundPlugin(Plugin): stream_index = None with self.playback_state_lock: - for i in range(len(self.active_streams)+1): + for i in range(len(self.active_streams) + 1): if i not in self.active_streams: stream_index = i break