diff --git a/platypush/backend/http/app/routes/plugins/sound/__init__.py b/platypush/backend/http/app/routes/plugins/sound/__init__.py
new file mode 100644
index 0000000000..b73e37910b
--- /dev/null
+++ b/platypush/backend/http/app/routes/plugins/sound/__init__.py
@@ -0,0 +1,74 @@
+import os
+import tempfile
+
+from flask import Response, Blueprint, request
+
+from platypush.backend.http.app import template_folder
+from platypush.backend.http.app.utils import authenticate, send_request
+
+sound = Blueprint('sound', __name__, template_folder=template_folder)
+
+# Declare routes list
+__routes__ = [
+ sound,
+]
+
+
+# Generates the .wav file header for a given set of samples and specs
+# noinspection PyRedundantParentheses
+def gen_header(sample_rate, sample_width, channels):
+ datasize = int(2000 * 1e6) # Arbitrary data size for streaming
+ o = bytes("RIFF", ' ascii') # (4byte) Marks file as RIFF
+ o += (datasize + 36).to_bytes(4, 'little') # (4byte) File size in bytes
+ o += bytes("WAVE", 'ascii') # (4byte) File type
+ o += bytes("fmt ", 'ascii') # (4byte) Format Chunk Marker
+ o += (16).to_bytes(4, 'little') # (4byte) Length of above format data
+ o += (1).to_bytes(2, 'little') # (2byte) Format type (1 - PCM)
+ o += channels.to_bytes(2, 'little') # (2byte)
+ o += sample_rate.to_bytes(4, 'little') # (4byte)
+ o += (sample_rate * channels * sample_width // 8).to_bytes(4, 'little') # (4byte)
+ o += (channels * sample_width // 8).to_bytes(2, 'little') # (2byte)
+ o += sample_width.to_bytes(2, 'little') # (2byte)
+ o += bytes("data", 'ascii') # (4byte) Data Chunk Marker
+ o += datasize.to_bytes(4, 'little') # (4byte) Data size in bytes
+ return o
+
+
+def audio_feed(device, fifo, sample_rate, blocksize, latency, channels):
+ send_request(action='sound.stream_recording', device=device, sample_rate=sample_rate,
+ dtype='int16', fifo=fifo, blocksize=blocksize, latency=latency,
+ channels=channels)
+
+ try:
+ with open(fifo, 'rb') as f:
+ send_header = True
+
+ while True:
+ audio = f.read(blocksize)
+
+ if audio:
+ if send_header:
+ audio = gen_header(sample_rate=sample_rate, sample_width=16, channels=channels) + audio
+ send_header = False
+
+ yield audio
+ finally:
+ send_request(action='sound.stop_recording')
+
+
+@sound.route('/sound/stream', methods=['GET'])
+@authenticate()
+def get_sound_feed():
+ device = request.args.get('device')
+ sample_rate = request.args.get('sample_rate', 44100)
+ blocksize = request.args.get('blocksize', 512)
+ latency = request.args.get('latency', 0)
+ channels = request.args.get('channels', 1)
+ fifo = request.args.get('fifo', os.path.join(tempfile.gettempdir(), 'inputstream'))
+
+ return Response(audio_feed(device=device, fifo=fifo, sample_rate=sample_rate,
+ blocksize=blocksize, latency=latency, channels=channels),
+ mimetype='audio/x-wav;codec=pcm')
+
+
+# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/http/app/utils.py b/platypush/backend/http/app/utils.py
index 3566412bb0..5dd8d83322 100644
--- a/platypush/backend/http/app/utils.py
+++ b/platypush/backend/http/app/utils.py
@@ -1,10 +1,9 @@
import importlib
-import json
import logging
import os
from functools import wraps
-from flask import request, redirect, Response
+from flask import abort, request, redirect, Response
from redis import Redis
# NOTE: The HTTP service will *only* work on top of a Redis bus. The default
@@ -77,7 +76,7 @@ def get_websocket_port():
return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT)
-def send_message(msg):
+def send_message(msg, wait_for_response=True):
msg = Message.build(msg)
if isinstance(msg, Request):
@@ -88,7 +87,7 @@ def send_message(msg):
bus().post(msg)
- if isinstance(msg, Request):
+ if isinstance(msg, Request) and wait_for_response:
response = get_message_response(msg)
logger().debug('Processing response on the HTTP backend: {}'.
format(response))
@@ -96,7 +95,7 @@ def send_message(msg):
return response
-def send_request(action, **kwargs):
+def send_request(action, wait_for_response=True, **kwargs):
msg = {
'type': 'request',
'action': action
@@ -105,12 +104,11 @@ def send_request(action, **kwargs):
if kwargs:
msg['args'] = kwargs
- return send_message(msg)
+ return send_message(msg, wait_for_response=wait_for_response)
def _authenticate_token():
token = Config.get('token')
- user_token = None
if 'X-Token' in request.headers:
user_token = request.headers['X-Token']
@@ -154,7 +152,6 @@ def _authenticate_session():
def _authenticate_csrf_token():
user_manager = UserManager()
user_session_token = None
- user = None
if 'X-Session-Token' in request.headers:
user_session_token = request.headers['X-Session-Token']
@@ -207,7 +204,6 @@ def authenticate(redirect_page='', skip_auth_methods=None, check_csrf_token=Fals
return redirect('/login?redirect=' + redirect_page, 307)
# CSRF token check
- csrf_check_ok = True
if check_csrf_token:
csrf_check_ok = _authenticate_csrf_token()
if not csrf_check_ok:
diff --git a/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss b/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss
new file mode 100644
index 0000000000..2c8f9def8d
--- /dev/null
+++ b/platypush/backend/http/static/css/source/webpanel/plugins/sound/index.scss
@@ -0,0 +1,15 @@
+@import 'common/vars';
+
+.sound {
+ height: 90%;
+ margin-top: 7%;
+ overflow: hidden;
+ display: flex;
+ flex-direction: column;
+ align-items: center;
+
+ .sound-container {
+ margin-bottom: 1em;
+ }
+}
+
diff --git a/platypush/backend/http/static/js/plugins/camera/index.js b/platypush/backend/http/static/js/plugins/camera/index.js
index 6ce0edce2f..4f34e9a56a 100644
--- a/platypush/backend/http/static/js/plugins/camera/index.js
+++ b/platypush/backend/http/static/js/plugins/camera/index.js
@@ -41,7 +41,7 @@ Vue.component('camera', {
this.streaming = false;
this.capturing = true;
- this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame');
+ this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame?t=' + (new Date()).getTime());
},
onFrameLoaded: function(event) {
diff --git a/platypush/backend/http/static/js/plugins/sound/index.js b/platypush/backend/http/static/js/plugins/sound/index.js
new file mode 100644
index 0000000000..2a5b8e9617
--- /dev/null
+++ b/platypush/backend/http/static/js/plugins/sound/index.js
@@ -0,0 +1,22 @@
+Vue.component('sound', {
+ template: '#tmpl-sound',
+ props: ['config'],
+
+ data: function() {
+ return {
+ bus: new Vue({}),
+ recording: false,
+ };
+ },
+
+ methods: {
+ startRecording: function() {
+ this.recording = true;
+ },
+
+ stopRecording: function() {
+ this.recording = false;
+ },
+ },
+});
+
diff --git a/platypush/backend/http/templates/nav.html b/platypush/backend/http/templates/nav.html
index 9505052056..2a4fc26f92 100644
--- a/platypush/backend/http/templates/nav.html
+++ b/platypush/backend/http/templates/nav.html
@@ -9,6 +9,7 @@
'music.mpd': 'fa fa-music',
'music.snapcast': 'fa fa-volume-up',
'sensors': 'fas fa-thermometer-half',
+ 'sound': 'fas fa-microphone',
'switches': 'fa fa-toggle-on',
'tts': 'fa fa-comment',
'tts.google': 'fa fa-comment',
diff --git a/platypush/backend/http/templates/plugins/sound/index.html b/platypush/backend/http/templates/plugins/sound/index.html
new file mode 100644
index 0000000000..e80d86abae
--- /dev/null
+++ b/platypush/backend/http/templates/plugins/sound/index.html
@@ -0,0 +1,21 @@
+
+
diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py
index f5c672f56e..cd5804d38e 100644
--- a/platypush/plugins/sound/__init__.py
+++ b/platypush/plugins/sound/__init__.py
@@ -2,10 +2,9 @@
.. moduleauthor:: Fabio Manganiello
"""
-import json
-import math
import os
import queue
+import stat
import tempfile
import time
@@ -15,24 +14,22 @@ from threading import Thread, Event, RLock
from .core import Sound, Mix
from platypush.context import get_bus
-from platypush.message.event.sound import SoundPlaybackStartedEvent, \
- SoundPlaybackPausedEvent, SoundPlaybackStoppedEvent, \
- SoundRecordingStartedEvent, SoundRecordingPausedEvent, \
- SoundRecordingStoppedEvent
+from platypush.message.event.sound import \
+ SoundRecordingStartedEvent, SoundRecordingStoppedEvent
from platypush.plugins import Plugin, action
class PlaybackState(Enum):
- STOPPED='STOPPED',
- PLAYING='PLAYING',
- PAUSED='PAUSED'
+ STOPPED = 'STOPPED',
+ PLAYING = 'PLAYING',
+ PAUSED = 'PAUSED'
class RecordingState(Enum):
- STOPPED='STOPPED',
- RECORDING='RECORDING',
- PAUSED='PAUSED'
+ STOPPED = 'STOPPED',
+ RECORDING = 'RECORDING',
+ PAUSED = 'PAUSED'
class SoundPlugin(Plugin):
@@ -56,10 +53,12 @@ class SoundPlugin(Plugin):
"""
_STREAM_NAME_PREFIX = 'platypush-stream-'
+ _default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream')
+ # noinspection PyProtectedMember
def __init__(self, input_device=None, output_device=None,
input_blocksize=Sound._DEFAULT_BLOCKSIZE,
- output_blocksize=Sound._DEFAULT_BLOCKSIZE, *args, **kwargs):
+ output_blocksize=Sound._DEFAULT_BLOCKSIZE, **kwargs):
"""
:param input_device: Index or name of the default input device. Use
:meth:`platypush.plugins.sound.query_devices` to get the
@@ -82,7 +81,7 @@ class SoundPlugin(Plugin):
:type output_blocksize: int
"""
- super().__init__(*args, **kwargs)
+ super().__init__(**kwargs)
self.input_device = input_device
self.output_device = output_device
@@ -101,7 +100,8 @@ class SoundPlugin(Plugin):
self.stream_index_to_name = {}
self.completed_callback_events = {}
- def _get_default_device(self, category):
+ @staticmethod
+ def _get_default_device(category):
"""
Query the default audio devices.
@@ -110,8 +110,7 @@ class SoundPlugin(Plugin):
"""
import sounddevice as sd
- return sd.query_hostapis()[0].get('default_' + category.lower() +
- '_device')
+ return sd.query_hostapis()[0].get('default_' + category.lower() + '_device')
@action
def query_devices(self, category=None):
@@ -167,7 +166,8 @@ class SoundPlugin(Plugin):
is_raw_stream = streamtype == sd.RawOutputStream
- def audio_callback(outdata, frames, time, status):
+ # noinspection PyUnusedLocal
+ def audio_callback(outdata, frames, frame_time, status):
if self._get_playback_state(stream_index) == PlaybackState.STOPPED:
raise sd.CallbackStop
@@ -196,13 +196,12 @@ class SoundPlugin(Plugin):
if len(data) < len(outdata):
outdata[:len(data)] = data
outdata[len(data):] = (b'\x00' if is_raw_stream else 0.) * \
- (len(outdata) - len(data))
+ (len(outdata) - len(data))
else:
outdata[:] = data
return audio_callback
-
@action
def play(self, file=None, sound=None, device=None, blocksize=None,
bufsize=None, samplerate=None, channels=None, stream_name=None,
@@ -294,22 +293,26 @@ class SoundPlugin(Plugin):
if not channels:
channels = f.channels if f else 1
+ mix = None
with self.playback_state_lock:
stream_index, is_new_stream = self._get_or_allocate_stream_index(
stream_index=stream_index, stream_name=stream_name)
- if sound:
+ if sound and stream_index in self.stream_mixes:
mix = self.stream_mixes[stream_index]
mix.add(sound)
+ if not mix:
+ return None, "Unable to allocate the stream"
+
self.logger.info(('Starting playback of {} to sound device [{}] ' +
'on stream [{}]').format(
- file or sound, device, stream_index))
+ file or sound, device, stream_index))
if not is_new_stream:
- return # Let the existing callback handle the new mix
- # TODO Potential support also for mixed streams with
- # multiple sound files and synth sounds?
+ return # Let the existing callback handle the new mix
+ # TODO Potential support also for mixed streams with
+ # multiple sound files and synth sounds?
try:
# Audio queue pre-fill loop
@@ -321,11 +324,10 @@ class SoundPlugin(Plugin):
else:
duration = mix.duration()
blocktime = float(blocksize / samplerate)
- next_t = min(t+blocktime, duration) \
- if duration is not None else t+blocktime
+ next_t = min(t + blocktime, duration) \
+ if duration is not None else t + blocktime
- data = mix.get_wave(t_start=t, t_end=next_t,
- samplerate=samplerate)
+ data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate)
t = next_t
if duration is not None and t >= duration:
@@ -333,7 +335,6 @@ class SoundPlugin(Plugin):
q.put_nowait(data) # Pre-fill the audio queue
-
stream = self.active_streams[stream_index]
completed_callback_event = self.completed_callback_events[stream_index]
@@ -367,8 +368,8 @@ class SoundPlugin(Plugin):
else:
duration = mix.duration()
blocktime = float(blocksize / samplerate)
- next_t = min(t+blocktime, duration) \
- if duration is not None else t+blocktime
+ next_t = min(t + blocktime, duration) \
+ if duration is not None else t + blocktime
data = mix.get_wave(t_start=t, t_end=next_t,
samplerate=samplerate)
@@ -400,10 +401,115 @@ class SoundPlugin(Plugin):
self.stop_playback([stream_index])
+ @action
+ def stream_recording(self, device=None, fifo=None, duration=None, sample_rate=None,
+ dtype='float32', blocksize=None, latency=0, channels=1):
+ """
+ Return audio data from an audio source
+
+ :param device: Input device (default: default configured device or system default audio input if not configured)
+ :type device: int or str
+
+ :param fifo: Path of the FIFO that will be used to exchange audio samples (default: /tmp/inputstream)
+ :type fifo: str
+
+ :param duration: Recording duration in seconds (default: record until stop event)
+ :type duration: float
+
+ :param sample_rate: Recording sample rate (default: device default rate)
+ :type sample_rate: int
+
+ :param dtype: Data type for the audio samples. Supported types:
+ 'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default: float32
+ :type dtype: str
+
+ :param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
+ :type blocksize: int
+
+ :param latency: Device latency in seconds (default: 0)
+ :type latency: float
+
+ :param channels: Number of channels (default: 1)
+ :type channels: int
+ """
+
+ import sounddevice as sd
+
+ self.recording_paused_changed.clear()
+
+ if device is None:
+ device = self.input_device
+ if device is None:
+ device = self._get_default_device('input')
+
+ if sample_rate is None:
+ dev_info = sd.query_devices(device, 'input')
+ sample_rate = int(dev_info['default_samplerate'])
+
+ if blocksize is None:
+ blocksize = self.input_blocksize
+
+ if not fifo:
+ fifo = self._default_input_stream_fifo
+
+ q = queue.Queue()
+
+ # noinspection PyUnusedLocal
+ def audio_callback(indata, frames, time_duration, status):
+ while self._get_recording_state() == RecordingState.PAUSED:
+ self.recording_paused_changed.wait()
+
+ if status:
+ self.logger.warning('Recording callback status: {}'.format(str(status)))
+
+ q.put(indata.copy())
+
+ def streaming_thread():
+ try:
+ with sd.InputStream(samplerate=sample_rate, device=device,
+ channels=channels, callback=audio_callback,
+ dtype=dtype, latency=latency, blocksize=blocksize):
+ with open(fifo, 'wb') as audio_queue:
+ self.start_recording()
+ get_bus().post(SoundRecordingStartedEvent())
+ self.logger.info('Started recording from device [{}]'.format(device))
+ recording_started_time = time.time()
+
+ while self._get_recording_state() != RecordingState.STOPPED \
+ and (duration is None or
+ time.time() - recording_started_time < duration):
+ while self._get_recording_state() == RecordingState.PAUSED:
+ self.recording_paused_changed.wait()
+
+ get_args = {
+ 'block': True,
+ 'timeout': max(0, duration - (time.time() - recording_started_time)),
+ } if duration is not None else {}
+
+ data = q.get(**get_args)
+ if not len(data):
+ continue
+
+ audio_queue.write(data)
+ except queue.Empty:
+ self.logger.warning('Recording timeout: audio callback failed?')
+ finally:
+ self.stop_recording()
+ get_bus().post(SoundRecordingStoppedEvent())
+
+ if os.path.exists(fifo):
+ if stat.S_ISFIFO(os.stat(fifo).st_mode):
+ self.logger.info('Removing previous input stream FIFO {}'.format(fifo))
+ os.unlink(fifo)
+ else:
+ raise RuntimeError('{} exists and is not a FIFO. Please remove it or rename it'.format(fifo))
+
+ os.mkfifo(fifo, 0o644)
+ Thread(target=streaming_thread).start()
@action
def record(self, outfile=None, duration=None, device=None, sample_rate=None,
- blocksize=None, latency=0, channels=1, subtype='PCM_24'):
+ format=None, blocksize=None, latency=0, channels=1, subtype='PCM_24'):
"""
Records audio to a sound file (support formats: wav, raw)
@@ -419,6 +525,9 @@ class SoundPlugin(Plugin):
:param sample_rate: Recording sample rate (default: device default rate)
:type sample_rate: int
+ :param format: Audio format (default: WAV)
+ :type format: str
+
:param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
:type blocksize: int
@@ -432,23 +541,22 @@ class SoundPlugin(Plugin):
:type subtype: str
"""
- def recording_thread(outfile, duration, device, sample_rate, blocksize,
- latency, channels, subtype):
+ def recording_thread(outfile, duration, device, sample_rate, format,
+ blocksize, latency, channels, subtype):
import sounddevice as sd
self.recording_paused_changed.clear()
if outfile:
outfile = os.path.abspath(os.path.expanduser(outfile))
+ if os.path.isfile(outfile):
+ self.logger.info('Removing existing audio file {}'.format(outfile))
+ os.unlink(outfile)
else:
outfile = tempfile.NamedTemporaryFile(
prefix='recording_', suffix='.wav', delete=False,
dir=tempfile.gettempdir()).name
- if os.path.isfile(outfile):
- self.logger.info('Removing existing audio file {}'.format(outfile))
- os.unlink(outfile)
-
if device is None:
device = self.input_device
if device is None:
@@ -463,7 +571,7 @@ class SoundPlugin(Plugin):
q = queue.Queue()
- def audio_callback(indata, frames, time, status):
+ def audio_callback(indata, frames, duration, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
@@ -471,57 +579,61 @@ class SoundPlugin(Plugin):
self.logger.warning('Recording callback status: {}'.format(
str(status)))
- q.put(indata.copy())
-
+ q.put({
+ 'timestamp': time.time(),
+ 'frames': frames,
+ 'time': duration,
+ 'data': indata.copy()
+ })
try:
import soundfile as sf
import numpy
- with sf.SoundFile(outfile, mode='x', samplerate=sample_rate,
- channels=channels, subtype=subtype) as f:
+ with sf.SoundFile(outfile, mode='w', samplerate=sample_rate,
+ format=format, channels=channels, subtype=subtype) as f:
with sd.InputStream(samplerate=sample_rate, device=device,
channels=channels, callback=audio_callback,
latency=latency, blocksize=blocksize):
self.start_recording()
get_bus().post(SoundRecordingStartedEvent(filename=outfile))
self.logger.info('Started recording from device [{}] to [{}]'.
- format(device, outfile))
+ format(device, outfile))
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
- time.time() - recording_started_time < duration):
+ time.time() - recording_started_time < duration):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
get_args = {
'block': True,
- 'timeout': max(0, duration - (time.time() -
- recording_started_time))
+ 'timeout': max(0, duration - (time.time() - recording_started_time)),
} if duration is not None else {}
data = q.get(**get_args)
- f.write(data)
+ if data and time.time() - data.get('timestamp') <= 1.0:
+ # Only write the block if the latency is still acceptable
+ f.write(data['data'])
f.flush()
- except queue.Empty as e:
+ except queue.Empty:
self.logger.warning('Recording timeout: audio callback failed?')
finally:
self.stop_recording()
get_bus().post(SoundRecordingStoppedEvent(filename=outfile))
- Thread(target=recording_thread, args=(
- outfile, duration, device, sample_rate, blocksize, latency, channels,
- subtype)).start()
-
+ Thread(target=recording_thread,
+ args=(
+ outfile, duration, device, sample_rate, format, blocksize, latency, channels, subtype)
+ ).start()
@action
def recordplay(self, duration=None, input_device=None, output_device=None,
- sample_rate=None, blocksize=None, latency=0, channels=1,
- dtype=None):
+ sample_rate=None, blocksize=None, latency=0, channels=1, dtype=None):
"""
Records audio and plays it on an output sound device (audio pass-through)
@@ -571,6 +683,7 @@ class SoundPlugin(Plugin):
if blocksize is None:
blocksize = self.output_blocksize
+ # noinspection PyUnusedLocal
def audio_callback(indata, outdata, frames, time, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
@@ -581,7 +694,6 @@ class SoundPlugin(Plugin):
outdata[:] = indata
-
stream_index = None
try:
@@ -598,14 +710,14 @@ class SoundPlugin(Plugin):
stream=stream)
self.logger.info('Started recording pass-through from device ' +
- '[{}] to sound device [{}]'.
- format(input_device, output_device))
+ '[{}] to sound device [{}]'.
+ format(input_device, output_device))
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
- time.time() - recording_started_time < duration):
+ time.time() - recording_started_time < duration):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
@@ -617,7 +729,6 @@ class SoundPlugin(Plugin):
self.stop_playback([stream_index])
self.stop_recording()
-
@action
def query_streams(self):
"""
@@ -638,12 +749,11 @@ class SoundPlugin(Plugin):
stream['playback_state'] = self.playback_state[i].name
stream['name'] = self.stream_index_to_name.get(i)
if i in self.stream_mixes:
- stream['mix'] = { j: sound for j, sound in
- enumerate(list(self.stream_mixes[i])) }
+ stream['mix'] = {j: sound for j, sound in
+ enumerate(list(self.stream_mixes[i]))}
return streams
-
def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None,
completed_callback_event=None):
stream = None
@@ -662,8 +772,8 @@ class SoundPlugin(Plugin):
if not stream:
return (self._allocate_stream_index(stream_name=stream_name,
- completed_callback_event=
- completed_callback_event),
+ completed_callback_event=
+ completed_callback_event),
True)
return (stream_index, False)
@@ -673,7 +783,7 @@ class SoundPlugin(Plugin):
stream_index = None
with self.playback_state_lock:
- for i in range(len(self.active_streams)+1):
+ for i in range(len(self.active_streams) + 1):
if i not in self.active_streams:
stream_index = i
break