From 54c3381ae38e7cd1abd5e65b04cebf51813e69a2 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 27 Dec 2018 20:24:51 +0100 Subject: [PATCH] - Added frames throttling support on Leap Motion backend - Moved the Leap listener to another process to prevent it from hanging the other backends - Added support for sound and mix frequency analysis --- platypush/backend/sensor/leap.py | 78 ++++++++++++++++++++++----- platypush/plugins/sound/__init__.py | 15 +++--- platypush/plugins/sound/core.py | 82 ++++++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 22 deletions(-) diff --git a/platypush/backend/sensor/leap.py b/platypush/backend/sensor/leap.py index 8caed2b5b..9cab2f14d 100644 --- a/platypush/backend/sensor/leap.py +++ b/platypush/backend/sensor/leap.py @@ -1,5 +1,8 @@ import time +from threading import Timer +from multiprocessing import Process + import Leap from platypush.backend import Backend @@ -21,6 +24,12 @@ class SensorLeapBackend(Backend): instructions at https://www.leapmotion.com/setup/) and the `leapd` daemon running to recognize your controller. + Requires: + + * The Redis backend enabled + * The Leap Motion SDK compiled with Python 3 support, see my port at https://github.com:BlackLight/leap-sdk-python3.git + * The `leapd` daemon to be running and your Leap Motion connected + Triggers: * :class:`platypush.message.event.sensor.leap.LeapFrameEvent` when a new frame is received @@ -30,6 +39,8 @@ class SensorLeapBackend(Backend): * :class:`platypush.message.event.sensor.leap.LeapDisconnectEvent` when a Leap Motion device disconnects """ + _listener_proc = None + def __init__(self, position_ranges=[ [-300.0, 300.0], # x axis @@ -37,6 +48,7 @@ class SensorLeapBackend(Backend): [-300.0, 300.0], # z axis ], position_tolerance=0.0, # Position variation tolerance in % + frames_throttle_secs=None, *args, **kwargs): """ :param position_ranges: It specifies how wide the hand space (x, y and z axes) should be in millimiters. @@ -53,46 +65,76 @@ class SensorLeapBackend(Backend): :param position_tolerance: % of change between a frame and the next to really consider the next frame as a new one (default: 0) :type position_tolerance: float + + :param frames_throttle_secs: If set, the frame events will be throttled + and pushed to the main queue at the specified rate. Good to set if + you want to connect Leap Motion events to actions that have a lower + throughput (the Leap Motion can send a lot of frames per second). + Default: None (no throttling) + :type frames_throttle_secs: float """ super().__init__(*args, **kwargs) self.position_ranges = position_ranges self.position_tolerance = position_tolerance + self.frames_throttle_secs = frames_throttle_secs def run(self): super().run() - listener = LeapListener(position_ranges=self.position_ranges, - position_tolerance=self.position_tolerance) + def _listener_process(): + listener = LeapListener(position_ranges=self.position_ranges, + position_tolerance=self.position_tolerance, + frames_throttle_secs=self.frames_throttle_secs, + logger=self.logger) - controller = Leap.Controller() + controller = Leap.Controller() - if not controller: - raise RuntimeError('No Leap Motion controller found - is your ' + - 'device connected and is leapd running?') + if not controller: + raise RuntimeError('No Leap Motion controller found - is your ' + + 'device connected and is leapd running?') - controller.add_listener(listener) - self.logger.info('Leap Motion backend initialized') + controller.add_listener(listener) + self.logger.info('Leap Motion backend initialized') - try: while not self.should_stop(): time.sleep(0.1) - finally: - controller.remove_listener(listener) + time.sleep(1) + self._listener_proc = Process(target=_listener_process) + self._listener_proc.start() + self._listener_proc.join() + + +class LeapFuture(Timer): + def __init__(self, seconds, listener, event): + self.listener = listener + self.event = event + + super().__init__(seconds, self._callback_wrapper()) + + def _callback_wrapper(self): + def _callback(): + self.listener._send_event(self.event) + return _callback class LeapListener(Leap.Listener): - def __init__(self, position_ranges, position_tolerance, *args, **kwargs): + def __init__(self, position_ranges, position_tolerance, logger, + frames_throttle_secs=None, *args, **kwargs): super().__init__(*args, **kwargs) + self.prev_frame = None self.position_ranges = position_ranges self.position_tolerance = position_tolerance + self.frames_throttle_secs = frames_throttle_secs + self.logger = logger + self.running_future = None - def send_event(self, event): + def _send_event(self, event): backend = get_backend('redis') if not backend: self.logger.warning('Redis backend not configured, I cannot propagate the following event: {}'.format(event)) @@ -101,6 +143,16 @@ class LeapListener(Leap.Listener): backend.send_message(event) + def send_event(self, event): + if self.frames_throttle_secs: + if not self.running_future or not self.running_future.is_alive(): + self.running_future = LeapFuture(seconds=self.frames_throttle_secs, + listener=self, event=event) + self.running_future.start() + else: + self._send_event(event) + + def on_init(self, controller): self.prev_frame = None self.logger.info('Leap controller listener initialized') diff --git a/platypush/plugins/sound/__init__.py b/platypush/plugins/sound/__init__.py index 2a4423b25..fbb130fe5 100644 --- a/platypush/plugins/sound/__init__.py +++ b/platypush/plugins/sound/__init__.py @@ -155,7 +155,7 @@ class SoundPlugin(Plugin): data = q.get_nowait() except queue.Empty: self.logger.warning('Buffer is empty: increase buffersize?') - return + raise sd.CallbackAbort if len(data) < len(outdata): outdata[:len(data)] = data @@ -647,9 +647,12 @@ class SoundPlugin(Plugin): for i, event in completed_callback_events.items(): event.wait() - del self.completed_callback_events[i] - del self.active_streams[i] - del self.stream_mixes[i] + if i in self.completed_callback_events: + del self.completed_callback_events[i] + if i in self.active_streams: + del self.active_streams[i] + if i in self.stream_mixes: + del self.stream_mixes[i] self.logger.info('Playback stopped on streams [{}]'.format( ', '.join([str(stream) for stream in @@ -730,10 +733,6 @@ class SoundPlugin(Plugin): :type frequency: float """ - if sound_index is None and midi_note is None and frequency is None: - raise RuntimeError('Please specify either a sound index, ' + - 'midi_note or frequency to release') - mixes = { i: mix for i, mix in self.stream_mixes.items() } if stream_index is None else { diff --git a/platypush/plugins/sound/core.py b/platypush/plugins/sound/core.py index c3282e543..6018fc58f 100644 --- a/platypush/plugins/sound/core.py +++ b/platypush/plugins/sound/core.py @@ -131,7 +131,8 @@ class Sound(object): :param samplerate: Audio sample rate. Default: 44100 Hz :type samplerate: int - :returns: A numpy.ndarray[n,1] with the raw float values + :returns: A ``numpy.ndarray[(t_end-t_start)*samplerate, 1]`` + with the raw float values """ import numpy as np @@ -156,6 +157,45 @@ class Sound(object): return self.gain * wave + def fft(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE, + freq_range=None, freq_buckets=None): + """ + Get the Fourier transform associated to a time-bounded sample of this + sound + + :param t_start: Start offset for the wave in seconds. Default: 0 + :type t_start: float + + :param t_end: End offset for the wave in seconds. Default: 0 + :type t_end: float + + :param samplerate: Audio sample rate. Default: 44100 Hz + :type samplerate: int + + :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` + (see `Nyquist-Shannon sampling theorem `_) + :type freq_range: list or tuple with 2 int elements (range) + + :param freq_buckets: Number of buckets to subdivide the frequency range. + Default: None + :type freq_buckets: int + + :returns: A numpy.ndarray[freq_range,1] with the raw float values + """ + + import numpy as np + + if not freq_range: + freq_range = (0, int(samplerate/2)) + + wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) + fft = np.fft.fft(wave.reshape(len(wave)))[freq_range[0]:freq_range[1]] + + if freq_buckets is not None: + fft = np.histogram(fft, bins=freq_buckets) + + return fft + def __iter__(self): for attr in ['midi_note', 'frequency', 'gain', 'duration']: yield (attr, getattr(self, attr)) @@ -278,6 +318,46 @@ class Mix(object): return wave + def fft(self, t_start=0., t_end=0., samplerate=Sound._DEFAULT_SAMPLERATE, + freq_range=None, freq_buckets=None): + """ + Get the Fourier transform associated to a time-bounded sample of this + mix + + :param t_start: Start offset for the wave in seconds. Default: 0 + :type t_start: float + + :param t_end: End offset for the wave in seconds. Default: 0 + :type t_end: float + + :param samplerate: Audio sample rate. Default: 44100 Hz + :type samplerate: int + + :param freq_range: FFT frequency range. Default: ``(0, samplerate/2)`` + (see `Nyquist-Shannon sampling theorem `_) + :type freq_range: list or tuple with 2 int elements (range) + + :param freq_buckets: Number of buckets to subdivide the frequency range. + Default: None + :type freq_buckets: int + + :returns: A numpy.ndarray[freq_range,1] with the raw float values + """ + + import numpy as np + + if not freq_range: + freq_range = (0, int(samplerate/2)) + + wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate) + fft = np.fft.fft(wave.reshape(len(wave)))[freq_range[0]:freq_range[1]] + + if freq_buckets is not None: + fft = np.histogram(fft, bins=freq_buckets) + + return fft + + def duration(self): """ :returns: The duration of the mix in seconds as duration of its longest