Compare commits

...

7 Commits

Author SHA1 Message Date
Fabio Manganiello 8df34a13f1
New architecture for the assistant speech detection logic.
The assistant object now runs in its own thread and leverages an
external `SpeechProcessor` that uses two threads to scan audio frames
for intents and speech in parallel.
2024-04-20 17:24:03 +02:00
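
A minimal sketch of that architecture (simplified stand-ins, not the
actual platypush classes): the assistant is itself a thread that pumps
audio frames into a processor, which fans them out to one worker per
engine.

import queue
import threading

class SpeechProcessor:
    """Fans audio frames out to an STT worker and an intent worker."""

    def __init__(self):
        self._queues = {'stt': queue.Queue(), 'intent': queue.Queue()}
        self.results = queue.Queue()
        self._workers = [
            threading.Thread(target=self._worker, args=(name,), daemon=True)
            for name in self._queues
        ]

    def start(self):
        for w in self._workers:
            w.start()

    def process(self, frame):
        # Enqueue the frame to both engines so they scan it in parallel
        for q in self._queues.values():
            q.put(frame)

    def _worker(self, name):
        q = self._queues[name]
        while True:
            frame = q.get()
            if frame is None:  # Poison pill stops the worker
                break
            # A real worker would feed Cheetah (speech) or Rhino (intents)
            self.results.put((name, len(frame)))

class Assistant(threading.Thread):
    """The assistant runs in its own thread and feeds the processor."""

    def __init__(self, frames):
        super().__init__(name='assistant')
        self._frames = frames
        self.processor = SpeechProcessor()

    def run(self):
        self.processor.start()
        for frame in self._frames:
            self.processor.process(frame)
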
Fabio Manganiello cb2077eddc
Merge branch 'master' into 304-new-picovoice-integration 2024-04-17 04:08:49 +02:00
Fabio Manganiello 51a9b956bf Merge pull request '[#389] Fix for "Too many open files" media issue.' (#390) from fix-too-many-open-files-during-media into master
Reviewed-on: #390
2024-04-17 04:08:10 +02:00
Fabio Manganiello e123463804
[media.chromecast] Refactored implementation.
Explicitly use a `CastBrowser` object initialized at plugin boot instead
of relying on blocking calls to `pychromecast.get_chromecasts`.

1. It enables better event handling via callbacks instead of
   synchronously waiting for scan batches.

2. It optimizes resources - only one Zeroconf and one CastBrowser object
   will be created in the plugin, and destroyed upon stop.

3. No need for separate `get_chromecast`/`_refresh_chromecasts` scan
   methods: scanning runs continuously, so we can simply return the
   results from the device maps.
2024-04-17 03:56:45 +02:00
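
The callback-based discovery pattern, sketched (the imports mirror the
diff below; the handler body is illustrative):

from zeroconf import Zeroconf
from pychromecast import CastBrowser, SimpleCastListener

zc = Zeroconf()

def on_cast_discovered(uuid, _service):
    # Devices appear asynchronously in browser.devices as they are found
    print('Discovered Chromecast:', browser.devices[uuid].friendly_name)

browser = CastBrowser(SimpleCastListener(on_cast_discovered), zc)
browser.start_discovery()  # Non-blocking: scanning keeps running

# ... read results from browser.devices ...

browser.stop_discovery()   # One browser and one Zeroconf per plugin,
zc.close()                 # both destroyed upon stop
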
Fabio Manganiello f99f6bdab9
[media.chromecast] Resource clean up + new API adaptations.
- `pychromecast.get_chromecasts` returns both a list of devices and a
  browser object. Since the Chromecast plugin is the most likely culprit
  of the excessive number of open MDNS sockets, it seems that we may
  need to explicitly stop discovery on the browser and close the
  ZeroConf object after the discovery is done.

- I was still using an ancient version of pychromecast on my RPi4, and I
  didn't notice that more recent versions implemented several breaking
  changes. Adapted the code to cope with those changes.
2024-04-17 02:49:31 +02:00
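
A sketch of the cleanup the message describes, assuming a recent
pychromecast:

import pychromecast

# Newer pychromecast returns (devices, browser); the browser keeps MDNS
# sockets open until discovery is explicitly stopped.
chromecasts, browser = pychromecast.get_chromecasts()
try:
    for cc in chromecasts:
        print(cc.cast_info.friendly_name)
finally:
    browser.stop_discovery()  # Close the discovery sockets when done
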
Fabio Manganiello 4972c8bdcf
Unregister a Zeroconf instance if it already exists before publishing a backend service.
`mdns` connections are another culprit behind the increasing number of
open files in the process.
2024-04-16 00:12:55 +02:00
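
The fix boils down to this pattern (a sketch; `srv_info` stands in for
the real service descriptor):

from zeroconf import Zeroconf

class Backend:
    zeroconf = None

    def register_service(self, srv_info):
        if self.zeroconf:
            # A leftover instance would leak its sockets on re-registration
            self.unregister_service()
        self.zeroconf = Zeroconf()
        self.zeroconf.register_service(srv_info)

    def unregister_service(self):
        if self.zeroconf:
            self.zeroconf.unregister_all_services()
            self.zeroconf.close()
            self.zeroconf = None
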
Fabio Manganiello 33d4c8342d
[#389] Possible fix for "Too many open files" media issue.
It seems that the process keeps a lot of open connections to Chromecast
devices during playback.

The most likely culprit is the `_refresh_chromecasts` logic.

We should start a `cast` object and register a status listener only if a
Chromecast with the same identifier isn't already registered in the
plugin.
2024-04-15 23:01:10 +02:00
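
In other words (a sketch; `self._casts` and `self._listener` are
illustrative names):

def _register_cast(self, cast):
    uuid = str(cast.uuid)
    if uuid in self._casts:
        return  # Already registered: don't open another connection
    cast.start()  # Connect once
    cast.media_controller.register_status_listener(self._listener)
    self._casts[uuid] = cast
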
17 changed files with 865 additions and 246 deletions

View File

@ -402,6 +402,13 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
)
return
if self.zeroconf:
self.logger.info(
'Zeroconf service already registered for %s, removing the previous instance',
self.__class__.__name__,
)
self.unregister_service()
self.zeroconf = Zeroconf()
srv_desc = {
'name': 'Platypush',

View File

@ -107,13 +107,17 @@ class ResponseEndEvent(ConversationEndEvent):
Event triggered when a response has been rendered on the assistant.
"""
def __init__(self, *args, with_follow_on_turn: bool = False, **kwargs):
def __init__(
self, *args, response_text: str, with_follow_on_turn: bool = False, **kwargs
):
"""
:param response_text: Response text rendered on the assistant.
:param with_follow_on_turn: Set to true if the conversation expects a
user follow-up, false otherwise.
"""
super().__init__(
*args,
response_text=response_text,
with_follow_on_turn=with_follow_on_turn,
**kwargs,
)

View File

@ -244,7 +244,7 @@ class AssistantPlugin(Plugin, AssistantEntityManager, ABC):
def _on_response_render_end(self):
from platypush.message.event.assistant import ResponseEndEvent
self._send_event(ResponseEndEvent)
self._send_event(ResponseEndEvent, response_text=self._last_response)
def _on_hotword_detected(self, hotword: Optional[str]):
from platypush.message.event.assistant import HotwordDetectedEvent
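
With the new `response_text` field, a user hook can read the rendered
response directly from the event - a sketch, assuming the standard
platypush `@when` hook decorator:

from platypush import when
from platypush.message.event.assistant import ResponseEndEvent

@when(ResponseEndEvent)
def on_response_end(event: ResponseEndEvent, **_):
    print('Assistant response:', event.args.get('response_text'))
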

View File

@ -216,6 +216,7 @@ class AssistantPicovoicePlugin(AssistantPlugin, RunnablePlugin):
'on_conversation_end': self._on_conversation_end,
'on_conversation_timeout': self._on_conversation_timeout,
'on_speech_recognized': self._on_speech_recognized,
'on_intent_matched': self._on_intent_matched,
'on_hotword_detected': self._on_hotword_detected,
}

View File

@ -1,28 +1,28 @@
import logging
import os
from threading import Event, RLock
from queue import Full, Queue
from threading import Event, RLock, Thread
from time import time
from typing import Any, Dict, Optional, Sequence
import pvcheetah
import pvleopard
import pvporcupine
import pvrhino
from platypush.context import get_plugin
from platypush.message.event.assistant import (
AssistantEvent,
ConversationTimeoutEvent,
HotwordDetectedEvent,
IntentMatchedEvent,
SpeechRecognizedEvent,
)
from platypush.plugins.tts.picovoice import TtsPicovoicePlugin
from ._context import ConversationContext
from ._recorder import AudioRecorder
from ._speech import SpeechProcessor
from ._state import AssistantState
class Assistant:
class Assistant(Thread):
"""
A facade class that wraps the Picovoice engines under an assistant API.
"""
@ -43,6 +43,7 @@ class Assistant:
keyword_model_path: Optional[str] = None,
frame_expiration: float = 3.0, # Don't process audio frames older than this
speech_model_path: Optional[str] = None,
intent_model_path: Optional[str] = None,
endpoint_duration: Optional[float] = None,
enable_automatic_punctuation: bool = False,
start_conversation_on_hotword: bool = False,
@ -53,8 +54,13 @@ class Assistant:
on_conversation_end=_default_callback,
on_conversation_timeout=_default_callback,
on_speech_recognized=_default_callback,
on_intent_matched=_default_callback,
on_hotword_detected=_default_callback,
):
super().__init__(name='picovoice:Assistant')
if intent_enabled:
assert intent_model_path, 'Intent model path not provided'
self._access_key = access_key
self._stop_event = stop_event
self.logger = logging.getLogger(__name__)
@ -64,26 +70,40 @@ class Assistant:
self.keywords = list(keywords or [])
self.keyword_paths = None
self.keyword_model_path = None
self._responding = Event()
self.frame_expiration = frame_expiration
self.endpoint_duration = endpoint_duration
self.enable_automatic_punctuation = enable_automatic_punctuation
self.start_conversation_on_hotword = start_conversation_on_hotword
self.audio_queue_size = audio_queue_size
self._responding = Event()
self._muted = muted
self._speech_model_path = speech_model_path
self._speech_model_path_override = None
self._intent_model_path = intent_model_path
self._intent_model_path_override = None
self._in_ctx = False
self._speech_processor = SpeechProcessor(
stop_event=stop_event,
stt_enabled=stt_enabled,
intent_enabled=intent_enabled,
conversation_timeout=conversation_timeout,
model_path=speech_model_path,
get_cheetah_args=self._get_speech_engine_args,
get_rhino_args=self._get_speech_engine_args,
)
self._on_conversation_start = on_conversation_start
self._on_conversation_end = on_conversation_end
self._on_conversation_timeout = on_conversation_timeout
self._on_speech_recognized = on_speech_recognized
self._on_intent_matched = on_intent_matched
self._on_hotword_detected = on_hotword_detected
self._recorder = None
self._state = AssistantState.IDLE
self._state_lock = RLock()
self._ctx = ConversationContext(timeout=conversation_timeout)
self._evt_queue = Queue(maxsize=100)
if hotword_enabled:
if not keywords:
@ -110,11 +130,7 @@ class Assistant:
self.keyword_model_path = keyword_model_path
# Model path -> model instance cache
self._cheetah = {}
self._leopard: Optional[pvleopard.Leopard] = None
self._porcupine: Optional[pvporcupine.Porcupine] = None
self._rhino: Optional[pvrhino.Rhino] = None
@property
def is_responding(self):
@ -124,6 +140,10 @@ class Assistant:
def speech_model_path(self):
return self._speech_model_path_override or self._speech_model_path
@property
def intent_model_path(self):
return self._intent_model_path_override or self._intent_model_path
@property
def tts(self) -> TtsPicovoicePlugin:
p = get_plugin('tts.picovoice')
@ -157,18 +177,23 @@ class Assistant:
if prev_state == new_state:
return
self.logger.info('Assistant state transition: %s -> %s', prev_state, new_state)
if prev_state == AssistantState.DETECTING_SPEECH:
self.tts.stop()
self._ctx.stop()
self._speech_model_path_override = None
self._intent_model_path_override = None
self._speech_processor.on_conversation_end()
self._on_conversation_end()
elif new_state == AssistantState.DETECTING_SPEECH:
self._ctx.start()
self._speech_processor.on_conversation_start()
self._on_conversation_start()
if new_state == AssistantState.DETECTING_HOTWORD:
self.tts.stop()
self._ctx.reset()
self._speech_processor.on_conversation_reset()
# Put a null event on the event queue to unblock next_event
self._evt_queue.put(None)
@property
def porcupine(self) -> Optional[pvporcupine.Porcupine]:
@ -188,23 +213,16 @@ class Assistant:
return self._porcupine
@property
def cheetah(self) -> Optional[pvcheetah.Cheetah]:
if not self.stt_enabled:
return None
def _get_speech_engine_args(self) -> dict:
args: Dict[str, Any] = {'access_key': self._access_key}
if self.speech_model_path:
args['model_path'] = self.speech_model_path
if self.endpoint_duration:
args['endpoint_duration_sec'] = self.endpoint_duration
if self.enable_automatic_punctuation:
args['enable_automatic_punctuation'] = self.enable_automatic_punctuation
if not self._cheetah.get(self.speech_model_path):
args: Dict[str, Any] = {'access_key': self._access_key}
if self.speech_model_path:
args['model_path'] = self.speech_model_path
if self.endpoint_duration:
args['endpoint_duration_sec'] = self.endpoint_duration
if self.enable_automatic_punctuation:
args['enable_automatic_punctuation'] = self.enable_automatic_punctuation
self._cheetah[self.speech_model_path] = pvcheetah.create(**args)
return self._cheetah[self.speech_model_path]
return args
def __enter__(self):
"""
@ -213,11 +231,14 @@ class Assistant:
if self.should_stop():
return self
assert not self.is_alive(), 'The assistant is already running'
self._in_ctx = True
if self._recorder:
self.logger.info('A recording stream already exists')
elif self.hotword_enabled or self.stt_enabled:
sample_rate = (self.porcupine or self.cheetah).sample_rate # type: ignore
frame_length = (self.porcupine or self.cheetah).frame_length # type: ignore
elif self.hotword_enabled or self.stt_enabled or self.intent_enabled:
sample_rate = (self.porcupine or self._speech_processor).sample_rate
frame_length = (self.porcupine or self._speech_processor).frame_length
self._recorder = AudioRecorder(
stop_event=self._stop_event,
sample_rate=sample_rate,
@ -227,9 +248,7 @@ class Assistant:
channels=1,
)
if self.stt_enabled:
self._cheetah[self.speech_model_path] = self.cheetah
self._speech_processor.__enter__()
self._recorder.__enter__()
if self.porcupine:
@ -237,33 +256,25 @@ class Assistant:
else:
self.state = AssistantState.DETECTING_SPEECH
self.start()
return self
def __exit__(self, *_):
"""
Stop the assistant and release all resources.
"""
self._in_ctx = False
if self._recorder:
self._recorder.__exit__(*_)
self._recorder = None
self.state = AssistantState.IDLE
for model in [*self._cheetah.keys()]:
cheetah = self._cheetah.pop(model, None)
if cheetah:
cheetah.delete()
if self._leopard:
self._leopard.delete()
self._leopard = None
if self._porcupine:
self._porcupine.delete()
self._porcupine = None
if self._rhino:
self._rhino.delete()
self._rhino = None
self._speech_processor.__exit__(*_)
def __iter__(self):
"""
@ -275,29 +286,36 @@ class Assistant:
"""
Process the next audio frame and return the corresponding event.
"""
has_data = False
if self.should_stop() or not self._recorder:
raise StopIteration
while not (self.should_stop() or has_data):
data = self._recorder.read()
if data is None:
continue
if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD:
return self._evt_queue.get()
frame, t = data
if time() - t > self.frame_expiration:
self.logger.info(
'Skipping audio frame older than %ss', self.frame_expiration
)
continue # The audio frame is too old
evt = None
if (
self._speech_processor.enabled
and self.state == AssistantState.DETECTING_SPEECH
):
evt = self._speech_processor.next_event()
if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD:
return self._process_hotword(frame)
if isinstance(evt, SpeechRecognizedEvent):
self._on_speech_recognized(phrase=evt.args['phrase'])
if isinstance(evt, IntentMatchedEvent):
self._on_intent_matched(
intent=evt.args['intent'], slots=evt.args.get('slots', {})
)
if isinstance(evt, ConversationTimeoutEvent):
self._on_conversation_timeout()
if self.stt_enabled and self.state == AssistantState.DETECTING_SPEECH:
return self._process_speech(frame)
if (
evt
and self.state == AssistantState.DETECTING_SPEECH
and self.hotword_enabled
):
self.state = AssistantState.DETECTING_HOTWORD
raise StopIteration
return evt
def mute(self):
self._muted = True
@ -321,7 +339,7 @@ class Assistant:
else:
self.mute()
def _process_hotword(self, frame):
def _process_hotword(self, frame) -> Optional[HotwordDetectedEvent]:
if not self.porcupine:
return None
@ -333,48 +351,61 @@ class Assistant:
if self.start_conversation_on_hotword:
self.state = AssistantState.DETECTING_SPEECH
self.tts.stop()
self.tts.stop() # Stop any ongoing TTS when the hotword is detected
self._on_hotword_detected(hotword=self.keywords[keyword_index])
return HotwordDetectedEvent(hotword=self.keywords[keyword_index])
return None
def _process_speech(self, frame):
if not self.cheetah:
return None
event = None
partial_transcript, self._ctx.is_final = self.cheetah.process(frame)
if partial_transcript:
self._ctx.transcript += partial_transcript
self.logger.info(
'Partial transcript: %s, is_final: %s',
self._ctx.transcript,
self._ctx.is_final,
)
if self._ctx.is_final or self._ctx.timed_out:
phrase = self.cheetah.flush() or ''
self._ctx.transcript += phrase
phrase = self._ctx.transcript
phrase = phrase[:1].lower() + phrase[1:]
if phrase:
event = SpeechRecognizedEvent(phrase=phrase)
self._on_speech_recognized(phrase=phrase)
else:
event = ConversationTimeoutEvent()
self._on_conversation_timeout()
self._ctx.reset()
if self.hotword_enabled:
self.state = AssistantState.DETECTING_HOTWORD
return event
def override_speech_model(self, model_path: Optional[str]):
self._speech_model_path_override = model_path
def override_intent_model(self, model_path: Optional[str]):
self._intent_model_path_override = model_path
def _put_event(self, evt: AssistantEvent):
try:
self._evt_queue.put_nowait(evt)
except Full:
self.logger.warning('The assistant event queue is full')
def run(self):
assert (
self._in_ctx
), 'The assistant can only be started through a context manager'
super().run()
while not self.should_stop() and self._recorder:
self._recorder.wait_start()
if self.should_stop():
break
data = self._recorder.read()
if data is None:
continue
frame, t = data
if time() - t > self.frame_expiration:
self.logger.info(
'Skipping audio frame older than %ss', self.frame_expiration
)
continue # The audio frame is too old
if self.hotword_enabled and self.state == AssistantState.DETECTING_HOTWORD:
evt = self._process_hotword(frame)
if evt:
self._put_event(evt)
continue
if (
self._speech_processor.enabled
and self.state == AssistantState.DETECTING_SPEECH
):
self._speech_processor.process(frame, block=False)
self.logger.info('Assistant stopped')
# vim:sw=4:ts=4:et:

View File

@ -2,6 +2,8 @@ from dataclasses import dataclass
from time import time
from typing import Optional
from ._intent import Intent
@dataclass
class ConversationContext:
@ -11,6 +13,7 @@ class ConversationContext:
transcript: str = ''
is_final: bool = False
intent: Optional[Intent] = None
timeout: Optional[float] = None
t_start: Optional[float] = None
t_end: Optional[float] = None
@ -25,6 +28,7 @@ class ConversationContext:
def reset(self):
self.transcript = ''
self.intent = None
self.is_final = False
self.t_start = None
self.t_end = None
@ -32,14 +36,18 @@ class ConversationContext:
@property
def timed_out(self):
return (
not self.transcript
and not self.is_final
(
(not self.transcript and not self.is_final)
or (not self.intent and not self.is_final)
)
and self.timeout
and self.t_start
and time() - self.t_start > self.timeout
) or (
self.transcript
and not self.is_final
(
(self.transcript and not self.is_final)
or (self.intent and not self.is_final)
)
and self.timeout
and self.t_start
and time() - self.t_start > self.timeout * 2
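
Condensed, the new expression implements two timeout tiers - an
illustrative restatement, not the diff itself:

from time import time

def timed_out(ctx):
    if not (ctx.timeout and ctx.t_start):
        return False
    elapsed = time() - ctx.t_start
    nothing_final = (not ctx.transcript or not ctx.intent) and not ctx.is_final
    partial = (ctx.transcript or ctx.intent) and not ctx.is_final
    # No final result at all: time out after `timeout` seconds; a partial
    # transcript/intent that never finalizes gets twice as long.
    return bool(
        (nothing_final and elapsed > ctx.timeout)
        or (partial and elapsed > ctx.timeout * 2)
    )
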

View File

@ -0,0 +1,11 @@
from dataclasses import dataclass, field
@dataclass
class Intent:
"""
Speech intent data class.
"""
name: str
slots: dict = field(default_factory=dict)

View File

@ -178,3 +178,14 @@ class AudioRecorder:
Wait until the audio stream is stopped.
"""
wait_for_either(self._stop_event, self._upstream_stop_event, timeout=timeout)
def wait_start(self, timeout: Optional[float] = None):
"""
Wait until the audio stream is started.
"""
wait_for_either(
self._stop_event,
self._upstream_stop_event,
self._paused_state._recording_event,
timeout=timeout,
)

View File

@ -0,0 +1,3 @@
from ._processor import SpeechProcessor
__all__ = ['SpeechProcessor']

View File

@ -0,0 +1,152 @@
import logging
from abc import ABC, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread, get_ident
from typing import Optional, Sequence
from platypush.message.event.assistant import AssistantEvent
from .._context import ConversationContext
class BaseProcessor(ABC, Thread):
"""
Base speech processor class. It is implemented by the ``SttProcessor`` and
the ``IntentProcessor`` classes.
"""
def __init__(
self,
*args,
stop_event: Event,
conversation_timeout: Optional[float] = None,
**kwargs,
):
super().__init__(*args, name=f'picovoice:{self.__class__.__name__}', **kwargs)
self.logger = logging.getLogger(self.name)
self._audio_queue = Queue()
self._stop_event = stop_event
self._ctx = ConversationContext(timeout=conversation_timeout)
self._event_queue = Queue()
# This event is set if the upstream processor is waiting for an event
# from this processor
self._event_wait = Event()
# This event is set when the processor is done with the audio
# processing and it's ready to accept a new audio frame
self._processing_done = Event()
self._processing_done.set()
def should_stop(self) -> bool:
return self._stop_event.is_set()
def wait_stop(self, timeout: Optional[float] = None) -> bool:
return self._stop_event.wait(timeout)
def enqueue(self, audio: Sequence[int]):
self._event_wait.set()
self._processing_done.clear()
self._audio_queue.put_nowait(audio)
@property
def processing_done(self) -> Event:
return self._processing_done
@property
@abstractmethod
def _model_path(self) -> Optional[str]:
"""
Return the model path.
"""
@property
@abstractmethod
def sample_rate(self) -> int:
"""
:return: The sample rate wanted by Cheetah/Rhino.
"""
@property
@abstractmethod
def frame_length(self) -> int:
"""
:return: The frame length wanted by Cheetah/Rhino.
"""
def last_event(self) -> Optional[AssistantEvent]:
"""
:return: The latest event that was processed by the processor.
"""
evt = None
try:
while True:
evt = self._event_queue.get_nowait()
except Empty:
pass
if evt:
self._event_wait.clear()
return evt
def clear_wait(self):
self._event_wait.clear()
@abstractmethod
def process(self, audio: Sequence[int]) -> Optional[AssistantEvent]:
"""
Process speech events from a raw audio input.
"""
def run(self):
super().run()
self._ctx.reset()
self._processing_done.clear()
self.logger.info('Processor started: %s', self.name)
while not self.should_stop():
audio = self._audio_queue.get()
# The thread is stopped when it receives a None object
if audio is None:
break
# Don't process the audio if the upstream processor is not waiting
# for an event
if not self._event_wait.is_set():
continue
try:
self._processing_done.clear()
event = self.process(audio)
if event:
self._event_queue.put_nowait(event)
self._processing_done.set()
except Exception as e:
self.logger.error(
'An error occurred while processing the audio on %s: %s',
self.name,
e,
exc_info=e,
)
self.wait_stop(timeout=1)
self._processing_done.set()
continue
self._ctx.reset()
self.logger.info('Processor stopped: %s', self.name)
def stop(self):
self._audio_queue.put_nowait(None)
if self.is_alive() and self.ident != get_ident():
self.logger.debug('Stopping %s', self.name)
self.join()
def on_conversation_start(self):
self._ctx.start()
def on_conversation_end(self):
self._ctx.stop()
def on_conversation_reset(self):
self._ctx.reset()

View File

@ -0,0 +1,86 @@
from typing import Callable, Optional, Sequence, Union
import pvrhino
from platypush.message.event.assistant import (
ConversationTimeoutEvent,
IntentMatchedEvent,
)
from ._base import BaseProcessor
class IntentProcessor(BaseProcessor):
"""
Implementation of the speech-to-intent processor using the Picovoice Rhino
engine.
"""
def __init__(
self, *args, get_rhino_args: Callable[[], dict] = lambda: {}, **kwargs
):
super().__init__(*args, **kwargs)
self._get_rhino_args = get_rhino_args
# model_path -> Rhino instance cache
self._rhino = {}
@property
def _model_path(self) -> Optional[str]:
return self._get_rhino_args().get('model_path')
@property
def sample_rate(self) -> int:
return self._get_rhino().sample_rate
@property
def frame_length(self) -> int:
return self._get_rhino().frame_length
def _get_rhino(self) -> pvrhino.Rhino:
if not self._rhino.get(self._model_path):
self._rhino[self._model_path] = pvrhino.create(**self._get_rhino_args())
return self._rhino[self._model_path]
def process(
self, audio: Sequence[int]
) -> Optional[Union[IntentMatchedEvent, ConversationTimeoutEvent]]:
"""
Process the audio and return an ``IntentMatchedEvent`` if the intent was
understood, or a ``ConversationTimeoutEvent`` if the conversation timed
out, or ``None`` if the intent processing is not yet finalized.
"""
event = None
rhino = self._get_rhino()
self._ctx.is_final = rhino.process(audio)
if self._ctx.is_final:
inference = rhino.get_inference()
self.logger.debug(
'Intent detection finalized. Inference understood: %s',
inference.is_understood,
)
if inference.is_understood:
event = IntentMatchedEvent(
intent=inference.intent,
slots={slot.key: slot.value for slot in inference.slots},
)
if not event and self._ctx.timed_out:
event = ConversationTimeoutEvent()
if event:
self._ctx.reset()
if event:
self.logger.debug('Intent event: %s', event)
return event
def stop(self):
super().stop()
objs = self._rhino.copy()
for key, obj in objs.items():
obj.delete()
self._rhino.pop(key)

View File

@ -0,0 +1,196 @@
import logging
from queue import Queue
from threading import Event
from typing import Callable, Optional, Sequence
from platypush.message.event.assistant import AssistantEvent
from platypush.utils import wait_for_either
from ._intent import IntentProcessor
from ._stt import SttProcessor
class SpeechProcessor:
"""
Speech processor class that wraps the STT and Intent processors under the
same interface.
"""
def __init__(
self,
stop_event: Event,
model_path: Optional[str] = None,
stt_enabled: bool = True,
intent_enabled: bool = False,
conversation_timeout: Optional[float] = None,
get_cheetah_args: Callable[[], dict] = lambda: {},
get_rhino_args: Callable[[], dict] = lambda: {},
):
self.logger = logging.getLogger(self.__class__.__name__)
self._stt_enabled = stt_enabled
self._intent_enabled = intent_enabled
self._model_path = model_path
self._conversation_timeout = conversation_timeout
self._audio_queue = Queue()
self._stop_event = stop_event
self._get_cheetah_args = get_cheetah_args
self._get_rhino_args = get_rhino_args
self._stt_processor = SttProcessor(
conversation_timeout=conversation_timeout,
stop_event=stop_event,
get_cheetah_args=get_cheetah_args,
)
self._intent_processor = IntentProcessor(
conversation_timeout=conversation_timeout,
stop_event=stop_event,
get_rhino_args=get_rhino_args,
)
@property
def enabled(self) -> bool:
"""
The processor is enabled if either the STT or the Intent processor are
enabled.
"""
return self._stt_enabled or self._intent_enabled
def should_stop(self) -> bool:
return self._stop_event.is_set()
def next_event(self, timeout: Optional[float] = None) -> Optional[AssistantEvent]:
evt = None
# Wait for either the STT or Intent processor to finish processing the audio
completed = wait_for_either(
self._stt_processor.processing_done,
self._intent_processor.processing_done,
self._stop_event,
timeout=timeout,
)
if not completed:
self.logger.warning('Timeout while waiting for the processors to finish')
# Immediately return if the stop event is set
if self.should_stop():
return evt
# Priority to the intent processor event, if the processor is enabled
if self._intent_enabled:
evt = self._intent_processor.last_event()
if evt:
self.logger.debug('Intent processor event: %s', evt)
# If the intent processor didn't return any event, then return the STT
# processor event
if not evt and self._stt_enabled:
evt = self._stt_processor.last_event()
if evt:
self.logger.debug('STT processor event: %s', evt)
if evt:
self._stt_processor.clear_wait()
self._intent_processor.clear_wait()
return evt
def process(
self, audio: Sequence[int], block: bool = True, timeout: Optional[float] = None
) -> Optional[AssistantEvent]:
"""
Process an audio frame.
The audio frame is enqueued to both the STT and Intent processors, if
enabled. The function waits for either processor to finish processing
the audio, and returns the event from the first processor that returns
a result.
Priority is given to the Intent processor if enabled, otherwise the STT
processor is used.
"""
# Enqueue the audio to both the STT and Intent processors if enabled
if self._stt_enabled:
self._stt_processor.enqueue(audio)
if self._intent_enabled:
self._intent_processor.enqueue(audio)
if not block:
return None
return self.next_event(timeout=timeout)
def __enter__(self):
"""
Context manager entry point - it wraps :meth:`start`.
"""
self.start()
def __exit__(self, *_, **__):
"""
Context manager exit point - it wraps :meth:`stop`.
"""
self.stop()
def start(self):
"""
Start the STT and Intent processors.
"""
self._stt_processor.start()
self._intent_processor.start()
def stop(self):
"""
Stop the STT and Intent processors.
"""
self._stt_processor.stop()
self._intent_processor.stop()
def on_conversation_start(self):
if self._stt_enabled:
self._stt_processor.on_conversation_start()
if self._intent_enabled:
self._intent_processor.on_conversation_start()
def on_conversation_end(self):
if self._stt_enabled:
self._stt_processor.on_conversation_end()
if self._intent_enabled:
self._intent_processor.on_conversation_end()
def on_conversation_reset(self):
if self._stt_enabled:
self._stt_processor.on_conversation_reset()
if self._intent_enabled:
self._intent_processor.on_conversation_reset()
@property
def sample_rate(self) -> int:
"""
The sample rate of the audio frames.
"""
if self._intent_enabled:
return self._intent_processor.sample_rate
if self._stt_enabled:
return self._stt_processor.sample_rate
raise ValueError('No processor enabled')
@property
def frame_length(self) -> int:
"""
The frame length of the audio frames.
"""
if self._intent_enabled:
return self._intent_processor.frame_length
if self._stt_enabled:
return self._stt_processor.frame_length
raise ValueError('No processor enabled')
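
Going by the docstrings above, the intended call pattern is roughly the
following (a sketch; `ACCESS_KEY`, `recorder` and `handle` are
placeholders):

from threading import Event

stop_event = Event()
processor = SpeechProcessor(
    stop_event=stop_event,
    stt_enabled=True,
    intent_enabled=True,
    conversation_timeout=7.5,
    get_cheetah_args=lambda: {'access_key': ACCESS_KEY},
    get_rhino_args=lambda: {
        'access_key': ACCESS_KEY,
        'context_path': 'intents.rhn',  # Rhino context file (placeholder)
    },
)

with processor:  # Starts both the STT and the intent threads
    for frame in recorder:                       # PCM frames (placeholder)
        processor.process(frame, block=False)    # Fan out to both engines
        evt = processor.next_event(timeout=0.1)  # Intent events win priority
        if evt:
            handle(evt)
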

View File

@ -0,0 +1,92 @@
from typing import Callable, Optional, Sequence, Union
import pvcheetah
from platypush.message.event.assistant import (
ConversationTimeoutEvent,
SpeechRecognizedEvent,
)
from ._base import BaseProcessor
class SttProcessor(BaseProcessor):
"""
Implementation of the speech-to-text processor using the Picovoice Cheetah
engine.
"""
def __init__(
self, *args, get_cheetah_args: Callable[[], dict] = lambda: {}, **kwargs
):
super().__init__(*args, **kwargs)
self._get_cheetah_args = get_cheetah_args
# model_path -> Cheetah instance cache
self._cheetah = {self._model_path: pvcheetah.create(**self._get_cheetah_args())}
@property
def _model_path(self) -> Optional[str]:
return self._get_cheetah_args().get('model_path')
@property
def sample_rate(self) -> int:
return self._get_cheetah().sample_rate
@property
def frame_length(self) -> int:
return self._get_cheetah().frame_length
def _get_cheetah(self) -> pvcheetah.Cheetah:
if not self._cheetah.get(self._model_path):
self.logger.debug(
'Creating Cheetah instance for model %s', self._model_path
)
self._cheetah[self._model_path] = pvcheetah.create(
**self._get_cheetah_args()
)
self.logger.debug('Cheetah instance created for model %s', self._model_path)
return self._cheetah[self._model_path]
def process(
self, audio: Sequence[int]
) -> Optional[Union[SpeechRecognizedEvent, ConversationTimeoutEvent]]:
event = None
cheetah = self._get_cheetah()
partial_transcript, self._ctx.is_final = cheetah.process(audio)
# Concatenate the partial transcript to the context
if partial_transcript:
self._ctx.transcript += partial_transcript
self.logger.info(
'Partial transcript: %s, is_final: %s',
self._ctx.transcript,
self._ctx.is_final,
)
# If the transcript is final or the conversation timed out, then
# process and return whatever is available in the context
if self._ctx.is_final or self._ctx.timed_out:
phrase = cheetah.flush() or ''
self._ctx.transcript += phrase
phrase = self._ctx.transcript
phrase = phrase[:1].lower() + phrase[1:]
event = (
SpeechRecognizedEvent(phrase=phrase)
if phrase
else ConversationTimeoutEvent()
)
self._ctx.reset()
if event:
self.logger.debug('STT event: %s', event)
return event
def stop(self):
super().stop()
objs = self._cheetah.copy()
for key, obj in objs.items():
obj.delete()
self._cheetah.pop(key)

View File

@ -6,9 +6,11 @@ manifest:
- platypush.message.event.assistant.ConversationStartEvent
- platypush.message.event.assistant.ConversationTimeoutEvent
- platypush.message.event.assistant.HotwordDetectedEvent
- platypush.message.event.assistant.IntentMatchedEvent
- platypush.message.event.assistant.MicMutedEvent
- platypush.message.event.assistant.MicUnmutedEvent
- platypush.message.event.assistant.NoResponseEvent
- platypush.message.event.assistant.ResponseEndEvent
- platypush.message.event.assistant.ResponseEvent
- platypush.message.event.assistant.SpeechRecognizedEvent
install:
@ -22,6 +24,7 @@ manifest:
- ffmpeg
- python-sounddevice
pip:
- num2words # Temporary dependency
- pvcheetah
- pvleopard
- pvorca

View File

@ -1,7 +1,12 @@
import threading
from typing import Callable, Optional
from typing import Optional
import pychromecast # type: ignore
from pychromecast import (
CastBrowser,
Chromecast,
ChromecastConnectionError,
SimpleCastListener,
get_chromecast_from_cast_info,
)
from platypush.backend.http.app.utils import get_remote_base_url
from platypush.plugins import RunnablePlugin, action
@ -31,22 +36,38 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
:param poll_interval: How often the plugin should poll for new/removed
Chromecast devices (default: 30 seconds).
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._is_local = False
self.chromecast = chromecast
self.chromecasts = {}
self._chromecasts_by_uuid = {}
self._chromecasts_by_name = {}
self._media_listeners = {}
self._refresh_lock = threading.RLock()
self._zc = None
self._browser = None
def _get_chromecasts(self, *args, **kwargs):
with self._refresh_lock:
chromecasts = pychromecast.get_chromecasts(*args, **kwargs)
@property
def zc(self):
from zeroconf import Zeroconf
if isinstance(chromecasts, tuple):
return chromecasts[0]
return chromecasts
if not self._zc:
self._zc = Zeroconf()
return self._zc
@property
def browser(self):
if not self._browser:
self._browser = CastBrowser(
SimpleCastListener(self._on_chromecast_discovered), self.zc
)
self._browser.start_discovery()
return self._browser
def _on_chromecast_discovered(self, _, service: str):
self.logger.info('Discovered Chromecast: %s', service)
@staticmethod
def _get_device_property(cc, prop: str):
@ -54,18 +75,29 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
return getattr(cc.device, prop)
return getattr(cc.cast_info, prop)
def _serialize_device(self, cc: pychromecast.Chromecast) -> dict:
def _serialize_device(self, cc: Chromecast) -> dict:
"""
Convert a Chromecast object and its status to a dictionary.
"""
if hasattr(cc, 'cast_info'): # Newer PyChromecast API
host = cc.cast_info.host
port = cc.cast_info.port
elif hasattr(cc, 'host'):
host = getattr(cc, 'host', None)
port = getattr(cc, 'port', None)
elif hasattr(cc, 'uri'):
host, port = cc.uri.split(':')
else:
raise RuntimeError('Invalid Chromecast object')
return {
'type': cc.cast_type,
'name': cc.name,
'manufacturer': self._get_device_property(cc, 'manufacturer'),
'model_name': cc.model_name,
'uuid': str(cc.uuid),
'address': cc.host if hasattr(cc, 'host') else cc.uri.split(':')[0],
'port': cc.port if hasattr(cc, 'port') else int(cc.uri.split(':')[1]),
'address': host,
'port': port,
'status': (
{
'app': {
@ -85,93 +117,23 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
),
}
def _refresh_chromecasts(
self,
tries: int = 2,
retry_wait: float = 10,
timeout: float = 60,
blocking: bool = True,
callback: Optional[Callable] = None,
):
"""
Get the list of Chromecast devices
def _event_callback(self, _, cast: Chromecast):
self._chromecasts_by_uuid[cast.uuid] = cast
self._chromecasts_by_name[
self._get_device_property(cast, 'friendly_name')
] = cast
:param tries: Number of retries (default: 2)
:param retry_wait: Number of seconds between retries (default: 10 seconds)
:param timeout: Timeout before failing the call (default: 60 seconds)
:param blocking: If true, then the function will block until all the
Chromecast devices have been scanned. If false, then the provided
callback function will be invoked when a new device is discovered
:param callback: If blocking is false, then you can provide a callback
function that will be invoked when a new device is discovered
"""
self.chromecasts = {
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts(
tries=tries,
retry_wait=retry_wait,
timeout=timeout,
blocking=blocking,
callback=callback,
)
}
for name, cast in self.chromecasts.items():
self._update_listeners(name, cast)
for cast in self.chromecasts.values():
cast.wait()
def _event_callback(self, _, cast: pychromecast.Chromecast):
with self._refresh_lock:
self.chromecasts[self._get_device_property(cast, 'friendly_name')] = cast
def _update_listeners(self, name, cast):
if name not in self._media_listeners:
cast.start()
self._media_listeners[name] = MediaListener(
name=name, cast=cast, callback=self._event_callback
)
cast.media_controller.register_status_listener(self._media_listeners[name])
def get_chromecast(self, chromecast=None, n_tries=2):
if isinstance(chromecast, pychromecast.Chromecast):
assert chromecast, 'Invalid Chromecast object'
def get_chromecast(self, chromecast=None):
if isinstance(chromecast, Chromecast):
return chromecast
if not chromecast:
if not self.chromecast:
raise RuntimeError(
'No Chromecast specified nor default Chromecast configured'
)
chromecast = self.chromecast
if self._chromecasts_by_uuid.get(chromecast):
return self._chromecasts_by_uuid[chromecast]
if chromecast not in self.chromecasts:
casts = {}
while n_tries > 0:
n_tries -= 1
casts.update(
{
self._get_device_property(cast, 'friendly_name'): cast
for cast in self._get_chromecasts()
}
)
if self._chromecasts_by_name.get(chromecast):
return self._chromecasts_by_name[chromecast]
if chromecast in casts:
self.chromecasts.update(casts)
break
if chromecast not in self.chromecasts:
raise RuntimeError(f'Device {chromecast} not found')
cast = self.chromecasts[chromecast]
try:
cast.wait()
except Exception as e:
self.logger.warning('Failed to wait Chromecast sync: %s', e)
return cast
raise AssertionError(f'Chromecast {chromecast} not found')
@action
def play(
@ -276,24 +238,34 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
if cast.media_controller.is_paused:
if cast.media_controller.status.player_is_paused:
cast.media_controller.play()
elif cast.media_controller.is_playing:
elif cast.media_controller.status.player_is_playing:
cast.media_controller.pause()
cast.wait()
return self.status(chromecast=chromecast)
@action
def stop(self, *_, chromecast: Optional[str] = None, **__):
def stop(self, *_, chromecast: Optional[str] = None, **__): # type: ignore
if self.should_stop():
if self._zc:
self._zc.close()
self._zc = None
if self._browser:
self._browser.stop_discovery()
self._browser = None
return
chromecast = chromecast or self.chromecast
if not chromecast:
return
return None
cast = self.get_chromecast(chromecast)
cast.media_controller.stop()
cast.wait()
return self.status(chromecast=chromecast)
@action
@ -339,51 +311,51 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def is_playing(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_playing
).media_controller.status.player_is_playing
@action
def is_paused(self, chromecast: Optional[str] = None, **_):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_paused
).media_controller.status.player_is_paused
@action
def is_idle(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.is_idle
).media_controller.status.player_is_idle
@action
def list_subtitles(self, chromecast: Optional[str] = None):
return self.get_chromecast(
chromecast or self.chromecast
).media_controller.subtitle_tracks
).media_controller.status.subtitle_tracks
@action
def enable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id is not None:
return mc.enable_subtitle(track_id)
if mc.subtitle_tracks:
return mc.enable_subtitle(mc.subtitle_tracks[0].get('trackId'))
if mc.status.subtitle_tracks:
return mc.enable_subtitle(mc.status.subtitle_tracks[0].get('trackId'))
@action
def disable_subtitles(
self, chromecast: Optional[str] = None, track_id: Optional[str] = None, **_
self, chromecast: Optional[str] = None, track_id: Optional[int] = None, **_
):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
if track_id:
return mc.disable_subtitle(track_id)
if mc.current_subtitle_tracks:
return mc.disable_subtitle(mc.current_subtitle_tracks[0])
if mc.status.current_subtitle_tracks:
return mc.disable_subtitle(mc.status.current_subtitle_tracks[0])
@action
def toggle_subtitles(self, chromecast: Optional[str] = None, **_):
mc = self.get_chromecast(chromecast or self.chromecast).media_controller
all_subs = mc.status.subtitle_tracks
cur_subs = mc.status.status.current_subtitle_tracks
cur_subs = mc.status.current_subtitle_tracks
if cur_subs:
return self.disable_subtitles(chromecast, cur_subs[0])
@ -489,13 +461,13 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def _status(self, chromecast: Optional[str] = None) -> dict:
if chromecast:
assert (
chromecast in self.chromecasts
chromecast in self._chromecasts_by_name
), f'No such Chromecast device: {chromecast}'
return self._serialize_device(self.chromecasts[chromecast])
return self._serialize_device(self._chromecasts_by_name[chromecast])
return {
name: self._serialize_device(cast)
for name, cast in self.chromecasts.items()
for name, cast in self._chromecasts_by_name.items()
}
@action
@ -503,7 +475,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
self,
chromecast: Optional[str] = None,
timeout: Optional[float] = None,
blocking: bool = True,
):
"""
Disconnect a Chromecast and wait for it to terminate
@ -512,11 +483,9 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
the default configured Chromecast will be used.
:param timeout: Number of seconds to wait for disconnection (default:
None: block until termination).
:param blocking: If set (default), then the code will wait until
disconnection, otherwise it will return immediately.
"""
cast = self.get_chromecast(chromecast)
cast.disconnect(timeout=timeout, blocking=blocking)
cast.disconnect(timeout=timeout)
@action
def join(self, chromecast: Optional[str] = None, timeout: Optional[float] = None):
@ -542,17 +511,6 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
cast = self.get_chromecast(chromecast)
cast.quit_app()
@action
def reboot(self, chromecast: Optional[str] = None):
"""
Reboots the Chromecast
:param chromecast: Chromecast to cast to. If none is specified, then
the default configured Chromecast will be used.
"""
cast = self.get_chromecast(chromecast)
cast.reboot()
@action
def set_volume(self, volume: float, chromecast: Optional[str] = None):
"""
@ -613,7 +571,7 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
"""
chromecast = chromecast or self.chromecast
cast = self.get_chromecast(chromecast)
cast.set_volume_muted(not cast.status.volume_muted)
cast.set_volume_muted(not cast.media_controller.status.volume_muted)
cast.wait()
return self.status(chromecast=chromecast)
@ -623,6 +581,54 @@ class MediaChromecastPlugin(MediaPlugin, RunnablePlugin):
def remove_subtitles(self, *_, **__):
raise NotImplementedError
def _refresh_chromecasts(self):
cast_info = {cast.friendly_name: cast for cast in self.browser.devices.values()}
for info in cast_info.values():
name = info.friendly_name
if self._chromecasts_by_uuid.get(
info.uuid
) and self._chromecasts_by_name.get(name):
self.logger.debug('Chromecast %s already connected', name)
continue
self.logger.info('Started scan for Chromecast %s', name)
try:
cc = get_chromecast_from_cast_info(
info,
self.browser.zc,
tries=2,
retry_wait=5,
timeout=30,
)
self._chromecasts_by_name[cc.name] = cc
except ChromecastConnectionError:
self.logger.warning('Failed to connect to Chromecast %s', info)
continue
if cc.uuid not in self._chromecasts_by_uuid:
self._chromecasts_by_uuid[cc.uuid] = cc
self.logger.debug('Connecting to Chromecast %s', name)
if name not in self._media_listeners:
cc.start()
self._media_listeners[name] = MediaListener(
name=name or str(cc.uuid),
cast=cc,
callback=self._event_callback,
)
cc.media_controller.register_status_listener(
self._media_listeners[name]
)
self.logger.info('Connected to Chromecast %s', name)
self._chromecasts_by_uuid[cc.uuid] = cc
self._chromecasts_by_name[name] = cc
def main(self):
while not self.should_stop():
try:

View File

@ -1,5 +1,9 @@
# pylint: disable=too-few-public-methods
class SubtitlesAsyncHandler:
import logging
from pychromecast.controllers.media import MediaStatusListener
class SubtitlesAsyncHandler(MediaStatusListener):
"""
This class is used to enable subtitles when the media is loaded.
"""
@ -8,9 +12,17 @@ class SubtitlesAsyncHandler:
self.mc = mc
self.subtitle_id = subtitle_id
self.initialized = False
self.logger = logging.getLogger(__name__)
def new_media_status(self, *_):
if self.subtitle_id and not self.initialized:
self.mc.update_status()
self.mc.enable_subtitle(self.subtitle_id)
self.initialized = True
def load_media_failed(self, queue_item_id: int, error_code: int) -> None:
self.logger.warning(
"Failed to load media with queue_item_id %d, error code: %d",
queue_item_id,
error_code,
)

View File

@ -46,9 +46,7 @@ class ZeroconfListener(ServiceListener):
'properties': {
k.decode()
if isinstance(k, bytes)
else k: v.decode()
if isinstance(v, bytes)
else v
else k: (v.decode() if isinstance(v, bytes) else v)
for k, v in info.properties.items()
},
'server': info.server,
@ -166,9 +164,7 @@ class ZeroconfPlugin(Plugin):
get_bus().post(evt)
except queue.Empty:
if not services:
self.logger.warning(
'No such service discovered: {}'.format(service)
)
self.logger.warning('No such service discovered: %s', service)
finally:
if browser:
browser.cancel()