diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 051dc983..4df3ce8a 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -17,11 +17,10 @@ from tornado.web import Application, FallbackHandler from platypush.backend import Backend from platypush.backend.http.app import application from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes -from platypush.backend.http.app.ws.events import events_redis_topic +from platypush.backend.http.app.ws.events import WSEventProxy from platypush.bus.redis import RedisBus from platypush.config import Config -from platypush.utils import get_redis class HttpBackend(Backend): @@ -286,7 +285,7 @@ class HttpBackend(Backend): def notify_web_clients(self, event): """Notify all the connected web clients (over websocket) of a new event""" - get_redis().publish(events_redis_topic, str(event)) + WSEventProxy.publish(event) # noqa: E1120 def _get_secret_key(self, _create=False): if _create: diff --git a/platypush/backend/http/app/mixins/__init__.py b/platypush/backend/http/app/mixins/__init__.py new file mode 100644 index 00000000..516b1117 --- /dev/null +++ b/platypush/backend/http/app/mixins/__init__.py @@ -0,0 +1,158 @@ +from contextlib import contextmanager +from dataclasses import dataclass +import json +import logging +from multiprocessing import RLock +from typing import Generator, Iterable, Optional, Set, Union + +from redis import ConnectionError as RedisConnectionError +from redis.client import PubSub + +from platypush.config import Config +from platypush.message import Message as AppMessage +from platypush.utils import get_redis + +logger = logging.getLogger(__name__) + +MessageType = Union[AppMessage, bytes, str, dict, list, set, tuple] +"""Types of supported messages on Redis/websocket channels.""" + + +@dataclass +class Message: + """ + A wrapper for a message received on a Redis subscription. + """ + + data: bytes + """The data received in the message.""" + channel: str + """The channel the message was received on.""" + + +class PubSubMixin: + """ + A mixin for Tornado route handlers that support pub/sub mechanisms. + """ + + def __init__(self, *_, subscriptions: Optional[Iterable[str]] = None, **__): + self._pubsub: Optional[PubSub] = None + """Pub/sub proxy.""" + self._subscriptions: Set[str] = set(subscriptions or []) + """Set of current channel subscriptions.""" + self._pubsub_lock = RLock() + """ + Subscriptions lock. It ensures that the list of subscriptions is + manipulated by one thread or process at the time. + """ + + self.subscribe(*self._subscriptions) + + @property + @contextmanager + def pubsub(self): + """ + Pub/sub proxy lazy property with context manager. + """ + with self._pubsub_lock: + # Lazy initialization for the pub/sub object. + if self._pubsub is None: + self._pubsub = get_redis().pubsub() + + # Yield the pub/sub object (context manager pattern). + yield self._pubsub + + with self._pubsub_lock: + # Close and free the pub/sub object if it has no active subscriptions. + if self._pubsub is not None and len(self._subscriptions) == 0: + self._pubsub.close() + self._pubsub = None + + @staticmethod + def _serialize(data: MessageType) -> bytes: + """ + Serialize a message as bytes before delivering it to either a Redis or websocket channel. + """ + if isinstance(data, AppMessage): + data = str(data) + if isinstance(data, (list, tuple, set)): + data = list(data) + if isinstance(data, (list, dict)): + data = json.dumps(data, cls=AppMessage.Encoder) + if isinstance(data, str): + data = data.encode('utf-8') + + return data + + @classmethod + def publish(cls, data: MessageType, *channels: str) -> None: + """ + Publish data on one or more Redis channels. + """ + for channel in channels: + get_redis().publish(channel, cls._serialize(data)) + + def subscribe(self, *channels: str) -> None: + """ + Subscribe to a set of Redis channels. + """ + with self.pubsub as pubsub: + for channel in channels: + pubsub.subscribe(channel) + self._subscriptions.add(channel) + + def unsubscribe(self, *channels: str) -> None: + """ + Unsubscribe from a set of Redis channels. + """ + with self.pubsub as pubsub: + for channel in channels: + if channel in self._subscriptions: + pubsub.unsubscribe(channel) + self._subscriptions.remove(channel) + + def listen(self) -> Generator[Message, None, None]: + """ + Listens for pub/sub messages and yields them. + """ + try: + with self.pubsub as pubsub: + for msg in pubsub.listen(): + channel = msg.get('channel', b'').decode() + if msg.get('type') != 'message' or not ( + channel and channel in self._subscriptions + ): + continue + + yield Message(data=msg.get('data', b''), channel=channel) + except (AttributeError, RedisConnectionError): + return + + def _pubsub_close(self): + """ + Closes the pub/sub object. + """ + with self._pubsub_lock: + if self._pubsub is not None: + try: + self._pubsub.close() + except Exception as e: + logger.debug('Error on pubsub close: %s', e) + finally: + self._pubsub = None + + def on_close(self): + """ + Extensible close handler that closes the pub/sub object. + """ + self._pubsub_close() + + @staticmethod + def get_channel(channel: str) -> str: + """ + Utility method that returns the prefixed Redis channel for a certain subscription name. + """ + return f'_platypush/{Config.get("device_id")}/{channel}' # type: ignore + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/streaming/_base.py b/platypush/backend/http/app/streaming/_base.py index e162eeed..e1a861fa 100644 --- a/platypush/backend/http/app/streaming/_base.py +++ b/platypush/backend/http/app/streaming/_base.py @@ -9,18 +9,23 @@ from tornado.web import RequestHandler, stream_request_body from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status +from ..mixins import PubSubMixin + logger = getLogger(__name__) @stream_request_body -class StreamingRoute(RequestHandler, ABC): +class StreamingRoute(RequestHandler, PubSubMixin, ABC): """ Base class for Tornado streaming routes. """ @override def prepare(self): - # Perform authentication + """ + Request preparation logic. It performs user authentication if + ``auth_required`` returns True, and it can be extended/overridden. + """ if self.auth_required: auth_status = get_auth_status(self.request) if auth_status != AuthStatus.OK: @@ -33,18 +38,28 @@ class StreamingRoute(RequestHandler, ABC): @override def write_error(self, status_code: int, error: Optional[str] = None, **_): + """ + Make sure that errors are always returned in JSON format. + """ self.set_header("Content-Type", "application/json") self.finish( json.dumps( - {"status": status_code, "error": error or responses[status_code]} + {"status": status_code, "error": error or responses.get(status_code)} ) ) @classmethod @abstractmethod def path(cls) -> str: + """ + Path/URL pattern for this route. + """ raise NotImplementedError() @property - def auth_required(self): + def auth_required(self) -> bool: + """ + If set to True (default) then this route will require user + authentication and return 401 if authentication fails. + """ return True diff --git a/platypush/backend/http/app/streaming/plugins/camera.py b/platypush/backend/http/app/streaming/plugins/camera.py index ccc978c2..84767461 100644 --- a/platypush/backend/http/app/streaming/plugins/camera.py +++ b/platypush/backend/http/app/streaming/plugins/camera.py @@ -56,6 +56,8 @@ class CameraRoute(StreamingRoute): camera.stream.ready.wait(timeout=timeout) return camera.stream.frame + return None + def _should_stop(self): if self._finished: return True diff --git a/platypush/backend/http/app/ws/__init__.py b/platypush/backend/http/app/ws/__init__.py index 7d62e1fe..61a762ab 100644 --- a/platypush/backend/http/app/ws/__init__.py +++ b/platypush/backend/http/app/ws/__init__.py @@ -1,3 +1,3 @@ -from ._base import WSRoute, logger, pubsub_redis_topic +from ._base import WSRoute, logger -__all__ = ['WSRoute', 'logger', 'pubsub_redis_topic'] +__all__ = ['WSRoute', 'logger'] diff --git a/platypush/backend/http/app/ws/_base.py b/platypush/backend/http/app/ws/_base.py index 3ce53378..40274b71 100644 --- a/platypush/backend/http/app/ws/_base.py +++ b/platypush/backend/http/app/ws/_base.py @@ -1,37 +1,28 @@ -from abc import ABC, abstractclassmethod -import json +from abc import ABC, abstractmethod from logging import getLogger -from threading import RLock, Thread -from typing import Any, Generator, Iterable, Optional, Union +from threading import Thread from typing_extensions import override -from redis import ConnectionError as RedisConnectionError from tornado.ioloop import IOLoop from tornado.websocket import WebSocketHandler from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status -from platypush.config import Config -from platypush.message import Message -from platypush.utils import get_redis + +from ..mixins import MessageType, PubSubMixin logger = getLogger(__name__) -def pubsub_redis_topic(topic: str) -> str: - return f'_platypush/{Config.get("device_id")}/{topic}' # type: ignore - - -class WSRoute(WebSocketHandler, Thread, ABC): +class WSRoute(WebSocketHandler, Thread, PubSubMixin, ABC): """ Base class for Tornado websocket endpoints. """ - def __init__(self, *args, redis_topics: Optional[Iterable[str]] = None, **kwargs): - super().__init__(*args, **kwargs) - self._redis_topics = set(redis_topics or []) - self._sub = get_redis().pubsub() + def __init__(self, *args, **kwargs): + WebSocketHandler.__init__(self, *args) + PubSubMixin.__init__(self, **kwargs) + Thread.__init__(self) self._io_loop = IOLoop.current() - self._sub_lock = RLock() @override def open(self, *_, **__): @@ -51,10 +42,11 @@ class WSRoute(WebSocketHandler, Thread, ABC): pass @override - def on_message(self, message): # type: ignore - pass + def on_message(self, message): + return message - @abstractclassmethod + @classmethod + @abstractmethod def app_name(cls) -> str: raise NotImplementedError() @@ -66,55 +58,25 @@ class WSRoute(WebSocketHandler, Thread, ABC): def auth_required(self): return True - def subscribe(self, *topics: str) -> None: - with self._sub_lock: - for topic in topics: - self._sub.subscribe(topic) - self._redis_topics.add(topic) - - def unsubscribe(self, *topics: str) -> None: - with self._sub_lock: - for topic in topics: - if topic in self._redis_topics: - self._sub.unsubscribe(topic) - self._redis_topics.remove(topic) - - def listen(self) -> Generator[Any, None, None]: - try: - for msg in self._sub.listen(): - if ( - msg.get('type') != 'message' - and msg.get('channel').decode() not in self._redis_topics - ): - continue - - yield msg.get('data') - except (AttributeError, RedisConnectionError): - return - - def send(self, msg: Union[str, bytes, dict, list, tuple, set]) -> None: - if isinstance(msg, (list, tuple, set)): - msg = list(msg) - if isinstance(msg, (list, dict)): - msg = json.dumps(msg, cls=Message.Encoder) - + def send(self, msg: MessageType) -> None: self._io_loop.asyncio_loop.call_soon_threadsafe( # type: ignore - self.write_message, msg + self.write_message, self._serialize(msg) ) @override def run(self) -> None: super().run() - for topic in self._redis_topics: - self._sub.subscribe(topic) + self.subscribe(*self._subscriptions) @override def on_close(self): - topics = self._redis_topics.copy() - for topic in topics: - self.unsubscribe(topic) + super().on_close() + for channel in self._subscriptions.copy(): + self.unsubscribe(channel) + + if self._pubsub: + self._pubsub.close() - self._sub.close() logger.info( 'Client %s disconnected from %s, reason=%s, message=%s', self.request.remote_ip, diff --git a/platypush/backend/http/app/ws/events.py b/platypush/backend/http/app/ws/events.py index e96833aa..53751ebb 100644 --- a/platypush/backend/http/app/ws/events.py +++ b/platypush/backend/http/app/ws/events.py @@ -1,12 +1,11 @@ from typing_extensions import override +from platypush.backend.http.app.mixins import MessageType from platypush.message.event import Event -from . import WSRoute, logger, pubsub_redis_topic +from . import WSRoute, logger from ..utils import send_message -events_redis_topic = pubsub_redis_topic('events') - class WSEventProxy(WSRoute): """ @@ -14,14 +13,23 @@ class WSEventProxy(WSRoute): """ def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.subscribe(events_redis_topic) + super().__init__(*args, subscriptions=[self.events_channel], **kwargs) @classmethod @override def app_name(cls) -> str: return 'events' + @classmethod + @property + def events_channel(cls) -> str: + return cls.get_channel('events') + + @override + @classmethod + def publish(cls, data: MessageType, *_) -> None: + super().publish(data, cls.events_channel) + @override def on_message(self, message): try: @@ -38,9 +46,9 @@ class WSEventProxy(WSRoute): def run(self) -> None: for msg in self.listen(): try: - evt = Event.build(msg) + evt = Event.build(msg.data) except Exception as e: logger.warning('Error parsing event: %s: %s', msg, e) continue - self.send(str(evt)) + self.send(evt) diff --git a/platypush/plugins/camera/model/camera.py b/platypush/plugins/camera/model/camera.py index ce027ab4..2116d34e 100644 --- a/platypush/plugins/camera/model/camera.py +++ b/platypush/plugins/camera/model/camera.py @@ -5,7 +5,11 @@ from typing import Optional, Union, Tuple, Set import numpy as np -from platypush.plugins.camera.model.writer import StreamWriter, VideoWriter, FileVideoWriter +from platypush.plugins.camera.model.writer import ( + StreamWriter, + VideoWriter, + FileVideoWriter, +) from platypush.plugins.camera.model.writer.preview import PreviewWriter @@ -32,7 +36,7 @@ class CameraInfo: stream_format: Optional[str] = None vertical_flip: bool = False warmup_frames: int = 0 - warmup_seconds: float = 0. + warmup_seconds: float = 0.0 def set(self, **kwargs): for k, v in kwargs.items(): @@ -97,10 +101,15 @@ class Camera: return writers def effective_resolution(self) -> Tuple[int, int]: + """ + Calculates the effective resolution of the camera in pixels, taking + into account the base resolution, the scale and the rotation. + """ + assert self.info.resolution, 'No base resolution specified' rot = (self.info.rotate or 0) * math.pi / 180 sin = math.sin(rot) cos = math.cos(rot) - scale = np.array([[self.info.scale_x or 1., self.info.scale_y or 1.]]) + scale = np.array([[self.info.scale_x or 1.0, self.info.scale_y or 1.0]]) resolution = np.array([[self.info.resolution[0], self.info.resolution[1]]]) rot_matrix = np.array([[sin, cos], [cos, sin]]) resolution = (scale * abs(np.cross(rot_matrix, resolution)))[0] diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index 9f7e4e30..94745a84 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -1,7 +1,3 @@ -""" -.. moduleauthor:: Fabio Manganiello -""" - import os import queue import stat @@ -10,25 +6,28 @@ import time from enum import Enum from threading import Thread, Event, RLock - -from .core import Sound, Mix +from typing import Optional from platypush.context import get_bus -from platypush.message.event.sound import \ - SoundRecordingStartedEvent, SoundRecordingStoppedEvent +from platypush.message.event.sound import ( + SoundRecordingStartedEvent, + SoundRecordingStoppedEvent, +) from platypush.plugins import Plugin, action +from .core import Sound, Mix + class PlaybackState(Enum): - STOPPED = 'STOPPED', - PLAYING = 'PLAYING', + STOPPED = 'STOPPED' + PLAYING = 'PLAYING' PAUSED = 'PAUSED' class RecordingState(Enum): - STOPPED = 'STOPPED', - RECORDING = 'RECORDING', + STOPPED = 'STOPPED' + RECORDING = 'RECORDING' PAUSED = 'PAUSED' @@ -55,10 +54,14 @@ class SoundPlugin(Plugin): _STREAM_NAME_PREFIX = 'platypush-stream-' _default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream') - # noinspection PyProtectedMember - def __init__(self, input_device=None, output_device=None, - input_blocksize=Sound._DEFAULT_BLOCKSIZE, - output_blocksize=Sound._DEFAULT_BLOCKSIZE, **kwargs): + def __init__( + self, + input_device=None, + output_device=None, + input_blocksize=Sound._DEFAULT_BLOCKSIZE, + output_blocksize=Sound._DEFAULT_BLOCKSIZE, + **kwargs, + ): """ :param input_device: Index or name of the default input device. Use :meth:`platypush.plugins.sound.query_devices` to get the @@ -110,6 +113,7 @@ class SoundPlugin(Plugin): """ import sounddevice as sd + return sd.query_hostapis()[0].get('default_' + category.lower() + '_device') @action @@ -174,17 +178,18 @@ class SoundPlugin(Plugin): self.playback_paused_changed[stream_index].wait() if frames != blocksize: - self.logger.warning('Received {} frames, expected blocksize is {}'. - format(frames, blocksize)) + self.logger.warning( + 'Received %d frames, expected blocksize is %d', frames, blocksize + ) return if status.output_underflow: self.logger.warning('Output underflow: increase blocksize?') - outdata[:] = (b'\x00' if is_raw_stream else 0.) * len(outdata) + outdata[:] = (b'\x00' if is_raw_stream else 0.0) * len(outdata) return if status: - self.logger.warning('Audio callback failed: {}'.format(status)) + self.logger.warning('Audio callback failed: %s', status) try: data = q.get_nowait() @@ -193,18 +198,28 @@ class SoundPlugin(Plugin): raise sd.CallbackStop if len(data) < len(outdata): - outdata[:len(data)] = data - outdata[len(data):] = (b'\x00' if is_raw_stream else 0.) * \ - (len(outdata) - len(data)) + outdata[: len(data)] = data + outdata[len(data) :] = (b'\x00' if is_raw_stream else 0.0) * ( + len(outdata) - len(data) + ) else: outdata[:] = data return audio_callback @action - def play(self, file=None, sound=None, device=None, blocksize=None, - bufsize=None, samplerate=None, channels=None, stream_name=None, - stream_index=None): + def play( + self, + file=None, + sound=None, + device=None, + blocksize=None, + bufsize=None, + samplerate=None, + channels=None, + stream_name=None, + stream_index=None, + ): """ Plays a sound file (support formats: wav, raw) or a synthetic sound. @@ -258,8 +273,9 @@ class SoundPlugin(Plugin): """ if not file and not sound: - raise RuntimeError('Please specify either a file to play or a ' + - 'list of sound objects') + raise RuntimeError( + 'Please specify either a file to play or a ' + 'list of sound objects' + ) import sounddevice as sd @@ -274,7 +290,7 @@ class SoundPlugin(Plugin): q = queue.Queue(maxsize=bufsize) f = None - t = 0. + t = 0.0 if file: file = os.path.abspath(os.path.expanduser(file)) @@ -286,6 +302,7 @@ class SoundPlugin(Plugin): if file: import soundfile as sf + f = sf.SoundFile(file) if not samplerate: samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE @@ -295,7 +312,8 @@ class SoundPlugin(Plugin): mix = None with self.playback_state_lock: stream_index, is_new_stream = self._get_or_allocate_stream_index( - stream_index=stream_index, stream_name=stream_name) + stream_index=stream_index, stream_name=stream_name + ) if sound and stream_index in self.stream_mixes: mix = self.stream_mixes[stream_index] @@ -304,9 +322,12 @@ class SoundPlugin(Plugin): if not mix: return None, "Unable to allocate the stream" - self.logger.info(('Starting playback of {} to sound device [{}] ' + - 'on stream [{}]').format( - file or sound, device, stream_index)) + self.logger.info( + 'Starting playback of %s to sound device [%s] on stream [%s]', + file or sound, + device, + stream_index, + ) if not is_new_stream: return # Let the existing callback handle the new mix @@ -323,8 +344,11 @@ class SoundPlugin(Plugin): else: duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t + blocktime, duration) \ - if duration is not None else t + blocktime + next_t = ( + min(t + blocktime, duration) + if duration is not None + else t + blocktime + ) data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) t = next_t @@ -339,14 +363,20 @@ class SoundPlugin(Plugin): if stream is None: streamtype = sd.RawOutputStream if file else sd.OutputStream - stream = streamtype(samplerate=samplerate, blocksize=blocksize, - device=device, channels=channels, - dtype='float32', - callback=self._play_audio_callback( - q=q, blocksize=blocksize, - streamtype=streamtype, - stream_index=stream_index), - finished_callback=completed_callback_event.set) + stream = streamtype( + samplerate=samplerate, + blocksize=blocksize, + device=device, + channels=channels, + dtype='float32', + callback=self._play_audio_callback( + q=q, + blocksize=blocksize, + streamtype=streamtype, + stream_index=stream_index, + ), + finished_callback=completed_callback_event.set, + ) self._start_playback(stream_index=stream_index, stream=stream) @@ -356,8 +386,9 @@ class SoundPlugin(Plugin): timeout = blocksize * bufsize / samplerate while True: - while self._get_playback_state(stream_index) == \ - PlaybackState.PAUSED: + while ( + self._get_playback_state(stream_index) == PlaybackState.PAUSED + ): self.playback_paused_changed[stream_index].wait() if f: @@ -367,31 +398,38 @@ class SoundPlugin(Plugin): else: duration = mix.duration() blocktime = float(blocksize / samplerate) - next_t = min(t + blocktime, duration) \ - if duration is not None else t + blocktime + next_t = ( + min(t + blocktime, duration) + if duration is not None + else t + blocktime + ) - data = mix.get_wave(t_start=t, t_end=next_t, - samplerate=samplerate) + data = mix.get_wave( + t_start=t, t_end=next_t, samplerate=samplerate + ) t = next_t if duration is not None and t >= duration: break - if self._get_playback_state(stream_index) == \ - PlaybackState.STOPPED: + if self._get_playback_state(stream_index) == PlaybackState.STOPPED: break try: q.put(data, timeout=timeout) except queue.Full as e: - if self._get_playback_state(stream_index) != \ - PlaybackState.PAUSED: + if ( + self._get_playback_state(stream_index) + != PlaybackState.PAUSED + ): raise e completed_callback_event.wait() except queue.Full: - if stream_index is None or \ - self._get_playback_state(stream_index) != PlaybackState.STOPPED: + if ( + stream_index is None + or self._get_playback_state(stream_index) != PlaybackState.STOPPED + ): self.logger.warning('Playback timeout: audio callback failed?') finally: if f and not f.closed: @@ -400,35 +438,34 @@ class SoundPlugin(Plugin): self.stop_playback([stream_index]) @action - def stream_recording(self, device=None, fifo=None, duration=None, sample_rate=None, - dtype='float32', blocksize=None, latency=0, channels=1): + def stream_recording( + self, + device: Optional[str] = None, + fifo: Optional[str] = None, + duration: Optional[float] = None, + sample_rate: Optional[int] = None, + dtype: Optional[str] = 'float32', + blocksize: Optional[int] = None, + latency: float = 0, + channels: int = 1, + ): """ Return audio data from an audio source - :param device: Input device (default: default configured device or system default audio input if not configured) - :type device: int or str - - :param fifo: Path of the FIFO that will be used to exchange audio samples (default: /tmp/inputstream) - :type fifo: str - - :param duration: Recording duration in seconds (default: record until stop event) - :type duration: float - + :param device: Input device (default: default configured device or + system default audio input if not configured) + :param fifo: Path of the FIFO that will be used to exchange audio + samples (default: /tmp/inputstream) + :param duration: Recording duration in seconds (default: record until + stop event) :param sample_rate: Recording sample rate (default: device default rate) - :type sample_rate: int - :param dtype: Data type for the audio samples. Supported types: - 'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default: float32 - :type dtype: str - - :param blocksize: Audio block size (default: configured `input_blocksize` or 2048) - :type blocksize: int - + 'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default: + float32 + :param blocksize: Audio block size (default: configured + `input_blocksize` or 2048) :param latency: Device latency in seconds (default: 0) - :type latency: float - :param channels: Number of channels (default: 1) - :type channels: int """ import sounddevice as sd @@ -452,43 +489,55 @@ class SoundPlugin(Plugin): q = queue.Queue() - # noinspection PyUnusedLocal - def audio_callback(indata, frames, time_duration, status): + def audio_callback(indata, frames, time_duration, status): # noqa while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() if status: - self.logger.warning('Recording callback status: {}'.format(str(status))) + self.logger.warning('Recording callback status: %s', status) q.put(indata.copy()) def streaming_thread(): try: - with sd.InputStream(samplerate=sample_rate, device=device, - channels=channels, callback=audio_callback, - dtype=dtype, latency=latency, blocksize=blocksize): - with open(fifo, 'wb') as audio_queue: - self.start_recording() - get_bus().post(SoundRecordingStartedEvent()) - self.logger.info('Started recording from device [{}]'.format(device)) - recording_started_time = time.time() + with sd.InputStream( + samplerate=sample_rate, + device=device, + channels=channels, + callback=audio_callback, + dtype=dtype, + latency=latency, + blocksize=blocksize, + ), open(fifo, 'wb') as audio_queue: + self.start_recording() + get_bus().post(SoundRecordingStartedEvent()) + self.logger.info('Started recording from device [%s]', device) + recording_started_time = time.time() - while self._get_recording_state() != RecordingState.STOPPED \ - and (duration is None or - time.time() - recording_started_time < duration): - while self._get_recording_state() == RecordingState.PAUSED: - self.recording_paused_changed.wait() + while self._get_recording_state() != RecordingState.STOPPED and ( + duration is None + or time.time() - recording_started_time < duration + ): + while self._get_recording_state() == RecordingState.PAUSED: + self.recording_paused_changed.wait() - get_args = { + get_args = ( + { 'block': True, - 'timeout': max(0, duration - (time.time() - recording_started_time)), - } if duration is not None else {} + 'timeout': max( + 0, + duration - (time.time() - recording_started_time), + ), + } + if duration is not None + else {} + ) - data = q.get(**get_args) - if not len(data): - continue + data = q.get(**get_args) + if not len(data): + continue - audio_queue.write(data) + audio_queue.write(data) except queue.Empty: self.logger.warning('Recording timeout: audio callback failed?') finally: @@ -497,17 +546,29 @@ class SoundPlugin(Plugin): if os.path.exists(fifo): if stat.S_ISFIFO(os.stat(fifo).st_mode): - self.logger.info('Removing previous input stream FIFO {}'.format(fifo)) + self.logger.info('Removing previous input stream FIFO %s', fifo) os.unlink(fifo) else: - raise RuntimeError('{} exists and is not a FIFO. Please remove it or rename it'.format(fifo)) + raise RuntimeError( + f'{fifo} exists and is not a FIFO. Please remove it or rename it' + ) os.mkfifo(fifo, 0o644) Thread(target=streaming_thread).start() @action - def record(self, outfile=None, duration=None, device=None, sample_rate=None, - format=None, blocksize=None, latency=0, channels=1, subtype='PCM_24'): + def record( + self, + outfile=None, + duration=None, + device=None, + sample_rate=None, + format=None, + blocksize=None, + latency=0, + channels=1, + subtype='PCM_24', + ): """ Records audio to a sound file (support formats: wav, raw) @@ -535,12 +596,23 @@ class SoundPlugin(Plugin): :param channels: Number of channels (default: 1) :type channels: int - :param subtype: Recording subtype - see `Soundfile docs - Subtypes `_ for a list of the available subtypes (default: PCM_24) + :param subtype: Recording subtype - see `Soundfile docs - Subtypes + `_ + for a list of the available subtypes (default: PCM_24) :type subtype: str """ - def recording_thread(outfile, duration, device, sample_rate, format, - blocksize, latency, channels, subtype): + def recording_thread( + outfile, + duration, + device, + sample_rate, + format, + blocksize, + latency, + channels, + subtype, + ): import sounddevice as sd self.recording_paused_changed.clear() @@ -548,12 +620,15 @@ class SoundPlugin(Plugin): if outfile: outfile = os.path.abspath(os.path.expanduser(outfile)) if os.path.isfile(outfile): - self.logger.info('Removing existing audio file {}'.format(outfile)) + self.logger.info('Removing existing audio file %s', outfile) os.unlink(outfile) else: outfile = tempfile.NamedTemporaryFile( - prefix='recording_', suffix='.wav', delete=False, - dir=tempfile.gettempdir()).name + prefix='recording_', + suffix='.wav', + delete=False, + dir=tempfile.gettempdir(), + ).name if device is None: device = self.input_device @@ -574,42 +649,68 @@ class SoundPlugin(Plugin): self.recording_paused_changed.wait() if status: - self.logger.warning('Recording callback status: {}'.format( - str(status))) + self.logger.warning('Recording callback status: %s', status) - q.put({ - 'timestamp': time.time(), - 'frames': frames, - 'time': duration, - 'data': indata.copy() - }) + q.put( + { + 'timestamp': time.time(), + 'frames': frames, + 'time': duration, + 'data': indata.copy(), + } + ) try: import soundfile as sf - import numpy - with sf.SoundFile(outfile, mode='w', samplerate=sample_rate, - format=format, channels=channels, subtype=subtype) as f: - with sd.InputStream(samplerate=sample_rate, device=device, - channels=channels, callback=audio_callback, - latency=latency, blocksize=blocksize): + with sf.SoundFile( + outfile, + mode='w', + samplerate=sample_rate, + format=format, + channels=channels, + subtype=subtype, + ) as f: + with sd.InputStream( + samplerate=sample_rate, + device=device, + channels=channels, + callback=audio_callback, + latency=latency, + blocksize=blocksize, + ): self.start_recording() get_bus().post(SoundRecordingStartedEvent(filename=outfile)) - self.logger.info('Started recording from device [{}] to [{}]'. - format(device, outfile)) + self.logger.info( + 'Started recording from device [%s] to [%s]', + device, + outfile, + ) recording_started_time = time.time() - while self._get_recording_state() != RecordingState.STOPPED \ - and (duration is None or - time.time() - recording_started_time < duration): + while ( + self._get_recording_state() != RecordingState.STOPPED + and ( + duration is None + or time.time() - recording_started_time < duration + ) + ): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() - get_args = { - 'block': True, - 'timeout': max(0, duration - (time.time() - recording_started_time)), - } if duration is not None else {} + get_args = ( + { + 'block': True, + 'timeout': max( + 0, + duration + - (time.time() - recording_started_time), + ), + } + if duration is not None + else {} + ) data = q.get(**get_args) if data and time.time() - data.get('timestamp') <= 1.0: @@ -624,24 +725,45 @@ class SoundPlugin(Plugin): self.stop_recording() get_bus().post(SoundRecordingStoppedEvent(filename=outfile)) - Thread(target=recording_thread, - args=( - outfile, duration, device, sample_rate, format, blocksize, latency, channels, subtype) - ).start() + Thread( + target=recording_thread, + args=( + outfile, + duration, + device, + sample_rate, + format, + blocksize, + latency, + channels, + subtype, + ), + ).start() @action - def recordplay(self, duration=None, input_device=None, output_device=None, - sample_rate=None, blocksize=None, latency=0, channels=1, dtype=None): + def recordplay( + self, + duration=None, + input_device=None, + output_device=None, + sample_rate=None, + blocksize=None, + latency=0, + channels=1, + dtype=None, + ): """ Records audio and plays it on an output sound device (audio pass-through) :param duration: Recording duration in seconds (default: record until stop event) :type duration: float - :param input_device: Input device (default: default configured device or system default audio input if not configured) + :param input_device: Input device (default: default configured device + or system default audio input if not configured) :type input_device: int or str - :param output_device: Output device (default: default configured device or system default audio output if not configured) + :param output_device: Output device (default: default configured device + or system default audio output if not configured) :type output_device: int or str :param sample_rate: Recording sample rate (default: device default rate) @@ -656,7 +778,10 @@ class SoundPlugin(Plugin): :param channels: Number of channels (default: 1) :type channels: int - :param dtype: Data type for the recording - see `Soundfile docs - Recording `_ for available types (default: input device default) + :param dtype: Data type for the recording - see `Soundfile docs - + Recording + `_ + for available types (default: input device default) :type dtype: str """ @@ -687,35 +812,37 @@ class SoundPlugin(Plugin): self.recording_paused_changed.wait() if status: - self.logger.warning('Recording callback status: {}'.format( - str(status))) + self.logger.warning('Recording callback status: %s', status) outdata[:] = indata stream_index = None try: - import soundfile as sf - import numpy - stream_index = self._allocate_stream_index() - stream = sd.Stream(samplerate=sample_rate, channels=channels, - blocksize=blocksize, latency=latency, - device=(input_device, output_device), - dtype=dtype, callback=audio_callback) + stream = sd.Stream( + samplerate=sample_rate, + channels=channels, + blocksize=blocksize, + latency=latency, + device=(input_device, output_device), + dtype=dtype, + callback=audio_callback, + ) self.start_recording() - self._start_playback(stream_index=stream_index, - stream=stream) + self._start_playback(stream_index=stream_index, stream=stream) - self.logger.info('Started recording pass-through from device ' + - '[{}] to sound device [{}]'. - format(input_device, output_device)) + self.logger.info( + 'Started recording pass-through from device [%s] to sound device [%s]', + input_device, + output_device, + ) recording_started_time = time.time() - while self._get_recording_state() != RecordingState.STOPPED \ - and (duration is None or - time.time() - recording_started_time < duration): + while self._get_recording_state() != RecordingState.STOPPED and ( + duration is None or time.time() - recording_started_time < duration + ): while self._get_recording_state() == RecordingState.PAUSED: self.recording_paused_changed.wait() @@ -736,24 +863,35 @@ class SoundPlugin(Plugin): streams = { i: { attr: getattr(stream, attr) - for attr in ['active', 'closed', 'stopped', 'blocksize', - 'channels', 'cpu_load', 'device', 'dtype', - 'latency', 'samplerate', 'samplesize'] + for attr in [ + 'active', + 'closed', + 'stopped', + 'blocksize', + 'channels', + 'cpu_load', + 'device', + 'dtype', + 'latency', + 'samplerate', + 'samplesize', + ] if hasattr(stream, attr) - } for i, stream in self.active_streams.items() + } + for i, stream in self.active_streams.items() } for i, stream in streams.items(): stream['playback_state'] = self.playback_state[i].name stream['name'] = self.stream_index_to_name.get(i) if i in self.stream_mixes: - stream['mix'] = {j: sound for j, sound in - enumerate(list(self.stream_mixes[i]))} + stream['mix'] = dict(enumerate(list(self.stream_mixes[i]))) return streams - def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None, - completed_callback_event=None): + def _get_or_allocate_stream_index( + self, stream_index=None, stream_name=None, completed_callback_event=None + ): stream = None with self.playback_state_lock: @@ -762,22 +900,26 @@ class SoundPlugin(Plugin): stream_index = self.stream_name_to_index.get(stream_name) else: if stream_name is not None: - raise RuntimeError('Redundant specification of both ' + - 'stream_name and stream_index') + raise RuntimeError( + 'Redundant specification of both ' + + 'stream_name and stream_index' + ) if stream_index is not None: stream = self.active_streams.get(stream_index) if not stream: - return (self._allocate_stream_index(stream_name=stream_name, - completed_callback_event= - completed_callback_event), - True) + return ( + self._allocate_stream_index( + stream_name=stream_name, + completed_callback_event=completed_callback_event, + ), + True, + ) return stream_index, False - def _allocate_stream_index(self, stream_name=None, - completed_callback_event=None): + def _allocate_stream_index(self, stream_name=None, completed_callback_event=None): stream_index = None with self.playback_state_lock: @@ -796,8 +938,9 @@ class SoundPlugin(Plugin): self.stream_mixes[stream_index] = Mix() self.stream_index_to_name[stream_index] = stream_name self.stream_name_to_index[stream_name] = stream_index - self.completed_callback_events[stream_index] = \ + self.completed_callback_events[stream_index] = ( completed_callback_event if completed_callback_event else Event() + ) return stream_index @@ -811,8 +954,7 @@ class SoundPlugin(Plugin): else: self.playback_paused_changed[stream_index] = Event() - self.logger.info('Playback started on stream index {}'. - format(stream_index)) + self.logger.info('Playback started on stream index %d', stream_index) return stream_index @@ -835,8 +977,7 @@ class SoundPlugin(Plugin): i = self.stream_name_to_index.get(i) stream = self.active_streams.get(i) if not stream: - self.logger.info('No such stream index or name: {}'. - format(i)) + self.logger.info('No such stream index or name: %d', i) continue if self.completed_callback_events[i]: @@ -859,9 +1000,10 @@ class SoundPlugin(Plugin): if name in self.stream_name_to_index: del self.stream_name_to_index[name] - self.logger.info('Playback stopped on streams [{}]'.format( - ', '.join([str(stream) for stream in - completed_callback_events.keys()]))) + self.logger.info( + 'Playback stopped on streams [%s]', + ', '.join([str(stream) for stream in completed_callback_events]), + ) @action def pause_playback(self, streams=None): @@ -881,8 +1023,7 @@ class SoundPlugin(Plugin): i = self.stream_name_to_index.get(i) stream = self.active_streams.get(i) if not stream: - self.logger.info('No such stream index or name: {}'. - format(i)) + self.logger.info('No such stream index or name: %d', i) continue if self.playback_state[i] == PlaybackState.PAUSED: @@ -894,8 +1035,10 @@ class SoundPlugin(Plugin): self.playback_paused_changed[i].set() - self.logger.info('Playback pause toggled on streams [{}]'.format( - ', '.join([str(stream) for stream in streams]))) + self.logger.info( + 'Playback pause toggled on streams [%s]', + ', '.join([str(stream) for stream in streams]), + ) def start_recording(self): with self.recording_state_lock: @@ -921,8 +1064,14 @@ class SoundPlugin(Plugin): self.recording_paused_changed.set() @action - def release(self, stream_index=None, stream_name=None, - sound_index=None, midi_note=None, frequency=None): + def release( + self, + stream_index=None, + stream_name=None, + sound_index=None, + midi_note=None, + frequency=None, + ): """ Remove a sound from an active stream, either by sound index (use :meth:`platypush.sound.plugin.SoundPlugin.query_streams` to get @@ -949,25 +1098,26 @@ class SoundPlugin(Plugin): if stream_name: if stream_index: - raise RuntimeError('stream_index and stream name are ' + - 'mutually exclusive') + raise RuntimeError( + 'stream_index and stream name are ' + 'mutually exclusive' + ) stream_index = self.stream_name_to_index.get(stream_name) - mixes = { - i: mix for i, mix in self.stream_mixes.items() - } if stream_index is None else { - stream_index: self.stream_mixes[stream_index] - } + mixes = ( + self.stream_mixes.copy() + if stream_index is None + else {stream_index: self.stream_mixes[stream_index]} + ) streams_to_stop = [] for i, mix in mixes.items(): for j, sound in enumerate(mix): - if (sound_index is not None and j == sound_index) or \ - (midi_note is not None - and sound.get('midi_note') == midi_note) or \ - (frequency is not None - and sound.get('frequency') == frequency): + if ( + (sound_index is not None and j == sound_index) + or (midi_note is not None and sound.get('midi_note') == midi_note) + or (frequency is not None and sound.get('frequency') == frequency) + ): if len(list(mix)) == 1: # Last sound in the mix streams_to_stop.append(i) diff --git a/platypush/plugins/sound/core.py b/platypush/plugins/sound/core.py index cfc51342..fc1d3ee2 100644 --- a/platypush/plugins/sound/core.py +++ b/platypush/plugins/sound/core.py @@ -1,7 +1,3 @@ -""" -.. moduleauthor:: Fabio Manganiello -""" - import enum import logging import json @@ -15,7 +11,7 @@ class WaveShape(enum.Enum): TRIANG = 'triang' -class Sound(object): +class Sound: """ Models a basic synthetic sound that can be played through an audio device """ @@ -34,9 +30,16 @@ class Sound(object): duration = None shape = None - def __init__(self, midi_note=midi_note, frequency=None, phase=phase, - gain=gain, duration=duration, shape=WaveShape.SIN, - A_frequency=STANDARD_A_FREQUENCY): + def __init__( + self, + midi_note=midi_note, + frequency=None, + phase=phase, + gain=gain, + duration=duration, + shape=WaveShape.SIN, + A_frequency=STANDARD_A_FREQUENCY, + ): """ You can construct a sound either from a MIDI note or a base frequency @@ -67,20 +70,24 @@ class Sound(object): """ if midi_note and frequency: - raise RuntimeError('Please specify either a MIDI note or a base ' + - 'frequency') + raise RuntimeError( + 'Please specify either a MIDI note or a base ' + 'frequency' + ) if midi_note: self.midi_note = midi_note - self.frequency = self.note_to_freq(midi_note=midi_note, - A_frequency=A_frequency) + self.frequency = self.note_to_freq( + midi_note=midi_note, A_frequency=A_frequency + ) elif frequency: self.frequency = frequency - self.midi_note = self.freq_to_note(frequency=frequency, - A_frequency=A_frequency) + self.midi_note = self.freq_to_note( + frequency=frequency, A_frequency=A_frequency + ) else: - raise RuntimeError('Please specify either a MIDI note or a base ' + - 'frequency') + raise RuntimeError( + 'Please specify either a MIDI note or a base ' + 'frequency' + ) self.phase = phase self.gain = gain @@ -99,8 +106,7 @@ class Sound(object): :type A_frequency: float """ - return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) \ - * A_frequency + return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) * A_frequency @classmethod def freq_to_note(cls, frequency, A_frequency=STANDARD_A_FREQUENCY): @@ -116,10 +122,11 @@ class Sound(object): # TODO return also the offset in % between the provided frequency # and the standard MIDI note frequency - return int(12.0 * math.log(frequency / A_frequency, 2) - + cls.STANDARD_A_MIDI_NOTE) + return int( + 12.0 * math.log(frequency / A_frequency, 2) + cls.STANDARD_A_MIDI_NOTE + ) - def get_wave(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE): + def get_wave(self, t_start=0.0, t_end=0.0, samplerate=_DEFAULT_SAMPLERATE): """ Get the wave binary data associated to this sound @@ -137,6 +144,7 @@ class Sound(object): """ import numpy as np + x = np.linspace(t_start, t_end, int((t_end - t_start) * samplerate)) x = x.reshape(len(x), 1) @@ -148,8 +156,7 @@ class Sound(object): wave[wave < 0] = -1 wave[wave >= 0] = 1 elif self.shape == WaveShape.SAWTOOTH or self.shape == WaveShape.TRIANG: - wave = 2 * (self.frequency * x - - np.floor(0.5 + self.frequency * x)) + wave = 2 * (self.frequency * x - np.floor(0.5 + self.frequency * x)) if self.shape == WaveShape.TRIANG: wave = 2 * np.abs(wave) - 1 else: @@ -157,8 +164,14 @@ class Sound(object): return self.gain * wave - def fft(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE, - freq_range=None, freq_buckets=None): + def fft( + self, + t_start=0.0, + t_end=0.0, + samplerate=_DEFAULT_SAMPLERATE, + freq_range=None, + freq_buckets=None, + ): """ Get the real part of the Fourier transform associated to a time-bounded sample of this sound @@ -173,7 +186,8 @@ class Sound(object): :type samplerate: int :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` - (see `Nyquist-Shannon sampling theorem `_) + (see`Nyquist-Shannon sampling theorem + `_) :type freq_range: list or tuple with 2 int elements (range) :param freq_buckets: Number of buckets to subdivide the frequency range. @@ -190,7 +204,7 @@ class Sound(object): wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) fft = np.fft.fft(wave.reshape(len(wave))) - fft = fft.real[freq_range[0]:freq_range[1]] + fft = fft.real[freq_range[0] : freq_range[1]] if freq_buckets is not None: fft = np.histogram(fft, bins=freq_buckets) @@ -224,7 +238,7 @@ class Sound(object): raise RuntimeError('Usage: {}'.format(__doc__)) -class Mix(object): +class Mix: """ This class models a set of mixed :class:`Sound` instances that can be played through an audio stream to an audio device @@ -251,15 +265,22 @@ class Mix(object): def remove(self, sound_index): if sound_index >= len(self._sounds): - self.logger.error('No such sound index: {} in mix {}'.format( - sound_index, list(self))) + self.logger.error( + 'No such sound index: {} in mix {}'.format(sound_index, list(self)) + ) return self._sounds.pop(sound_index) # noinspection PyProtectedMember - def get_wave(self, t_start=0., t_end=0., normalize_range=(-1.0, 1.0), - on_clip='scale', samplerate=Sound._DEFAULT_SAMPLERATE): + def get_wave( + self, + t_start=0.0, + t_end=0.0, + normalize_range=(-1.0, 1.0), + on_clip='scale', + samplerate=Sound._DEFAULT_SAMPLERATE, + ): """ Get the wave binary data associated to this mix @@ -289,8 +310,9 @@ class Mix(object): wave = None for sound in self._sounds: - sound_wave = sound.get_wave(t_start=t_start, t_end=t_end, - samplerate=samplerate) + sound_wave = sound.get_wave( + t_start=t_start, t_end=t_end, samplerate=samplerate + ) if wave is None: wave = sound_wave @@ -298,8 +320,9 @@ class Mix(object): wave += sound_wave if normalize_range and len(wave): - scale_factor = (normalize_range[1] - normalize_range[0]) / \ - (wave.max() - wave.min()) + scale_factor = (normalize_range[1] - normalize_range[0]) / ( + wave.max() - wave.min() + ) if scale_factor < 1.0: # Wave clipping if on_clip == 'scale': @@ -308,14 +331,21 @@ class Mix(object): wave[wave < normalize_range[0]] = normalize_range[0] wave[wave > normalize_range[1]] = normalize_range[1] else: - raise RuntimeError('Supported values for "on_clip": ' + - '"scale" or "clip"') + raise RuntimeError( + 'Supported values for "on_clip": ' + '"scale" or "clip"' + ) return wave # noinspection PyProtectedMember - def fft(self, t_start=0., t_end=0., samplerate=Sound._DEFAULT_SAMPLERATE, - freq_range=None, freq_buckets=None): + def fft( + self, + t_start=0.0, + t_end=0.0, + samplerate=Sound._DEFAULT_SAMPLERATE, + freq_range=None, + freq_buckets=None, + ): """ Get the real part of the Fourier transform associated to a time-bounded sample of this mix @@ -330,7 +360,8 @@ class Mix(object): :type samplerate: int :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` - (see `Nyquist-Shannon sampling theorem `_) + (see `Nyquist-Shannon sampling theorem + `_) :type freq_range: list or tuple with 2 int elements (range) :param freq_buckets: Number of buckets to subdivide the frequency range. @@ -347,7 +378,7 @@ class Mix(object): wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) fft = np.fft.fft(wave.reshape(len(wave))) - fft = fft.real[freq_range[0]:freq_range[1]] + fft = fft.real[freq_range[0] : freq_range[1]] if freq_buckets is not None: fft = np.histogram(fft, bins=freq_buckets) @@ -370,4 +401,5 @@ class Mix(object): return duration + # vim:sw=4:ts=4:et: