New architecture for the assistant speech detection logic.

The assistant object now runs in its own thread and delegates to an
external `SpeechProcessor`, which uses two worker threads to scan
audio frames for intents and speech in parallel.
This commit is contained in:
Fabio Manganiello 2024-04-20 17:24:03 +02:00
parent 6f8816d23d
commit 632d98703b
13 changed files with 704 additions and 106 deletions

View file

@ -107,13 +107,17 @@ class ResponseEndEvent(ConversationEndEvent):
Event triggered when a response has been rendered on the assistant. Event triggered when a response has been rendered on the assistant.
""" """
def __init__(self, *args, with_follow_on_turn: bool = False, **kwargs): def __init__(
self, *args, response_text: str, with_follow_on_turn: bool = False, **kwargs
):
""" """
:param response_text: Response text rendered on the assistant.
:param with_follow_on_turn: Set to true if the conversation expects a :param with_follow_on_turn: Set to true if the conversation expects a
user follow-up, false otherwise. user follow-up, false otherwise.
""" """
super().__init__( super().__init__(
*args, *args,
response_text=response_text,
with_follow_on_turn=with_follow_on_turn, with_follow_on_turn=with_follow_on_turn,
**kwargs, **kwargs,
) )

View file

@ -244,7 +244,7 @@ class AssistantPlugin(Plugin, AssistantEntityManager, ABC):
def _on_response_render_end(self): def _on_response_render_end(self):
from platypush.message.event.assistant import ResponseEndEvent from platypush.message.event.assistant import ResponseEndEvent
self._send_event(ResponseEndEvent) self._send_event(ResponseEndEvent, response_text=self._last_response)
def _on_hotword_detected(self, hotword: Optional[str]): def _on_hotword_detected(self, hotword: Optional[str]):
from platypush.message.event.assistant import HotwordDetectedEvent from platypush.message.event.assistant import HotwordDetectedEvent

View file

@ -216,6 +216,7 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin):
'on_conversation_end': self._on_conversation_end, 'on_conversation_end': self._on_conversation_end,
'on_conversation_timeout': self._on_conversation_timeout, 'on_conversation_timeout': self._on_conversation_timeout,
'on_speech_recognized': self._on_speech_recognized, 'on_speech_recognized': self._on_speech_recognized,
'on_intent_matched': self._on_intent_matched,
'on_hotword_detected': self._on_hotword_detected, 'on_hotword_detected': self._on_hotword_detected,
} }

View file

