Refactored Tornado routes for native pub/sub support.

The Redis pub/sub mechanism is now a native feature for Tornado routes
through the `PubSubMixin`.

(Plus, lint/black chore for the sound plugin)
This commit is contained in:
Fabio Manganiello 2023-05-30 11:08:27 +02:00
parent 8b5eb82497
commit d7208c6bbc
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
10 changed files with 657 additions and 322 deletions

View file

@ -17,11 +17,10 @@ from tornado.web import Application, FallbackHandler
from platypush.backend import Backend
from platypush.backend.http.app import application
from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes
from platypush.backend.http.app.ws.events import events_redis_topic
from platypush.backend.http.app.ws.events import WSEventProxy
from platypush.bus.redis import RedisBus
from platypush.config import Config
from platypush.utils import get_redis
class HttpBackend(Backend):
@ -286,7 +285,7 @@ class HttpBackend(Backend):
def notify_web_clients(self, event):
"""Notify all the connected web clients (over websocket) of a new event"""
get_redis().publish(events_redis_topic, str(event))
WSEventProxy.publish(event) # noqa: E1120
def _get_secret_key(self, _create=False):
if _create:

View file

@ -0,0 +1,158 @@
from contextlib import contextmanager
from dataclasses import dataclass
import json
import logging
from multiprocessing import RLock
from typing import Generator, Iterable, Optional, Set, Union
from redis import ConnectionError as RedisConnectionError
from redis.client import PubSub
from platypush.config import Config
from platypush.message import Message as AppMessage
from platypush.utils import get_redis
logger = logging.getLogger(__name__)
MessageType = Union[AppMessage, bytes, str, dict, list, set, tuple]
"""Types of supported messages on Redis/websocket channels."""
@dataclass
class Message:
"""
A wrapper for a message received on a Redis subscription.
"""
data: bytes
"""The data received in the message."""
channel: str
"""The channel the message was received on."""
class PubSubMixin:
"""
A mixin for Tornado route handlers that support pub/sub mechanisms.
"""
def __init__(self, *_, subscriptions: Optional[Iterable[str]] = None, **__):
self._pubsub: Optional[PubSub] = None
"""Pub/sub proxy."""
self._subscriptions: Set[str] = set(subscriptions or [])
"""Set of current channel subscriptions."""
self._pubsub_lock = RLock()
"""
Subscriptions lock. It ensures that the list of subscriptions is
manipulated by one thread or process at the time.
"""
self.subscribe(*self._subscriptions)
@property
@contextmanager
def pubsub(self):
"""
Pub/sub proxy lazy property with context manager.
"""
with self._pubsub_lock:
# Lazy initialization for the pub/sub object.
if self._pubsub is None:
self._pubsub = get_redis().pubsub()
# Yield the pub/sub object (context manager pattern).
yield self._pubsub
with self._pubsub_lock:
# Close and free the pub/sub object if it has no active subscriptions.
if self._pubsub is not None and len(self._subscriptions) == 0:
self._pubsub.close()
self._pubsub = None
@staticmethod
def _serialize(data: MessageType) -> bytes:
"""
Serialize a message as bytes before delivering it to either a Redis or websocket channel.
"""
if isinstance(data, AppMessage):
data = str(data)
if isinstance(data, (list, tuple, set)):
data = list(data)
if isinstance(data, (list, dict)):
data = json.dumps(data, cls=AppMessage.Encoder)
if isinstance(data, str):
data = data.encode('utf-8')
return data
@classmethod
def publish(cls, data: MessageType, *channels: str) -> None:
"""
Publish data on one or more Redis channels.
"""
for channel in channels:
get_redis().publish(channel, cls._serialize(data))
def subscribe(self, *channels: str) -> None:
"""
Subscribe to a set of Redis channels.
"""
with self.pubsub as pubsub:
for channel in channels:
pubsub.subscribe(channel)
self._subscriptions.add(channel)
def unsubscribe(self, *channels: str) -> None:
"""
Unsubscribe from a set of Redis channels.
"""
with self.pubsub as pubsub:
for channel in channels:
if channel in self._subscriptions:
pubsub.unsubscribe(channel)
self._subscriptions.remove(channel)
def listen(self) -> Generator[Message, None, None]:
"""
Listens for pub/sub messages and yields them.
"""
try:
with self.pubsub as pubsub:
for msg in pubsub.listen():
channel = msg.get('channel', b'').decode()
if msg.get('type') != 'message' or not (
channel and channel in self._subscriptions
):
continue
yield Message(data=msg.get('data', b''), channel=channel)
except (AttributeError, RedisConnectionError):
return
def _pubsub_close(self):
"""
Closes the pub/sub object.
"""
with self._pubsub_lock:
if self._pubsub is not None:
try:
self._pubsub.close()
except Exception as e:
logger.debug('Error on pubsub close: %s', e)
finally:
self._pubsub = None
def on_close(self):
"""
Extensible close handler that closes the pub/sub object.
"""
self._pubsub_close()
@staticmethod
def get_channel(channel: str) -> str:
"""
Utility method that returns the prefixed Redis channel for a certain subscription name.
"""
return f'_platypush/{Config.get("device_id")}/{channel}' # type: ignore
# vim:sw=4:ts=4:et:

View file

