- Added frames throttling support on Leap Motion backend

- Moved the Leap listener to another process to prevent it from hanging the other backends
- Added support for sound and mix frequency analysis
This commit is contained in:
Fabio Manganiello 2018-12-27 20:24:51 +01:00
parent 2e82651dbf
commit 54c3381ae3
3 changed files with 153 additions and 22 deletions

View File

@ -1,5 +1,8 @@
import time import time
from threading import Timer
from multiprocessing import Process
import Leap import Leap
from platypush.backend import Backend from platypush.backend import Backend
@ -21,6 +24,12 @@ class SensorLeapBackend(Backend):
instructions at https://www.leapmotion.com/setup/) and the `leapd` daemon instructions at https://www.leapmotion.com/setup/) and the `leapd` daemon
running to recognize your controller. running to recognize your controller.
Requires:
* The Redis backend enabled
* The Leap Motion SDK compiled with Python 3 support, see my port at https://github.com:BlackLight/leap-sdk-python3.git
* The `leapd` daemon to be running and your Leap Motion connected
Triggers: Triggers:
* :class:`platypush.message.event.sensor.leap.LeapFrameEvent` when a new frame is received * :class:`platypush.message.event.sensor.leap.LeapFrameEvent` when a new frame is received
@ -30,6 +39,8 @@ class SensorLeapBackend(Backend):
* :class:`platypush.message.event.sensor.leap.LeapDisconnectEvent` when a Leap Motion device disconnects * :class:`platypush.message.event.sensor.leap.LeapDisconnectEvent` when a Leap Motion device disconnects
""" """
_listener_proc = None
def __init__(self, def __init__(self,
position_ranges=[ position_ranges=[
[-300.0, 300.0], # x axis [-300.0, 300.0], # x axis
@ -37,6 +48,7 @@ class SensorLeapBackend(Backend):
[-300.0, 300.0], # z axis [-300.0, 300.0], # z axis
], ],
position_tolerance=0.0, # Position variation tolerance in % position_tolerance=0.0, # Position variation tolerance in %
frames_throttle_secs=None,
*args, **kwargs): *args, **kwargs):
""" """
:param position_ranges: It specifies how wide the hand space (x, y and z axes) should be in millimiters. :param position_ranges: It specifies how wide the hand space (x, y and z axes) should be in millimiters.
@ -53,46 +65,76 @@ class SensorLeapBackend(Backend):
:param position_tolerance: % of change between a frame and the next to really consider the next frame as a new one (default: 0) :param position_tolerance: % of change between a frame and the next to really consider the next frame as a new one (default: 0)
:type position_tolerance: float :type position_tolerance: float
:param frames_throttle_secs: If set, the frame events will be throttled
and pushed to the main queue at the specified rate. Good to set if
you want to connect Leap Motion events to actions that have a lower
throughput (the Leap Motion can send a lot of frames per second).
Default: None (no throttling)
:type frames_throttle_secs: float
""" """
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.position_ranges = position_ranges self.position_ranges = position_ranges
self.position_tolerance = position_tolerance self.position_tolerance = position_tolerance
self.frames_throttle_secs = frames_throttle_secs
def run(self): def run(self):
super().run() super().run()
listener = LeapListener(position_ranges=self.position_ranges, def _listener_process():
position_tolerance=self.position_tolerance) listener = LeapListener(position_ranges=self.position_ranges,
position_tolerance=self.position_tolerance,
frames_throttle_secs=self.frames_throttle_secs,
logger=self.logger)
controller = Leap.Controller() controller = Leap.Controller()
if not controller: if not controller:
raise RuntimeError('No Leap Motion controller found - is your ' + raise RuntimeError('No Leap Motion controller found - is your ' +
'device connected and is leapd running?') 'device connected and is leapd running?')
controller.add_listener(listener) controller.add_listener(listener)
self.logger.info('Leap Motion backend initialized') self.logger.info('Leap Motion backend initialized')
try:
while not self.should_stop(): while not self.should_stop():
time.sleep(0.1) time.sleep(0.1)
finally:
controller.remove_listener(listener)
time.sleep(1)
self._listener_proc = Process(target=_listener_process)
self._listener_proc.start()
self._listener_proc.join()
class LeapFuture(Timer):
def __init__(self, seconds, listener, event):
self.listener = listener
self.event = event
super().__init__(seconds, self._callback_wrapper())
def _callback_wrapper(self):
def _callback():
self.listener._send_event(self.event)
return _callback
class LeapListener(Leap.Listener): class LeapListener(Leap.Listener):
def __init__(self, position_ranges, position_tolerance, *args, **kwargs): def __init__(self, position_ranges, position_tolerance, logger,
frames_throttle_secs=None, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.prev_frame = None self.prev_frame = None
self.position_ranges = position_ranges self.position_ranges = position_ranges
self.position_tolerance = position_tolerance self.position_tolerance = position_tolerance
self.frames_throttle_secs = frames_throttle_secs
self.logger = logger
self.running_future = None
def send_event(self, event): def _send_event(self, event):
backend = get_backend('redis') backend = get_backend('redis')
if not backend: if not backend:
self.logger.warning('Redis backend not configured, I cannot propagate the following event: {}'.format(event)) self.logger.warning('Redis backend not configured, I cannot propagate the following event: {}'.format(event))
@ -101,6 +143,16 @@ class LeapListener(Leap.Listener):
backend.send_message(event) backend.send_message(event)
def send_event(self, event):
if self.frames_throttle_secs:
if not self.running_future or not self.running_future.is_alive():
self.running_future = LeapFuture(seconds=self.frames_throttle_secs,
listener=self, event=event)
self.running_future.start()
else:
self._send_event(event)
def on_init(self, controller): def on_init(self, controller):
self.prev_frame = None self.prev_frame = None
self.logger.info('Leap controller listener initialized') self.logger.info('Leap controller listener initialized')

View File

@ -155,7 +155,7 @@ class SoundPlugin(Plugin):
data = q.get_nowait() data = q.get_nowait()
except queue.Empty: except queue.Empty:
self.logger.warning('Buffer is empty: increase buffersize?') self.logger.warning('Buffer is empty: increase buffersize?')
return raise sd.CallbackAbort
if len(data) < len(outdata): if len(data) < len(outdata):
outdata[:len(data)] = data outdata[:len(data)] = data
@ -647,9 +647,12 @@ class SoundPlugin(Plugin):
for i, event in completed_callback_events.items(): for i, event in completed_callback_events.items():
event.wait() event.wait()
del self.completed_callback_events[i] if i in self.completed_callback_events:
del self.active_streams[i] del self.completed_callback_events[i]
del self.stream_mixes[i] if i in self.active_streams:
del self.active_streams[i]
if i in self.stream_mixes:
del self.stream_mixes[i]
self.logger.info('Playback stopped on streams [{}]'.format( self.logger.info('Playback stopped on streams [{}]'.format(
', '.join([str(stream) for stream in ', '.join([str(stream) for stream in
@ -730,10 +733,6 @@ class SoundPlugin(Plugin):
:type frequency: float :type frequency: float
""" """
if sound_index is None and midi_note is None and frequency is None:
raise RuntimeError('Please specify either a sound index, ' +
'midi_note or frequency to release')
mixes = { mixes = {
i: mix for i, mix in self.stream_mixes.items() i: mix for i, mix in self.stream_mixes.items()
} if stream_index is None else { } if stream_index is None else {

View File

@ -131,7 +131,8 @@ class Sound(object):
:param samplerate: Audio sample rate. Default: 44100 Hz :param samplerate: Audio sample rate. Default: 44100 Hz
:type samplerate: int :type samplerate: int
:returns: A numpy.ndarray[n,1] with the raw float values :returns: A ``numpy.ndarray[(t_end-t_start)*samplerate, 1]``
with the raw float values
""" """
import numpy as np import numpy as np
@ -156,6 +157,45 @@ class Sound(object):
return self.gain * wave return self.gain * wave
def fft(self, t_start=0., t_end=0., samplerate=_DEFAULT_SAMPLERATE,
freq_range=None, freq_buckets=None):
"""
Get the Fourier transform associated to a time-bounded sample of this
sound
:param t_start: Start offset for the wave in seconds. Default: 0
:type t_start: float
:param t_end: End offset for the wave in seconds. Default: 0
:type t_end: float
:param samplerate: Audio sample rate. Default: 44100 Hz
:type samplerate: int
:param freq_range: FFT frequency range. Default: ``(0, samplerate/2)``
(see `Nyquist-Shannon sampling theorem <https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
:type freq_range: list or tuple with 2 int elements (range)
:param freq_buckets: Number of buckets to subdivide the frequency range.
Default: None
:type freq_buckets: int
:returns: A numpy.ndarray[freq_range,1] with the raw float values
"""
import numpy as np
if not freq_range:
freq_range = (0, int(samplerate/2))
wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate)
fft = np.fft.fft(wave.reshape(len(wave)))[freq_range[0]:freq_range[1]]
if freq_buckets is not None:
fft = np.histogram(fft, bins=freq_buckets)
return fft
def __iter__(self): def __iter__(self):
for attr in ['midi_note', 'frequency', 'gain', 'duration']: for attr in ['midi_note', 'frequency', 'gain', 'duration']:
yield (attr, getattr(self, attr)) yield (attr, getattr(self, attr))
@ -278,6 +318,46 @@ class Mix(object):
return wave return wave
def fft(self, t_start=0., t_end=0., samplerate=Sound._DEFAULT_SAMPLERATE,
freq_range=None, freq_buckets=None):
"""
Get the Fourier transform associated to a time-bounded sample of this
mix
:param t_start: Start offset for the wave in seconds. Default: 0
:type t_start: float
:param t_end: End offset for the wave in seconds. Default: 0
:type t_end: float
:param samplerate: Audio sample rate. Default: 44100 Hz
:type samplerate: int
:param freq_range: FFT frequency range. Default: ``(0, samplerate/2)``
(see `Nyquist-Shannon sampling theorem <https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem>`_)
:type freq_range: list or tuple with 2 int elements (range)
:param freq_buckets: Number of buckets to subdivide the frequency range.
Default: None
:type freq_buckets: int
:returns: A numpy.ndarray[freq_range,1] with the raw float values
"""
import numpy as np
if not freq_range:
freq_range = (0, int(samplerate/2))
wave = self.get_wave(t_start=t_start, t_end=t_end, samplerate=samplerate)
fft = np.fft.fft(wave.reshape(len(wave)))[freq_range[0]:freq_range[1]]
if freq_buckets is not None:
fft = np.histogram(fft, bins=freq_buckets)
return fft
def duration(self): def duration(self):
""" """
:returns: The duration of the mix in seconds as duration of its longest :returns: The duration of the mix in seconds as duration of its longest