@ -1,28 +1,28 @@
import logging import logging
import os import os
from threading import Event, RLock from queue import Full, Queue
from threading import Event, RLock, Thread
from time import time from time import time
from typing import Any, Dict, Optional, Sequence from typing import Any, Dict, Optional, Sequence
import pvcheetah
import pvleopard
import pvporcupine import pvporcupine
import pvrhino
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message.event.assistant import ( from platypush.message.event.assistant import (
AssistantEvent,
ConversationTimeoutEvent, ConversationTimeoutEvent,
HotwordDetectedEvent, HotwordDetectedEvent,
IntentMatchedEvent,
SpeechRecognizedEvent, SpeechRecognizedEvent,
) )
from platypush.plugins.tts.picovoice import TtsPicovoicePlugin from platypush.plugins.tts.picovoice import TtsPicovoicePlugin
from ._context import ConversationContext
from ._recorder import AudioRecorder from ._recorder import AudioRecorder
from ._speech import SpeechProcessor
from ._state import AssistantState from ._state import AssistantState
class Assistant: class Assistant(Thread):
""" """
A facade class that wraps the Picovoice engines under an assistant API. A facade class that wraps the Picovoice engines under an assistant API.
""" """
@ -43,6 +43,7 @@ class Assistant:
keyword_model_path: Optional[str] = None, keyword_model_path: Optional[str] = None,
frame_expiration: float = 3.0, # Don't process audio frames older than this frame_expiration: float = 3.0, # Don't process audio frames older than this
speech_model_path: Optional[str] = None, speech_model_path: Optional[str] = None,
intent_model_path: Optional[str] = None,
endpoint_duration: Optional[float] = None, endpoint_duration: Optional[float] = None,
enable_automatic_punctuation: bool = False, enable_automatic_punctuation: bool = False,
start_conversation_on_hotword: bool = False, start_conversation_on_hotword: bool = False,
@ -53,8 +54,13 @@ class Assistant:
on_conversation_end=_default_callback, on_conversation_end=_default_callback,
on_conversation_timeout=_default_callback, on_conversation_timeout=_default_callback,
on_speech_recognized=_default_callback, on_speech_recognized=_default_callback,
on_intent_matched=_default_callback,
on_hotword_detected=_default_callback, on_hotword_detected=_default_callback,
): ):
super().__init__(name='picovoice:Assistant')
if intent_enabled:
assert intent_model_path, 'Intent model path not provided'
self._access_key = access_key self._access_key = access_key
self._stop_event = stop_event self._stop_event = stop_event
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
@ -64,26 +70,40 @@ class Assistant:
self.keywords = list(keywords or []) self.keywords = list(keywords or [])
self.keyword_paths = None self.keyword_paths = None
self.keyword_model_path = None self.keyword_model_path = None
self._responding = Event()
self.frame_expiration = frame_expiration self.frame_expiration = frame_expiration
self.endpoint_duration = endpoint_duration self.endpoint_duration = endpoint_duration
self.enable_automatic_punctuation = enable_automatic_punctuation self.enable_automatic_punctuation = enable_automatic_punctuation
self.start_conversation_on_hotword = start_conversation_on_hotword self.start_conversation_on_hotword = start_conversation_on_hotword
self.audio_queue_size = audio_queue_size self.audio_queue_size = audio_queue_size
self._responding = Event()
self._muted = muted self._muted = muted
self._speech_model_path = speech_model_path self._speech_model_path = speech_model_path
self._speech_model_path_override = None self._speech_model_path_override = None
self._intent_model_path = intent_model_path
self._intent_model_path_override = None
self._in_ctx = False
self._speech_processor = SpeechProcessor(
stop_event=stop_event,
stt_enabled=stt_enabled,
intent_enabled=intent_enabled,
conversation_timeout=conversation_timeout,
model_path=speech_model_path,
get_cheetah_args=self._get_speech_engine_args,
get_rhino_args=self._get_speech_engine_args,
)
self._on_conversation_start = on_conversation_start self._on_conversation_start = on_conversation_start
self._on_conversation_end = on_conversation_end self._on_conversation_end = on_conversation_end
self._on_conversation_timeout = on_conversation_timeout self._on_conversation_timeout = on_conversation_timeout
self._on_speech_recognized = on_speech_recognized self._on_speech_recognized = on_speech_recognized
self._on_intent_matched = on_intent_matched
self._on_hotword_detected = on_hotword_detected self._on_hotword_detected = on_hotword_detected
self._recorder = None self._recorder = None
self._state = AssistantState.IDLE self._state = AssistantState.IDLE
self._state_lock = RLock() self._state_lock = RLock()
self._ctx = ConversationContext(timeout=conversation_timeout) self._evt_queue = Queue(maxsize=100)
if hotword_enabled: if hotword_enabled:
if not keywords: if not keywords:
@ -110,11 +130,7 @@ class Assistant:
self.keyword_model_path = keyword_model_path self.keyword_model_path = keyword_model_path
# Model path -> model instance cache
self._cheetah = {}
self._leopard: Optional[pvleopard.Leopard] = None
self._porcupine: Optional[pvporcupine.Porcupine] = None self._porcupine: Optional[pvporcupine.Porcupine] = None
self._rhino: Optional[pvrhino.Rhino] = None
@property @property
def is_responding(self): def is_responding(self):
@ -124,6 +140,10 @@ class Assistant:
def speech_model_path(self): def speech_model_path(self):
return self._speech_model_path_override or self._speech_model_path return self._speech_model_path_override or self._speech_model_path
@property
def intent_model_path(self):
return self._intent_model_path_override or self._intent_model_path
@property @property
def tts(self) -> TtsPicovoicePlugin: def tts(self) -> TtsPicovoicePlugin:
p = get_plugin('tts.picovoice') p = get_plugin('tts.picovoice')
@ -157,18 +177,23 @@ class Assistant:
if prev_state == new_state: if prev_state == new_state:
return return
self.logger.info('Assistant state transition: %s -> %s', prev_state, new_state)
if prev_state == AssistantState.DETECTING_SPEECH: if prev_state == AssistantState.DETECTING_SPEECH:
self.tts.stop() self.tts.stop()
self._ctx.stop()
self._speech_model_path_override = None self._speech_model_path_override = None
self._intent_model_path_override = None
self._speech_processor.on_conversation_end()
self._on_conversation_end() self._on_conversation_end()
elif new_state == AssistantState.DETECTING_SPEECH: elif new_state == AssistantState.DETECTING_SPEECH:
self._ctx.start() self._speech_processor.on_conversation_start()
self._on_conversation_start() self._on_conversation_start()
if new_state == AssistantState.DETECTING_HOTWORD: if new_state == AssistantState.DETECTING_HOTWORD:
self.tts.stop() self.tts.stop()
self._ctx.reset() self._speech_processor.on_conversation_reset()
# Put a null event on the event queue to unblock next_event
self._evt_queue.put(None)
@property @property
def porcupine(self) -> Optional[pvporcupine.Porcupine]: def porcupine(self) -> Optional[pvporcupine.Porcupine]:
@ -188,23 +213,16 @@ class Assistant:
return self._porcupine return self._porcupine
@property def _get_speech_engine_args(self) -> dict:
def cheetah(self) -> Optional[pvcheetah.Cheetah]: args: Dict[str, Any] = {'access_key': self._access_key}
if not self.stt_enabled: if self.speech_model_path:
return None args['model_path'] = self.speech_model_path
if self.endpoint_duration:
args['endpoint_duration_sec'] = self.endpoint_duration
if self.enable_automatic_punctuation:
args['enable_automatic_punctuation'] = self.enable_automatic_punctuation
if not self._cheetah.get(self.speech_model_path): return args
args: Dict[str, Any] = {'access_key': self._access_key}
if self.speech_model_path:
args['model_path'] = self.speech_model_path
if self.endpoint_duration:
args['endpoint_duration_sec'] = self.endpoint_duration
if self.enable_automatic_punctuation:
args['enable_automatic_punctuation'] = self.enable_automatic_punctuation
self._cheetah[self.speech_model_path] = pvcheetah.create(**args)
return self._cheetah[self.speech_model_path]
def __enter__(self): def __enter__(self):
""" """
@ -213,11 +231,14 @@ class Assistant:
if self.should_stop(): if self.should_stop():
return self return self
assert not self.is_alive(), 'The assistant is already running'
self._in_ctx = True
if self._recorder: if self._recorder:
self.logger.info('A recording stream already exists') self.logger.info('A recording stream already exists')
elif self.hotword_enabled or self.stt_enabled: elif self.hotword_enabled or self.stt_enabled or self.intent_enabled:
sample_rate = (self.porcupine or self.cheetah).sample_rate # type: ignore sample_rate = (self.porcupine or self._speech_processor).sample_rate
frame_length = (self.porcupine or self.cheetah).frame_length # type: ignore frame_length = (self.porcupine or self._speech_processor).frame_length
self._recorder = AudioRecorder( self._recorder = AudioRecorder(
stop_event=self._stop_event, stop_event=self._stop_event,
sample_rate=sample_rate, sample_rate=sample_rate,
@ -227,9 +248,7 @@ class Assistant:
channels=1, channels=1,
) )
if self.stt_enabled: self._speech_processor.__enter__()
self._cheetah[self.speech_model_path] = self.cheetah
self._recorder.__enter__() self._recorder.__enter__()
if self.porcupine: if self.porcupine:
@ -237,33 +256,25 @@ class Assistant:
else: else:
self.state = AssistantState.DETECTING_SPEECH self.state = AssistantState.DETECTING_SPEECH
self.start()
return self return self
def __exit__(self, *_): def __exit__(self, *_):
""" """
Stop the assistant and release all resources. Stop the assistant and release all resources.
""" """
self._in_ctx = False
if self._recorder: if self._recorder:
self._recorder.__exit__(*_) self._recorder.__exit__(*_)
self._recorder = None self._recorder = None
self.state = AssistantState.IDLE self.state = AssistantState.IDLE
for model in [*self._cheetah.keys()]:
cheetah = self._cheetah.pop(model, None)
if cheetah:
cheetah.delete()
if self._leopard:
self._leopard.delete()
self._leopard = None
if self._porcupine: if self._porcupine:
self._porcupine.delete() self._porcupine.delete()
self._porcupine = None self._porcupine = None
if self._rhino: self._speech_processor.__exit__(*_)
self._rhino.delete()
self._rhino = None
def __iter__(self): def __iter__(self):
""" """
@ -275,29 +286,36 @@ class Assistant:
""" """
Process the next audio frame and return the corresponding event. Process the next audio frame and return the corresponding event.
""" """
has_data = False
if self.should_stop() or not self._recorder: if self.should_stop() or not self._recorder:
raise StopIteration raise StopIteration
while not (self.should_stop() or has_data): if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD:
data = self._recorder.read() return self._evt_queue.get()
if data is None:
continue
frame, t = data evt = None
if time() - t > self.frame_expiration: if (
self.logger.info( self._speech_processor.enabled
'Skipping audio frame older than %ss', self.frame_expiration and self.state == AssistantState.DETECTING_SPEECH
) ):
continue # The audio frame is too old evt = self._speech_processor.next_event()
if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD: if isinstance(evt, SpeechRecognizedEvent):
return self._process_hotword(frame) self._on_speech_recognized(phrase=evt.args['phrase'])
if isinstance(evt, IntentMatchedEvent):
self._on_intent_matched(
intent=evt.args['intent'], slots=evt.args.get('slots', {})
)
if isinstance(evt, ConversationTimeoutEvent):
self._on_conversation_timeout()
if self.stt_enabled and self.state == AssistantState.DETECTING_SPEECH: if (
return self._process_speech(frame) evt
and self.state == AssistantState.DETECTING_SPEECH
and self.hotword_enabled
):
self.state = AssistantState.DETECTING_HOTWORD
raise StopIteration return evt
def mute(self): def mute(self):
self._muted = True self._muted = True
@ -321,7 +339,7 @@ class Assistant:
else: else:
self.mute() self.mute()
def _process_hotword(self, frame): def _process_hotword(self, frame) -> Optional[HotwordDetectedEvent]:
if not self.porcupine: if not self.porcupine:
return None return None
@ -333,48 +351,61 @@ class Assistant:
if self.start_conversation_on_hotword: if self.start_conversation_on_hotword:
self.state = AssistantState.DETECTING_SPEECH self.state = AssistantState.DETECTING_SPEECH
self.tts.stop() self.tts.stop() # Stop any ongoing TTS when the hotword is detected
self._on_hotword_detected(hotword=self.keywords[keyword_index]) self._on_hotword_detected(hotword=self.keywords[keyword_index])
return HotwordDetectedEvent(hotword=self.keywords[keyword_index]) return HotwordDetectedEvent(hotword=self.keywords[keyword_index])
return None return None
def _process_speech(self, frame):
if not self.cheetah:
return None
event = None
partial_transcript, self._ctx.is_final = self.cheetah.process(frame)
if partial_transcript:
self._ctx.transcript += partial_transcript
self.logger.info(
'Partial transcript: %s, is_final: %s',
self._ctx.transcript,
self._ctx.is_final,
)
if self._ctx.is_final or self._ctx.timed_out:
phrase = self.cheetah.flush() or ''
self._ctx.transcript += phrase
phrase = self._ctx.transcript
phrase = phrase[:1].lower() + phrase[1:]
if phrase:
event = SpeechRecognizedEvent(phrase=phrase)
self._on_speech_recognized(phrase=phrase)
else:
event = ConversationTimeoutEvent()
self._on_conversation_timeout()
self._ctx.reset()
if self.hotword_enabled:
self.state = AssistantState.DETECTING_HOTWORD
return event
def override_speech_model(self, model_path: Optional[str]): def override_speech_model(self, model_path: Optional[str]):
self._speech_model_path_override = model_path self._speech_model_path_override = model_path
def override_intent_model(self, model_path: Optional[str]):
self._intent_model_path_override = model_path
def _put_event(self, evt: AssistantEvent):
try:
self._evt_queue.put_nowait(evt)
except Full:
self.logger.warning('The assistant event queue is full')
def run(self):
assert (
self._in_ctx
), 'The assistant can only be started through a context manager'
super().run()
while not self.should_stop() and self._recorder:
self._recorder.wait_start()
if self.should_stop():
break
data = self._recorder.read()
if data is None:
continue
frame, t = data
if time() - t > self.frame_expiration:
self.logger.info(
'Skipping audio frame older than %ss', self.frame_expiration
)
continue # The audio frame is too old
if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD:
evt = self._process_hotword(frame)
if evt:
self._put_event(evt)
continue
if (
self._speech_processor.enabled
and self.state == AssistantState.DETECTING_SPEECH
):
self._speech_processor.process(frame, block=False)
self.logger.info('Assistant stopped')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -2,6 +2,8 @@ from dataclasses import dataclass
from time import time from time import time
from typing import Optional from typing import Optional
from ._intent import Intent
@dataclass @dataclass
class ConversationContext: class ConversationContext:
@ -11,6 +13,7 @@ class ConversationContext:
transcript: str = '' transcript: str = ''
is_final: bool = False is_final: bool = False
intent: Optional[Intent] = None
timeout: Optional[float] = None timeout: Optional[float] = None
t_start: Optional[float] = None t_start: Optional[float] = None
t_end: Optional[float] = None t_end: Optional[float] = None
@ -25,6 +28,7 @@ class ConversationContext:
def reset(self): def reset(self):
self.transcript = '' self.transcript = ''
self.intent = None
self.is_final = False self.is_final = False
self.t_start = None self.t_start = None
self.t_end = None self.t_end = None
@ -32,14 +36,18 @@ class ConversationContext:
@property @property
def timed_out(self): def timed_out(self):
return ( return (
not self.transcript (
and not self.is_final (not self.transcript and not self.is_final)
or (not self.intent and not self.is_final)
)
and self.timeout and self.timeout
and self.t_start and self.t_start
and time() - self.t_start > self.timeout and time() - self.t_start > self.timeout
) or ( ) or (
self.transcript (
and not self.is_final (self.transcript and not self.is_final)
or (self.intent and not self.is_final)
)
and self.timeout and self.timeout
and self.t_start and self.t_start
and time() - self.t_start > self.timeout * 2 and time() - self.t_start > self.timeout * 2

View file

@ -0,0 +1,11 @@
from dataclasses import dataclass, field
@dataclass
class Intent:
    """
    Speech intent data class.

    Carries the result of a speech-to-intent inference as produced by the
    intent processor.
    """

    # Name of the matched intent, as declared in the intent model.
    name: str
    # Slot name -> slot value pairs extracted from the recognized speech.
    slots: dict = field(default_factory=dict)

View file

@ -178,3 +178,14 @@ class AudioRecorder:
Wait until the audio stream is stopped. Wait until the audio stream is stopped.
""" """
wait_for_either(self._stop_event, self._upstream_stop_event, timeout=timeout) wait_for_either(self._stop_event, self._upstream_stop_event, timeout=timeout)
    def wait_start(self, timeout: Optional[float] = None):
        """
        Wait until the audio stream is started.

        The wait also unblocks when either the local or the upstream stop
        event is set, so callers should re-check the recorder state after
        this returns rather than assuming recording has started.

        :param timeout: Maximum number of seconds to wait
            (default: no timeout).
        """
        # NOTE(review): this reaches into a private member of the paused
        # state object (`_recording_event`); consider exposing a public
        # accessor on that class instead.
        wait_for_either(
            self._stop_event,
            self._upstream_stop_event,
            self._paused_state._recording_event,
            timeout=timeout,
        )

View file

@ -0,0 +1,3 @@
from ._processor import SpeechProcessor
__all__ = ['SpeechProcessor']

View file

@ -0,0 +1,152 @@
import logging
from abc import ABC, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread, get_ident
from typing import Optional, Sequence
from platypush.message.event.assistant import AssistantEvent
from .._context import ConversationContext
class BaseProcessor(ABC, Thread):
    """
    Base speech processor class. It is implemented by the ``SttProcessor`` and
    the ``IntentProcessor`` classes.

    It runs as a daemon-style worker thread: audio frames are pushed through
    :meth:`enqueue`, processed by the subclass' :meth:`process`, and any
    resulting events are made available through :meth:`last_event`.
    """

    def __init__(
        self,
        *args,
        stop_event: Event,
        conversation_timeout: Optional[float] = None,
        **kwargs,
    ):
        """
        :param stop_event: Event object used to signal the thread to stop.
        :param conversation_timeout: Conversation timeout, in seconds, passed
            down to the conversation context.
        """
        super().__init__(*args, name=f'picovoice:{self.__class__.__name__}', **kwargs)
        self.logger = logging.getLogger(self.name)
        # Queue of raw audio frames to be processed (a None item stops the thread)
        self._audio_queue = Queue()
        self._stop_event = stop_event
        # Per-conversation state (transcript, intent, timeout bookkeeping)
        self._ctx = ConversationContext(timeout=conversation_timeout)
        # Queue of events produced by `process`, drained by `last_event`
        self._event_queue = Queue()

        # This event is set if the upstream processor is waiting for an event
        # from this processor
        self._event_wait = Event()

        # This event is set when the processor is done with the audio
        # processing and it's ready to accept a new audio frame
        self._processing_done = Event()
        self._processing_done.set()

    def should_stop(self) -> bool:
        """
        :return: True if the stop event has been set.
        """
        return self._stop_event.is_set()

    def wait_stop(self, timeout: Optional[float] = None) -> bool:
        """
        Wait until the stop event is set, up to ``timeout`` seconds.

        :return: True if the stop event was set, False on timeout.
        """
        return self._stop_event.wait(timeout)

    def enqueue(self, audio: Sequence[int]):
        """
        Push an audio frame onto the processing queue and mark the processor
        as busy / awaited-on.
        """
        self._event_wait.set()
        self._processing_done.clear()
        self._audio_queue.put_nowait(audio)

    @property
    def processing_done(self) -> Event:
        """
        Event that is set when the processor has finished with the last
        enqueued audio frame and is ready to accept a new one.
        """
        return self._processing_done

    @property
    @abstractmethod
    def _model_path(self) -> Optional[str]:
        """
        Return the model path.
        """

    @property
    @abstractmethod
    def sample_rate(self) -> int:
        """
        :return: The sample rate wanted by Cheetah/Rhino.
        """

    @property
    @abstractmethod
    def frame_length(self) -> int:
        """
        :return: The frame length wanted by Cheetah/Rhino.
        """

    def last_event(self) -> Optional[AssistantEvent]:
        """
        :return: The latest event that was processed by the processor.

        The event queue is drained, so intermediate events that piled up
        since the last call are discarded and only the newest one is kept.
        """
        evt = None
        try:
            while True:
                evt = self._event_queue.get_nowait()
        except Empty:
            pass

        if evt:
            self._event_wait.clear()

        return evt

    def clear_wait(self):
        """
        Clear the "upstream is waiting for an event" flag, so subsequent
        frames are skipped until :meth:`enqueue` is called again.
        """
        self._event_wait.clear()

    @abstractmethod
    def process(self, audio: Sequence[int]) -> Optional[AssistantEvent]:
        """
        Process speech events from a raw audio input.
        """

    def run(self):
        """
        Main thread loop: consume audio frames off the queue, run
        :meth:`process` on each, and publish any resulting events.
        """
        super().run()
        self._ctx.reset()
        self._processing_done.clear()
        self.logger.info('Processor started: %s', self.name)

        while not self.should_stop():
            audio = self._audio_queue.get()

            # The thread is stopped when it receives a None object
            if audio is None:
                break

            # Don't process the audio if the upstream processor is not waiting
            # for an event
            if not self._event_wait.is_set():
                continue

            try:
                self._processing_done.clear()
                event = self.process(audio)
                if event:
                    self._event_queue.put_nowait(event)
                self._processing_done.set()
            except Exception as e:
                self.logger.error(
                    'An error occurred while processing the audio on %s: %s',
                    self.name,
                    e,
                    exc_info=e,
                )
                # Back off briefly after an engine error before retrying
                self.wait_stop(timeout=1)
                self._processing_done.set()
                continue

        self._ctx.reset()
        self.logger.info('Processor stopped: %s', self.name)

    def stop(self):
        """
        Stop the processor thread by pushing the None sentinel and, when
        called from another thread, join it.
        """
        self._audio_queue.put_nowait(None)
        if self.is_alive() and self.ident != get_ident():
            self.logger.debug('Stopping %s', self.name)
            self.join()

    def on_conversation_start(self):
        # Start the conversation context timer
        self._ctx.start()

    def on_conversation_end(self):
        # Stop the conversation context
        self._ctx.stop()

    def on_conversation_reset(self):
        # Reset the conversation context (transcript, intent, timers)
        self._ctx.reset()

View file

@ -0,0 +1,86 @@
from typing import Callable, Optional, Sequence, Union
import pvrhino
from platypush.message.event.assistant import (
ConversationTimeoutEvent,
IntentMatchedEvent,
)
from ._base import BaseProcessor
class IntentProcessor(BaseProcessor):
    """
    Implementation of the speech-to-intent processor using the Picovoice Rhino
    engine.
    """

    def __init__(
        self, *args, get_rhino_args: Callable[[], dict] = lambda: {}, **kwargs
    ):
        """
        :param get_rhino_args: Callable that returns the keyword arguments
            used to create the Rhino engine (access key, model path, etc.).
        """
        super().__init__(*args, **kwargs)
        self._get_rhino_args = get_rhino_args
        # model_path -> Rhino instance cache
        self._rhino = {}

    @property
    def _model_path(self) -> Optional[str]:
        # The model path is whatever the args callable currently reports,
        # so a model override upstream transparently switches engines.
        return self._get_rhino_args().get('model_path')

    @property
    def sample_rate(self) -> int:
        """
        :return: The sample rate required by the Rhino engine.
        """
        return self._get_rhino().sample_rate

    @property
    def frame_length(self) -> int:
        """
        :return: The frame length required by the Rhino engine.
        """
        return self._get_rhino().frame_length

    def _get_rhino(self) -> pvrhino.Rhino:
        # Lazily create (and cache) one Rhino instance per model path
        if not self._rhino.get(self._model_path):
            self._rhino[self._model_path] = pvrhino.create(**self._get_rhino_args())

        return self._rhino[self._model_path]

    def process(
        self, audio: Sequence[int]
    ) -> Optional[Union[IntentMatchedEvent, ConversationTimeoutEvent]]:
        """
        Process the audio and return an ``IntentMatchedEvent`` if the intent was
        understood, or a ``ConversationTimeoutEvent`` if the conversation timed
        out, or ``None`` if the intent processing is not yet finalized.
        """
        event = None
        rhino = self._get_rhino()
        self._ctx.is_final = rhino.process(audio)

        if self._ctx.is_final:
            inference = rhino.get_inference()
            self.logger.debug(
                'Intent detection finalized. Inference understood: %s',
                inference.is_understood,
            )

            if inference.is_understood:
                event = IntentMatchedEvent(
                    intent=inference.intent,
                    slots={slot.key: slot.value for slot in inference.slots},
                )

        if not event and self._ctx.timed_out:
            event = ConversationTimeoutEvent()

        if event:
            self._ctx.reset()

        if event:
            self.logger.debug('Intent event: %s', event)

        return event

    def stop(self):
        """
        Stop the processor thread and release all the cached Rhino instances.
        """
        super().stop()
        # Iterate over a copy, since we mutate the cache while iterating
        objs = self._rhino.copy()
        for key, obj in objs.items():
            obj.delete()
            self._rhino.pop(key)

View file

@ -0,0 +1,196 @@
import logging
from queue import Queue
from threading import Event
from typing import Callable, Optional, Sequence
from platypush.message.event.assistant import AssistantEvent
from platypush.utils import wait_for_either
from ._intent import IntentProcessor
from ._stt import SttProcessor
class SpeechProcessor:
    """
    Speech processor class that wraps the STT and Intent processors under the
    same interface.

    Audio frames are fanned out to whichever of the two processors is
    enabled; events are collected through :meth:`next_event`, with intent
    events taking priority over STT events.
    """

    def __init__(
        self,
        stop_event: Event,
        model_path: Optional[str] = None,
        stt_enabled: bool = True,
        intent_enabled: bool = False,
        conversation_timeout: Optional[float] = None,
        get_cheetah_args: Callable[[], dict] = lambda: {},
        get_rhino_args: Callable[[], dict] = lambda: {},
    ):
        """
        :param stop_event: Event used to signal the processors to stop.
        :param model_path: Default speech model path.
        :param stt_enabled: Whether the speech-to-text processor is enabled.
        :param intent_enabled: Whether the speech-to-intent processor is enabled.
        :param conversation_timeout: Conversation timeout, in seconds.
        :param get_cheetah_args: Callable returning the keyword arguments used
            to create the Cheetah (STT) engine.
        :param get_rhino_args: Callable returning the keyword arguments used
            to create the Rhino (intent) engine.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self._stt_enabled = stt_enabled
        self._intent_enabled = intent_enabled
        self._model_path = model_path
        self._conversation_timeout = conversation_timeout
        # NOTE(review): currently unused within this class; kept for
        # backward compatibility in case external code relies on it.
        self._audio_queue = Queue()
        self._stop_event = stop_event
        self._get_cheetah_args = get_cheetah_args
        self._get_rhino_args = get_rhino_args

        self._stt_processor = SttProcessor(
            conversation_timeout=conversation_timeout,
            stop_event=stop_event,
            get_cheetah_args=get_cheetah_args,
        )

        self._intent_processor = IntentProcessor(
            conversation_timeout=conversation_timeout,
            stop_event=stop_event,
            get_rhino_args=get_rhino_args,
        )

    def _enabled_processors(self):
        """
        Yield the processors that are currently enabled, intent first
        (it has priority over STT).
        """
        if self._intent_enabled:
            yield self._intent_processor
        if self._stt_enabled:
            yield self._stt_processor

    @property
    def enabled(self) -> bool:
        """
        The processor is enabled if either the STT or the Intent processor are
        enabled.
        """
        return self._stt_enabled or self._intent_enabled

    def should_stop(self) -> bool:
        """
        :return: True if the stop event has been set.
        """
        return self._stop_event.is_set()

    def next_event(self, timeout: Optional[float] = None) -> Optional[AssistantEvent]:
        """
        Wait for the next event from either of the enabled processors.

        Priority is given to the intent processor's event if available,
        otherwise the STT processor's event is returned.

        :param timeout: Maximum number of seconds to wait for a processor to
            finish processing (default: no timeout).
        :return: The next available event, or None.
        """
        evt = None

        # Wait for either the STT or Intent processor to finish processing the audio
        completed = wait_for_either(
            self._stt_processor.processing_done,
            self._intent_processor.processing_done,
            self._stop_event,
            timeout=timeout,
        )

        if not completed:
            self.logger.warning('Timeout while waiting for the processors to finish')

        # Immediately return if the stop event is set
        if self.should_stop():
            return evt

        # Priority to the intent processor event, if the processor is enabled
        if self._intent_enabled:
            evt = self._intent_processor.last_event()
            if evt:
                self.logger.debug('Intent processor event: %s', evt)

        # If the intent processor didn't return any event, then return the STT
        # processor event
        if not evt and self._stt_enabled:
            evt = self._stt_processor.last_event()
            if evt:
                self.logger.debug('STT processor event: %s', evt)

        if evt:
            self._stt_processor.clear_wait()
            self._intent_processor.clear_wait()

        return evt

    def process(
        self, audio: Sequence[int], block: bool = True, timeout: Optional[float] = None
    ) -> Optional[AssistantEvent]:
        """
        Process an audio frame.

        The audio frame is enqueued to both the STT and Intent processors, if
        enabled. The function waits for either processor to finish processing
        the audio, and returns the event from the first processor that returns
        a result.

        Priority is given to the Intent processor if enabled, otherwise the STT
        processor is used.

        :param audio: Raw audio frame.
        :param block: If False, enqueue the frame and return immediately.
        :param timeout: Maximum number of seconds to wait for an event when
            blocking.
        """
        # Enqueue the audio to both the STT and Intent processors if enabled
        if self._stt_enabled:
            self._stt_processor.enqueue(audio)

        if self._intent_enabled:
            self._intent_processor.enqueue(audio)

        if not block:
            return None

        return self.next_event(timeout=timeout)

    def __enter__(self):
        """
        Context manager entry point - it wraps :meth:`start`.
        """
        self.start()
        # Fix: return the instance, so `with SpeechProcessor(...) as proc:`
        # binds the processor instead of None.
        return self

    def __exit__(self, *_, **__):
        """
        Context manager exit point - it wraps :meth:`stop`.
        """
        self.stop()

    def start(self):
        """
        Start the STT and Intent processors.
        """
        self._stt_processor.start()
        self._intent_processor.start()

    def stop(self):
        """
        Stop the STT and Intent processors.
        """
        self._stt_processor.stop()
        self._intent_processor.stop()

    def on_conversation_start(self):
        """Propagate the conversation-start signal to the enabled processors."""
        for processor in self._enabled_processors():
            processor.on_conversation_start()

    def on_conversation_end(self):
        """Propagate the conversation-end signal to the enabled processors."""
        for processor in self._enabled_processors():
            processor.on_conversation_end()

    def on_conversation_reset(self):
        """Propagate the conversation-reset signal to the enabled processors."""
        for processor in self._enabled_processors():
            processor.on_conversation_reset()

    @property
    def sample_rate(self) -> int:
        """
        The sample rate of the audio frames.

        :raise ValueError: If neither processor is enabled.
        """
        if self._intent_enabled:
            return self._intent_processor.sample_rate
        if self._stt_enabled:
            return self._stt_processor.sample_rate
        raise ValueError('No processor enabled')

    @property
    def frame_length(self) -> int:
        """
        The frame length of the audio frames.

        :raise ValueError: If neither processor is enabled.
        """
        if self._intent_enabled:
            return self._intent_processor.frame_length
        if self._stt_enabled:
            return self._stt_processor.frame_length
        raise ValueError('No processor enabled')

View file

@ -0,0 +1,92 @@
from typing import Callable, Optional, Sequence, Union
import pvcheetah
from platypush.message.event.assistant import (
ConversationTimeoutEvent,
SpeechRecognizedEvent,
)
from ._base import BaseProcessor
class SttProcessor(BaseProcessor):
    """
    Implementation of the speech-to-text processor using the Picovoice Cheetah
    engine.
    """

    def __init__(
        self, *args, get_cheetah_args: Callable[[], dict] = lambda: {}, **kwargs
    ):
        """
        :param get_cheetah_args: Callable that returns the keyword arguments
            used to create the Cheetah engine (access key, model path, etc.).
        """
        super().__init__(*args, **kwargs)
        self._get_cheetah_args = get_cheetah_args
        # model_path -> Cheetah instance cache.
        # An instance for the current model is created eagerly here, so
        # sample_rate/frame_length are available before the thread starts.
        self._cheetah = {self._model_path: pvcheetah.create(**self._get_cheetah_args())}

    @property
    def _model_path(self) -> Optional[str]:
        # The model path is whatever the args callable currently reports,
        # so a model override upstream transparently switches engines.
        return self._get_cheetah_args().get('model_path')

    @property
    def sample_rate(self) -> int:
        """
        :return: The sample rate required by the Cheetah engine.
        """
        return self._get_cheetah().sample_rate

    @property
    def frame_length(self) -> int:
        """
        :return: The frame length required by the Cheetah engine.
        """
        return self._get_cheetah().frame_length

    def _get_cheetah(self) -> pvcheetah.Cheetah:
        # Lazily create (and cache) one Cheetah instance per model path
        if not self._cheetah.get(self._model_path):
            self.logger.debug(
                'Creating Cheetah instance for model %s', self._model_path
            )
            self._cheetah[self._model_path] = pvcheetah.create(
                **self._get_cheetah_args()
            )
            self.logger.debug('Cheetah instance created for model %s', self._model_path)

        return self._cheetah[self._model_path]

    def process(
        self, audio: Sequence[int]
    ) -> Optional[Union[SpeechRecognizedEvent, ConversationTimeoutEvent]]:
        """
        Process an audio frame and return a ``SpeechRecognizedEvent`` once a
        phrase has been finalized, a ``ConversationTimeoutEvent`` if the
        conversation timed out with no phrase, or ``None`` while the
        transcription is still in progress.
        """
        event = None
        cheetah = self._get_cheetah()
        partial_transcript, self._ctx.is_final = cheetah.process(audio)

        # Concatenate the partial transcript to the context
        if partial_transcript:
            self._ctx.transcript += partial_transcript
            self.logger.info(
                'Partial transcript: %s, is_final: %s',
                self._ctx.transcript,
                self._ctx.is_final,
            )

        # If the transcript is final or the conversation timed out, then
        # process and return whatever is available in the context
        if self._ctx.is_final or self._ctx.timed_out:
            phrase = cheetah.flush() or ''
            self._ctx.transcript += phrase
            phrase = self._ctx.transcript
            # Lower-case the first character for consistency with downstream handlers
            phrase = phrase[:1].lower() + phrase[1:]
            event = (
                SpeechRecognizedEvent(phrase=phrase)
                if phrase
                else ConversationTimeoutEvent()
            )

            self._ctx.reset()

        if event:
            self.logger.debug('STT event: %s', event)

        return event

    def stop(self):
        """
        Stop the processor thread and release all the cached Cheetah instances.
        """
        super().stop()
        # Iterate over a copy, since we mutate the cache while iterating
        objs = self._cheetah.copy()
        for key, obj in objs.items():
            obj.delete()
            self._cheetah.pop(key)

View file

@ -6,9 +6,11 @@ manifest:
- platypush.message.event.assistant.ConversationStartEvent - platypush.message.event.assistant.ConversationStartEvent
- platypush.message.event.assistant.ConversationTimeoutEvent - platypush.message.event.assistant.ConversationTimeoutEvent
- platypush.message.event.assistant.HotwordDetectedEvent - platypush.message.event.assistant.HotwordDetectedEvent
- platypush.message.event.assistant.IntentMatchedEvent
- platypush.message.event.assistant.MicMutedEvent - platypush.message.event.assistant.MicMutedEvent
- platypush.message.event.assistant.MicUnmutedEvent - platypush.message.event.assistant.MicUnmutedEvent
- platypush.message.event.assistant.NoResponseEvent - platypush.message.event.assistant.NoResponseEvent
- platypush.message.event.assistant.ResponseEndEvent
- platypush.message.event.assistant.ResponseEvent - platypush.message.event.assistant.ResponseEvent
- platypush.message.event.assistant.SpeechRecognizedEvent - platypush.message.event.assistant.SpeechRecognizedEvent
install: install:
@ -22,6 +24,7 @@ manifest:
- ffmpeg - ffmpeg
- python-sounddevice - python-sounddevice
pip: pip:
- num2words # Temporary dependency
- pvcheetah - pvcheetah
- pvleopard - pvleopard
- pvorca - pvorca