diff --git a/platypush/commands/_stream.py b/platypush/commands/_stream.py index 3e480c11c..47f595473 100644 --- a/platypush/commands/_stream.py +++ b/platypush/commands/_stream.py @@ -1,4 +1,4 @@ -from multiprocessing import RLock, Queue +from multiprocessing import RLock, Queue, active_children import os from queue import Empty import socket @@ -57,6 +57,25 @@ class CommandStream(ControllableProcess): self.reset() return super().close() + def _term_or_kill(self, kill: bool = False, visited=None) -> None: + func = 'kill' if kill else 'terminate' + visited = visited or set() + visited.add(id(self)) + + for child in active_children(): + child_id = id(child) + if child_id not in visited: + visited.add(child_id) + getattr(child, func)() + + getattr(super(), func)() + + def terminate(self, visited=None) -> None: + self._term_or_kill(kill=False, visited=visited) + + def kill(self) -> None: + self._term_or_kill(kill=True) + def __enter__(self) -> "CommandStream": self.reset() sock = self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) diff --git a/platypush/message/event/assistant/__init__.py b/platypush/message/event/assistant/__init__.py index 22a066dc9..41d0d229f 100644 --- a/platypush/message/event/assistant/__init__.py +++ b/platypush/message/event/assistant/__init__.py @@ -226,7 +226,7 @@ class SpeechRecognizedEvent(AssistantEvent): return result -class IntentMatchedEvent(AssistantEvent): +class IntentRecognizedEvent(AssistantEvent): """ Event triggered when an intent is matched by a speech command. """ diff --git a/platypush/plugins/assistant/__init__.py b/platypush/plugins/assistant/__init__.py index 89c6a0d80..22ddb6225 100644 --- a/platypush/plugins/assistant/__init__.py +++ b/platypush/plugins/assistant/__init__.py @@ -259,9 +259,9 @@ class AssistantPlugin(Plugin, AssistantEntityManager, ABC): self._send_event(SpeechRecognizedEvent, phrase=phrase) def _on_intent_matched(self, intent: str, slots: Optional[Dict[str, Any]] = None): - from platypush.message.event.assistant import IntentMatchedEvent + from platypush.message.event.assistant import IntentRecognizedEvent - self._send_event(IntentMatchedEvent, intent=intent, slots=slots) + self._send_event(IntentRecognizedEvent, intent=intent, slots=slots) def _on_alarm_start(self): from platypush.message.event.assistant import AlarmStartedEvent diff --git a/platypush/plugins/assistant/picovoice/__init__.py b/platypush/plugins/assistant/picovoice/__init__.py index 51d349fca..71ff1a3ce 100644 --- a/platypush/plugins/assistant/picovoice/__init__.py +++ b/platypush/plugins/assistant/picovoice/__init__.py @@ -54,7 +54,6 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): access_key: str, hotword_enabled: bool = True, stt_enabled: bool = True, - intent_enabled: bool = False, keywords: Optional[Sequence[str]] = None, keyword_paths: Optional[Sequence[str]] = None, keyword_model_path: Optional[str] = None, @@ -77,10 +76,6 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): :param stt_enabled: Enable the speech-to-text engine (default: True). **Note**: The speech-to-text engine requires you to add Cheetah to the products available in your Picovoice account. - :param intent_enabled: Enable the intent recognition engine (default: - False). - **Note**: The intent recognition engine requires you to add Rhino - to the products available in your Picovoice account. :param keywords: List of keywords to listen for (e.g. ``alexa``, ``ok google``...). This is required if the wake-word engine is enabled. See the `Porcupine keywords repository @@ -142,7 +137,7 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): Then a phrase like "turn on the lights in the living room" would trigger a - :class:`platypush.message.event.assistant.IntentMatchedEvent` with: + :class:`platypush.message.event.assistant.IntentRecognizedEvent` with: .. code-block:: json @@ -155,6 +150,8 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): } } + **Note**: The intent recognition engine requires you to add Rhino + to the products available in your Picovoice account. :param endpoint_duration: If set, the assistant will stop listening when no speech is detected for the specified duration (in seconds) after the end of an utterance. @@ -191,7 +188,6 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): 'access_key': access_key, 'hotword_enabled': hotword_enabled, 'stt_enabled': stt_enabled, - 'intent_enabled': intent_enabled, 'keywords': keywords, 'keyword_paths': ( os.path.expanduser(keyword_path) @@ -208,7 +204,11 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): ), 'endpoint_duration': endpoint_duration, 'enable_automatic_punctuation': enable_automatic_punctuation, - 'start_conversation_on_hotword': start_conversation_on_hotword, + 'start_conversation_on_hotword': ( + start_conversation_on_hotword + if (intent_model_path or stt_enabled) + else False + ), 'audio_queue_size': audio_queue_size, 'conversation_timeout': conversation_timeout, 'muted': muted, @@ -420,7 +420,7 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin): try: for event in self._assistant: if event is not None: - self.logger.debug('Picovoice assistant event: %s', event) + self.logger.debug('Dequeued assistant event: %s', event) except KeyboardInterrupt: break except Exception as e: diff --git a/platypush/plugins/assistant/picovoice/_assistant.py b/platypush/plugins/assistant/picovoice/_assistant.py index c7e5f1479..9fcf66d04 100644 --- a/platypush/plugins/assistant/picovoice/_assistant.py +++ b/platypush/plugins/assistant/picovoice/_assistant.py @@ -12,7 +12,7 @@ from platypush.message.event.assistant import ( AssistantEvent, ConversationTimeoutEvent, HotwordDetectedEvent, - IntentMatchedEvent, + IntentRecognizedEvent, SpeechRecognizedEvent, ) from platypush.plugins.tts.picovoice import TtsPicovoicePlugin @@ -37,7 +37,6 @@ class Assistant(Thread): stop_event: Event, hotword_enabled: bool = True, stt_enabled: bool = True, - intent_enabled: bool = False, keywords: Optional[Sequence[str]] = None, keyword_paths: Optional[Sequence[str]] = None, keyword_model_path: Optional[str] = None, @@ -58,15 +57,12 @@ class Assistant(Thread): on_hotword_detected=_default_callback, ): super().__init__(name='picovoice:Assistant') - if intent_enabled: - assert intent_model_path, 'Intent model path not provided' self._access_key = access_key self._stop_event = stop_event self.logger = logging.getLogger(__name__) self.hotword_enabled = hotword_enabled self.stt_enabled = stt_enabled - self.intent_enabled = intent_enabled self.keywords = list(keywords or []) self.keyword_paths = None self.keyword_model_path = None @@ -86,11 +82,11 @@ class Assistant(Thread): self._speech_processor = SpeechProcessor( stop_event=stop_event, stt_enabled=stt_enabled, - intent_enabled=intent_enabled, + intent_enabled=self.intent_enabled, conversation_timeout=conversation_timeout, model_path=speech_model_path, get_cheetah_args=self._get_speech_engine_args, - get_rhino_args=self._get_speech_engine_args, + get_rhino_args=self._get_intent_engine_args, ) self._on_conversation_start = on_conversation_start @@ -133,15 +129,19 @@ class Assistant(Thread): self._porcupine: Optional[pvporcupine.Porcupine] = None @property - def is_responding(self): + def intent_enabled(self) -> bool: + return self.intent_model_path is not None + + @property + def is_responding(self) -> bool: return self._responding.is_set() @property - def speech_model_path(self): + def speech_model_path(self) -> Optional[str]: return self._speech_model_path_override or self._speech_model_path @property - def intent_model_path(self): + def intent_model_path(self) -> Optional[str]: return self._intent_model_path_override or self._intent_model_path @property @@ -224,6 +224,16 @@ class Assistant(Thread): return args + def _get_intent_engine_args(self) -> dict: + args: Dict[str, Any] = {'access_key': self._access_key} + args['context_path'] = self.intent_model_path + if self.endpoint_duration: + args['endpoint_duration_sec'] = self.endpoint_duration + if self.enable_automatic_punctuation: + args['enable_automatic_punctuation'] = self.enable_automatic_punctuation + + return args + def __enter__(self): """ Get the assistant ready to start processing audio frames. @@ -301,13 +311,16 @@ class Assistant(Thread): if isinstance(evt, SpeechRecognizedEvent): self._on_speech_recognized(phrase=evt.args['phrase']) - if isinstance(evt, IntentMatchedEvent): + if isinstance(evt, IntentRecognizedEvent): self._on_intent_matched( intent=evt.args['intent'], slots=evt.args.get('slots', {}) ) if isinstance(evt, ConversationTimeoutEvent): self._on_conversation_timeout() + if evt: + self._speech_processor.reset() + if ( evt and self.state == AssistantState.DETECTING_SPEECH diff --git a/platypush/plugins/assistant/picovoice/_context.py b/platypush/plugins/assistant/picovoice/_context.py index 4b3264bc8..1a85bb24d 100644 --- a/platypush/plugins/assistant/picovoice/_context.py +++ b/platypush/plugins/assistant/picovoice/_context.py @@ -16,22 +16,16 @@ class ConversationContext: intent: Optional[Intent] = None timeout: Optional[float] = None t_start: Optional[float] = None - t_end: Optional[float] = None def start(self): self.reset() self.t_start = time() - def stop(self): - self.reset() - self.t_end = time() - def reset(self): self.transcript = '' self.intent = None self.is_final = False self.t_start = None - self.t_end = None @property def timed_out(self): diff --git a/platypush/plugins/assistant/picovoice/_speech/_base.py b/platypush/plugins/assistant/picovoice/_speech/_base.py index b4660c979..8ac7c278d 100644 --- a/platypush/plugins/assistant/picovoice/_speech/_base.py +++ b/platypush/plugins/assistant/picovoice/_speech/_base.py @@ -1,8 +1,8 @@ import logging from abc import ABC, abstractmethod from queue import Empty, Queue -from threading import Event, Thread, get_ident -from typing import Optional, Sequence +from threading import Event, RLock, Thread, get_ident +from typing import Any, Optional, Sequence from platypush.message.event.assistant import AssistantEvent @@ -19,12 +19,14 @@ class BaseProcessor(ABC, Thread): self, *args, stop_event: Event, + enabled: bool = True, conversation_timeout: Optional[float] = None, **kwargs, ): super().__init__(*args, name=f'picovoice:{self.__class__.__name__}', **kwargs) self.logger = logging.getLogger(self.name) + self._enabled = enabled self._audio_queue = Queue() self._stop_event = stop_event self._ctx = ConversationContext(timeout=conversation_timeout) @@ -36,6 +38,7 @@ class BaseProcessor(ABC, Thread): # processing and it's ready to accept a new audio frame self._processing_done = Event() self._processing_done.set() + self._state_lock = RLock() def should_stop(self) -> bool: return self._stop_event.is_set() @@ -44,10 +47,26 @@ class BaseProcessor(ABC, Thread): return self._stop_event.wait(timeout) def enqueue(self, audio: Sequence[int]): + if not self._enabled: + return + self._event_wait.set() self._processing_done.clear() self._audio_queue.put_nowait(audio) + def reset(self) -> Optional[Any]: + """ + Reset any pending context. + """ + if not self._enabled: + return + + with self._state_lock: + self._ctx.reset() + self._event_queue.queue.clear() + self._event_wait.clear() + self._processing_done.set() + @property def processing_done(self) -> Event: return self._processing_done @@ -77,20 +96,18 @@ class BaseProcessor(ABC, Thread): """ :return: The latest event that was processed by the processor. """ - evt = None - try: - while True: - evt = self._event_queue.get_nowait() - except Empty: - pass + with self._state_lock: + evt = None + try: + while True: + evt = self._event_queue.get_nowait() + except Empty: + pass - if evt: - self._event_wait.clear() + if evt: + self._event_wait.clear() - return evt - - def clear_wait(self): - self._event_wait.clear() + return evt @abstractmethod def process(self, audio: Sequence[int]) -> Optional[AssistantEvent]: @@ -100,7 +117,10 @@ class BaseProcessor(ABC, Thread): def run(self): super().run() - self._ctx.reset() + if not self._enabled: + self.wait_stop() + + self.reset() self._processing_done.clear() self.logger.info('Processor started: %s', self.name) @@ -119,7 +139,11 @@ class BaseProcessor(ABC, Thread): try: self._processing_done.clear() event = self.process(audio) + if event: + self.logger.debug( + 'Dispatching event processed from %s: %s', self.name, event + ) self._event_queue.put_nowait(event) self._processing_done.set() except Exception as e: @@ -133,10 +157,14 @@ class BaseProcessor(ABC, Thread): self._processing_done.set() continue - self._ctx.reset() + self._processing_done.set() + self.reset() self.logger.info('Processor stopped: %s', self.name) def stop(self): + if not self._enabled: + return + self._audio_queue.put_nowait(None) if self.is_alive() and self.ident != get_ident(): self.logger.debug('Stopping %s', self.name) @@ -146,7 +174,7 @@ class BaseProcessor(ABC, Thread): self._ctx.start() def on_conversation_end(self): - self._ctx.stop() + self.reset() def on_conversation_reset(self): - self._ctx.reset() + self.reset() diff --git a/platypush/plugins/assistant/picovoice/_speech/_intent.py b/platypush/plugins/assistant/picovoice/_speech/_intent.py index 6593f79a2..eb5767bb2 100644 --- a/platypush/plugins/assistant/picovoice/_speech/_intent.py +++ b/platypush/plugins/assistant/picovoice/_speech/_intent.py @@ -4,7 +4,7 @@ import pvrhino from platypush.message.event.assistant import ( ConversationTimeoutEvent, - IntentMatchedEvent, + IntentRecognizedEvent, ) from ._base import BaseProcessor @@ -44,9 +44,9 @@ class IntentProcessor(BaseProcessor): def process( self, audio: Sequence[int] - ) -> Optional[Union[IntentMatchedEvent, ConversationTimeoutEvent]]: + ) -> Optional[Union[IntentRecognizedEvent, ConversationTimeoutEvent]]: """ - Process the audio and return an ``IntentMatchedEvent`` if the intent was + Process the audio and return an ``IntentRecognizedEvent`` if the intent was understood, or a ``ConversationTimeoutEvent`` if the conversation timed out, or ``None`` if the intent processing is not yet finalized. """ @@ -62,25 +62,32 @@ class IntentProcessor(BaseProcessor): ) if inference.is_understood: - event = IntentMatchedEvent( + event = IntentRecognizedEvent( intent=inference.intent, - slots={slot.key: slot.value for slot in inference.slots}, + slots=dict(inference.slots), ) if not event and self._ctx.timed_out: event = ConversationTimeoutEvent() - if event: - self._ctx.reset() - - if event: - self.logger.debug('Intent event: %s', event) - return event + def reset(self) -> None: + if not self._enabled: + return + + with self._state_lock: + self._get_rhino().reset() + super().reset() + def stop(self): + if not self._enabled: + return + super().stop() - objs = self._rhino.copy() - for key, obj in objs.items(): - obj.delete() - self._rhino.pop(key) + + with self._state_lock: + objs = self._rhino.copy() + for key, obj in objs.items(): + obj.delete() + self._rhino.pop(key) diff --git a/platypush/plugins/assistant/picovoice/_speech/_processor.py b/platypush/plugins/assistant/picovoice/_speech/_processor.py index 2032bbe8d..edda995a9 100644 --- a/platypush/plugins/assistant/picovoice/_speech/_processor.py +++ b/platypush/plugins/assistant/picovoice/_speech/_processor.py @@ -3,7 +3,7 @@ from queue import Queue from threading import Event from typing import Callable, Optional, Sequence -from platypush.message.event.assistant import AssistantEvent +from platypush.message.event.assistant import AssistantEvent, ConversationTimeoutEvent from platypush.utils import wait_for_either from ._intent import IntentProcessor @@ -39,12 +39,14 @@ class SpeechProcessor: self._stt_processor = SttProcessor( conversation_timeout=conversation_timeout, stop_event=stop_event, + enabled=stt_enabled, get_cheetah_args=get_cheetah_args, ) self._intent_processor = IntentProcessor( conversation_timeout=conversation_timeout, stop_event=stop_event, + enabled=intent_enabled, get_rhino_args=get_rhino_args, ) @@ -77,24 +79,24 @@ class SpeechProcessor: if self.should_stop(): return evt - # Priority to the intent processor event, if the processor is enabled - if self._intent_enabled: - evt = self._intent_processor.last_event() + with self._stt_processor._state_lock, self._intent_processor._state_lock: + # Priority to the intent processor event, if the processor is enabled + if self._intent_enabled: + evt = self._intent_processor.last_event() + + # If the intent processor didn't return any event, then return the STT + # processor event + if ( + not evt or isinstance(evt, ConversationTimeoutEvent) + ) and self._stt_enabled: + # self._stt_processor.processing_done.wait(timeout=timeout) + evt = self._stt_processor.last_event() + if evt: - self.logger.debug('Intent processor event: %s', evt) + self._stt_processor.reset() + self._intent_processor.reset() - # If the intent processor didn't return any event, then return the STT - # processor event - if not evt and self._stt_enabled: - evt = self._stt_processor.last_event() - if evt: - self.logger.debug('STT processor event: %s', evt) - - if evt: - self._stt_processor.clear_wait() - self._intent_processor.clear_wait() - - return evt + return evt def process( self, audio: Sequence[int], block: bool = True, timeout: Optional[float] = None @@ -138,8 +140,11 @@ class SpeechProcessor: """ Start the STT and Intent processors. """ - self._stt_processor.start() - self._intent_processor.start() + if self._stt_enabled: + self._stt_processor.start() + + if self._intent_enabled: + self._intent_processor.start() def stop(self): """ @@ -169,6 +174,13 @@ class SpeechProcessor: if self._intent_enabled: self._intent_processor.on_conversation_reset() + def reset(self): + """ + Reset the state of the STT and Intent processors. + """ + self._stt_processor.reset() + self._intent_processor.reset() + @property def sample_rate(self) -> int: """ diff --git a/platypush/plugins/assistant/picovoice/_speech/_stt.py b/platypush/plugins/assistant/picovoice/_speech/_stt.py index b6ab33c41..cdd87d608 100644 --- a/platypush/plugins/assistant/picovoice/_speech/_stt.py +++ b/platypush/plugins/assistant/picovoice/_speech/_stt.py @@ -37,16 +37,19 @@ class SttProcessor(BaseProcessor): return self._get_cheetah().frame_length def _get_cheetah(self) -> pvcheetah.Cheetah: - if not self._cheetah.get(self._model_path): - self.logger.debug( - 'Creating Cheetah instance for model %s', self._model_path - ) - self._cheetah[self._model_path] = pvcheetah.create( - **self._get_cheetah_args() - ) - self.logger.debug('Cheetah instance created for model %s', self._model_path) + with self._state_lock: + if not self._cheetah.get(self._model_path): + self.logger.debug( + 'Creating Cheetah instance for model %s', self._model_path + ) + self._cheetah[self._model_path] = pvcheetah.create( + **self._get_cheetah_args() + ) + self.logger.debug( + 'Cheetah instance created for model %s', self._model_path + ) - return self._cheetah[self._model_path] + return self._cheetah[self._model_path] def process( self, audio: Sequence[int] @@ -54,6 +57,7 @@ class SttProcessor(BaseProcessor): event = None cheetah = self._get_cheetah() partial_transcript, self._ctx.is_final = cheetah.process(audio) + last_transcript = self._ctx.transcript # Concatenate the partial transcript to the context if partial_transcript: @@ -69,24 +73,38 @@ class SttProcessor(BaseProcessor): if self._ctx.is_final or self._ctx.timed_out: phrase = cheetah.flush() or '' self._ctx.transcript += phrase + if self._ctx.transcript and self._ctx.transcript != last_transcript: + self.logger.debug('Processed STT transcript: %s', self._ctx.transcript) + last_transcript = self._ctx.transcript + phrase = self._ctx.transcript - phrase = phrase[:1].lower() + phrase[1:] + phrase = (phrase[:1].lower() + phrase[1:]).strip() event = ( SpeechRecognizedEvent(phrase=phrase) if phrase else ConversationTimeoutEvent() ) - self._ctx.reset() - - if event: - self.logger.debug('STT event: %s', event) + self.reset() return event + def reset(self): + if not self._enabled: + return + + with self._state_lock: + super().reset() + self._get_cheetah().flush() + def stop(self): + if not self._enabled: + return + super().stop() - objs = self._cheetah.copy() - for key, obj in objs.items(): - obj.delete() - self._cheetah.pop(key) + + with self._state_lock: + objs = self._cheetah.copy() + for key, obj in objs.items(): + obj.delete() + self._cheetah.pop(key) diff --git a/platypush/plugins/assistant/picovoice/manifest.yaml b/platypush/plugins/assistant/picovoice/manifest.yaml index d89406a8e..730012004 100644 --- a/platypush/plugins/assistant/picovoice/manifest.yaml +++ b/platypush/plugins/assistant/picovoice/manifest.yaml @@ -6,7 +6,7 @@ manifest: - platypush.message.event.assistant.ConversationStartEvent - platypush.message.event.assistant.ConversationTimeoutEvent - platypush.message.event.assistant.HotwordDetectedEvent - - platypush.message.event.assistant.IntentMatchedEvent + - platypush.message.event.assistant.IntentRecognizedEvent - platypush.message.event.assistant.MicMutedEvent - platypush.message.event.assistant.MicUnmutedEvent - platypush.message.event.assistant.NoResponseEvent