@ -9,18 +9,23 @@ from tornado.web import RequestHandler, stream_request_body
from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status
from ..mixins import PubSubMixin
logger = getLogger(__name__)
@stream_request_body
class StreamingRoute(RequestHandler, ABC):
class StreamingRoute(RequestHandler, PubSubMixin, ABC):
"""
Base class for Tornado streaming routes.
"""
@override
def prepare(self):
# Perform authentication
"""
Request preparation logic. It performs user authentication if
``auth_required`` returns True, and it can be extended/overridden.
"""
if self.auth_required:
auth_status = get_auth_status(self.request)
if auth_status != AuthStatus.OK:
@ -33,18 +38,28 @@ class StreamingRoute(RequestHandler, ABC):
@override
def write_error(self, status_code: int, error: Optional[str] = None, **_):
"""
Make sure that errors are always returned in JSON format.
"""
self.set_header("Content-Type", "application/json")
self.finish(
json.dumps(
{"status": status_code, "error": error or responses[status_code]}
{"status": status_code, "error": error or responses.get(status_code)}
)
)
@classmethod
@abstractmethod
def path(cls) -> str:
"""
Path/URL pattern for this route.
"""
raise NotImplementedError()
@property
def auth_required(self):
def auth_required(self) -> bool:
"""
If set to True (default) then this route will require user
authentication and return 401 if authentication fails.
"""
return True

View file

@ -56,6 +56,8 @@ class CameraRoute(StreamingRoute):
camera.stream.ready.wait(timeout=timeout)
return camera.stream.frame
return None
def _should_stop(self):
if self._finished:
return True

View file

@ -1,3 +1,3 @@
from ._base import WSRoute, logger, pubsub_redis_topic
from ._base import WSRoute, logger
__all__ = ['WSRoute', 'logger', 'pubsub_redis_topic']
__all__ = ['WSRoute', 'logger']

View file

