diff --git a/platypush/message/event/sound.py b/platypush/message/event/sound.py index 3e485600d..16ceb7456 100644 --- a/platypush/message/event/sound.py +++ b/platypush/message/event/sound.py @@ -1,65 +1,70 @@ +from abc import ABC +from typing import Optional, Tuple, Union from platypush.message.event import Event -class SoundEvent(Event): - """ Base class for sound events """ +class SoundEvent(Event, ABC): + """Base class for sound events""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, *args, device: Optional[Union[str, Tuple[str, str]]] = None, **kwargs + ): + super().__init__(*args, device=device, **kwargs) -class SoundPlaybackStartedEvent(SoundEvent): +class SoundEventWithResource(SoundEvent, ABC): + """Base class for sound events with resource names attached""" + + def __init__(self, *args, resource: Optional[str] = None, **kwargs): + super().__init__(*args, resource=resource, **kwargs) + + +class SoundPlaybackStartedEvent(SoundEventWithResource): """ Event triggered when a new sound playback starts """ - def __init__(self, filename=None, *args, **kwargs): - super().__init__(*args, filename=filename, **kwargs) - -class SoundPlaybackStoppedEvent(SoundEvent): +class SoundPlaybackStoppedEvent(SoundEventWithResource): """ Event triggered when the sound playback stops """ - def __init__(self, filename=None, *args, **kwargs): - super().__init__(*args, filename=filename, **kwargs) - -class SoundPlaybackPausedEvent(SoundEvent): +class SoundPlaybackPausedEvent(SoundEventWithResource): """ Event triggered when the sound playback pauses """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + +class SoundPlaybackResumedEvent(SoundEventWithResource): + """ + Event triggered when the sound playback resumes from a paused state + """ -class SoundRecordingStartedEvent(SoundEvent): +class SoundRecordingStartedEvent(SoundEventWithResource): """ Event triggered when a new recording starts """ - def
__init__(self, filename=None, *args, **kwargs): - super().__init__(*args, filename=filename, **kwargs) - -class SoundRecordingStoppedEvent(SoundEvent): +class SoundRecordingStoppedEvent(SoundEventWithResource): """ Event triggered when a sound recording stops """ - def __init__(self, filename=None, *args, **kwargs): - super().__init__(*args, filename=filename, **kwargs) - -class SoundRecordingPausedEvent(SoundEvent): +class SoundRecordingPausedEvent(SoundEventWithResource): """ Event triggered when a sound recording pauses """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + +class SoundRecordingResumedEvent(SoundEvent): + """ + Event triggered when a sound recording resumes from a paused state + """ # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index c89f34658..70d410297 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -1,21 +1,12 @@ -from collections import defaultdict -import os -import queue -import stat -from typing_extensions import override +from dataclasses import asdict import warnings - -from threading import Event, RLock -from typing import Dict, Optional, Union - -import sounddevice as sd -import soundfile as sf +from typing import Iterable, List, Optional, Union +from typing_extensions import override from platypush.plugins import RunnablePlugin, action -from .core import Sound, Mix -from ._controllers import AudioRecorder -from ._model import AudioState +from ._manager import AudioManager +from ._model import DeviceType, StreamType class SoundPlugin(RunnablePlugin): @@ -27,27 +18,30 @@ class SoundPlugin(RunnablePlugin): * :class:`platypush.message.event.sound.SoundPlaybackStartedEvent` on playback start * :class:`platypush.message.event.sound.SoundPlaybackStoppedEvent` on playback stop * :class:`platypush.message.event.sound.SoundPlaybackPausedEvent` on playback pause + * 
:class:`platypush.message.event.sound.SoundPlaybackResumedEvent` on playback resume * :class:`platypush.message.event.sound.SoundRecordingStartedEvent` on recording start * :class:`platypush.message.event.sound.SoundRecordingStoppedEvent` on recording stop * :class:`platypush.message.event.sound.SoundRecordingPausedEvent` on recording pause + * :class:`platypush.message.event.sound.SoundRecordingResumedEvent` on recording resume Requires: * **sounddevice** (``pip install sounddevice``) - * **soundfile** (``pip install soundfile``) * **numpy** (``pip install numpy``) - * **ffmpeg** package installed on the system (for streaming support) + * **ffmpeg** package installed on the system """ - _STREAM_NAME_PREFIX = 'platypush-stream-' + _DEFAULT_BLOCKSIZE = 1024 + _DEFAULT_QUEUE_SIZE = 10 def __init__( self, - input_device=None, - output_device=None, - input_blocksize=Sound._DEFAULT_BLOCKSIZE, - output_blocksize=Sound._DEFAULT_BLOCKSIZE, + input_device: Optional[DeviceType] = None, + output_device: Optional[DeviceType] = None, + input_blocksize: int = _DEFAULT_BLOCKSIZE, + output_blocksize: int = _DEFAULT_BLOCKSIZE, + queue_size: Optional[int] = _DEFAULT_QUEUE_SIZE, ffmpeg_bin: str = 'ffmpeg', **kwargs, ): @@ -55,23 +49,18 @@ class SoundPlugin(RunnablePlugin): :param input_device: Index or name of the default input device. Use :meth:`platypush.plugins.sound.query_devices` to get the available devices. Default: system default - :type input_device: int or str - :param output_device: Index or name of the default output device. Use :meth:`platypush.plugins.sound.query_devices` to get the available devices. Default: system default - :type output_device: int or str - :param input_blocksize: Blocksize to be applied to the input device. Try to increase this value if you get input overflow errors while recording. Default: 1024 - :type input_blocksize: int - :param output_blocksize: Blocksize to be applied to the output device. 
Try to increase this value if you get output underflow errors while playing. Default: 1024 - :type output_blocksize: int - + :param queue_size: When running in synth mode, this is the maximum + number of generated audio frames that will be queued before the + audio device processes them (default: 10). + :param ffmpeg_bin: Path of the ``ffmpeg`` binary (default: search for + the ``ffmpeg`` in the ``PATH``). """ @@ -82,349 +71,147 @@ class SoundPlugin(RunnablePlugin): self.output_device = output_device self.input_blocksize = input_blocksize self.output_blocksize = output_blocksize - - self.playback_state = {} - self.playback_state_lock = RLock() - self.playback_paused_changed = {} - self.stream_mixes = {} - self.active_streams = {} - self.stream_name_to_index = {} - self.stream_index_to_name = {} - self.completed_callback_events = {} self.ffmpeg_bin = ffmpeg_bin - - self._recorders: Dict[str, AudioRecorder] = {} - self._recorder_locks: Dict[str, RLock] = defaultdict(RLock) - - @staticmethod - def _get_default_device(category: str) -> str: - """ - Query the default audio devices. - - :param category: Device category to query. Can be either input or output - """ - host_apis = sd.query_hostapis() - assert host_apis, 'No sound devices found' - available_devices = list( - filter( - lambda x: x is not None, - ( - host_api.get('default_' + category.lower() + '_device') # type: ignore - for host_api in host_apis - ), - ), + self._manager = AudioManager( + input_blocksize=self.input_blocksize, + output_blocksize=self.output_blocksize, + should_stop=self._should_stop, + input_device=input_device, + output_device=output_device, + queue_size=queue_size, ) - assert available_devices, f'No default "{category}" device found' - return available_devices[0] - - @action - def query_devices(self, category=None): - """ - Query the available devices - - :param category: Device category to query. Can be either input or output.
Default: None (query all devices) - :type category: str - - :returns: A dictionary representing the available devices. - - Example:: - - [ - { - "name": "pulse", - "hostapi": 0, - "max_input_channels": 32, - "max_output_channels": 32, - "default_low_input_latency": 0.008684807256235827, - "default_low_output_latency": 0.008684807256235827, - "default_high_input_latency": 0.034807256235827665, - "default_high_output_latency": 0.034807256235827665, - "default_samplerate": 44100 - }, - { - "name": "default", - "hostapi": 0, - "max_input_channels": 32, - "max_output_channels": 32, - "default_low_input_latency": 0.008684807256235827, - "default_low_output_latency": 0.008684807256235827, - "default_high_input_latency": 0.034807256235827665, - "default_high_output_latency": 0.034807256235827665, - "default_samplerate": 44100 - } - ] - - """ - - devs = sd.query_devices() - if category == 'input': - devs = [d for d in devs if d.get('max_input_channels') > 0] # type: ignore - elif category == 'output': - devs = [d for d in devs if d.get('max_output_channels') > 0] # type: ignore - - return devs - - def _play_audio_callback(self, q, blocksize, streamtype, stream_index): - is_raw_stream = streamtype == sd.RawOutputStream - - def audio_callback(outdata, frames, *, status): - if self._get_playback_state(stream_index) == AudioState.STOPPED: - raise sd.CallbackStop - - while self._get_playback_state(stream_index) == AudioState.PAUSED: - self.playback_paused_changed[stream_index].wait() - - if frames != blocksize: - self.logger.warning( - 'Received %d frames, expected blocksize is %d', frames, blocksize - ) - return - - if status.output_underflow: - self.logger.warning('Output underflow: increase blocksize?') - outdata[:] = (b'\x00' if is_raw_stream else 0.0) * len(outdata) - return - - if status: - self.logger.warning('Audio callback failed: %s', status) - - try: - data = q.get_nowait() - except queue.Empty: - self.logger.warning('Buffer is empty: increase buffersize?') - raise 
sd.CallbackStop - - if len(data) < len(outdata): - outdata[: len(data)] = data - outdata[len(data) :] = (b'\x00' if is_raw_stream else 0.0) * ( - len(outdata) - len(data) - ) - else: - outdata[:] = data - - return audio_callback - @action def play( self, - file=None, - sound=None, - device=None, - blocksize=None, - bufsize=None, - samplerate=None, - channels=None, - stream_name=None, - stream_index=None, + resource: Optional[str] = None, + file: Optional[str] = None, + sound: Optional[Union[dict, Iterable[dict]]] = None, + device: Optional[DeviceType] = None, + duration: Optional[float] = None, + blocksize: Optional[int] = None, + sample_rate: Optional[int] = None, + channels: int = 2, + volume: float = 100, + stream_name: Optional[str] = None, + stream_index: Optional[int] = None, ): """ - Plays a sound file (support formats: wav, raw) or a synthetic sound. - - :param file: Sound file path. Specify this if you want to play a file - :type file: str + Plays an audio file/URL (any audio format supported by ffmpeg works) or + a synthetic sound. + :param resource: Audio resource to be played. It can be a local file or + a URL. + :param file: **Deprecated**. Use ``resource`` instead. :param sound: Sound to play. Specify this if you want to play synthetic sounds. You can also create polyphonic sounds by just - calling play multiple times. - :type sound: Sound. You can initialize it either from a list - of `Sound` objects or from its JSON representation, e.g.:: + calling play multiple times. Frequencies can be specified either by + ``midi_note`` - either as strings (e.g. ``A4``) or integers (e.g. + ``69``) - or by ``frequency`` (e.g. ``440`` for 440 Hz). You can + also specify a list of sounds here if you want to apply multiple + harmonics on a base sound. + Some examples: - { - "midi_note": 69, # 440 Hz A - "gain": 1.0, # Maximum volume - "duration": 1.0 # 1 second or until release/pause/stop - } + .. 
code-block:: python + + { + "frequency": 440, # 440 Hz + "volume": 100, # Maximum volume + "duration": 1.0 # 1 second or until stop_playback + } + + .. code-block:: python + + [ + { + "midi_note": "A4", # A @ 440 Hz + "volume": 100, # Maximum volume + "duration": 1.0 # 1 second or until stop_playback + }, + { + "midi_note": "E5", # Play the harmonic one fifth up + "volume": 25, # 1/4 of the volume + "duration": 1.0, # 1 second or until stop_playback + "phase": 3.14, # ~180 degrees phase + # Make it a triangular wave (default: sin). + # Supported types: "sin", "triang", "square", + # "sawtooth" + "shape": "triang" + } + ] + + .. code-block:: python + + [ + { + "midi_note": "C4", # C4 MIDI note + "duration": 0.5 # 0.5 seconds or until stop_playback + }, + { + "midi_note": "G5", # G5 MIDI note + "duration": 0.5, # 0.5 seconds or until stop_playback + "delay": 0.5 # Start this note 0.5 seconds + # after playback has started + } + ] :param device: Output device (default: default configured device or system default audio output if not configured) - :type device: int or str - + :param duration: Playback duration, in seconds. Default: None - play + until the end of the audio source or until :meth:`.stop_playback`. :param blocksize: Audio block size (default: configured `output_blocksize` or 2048) - :type blocksize: int - - :param bufsize: Size of the audio buffer (default: 20 frames for audio - files, 2 frames for synth sounds) - :type bufsize: int - - :param samplerate: Audio samplerate. Default: audio file samplerate if - in file mode, 44100 Hz if in synth mode - :type samplerate: int - + :param sample_rate: Audio sample rate. Default: audio file sample rate + if in file mode, 44100 Hz if in synth mode :param channels: Number of audio channels. Default: number of channels in the audio file in file mode, 1 if in synth mode - :type channels: int - + :param volume: Playback volume, between 0 and 100. Default: 100.
:param stream_index: If specified, play to an already active stream index (you can get them through :meth:`platypush.plugins.sound.query_streams`). Default: creates a new audio stream through PortAudio. - :type stream_index: int - :param stream_name: Name of the stream to play to. If set, the sound will be played to the specified stream name, or a stream with that name will be created. If not set, and ``stream_index`` is not set either, then a new stream will be created on the next available index and named ``platypush-stream-``. - :type stream_name: str """ - if not file and not sound: + dev = self._manager.get_device(device=device, type=StreamType.OUTPUT) + blocksize = blocksize or self.output_blocksize + + if file: + warnings.warn( + 'file is deprecated, use resource instead', + DeprecationWarning, + stacklevel=1, + ) + if not resource: + resource = file + + if not (resource or sound): raise RuntimeError( - 'Please specify either a file to play or a ' + 'list of sound objects' + 'Please specify either a file to play or a list of sound objects' ) - if blocksize is None: - blocksize = self.output_blocksize - - if bufsize is None: - if file: - bufsize = Sound._DEFAULT_FILE_BUFSIZE - else: - bufsize = Sound._DEFAULT_SYNTH_BUFSIZE - - q = queue.Queue(maxsize=bufsize) - f = None - t = 0.0 - - if file: - file = os.path.abspath(os.path.expanduser(file)) - - if device is None: - device = self.output_device - if device is None: - device = self._get_default_device('output') - - if file: - f = sf.SoundFile(file) - if not samplerate: - samplerate = f.samplerate if f else Sound._DEFAULT_SAMPLERATE - if not channels: - channels = f.channels if f else 1 - - mix = None - with self.playback_state_lock: - stream_index, is_new_stream = self._get_or_allocate_stream_index( - stream_index=stream_index, stream_name=stream_name - ) - - if sound and stream_index in self.stream_mixes: - mix = self.stream_mixes[stream_index] - mix.add(sound) - - if not mix: - return None, "Unable to 
allocate the stream" - self.logger.info( 'Starting playback of %s to sound device [%s] on stream [%s]', - file or sound, - device, + resource or sound, + dev.index, stream_index, ) - if not is_new_stream: - return # Let the existing callback handle the new mix - # TODO Potential support also for mixed streams with - # multiple sound files and synth sounds? - - try: - # Audio queue pre-fill loop - for _ in range(bufsize): - if f: - data = f.buffer_read(blocksize, dtype='float32') - if not data: - break - else: - duration = mix.duration() - blocktime = float(blocksize / samplerate) - next_t = ( - min(t + blocktime, duration) - if duration is not None - else t + blocktime - ) - - data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate) - t = next_t - - if duration is not None and t >= duration: - break - - q.put_nowait(data) # Pre-fill the audio queue - - stream = self.active_streams[stream_index] - completed_callback_event = self.completed_callback_events[stream_index] - - if stream is None: - streamtype = sd.RawOutputStream if file else sd.OutputStream - stream = streamtype( - samplerate=samplerate, - blocksize=blocksize, - device=device, - channels=channels, - dtype='float32', - callback=self._play_audio_callback( - q=q, - blocksize=blocksize, - streamtype=streamtype, - stream_index=stream_index, - ), - finished_callback=completed_callback_event.set, - ) - - self.start_playback(stream_index=stream_index, stream=stream) - - with stream: - # Timeout set until we expect all the buffered blocks to - # be consumed - timeout = blocksize * bufsize / samplerate - - while True: - while self._get_playback_state(stream_index) == AudioState.PAUSED: - self.playback_paused_changed[stream_index].wait() - - if f: - data = f.buffer_read(blocksize, dtype='float32') - if not data: - break - else: - duration = mix.duration() - blocktime = float(blocksize / samplerate) - next_t = ( - min(t + blocktime, duration) - if duration is not None - else t + blocktime - ) - - data = 
mix.get_wave( - t_start=t, t_end=next_t, samplerate=samplerate - ) - t = next_t - - if duration is not None and t >= duration: - break - - if self._get_playback_state(stream_index) == AudioState.STOPPED: - break - - try: - q.put(data, timeout=timeout) - except queue.Full as e: - if self._get_playback_state(stream_index) != AudioState.PAUSED: - raise e - - completed_callback_event.wait() - except queue.Full: - if ( - stream_index is None - or self._get_playback_state(stream_index) != AudioState.STOPPED - ): - self.logger.warning('Playback timeout: audio callback failed?') - finally: - if f and not f.closed: - f.close() - - self.stop_playback([stream_index]) + self._manager.create_player( + device=dev.index, + infile=resource, + sound=sound, + duration=duration, + blocksize=blocksize, + sample_rate=sample_rate, + channels=channels, + volume=volume, + stream_name=stream_name, + ).start() @action def stream_recording(self, *args, **kwargs): @@ -439,86 +226,11 @@ class SoundPlugin(RunnablePlugin): return self.record(*args, **kwargs) - def create_recorder( - self, - device: str, - output_device: Optional[str] = None, - fifo: Optional[str] = None, - outfile: Optional[str] = None, - duration: Optional[float] = None, - sample_rate: Optional[int] = None, - dtype: str = 'int16', - blocksize: Optional[int] = None, - latency: Union[float, str] = 'high', - channels: int = 1, - redis_queue: Optional[str] = None, - format: str = 'wav', # pylint: disable=redefined-builtin - stream: bool = True, - play_audio: bool = False, - ) -> AudioRecorder: - with self._recorder_locks[device]: - assert self._recorders.get(device) is None, ( - f'Recording already in progress for device {device}', - ) - - if play_audio: - output_device = ( - output_device - or self.output_device - or self._get_default_device('output') - ) - - device = (device, output_device) # type: ignore - input_device = device[0] - else: - input_device = device - - if sample_rate is None: - dev_info = sd.query_devices(device, 
'input') - sample_rate = int(dev_info['default_samplerate']) # type: ignore - - if blocksize is None: - blocksize = self.input_blocksize - - if fifo: - fifo = os.path.expanduser(fifo) - if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode): - self.logger.info('Removing previous input stream FIFO %s', fifo) - os.unlink(fifo) - - os.mkfifo(fifo, 0o644) - outfile = fifo - elif outfile: - outfile = os.path.expanduser(outfile) - - outfile = outfile or fifo or os.devnull - self._recorders[input_device] = AudioRecorder( - plugin=self, - device=device, - outfile=outfile, - duration=duration, - sample_rate=sample_rate, - dtype=dtype, - blocksize=blocksize, - latency=latency, - output_format=format, - channels=channels, - redis_queue=redis_queue, - stream=stream, - audio_pass_through=play_audio, - should_stop=self._should_stop, - ) - - return self._recorders[input_device] - - def _get_input_device(self, device: Optional[str] = None) -> str: - return device or self.input_device or self._get_default_device('input') - @action - def record( # pylint: disable=too-many-statements + def record( self, - device: Optional[str] = None, - output_device: Optional[str] = None, + device: Optional[DeviceType] = None, + output_device: Optional[DeviceType] = None, fifo: Optional[str] = None, outfile: Optional[str] = None, duration: Optional[float] = None, @@ -527,9 +239,11 @@ class SoundPlugin(RunnablePlugin): blocksize: Optional[int] = None, latency: Union[float, str] = 'high', channels: int = 1, + volume: float = 100, redis_queue: Optional[str] = None, format: str = 'wav', # pylint: disable=redefined-builtin stream: bool = True, + stream_name: Optional[str] = None, play_audio: bool = False, ): """ @@ -556,16 +270,19 @@ class SoundPlugin(RunnablePlugin): real-time on the selected ``output_device`` (default: False). 
:param latency: Device latency in seconds (default: the device's default high latency) :param channels: Number of channels (default: 1) + :param volume: Recording volume, between 0 and 100. Default: 100. :param redis_queue: If set, the audio chunks will also be published to this Redis channel, so other consumers can process them downstream. - :param format: Audio format. Supported: wav, mp3, ogg, aac. Default: wav. + :param format: Audio format. Supported: wav, mp3, ogg, aac, flac. + Default: wav. :param stream: If True (default), then the audio will be streamed to an HTTP endpoint too (default: ``/sound/stream<.format>``). + :param stream_name: Custom name for the output stream. """ - device = self._get_input_device(device) - self.create_recorder( - device, + dev = self._manager.get_device(device=device, type=StreamType.INPUT) + self._manager.create_recorder( + dev.index, output_device=output_device, fifo=fifo, outfile=outfile, @@ -575,9 +292,11 @@ class SoundPlugin(RunnablePlugin): blocksize=blocksize, latency=latency, channels=channels, + volume=volume, redis_queue=redis_queue, format=format, stream=stream, + stream_name=stream_name, play_audio=play_audio, ).start() @@ -596,211 +315,121 @@ class SoundPlugin(RunnablePlugin): return self.record(*args, **kwargs) @action - def query_streams(self): - """ - :returns: A list of active audio streams + def status(self) -> List[dict]: """ + :return: The current status of the audio devices and streams. - streams = { - i: { - attr: getattr(stream, attr) - for attr in [ - 'active', - 'closed', - 'stopped', - 'blocksize', - 'channels', - 'cpu_load', - 'device', - 'dtype', - 'latency', - 'samplerate', - 'samplesize', + Example: + + .. 
code-block:: json + + [ + { + "streams": [ + { + "device": 3, + "direction": "output", + "outfile": "/dev/null", + "infile": "/mnt/hd/media/music/audio.mp3", + "ffmpeg_bin": "ffmpeg", + "channels": 2, + "sample_rate": 44100, + "dtype": "int16", + "streaming": false, + "duration": null, + "blocksize": 1024, + "latency": "high", + "redis_queue": "platypush-stream-AudioResourcePlayer-3", + "audio_pass_through": false, + "state": "PAUSED", + "started_time": "2023-06-19T11:57:05.882329", + "stream_index": 1, + "stream_name": "platypush:audio:output:1" + } + ], + "index": 3, + "name": "default", + "hostapi": 0, + "max_input_channels": 32, + "max_output_channels": 32, + "default_samplerate": 44100, + "default_low_input_latency": 0.008707482993197279, + "default_low_output_latency": 0.008707482993197279, + "default_high_input_latency": 0.034829931972789115, + "default_high_output_latency": 0.034829931972789115 + } ] - if hasattr(stream, attr) - } - for i, stream in self.active_streams.items() - } - for i, stream in streams.items(): - stream['playback_state'] = self.playback_state[i].name - stream['name'] = self.stream_index_to_name.get(i) - if i in self.stream_mixes: - stream['mix'] = dict(enumerate(list(self.stream_mixes[i]))) + """ + devices = self._manager.get_devices() + streams = self._manager.get_streams() + ret = {dev.index: {'streams': [], **asdict(dev)} for dev in devices} + for stream in streams: + if stream.device is None: + continue - return streams - - def _get_or_allocate_stream_index( - self, stream_index=None, stream_name=None, completed_callback_event=None - ): - stream = None - - with self.playback_state_lock: - if stream_index is None: - if stream_name is not None: - stream_index = self.stream_name_to_index.get(stream_name) - else: - if stream_name is not None: - raise RuntimeError( - 'Redundant specification of both ' - + 'stream_name and stream_index' - ) - - if stream_index is not None: - stream = self.active_streams.get(stream_index) - - if not 
stream: - return ( - self._allocate_stream_index( - stream_name=stream_name, - completed_callback_event=completed_callback_event, - ), - True, - ) - - return stream_index, False - - def _allocate_stream_index(self, stream_name=None, completed_callback_event=None): - stream_index = None - - with self.playback_state_lock: - for i in range(len(self.active_streams) + 1): - if i not in self.active_streams: - stream_index = i - break - - if stream_index is None: - raise RuntimeError('No stream index available') - - if stream_name is None: - stream_name = self._STREAM_NAME_PREFIX + str(stream_index) - - self.active_streams[stream_index] = None - self.stream_mixes[stream_index] = Mix() - self.stream_index_to_name[stream_index] = stream_name - self.stream_name_to_index[stream_name] = stream_index - self.completed_callback_events[stream_index] = ( - completed_callback_event if completed_callback_event else Event() + dev_index = int( + stream.device + if isinstance(stream.device, (int, str)) + else stream.device[0] ) - return stream_index + ret[dev_index]['streams'].append(stream.asdict()) - def start_playback(self, stream_index, stream): - with self.playback_state_lock: - self.playback_state[stream_index] = AudioState.RUNNING - self.active_streams[stream_index] = stream - - if isinstance(self.playback_paused_changed.get(stream_index), Event): - self.playback_paused_changed[stream_index].clear() - else: - self.playback_paused_changed[stream_index] = Event() - - self.logger.info('Playback started on stream index %d', stream_index) - - return stream_index + return list(ret.values()) @action - def stop_playback(self, streams=None): + def query_streams(self): """ - :param streams: Streams to stop by index or name (default: all) - :type streams: list[int] or list[str] + Deprecated alias for :meth:`.status`. 
""" - with self.playback_state_lock: - streams = streams or self.active_streams.keys() - if not streams: - return - completed_callback_events = {} - - for i in streams: - stream = self.active_streams.get(i) - if not stream: - i = self.stream_name_to_index.get(i) - stream = self.active_streams.get(i) - if not stream: - self.logger.info('No such stream index or name: %d', i) - continue - - if self.completed_callback_events[i]: - completed_callback_events[i] = self.completed_callback_events[i] - self.playback_state[i] = AudioState.STOPPED - - for i, event in completed_callback_events.items(): - event.wait() - - if i in self.completed_callback_events: - del self.completed_callback_events[i] - if i in self.active_streams: - del self.active_streams[i] - if i in self.stream_mixes: - del self.stream_mixes[i] - - if i in self.stream_index_to_name: - name = self.stream_index_to_name[i] - del self.stream_index_to_name[i] - if name in self.stream_name_to_index: - del self.stream_name_to_index[name] - - self.logger.info( - 'Playback stopped on streams [%s]', - ', '.join([str(stream) for stream in completed_callback_events]), + warnings.warn( + 'sound.query_streams is deprecated, use sound.status instead', + DeprecationWarning, + stacklevel=1, ) + return self.status() + @action - def pause_playback(self, streams=None): + def stop_playback( + self, + device: Optional[DeviceType] = None, + streams: Optional[Iterable[Union[int, str]]] = None, + ): """ - :param streams: Streams to pause by index (default: all) - :type streams: list[int] + :param device: Only stop the streams on the specified device, by name or index (default: all). + :param streams: Streams to stop by index or name (default: all). 
""" + self._manager.stop_audio(device=device, streams=streams, type=StreamType.OUTPUT) - with self.playback_state_lock: - streams = streams or self.active_streams.keys() - if not streams: - return - - for i in streams: - stream = self.active_streams.get(i) - if not stream: - i = self.stream_name_to_index.get(i) - stream = self.active_streams.get(i) - if not stream: - self.logger.info('No such stream index or name: %d', i) - continue - - if self.playback_state[i] == AudioState.PAUSED: - self.playback_state[i] = AudioState.RUNNING - elif self.playback_state[i] == AudioState.RUNNING: - self.playback_state[i] = AudioState.PAUSED - else: - continue - - self.playback_paused_changed[i].set() - - self.logger.info( - 'Playback pause toggled on streams [%s]', - ', '.join([str(stream) for stream in streams]), + @action + def pause_playback( + self, + device: Optional[DeviceType] = None, + streams: Optional[Iterable[Union[int, str]]] = None, + ): + """ + :param device: Only stop the streams on the specified device, by name or index (default: all). + :param streams: Streams to stop by index or name (default: all). + """ + self._manager.pause_audio( + device=device, streams=streams, type=StreamType.OUTPUT ) @action def stop_recording( - self, device: Optional[str] = None, timeout: Optional[float] = 2 + self, device: Optional[DeviceType] = None, timeout: Optional[float] = 2 ): """ Stop the current recording process on the selected device (default: default input device), if it is running. 
""" - device = self._get_input_device(device) - with self._recorder_locks[device]: - recorder = self._recorders.pop(device, None) - if not recorder: - self.logger.warning('No active recording session for device %s', device) - return - - recorder.notify_stop() - recorder.join(timeout=timeout) + self._manager.stop_audio(device, StreamType.INPUT, timeout=timeout) @action - def pause_recording(self, device: Optional[str] = None): + def pause_recording(self, device: Optional[DeviceType] = None): """ Toggle the recording pause state on the selected device (default: default input device), if it is running. @@ -808,94 +437,32 @@ class SoundPlugin(RunnablePlugin): If paused, the recording will be resumed. If running, it will be paused. Otherwise, no action will be taken. """ - device = self._get_input_device(device) - with self._recorder_locks[device]: - recorder = self._recorders.get(device) - if not recorder: - self.logger.warning('No active recording session for device %s', device) - return - - recorder.notify_pause() + self._manager.pause_audio(device, StreamType.INPUT) @action - def release( + def set_volume( self, - stream_index=None, - stream_name=None, - sound_index=None, - midi_note=None, - frequency=None, + volume: float, + device: Optional[DeviceType] = None, + streams: Optional[Iterable[Union[int, str]]] = None, ): """ - Remove a sound from an active stream, either by sound index (use - :meth:`platypush.sound.plugin.SoundPlugin.query_streams` to get - the sounds playing on the active streams), midi_note, frequency - or absolute file path. + Set the audio input/output volume. 
- :param stream_index: Stream index (default: sound removed from all the - active streams) - :type stream_index: int - - :param stream_name: Stream name (default: sound removed from all the - active streams) - :type stream_index: str - - :param sound_index: Sound index - :type sound_index: int - - :param midi_note: MIDI note - :type midi_note: int - - :param frequency: Sound frequency - :type frequency: float + :param volume: New volume, between 0 and 100. + :param device: Set the volume only on the specified device (default: + all). + :param streams: Set the volume only on the specified list of stream + indices/names (default: all). """ - - if stream_name: - if stream_index: - raise RuntimeError( - 'stream_index and stream name are ' + 'mutually exclusive' - ) - stream_index = self.stream_name_to_index.get(stream_name) - - mixes = ( - self.stream_mixes.copy() - if stream_index is None - else {stream_index: self.stream_mixes[stream_index]} - ) - - streams_to_stop = [] - - for i, mix in mixes.items(): - for j, sound in enumerate(mix): - if ( - (sound_index is not None and j == sound_index) - or (midi_note is not None and sound.get('midi_note') == midi_note) - or (frequency is not None and sound.get('frequency') == frequency) - ): - if len(list(mix)) == 1: - # Last sound in the mix - streams_to_stop.append(i) - else: - mix.remove(j) - - if streams_to_stop: - self.stop_playback(streams_to_stop) - - def _get_playback_state(self, stream_index): - with self.playback_state_lock: - return self.playback_state[stream_index] + self._manager.set_volume(volume=volume, device=device, streams=streams) @override def main(self): - self.wait_stop() - - @override - def stop(self): - super().stop() - devices = list(self._recorders.keys()) - - for device in devices: - self.stop_recording(device, timeout=0) + try: + self.wait_stop() + finally: + self._manager.stop_audio() # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_controllers/__init__.py 
b/platypush/plugins/sound/_controllers/__init__.py deleted file mode 100644 index 01ce57283..000000000 --- a/platypush/plugins/sound/_controllers/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from ._base import AudioThread -from ._recorder import AudioRecorder - - -__all__ = ['AudioRecorder', 'AudioThread'] diff --git a/platypush/plugins/sound/_controllers/_base.py b/platypush/plugins/sound/_controllers/_base.py deleted file mode 100644 index e7060cd92..000000000 --- a/platypush/plugins/sound/_controllers/_base.py +++ /dev/null @@ -1,227 +0,0 @@ -from contextlib import contextmanager -import queue -import time - -from abc import ABC, abstractmethod -from logging import getLogger -from threading import Event, RLock, Thread -from typing import IO, Generator, Optional, Tuple, Union -from typing_extensions import override - -import sounddevice as sd - -from .._converter import ConverterProcess -from .._model import AudioState - - -class AudioThread(Thread, ABC): - """ - Base class for audio play/record threads. - """ - - _STREAM_NAME_PREFIX = 'platypush-stream-' - - def __init__( - self, - plugin, - device: Union[str, Tuple[str, str]], - outfile: str, - output_format: str, - channels: int, - sample_rate: int, - dtype: str, - stream: bool, - audio_pass_through: bool, - duration: Optional[float] = None, - blocksize: Optional[int] = None, - latency: Union[float, str] = 'high', - redis_queue: Optional[str] = None, - should_stop: Optional[Event] = None, - **kwargs, - ): - from .. 
import SoundPlugin - - super().__init__(**kwargs) - - self.plugin: SoundPlugin = plugin - self.device = device - self.outfile = outfile - self.output_format = output_format - self.channels = channels - self.sample_rate = sample_rate - self.dtype = dtype - self.stream = stream - self.duration = duration - self.blocksize = blocksize - self.latency = latency - self.redis_queue = redis_queue - self.audio_pass_through = audio_pass_through - self.logger = getLogger(__name__) - - self._state = AudioState.STOPPED - self._state_lock = RLock() - self._started_time: Optional[float] = None - self._converter: Optional[ConverterProcess] = None - self._should_stop = should_stop or Event() - self.paused_changed = Event() - - @property - def should_stop(self) -> bool: - """ - Proxy for `._should_stop.is_set()`. - """ - return self._should_stop.is_set() - - @abstractmethod - def _audio_callback(self, audio_converter: ConverterProcess): - """ - Returns a callback to handle the raw frames captures from the audio device. - """ - raise NotImplementedError() - - @abstractmethod - def _on_audio_converted(self, data: bytes, out_f: IO): - """ - This callback will be called when the audio data has been converted. - """ - raise NotImplementedError() - - def main( - self, - converter: ConverterProcess, - audio_stream: sd.Stream, - out_stream_index: Optional[int], - out_f: IO, - ): - """ - Main loop. 
- """ - self.notify_start() - if out_stream_index: - self.plugin.start_playback( - stream_index=out_stream_index, stream=audio_stream - ) - - self.logger.info( - 'Started %s on device [%s]', self.__class__.__name__, self.device - ) - self._started_time = time.time() - - while ( - self.state != AudioState.STOPPED - and not self.should_stop - and ( - self.duration is None - or time.time() - self._started_time < self.duration - ) - ): - while self.state == AudioState.PAUSED: - self.paused_changed.wait() - - if self.should_stop: - break - - timeout = ( - max( - 0, - self.duration - (time.time() - self._started_time), - ) - if self.duration is not None - else 1 - ) - - data = converter.read(timeout=timeout) - if not data: - continue - - self._on_audio_converted(data, out_f) - - @override - def run(self): - super().run() - self.paused_changed.clear() - - try: - stream_index = ( - self.plugin._allocate_stream_index() # pylint: disable=protected-access - if self.audio_pass_through - else None - ) - - with self.open_converter() as converter, sd.Stream( - samplerate=self.sample_rate, - device=self.device, - channels=self.channels, - callback=self._audio_callback(converter), - dtype=self.dtype, - latency=self.latency, - blocksize=self.blocksize, - ) as audio_stream, open(self.outfile, 'wb') as f: - self.main( - out_stream_index=stream_index, - converter=converter, - audio_stream=audio_stream, - out_f=f, - ) - except queue.Empty: - self.logger.warning( - 'Audio callback timeout for %s', self.__class__.__name__ - ) - finally: - self.notify_stop() - - @contextmanager - def open_converter(self) -> Generator[ConverterProcess, None, None]: - assert not self._converter, 'A converter process is already running' - self._converter = ConverterProcess( - ffmpeg_bin=self.plugin.ffmpeg_bin, - sample_rate=self.sample_rate, - channels=self.channels, - dtype=self.dtype, - chunk_size=self.plugin.input_blocksize, - output_format=self.output_format, - ) - - self._converter.start() - yield 
self._converter - - self._converter.stop() - self._converter.join(timeout=2) - self._converter = None - - def notify_start(self): - self.state = AudioState.RUNNING - - def notify_stop(self): - self.state = AudioState.STOPPED - if self._converter: - self._converter.stop() - - def notify_pause(self): - states = { - AudioState.PAUSED: AudioState.RUNNING, - AudioState.RUNNING: AudioState.PAUSED, - } - - with self._state_lock: - new_state = states.get(self.state) - if new_state: - self.state = new_state - else: - return - - self.logger.info('Paused state toggled for %s', self.__class__.__name__) - self.paused_changed.set() - - @property - def state(self): - with self._state_lock: - return self._state - - @state.setter - def state(self, value: AudioState): - with self._state_lock: - self._state = value - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_controllers/_recorder.py b/platypush/plugins/sound/_controllers/_recorder.py deleted file mode 100644 index 4c1393113..000000000 --- a/platypush/plugins/sound/_controllers/_recorder.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import IO -from typing_extensions import override - -from platypush.context import get_bus -from platypush.message.event.sound import ( - SoundRecordingStartedEvent, - SoundRecordingStoppedEvent, -) - -from platypush.utils import get_redis - -from .._converter import ConverterProcess -from .._model import AudioState -from ._base import AudioThread - - -class AudioRecorder(AudioThread): - """ - The ``AudioRecorder`` thread is responsible for recording audio from the - input device, writing it to the converter process and dispatch the - converted audio to the registered consumers. 
- """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - @override - def _audio_callback(self, audio_converter: ConverterProcess): - # _ = frames - # __ = time - def callback(indata, outdata, _, __, status): - if self.state != AudioState.RUNNING: - return - - if status: - self.logger.warning('Recording callback status: %s', status) - - try: - audio_converter.write(indata.tobytes()) - except AssertionError as e: - self.logger.warning('Audio converter callback error: %s', e) - self.state = AudioState.STOPPED - return - - if self.audio_pass_through: - outdata[:] = indata - - return callback - - @override - def _on_audio_converted(self, data: bytes, out_f: IO): - out_f.write(data) - if self.redis_queue and self.stream: - get_redis().publish(self.redis_queue, data) - - @override - def notify_start(self): - super().notify_start() - get_bus().post(SoundRecordingStartedEvent()) - - @override - def notify_stop(self): - prev_state = self.state - super().notify_stop() - if prev_state != AudioState.STOPPED: - get_bus().post(SoundRecordingStoppedEvent()) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_converter.py b/platypush/plugins/sound/_converter.py deleted file mode 100644 index bdab2d03e..000000000 --- a/platypush/plugins/sound/_converter.py +++ /dev/null @@ -1,183 +0,0 @@ -import asyncio -from asyncio.subprocess import PIPE -from logging import getLogger -from queue import Empty - -from queue import Queue -from threading import Event, RLock, Thread -from typing import Optional, Self - -from platypush.context import get_or_create_event_loop - -_dtype_to_ffmpeg_format = { - 'int8': 's8', - 'uint8': 'u8', - 'int16': 's16le', - 'uint16': 'u16le', - 'int32': 's32le', - 'uint32': 'u32le', - 'float32': 'f32le', - 'float64': 'f64le', -} -""" -Supported input types: - 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'float32', 'float64' -""" - -_output_format_to_ffmpeg_args = { - 'wav': ('-f', 'wav'), - 'ogg': ('-f', 'ogg'), - 
'mp3': ('-f', 'mp3'), - 'aac': ('-f', 'adts'), - 'flac': ('-f', 'flac'), -} - - -class ConverterProcess(Thread): - """ - Wrapper for an ffmpeg converter instance. - """ - - def __init__( - self, - ffmpeg_bin: str, - sample_rate: int, - channels: int, - dtype: str, - chunk_size: int, - output_format: str, - *args, - **kwargs, - ): - """ - :param ffmpeg_bin: Path to the ffmpeg binary. - :param sample_rate: The sample rate of the input audio. - :param channels: The number of channels of the input audio. - :param dtype: The (numpy) data type of the raw input audio. - :param chunk_size: Number of bytes that will be read at once from the - ffmpeg process. - :param output_format: Output audio format. - """ - super().__init__(*args, **kwargs) - - ffmpeg_format = _dtype_to_ffmpeg_format.get(dtype) - assert ffmpeg_format, ( - f'Unsupported data type: {dtype}. Supported data types: ' - f'{list(_dtype_to_ffmpeg_format.keys())}' - ) - - self._ffmpeg_bin = ffmpeg_bin - self._ffmpeg_format = ffmpeg_format - self._sample_rate = sample_rate - self._channels = channels - self._chunk_size = chunk_size - self._output_format = output_format - self._closed = False - self._out_queue = Queue() - self.ffmpeg = None - self.logger = getLogger(__name__) - self._loop = None - self._should_stop = Event() - self._stop_lock = RLock() - - def __enter__(self) -> Self: - self.start() - return self - - def __exit__(self, *_, **__): - self.stop() - - def _check_ffmpeg(self): - assert ( - self.ffmpeg and self.ffmpeg.returncode is None - ), 'The ffmpeg process has already terminated' - - def _get_format_args(self): - ffmpeg_args = _output_format_to_ffmpeg_args.get(self._output_format) - assert ffmpeg_args, ( - f'Unsupported output format: {self._output_format}. 
Supported formats: ' - f'{list(_output_format_to_ffmpeg_args.keys())}' - ) - - return ffmpeg_args - - async def _audio_proxy(self, timeout: Optional[float] = None): - self.ffmpeg = await asyncio.create_subprocess_exec( - self._ffmpeg_bin, - '-f', - self._ffmpeg_format, - '-ar', - str(self._sample_rate), - '-ac', - str(self._channels), - '-i', - 'pipe:', - *self._get_format_args(), - 'pipe:', - stdin=PIPE, - stdout=PIPE, - ) - - try: - await asyncio.wait_for(self.ffmpeg.wait(), 0.1) - except asyncio.TimeoutError: - pass - - while ( - self._loop - and self.ffmpeg - and self.ffmpeg.returncode is None - and not self.should_stop - ): - self._check_ffmpeg() - assert ( - self.ffmpeg and self.ffmpeg.stdout - ), 'The stdout is closed for the ffmpeg process' - - try: - data = await asyncio.wait_for( - self.ffmpeg.stdout.read(self._chunk_size), timeout - ) - self._out_queue.put(data) - except asyncio.TimeoutError: - self._out_queue.put(b'') - - def write(self, data: bytes): - self._check_ffmpeg() - assert ( - self.ffmpeg and self._loop and self.ffmpeg.stdin - ), 'The stdin is closed for the ffmpeg process' - - self._loop.call_soon_threadsafe(self.ffmpeg.stdin.write, data) - - def read(self, timeout: Optional[float] = None) -> Optional[bytes]: - try: - return self._out_queue.get(timeout=timeout) - except Empty: - return None - - def run(self): - super().run() - self._loop = get_or_create_event_loop() - try: - self._loop.run_until_complete(self._audio_proxy(timeout=1)) - except RuntimeError as e: - self.logger.warning(e) - finally: - self.stop() - - def stop(self): - with self._stop_lock: - self._should_stop.set() - if self.ffmpeg: - self.ffmpeg.kill() - - self.ffmpeg = None - self._loop = None - - @property - def should_stop(self) -> bool: - return self._should_stop.is_set() - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_converters/__init__.py b/platypush/plugins/sound/_converters/__init__.py new file mode 100644 index 000000000..3ab39e98c --- /dev/null +++ 
b/platypush/plugins/sound/_converters/__init__.py @@ -0,0 +1,10 @@ +from ._base import AudioConverter +from ._from_raw import RawInputAudioConverter +from ._to_raw import RawOutputAudioConverter, RawOutputAudioFromFileConverter + +__all__ = [ + 'AudioConverter', + 'RawInputAudioConverter', + 'RawOutputAudioConverter', + 'RawOutputAudioFromFileConverter', +] diff --git a/platypush/plugins/sound/_converters/_base.py b/platypush/plugins/sound/_converters/_base.py new file mode 100644 index 000000000..1a738f3ac --- /dev/null +++ b/platypush/plugins/sound/_converters/_base.py @@ -0,0 +1,331 @@ +from abc import ABC, abstractmethod +import asyncio +from asyncio.subprocess import PIPE +from logging import getLogger +from queue import Empty, Queue +from threading import Event, RLock, Thread +from typing import Any, Callable, Coroutine, Iterable, Optional, Self + +from platypush.context import get_or_create_event_loop + +_dtype_to_ffmpeg_format = { + 'int8': 's8', + 'uint8': 'u8', + 'int16': 's16le', + 'uint16': 'u16le', + 'int32': 's32le', + 'uint32': 'u32le', + 'float32': 'f32le', + 'float64': 'f64le', +} +""" +Supported raw types: + 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'float32', 'float64' +""" + + +class AudioConverter(Thread, ABC): + """ + Base class for an ffmpeg audio converter instance. + """ + + _format_to_ffmpeg_args = { + 'wav': ('-f', 'wav'), + 'ogg': ('-f', 'ogg'), + 'mp3': ('-f', 'mp3'), + 'aac': ('-f', 'adts'), + 'flac': ('-f', 'flac'), + } + + def __init__( + self, + *args, + ffmpeg_bin: str, + sample_rate: int, + channels: int, + volume: float, + dtype: str, + chunk_size: int, + format: Optional[str] = None, # pylint: disable=redefined-builtin + on_exit: Optional[Callable[[], Any]] = None, + **kwargs, + ): + """ + :param ffmpeg_bin: Path to the ffmpeg binary. + :param sample_rate: The sample rate of the input/output audio. + :param channels: The number of channels of the input/output audio. 
+ :param volume: Audio volume, as a percentage between 0 and 100. + :param dtype: The (numpy) data type of the raw input/output audio. + :param chunk_size: Number of bytes that will be read at once from the + ffmpeg process. + :param format: Input/output audio format. + :param on_exit: Function to call when the ffmpeg process exits. + """ + super().__init__(*args, **kwargs) + + ffmpeg_format = _dtype_to_ffmpeg_format.get(dtype) + assert ffmpeg_format, ( + f'Unsupported data type: {dtype}. Supported data types: ' + f'{list(_dtype_to_ffmpeg_format.keys())}' + ) + + self._ffmpeg_bin = ffmpeg_bin + self._ffmpeg_format = ffmpeg_format + self._ffmpeg_task: Optional[Coroutine] = None + self._sample_rate = sample_rate + self._channels = channels + self._chunk_size = chunk_size + self._format = format + self._closed = False + self._out_queue = Queue() + self.ffmpeg = None + self.volume = volume + self.logger = getLogger(__name__) + self._loop = None + self._should_stop = Event() + self._stop_lock = RLock() + self._on_exit = on_exit + self._ffmpeg_terminated = Event() + + def __enter__(self) -> Self: + """ + Audio converter context manager. + + It starts and registers the ffmpeg converter process. + """ + self.start() + return self + + def __exit__(self, *_, **__): + """ + Audio converter context manager. + + It stops and unregisters the ffmpeg converter process. + """ + self.stop() + + def _check_ffmpeg(self): + assert not self.terminated, 'The ffmpeg process has already terminated' + + @property + def gain(self) -> float: + return self.volume / 100 + + @property + def terminated(self) -> bool: + return self._ffmpeg_terminated.is_set() + + @property + def _default_args(self) -> Iterable[str]: + """ + Set of arguments common to all ffmpeg converter instances. + """ + return ('-hide_banner', '-loglevel', 'warning', '-y') + + @property + @abstractmethod + def _input_format_args(self) -> Iterable[str]: + """ + Ffmpeg audio input arguments. 
+ """ + raise NotImplementedError() + + @property + @abstractmethod + def _output_format_args(self): + """ + Ffmpeg audio output arguments. + """ + raise NotImplementedError() + + @property + def _channel_layout_args(self) -> Iterable[str]: + """ + Set of extra ffmpeg arguments for the channel layout. + """ + args = ('-ac', str(self._channels)) + if self._channels == 1: + return args + ('-channel_layout', 'mono') + if self._channels == 2: + return args + ('-channel_layout', 'stereo') + return args + + @property + def _raw_ffmpeg_args(self) -> Iterable[str]: + """ + Ffmpeg arguments for raw audio input/output given the current + configuration. + """ + return ( + '-f', + self._ffmpeg_format, + '-ar', + str(self._sample_rate), + *self._channel_layout_args, + ) + + @property + def _audio_volume_args(self) -> Iterable[str]: + """ + Ffmpeg audio volume arguments. + """ + return ('-filter:a', f'volume={self.gain}') + + @property + def _input_source_args(self) -> Iterable[str]: + """ + Default arguments for the ffmpeg input source (default: ``-i pipe:``, + ffmpeg will read from a pipe filled by the application). + """ + return ('-i', 'pipe:') + + @property + def _output_target_args(self) -> Iterable[str]: + """ + Default arguments for the ffmpeg output target (default: ``pipe:``, + ffmpeg will write the output to a pipe read by the application). + """ + return ('pipe:',) + + @property + def _converter_stdin(self) -> Optional[int]: + """ + Default stdin file descriptor to be used by the ffmpeg converter. + + Default: ``PIPE``, as the ffmpeg process by default reads audio frames + from the stdin. + """ + return PIPE + + @property + def _compressed_ffmpeg_args(self) -> Iterable[str]: + """ + Ffmpeg arguments for the compressed audio given the current + configuration. + """ + if not self._format: + return () + + ffmpeg_args = self._format_to_ffmpeg_args.get(self._format) + assert ffmpeg_args, ( + f'Unsupported output format: {self._format}. 
Supported formats: ' + f'{list(self._format_to_ffmpeg_args.keys())}' + ) + + return ffmpeg_args + + async def _audio_proxy(self, timeout: Optional[float] = None): + """ + Proxy the converted audio stream to the output queue for downstream + consumption. + """ + ffmpeg_args = ( + self._ffmpeg_bin, + *self._default_args, + *self._input_format_args, + *self._input_source_args, + *self._output_format_args, + *self._output_target_args, + ) + + self.ffmpeg = await asyncio.create_subprocess_exec( + *ffmpeg_args, + stdin=self._converter_stdin, + stdout=PIPE, + ) + + self.logger.info('Running ffmpeg: %s', ' '.join(ffmpeg_args)) + + try: + await asyncio.wait_for(self.ffmpeg.wait(), 0.1) + except asyncio.TimeoutError: + pass + + while ( + self._loop + and self.ffmpeg + and self.ffmpeg.returncode is None + and not self.should_stop + ): + self._check_ffmpeg() + assert ( + self.ffmpeg and self.ffmpeg.stdout + ), 'The stdout is closed for the ffmpeg process' + + self._ffmpeg_terminated.clear() + try: + data = await asyncio.wait_for( + self.ffmpeg.stdout.read(self._chunk_size), timeout + ) + self._out_queue.put(data) + except asyncio.TimeoutError: + self._out_queue.put(b'') + + def write(self, data: bytes): + """ + Write raw data to the ffmpeg process. + """ + self._check_ffmpeg() + assert ( + self.ffmpeg and self._loop and self.ffmpeg.stdin + ), 'The stdin is closed for the ffmpeg process' + + self._loop.call_soon_threadsafe(self.ffmpeg.stdin.write, data) + + def read(self, timeout: Optional[float] = None) -> Optional[bytes]: + """ + Read the next chunk of converted audio bytes from the converter queue. + """ + try: + return self._out_queue.get(timeout=timeout) + except Empty: + return None + + def run(self): + """ + Main runner. It runs the audio proxy in a loop and cleans up everything + in case of stop/failure. 
+ """ + super().run() + self._loop = get_or_create_event_loop() + try: + self._ffmpeg_task = self._audio_proxy(timeout=1) + self._loop.run_until_complete(self._ffmpeg_task) + except RuntimeError as e: + self.logger.warning(e) + finally: + self.stop() + + def stop(self): + """ + Sets the stop event, kills the ffmpeg process and resets the context. + """ + with self._stop_lock: + self._should_stop.set() + if self._ffmpeg_task: + self._ffmpeg_task.close() + self._ffmpeg_task = None + + try: + if self.ffmpeg and self.ffmpeg.returncode is None: + self.ffmpeg.kill() + except ProcessLookupError: + pass + + self.ffmpeg = None + self._loop = None + + self._ffmpeg_terminated.set() + + if self._on_exit: + self._on_exit() + + @property + def should_stop(self) -> bool: + """ + Proxy property for the ``_should_stop`` event. + """ + return self._should_stop.is_set() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_converters/_from_raw.py b/platypush/plugins/sound/_converters/_from_raw.py new file mode 100644 index 000000000..09c21c184 --- /dev/null +++ b/platypush/plugins/sound/_converters/_from_raw.py @@ -0,0 +1,23 @@ +from typing import Iterable +from typing_extensions import override + +from ._base import AudioConverter + + +class RawInputAudioConverter(AudioConverter): + """ + Converts raw audio input to a compressed media format. 
+ """ + + @property + @override + def _input_format_args(self) -> Iterable[str]: + return self._raw_ffmpeg_args + + @property + @override + def _output_format_args(self) -> Iterable[str]: + return self._compressed_ffmpeg_args + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_converters/_to_raw.py b/platypush/plugins/sound/_converters/_to_raw.py new file mode 100644 index 000000000..1263683ad --- /dev/null +++ b/platypush/plugins/sound/_converters/_to_raw.py @@ -0,0 +1,38 @@ +from typing import Iterable +from typing_extensions import override + +from ._base import AudioConverter + + +class RawOutputAudioConverter(AudioConverter): + """ + Converts input audio to raw audio output. + """ + + @property + @override + def _input_format_args(self) -> Iterable[str]: + return self._compressed_ffmpeg_args + + @property + @override + def _output_format_args(self) -> Iterable[str]: + return self._raw_ffmpeg_args + + +class RawOutputAudioFromFileConverter(RawOutputAudioConverter): + """ + Converts an input file to raw audio output. 
+ """ + + def __init__(self, *args, infile: str, **kwargs): + super().__init__(*args, **kwargs) + self.infile = infile + + @property + @override + def _input_source_args(self) -> Iterable[str]: + return ('-i', self.infile) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_manager/__init__.py b/platypush/plugins/sound/_manager/__init__.py new file mode 100644 index 000000000..acb86ad9d --- /dev/null +++ b/platypush/plugins/sound/_manager/__init__.py @@ -0,0 +1,3 @@ +from ._main import AudioManager + +__all__ = ["AudioManager"] diff --git a/platypush/plugins/sound/_manager/_device.py b/platypush/plugins/sound/_manager/_device.py new file mode 100644 index 000000000..2961a5ece --- /dev/null +++ b/platypush/plugins/sound/_manager/_device.py @@ -0,0 +1,91 @@ +from typing import List, Optional + +import sounddevice as sd + +from .._model import AudioDevice, DeviceType, StreamType + + +class DeviceManager: + """ + The device manager is responsible for managing the virtual audio device + abstractions exposed by the OS. + + For example, on a pure ALSA system virtual devices are usually mapped the + physical audio devices available on the system. + + On a system that runs through PulseAudio or Jack, there may be a + ``default`` virtual device whose sound card mappings may be managed by the + audio server. + """ + + def __init__( + self, + input_device: Optional[DeviceType] = None, + output_device: Optional[DeviceType] = None, + ): + """ + :param input_device: The default input device to use (by index or name). + :param output_device: The default output device to use (by index or name). 
+ """ + self.input_device = ( + self.get_device(input_device, StreamType.INPUT) + if input_device is not None + else None + ) + + self.output_device = ( + self.get_device(output_device, StreamType.OUTPUT) + if output_device is not None + else None + ) + + def get_devices( + self, type: Optional[StreamType] = None # pylint: disable=redefined-builtin + ) -> List[AudioDevice]: + """ + Get available audio devices. + + :param type: The type of devices to filter (default: return all). + """ + devices: List[dict] = sd.query_devices() # type: ignore + if type: + devices = [dev for dev in devices if dev.get(f'max_{type.value}_channels')] + + return [AudioDevice(**info) for info in devices] + + def get_device( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + ) -> AudioDevice: + """ + Search for a device. + + Either ``device`` or ``type`` have to be specified. + + :param device: The device to search for, either by index or name. If + not specified, then the default device for the given type is + returned. + :param type: The type of the device to search. 
+ """ + assert device or type, 'Please specify either device or type' + if device is None: + if type == StreamType.INPUT and self.input_device is not None: + return self.input_device + if type == StreamType.OUTPUT and self.output_device is not None: + return self.output_device + + try: + info: dict = sd.query_devices( + kind=type.value if type else None, device=device # type: ignore + ) + except sd.PortAudioError as e: + raise AssertionError( + f'Could not get device for type={type} and device={device}: {e}', + type, + device, + e, + ) from e + + assert info, f'No such device: {device}' + return AudioDevice(**info) diff --git a/platypush/plugins/sound/_manager/_main.py b/platypush/plugins/sound/_manager/_main.py new file mode 100644 index 000000000..927847588 --- /dev/null +++ b/platypush/plugins/sound/_manager/_main.py @@ -0,0 +1,291 @@ +from logging import getLogger +import os +import stat +from threading import Event +from time import time +from typing import Iterable, List, Optional, Union + +from .._model import AudioDevice, DeviceType, StreamType +from .._streams import AudioPlayer, AudioRecorder, AudioThread +from ._device import DeviceManager +from ._stream import StreamManager + + +class AudioManager: + """ + The audio manager is responsible for managing multiple audio controllers and + their access to audio resources. + + It main purpose is to act as a proxy/facade between the high-level audio + plugin and the audio functionalities (allocating streams, managing the state + of the player and recorder processes, etc.). + """ + + _default_signal_timeout = 2 + + def __init__( + self, + should_stop: Event, + input_blocksize: int, + output_blocksize: int, + input_device: Optional[DeviceType] = None, + output_device: Optional[DeviceType] = None, + queue_size: Optional[int] = None, + ): + """ + :param should_stop: Event to synchronize the audio manager stop. + :param input_blocksize: Block size for the input stream. 
+ :param output_blocksize: Block size for the output stream. + :param input_device: Default device to use for the input stream. + :param output_device: Default device to use for the output stream. + :param queue_size: Maximum size of the audio queues. + """ + self._should_stop = should_stop + self._device_manager = DeviceManager( + input_device=input_device, output_device=output_device + ) + + self._stream_manager = StreamManager(device_manager=self._device_manager) + self.logger = getLogger(__name__) + self.input_blocksize = input_blocksize + self.output_blocksize = output_blocksize + self.queue_size = queue_size + + def create_player( + self, + device: DeviceType, + channels: int, + volume: float, + infile: Optional[str] = None, + sound: Optional[Union[dict, Iterable[dict]]] = None, + duration: Optional[float] = None, + sample_rate: Optional[int] = None, + dtype: str = 'int16', + blocksize: Optional[int] = None, + latency: Union[float, str] = 'high', + stream_name: Optional[str] = None, + ) -> AudioPlayer: + """ + Create an audio player thread. + + :param device: Audio device to use. + :param channels: Number of output channels. + :param volume: Output volume, between 0 and 100. + :param infile: File or URL to play. + :param sound: Alternatively to a file/URL, you can play synthetic + sounds. + :param duration: Duration of the stream in seconds. + :param sample_rate: Sample rate of the stream. + :param dtype: Data type of the stream. + :param blocksize: Block size of the stream. + :param latency: Latency of the stream. + :param stream_name: Name of the stream. 
+ """ + dev = self._device_manager.get_device(device, type=StreamType.OUTPUT) + player = AudioPlayer.build( + device=device, + infile=infile, + sound=sound, + duration=duration, + volume=volume, + sample_rate=sample_rate or dev.default_samplerate, + dtype=dtype, + blocksize=blocksize or self.output_blocksize, + latency=latency, + channels=channels, + queue_size=self.queue_size, + should_stop=self._should_stop, + ) + + self._stream_manager.register( + player, dev, StreamType.OUTPUT, stream_name=stream_name + ) + return player + + def create_recorder( + self, + device: DeviceType, + output_device: Optional[DeviceType] = None, + fifo: Optional[str] = None, + outfile: Optional[str] = None, + duration: Optional[float] = None, + sample_rate: Optional[int] = None, + dtype: str = 'int16', + blocksize: Optional[int] = None, + latency: Union[float, str] = 'high', + channels: int = 1, + volume: float = 100, + redis_queue: Optional[str] = None, + format: str = 'wav', # pylint: disable=redefined-builtin + stream: bool = True, + stream_name: Optional[str] = None, + play_audio: bool = False, + ) -> AudioRecorder: + """ + Create an audio recorder thread. + + :param device: Audio device to use. + :param output_device: Output device to use. + :param fifo: Path to an output FIFO file to use to synchronize the audio + to other processes. + :param outfile: Optional output file for the recorded audio. + :param duration: Duration of the recording in seconds. + :param sample_rate: Sample rate of the stream. + :param dtype: Data type of the stream. + :param blocksize: Block size of the stream. + :param latency: Latency of the stream. + :param channels: Number of output channels. + :param volume: Input volume, between 0 and 100. + :param redis_queue: Name of the Redis queue to use. + :param format: Format of the recorded audio. + :param stream: Whether to stream the recorded audio. + :param play_audio: Whether to play the recorded audio in real-time. 
+ :param stream_name: Name of the stream. + """ + blocksize = blocksize or self.input_blocksize + dev = self._device_manager.get_device(device, type=StreamType.OUTPUT) + + if fifo: + fifo = os.path.expanduser(fifo) + if os.path.exists(fifo) and stat.S_ISFIFO(os.stat(fifo).st_mode): + self.logger.info('Removing previous input stream FIFO %s', fifo) + os.unlink(fifo) + + os.mkfifo(fifo, 0o644) + outfile = fifo + elif outfile: + outfile = os.path.expanduser(outfile) + + outfile = outfile or fifo or os.devnull + recorder = AudioRecorder( + device=( + ( + dev.index, + self._device_manager.get_device( + type=StreamType.OUTPUT, device=output_device + ).index, + ) + if play_audio + else dev.index + ), + outfile=outfile, + duration=duration, + sample_rate=sample_rate or dev.default_samplerate, + dtype=dtype, + blocksize=blocksize, + latency=latency, + output_format=format, + channels=channels, + volume=volume, + redis_queue=redis_queue, + stream=stream, + audio_pass_through=play_audio, + queue_size=self.queue_size, + should_stop=self._should_stop, + ) + + self._stream_manager.register( + recorder, dev, StreamType.INPUT, stream_name=stream_name + ) + return recorder + + def get_device( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + ) -> AudioDevice: + """ + Proxy to ``self._device_manager.get_device``. + """ + return self._device_manager.get_device(device=device, type=type) + + def get_devices( + self, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + ) -> List[AudioDevice]: + """ + Proxy to ``self._device_manager.get_devices``. + """ + return self._device_manager.get_devices(type=type) + + def get_streams( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + streams: Optional[Iterable[Union[str, int]]] = None, + ) -> List[AudioThread]: + """ + Proxy to ``self._stream_manager.get``. 
+ """ + return self._stream_manager.get(device=device, type=type, streams=streams) + + def stop_audio( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + streams: Optional[Iterable[Union[str, int]]] = None, + timeout: Optional[float] = 2, + ): + """ + Stops audio sessions. + + :param device: Filter by host audio device. + :param type: Filter by stream type (input or output). + :param streams: Filter by stream indices/names. + :param timeout: Wait timeout in seconds. + """ + streams_to_stop = self._stream_manager.get(device, type, streams=streams) + + # Send the stop signals + for audio_thread in streams_to_stop: + audio_thread.notify_stop() + + # Wait for termination (with timeout) + wait_start = time() + for audio_thread in streams_to_stop: + audio_thread.join( + timeout=max(0, timeout - (time() - wait_start)) + if timeout is not None + else None + ) + + # Remove references + for audio_thread in streams_to_stop: + self._stream_manager.unregister(audio_thread) + + def pause_audio( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + streams: Optional[Iterable[Union[str, int]]] = None, + ): + """ + Pauses/resumes audio sessions. + + :param device: Filter by host audio device. + :param type: Filter by stream type (input or output). + :param streams: Filter by stream indices/names. + """ + streams_to_pause = self._stream_manager.get(device, type, streams=streams) + + # Send the pause toggle signals + for audio_thread in streams_to_pause: + audio_thread.notify_pause() + + def set_volume( + self, + volume: float, + device: Optional[DeviceType] = None, + streams: Optional[Iterable[Union[str, int]]] = None, + ): + """ + :param volume: New volume, between 0 and 100. + :param device: Set the volume only on the specified device (default: + all). 
+ :param streams: Set the volume only on the specified list of stream + indices/names (default: all). + """ + stream_objs = self._stream_manager.get(device=device, streams=streams) + + for stream in stream_objs: + stream.volume = volume diff --git a/platypush/plugins/sound/_manager/_stream.py b/platypush/plugins/sound/_manager/_stream.py new file mode 100644 index 000000000..6fbabe77d --- /dev/null +++ b/platypush/plugins/sound/_manager/_stream.py @@ -0,0 +1,207 @@ +from collections import defaultdict +from logging import getLogger +from threading import RLock +from typing import Dict, Iterable, List, Optional, Union + +from .._model import AudioDevice, DeviceType, StreamType +from .._streams import AudioThread +from ._device import DeviceManager + + +class StreamManager: + """ + The stream manager is responsible for storing the current state of the + playing/recording audio streams and allowing fast, flexible lookups (by + stream index, name, type, device, and any combination of those). + """ + + def __init__(self, device_manager: DeviceManager): + """ + :param device_manager: Reference to the device manager. 
+ """ + self._next_stream_index = 1 + self._device_manager = device_manager + self._state_lock = RLock() + self._stream_index_by_name: Dict[str, int] = {} + self._stream_name_by_index: Dict[int, str] = {} + self._stream_index_to_device: Dict[int, AudioDevice] = {} + self._stream_index_to_type: Dict[int, StreamType] = {} + self.logger = getLogger(__name__) + + self._streams: Dict[ + int, Dict[StreamType, Dict[int, AudioThread]] + ] = defaultdict(lambda: {stream_type: {} for stream_type in StreamType}) + """ {device_index: {stream_type: {stream_index: audio_thread}}} """ + + self._streams_by_index: Dict[StreamType, Dict[int, AudioThread]] = { + stream_type: {} for stream_type in StreamType + } + """ {stream_type: {stream_index: [audio_threads]}} """ + + self._stream_locks: Dict[int, Dict[StreamType, RLock]] = defaultdict( + lambda: {stream_type: RLock() for stream_type in StreamType} + ) + """ {device_index: {stream_type: RLock}} """ + + @classmethod + def _generate_stream_name( + cls, + type: StreamType, # pylint: disable=redefined-builtin + stream_index: int, + ) -> str: + return f'platypush:audio:{type.value}:{stream_index}' + + def _gen_next_stream_index( + self, + type: StreamType, # pylint: disable=redefined-builtin + stream_name: Optional[str] = None, + ) -> int: + """ + :param type: The type of the stream to allocate (input or output). + :param stream_name: The name of the stream to allocate. + :return: The index of the new stream. 
+ """ + with self._state_lock: + stream_index = self._next_stream_index + + if not stream_name: + stream_name = self._generate_stream_name(type, stream_index) + + self._stream_name_by_index[stream_index] = stream_name + self._stream_index_by_name[stream_name] = stream_index + self._next_stream_index += 1 + + return stream_index + + def register( + self, + audio_thread: AudioThread, + device: AudioDevice, + type: StreamType, # pylint: disable=redefined-builtin + stream_name: Optional[str] = None, + ): + """ + Registers an audio stream to a device. + + :param audio_thread: Stream to register. + :param device: Device to register the stream to. + :param type: The type of the stream to allocate (input or output). + :param stream_name: The name of the stream to allocate. + """ + with self._state_lock: + stream_index = audio_thread.stream_index + if stream_index is None: + stream_index = audio_thread.stream_index = self._gen_next_stream_index( + type, stream_name=stream_name + ) + + self._streams[device.index][type][stream_index] = audio_thread + self._stream_index_to_device[stream_index] = device + self._stream_index_to_type[stream_index] = type + self._streams_by_index[type][stream_index] = audio_thread + + def unregister( + self, + audio_thread: AudioThread, + device: Optional[AudioDevice] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + ): + """ + Unregisters an audio stream from a device. + + :param audio_thread: Stream to unregister. + :param device: Device to unregister the stream from. + :param type: The type of the stream to unregister (input or output). 
+ """ + with self._state_lock: + stream_index = audio_thread.stream_index + if stream_index is None: + return + + if device is None: + device = self._stream_index_to_device.get(stream_index) + + if not type: + type = self._stream_index_to_type.get(stream_index) + + if device is None or type is None: + return + + self._streams[device.index][type].pop(stream_index, None) + self._stream_index_to_device.pop(stream_index, None) + self._stream_index_to_type.pop(stream_index, None) + self._streams_by_index[type].pop(stream_index, None) + stream_name = self._stream_name_by_index.pop(stream_index, None) + if stream_name: + self._stream_index_by_name.pop(stream_name, None) + + def _get_by_device_and_type( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + ) -> List[AudioThread]: + """ + Filter streams by device and/or type. + """ + devs = ( + [self._device_manager.get_device(device, type)] + if device is not None + else self._device_manager.get_devices(type) + ) + + return [ + audio_thread + for dev in devs + for stream_info in ( + [self._streams[dev.index].get(type, {})] + if type + else list(self._streams[dev.index].values()) + ) + for audio_thread in stream_info.values() + if audio_thread and audio_thread.is_alive() + ] + + def _get_by_stream_index_or_name( + self, streams: Iterable[Union[str, int]] + ) -> List[AudioThread]: + """ + Filter streams by index or name. 
+ """ + threads = [] + + for stream in streams: + try: + stream_index = int(stream) + except (TypeError, ValueError): + stream_index = self._stream_index_by_name.get(stream) # type: ignore + if stream_index is None: + self.logger.warning('No such audio stream: %s', stream) + continue + + stream_type = self._stream_index_to_type.get(stream_index) + if not stream_type: + self.logger.warning( + 'No type available for this audio stream: %s', stream + ) + continue + + thread = self._streams_by_index.get(stream_type, {}).get(stream_index) + if thread: + threads.append(thread) + + return threads + + def get( + self, + device: Optional[DeviceType] = None, + type: Optional[StreamType] = None, # pylint: disable=redefined-builtin + streams: Optional[Iterable[Union[str, int]]] = None, + ) -> List[AudioThread]: + """ + Searches streams, either by device and/or type, or by stream index/name. + """ + return ( + self._get_by_stream_index_or_name(streams) + if streams + else self._get_by_device_and_type(device, type) + ) diff --git a/platypush/plugins/sound/_model.py b/platypush/plugins/sound/_model.py index 2f1215133..e7ceca63f 100644 --- a/platypush/plugins/sound/_model.py +++ b/platypush/plugins/sound/_model.py @@ -1,4 +1,26 @@ +from dataclasses import dataclass from enum import Enum +from typing import Union + +DeviceType = Union[int, str] + + +@dataclass +class AudioDevice: + """ + Maps the properties of an audio device. + """ + + index: int + name: str + hostapi: int + max_input_channels: int + max_output_channels: int + default_samplerate: int + default_low_input_latency: float = 0 + default_low_output_latency: float = 0 + default_high_input_latency: float = 0 + default_high_output_latency: float = 0 class AudioState(Enum): @@ -9,3 +31,12 @@ class AudioState(Enum): STOPPED = 'STOPPED' RUNNING = 'RUNNING' PAUSED = 'PAUSED' + + +class StreamType(Enum): + """ + Stream types. 
+ """ + + INPUT = 'input' + OUTPUT = 'output' diff --git a/platypush/plugins/sound/_streams/__init__.py b/platypush/plugins/sound/_streams/__init__.py new file mode 100644 index 000000000..c3489f947 --- /dev/null +++ b/platypush/plugins/sound/_streams/__init__.py @@ -0,0 +1,6 @@ +from ._base import AudioThread +from ._player import AudioPlayer +from ._recorder import AudioRecorder + + +__all__ = ['AudioPlayer', 'AudioRecorder', 'AudioThread'] diff --git a/platypush/plugins/sound/_streams/_base.py b/platypush/plugins/sound/_streams/_base.py new file mode 100644 index 000000000..fa2d95e99 --- /dev/null +++ b/platypush/plugins/sound/_streams/_base.py @@ -0,0 +1,502 @@ +from abc import ABC, abstractmethod +from contextlib import contextmanager +from datetime import datetime +from logging import getLogger +import os +import queue +from threading import Event, RLock, Thread +import time +from typing import IO, Callable, Final, Generator, Optional, Tuple, Type, Union +from typing_extensions import override + +import sounddevice as sd + +from platypush.context import get_bus +from platypush.message.event.sound import SoundEvent +from platypush.utils import get_redis + +from .._converters import AudioConverter +from .._model import AudioState, StreamType + +_StreamType = Union[sd.Stream, sd.OutputStream] + + +class AudioThread(Thread, ABC): + """ + Base class for audio play/record stream threads. 
+ """ + + _DEFAULT_FILE: Final[str] = os.devnull + """Unless otherwise specified, the audio streams will be sent to /dev/null""" + _DEFAULT_CONVERTER_TIMEOUT: Final[float] = 1 + + def __init__( + self, + device: Union[str, Tuple[str, str]], + channels: int, + volume: float, + sample_rate: int, + dtype: str, + blocksize: int, + ffmpeg_bin: str = 'ffmpeg', + stream: bool = False, + audio_pass_through: bool = False, + infile: Optional[str] = None, + outfile: Optional[str] = None, + duration: Optional[float] = None, + latency: Union[float, str] = 'high', + redis_queue: Optional[str] = None, + should_stop: Optional[Event] = None, + converter_timeout: Optional[float] = None, + stream_name: Optional[str] = None, + queue_size: Optional[int] = None, + **kwargs, + ): + """ + :param device: Audio device to use. + :param channels: Number of channels to use. + :param volume: Input/output volume, between 0 and 100. + :param sample_rate: Sample rate to use. + :param dtype: Data type to use. + :param blocksize: Block size to use. + :param ffmpeg_bin: Path to the ffmpeg binary. + :param stream: Whether to stream the audio to Redis consumers. + :param audio_pass_through: Whether to pass the audio through to the + application's output stream. + :param infile: Path to the input file or URL, if this is an output + stream. + :param outfile: Path to the output file. + :param duration: Duration of the audio stream. + :param latency: Latency to use. + :param redis_queue: Redis queue to use. + :param should_stop: Synchronize with upstream stop events. + :param converter_timeout: How long to wait for the converter to finish. + :param stream_name: Name of the stream. + :param queue_size: Maximum size of the audio queue. 
+ """ + super().__init__(**kwargs) + + self.device = device + self.outfile = os.path.expanduser(outfile or self._DEFAULT_FILE) + self.infile = os.path.expanduser(infile or self._DEFAULT_FILE) + self.ffmpeg_bin = ffmpeg_bin + self.channels = channels + self.volume = volume + self.sample_rate = sample_rate + self.dtype = dtype + self.stream = stream + self.duration = duration + self.blocksize = blocksize * channels + self.latency = latency + self._redis_queue = redis_queue + self.audio_pass_through = audio_pass_through + self.queue_size = queue_size + self._stream_name = stream_name + self.logger = getLogger(__name__) + + self._state = AudioState.STOPPED + self._state_lock = RLock() + self._started_time: Optional[float] = None + self._converter: Optional[AudioConverter] = None + self._should_stop = should_stop or Event() + self._converter_timeout = converter_timeout or self._DEFAULT_CONVERTER_TIMEOUT + self.audio_stream: Optional[_StreamType] = None + self.stream_index: Optional[int] = None + self.paused_changed = Event() + self._converter_terminated = Event() + + @property + def should_stop(self) -> bool: + """ + Proxy for `._should_stop.is_set()`. + """ + return self._should_stop.is_set() or bool( + self.state == AudioState.STOPPED and self._started_time + ) + + @property + def gain(self) -> float: + return self.volume / 100 + + def wait_stop(self, timeout: Optional[float] = None): + """ + Wait for the stop signal to be received. + """ + return self._should_stop.wait(timeout=timeout) + + def _audio_callback(self) -> Callable: + """ + Returns a callback to handle the raw frames captures from the audio device. 
+ """ + + def empty_callback(*_, **__): + pass + + return empty_callback + + @property + def stream_name(self) -> str: + if self._stream_name: + return self._stream_name + + ret = f'platypush:audio:{self.direction.value}' + if self.stream_index is not None: + ret += f':{self.stream_index}' + return ret + + @stream_name.setter + def stream_name(self, value: Optional[str]): + self._stream_name = value + + @property + @abstractmethod + def direction(self) -> StreamType: + """ + The default direction for this stream - input or output. + """ + raise NotImplementedError() + + @property + @abstractmethod + def _audio_converter_type(self) -> Optional[Type[AudioConverter]]: + """ + This property indicates the type that should be used for the audio + converter. + """ + raise NotImplementedError() + + @property + @abstractmethod + def _started_event_type(self) -> Type[SoundEvent]: + """ + Event type that will be emitted when the audio starts. + """ + raise NotImplementedError() + + @property + @abstractmethod + def _stopped_event_type(self) -> Type[SoundEvent]: + """ + Event type that will be emitted when the audio stops. + """ + raise NotImplementedError() + + @property + @abstractmethod + def _paused_event_type(self) -> Type[SoundEvent]: + """ + Event type that will be emitted when the audio is paused. + """ + raise NotImplementedError() + + @property + @abstractmethod + def _resumed_event_type(self) -> Type[SoundEvent]: + """ + Event type that will be emitted when the audio is resumed. + """ + raise NotImplementedError() + + @property + def _stream_type(self) -> Union[Type[sd.Stream], Type[sd.OutputStream]]: + """ + The type of stream this thread is mapped to. + """ + return sd.Stream + + @property + def _converter_args(self) -> dict: + """ + Extra arguments to pass to the audio converter. + """ + return {} + + @property + def _stream_args(self) -> dict: + """ + Extra arguments to pass to the stream constructor. 
+ """ + return {} + + @property + def redis_queue(self) -> str: + """ + Redis queue for audio streaming. + """ + if self._redis_queue: + return self._redis_queue + + dev = ( + self.device + if isinstance(self.device, (str, int)) + else '-'.join(map(str, self.device)) + ) + + name = f'platypush-audio-stream-{self.__class__.__name__}-{dev}' + if self.stream_index is not None: + name = f'{name}-{self.stream_index}' + + return name + + def _on_audio_converted(self, data: bytes, out_f: Optional[IO] = None): + """ + This callback will be called when the audio data has been converted. + """ + if out_f: + out_f.write(data) + + if self.stream: + get_redis().publish(self.redis_queue, data) + + def _wait_running(self): + """ + If the stream is in paused state, wait for the state to change. + """ + while self.state == AudioState.PAUSED: + self.paused_changed.wait() + + def main( + self, + converter: Optional[AudioConverter] = None, + out_f: Optional[IO] = None, + ): + """ + Main loop. + """ + self.notify_start() + + self.logger.info( + 'Started %s on device [%s]', self.__class__.__name__, self.device + ) + self._started_time = time.time() + + while not self.should_stop and ( + self.duration is None or time.time() - self._started_time < self.duration + ): + self._wait_running() + if not converter: + self.wait_stop(0.1) + continue + + if self.should_stop: + break + + timeout = ( + max( + 0, + min( + self.duration - (time.time() - self._started_time), + self._converter_timeout, + ), + ) + if self.duration is not None + else self._converter_timeout + ) + + should_continue = self._process_converted_audio( + converter, timeout=timeout, out_f=out_f + ) + + if not should_continue: + break + + def _process_converted_audio( + self, converter: AudioConverter, timeout: float, out_f: Optional[IO] + ) -> bool: + """ + It reads the converted audio from the converter and passes it downstream. + + :return: True if the process should continue, False if it should terminate. 
+ """ + data = converter.read(timeout=timeout) + if not data: + return self._on_converter_timeout(converter) + + self._on_audio_converted(data, out_f) + return True + + def _on_converter_timeout(self, converter: AudioConverter) -> bool: + """ + Callback logic invoked if the converter times out. + + :return: ``True`` (default) if the thread is supposed to continue, + ``False`` if it should terminate. + """ + self.logger.debug('Timeout on converter %s', converter.__class__.__name__) + # Continue only if the converter hasn't terminated + return self._converter_terminated.is_set() + + @override + def run(self): + """ + Wrapper for the main loop that initializes the converter and the stream. + """ + super().run() + self.paused_changed.clear() + + try: + with self.open_converter() as converter, self._stream_type( + samplerate=self.sample_rate, + device=self.device, + channels=self.channels, + dtype=self.dtype, + latency=self.latency, + blocksize=self.blocksize, + **self._stream_args, + ) as self.audio_stream, open( + self.outfile, 'wb' + ) as out_f, self._audio_generator(): + self.main(converter=converter, out_f=out_f) + except queue.Empty: + self.logger.warning( + 'Audio callback timeout for %s', self.__class__.__name__ + ) + finally: + self.notify_stop() + + @contextmanager + def _audio_generator(self) -> Generator[Optional[Thread], None, None]: + """ + :yield: A pair where the thread generates raw audio + frames (as numpy arrays) that are sent to the specified queue. + """ + yield None + + @contextmanager + def open_converter(self) -> Generator[Optional[AudioConverter], None, None]: + """ + Context manager for the converter process. 
+ """ + if self._audio_converter_type is None: + yield None + return + + assert not self._converter, 'A converter process is already running' + self._converter = self._audio_converter_type( + ffmpeg_bin=self.ffmpeg_bin, + sample_rate=self.sample_rate, + channels=self.channels, + volume=self.volume, + dtype=self.dtype, + chunk_size=self.blocksize, + on_exit=self._converter_terminated.set, + **self._converter_args, + ) + + self._converter.start() + yield self._converter + + self._converter.stop() + self._converter.join(timeout=2) + self._converter = None + + @contextmanager + def _change_state(self, state: AudioState, event_type: Type[SoundEvent]): + """ + Changes the state and it emits the specified event if the state has + actually changed. + + It uses a context manager pattern, and everything in between will be + executed before the events are dispatched. + """ + with self._state_lock: + prev_state = self.state + self.state = state + + yield + if prev_state != state: + self._notify(event_type) + + def _notify(self, event_type: Type[SoundEvent], **kwargs): + """ + Notifies the specified event. + """ + get_bus().post(event_type(device=self.device, **kwargs)) + + def notify_start(self): + """ + Notifies the start event. + """ + with self._change_state(AudioState.RUNNING, self._started_event_type): + pass + + def notify_stop(self): + """ + Notifies the stop event. + """ + with self._change_state(AudioState.STOPPED, self._stopped_event_type): + if self._converter: + self._converter.stop() + self.paused_changed.set() + self.paused_changed.clear() + + def notify_pause(self): + """ + Notifies a pause toggle event. 
+ """ + states = { + AudioState.PAUSED: AudioState.RUNNING, + AudioState.RUNNING: AudioState.PAUSED, + } + + with self._state_lock: + new_state = states.get(self.state) + if not new_state: + return + + event_type = ( + self._paused_event_type + if new_state == AudioState.PAUSED + else self._resumed_event_type + ) + + with self._change_state(new_state, event_type): + self.paused_changed.set() + self.paused_changed.clear() + + @property + def state(self): + """ + Thread-safe wrapper for the stream state. + """ + with self._state_lock: + return self._state + + @state.setter + def state(self, value: AudioState): + """ + Thread-safe setter for the stream state. + """ + with self._state_lock: + self._state = value + + def asdict(self) -> dict: + """ + Serialize the thread information. + """ + return { + 'device': self.device, + 'outfile': self.outfile, + 'infile': self.infile, + 'direction': self.direction, + 'ffmpeg_bin': self.ffmpeg_bin, + 'channels': self.channels, + 'sample_rate': self.sample_rate, + 'dtype': self.dtype, + 'streaming': self.stream, + 'duration': self.duration, + 'blocksize': self.blocksize, + 'latency': self.latency, + 'redis_queue': self.redis_queue, + 'audio_pass_through': self.audio_pass_through, + 'state': self._state.value, + 'volume': self.volume, + 'started_time': datetime.fromtimestamp(self._started_time) + if self._started_time + else None, + 'stream_index': self.stream_index, + 'stream_name': self.stream_name, + } + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_streams/_player/__init__.py b/platypush/plugins/sound/_streams/_player/__init__.py new file mode 100644 index 000000000..4f74ecdbc --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/__init__.py @@ -0,0 +1,3 @@ +from ._base import AudioPlayer + +__all__ = ['AudioPlayer'] diff --git a/platypush/plugins/sound/_streams/_player/_base.py b/platypush/plugins/sound/_streams/_player/_base.py new file mode 100644 index 000000000..b2a324a2d --- /dev/null +++ 
b/platypush/plugins/sound/_streams/_player/_base.py @@ -0,0 +1,110 @@ +from abc import ABC +from typing import IO, Iterable, List, Optional, Self, Type, Union +from typing_extensions import override + +import numpy as np +import sounddevice as sd + +from platypush.message.event.sound import ( + SoundPlaybackPausedEvent, + SoundPlaybackResumedEvent, + SoundPlaybackStartedEvent, + SoundPlaybackStoppedEvent, +) + +from ..._converters import RawOutputAudioConverter +from ..._model import StreamType +from .._base import AudioThread + + +class AudioPlayer(AudioThread, ABC): + """ + Base ``AudioPlayer`` class. + + An ``AudioPlayer`` thread is responsible for playing audio (either from a + file/URL or from a synthetic source) to an output device, writing it to the + converter process and dispatching the converted audio to the registered + consumers. + """ + + def __init__( + self, *args, sound: Optional[Union[dict, Iterable[dict]]] = None, **kwargs + ): + super().__init__(*args, **kwargs) + self.sound = sound + + @classmethod + def build( + cls, + infile: Optional[str] = None, + sound: Optional[Union[dict, Iterable[dict]]] = None, + **kwargs, + ) -> Self: + from ._resource import AudioResourcePlayer + from ._synth import AudioSynthPlayer, Sound + + if infile: + return AudioResourcePlayer(infile=infile, **kwargs) + if sound: + sounds: List[dict] = ( # type: ignore + [sound] if isinstance(sound, dict) else sound + ) + + return AudioSynthPlayer(sounds=[Sound.build(**s) for s in sounds], **kwargs) + + raise AssertionError('Either infile or url must be specified') + + @property + @override + def direction(self) -> StreamType: + return StreamType.OUTPUT + + @override + def _on_converter_timeout(self, *_, **__) -> bool: + return False # break + + @property + @override + def _stream_type(self) -> Type[sd.RawOutputStream]: + return sd.RawOutputStream + + @property + @override + def _audio_converter_type(self) -> Type[RawOutputAudioConverter]: + return RawOutputAudioConverter + + 
@override + def _on_audio_converted(self, data: bytes, out_f: Optional[IO] = None): + if self.audio_stream: + self.audio_stream.write( + np.asarray( + self.gain + * np.frombuffer(data, dtype=self.dtype).reshape(-1, self.channels), + dtype=self.dtype, + ) + ) + + super()._on_audio_converted(data, out_f) + + @property + @override + def _started_event_type(self) -> Type[SoundPlaybackStartedEvent]: + return SoundPlaybackStartedEvent + + @property + @override + def _stopped_event_type(self) -> Type[SoundPlaybackStoppedEvent]: + return SoundPlaybackStoppedEvent + + @property + @override + def _paused_event_type(self) -> Type[SoundPlaybackPausedEvent]: + return SoundPlaybackPausedEvent + + @property + @override + def _resumed_event_type(self) -> Type[SoundPlaybackResumedEvent]: + return SoundPlaybackResumedEvent + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_streams/_player/_resource.py b/platypush/plugins/sound/_streams/_player/_resource.py new file mode 100644 index 000000000..f58ed91a9 --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_resource.py @@ -0,0 +1,39 @@ +from typing import Optional, Type +from typing_extensions import override + +from platypush.message.event.sound import SoundEvent + +from ..._converters import RawOutputAudioFromFileConverter +from ._base import AudioPlayer + + +class AudioResourcePlayer(AudioPlayer): + """ + A ``AudioResourcePlayer`` thread is responsible for playing an audio + resource - either a file or a URL. 
+ """ + + @property + @override + def _audio_converter_type(self) -> Type[RawOutputAudioFromFileConverter]: + return RawOutputAudioFromFileConverter + + @property + @override + def _converter_args(self) -> dict: + return { + 'infile': self.infile, + **super()._converter_args, + } + + @property + @override + def _converter_stdin(self) -> Optional[int]: + return None + + @override + def _notify(self, event_type: Type[SoundEvent], **kwargs): + return super()._notify(event_type, resource=self.infile, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_streams/_player/_synth/__init__.py b/platypush/plugins/sound/_streams/_player/_synth/__init__.py new file mode 100644 index 000000000..84fe582de --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/__init__.py @@ -0,0 +1,4 @@ +from ._player import AudioSynthPlayer +from ._sound import Sound + +__all__ = ['AudioSynthPlayer', 'Sound'] diff --git a/platypush/plugins/sound/_streams/_player/_synth/_base.py b/platypush/plugins/sound/_streams/_player/_synth/_base.py new file mode 100644 index 000000000..7b1d13c9a --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_base.py @@ -0,0 +1,79 @@ +from abc import ABC, abstractmethod +from typing import Optional, Tuple + +import numpy as np +from numpy.typing import NDArray + +from ._parser import SoundParser + + +class SoundBase(SoundParser, ABC): + """ + Base class for synthetic sounds and mixes. + """ + + def __init__(self, *args, volume: float = 100, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.volume = volume + + @property + def gain(self) -> float: + return self.volume / 100 + + @gain.setter + def gain(self, value: float): + self.volume = value * 100 + + @abstractmethod + def get_wave( + self, + sample_rate: float, + t_start: float = 0, + t_end: float = 0, + **_, + ) -> NDArray[np.floating]: + """ + Get the wave binary data associated to this sound + + :param t_start: Start offset for the wave in seconds. 
Default: 0 + :param t_end: End offset for the wave in seconds. Default: 0 + :param sample_rate: Audio sample rate. Default: 44100 Hz + :returns: A ``numpy.ndarray[(t_end-t_start)*sample_rate, 1]`` + with the raw float values + """ + raise NotImplementedError() + + def fft( + self, + sample_rate: float, + t_start: float = 0.0, + t_end: float = 0.0, + freq_range: Optional[Tuple[float, float]] = None, + freq_buckets: Optional[int] = None, + ) -> NDArray[np.floating]: + """ + Get the real part of the Fourier transform associated to a time-bounded + sample of this sound. + + :param t_start: Start offset for the wave in seconds. Default: 0 + :param t_end: End offset for the wave in seconds. Default: 0 + :param sample_rate: Audio sample rate. Default: 44100 Hz + :param freq_range: FFT frequency range. Default: ``(0, sample_rate/2)`` + (see the `Nyquist-Shannon sampling theorem + <https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_) + :param freq_buckets: Number of buckets to subdivide the frequency range. + Default: None + :returns: A numpy.ndarray[freq_range,1] with the raw float values + """ + + if not freq_range: + freq_range = (0, int(sample_rate / 2)) + + wave = self.get_wave(t_start=t_start, t_end=t_end, sample_rate=sample_rate) + fft = np.fft.fft(wave.reshape(len(wave))) + fft = fft.real[freq_range[0] : freq_range[1]] + + if freq_buckets is not None: + fft = np.histogram(fft, bins=freq_buckets)[0] + + return fft diff --git a/platypush/plugins/sound/_streams/_player/_synth/_generator.py b/platypush/plugins/sound/_streams/_player/_synth/_generator.py new file mode 100644 index 000000000..22bc60fc5 --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_generator.py @@ -0,0 +1,101 @@ +from logging import getLogger +from queue import Full, Queue +from threading import Thread +from time import time +from typing import Any, Callable, Optional + +import numpy as np +from numpy.typing import NDArray + +from ._mix import Mix + + +class AudioGenerator(Thread): + """ + The ``AudioGenerator`` class is a thread that 
generates synthetic raw audio + waves and dispatches them to a queue that can be consumed by other players, + streamers and converters. + """ + + def __init__( + self, + *args, + audio_queue: Queue[NDArray[np.number]], + mix: Mix, + blocksize: int, + sample_rate: int, + queue_timeout: Optional[float] = None, + should_stop: Callable[[], bool] = lambda: False, + wait_running: Callable[[], Any] = lambda: None, + on_stop: Callable[[], Any] = lambda: None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._audio_queue = audio_queue + self._t_start: float = 0 + self._blocksize: int = blocksize + self._sample_rate: int = sample_rate + self._blocktime = self._blocksize / self._sample_rate + self._should_stop = should_stop + self._queue_timeout = queue_timeout + self._wait_running = wait_running + self._on_stop = on_stop + self.mix = mix + self.logger = getLogger(__name__) + + def _next_t(self, t: float) -> float: + """ + Calculates the next starting time for the wave function. + """ + return ( + min(t + self._blocktime, self._duration) + if self._duration is not None + else t + self._blocktime + ) + + def should_stop(self) -> bool: + """ + Stops if the upstream dependencies have signalled to stop or if the + duration is set and we have reached it. + """ + return self._should_stop() or ( + self._duration is not None and time() - self._t_start >= self._duration + ) + + @property + def _duration(self) -> Optional[float]: + """ + Proxy to the mix object's duration. 
+ """ + return self.mix.duration() + + def run(self): + super().run() + self._t_start = time() + t = 0 + + while not self.should_stop(): + self._wait_running() + if self.should_stop(): + break + + next_t = self._next_t(t) + + try: + data = self.mix.get_wave( + t_start=t, t_end=next_t, sample_rate=self._sample_rate + ) + except Exception as e: + self.logger.warning('Could not generate the audio wave: %s', e) + break + + try: + self._audio_queue.put(data, timeout=self._queue_timeout) + t = next_t + except Full: + self.logger.warning( + 'The processing queue is full: either the audio consumer is stuck, ' + 'or you may want to increase queue_size' + ) + + self._on_stop() diff --git a/platypush/plugins/sound/_streams/_player/_synth/_mix.py b/platypush/plugins/sound/_streams/_player/_synth/_mix.py new file mode 100644 index 000000000..c7d09a419 --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_mix.py @@ -0,0 +1,115 @@ +import json +import logging +from typing import List, Tuple, Union +from typing_extensions import override + +import numpy as np +from numpy.typing import DTypeLike, NDArray + +from ...._utils import convert_nd_array +from ._base import SoundBase +from ._sound import Sound + + +class Mix(SoundBase): + """ + This class models a set of mixed :class:`._sound.Sound` instances that can be played + through an audio stream to an audio device + """ + + def __init__(self, *sounds, channels: int, dtype: DTypeLike, **kwargs): + super().__init__(**kwargs) + self._sounds: List[Sound] = [] + self.logger = logging.getLogger(__name__) + self.channels = channels + self.dtype = np.dtype(dtype) + + for sound in sounds: + self.add(sound) + + def __iter__(self): + """ + Iterate over the object's attributes and return key-pair values. + """ + for sound in self._sounds: + yield dict(sound) + + def __str__(self): + """ + Return a JSON string representation of the object. 
+ """ + return json.dumps(list(self)) + + def add(self, *sounds: Union[Sound, dict]): + """ + Add one or more sounds to the mix. + """ + self._sounds += [Sound.build(sound) for sound in sounds] + + def remove(self, *sound_indices: int): + """ + Remove one or more sounds from the mix. + """ + assert self._sounds and all( + 0 <= sound_index < len(sound_indices) for sound_index in sound_indices + ), f'Sound indices must be between 0 and {len(self._sounds) - 1}' + + for sound_index in sound_indices[::-1]: + self._sounds.pop(sound_index) + + @override + def get_wave( + self, + sample_rate: float, + t_start: float = 0, + t_end: float = 0, + normalize_range: Tuple[float, float] = (-1.0, 1.0), + on_clip: str = 'scale', + **_, + ) -> NDArray[np.number]: + wave = None + + for sound in self._sounds: + sound_wave = sound.get_wave( + t_start=t_start, t_end=t_end, sample_rate=sample_rate + ) + + if wave is None: + wave = sound_wave + else: + wave += sound_wave + + if wave is not None and len(wave): + scale_factor = (normalize_range[1] - normalize_range[0]) / ( + wave.max() - wave.min() + ) + + if scale_factor < 1.0: # Wave clipping + if on_clip == 'scale': + wave = scale_factor * wave + elif on_clip == 'clip': + wave[wave < normalize_range[0]] = normalize_range[0] + wave[wave > normalize_range[1]] = normalize_range[1] + else: + raise RuntimeError( + 'Supported values for "on_clip": ' + '"scale" or "clip"' + ) + + assert wave is not None + return convert_nd_array(self.gain * wave, dtype=self.dtype) + + def duration(self): + """ + :returns: The duration of the mix in seconds as duration of its longest + sample, or None if the mixed sample have no duration set + """ + + # If any sound has no duration specified, then the resulting mix will + # have no duration as well. 
+ if any(sound.duration is None for sound in self._sounds): + return None + + return max(((sound.duration or 0) + sound.delay for sound in self._sounds)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_streams/_player/_synth/_output.py b/platypush/plugins/sound/_streams/_player/_synth/_output.py new file mode 100644 index 000000000..d2743f2db --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_output.py @@ -0,0 +1,79 @@ +from logging import getLogger +from queue import Empty, Queue +from typing import Callable, Optional + +import sounddevice as sd + +import numpy as np +from numpy.typing import NDArray + + +# pylint: disable=too-few-public-methods +class AudioOutputCallback: + """ + The ``AudioSynthOutput`` is a functor that wraps the ``sounddevice.Stream`` + callback and writes raw audio data to the audio device. + """ + + def __init__( + self, + *args, + audio_queue: Queue[NDArray[np.number]], + channels: int, + blocksize: int, + should_stop: Callable[[], bool] = lambda: False, + is_paused: Callable[[], bool] = lambda: False, + queue_timeout: Optional[float] = None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._audio_queue = audio_queue + self._channels = channels + self._blocksize = blocksize + self._should_stop = should_stop + self._is_paused = is_paused + self._queue_timeout = queue_timeout + self.logger = getLogger(__name__) + + def _check_status(self, frames: int, status): + """ + Checks the current status of the audio callback and raises errors if + the processing shouldn't continue. + """ + if self._should_stop(): + raise sd.CallbackStop + + assert frames == self._blocksize, ( + f'Received {frames} frames, expected blocksize is {self._blocksize}', + ) + + assert not status.output_underflow, 'Output underflow: increase blocksize?' 
+ assert not status, f'Audio callback failed: {status}' + + def _audio_callback(self, outdata: NDArray[np.number], frames: int, status): + if self._is_paused(): + return + + self._check_status(frames, status) + + try: + data = self._audio_queue.get_nowait() + except Empty as e: + raise ( + sd.CallbackStop + if self._should_stop() + else AssertionError('Buffer is empty: increase buffersize?') + ) from e + + if data.shape[0] == 0: + raise sd.CallbackStop + + audio_length = min(len(data), len(outdata)) + outdata[:audio_length] = data[:audio_length] + + # _ = time + def __call__(self, outdata: NDArray[np.number], frames: int, _, status): + try: + self._audio_callback(outdata, frames, status) + except AssertionError as e: + self.logger.warning(str(e)) diff --git a/platypush/plugins/sound/_streams/_player/_synth/_parser.py b/platypush/plugins/sound/_streams/_player/_synth/_parser.py new file mode 100644 index 000000000..bdccb9fde --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_parser.py @@ -0,0 +1,111 @@ +import math +import re +from typing import Optional, Union + + +class SoundParser: + """ + A utility mixin with some methods to parse and convert sound information - + e.g. MIDI notes from strings, MIDI notes to frequencies, and the other way + around. + """ + + _DEFAULT_A4_FREQUENCY = 440.0 + _MIDI_NOTE_REGEX = re.compile(r'^([A-G])([#b]?)(-?[0-9]+)$') + _MID_A_MIDI_NOTE = 69 + _NOTE_OFFSETS = { + 'C': 0, + 'C#': 1, + 'Db': 1, + 'D': 2, + 'D#': 3, + 'Eb': 3, + 'E': 4, + 'F': 5, + 'F#': 6, + 'Gb': 6, + 'G': 7, + 'G#': 8, + 'Ab': 8, + 'A': 9, + 'A#': 10, + 'Bb': 10, + 'B': 11, + } + + _ALTERATION_OFFSETS = { + 'b': -1, + '': 0, + '#': 1, + } + + def __init__(self, *_, ref_frequency: float = _DEFAULT_A4_FREQUENCY, **__) -> None: + self._ref_frequency = ref_frequency + + @staticmethod + def _get_alteration_offset(alt: str) -> int: + """ + Calculate the MIDI note offset given by its reported sharp/flat alteration. 
+ """ + if alt == '#': + return 1 + if alt == 'b': + return -1 + return 0 + + @classmethod + def get_midi_note(cls, note: Union[str, int]) -> int: + """ + Convert a MIDI note given as input (either an integer or a string like + 'C4') to a MIDI note number. + + :raise: ValueError + """ + + if isinstance(note, str): + note = note[:1].upper() + note[1:] + m = cls._MIDI_NOTE_REGEX.match(note) + if not m: + raise ValueError(f'Invalid MIDI note: {note}') + + base_note, alteration, octave = m.groups() + octave = int(octave) + note_offset = cls._NOTE_OFFSETS[base_note] + cls._get_alteration_offset( + alteration + ) + + octave_offset = (octave + 1) * 12 + note = octave_offset + note_offset + + if isinstance(note, int): + if not 0 <= note <= 127: + raise ValueError(f'MIDI note out of range: {note}') + return note + + raise ValueError(f'Invalid MIDI note: {note}') + + def note_to_freq( + self, midi_note: Union[int, str], ref_frequency: Optional[float] = None + ): + """ + Converts a MIDI note to its frequency in Hz + + :param midi_note: MIDI note to convert + :param ref_frequency: Reference A4 frequency override (default: 440 Hz). + """ + + note = self.get_midi_note(midi_note) + return (2.0 ** ((note - self._MID_A_MIDI_NOTE) / 12.0)) * ( + ref_frequency or self._ref_frequency + ) + + def freq_to_note(self, frequency: float, ref_frequency: Optional[float] = None): + """ + Converts a frequency in Hz to its closest MIDI note + + :param frequency: Frequency in Hz + :param ref_frequency: Reference A4 frequency override (default: 440 Hz). 
+ """ + + std_freq = ref_frequency or self._ref_frequency + return int(12.0 * math.log(frequency / std_freq, 2) + self._MID_A_MIDI_NOTE) diff --git a/platypush/plugins/sound/_streams/_player/_synth/_player.py b/platypush/plugins/sound/_streams/_player/_synth/_player.py new file mode 100644 index 000000000..dde23caf0 --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_player.py @@ -0,0 +1,125 @@ +from contextlib import contextmanager +from queue import Queue +from threading import Event +from typing import Any, Generator, Iterable, Optional, Type +from typing_extensions import override + +import numpy as np +import sounddevice as sd +from numpy.typing import DTypeLike, NDArray + +from ...._model import AudioState +from ..._player import AudioPlayer +from ._generator import AudioGenerator +from ._mix import Mix +from ._output import AudioOutputCallback +from ._sound import Sound + + +class AudioSynthPlayer(AudioPlayer): + """ + The ``AudioSynthPlayer`` can play synthetic sounds (specified either by MIDI + note or raw frequency) to an audio device. + """ + + def __init__( + self, + *args, + volume: float, + channels: int, + dtype: DTypeLike, + sounds: Optional[Iterable[Sound]] = None, + **kwargs + ): + sounds = sounds or [] + self.mix = Mix(*sounds, volume=volume, channels=channels, dtype=dtype) + + super().__init__(*args, volume=volume, channels=channels, dtype=dtype, **kwargs) + self._generator_stopped = Event() + self._completed_callback_event = Event() + self._audio_queue: Queue[NDArray[np.number]] = Queue( + maxsize=self.queue_size or 0 + ) + + @property + @override + def _stream_type(self) -> Type[sd.OutputStream]: + return sd.OutputStream + + @property + @override + def _audio_converter_type(self) -> None: + pass + + def __setattr__(self, __name: str, __value: Any): + """ + Make sure that the relevant attributes are synchronized to the mix + object upon set/update. 
+ """ + if __name == 'volume': + # Propagate the volume changes to the mix object. + self.mix.volume = __value + return super().__setattr__(__name, __value) + + @override + def _on_converter_timeout(self, *_, **__) -> bool: + """ + Don't break the audio stream if the output converter failed + """ + return True + + @property + @override + def _stream_args(self) -> dict: + """ + Register an :class:`.AudioOutputCallback` to fill up the audio buffers. + """ + return { + 'callback': AudioOutputCallback( + audio_queue=self._audio_queue, + channels=self.channels, + blocksize=self.blocksize, + queue_timeout=self._queue_timeout, + should_stop=lambda: self.should_stop + or self._generator_stopped.is_set(), + is_paused=lambda: self.state == AudioState.PAUSED, + ), + 'finished_callback': self._completed_callback_event.set, + **super()._stream_args, + } + + @property + def _queue_timeout(self) -> float: + """ + Estimated max read/write timeout on the audio queue. + """ + return self.blocksize * (self.queue_size or 5) / self.sample_rate + + @override + @contextmanager + def _audio_generator(self) -> Generator[AudioGenerator, None, None]: + stop_generator = Event() + gen = AudioGenerator( + audio_queue=self._audio_queue, + mix=self.mix, + blocksize=self.blocksize, + sample_rate=self.sample_rate, + queue_timeout=self._queue_timeout, + should_stop=lambda: self.should_stop or stop_generator.is_set(), + wait_running=self._wait_running, + on_stop=self._on_stop, + ) + + self._generator_stopped.clear() + gen.start() + yield gen + + stop_generator.set() + gen.join() + + def _on_stop(self): + self._generator_stopped.set() + self.notify_stop() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_streams/_player/_synth/_sound.py b/platypush/plugins/sound/_streams/_player/_synth/_sound.py new file mode 100644 index 000000000..1edc401ad --- /dev/null +++ b/platypush/plugins/sound/_streams/_player/_synth/_sound.py @@ -0,0 +1,225 @@ +from enum import Enum +import json +from typing 
import Final, Optional, Tuple, Union +from typing_extensions import override + +import numpy as np +from numpy.typing import NDArray + +from ._base import SoundBase + + +class WaveShape(Enum): + """ + Supported audio wave shapes. + """ + + SIN = 'sin' + SQUARE = 'square' + SAWTOOTH = 'sawtooth' + TRIANG = 'triang' + + +class Sound(SoundBase): + """ + Models a basic synthetic sound that can be played through an audio device + """ + + _DEFAULT_MID_A_FREQUENCY: Final[float] = 440.0 + + def __init__( + self, + *args, + midi_note: Optional[Union[str, int]] = None, + frequency: Optional[float] = None, + phase: float = 0, + duration: Optional[float] = None, + delay: float = 0, + shape: WaveShape = WaveShape.SIN, + **kwargs, + ): + """ + You can construct a sound either from a MIDI note or a base frequency, + as well as the shape of the output wave. + + :param midi_note: MIDI note code, see `this chart + `_. + :param frequency: Sound base frequency in Hz + :param phase: Wave phase shift as a multiple of pi (default: 0.0) + :param duration: Note duration in seconds. Default: keep until + release/pause/stop + :param delay: Sound delay in seconds, calculated from the moment the + command execution starts. Default: 0. + :param shape: Wave shape. Possible values: "``sin``", "``square``", + "``sawtooth``" or "``triang``" (see :class:`WaveShape`). 
+ Default: "``sin``" + """ + + super().__init__(*args, **kwargs) + invalid_request = RuntimeError( + 'Please specify either a MIDI note or a base frequency' + ) + + if midi_note and frequency: + raise invalid_request + + if midi_note: + self.midi_note = self.get_midi_note(midi_note) + self.frequency = self.note_to_freq(midi_note=midi_note) + elif frequency: + self.frequency = frequency + self.midi_note = self.freq_to_note(frequency=frequency) + else: + raise invalid_request + + self.phase = phase + self.duration = duration + self.delay = delay + self.shape = WaveShape(shape) + + def _get_left_audio_pad( + self, sample_rate: float, t_start: float, t_end: float + ) -> int: + """ + Get the size of the audio wave left zero-pad given in function of its + ``delay``, ``sample_rate``, ``t_start`` and ``t_end``. + """ + return round(max(0, min(t_end, self.delay) - t_start) * sample_rate) + + def _get_right_audio_pad( + self, sample_rate: float, t_start: float, t_end: float + ) -> int: + """ + Get the size of the audio wave right zero-pad given its declared + ``delay`` in function of ``t_start`` and ``t_end``. + """ + if not self.duration: + return 0 + + duration = self.delay + self.duration + if t_end <= duration: + return 0 + + return round((t_end - max(t_start, duration)) * sample_rate) + + def _get_audio_pad( + self, sample_rate: float, t_start: float, t_end: float + ) -> Tuple[NDArray[np.floating], NDArray[np.floating]]: + """ + Return the left and right audio pads for a given audio length as a + ``(left, right)`` tuple of numpy zero-filled arrays. 
+ """ + return tuple( + np.zeros([pad_size, 1]) + for pad_size in ( + self._get_left_audio_pad( + sample_rate=sample_rate, t_start=t_start, t_end=t_end + ), + self._get_right_audio_pad( + sample_rate=sample_rate, t_start=t_start, t_end=t_end + ), + ) + ) + + def _generate_wave(self, x: NDArray[np.floating]): + """ + Generate a raw audio wave as a numpy array of floating between -1 and 1 + given ``x`` as a set of timestamp samples. + """ + if self.shape in (WaveShape.SIN, WaveShape.SQUARE): + wave = np.sin((2 * np.pi * self.frequency * x) + np.pi * self.phase) + + if self.shape == WaveShape.SQUARE: + wave[wave < 0] = -0.95 + wave[wave >= 0] = 0.95 + elif self.shape in (WaveShape.SAWTOOTH, WaveShape.TRIANG): + wave = 2 * (self.frequency * x - np.floor(0.5 + self.frequency * x)) + if self.shape == WaveShape.TRIANG: + wave = 2 * np.abs(wave) - 1 + else: + raise RuntimeError( + f'Unsupported wave shape: {self.shape}. ' + f'Supported values: {[s.value for s in WaveShape]}' + ) + + return wave + + @override + def get_wave( + self, + sample_rate: float, + t_start: float = 0, + t_end: float = 0, + **_, + ) -> NDArray[np.floating]: + """ + Get the wave binary data associated to this sound + + :param t_start: Start offset for the wave in seconds. Default: 0 + :param t_end: End offset for the wave in seconds. Default: 0 + :param sample_rate: Audio sample rate. 
Default: 44100 Hz + :returns: A ``numpy.ndarray[(t_end-t_start)*sample_rate, 1]`` + with the raw float values + """ + + assert self.frequency is not None, 'The sound has no associated base frequency' + if t_start > t_end: + return np.array([]) + + left_pad, right_pad = self._get_audio_pad( + sample_rate=sample_rate, t_start=t_start, t_end=t_end + ) + t_start = min(t_end, t_start + (left_pad.shape[0] / sample_rate)) + t_end = max(t_start, t_end - (right_pad.shape[0] / sample_rate)) + actual_n_samples = abs(round((t_end - t_start) * sample_rate)) + wave_length = max(t_start, self.delay - t_start) + + if self.duration is not None: + wave_length = min(wave_length, self.duration - self.delay) + + x = np.linspace( + max(t_start, self.delay - t_start), + t_end, + actual_n_samples, + ).reshape(-1, 1) + + return self.gain * np.array( + ( + *left_pad, + *self._generate_wave(x), + *right_pad, + ) + ) + + def __iter__(self): + """ + Iterates over the sound's attributes and returns key-value pairs. + """ + for attr in ['midi_note', 'frequency', 'volume', 'duration', 'ref_frequency']: + yield attr, getattr(self, attr) + + def __str__(self): + """ + :return: A JSON-string representation of the sound dictionary. 
+ """ + return json.dumps(dict(self)) + + @classmethod + def build(cls, *args, **kwargs) -> "Sound": + """ + Construct a sound object either from a JSON representation or a + key-value representation + """ + + if args: + if isinstance(args[0], cls): + return args[0] + if isinstance(args[0], str): + kwargs = json.loads(args[0]) + elif isinstance(args[0], dict): + kwargs = args[0] + + if kwargs: + return Sound(**kwargs) + + raise RuntimeError(f'Usage: {__doc__}') diff --git a/platypush/plugins/sound/_streams/_recorder.py b/platypush/plugins/sound/_streams/_recorder.py new file mode 100644 index 000000000..d5c560caa --- /dev/null +++ b/platypush/plugins/sound/_streams/_recorder.py @@ -0,0 +1,106 @@ +from typing import Type +from typing_extensions import override + +import sounddevice as sd + +from platypush.message.event.sound import ( + SoundRecordingPausedEvent, + SoundRecordingResumedEvent, + SoundRecordingStartedEvent, + SoundRecordingStoppedEvent, +) + +from .._converters import RawInputAudioConverter +from .._model import AudioState, StreamType +from ._base import AudioThread + + +class AudioRecorder(AudioThread): + """ + The ``AudioRecorder`` thread is responsible for recording audio from the + input device, writing it to the converter process and dispatch the + converted audio to the registered consumers. 
+ """ + + def __init__(self, *args, output_format: str, **kwargs): + super().__init__(*args, **kwargs) + self.output_format = output_format + + @property + @override + def direction(self) -> StreamType: + return StreamType.INPUT + + @override + def _audio_callback(self): + # _ = frames + # __ = time + def callback(indata, outdata, _, __, status): + if self.state != AudioState.RUNNING: + return + + if status: + self.logger.warning('Recording callback status: %s', status) + + if not self._converter: + self.logger.warning( + 'The ffmpeg converter process has already terminated' + ) + self.notify_stop() + raise sd.CallbackStop + + try: + self._converter.write(indata.tobytes()) + except AssertionError as e: + self.logger.warning('Audio converter callback error: %s', e) + self.state = AudioState.STOPPED + return + + if self.audio_pass_through: + outdata[:] = indata + + return callback + + @property + @override + def _audio_converter_type(self) -> Type[RawInputAudioConverter]: + return RawInputAudioConverter + + @property + @override + def _started_event_type(self) -> Type[SoundRecordingStartedEvent]: + return SoundRecordingStartedEvent + + @property + @override + def _stopped_event_type(self) -> Type[SoundRecordingStoppedEvent]: + return SoundRecordingStoppedEvent + + @property + @override + def _paused_event_type(self) -> Type[SoundRecordingPausedEvent]: + return SoundRecordingPausedEvent + + @property + @override + def _resumed_event_type(self) -> Type[SoundRecordingResumedEvent]: + return SoundRecordingResumedEvent + + @property + @override + def _converter_args(self) -> dict: + return { + 'format': self.output_format, + **super()._converter_args, + } + + @property + @override + def _stream_args(self) -> dict: + return { + 'callback': self._audio_callback(), + **super()._stream_args, + } + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/_utils/__init__.py b/platypush/plugins/sound/_utils/__init__.py new file mode 100644 index 000000000..116ba5d70 --- 
/dev/null
+++ b/platypush/plugins/sound/_utils/__init__.py
@@ -0,0 +1,3 @@
+from ._convert import convert_nd_array
+
+__all__ = ["convert_nd_array"]
diff --git a/platypush/plugins/sound/_utils/_convert.py b/platypush/plugins/sound/_utils/_convert.py
new file mode 100644
index 000000000..fa27fe510
--- /dev/null
+++ b/platypush/plugins/sound/_utils/_convert.py
@@ -0,0 +1,23 @@
+import numpy as np
+from numpy.typing import DTypeLike, NDArray
+
+
+def convert_nd_array(
+    wave: NDArray[np.floating], dtype: DTypeLike
+) -> NDArray[np.number]:
+    """
+    Convert a wave of floating-point samples in the [-1, 1] range to the given
+    data type, rescaling the samples to the full range of that type.
+    """
+    t = np.dtype(dtype)
+    if t.kind == 'f':
+        # Floating-point output needs no rescaling.
+        return wave.astype(t)
+    if t.kind == 'i':
+        # Signed integers: scale [-1, 1] to [-max, max], max = 2**(bits-1) - 1.
+        return (wave * np.iinfo(t).max).astype(t)
+    if t.kind == 'u':
+        # Unsigned integers: shift [-1, 1] to [0, 1], then scale to [0, max].
+        return ((wave + 1) / 2 * np.iinfo(t).max).astype(t)
+
+    raise AssertionError(f'Unsupported dtype: {dtype}')
diff --git a/platypush/plugins/sound/core.py b/platypush/plugins/sound/core.py
deleted file mode 100644
index fc1d3ee22..000000000
--- a/platypush/plugins/sound/core.py
+++ /dev/null
@@ -1,405 +0,0 @@
-import enum
-import logging
-import json
-import math
-
-
-class WaveShape(enum.Enum):
-    SIN = 'sin'
-    SQUARE = 'square'
-    SAWTOOTH = 'sawtooth'
-    TRIANG = 'triang'
-
-
-class Sound:
-    """
-    Models a basic synthetic sound that can be played through an audio device
-    """
-
-    STANDARD_A_FREQUENCY = 440.0
-    STANDARD_A_MIDI_NOTE = 69
-    _DEFAULT_BLOCKSIZE = 1024
-    _DEFAULT_SYNTH_BUFSIZE = 2
-    _DEFAULT_FILE_BUFSIZE = 20
-    _DEFAULT_SAMPLERATE = 44100
-
-    midi_note = None
-    frequency = None
-    phase = 0.0
-    gain = 1.0
-    duration = None
-    shape = None
-
-    def __init__(
-        self,
-        midi_note=midi_note,
-
frequency=None, - phase=phase, - gain=gain, - duration=duration, - shape=WaveShape.SIN, - A_frequency=STANDARD_A_FREQUENCY, - ): - """ - You can construct a sound either from a MIDI note or a base frequency - - :param midi_note: MIDI note code, see - https://newt.phys.unsw.edu.au/jw/graphics/notes.GIF - :type midi_note: int - - :param frequency: Sound base frequency in Hz - :type frequency: float - - :param phase: Wave phase shift as a multiple of pi (default: 0.0) - :type phase: float - - :param gain: Note gain/volume between 0.0 and 1.0 (default: 1.0) - :type gain: float - - :param duration: Note duration in seconds. Default: keep until - release/pause/stop - :type duration: float - - :param shape: Wave shape. Possible values: "``sin``", "``square``", - "``sawtooth``" or "``triang``" (see :class:`WaveSound`). - Default: "``sin``" - :type shape: str - - :param A_frequency: Reference A4 frequency (default: 440 Hz) - :type A_frequency: float - """ - - if midi_note and frequency: - raise RuntimeError( - 'Please specify either a MIDI note or a base ' + 'frequency' - ) - - if midi_note: - self.midi_note = midi_note - self.frequency = self.note_to_freq( - midi_note=midi_note, A_frequency=A_frequency - ) - elif frequency: - self.frequency = frequency - self.midi_note = self.freq_to_note( - frequency=frequency, A_frequency=A_frequency - ) - else: - raise RuntimeError( - 'Please specify either a MIDI note or a base ' + 'frequency' - ) - - self.phase = phase - self.gain = gain - self.duration = duration - self.shape = WaveShape(shape) - - @classmethod - def note_to_freq(cls, midi_note, A_frequency=STANDARD_A_FREQUENCY): - """ - Converts a MIDI note to its frequency in Hz - - :param midi_note: MIDI note to convert - :type midi_note: int - - :param A_frequency: Reference A4 frequency (default: 440 Hz) - :type A_frequency: float - """ - - return (2.0 ** ((midi_note - cls.STANDARD_A_MIDI_NOTE) / 12.0)) * A_frequency - - @classmethod - def freq_to_note(cls, frequency, 
A_frequency=STANDARD_A_FREQUENCY): - """ - Converts a frequency in Hz to its closest MIDI note - - :param frequency: Frequency in Hz - :type frequency: float - - :param A_frequency: Reference A4 frequency (default: 440 Hz) - :type A_frequency: float - """ - - # TODO return also the offset in % between the provided frequency - # and the standard MIDI note frequency - return int( - 12.0 * math.log(frequency / A_frequency, 2) + cls.STANDARD_A_MIDI_NOTE - ) - - def get_wave(self, t_start=0.0, t_end=0.0, samplerate=_DEFAULT_SAMPLERATE): - """ - Get the wave binary data associated to this sound - - :param t_start: Start offset for the wave in seconds. Default: 0 - :type t_start: float - - :param t_end: End offset for the wave in seconds. Default: 0 - :type t_end: float - - :param samplerate: Audio sample rate. Default: 44100 Hz - :type samplerate: int - - :returns: A ``numpy.ndarray[(t_end-t_start)*samplerate, 1]`` - with the raw float values - """ - - import numpy as np - - x = np.linspace(t_start, t_end, int((t_end - t_start) * samplerate)) - - x = x.reshape(len(x), 1) - - if self.shape == WaveShape.SIN or self.shape == WaveShape.SQUARE: - wave = np.sin((2 * np.pi * self.frequency * x) + np.pi * self.phase) - - if self.shape == WaveShape.SQUARE: - wave[wave < 0] = -1 - wave[wave >= 0] = 1 - elif self.shape == WaveShape.SAWTOOTH or self.shape == WaveShape.TRIANG: - wave = 2 * (self.frequency * x - np.floor(0.5 + self.frequency * x)) - if self.shape == WaveShape.TRIANG: - wave = 2 * np.abs(wave) - 1 - else: - raise RuntimeError('Unsupported wave shape: {}'.format(self.shape)) - - return self.gain * wave - - def fft( - self, - t_start=0.0, - t_end=0.0, - samplerate=_DEFAULT_SAMPLERATE, - freq_range=None, - freq_buckets=None, - ): - """ - Get the real part of the Fourier transform associated to a time-bounded - sample of this sound - - :param t_start: Start offset for the wave in seconds. 
Default: 0 - :type t_start: float - - :param t_end: End offset for the wave in seconds. Default: 0 - :type t_end: float - - :param samplerate: Audio sample rate. Default: 44100 Hz - :type samplerate: int - - :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` - (see`Nyquist-Shannon sampling theorem - `_) - :type freq_range: list or tuple with 2 int elements (range) - - :param freq_buckets: Number of buckets to subdivide the frequency range. - Default: None - :type freq_buckets: int - - :returns: A numpy.ndarray[freq_range,1] with the raw float values - """ - - import numpy as np - - if not freq_range: - freq_range = (0, int(samplerate / 2)) - - wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) - fft = np.fft.fft(wave.reshape(len(wave))) - fft = fft.real[freq_range[0] : freq_range[1]] - - if freq_buckets is not None: - fft = np.histogram(fft, bins=freq_buckets) - - return fft - - def __iter__(self): - for attr in ['midi_note', 'frequency', 'gain', 'duration']: - yield attr, getattr(self, attr) - - def __str__(self): - return json.dumps(dict(self)) - - @classmethod - def build(cls, *args, **kwargs): - """ - Construct a sound object either from a JSON representation or a - key-value representation - """ - - if args: - if isinstance(args[0], cls): - return args[0] - if isinstance(args[0], str): - kwargs = json.loads(args[0]) - elif isinstance(args[0], dict): - kwargs = args[0] - if kwargs: - return Sound(**kwargs) - - raise RuntimeError('Usage: {}'.format(__doc__)) - - -class Mix: - """ - This class models a set of mixed :class:`Sound` instances that can be played - through an audio stream to an audio device - """ - - _sounds = None - - def __init__(self, *sounds): - self._sounds = [] - self.logger = logging.getLogger(__name__) - - for sound in sounds: - self.add(sound) - - def __iter__(self): - for sound in self._sounds: - yield dict(sound) - - def __str__(self): - return json.dumps(list(self)) - - def add(self, sound): - 
self._sounds.append(Sound.build(sound)) - - def remove(self, sound_index): - if sound_index >= len(self._sounds): - self.logger.error( - 'No such sound index: {} in mix {}'.format(sound_index, list(self)) - ) - return - - self._sounds.pop(sound_index) - - # noinspection PyProtectedMember - def get_wave( - self, - t_start=0.0, - t_end=0.0, - normalize_range=(-1.0, 1.0), - on_clip='scale', - samplerate=Sound._DEFAULT_SAMPLERATE, - ): - """ - Get the wave binary data associated to this mix - - :param t_start: Start offset for the wave in seconds. Default: 0 - :type t_start: float - - :param t_end: End offset for the wave in seconds. Default: 0 - :type t_end: float - - :param normalize_range: Normalization range. If set the gain values of the - wave will be normalized to fit into the specified range if it - "clips" above or below. Default: ``(-1.0, 1.0)`` - :type normalize_range: list[float] - - :param on_clip: Action to take on wave clipping if ``normalize_range`` - is set. Possible values: "``scale``" (scale down the frame to remove - the clipping) or "``clip``" (saturate the values above/below range). - Default: "``scale``". - :type on_clip: str - - :param samplerate: Audio sample rate. 
Default: 44100 Hz - :type samplerate: int - - :returns: A numpy.ndarray[n,1] with the raw float values - """ - - wave = None - - for sound in self._sounds: - sound_wave = sound.get_wave( - t_start=t_start, t_end=t_end, samplerate=samplerate - ) - - if wave is None: - wave = sound_wave - else: - wave += sound_wave - - if normalize_range and len(wave): - scale_factor = (normalize_range[1] - normalize_range[0]) / ( - wave.max() - wave.min() - ) - - if scale_factor < 1.0: # Wave clipping - if on_clip == 'scale': - wave = scale_factor * wave - elif on_clip == 'clip': - wave[wave < normalize_range[0]] = normalize_range[0] - wave[wave > normalize_range[1]] = normalize_range[1] - else: - raise RuntimeError( - 'Supported values for "on_clip": ' + '"scale" or "clip"' - ) - - return wave - - # noinspection PyProtectedMember - def fft( - self, - t_start=0.0, - t_end=0.0, - samplerate=Sound._DEFAULT_SAMPLERATE, - freq_range=None, - freq_buckets=None, - ): - """ - Get the real part of the Fourier transform associated to a time-bounded - sample of this mix - - :param t_start: Start offset for the wave in seconds. Default: 0 - :type t_start: float - - :param t_end: End offset for the wave in seconds. Default: 0 - :type t_end: float - - :param samplerate: Audio sample rate. Default: 44100 Hz - :type samplerate: int - - :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` - (see `Nyquist-Shannon sampling theorem - `_) - :type freq_range: list or tuple with 2 int elements (range) - - :param freq_buckets: Number of buckets to subdivide the frequency range. 
- Default: None - :type freq_buckets: int - - :returns: A numpy.ndarray[freq_range,1] with the raw float values - """ - - import numpy as np - - if not freq_range: - freq_range = (0, int(samplerate / 2)) - - wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) - fft = np.fft.fft(wave.reshape(len(wave))) - fft = fft.real[freq_range[0] : freq_range[1]] - - if freq_buckets is not None: - fft = np.histogram(fft, bins=freq_buckets) - - return fft - - def duration(self): - """ - :returns: The duration of the mix in seconds as duration of its longest - sample, or None if the mixed sample have no duration set - """ - - duration = 0 - - for sound in self._sounds: - if sound.duration is None: - return None - - duration = max(duration, sound.duration) - - return duration - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/sound/manifest.yaml b/platypush/plugins/sound/manifest.yaml index a2cddcf97..c44f0f17b 100644 --- a/platypush/plugins/sound/manifest.yaml +++ b/platypush/plugins/sound/manifest.yaml @@ -1,15 +1,16 @@ manifest: events: platypush.message.event.sound.SoundPlaybackPausedEvent: on playback pause + platypush.message.event.sound.SoundPlaybackResumedEvent: on playback resume platypush.message.event.sound.SoundPlaybackStartedEvent: on playback start platypush.message.event.sound.SoundPlaybackStoppedEvent: on playback stop platypush.message.event.sound.SoundRecordingPausedEvent: on recording pause + platypush.message.event.sound.SoundRecordingResumedEvent: on recording resumed platypush.message.event.sound.SoundRecordingStartedEvent: on recording start platypush.message.event.sound.SoundRecordingStoppedEvent: on recording stop install: pip: - sounddevice - - soundfile - numpy apt: - ffmpeg diff --git a/setup.py b/setup.py index 3bc505f92..25829bb34 100755 --- a/setup.py +++ b/setup.py @@ -154,7 +154,7 @@ setup( # Support for Chromecast plugin 'chromecast': ['pychromecast'], # Support for sound devices - 'sound': ['sounddevice', 'soundfile', 
'numpy'], + 'sound': ['sounddevice', 'numpy'], # Support for web media subtitles 'subtitles': [ 'webvtt-py',