diff --git a/platypush/message/event/assistant/__init__.py b/platypush/message/event/assistant/__init__.py index 431f9971..22a066dc 100644 --- a/platypush/message/event/assistant/__init__.py +++ b/platypush/message/event/assistant/__init__.py @@ -107,13 +107,17 @@ class ResponseEndEvent(ConversationEndEvent): Event triggered when a response has been rendered on the assistant. """ - def __init__(self, *args, with_follow_on_turn: bool = False, **kwargs): + def __init__( + self, *args, response_text: str, with_follow_on_turn: bool = False, **kwargs + ): """ + :param response_text: Response text rendered on the assistant. :param with_follow_on_turn: Set to true if the conversation expects a user follow-up, false otherwise. """ super().__init__( *args, + response_text=response_text, with_follow_on_turn=with_follow_on_turn, **kwargs, ) diff --git a/platypush/plugins/assistant/__init__.py b/platypush/plugins/assistant/__init__.py index ad0cc106..89c6a0d8 100644 --- a/platypush/plugins/assistant/__init__.py +++ b/platypush/plugins/assistant/__init__.py @@ -244,7 +244,7 @@ class AssistantPlugin(Plugin, AssistantEntityManager, ABC): def _on_response_render_end(self): from platypush.message.event.assistant import ResponseEndEvent - self._send_event(ResponseEndEvent) + self._send_event(ResponseEndEvent, response_text=self._last_response) def _on_hotword_detected(self, hotword: Optional[str]): from platypush.message.event.assistant import HotwordDetectedEvent diff --git a/platypush/plugins/assistant/picovoice/__init__.py b/platypush/plugins/assistant/picovoice/__init__.py index 092a0e74..51d349fc 100644 --- a/platypush/plugins/assistant/picovoice/__init__.py +++ b/platypush/plugins/assistant/picovoice/__init__.py @@ -216,6 +216,7 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): 'on_conversation_end': self._on_conversation_end, 'on_conversation_timeout': self._on_conversation_timeout, 'on_speech_recognized': self._on_speech_recognized, + 'on_intent_matched': self._on_intent_matched, 'on_hotword_detected': self._on_hotword_detected, } diff --git a/platypush/plugins/assistant/picovoice/_assistant.py b/platypush/plugins/assistant/picovoice/_assistant.py index c3499860..c7e5f147 100644 --- a/platypush/plugins/assistant/picovoice/_assistant.py +++ b/platypush/plugins/assistant/picovoice/_assistant.py @@ -1,28 +1,28 @@ import logging import os -from threading import Event, RLock +from queue import Full, Queue +from threading import Event, RLock, Thread from time import time from typing import Any, Dict, Optional, Sequence -import pvcheetah -import pvleopard import pvporcupine -import pvrhino from platypush.context import get_plugin from platypush.message.event.assistant import ( + AssistantEvent, ConversationTimeoutEvent, HotwordDetectedEvent, + IntentMatchedEvent, SpeechRecognizedEvent, ) from platypush.plugins.tts.picovoice import TtsPicovoicePlugin -from ._context import ConversationContext from ._recorder import AudioRecorder +from ._speech import SpeechProcessor from ._state import AssistantState -class Assistant: +class Assistant(Thread): """ A facade class that wraps the Picovoice engines under an assistant API. """ @@ -43,6 +43,7 @@ class Assistant: keyword_model_path: Optional[str] = None, frame_expiration: float = 3.0, # Don't process audio frames older than this speech_model_path: Optional[str] = None, + intent_model_path: Optional[str] = None, endpoint_duration: Optional[float] = None, enable_automatic_punctuation: bool = False, start_conversation_on_hotword: bool = False, @@ -53,8 +54,13 @@ class Assistant: on_conversation_end=_default_callback, on_conversation_timeout=_default_callback, on_speech_recognized=_default_callback, + on_intent_matched=_default_callback, on_hotword_detected=_default_callback, ): + super().__init__(name='picovoice:Assistant') + if intent_enabled: + assert intent_model_path, 'Intent model path not provided' + self._access_key = access_key self._stop_event = stop_event self.logger = logging.getLogger(__name__) @@ -64,26 +70,40 @@ class Assistant: self.keywords = list(keywords or []) self.keyword_paths = None self.keyword_model_path = None - self._responding = Event() self.frame_expiration = frame_expiration self.endpoint_duration = endpoint_duration self.enable_automatic_punctuation = enable_automatic_punctuation self.start_conversation_on_hotword = start_conversation_on_hotword self.audio_queue_size = audio_queue_size + self._responding = Event() self._muted = muted self._speech_model_path = speech_model_path self._speech_model_path_override = None + self._intent_model_path = intent_model_path + self._intent_model_path_override = None + self._in_ctx = False + + self._speech_processor = SpeechProcessor( + stop_event=stop_event, + stt_enabled=stt_enabled, + intent_enabled=intent_enabled, + conversation_timeout=conversation_timeout, + model_path=speech_model_path, + get_cheetah_args=self._get_speech_engine_args, + get_rhino_args=self._get_speech_engine_args, + ) self._on_conversation_start = on_conversation_start self._on_conversation_end = on_conversation_end self._on_conversation_timeout = on_conversation_timeout self._on_speech_recognized = on_speech_recognized + self._on_intent_matched = on_intent_matched self._on_hotword_detected = on_hotword_detected self._recorder = None self._state = AssistantState.IDLE self._state_lock = RLock() - self._ctx = ConversationContext(timeout=conversation_timeout) + self._evt_queue = Queue(maxsize=100) if hotword_enabled: if not keywords: @@ -110,11 +130,7 @@ class Assistant: self.keyword_model_path = keyword_model_path - # Model path -> model instance cache - self._cheetah = {} - self._leopard: Optional[pvleopard.Leopard] = None self._porcupine: Optional[pvporcupine.Porcupine] = None - self._rhino: Optional[pvrhino.Rhino] = None @property def is_responding(self): @@ -124,6 +140,10 @@ class Assistant: def speech_model_path(self): return self._speech_model_path_override or self._speech_model_path + @property + def intent_model_path(self): + return self._intent_model_path_override or self._intent_model_path + @property def tts(self) -> TtsPicovoicePlugin: p = get_plugin('tts.picovoice') @@ -157,18 +177,23 @@ class Assistant: if prev_state == new_state: return + self.logger.info('Assistant state transition: %s -> %s', prev_state, new_state) if prev_state == AssistantState.DETECTING_SPEECH: self.tts.stop() - self._ctx.stop() self._speech_model_path_override = None + self._intent_model_path_override = None + self._speech_processor.on_conversation_end() self._on_conversation_end() elif new_state == AssistantState.DETECTING_SPEECH: - self._ctx.start() + self._speech_processor.on_conversation_start() self._on_conversation_start() if new_state == AssistantState.DETECTING_HOTWORD: self.tts.stop() - self._ctx.reset() + self._speech_processor.on_conversation_reset() + + # Put a null event on the event queue to unblock next_event + self._evt_queue.put(None) @property def porcupine(self) -> Optional[pvporcupine.Porcupine]: @@ -188,23 +213,16 @@ class Assistant: return self._porcupine - @property - def cheetah(self) -> Optional[pvcheetah.Cheetah]: - if not self.stt_enabled: - return None + def _get_speech_engine_args(self) -> dict: + args: Dict[str, Any] = {'access_key': self._access_key} + if self.speech_model_path: + args['model_path'] = self.speech_model_path + if self.endpoint_duration: + args['endpoint_duration_sec'] = self.endpoint_duration + if self.enable_automatic_punctuation: + args['enable_automatic_punctuation'] = self.enable_automatic_punctuation - if not self._cheetah.get(self.speech_model_path): - args: Dict[str, Any] = {'access_key': self._access_key} - if self.speech_model_path: - args['model_path'] = self.speech_model_path - if self.endpoint_duration: - args['endpoint_duration_sec'] = self.endpoint_duration - if self.enable_automatic_punctuation: - args['enable_automatic_punctuation'] = self.enable_automatic_punctuation - - self._cheetah[self.speech_model_path] = pvcheetah.create(**args) - - return self._cheetah[self.speech_model_path] + return args def __enter__(self): """ @@ -213,11 +231,14 @@ class Assistant: if self.should_stop(): return self + assert not self.is_alive(), 'The assistant is already running' + self._in_ctx = True + if self._recorder: self.logger.info('A recording stream already exists') - elif self.hotword_enabled or self.stt_enabled: - sample_rate = (self.porcupine or self.cheetah).sample_rate # type: ignore - frame_length = (self.porcupine or self.cheetah).frame_length # type: ignore + elif self.hotword_enabled or self.stt_enabled or self.intent_enabled: + sample_rate = (self.porcupine or self._speech_processor).sample_rate + frame_length = (self.porcupine or self._speech_processor).frame_length self._recorder = AudioRecorder( stop_event=self._stop_event, sample_rate=sample_rate, @@ -227,9 +248,7 @@ class Assistant: channels=1, ) - if self.stt_enabled: - self._cheetah[self.speech_model_path] = self.cheetah - + self._speech_processor.__enter__() self._recorder.__enter__() if self.porcupine: @@ -237,33 +256,25 @@ class Assistant: else: self.state = AssistantState.DETECTING_SPEECH + self.start() return self def __exit__(self, *_): """ Stop the assistant and release all resources. """ + self._in_ctx = False if self._recorder: self._recorder.__exit__(*_) self._recorder = None self.state = AssistantState.IDLE - for model in [*self._cheetah.keys()]: - cheetah = self._cheetah.pop(model, None) - if cheetah: - cheetah.delete() - - if self._leopard: - self._leopard.delete() - self._leopard = None if self._porcupine: self._porcupine.delete() self._porcupine = None - if self._rhino: - self._rhino.delete() - self._rhino = None + self._speech_processor.__exit__(*_) def __iter__(self): """ @@ -275,29 +286,36 @@ class Assistant: """ Process the next audio frame and return the corresponding event. """ - has_data = False if self.should_stop() or not self._recorder: raise StopIteration - while not (self.should_stop() or has_data): - data = self._recorder.read() - if data is None: - continue + if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD: + return self._evt_queue.get() - frame, t = data - if time() - t > self.frame_expiration: - self.logger.info( - 'Skipping audio frame older than %ss', self.frame_expiration - ) - continue # The audio frame is too old + evt = None + if ( + self._speech_processor.enabled + and self.state == AssistantState.DETECTING_SPEECH + ): + evt = self._speech_processor.next_event() - if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD: - return self._process_hotword(frame) + if isinstance(evt, SpeechRecognizedEvent): + self._on_speech_recognized(phrase=evt.args['phrase']) + if isinstance(evt, IntentMatchedEvent): + self._on_intent_matched( + intent=evt.args['intent'], slots=evt.args.get('slots', {}) + ) + if isinstance(evt, ConversationTimeoutEvent): + self._on_conversation_timeout() - if self.stt_enabled and self.state == AssistantState.DETECTING_SPEECH: - return self._process_speech(frame) + if ( + evt + and self.state == AssistantState.DETECTING_SPEECH + and self.hotword_enabled + ): + self.state = AssistantState.DETECTING_HOTWORD - raise StopIteration + return evt def mute(self): self._muted = True @@ -321,7 +339,7 @@ class Assistant: else: self.mute() - def _process_hotword(self, frame): + def _process_hotword(self, frame) -> Optional[HotwordDetectedEvent]: if not self.porcupine: return None @@ -333,48 +351,61 @@ class Assistant: if self.start_conversation_on_hotword: self.state = AssistantState.DETECTING_SPEECH - self.tts.stop() + self.tts.stop() # Stop any ongoing TTS when the hotword is detected self._on_hotword_detected(hotword=self.keywords[keyword_index]) return HotwordDetectedEvent(hotword=self.keywords[keyword_index]) return None - def _process_speech(self, frame): - if not self.cheetah: - return None - - event = None - partial_transcript, self._ctx.is_final = self.cheetah.process(frame) - - if partial_transcript: - self._ctx.transcript += partial_transcript - self.logger.info( - 'Partial transcript: %s, is_final: %s', - self._ctx.transcript, - self._ctx.is_final, - ) - - if self._ctx.is_final or self._ctx.timed_out: - phrase = self.cheetah.flush() or '' - self._ctx.transcript += phrase - phrase = self._ctx.transcript - phrase = phrase[:1].lower() + phrase[1:] - - if phrase: - event = SpeechRecognizedEvent(phrase=phrase) - self._on_speech_recognized(phrase=phrase) - else: - event = ConversationTimeoutEvent() - self._on_conversation_timeout() - - self._ctx.reset() - if self.hotword_enabled: - self.state = AssistantState.DETECTING_HOTWORD - - return event - def override_speech_model(self, model_path: Optional[str]): self._speech_model_path_override = model_path + def override_intent_model(self, model_path: Optional[str]): + self._intent_model_path_override = model_path + + def _put_event(self, evt: AssistantEvent): + try: + self._evt_queue.put_nowait(evt) + except Full: + self.logger.warning('The assistant event queue is full') + + def run(self): + assert ( + self._in_ctx + ), 'The assistant can only be started through a context manager' + + super().run() + + while not self.should_stop() and self._recorder: + self._recorder.wait_start() + if self.should_stop(): + break + + data = self._recorder.read() + if data is None: + continue + + frame, t = data + if time() - t > self.frame_expiration: + self.logger.info( + 'Skipping audio frame older than %ss', self.frame_expiration + ) + continue # The audio frame is too old + + if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD: + evt = self._process_hotword(frame) + if evt: + self._put_event(evt) + + continue + + if ( + self._speech_processor.enabled + and self.state == AssistantState.DETECTING_SPEECH + ): + self._speech_processor.process(frame, block=False) + + self.logger.info('Assistant stopped') + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/assistant/picovoice/_context.py b/platypush/plugins/assistant/picovoice/_context.py index e3696601..4b3264bc 100644 --- a/platypush/plugins/assistant/picovoice/_context.py +++ b/platypush/plugins/assistant/picovoice/_context.py @@ -2,6 +2,8 @@ from dataclasses import dataclass from time import time from typing import Optional +from ._intent import Intent + @dataclass class ConversationContext: @@ -11,6 +13,7 @@ class ConversationContext: transcript: str = '' is_final: bool = False + intent: Optional[Intent] = None timeout: Optional[float] = None t_start: Optional[float] = None t_end: Optional[float] = None @@ -25,6 +28,7 @@ class ConversationContext: def reset(self): self.transcript = '' + self.intent = None self.is_final = False self.t_start = None self.t_end = None @@ -32,14 +36,18 @@ class ConversationContext: @property def timed_out(self): return ( - not self.transcript - and not self.is_final + ( + (not self.transcript and not self.is_final) + or (not self.intent and not self.is_final) + ) and self.timeout and self.t_start and time() - self.t_start > self.timeout ) or ( - self.transcript - and not self.is_final + ( + (self.transcript and not self.is_final) + or (self.intent and not self.is_final) + ) and self.timeout and self.t_start and time() - self.t_start > self.timeout * 2 diff --git a/platypush/plugins/assistant/picovoice/_intent.py b/platypush/plugins/assistant/picovoice/_intent.py new file mode 100644 index 00000000..427a52d1 --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_intent.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass, field + + +@dataclass +class Intent: + """ + Speech intent data class. + """ + + name: str + slots: dict = field(default_factory=dict) diff --git a/platypush/plugins/assistant/picovoice/_recorder.py b/platypush/plugins/assistant/picovoice/_recorder.py index afe9ee5e..523806be 100644 --- a/platypush/plugins/assistant/picovoice/_recorder.py +++ b/platypush/plugins/assistant/picovoice/_recorder.py @@ -178,3 +178,14 @@ class AudioRecorder: Wait until the audio stream is stopped. """ wait_for_either(self._stop_event, self._upstream_stop_event, timeout=timeout) + + def wait_start(self, timeout: Optional[float] = None): + """ + Wait until the audio stream is started. + """ + wait_for_either( + self._stop_event, + self._upstream_stop_event, + self._paused_state._recording_event, + timeout=timeout, + ) diff --git a/platypush/plugins/assistant/picovoice/_speech/__init__.py b/platypush/plugins/assistant/picovoice/_speech/__init__.py new file mode 100644 index 00000000..6318c0b6 --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_speech/__init__.py @@ -0,0 +1,3 @@ +from ._processor import SpeechProcessor + +__all__ = ['SpeechProcessor'] diff --git a/platypush/plugins/assistant/picovoice/_speech/_base.py b/platypush/plugins/assistant/picovoice/_speech/_base.py new file mode 100644 index 00000000..b4660c97 --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_speech/_base.py @@ -0,0 +1,152 @@ +import logging +from abc import ABC, abstractmethod +from queue import Empty, Queue +from threading import Event, Thread, get_ident +from typing import Optional, Sequence + +from platypush.message.event.assistant import AssistantEvent + +from .._context import ConversationContext + + +class BaseProcessor(ABC, Thread): + """ + Base speech processor class. It is implemented by the ``SttProcessor`` and + the ``IntentProcessor`` classes. + """ + + def __init__( + self, + *args, + stop_event: Event, + conversation_timeout: Optional[float] = None, + **kwargs, + ): + super().__init__(*args, name=f'picovoice:{self.__class__.__name__}', **kwargs) + + self.logger = logging.getLogger(self.name) + self._audio_queue = Queue() + self._stop_event = stop_event + self._ctx = ConversationContext(timeout=conversation_timeout) + self._event_queue = Queue() + # This event is set if the upstream processor is waiting for an event + # from this processor + self._event_wait = Event() + # This event is set when the processor is done with the audio + # processing and it's ready to accept a new audio frame + self._processing_done = Event() + self._processing_done.set() + + def should_stop(self) -> bool: + return self._stop_event.is_set() + + def wait_stop(self, timeout: Optional[float] = None) -> bool: + return self._stop_event.wait(timeout) + + def enqueue(self, audio: Sequence[int]): + self._event_wait.set() + self._processing_done.clear() + self._audio_queue.put_nowait(audio) + + @property + def processing_done(self) -> Event: + return self._processing_done + + @property + @abstractmethod + def _model_path(self) -> Optional[str]: + """ + Return the model path. + """ + + @property + @abstractmethod + def sample_rate(self) -> int: + """ + :return: The sample rate wanted by Cheetah/Rhino. + """ + + @property + @abstractmethod + def frame_length(self) -> int: + """ + :return: The frame length wanted by Cheetah/Rhino. + """ + + def last_event(self) -> Optional[AssistantEvent]: + """ + :return: The latest event that was processed by the processor. + """ + evt = None + try: + while True: + evt = self._event_queue.get_nowait() + except Empty: + pass + + if evt: + self._event_wait.clear() + + return evt + + def clear_wait(self): + self._event_wait.clear() + + @abstractmethod + def process(self, audio: Sequence[int]) -> Optional[AssistantEvent]: + """ + Process speech events from a raw audio input. + """ + + def run(self): + super().run() + self._ctx.reset() + self._processing_done.clear() + self.logger.info('Processor started: %s', self.name) + + while not self.should_stop(): + audio = self._audio_queue.get() + + # The thread is stopped when it receives a None object + if audio is None: + break + + # Don't process the audio if the upstream processor is not waiting + # for an event + if not self._event_wait.is_set(): + continue + + try: + self._processing_done.clear() + event = self.process(audio) + if event: + self._event_queue.put_nowait(event) + self._processing_done.set() + except Exception as e: + self.logger.error( + 'An error occurred while processing the audio on %s: %s', + self.name, + e, + exc_info=e, + ) + self.wait_stop(timeout=1) + self._processing_done.set() + continue + + self._ctx.reset() + self.logger.info('Processor stopped: %s', self.name) + + def stop(self): + self._audio_queue.put_nowait(None) + if self.is_alive() and self.ident != get_ident(): + self.logger.debug('Stopping %s', self.name) + self.join() + + def on_conversation_start(self): + self._ctx.start() + + def on_conversation_end(self): + self._ctx.stop() + + def on_conversation_reset(self): + self._ctx.reset() diff --git a/platypush/plugins/assistant/picovoice/_speech/_intent.py b/platypush/plugins/assistant/picovoice/_speech/_intent.py new file mode 100644 index 00000000..6593f79a --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_speech/_intent.py @@ -0,0 +1,86 @@ +from typing import Callable, Optional, Sequence, Union + +import pvrhino + +from platypush.message.event.assistant import ( + ConversationTimeoutEvent, + IntentMatchedEvent, +) + +from ._base import BaseProcessor + + +class IntentProcessor(BaseProcessor): + """ + Implementation of the speech-to-intent processor using the Picovoice Rhino + engine. + """ + + def __init__( + self, *args, get_rhino_args: Callable[[], dict] = lambda: {}, **kwargs + ): + super().__init__(*args, **kwargs) + self._get_rhino_args = get_rhino_args + # model_path -> Rhino instance cache + self._rhino = {} + + @property + def _model_path(self) -> Optional[str]: + return self._get_rhino_args().get('model_path') + + @property + def sample_rate(self) -> int: + return self._get_rhino().sample_rate + + @property + def frame_length(self) -> int: + return self._get_rhino().frame_length + + def _get_rhino(self) -> pvrhino.Rhino: + if not self._rhino.get(self._model_path): + self._rhino[self._model_path] = pvrhino.create(**self._get_rhino_args()) + + return self._rhino[self._model_path] + + def process( + self, audio: Sequence[int] + ) -> Optional[Union[IntentMatchedEvent, ConversationTimeoutEvent]]: + """ + Process the audio and return an ``IntentMatchedEvent`` if the intent was + understood, or a ``ConversationTimeoutEvent`` if the conversation timed + out, or ``None`` if the intent processing is not yet finalized. + """ + event = None + rhino = self._get_rhino() + self._ctx.is_final = rhino.process(audio) + + if self._ctx.is_final: + inference = rhino.get_inference() + self.logger.debug( + 'Intent detection finalized. Inference understood: %s', + inference.is_understood, + ) + + if inference.is_understood: + event = IntentMatchedEvent( + intent=inference.intent, + slots={slot.key: slot.value for slot in inference.slots}, + ) + + if not event and self._ctx.timed_out: + event = ConversationTimeoutEvent() + + if event: + self._ctx.reset() + + if event: + self.logger.debug('Intent event: %s', event) + + return event + + def stop(self): + super().stop() + objs = self._rhino.copy() + for key, obj in objs.items(): + obj.delete() + self._rhino.pop(key) diff --git a/platypush/plugins/assistant/picovoice/_speech/_processor.py b/platypush/plugins/assistant/picovoice/_speech/_processor.py new file mode 100644 index 00000000..2032bbe8 --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_speech/_processor.py @@ -0,0 +1,196 @@ +import logging +from queue import Queue +from threading import Event +from typing import Callable, Optional, Sequence + +from platypush.message.event.assistant import AssistantEvent +from platypush.utils import wait_for_either + +from ._intent import IntentProcessor +from ._stt import SttProcessor + + +class SpeechProcessor: + """ + Speech processor class that wraps the STT and Intent processors under the + same interface. + """ + + def __init__( + self, + stop_event: Event, + model_path: Optional[str] = None, + stt_enabled: bool = True, + intent_enabled: bool = False, + conversation_timeout: Optional[float] = None, + get_cheetah_args: Callable[[], dict] = lambda: {}, + get_rhino_args: Callable[[], dict] = lambda: {}, + ): + self.logger = logging.getLogger(self.__class__.__name__) + self._stt_enabled = stt_enabled + self._intent_enabled = intent_enabled + self._model_path = model_path + self._conversation_timeout = conversation_timeout + self._audio_queue = Queue() + self._stop_event = stop_event + self._get_cheetah_args = get_cheetah_args + self._get_rhino_args = get_rhino_args + + self._stt_processor = SttProcessor( + conversation_timeout=conversation_timeout, + stop_event=stop_event, + get_cheetah_args=get_cheetah_args, + ) + + self._intent_processor = IntentProcessor( + conversation_timeout=conversation_timeout, + stop_event=stop_event, + get_rhino_args=get_rhino_args, + ) + + @property + def enabled(self) -> bool: + """ + The processor is enabled if either the STT or the Intent processor are + enabled. + """ + return self._stt_enabled or self._intent_enabled + + def should_stop(self) -> bool: + return self._stop_event.is_set() + + def next_event(self, timeout: Optional[float] = None) -> Optional[AssistantEvent]: + evt = None + + # Wait for either the STT or Intent processor to finish processing the audio + completed = wait_for_either( + self._stt_processor.processing_done, + self._intent_processor.processing_done, + self._stop_event, + timeout=timeout, + ) + + if not completed: + self.logger.warning('Timeout while waiting for the processors to finish') + + # Immediately return if the stop event is set + if self.should_stop(): + return evt + + # Priority to the intent processor event, if the processor is enabled + if self._intent_enabled: + evt = self._intent_processor.last_event() + if evt: + self.logger.debug('Intent processor event: %s', evt) + + # If the intent processor didn't return any event, then return the STT + # processor event + if not evt and self._stt_enabled: + evt = self._stt_processor.last_event() + if evt: + self.logger.debug('STT processor event: %s', evt) + + if evt: + self._stt_processor.clear_wait() + self._intent_processor.clear_wait() + + return evt + + def process( + self, audio: Sequence[int], block: bool = True, timeout: Optional[float] = None + ) -> Optional[AssistantEvent]: + """ + Process an audio frame. + + The audio frame is enqueued to both the STT and Intent processors, if + enabled. The function waits for either processor to finish processing + the audio, and returns the event from the first processor that returns + a result. + + Priority is given to the Intent processor if enabled, otherwise the STT + processor is used. + """ + # Enqueue the audio to both the STT and Intent processors if enabled + if self._stt_enabled: + self._stt_processor.enqueue(audio) + + if self._intent_enabled: + self._intent_processor.enqueue(audio) + + if not block: + return None + + return self.next_event(timeout=timeout) + + def __enter__(self): + """ + Context manager entry point - it wraps :meth:`start`. + """ + self.start() + + def __exit__(self, *_, **__): + """ + Context manager exit point - it wraps :meth:`stop`. + """ + self.stop() + + def start(self): + """ + Start the STT and Intent processors. + """ + self._stt_processor.start() + self._intent_processor.start() + + def stop(self): + """ + Stop the STT and Intent processors. + """ + self._stt_processor.stop() + self._intent_processor.stop() + + def on_conversation_start(self): + if self._stt_enabled: + self._stt_processor.on_conversation_start() + + if self._intent_enabled: + self._intent_processor.on_conversation_start() + + def on_conversation_end(self): + if self._stt_enabled: + self._stt_processor.on_conversation_end() + + if self._intent_enabled: + self._intent_processor.on_conversation_end() + + def on_conversation_reset(self): + if self._stt_enabled: + self._stt_processor.on_conversation_reset() + + if self._intent_enabled: + self._intent_processor.on_conversation_reset() + + @property + def sample_rate(self) -> int: + """ + The sample rate of the audio frames. + """ + if self._intent_enabled: + return self._intent_processor.sample_rate + + if self._stt_enabled: + return self._stt_processor.sample_rate + + raise ValueError('No processor enabled') + + @property + def frame_length(self) -> int: + """ + The frame length of the audio frames. + """ + if self._intent_enabled: + return self._intent_processor.frame_length + + if self._stt_enabled: + return self._stt_processor.frame_length + + raise ValueError('No processor enabled') diff --git a/platypush/plugins/assistant/picovoice/_speech/_stt.py b/platypush/plugins/assistant/picovoice/_speech/_stt.py new file mode 100644 index 00000000..b6ab33c4 --- /dev/null +++ b/platypush/plugins/assistant/picovoice/_speech/_stt.py @@ -0,0 +1,92 @@ +from typing import Callable, Optional, Sequence, Union + +import pvcheetah + +from platypush.message.event.assistant import ( + ConversationTimeoutEvent, + SpeechRecognizedEvent, +) + +from ._base import BaseProcessor + + +class SttProcessor(BaseProcessor): + """ + Implementation of the speech-to-text processor using the Picovoice Cheetah + engine. + """ + + def __init__( + self, *args, get_cheetah_args: Callable[[], dict] = lambda: {}, **kwargs + ): + super().__init__(*args, **kwargs) + self._get_cheetah_args = get_cheetah_args + # model_path -> Cheetah instance cache + self._cheetah = {self._model_path: pvcheetah.create(**self._get_cheetah_args())} + + @property + def _model_path(self) -> Optional[str]: + return self._get_cheetah_args().get('model_path') + + @property + def sample_rate(self) -> int: + return self._get_cheetah().sample_rate + + @property + def frame_length(self) -> int: + return self._get_cheetah().frame_length + + def _get_cheetah(self) -> pvcheetah.Cheetah: + if not self._cheetah.get(self._model_path): + self.logger.debug( + 'Creating Cheetah instance for model %s', self._model_path + ) + self._cheetah[self._model_path] = pvcheetah.create( + **self._get_cheetah_args() + ) + self.logger.debug('Cheetah instance created for model %s', self._model_path) + + return self._cheetah[self._model_path] + + def process( + self, audio: Sequence[int] + ) -> Optional[Union[SpeechRecognizedEvent, ConversationTimeoutEvent]]: + event = None + cheetah = self._get_cheetah() + partial_transcript, self._ctx.is_final = cheetah.process(audio) + + # Concatenate the partial transcript to the context + if partial_transcript: + self._ctx.transcript += partial_transcript + self.logger.info( + 'Partial transcript: %s, is_final: %s', + self._ctx.transcript, + self._ctx.is_final, + ) + + # If the transcript is final or the conversation timed out, then + # process and return whatever is available in the context + if self._ctx.is_final or self._ctx.timed_out: + phrase = cheetah.flush() or '' + self._ctx.transcript += phrase + phrase = self._ctx.transcript + phrase = phrase[:1].lower() + phrase[1:] + event = ( + SpeechRecognizedEvent(phrase=phrase) + if phrase + else ConversationTimeoutEvent() + ) + + self._ctx.reset() + + if event: + self.logger.debug('STT event: %s', event) + + return event + + def stop(self): + super().stop() + objs = self._cheetah.copy() + for key, obj in objs.items(): + obj.delete() + self._cheetah.pop(key) diff --git a/platypush/plugins/assistant/picovoice/manifest.yaml b/platypush/plugins/assistant/picovoice/manifest.yaml index f3dc3814..d89406a8 100644 --- a/platypush/plugins/assistant/picovoice/manifest.yaml +++ b/platypush/plugins/assistant/picovoice/manifest.yaml @@ -6,9 +6,11 @@ manifest: - platypush.message.event.assistant.ConversationStartEvent - platypush.message.event.assistant.ConversationTimeoutEvent - platypush.message.event.assistant.HotwordDetectedEvent + - platypush.message.event.assistant.IntentMatchedEvent - platypush.message.event.assistant.MicMutedEvent - platypush.message.event.assistant.MicUnmutedEvent - platypush.message.event.assistant.NoResponseEvent + - platypush.message.event.assistant.ResponseEndEvent - platypush.message.event.assistant.ResponseEvent - platypush.message.event.assistant.SpeechRecognizedEvent install: @@ -22,6 +24,7 @@ manifest: - ffmpeg - python-sounddevice pip: + - num2words # Temporary dependency - pvcheetah - pvleopard - pvorca