@ -1,37 +1,28 @@
from abc import ABC, abstractclassmethod
import json
from abc import ABC, abstractmethod
from logging import getLogger
from threading import RLock, Thread
from typing import Any, Generator, Iterable, Optional, Union
from threading import Thread
from typing_extensions import override
from redis import ConnectionError as RedisConnectionError
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketHandler
from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status
from platypush.config import Config
from platypush.message import Message
from platypush.utils import get_redis
from ..mixins import MessageType, PubSubMixin
logger = getLogger(__name__)
def pubsub_redis_topic(topic: str) -> str:
return f'_platypush/{Config.get("device_id")}/{topic}' # type: ignore
class WSRoute(WebSocketHandler, Thread, ABC):
class WSRoute(WebSocketHandler, Thread, PubSubMixin, ABC):
"""
Base class for Tornado websocket endpoints.
"""
def __init__(self, *args, redis_topics: Optional[Iterable[str]] = None, **kwargs):
super().__init__(*args, **kwargs)
self._redis_topics = set(redis_topics or [])
self._sub = get_redis().pubsub()
def __init__(self, *args, **kwargs):
WebSocketHandler.__init__(self, *args)
PubSubMixin.__init__(self, **kwargs)
Thread.__init__(self)
self._io_loop = IOLoop.current()
self._sub_lock = RLock()
@override
def open(self, *_, **__):
@ -51,10 +42,11 @@ class WSRoute(WebSocketHandler, Thread, ABC):
pass
@override
def on_message(self, message): # type: ignore
pass
def on_message(self, message):
return message
@abstractclassmethod
@classmethod
@abstractmethod
def app_name(cls) -> str:
raise NotImplementedError()
@ -66,55 +58,25 @@ class WSRoute(WebSocketHandler, Thread, ABC):
def auth_required(self):
return True
def subscribe(self, *topics: str) -> None:
with self._sub_lock:
for topic in topics:
self._sub.subscribe(topic)
self._redis_topics.add(topic)
def unsubscribe(self, *topics: str) -> None:
with self._sub_lock:
for topic in topics:
if topic in self._redis_topics:
self._sub.unsubscribe(topic)
self._redis_topics.remove(topic)
def listen(self) -> Generator[Any, None, None]:
try:
for msg in self._sub.listen():
if (
msg.get('type') != 'message'
and msg.get('channel').decode() not in self._redis_topics
):
continue
yield msg.get('data')
except (AttributeError, RedisConnectionError):
return
def send(self, msg: Union[str, bytes, dict, list, tuple, set]) -> None:
if isinstance(msg, (list, tuple, set)):
msg = list(msg)
if isinstance(msg, (list, dict)):
msg = json.dumps(msg, cls=Message.Encoder)
def send(self, msg: MessageType) -> None:
self._io_loop.asyncio_loop.call_soon_threadsafe( # type: ignore
self.write_message, msg
self.write_message, self._serialize(msg)
)
@override
def run(self) -> None:
super().run()
for topic in self._redis_topics:
self._sub.subscribe(topic)
self.subscribe(*self._subscriptions)
@override
def on_close(self):
topics = self._redis_topics.copy()
for topic in topics:
self.unsubscribe(topic)
super().on_close()
for channel in self._subscriptions.copy():
self.unsubscribe(channel)
if self._pubsub:
self._pubsub.close()
self._sub.close()
logger.info(
'Client %s disconnected from %s, reason=%s, message=%s',
self.request.remote_ip,

View file

@ -1,12 +1,11 @@
from typing_extensions import override
from platypush.backend.http.app.mixins import MessageType
from platypush.message.event import Event
from . import WSRoute, logger, pubsub_redis_topic
from . import WSRoute, logger
from ..utils import send_message
events_redis_topic = pubsub_redis_topic('events')
class WSEventProxy(WSRoute):
"""
@ -14,14 +13,23 @@ class WSEventProxy(WSRoute):
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.subscribe(events_redis_topic)
super().__init__(*args, subscriptions=[self.events_channel], **kwargs)
@classmethod
@override
def app_name(cls) -> str:
return 'events'
@classmethod
@property
def events_channel(cls) -> str:
return cls.get_channel('events')
@override
@classmethod
def publish(cls, data: MessageType, *_) -> None:
super().publish(data, cls.events_channel)
@override
def on_message(self, message):
try:
@ -38,9 +46,9 @@ class WSEventProxy(WSRoute):
def run(self) -> None:
for msg in self.listen():
try:
evt = Event.build(msg)
evt = Event.build(msg.data)
except Exception as e:
logger.warning('Error parsing event: %s: %s', msg, e)
continue
self.send(str(evt))
self.send(evt)

View file

@ -5,7 +5,11 @@ from typing import Optional, Union, Tuple, Set
import numpy as np
from platypush.plugins.camera.model.writer import StreamWriter, VideoWriter, FileVideoWriter
from platypush.plugins.camera.model.writer import (
StreamWriter,
VideoWriter,
FileVideoWriter,
)
from platypush.plugins.camera.model.writer.preview import PreviewWriter
@ -32,7 +36,7 @@ class CameraInfo:
stream_format: Optional[str] = None
vertical_flip: bool = False
warmup_frames: int = 0
warmup_seconds: float = 0.
warmup_seconds: float = 0.0
def set(self, **kwargs):
for k, v in kwargs.items():
@ -97,10 +101,15 @@ class Camera:
return writers
def effective_resolution(self) -> Tuple[int, int]:
"""
Calculates the effective resolution of the camera in pixels, taking
into account the base resolution, the scale and the rotation.
"""
assert self.info.resolution, 'No base resolution specified'
rot = (self.info.rotate or 0) * math.pi / 180
sin = math.sin(rot)
cos = math.cos(rot)
scale = np.array([[self.info.scale_x or 1., self.info.scale_y or 1.]])
scale = np.array([[self.info.scale_x or 1.0, self.info.scale_y or 1.0]])
resolution = np.array([[self.info.resolution[0], self.info.resolution[1]]])
rot_matrix = np.array([[sin, cos], [cos, sin]])
resolution = (scale * abs(np.cross(rot_matrix, resolution)))[0]

View file

@ -1,7 +1,3 @@
"""
.. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com>
"""
import os
import queue
import stat
@ -10,25 +6,28 @@ import time
from enum import Enum
from threading import Thread, Event, RLock
from .core import Sound, Mix
from typing import Optional
from platypush.context import get_bus
from platypush.message.event.sound import \
SoundRecordingStartedEvent, SoundRecordingStoppedEvent
from platypush.message.event.sound import (
SoundRecordingStartedEvent,
SoundRecordingStoppedEvent,
)
from platypush.plugins import Plugin, action
from .core import Sound, Mix
class PlaybackState(Enum):
STOPPED = 'STOPPED',
PLAYING = 'PLAYING',
STOPPED = 'STOPPED'
PLAYING = 'PLAYING'
PAUSED = 'PAUSED'
class RecordingState(Enum):
STOPPED = 'STOPPED',
RECORDING = 'RECORDING',
STOPPED = 'STOPPED'
RECORDING = 'RECORDING'
PAUSED = 'PAUSED'
@ -55,10 +54,14 @@ class SoundPlugin(Plugin):
_STREAM_NAME_PREFIX = 'platypush-stream-'
_default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream')
# noinspection PyProtectedMember
def __init__(self, input_device=None, output_device=None,
def __init__(
self,
input_device=None,
output_device=None,
input_blocksize=Sound._DEFAULT_BLOCKSIZE,
output_blocksize=Sound._DEFAULT_BLOCKSIZE, **kwargs):
output_blocksize=Sound._DEFAULT_BLOCKSIZE,
**kwargs,
):
"""
:param input_device: Index or name of the default input device. Use
:meth:`platypush.plugins.sound.query_devices` to get the
@ -110,6 +113,7 @@ class SoundPlugin(Plugin):
"""
import sounddevice as sd
return sd.query_hostapis()[0].get('default_' + category.lower() + '_device')
@action
@ -174,17 +178,18 @@ class SoundPlugin(Plugin):
self.playback_paused_changed[stream_index].wait()
if frames != blocksize:
self.logger.warning('Received {} frames, expected blocksize is {}'.
format(frames, blocksize))
self.logger.warning(
'Received %d frames, expected blocksize is %d', frames, blocksize
)
return
if status.output_underflow:
self.logger.warning('Output underflow: increase blocksize?')
outdata[:] = (b'\x00' if is_raw_stream else 0.) * len(outdata)
outdata[:] = (b'\x00' if is_raw_stream else 0.0) * len(outdata)
return
if status:
self.logger.warning('Audio callback failed: {}'.format(status))
self.logger.warning('Audio callback failed: %s', status)
try:
data = q.get_nowait()
@ -193,18 +198,28 @@ class SoundPlugin(Plugin):
raise sd.CallbackStop
if len(data) < len(outdata):
outdata[:len(data)] = data
outdata[len(data):] = (b'\x00' if is_raw_stream else 0.) * \
(len(outdata) - len(data))
outdata[: len(data)] = data
outdata[len(data) :] = (b'\x00' if is_raw_stream else 0.0) * (
len(outdata) - len(data)
)
else:
outdata[:] = data
return audio_callback
@action
def play(self, file=None, sound=None, device=None, blocksize=None,
bufsize=None, samplerate=None, channels=None, stream_name=None,
stream_index=None):
def play(
self,
file=None,
sound=None,
device=None,
blocksize=None,
bufsize=None,
samplerate=None,
channels=None,
stream_name=None,
stream_index=None,
):
"""
Plays a sound file (support formats: wav, raw) or a synthetic sound.
@ -258,8 +273,9 @@ class SoundPlugin(Plugin):
"""
if not file and not sound:
raise RuntimeError('Please specify either a file to play or a ' +
'list of sound objects')
raise RuntimeError(
'Please specify either a file to play or a ' + 'list of sound objects'
)
import sounddevice as sd
@ -274,7 +290,7 @@ class SoundPlugin(Plugin):
q = queue.Queue(maxsize=bufsize)
f = None
t = 0.
t = 0.0
if file:
file = os.path.abspath(os.path.expanduser(file))
@ -286,6 +302,7 @@ class SoundPlugin(Plugin):
if file:
import soundfile as sf
f = sf.SoundFile(file)
if not samplerate:
samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE
@ -295,7 +312,8 @@ class SoundPlugin(Plugin):
mix = None
with self.playback_state_lock:
stream_index, is_new_stream = self._get_or_allocate_stream_index(
stream_index=stream_index, stream_name=stream_name)
stream_index=stream_index, stream_name=stream_name
)
if sound and stream_index in self.stream_mixes:
mix = self.stream_mixes[stream_index]
@ -304,9 +322,12 @@ class SoundPlugin(Plugin):
if not mix:
return None, "Unable to allocate the stream"
self.logger.info(('Starting playback of {} to sound device [{}] ' +
'on stream [{}]').format(
file or sound, device, stream_index))
self.logger.info(
'Starting playback of %s to sound device [%s] on stream [%s]',
file or sound,
device,
stream_index,
)
if not is_new_stream:
return # Let the existing callback handle the new mix
@ -323,8 +344,11 @@ class SoundPlugin(Plugin):
else:
duration = mix.duration()
blocktime = float(blocksize / samplerate)
next_t = min(t + blocktime, duration) \
if duration is not None else t + blocktime
next_t = (
min(t + blocktime, duration)
if duration is not None
else t + blocktime
)
data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate)
t = next_t
@ -339,14 +363,20 @@ class SoundPlugin(Plugin):
if stream is None:
streamtype = sd.RawOutputStream if file else sd.OutputStream
stream = streamtype(samplerate=samplerate, blocksize=blocksize,
device=device, channels=channels,
stream = streamtype(
samplerate=samplerate,
blocksize=blocksize,
device=device,
channels=channels,
dtype='float32',
callback=self._play_audio_callback(
q=q, blocksize=blocksize,
q=q,
blocksize=blocksize,
streamtype=streamtype,
stream_index=stream_index),
finished_callback=completed_callback_event.set)
stream_index=stream_index,
),
finished_callback=completed_callback_event.set,
)
self._start_playback(stream_index=stream_index, stream=stream)
@ -356,8 +386,9 @@ class SoundPlugin(Plugin):
timeout = blocksize * bufsize / samplerate
while True:
while self._get_playback_state(stream_index) == \
PlaybackState.PAUSED:
while (
self._get_playback_state(stream_index) == PlaybackState.PAUSED
):
self.playback_paused_changed[stream_index].wait()
if f:
@ -367,31 +398,38 @@ class SoundPlugin(Plugin):
else:
duration = mix.duration()
blocktime = float(blocksize / samplerate)
next_t = min(t + blocktime, duration) \
if duration is not None else t + blocktime
next_t = (
min(t + blocktime, duration)
if duration is not None
else t + blocktime
)
data = mix.get_wave(t_start=t, t_end=next_t,
samplerate=samplerate)
data = mix.get_wave(
t_start=t, t_end=next_t, samplerate=samplerate
)
t = next_t
if duration is not None and t >= duration:
break
if self._get_playback_state(stream_index) == \
PlaybackState.STOPPED:
if self._get_playback_state(stream_index) == PlaybackState.STOPPED:
break
try:
q.put(data, timeout=timeout)
except queue.Full as e:
if self._get_playback_state(stream_index) != \
PlaybackState.PAUSED:
if (
self._get_playback_state(stream_index)
!= PlaybackState.PAUSED
):
raise e
completed_callback_event.wait()
except queue.Full:
if stream_index is None or \
self._get_playback_state(stream_index) != PlaybackState.STOPPED:
if (
stream_index is None
or self._get_playback_state(stream_index) != PlaybackState.STOPPED
):
self.logger.warning('Playback timeout: audio callback failed?')
finally:
if f and not f.closed:
@ -400,35 +438,34 @@ class SoundPlugin(Plugin):
self.stop_playback([stream_index])
@action
def stream_recording(self, device=None, fifo=None, duration=None, sample_rate=None,
dtype='float32', blocksize=None, latency=0, channels=1):
def stream_recording(
self,
device: Optional[str] = None,
fifo: Optional[str] = None,
duration: Optional[float] = None,
sample_rate: Optional[int] = None,
dtype: Optional[str] = 'float32',
blocksize: Optional[int] = None,
latency: float = 0,
channels: int = 1,
):
"""
Return audio data from an audio source
:param device: Input device (default: default configured device or system default audio input if not configured)
:type device: int or str
:param fifo: Path of the FIFO that will be used to exchange audio samples (default: /tmp/inputstream)
:type fifo: str
:param duration: Recording duration in seconds (default: record until stop event)
:type duration: float
:param device: Input device (default: default configured device or
system default audio input if not configured)
:param fifo: Path of the FIFO that will be used to exchange audio
samples (default: /tmp/inputstream)
:param duration: Recording duration in seconds (default: record until
stop event)
:param sample_rate: Recording sample rate (default: device default rate)
:type sample_rate: int
:param dtype: Data type for the audio samples. Supported types:
'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default: float32
:type dtype: str
:param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
:type blocksize: int
'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default:
float32
:param blocksize: Audio block size (default: configured
`input_blocksize` or 2048)
:param latency: Device latency in seconds (default: 0)
:type latency: float
:param channels: Number of channels (default: 1)
:type channels: int
"""
import sounddevice as sd
@ -452,37 +489,49 @@ class SoundPlugin(Plugin):
q = queue.Queue()
# noinspection PyUnusedLocal
def audio_callback(indata, frames, time_duration, status):
def audio_callback(indata, frames, time_duration, status): # noqa
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: {}'.format(str(status)))
self.logger.warning('Recording callback status: %s', status)
q.put(indata.copy())
def streaming_thread():
try:
with sd.InputStream(samplerate=sample_rate, device=device,
channels=channels, callback=audio_callback,
dtype=dtype, latency=latency, blocksize=blocksize):
with open(fifo, 'wb') as audio_queue:
with sd.InputStream(
samplerate=sample_rate,
device=device,
channels=channels,
callback=audio_callback,
dtype=dtype,
latency=latency,
blocksize=blocksize,
), open(fifo, 'wb') as audio_queue:
self.start_recording()
get_bus().post(SoundRecordingStartedEvent())
self.logger.info('Started recording from device [{}]'.format(device))
self.logger.info('Started recording from device [%s]', device)
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
time.time() - recording_started_time < duration):
while self._get_recording_state() != RecordingState.STOPPED and (
duration is None
or time.time() - recording_started_time < duration
):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
get_args = {
get_args = (
{
'block': True,
'timeout': max(0, duration - (time.time() - recording_started_time)),
} if duration is not None else {}
'timeout': max(
0,
duration - (time.time() - recording_started_time),
),
}
if duration is not None
else {}
)
data = q.get(**get_args)
if not len(data):
@ -497,17 +546,29 @@ class SoundPlugin(Plugin):
if os.path.exists(fifo):
if stat.S_ISFIFO(os.stat(fifo).st_mode):
self.logger.info('Removing previous input stream FIFO {}'.format(fifo))
self.logger.info('Removing previous input stream FIFO %s', fifo)
os.unlink(fifo)
else:
raise RuntimeError('{} exists and is not a FIFO. Please remove it or rename it'.format(fifo))
raise RuntimeError(
f'{fifo} exists and is not a FIFO. Please remove it or rename it'
)
os.mkfifo(fifo, 0o644)
Thread(target=streaming_thread).start()
@action
def record(self, outfile=None, duration=None, device=None, sample_rate=None,
format=None, blocksize=None, latency=0, channels=1, subtype='PCM_24'):
def record(
self,
outfile=None,
duration=None,
device=None,
sample_rate=None,
format=None,
blocksize=None,
latency=0,
channels=1,
subtype='PCM_24',
):
"""
Records audio to a sound file (support formats: wav, raw)
@ -535,12 +596,23 @@ class SoundPlugin(Plugin):
:param channels: Number of channels (default: 1)
:type channels: int
:param subtype: Recording subtype - see `Soundfile docs - Subtypes <https://pysoundfile.readthedocs.io/en/0.9.0/#soundfile.available_subtypes>`_ for a list of the available subtypes (default: PCM_24)
:param subtype: Recording subtype - see `Soundfile docs - Subtypes
<https://pysoundfile.readthedocs.io/en/0.9.0/#soundfile.available_subtypes>`_
for a list of the available subtypes (default: PCM_24)
:type subtype: str
"""
def recording_thread(outfile, duration, device, sample_rate, format,
blocksize, latency, channels, subtype):
def recording_thread(
outfile,
duration,
device,
sample_rate,
format,
blocksize,
latency,
channels,
subtype,
):
import sounddevice as sd
self.recording_paused_changed.clear()
@ -548,12 +620,15 @@ class SoundPlugin(Plugin):
if outfile:
outfile = os.path.abspath(os.path.expanduser(outfile))
if os.path.isfile(outfile):
self.logger.info('Removing existing audio file {}'.format(outfile))
self.logger.info('Removing existing audio file %s', outfile)
os.unlink(outfile)
else:
outfile = tempfile.NamedTemporaryFile(
prefix='recording_', suffix='.wav', delete=False,
dir=tempfile.gettempdir()).name
prefix='recording_',
suffix='.wav',
delete=False,
dir=tempfile.gettempdir(),
).name
if device is None:
device = self.input_device
@ -574,42 +649,68 @@ class SoundPlugin(Plugin):
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: {}'.format(
str(status)))
self.logger.warning('Recording callback status: %s', status)
q.put({
q.put(
{
'timestamp': time.time(),
'frames': frames,
'time': duration,
'data': indata.copy()
})
'data': indata.copy(),
}
)
try:
import soundfile as sf
import numpy
with sf.SoundFile(outfile, mode='w', samplerate=sample_rate,
format=format, channels=channels, subtype=subtype) as f:
with sd.InputStream(samplerate=sample_rate, device=device,
channels=channels, callback=audio_callback,
latency=latency, blocksize=blocksize):
with sf.SoundFile(
outfile,
mode='w',
samplerate=sample_rate,
format=format,
channels=channels,
subtype=subtype,
) as f:
with sd.InputStream(
samplerate=sample_rate,
device=device,
channels=channels,
callback=audio_callback,
latency=latency,
blocksize=blocksize,
):
self.start_recording()
get_bus().post(SoundRecordingStartedEvent(filename=outfile))
self.logger.info('Started recording from device [{}] to [{}]'.
format(device, outfile))
self.logger.info(
'Started recording from device [%s] to [%s]',
device,
outfile,
)
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
time.time() - recording_started_time < duration):
while (
self._get_recording_state() != RecordingState.STOPPED
and (
duration is None
or time.time() - recording_started_time < duration
)
):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
get_args = {
get_args = (
{
'block': True,
'timeout': max(0, duration - (time.time() - recording_started_time)),
} if duration is not None else {}
'timeout': max(
0,
duration
- (time.time() - recording_started_time),
),
}
if duration is not None
else {}
)
data = q.get(**get_args)
if data and time.time() - data.get('timestamp') <= 1.0:
@ -624,24 +725,45 @@ class SoundPlugin(Plugin):
self.stop_recording()
get_bus().post(SoundRecordingStoppedEvent(filename=outfile))
Thread(target=recording_thread,
Thread(
target=recording_thread,
args=(
outfile, duration, device, sample_rate, format, blocksize, latency, channels, subtype)
outfile,
duration,
device,
sample_rate,
format,
blocksize,
latency,
channels,
subtype,
),
).start()
@action
def recordplay(self, duration=None, input_device=None, output_device=None,
sample_rate=None, blocksize=None, latency=0, channels=1, dtype=None):
def recordplay(
self,
duration=None,
input_device=None,
output_device=None,
sample_rate=None,
blocksize=None,
latency=0,
channels=1,
dtype=None,
):
"""
Records audio and plays it on an output sound device (audio pass-through)
:param duration: Recording duration in seconds (default: record until stop event)
:type duration: float
:param input_device: Input device (default: default configured device or system default audio input if not configured)
:param input_device: Input device (default: default configured device
or system default audio input if not configured)
:type input_device: int or str
:param output_device: Output device (default: default configured device or system default audio output if not configured)
:param output_device: Output device (default: default configured device
or system default audio output if not configured)
:type output_device: int or str
:param sample_rate: Recording sample rate (default: device default rate)
@ -656,7 +778,10 @@ class SoundPlugin(Plugin):
:param channels: Number of channels (default: 1)
:type channels: int
:param dtype: Data type for the recording - see `Soundfile docs - Recording <https://python-sounddevice.readthedocs.io/en/0.3.12/_modules/sounddevice.html#rec>`_ for available types (default: input device default)
:param dtype: Data type for the recording - see `Soundfile docs -
Recording
<https://python-sounddevice.readthedocs.io/en/0.3.12/_modules/sounddevice.html#rec>`_
for available types (default: input device default)
:type dtype: str
"""
@ -687,35 +812,37 @@ class SoundPlugin(Plugin):
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: {}'.format(
str(status)))
self.logger.warning('Recording callback status: %s', status)
outdata[:] = indata
stream_index = None
try:
import soundfile as sf
import numpy
stream_index = self._allocate_stream_index()
stream = sd.Stream(samplerate=sample_rate, channels=channels,
blocksize=blocksize, latency=latency,
stream = sd.Stream(
samplerate=sample_rate,
channels=channels,
blocksize=blocksize,
latency=latency,
device=(input_device, output_device),
dtype=dtype, callback=audio_callback)
dtype=dtype,
callback=audio_callback,
)
self.start_recording()
self._start_playback(stream_index=stream_index,
stream=stream)
self._start_playback(stream_index=stream_index, stream=stream)
self.logger.info('Started recording pass-through from device ' +
'[{}] to sound device [{}]'.
format(input_device, output_device))
self.logger.info(
'Started recording pass-through from device [%s] to sound device [%s]',
input_device,
output_device,
)
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
time.time() - recording_started_time < duration):
while self._get_recording_state() != RecordingState.STOPPED and (
duration is None or time.time() - recording_started_time < duration
):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
@ -736,24 +863,35 @@ class SoundPlugin(Plugin):
streams = {
i: {
attr: getattr(stream, attr)
for attr in ['active', 'closed', 'stopped', 'blocksize',
'channels', 'cpu_load', 'device', 'dtype',
'latency', 'samplerate', 'samplesize']
for attr in [
'active',
'closed',
'stopped',
'blocksize',
'channels',
'cpu_load',
'device',
'dtype',
'latency',
'samplerate',
'samplesize',
]
if hasattr(stream, attr)
} for i, stream in self.active_streams.items()
}
for i, stream in self.active_streams.items()
}
for i, stream in streams.items():
stream['playback_state'] = self.playback_state[i].name
stream['name'] = self.stream_index_to_name.get(i)
if i in self.stream_mixes:
stream['mix'] = {j: sound for j, sound in
enumerate(list(self.stream_mixes[i]))}
stream['mix'] = dict(enumerate(list(self.stream_mixes[i])))
return streams
def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None,
completed_callback_event=None):
def _get_or_allocate_stream_index(
self, stream_index=None, stream_name=None, completed_callback_event=None
):
stream = None
with self.playback_state_lock:
@ -762,22 +900,26 @@ class SoundPlugin(Plugin):
stream_index = self.stream_name_to_index.get(stream_name)
else:
if stream_name is not None:
raise RuntimeError('Redundant specification of both ' +
'stream_name and stream_index')
raise RuntimeError(
'Redundant specification of both '
+ 'stream_name and stream_index'
)
if stream_index is not None:
stream = self.active_streams.get(stream_index)
if not stream:
return (self._allocate_stream_index(stream_name=stream_name,
completed_callback_event=
completed_callback_event),
True)
return (
self._allocate_stream_index(
stream_name=stream_name,
completed_callback_event=completed_callback_event,
),
True,
)
return stream_index, False
def _allocate_stream_index(self, stream_name=None,
completed_callback_event=None):
def _allocate_stream_index(self, stream_name=None, completed_callback_event=None):
stream_index = None
with self.playback_state_lock:
@ -796,8 +938,9 @@ class SoundPlugin(Plugin):
self.stream_mixes[stream_index] = Mix()
self.stream_index_to_name[stream_index] = stream_name
self.stream_name_to_index[stream_name] = stream_index
self.completed_callback_events[stream_index] = \
self.completed_callback_events[stream_index] = (
completed_callback_event if completed_callback_event else Event()
)
return stream_index
@ -811,8 +954,7 @@ class SoundPlugin(Plugin):
else:
self.playback_paused_changed[stream_index] = Event()
self.logger.info('Playback started on stream index {}'.
format(stream_index))
self.logger.info('Playback started on stream index %d', stream_index)
return stream_index
@ -835,8 +977,7 @@ class SoundPlugin(Plugin):
i = self.stream_name_to_index.get(i)
stream = self.active_streams.get(i)
if not stream:
self.logger.info('No such stream index or name: {}'.
format(i))
self.logger.info('No such stream index or name: %d', i)
continue
if self.completed_callback_events[i]:
@ -859,9 +1000,10 @@ class SoundPlugin(Plugin):
if name in self.stream_name_to_index:
del self.stream_name_to_index[name]
self.logger.info('Playback stopped on streams [{}]'.format(
', '.join([str(stream) for stream in
completed_callback_events.keys()])))
self.logger.info(
'Playback stopped on streams [%s]',
', '.join([str(stream) for stream in completed_callback_events]),
)
@action
def pause_playback(self, streams=None):
@ -881,8 +1023,7 @@ class SoundPlugin(Plugin):
i = self.stream_name_to_index.get(i)
stream = self.active_streams.get(i)
if not stream:
self.logger.info('No such stream index or name: {}'.
format(i))
self.logger.info('No such stream index or name: %d', i)
continue
if self.playback_state[i] == PlaybackState.PAUSED:
@ -894,8 +1035,10 @@ class SoundPlugin(Plugin):
self.playback_paused_changed[i].set()
self.logger.info('Playback pause toggled on streams [{}]'.format(
', '.join([str(stream) for stream in streams])))
self.logger.info(
'Playback pause toggled on streams [%s]',
', '.join([str(stream) for stream in streams]),
)
def start_recording(self):
with self.recording_state_lock:
@ -921,8 +1064,14 @@ class SoundPlugin(Plugin):
self.recording_paused_changed.set()
@action
def release(self, stream_index=None, stream_name=None,
sound_index=None, midi_note=None, frequency=None):
def release(
self,
stream_index=None,
stream_name=None,
sound_index=None,
midi_note=None,
frequency=None,
):
"""
Remove a sound from an active stream, either by sound index (use
:meth:`platypush.sound.plugin.SoundPlugin.query_streams` to get
@ -949,25 +1098,26 @@ class SoundPlugin(Plugin):
if stream_name:
if stream_index:
raise RuntimeError('stream_index and stream name are ' +
'mutually exclusive')
raise RuntimeError(
'stream_index and stream name are ' + 'mutually exclusive'
)
stream_index = self.stream_name_to_index.get(stream_name)
mixes = {
i: mix for i, mix in self.stream_mixes.items()
} if stream_index is None else {
stream_index: self.stream_mixes[stream_index]
}
mixes = (
self.stream_mixes.copy()
if stream_index is None
else {stream_index: self.stream_mixes[stream_index]}
)
streams_to_stop = []
for i, mix in mixes.items():
for j, sound in enumerate(mix):
if (sound_index is not None and j == sound_index) or \
(midi_note is not None
and sound.get('midi_note') == midi_note) or \
(frequency is not None
and sound.get('frequency') == frequency):
if (
(sound_index is not None and j == sound_index)
or (midi_note is not None and sound.get('midi_note') == midi_note)
or (frequency is not None and sound.get('frequency') == frequency)
):
if len(list(mix)) == 1:
# Last sound in the mix
streams_to_stop.append(i)

View file

@ -1,7 +1,3 @@
"""
.. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com>
"""
import enum
import logging
import json
@ -15,7 +11,7 @@ class WaveShape(enum.Enum):
TRIANG = 'triang'
class Sound(object):
class Sound:
"""
Models a basic synthetic sound that can be played through an audio device
"""
@ -34,9 +30,16 @@ class Sound(object):
duration = None
shape = None
def __init__(self, midi_note=midi_note, frequency=None, phase=phase,
gain=gain, duration=duration, shape=WaveShape.SIN,
A_frequency=STANDARD_A_FREQUENCY):
def __init__(
self,
midi_note=midi_note,
frequency=None,
phase=phase,
gain=gain,
duration=duration,
shape=WaveShape.SIN,
A_frequency=STANDARD_A_FREQUENCY,
):
"""
You can construct a sound either from a MIDI note or a base frequency
@ -67,20 +70,24 @@ class Sound(object):
"""
if midi_note and frequency:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
raise RuntimeError(
'Please specify either a MIDI note or a base ' + 'frequency'
)
if midi_note:
self.midi_note = midi_note
self.frequency = self.note_to_freq(midi_note=midi_note,
A_frequency=A_frequency)
self.frequency = self.note_to_freq(
midi_note=midi_note, A_frequency=A_frequency
)
elif frequency:
self.frequency = frequency
self.midi_note = self.freq_to_note(frequency=frequency,
A_frequency=A_frequency)
self.midi_note = self.freq_to_note(
frequency=frequency, A_frequency=A_frequency
)
else:
raise RuntimeError('Please specify either a MIDI note or a base ' +
'frequency')
raise RuntimeError(
'Please specify either a MIDI note or a base ' + 'frequency'
)
self.phase = phase
self.gain = gain
@ -99,8 +106,7 @@ class Sound(object):
:type A_frequency: float
"""
return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) \
* A_frequency
return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) * A_frequency
@classmethod
def freq_to_note(cls, frequency, A_frequency=STANDARD_A_FREQUENCY):
@ -116,10 +122,11 @@ class Sound(object):
# TODO return also the offset in % between the provided frequency
# and the standard MIDI note frequency
return int(12.0 * math.log(frequency / A_frequency, 2)
+ cls.STANDARD_A_MIDI_NOTE)
return int(
12.0 * math.log(frequency / A_frequency, 2) + cls.STANDARD_A_MIDI_NOTE
)
def get_wave(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE):
def get_wave(self, t_start=0.0, t_end=0.0, samplerate=_DEFAULT_SAMPLERATE):
"""
Get the wave binary data associated to this sound
@ -137,6 +144,7 @@ class Sound(object):
"""
import numpy as np
x = np.linspace(t_start, t_end, int((t_end - t_start) * samplerate))
x = x.reshape(len(x), 1)
@ -148,8 +156,7 @@ class Sound(object):
wave[wave < 0] = -1
wave[wave >= 0] = 1
elif self.shape == WaveShape.SAWTOOTH or self.shape == WaveShape.TRIANG:
wave = 2 * (self.frequency * x -
np.floor(0.5 + self.frequency * x))
wave = 2 * (self.frequency * x - np.floor(0.5 + self.frequency * x))
if self.shape == WaveShape.TRIANG:
wave = 2 * np.abs(wave) - 1
else:
@ -157,8 +164,14 @@ class Sound(object):
return self.gain * wave
def fft(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE,
freq_range=None, freq_buckets=None):
def fft(
self,
t_start=0.0,
t_end=0.0,
samplerate=_DEFAULT_SAMPLERATE,
freq_range=None,
freq_buckets=None,
):
"""
Get the real part of the Fourier transform associated to a time-bounded
sample of this sound
@ -173,7 +186,8 @@ class Sound(object):
:type samplerate: int
:param freq_range: FFT frequency range. Default: ``(0, samplerate/2)``
(see `Nyquist-Shannon sampling theorem <https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
(see`Nyquist-Shannon sampling theorem
<https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
:type freq_range: list or tuple with 2 int elements (range)
:param freq_buckets: Number of buckets to subdivide the frequency range.
@ -190,7 +204,7 @@ class Sound(object):
wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate)
fft = np.fft.fft(wave.reshape(len(wave)))
fft = fft.real[freq_range[0]:freq_range[1]]
fft = fft.real[freq_range[0] : freq_range[1]]
if freq_buckets is not None:
fft = np.histogram(fft, bins=freq_buckets)
@ -224,7 +238,7 @@ class Sound(object):
raise RuntimeError('Usage: {}'.format(__doc__))
class Mix(object):
class Mix:
"""
This class models a set of mixed :class:`Sound` instances that can be played
through an audio stream to an audio device
@ -251,15 +265,22 @@ class Mix(object):
def remove(self, sound_index):
if sound_index >= len(self._sounds):
self.logger.error('No such sound index: {} in mix {}'.format(
sound_index, list(self)))
self.logger.error(
'No such sound index: {} in mix {}'.format(sound_index, list(self))
)
return
self._sounds.pop(sound_index)
# noinspection PyProtectedMember
def get_wave(self, t_start=0., t_end=0., normalize_range=(-1.0, 1.0),
on_clip='scale', samplerate=Sound._DEFAULT_SAMPLERATE):
def get_wave(
self,
t_start=0.0,
t_end=0.0,
normalize_range=(-1.0, 1.0),
on_clip='scale',
samplerate=Sound._DEFAULT_SAMPLERATE,
):
"""
Get the wave binary data associated to this mix
@ -289,8 +310,9 @@ class Mix(object):
wave = None
for sound in self._sounds:
sound_wave = sound.get_wave(t_start=t_start, t_end=t_end,
samplerate=samplerate)
sound_wave = sound.get_wave(
t_start=t_start, t_end=t_end, samplerate=samplerate
)
if wave is None:
wave = sound_wave
@ -298,8 +320,9 @@ class Mix(object):
wave += sound_wave
if normalize_range and len(wave):
scale_factor = (normalize_range[1] - normalize_range[0]) / \
(wave.max() - wave.min())
scale_factor = (normalize_range[1] - normalize_range[0]) / (
wave.max() - wave.min()
)
if scale_factor < 1.0: # Wave clipping
if on_clip == 'scale':
@ -308,14 +331,21 @@ class Mix(object):
wave[wave < normalize_range[0]] = normalize_range[0]
wave[wave > normalize_range[1]] = normalize_range[1]
else:
raise RuntimeError('Supported values for "on_clip": ' +
'"scale" or "clip"')
raise RuntimeError(
'Supported values for "on_clip": ' + '"scale" or "clip"'
)
return wave
# noinspection PyProtectedMember
def fft(self, t_start=0., t_end=0., samplerate=Sound._DEFAULT_SAMPLERATE,
freq_range=None, freq_buckets=None):
def fft(
self,
t_start=0.0,
t_end=0.0,
samplerate=Sound._DEFAULT_SAMPLERATE,
freq_range=None,
freq_buckets=None,
):
"""
Get the real part of the Fourier transform associated to a time-bounded
sample of this mix
@ -330,7 +360,8 @@ class Mix(object):
:type samplerate: int
:param freq_range: FFT frequency range. Default: ``(0, samplerate/2)``
(see `Nyquist-Shannon sampling theorem <https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
(see `Nyquist-Shannon sampling theorem
<https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
:type freq_range: list or tuple with 2 int elements (range)
:param freq_buckets: Number of buckets to subdivide the frequency range.
@ -347,7 +378,7 @@ class Mix(object):
wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate)
fft = np.fft.fft(wave.reshape(len(wave)))
fft = fft.real[freq_range[0]:freq_range[1]]
fft = fft.real[freq_range[0] : freq_range[1]]
if freq_buckets is not None:
fft = np.histogram(fft, bins=freq_buckets)
@ -370,4 +401,5 @@ class Mix(object):
return duration
# vim:sw=4:ts=4:et: