import concurrent.futures
import json
import logging
import os

import grpc
import google.auth.transport.grpc
import google.auth.transport.requests
import google.oauth2.credentials

import googlesamples.assistant.grpc.audio_helpers as audio_helpers
import googlesamples.assistant.grpc.device_helpers as device_helpers
import googlesamples.assistant.grpc.assistant_helpers as assistant_helpers

from tenacity import retry, stop_after_attempt, retry_if_exception

from google.assistant.embedded.v1alpha2 import (
    embedded_assistant_pb2,
    embedded_assistant_pb2_grpc
)

from platypush.backend import Backend
from platypush.message.event.assistant import \
    ConversationStartEvent, ConversationEndEvent, SpeechRecognizedEvent


class AssistantGooglePushtotalkBackend(Backend):
    """
    Google Assistant push-to-talk backend.

    Instead of listening for the "OK Google" hotword like the
    assistant.google backend, this implementation programmatically starts
    a conversation when the start_conversation() method is called.

    Use this backend on devices that don't support the Assistant SDK
    library (e.g. armv6 devices like the Raspberry Pi Zero or the
    Raspberry Pi 1).
    """

    api_endpoint = 'embeddedassistant.googleapis.com'
    audio_sample_rate = audio_helpers.DEFAULT_AUDIO_SAMPLE_RATE
    audio_sample_width = audio_helpers.DEFAULT_AUDIO_SAMPLE_WIDTH
    audio_iter_size = audio_helpers.DEFAULT_AUDIO_ITER_SIZE
    audio_block_size = audio_helpers.DEFAULT_AUDIO_DEVICE_BLOCK_SIZE
    audio_flush_size = audio_helpers.DEFAULT_AUDIO_DEVICE_FLUSH_SIZE
    grpc_deadline = 60 * 3 + 5

    def __init__(self,
                 credentials_file=os.path.join(
                     os.path.expanduser('~'), '.config',
                     'google-oauthlib-tool', 'credentials.json'),
                 device_config=os.path.join(
                     os.path.expanduser('~'), '.config',
                     'googlesamples-assistant', 'device_config.json'),
                 lang='en-US',
                 conversation_start_fifo=os.path.join(
                     os.path.sep, 'tmp', 'pushtotalk.fifo'),
                 *args, **kwargs):
        """
        Params:
            credentials_file -- Path to the Google OAuth credentials file
                (default: ~/.config/google-oauthlib-tool/credentials.json)
            device_config -- Path to device_config.json. Register your
                device and create a project, then run the pushtotalk.py
                script from googlesamples to create your device_config.json
            lang -- Assistant language (default: en-US)
        """

        super().__init__(*args, **kwargs)

        self.lang = lang
        self.credentials_file = credentials_file
        self.device_config = device_config
        self.conversation_start_fifo = conversation_start_fifo
        self.assistant = None

        try:
            os.mkfifo(self.conversation_start_fifo)
        except FileExistsError:
            pass

        with open(self.device_config) as f:
            device = json.load(f)
            self.device_id = device['id']
            self.device_model_id = device['model_id']
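        # Note: device_config.json is generated by the device registration
        # flow of the Assistant SDK samples (e.g. the pushtotalk.py sample).
        # The two keys read above are the only ones this backend relies on;
        # an illustrative (not real) file would look like:
        #
        #   {
        #       "id": "my-device-identifier",
        #       "model_id": "my-project-my-device-model"
        #   }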
        # Load OAuth 2.0 credentials.
        try:
            with open(self.credentials_file, 'r') as f:
                credentials = google.oauth2.credentials.Credentials(
                    token=None, **json.load(f))
                http_request = google.auth.transport.requests.Request()
                credentials.refresh(http_request)
        except Exception as e:
            logging.error('Error loading credentials: %s', e)
            logging.error('Run google-oauthlib-tool to initialize '
                          'new OAuth 2.0 credentials.')
            raise

        # Create an authorized gRPC channel.
        self.grpc_channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, http_request, self.api_endpoint)
        logging.info('Connecting to %s', self.api_endpoint)

        # Configure the audio source and sink. The same sound device stream
        # acts as both the recording source and the playback sink.
        audio_device = audio_helpers.SoundDeviceStream(
            sample_rate=self.audio_sample_rate,
            sample_width=self.audio_sample_width,
            block_size=self.audio_block_size,
            flush_size=self.audio_flush_size
        )

        audio_source = audio_device
        audio_sink = audio_device

        # Create the conversation stream with the given audio source and sink.
        self.conversation_stream = audio_helpers.ConversationStream(
            source=audio_source,
            sink=audio_sink,
            iter_size=self.audio_iter_size,
            sample_width=self.audio_sample_width,
        )

        self.device_handler = device_helpers.DeviceRequestHandler(
            self.device_id)

    def start_conversation(self):
        """ Start an assistant conversation by writing to the FIFO that
        unblocks the run() loop """
        if self.assistant:
            with open(self.conversation_start_fifo, 'w') as f:
                f.write('1')

    def stop_conversation(self):
        """ Stop the currently running conversation, if any """
        if self.assistant:
            self.conversation_stream.stop_playback()

    def send_message(self, msg):
        pass

    def run(self):
        super().run()

        with SampleAssistant(self.lang, self.device_model_id,
                             self.device_id, self.conversation_stream,
                             self.grpc_channel, self.grpc_deadline,
                             self.device_handler) as self.assistant:
            while not self.should_stop():
                # Block until start_conversation() (or an external writer)
                # pushes something to the FIFO.
                with open(self.conversation_start_fifo, 'r') as f:
                    for line in f:
                        pass

                logging.info('Assistant conversation triggered')
                self.bus.post(ConversationStartEvent())

                continue_conversation = True

                while continue_conversation:
                    (user_request, continue_conversation) = \
                        self.assistant.assist()

                    if user_request:
                        self.bus.post(SpeechRecognizedEvent(
                            phrase=user_request))

                self.bus.post(ConversationEndEvent())
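
# A conversation can also be triggered out-of-band by any process that can
# write to the FIFO, with no reference to the backend object. This is a
# minimal sketch, assuming the default FIFO path configured above; it is
# equivalent to calling start_conversation() on the backend instance:
#
#   with open('/tmp/pushtotalk.fifo', 'w') as f:
#       f.write('1')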

class SampleAssistant(object):
    """Sample Assistant that supports conversations and device actions.

    Args:
        device_model_id: identifier of the device model.
        device_id: identifier of the registered device instance.
        conversation_stream(ConversationStream): audio stream for recording
            the query and playing back the assistant answer.
        channel: authorized gRPC channel for connection to the Google
            Assistant API.
        deadline_sec: gRPC deadline in seconds for Google Assistant API call.
        device_handler: callback for device actions.
    """

    END_OF_UTTERANCE = embedded_assistant_pb2.AssistResponse.END_OF_UTTERANCE
    DIALOG_FOLLOW_ON = embedded_assistant_pb2.DialogStateOut.DIALOG_FOLLOW_ON
    CLOSE_MICROPHONE = embedded_assistant_pb2.DialogStateOut.CLOSE_MICROPHONE

    def __init__(self, language_code, device_model_id, device_id,
                 conversation_stream, channel, deadline_sec, device_handler):
        self.language_code = language_code
        self.device_model_id = device_model_id
        self.device_id = device_id
        self.conversation_stream = conversation_stream

        # Opaque blob provided in AssistResponse that,
        # when provided in a follow-up AssistRequest,
        # gives the Assistant a context marker within the current state
        # of the multi-Assist()-RPC "conversation".
        # This value, along with MicrophoneMode, supports a more natural
        # "conversation" with the Assistant.
        self.conversation_state = None

        # Create the Google Assistant API gRPC client.
        self.assistant = embedded_assistant_pb2_grpc.EmbeddedAssistantStub(
            channel
        )

        self.deadline = deadline_sec
        self.device_handler = device_handler

    def __enter__(self):
        return self

    def __exit__(self, etype, e, traceback):
        if e:
            return False
        self.conversation_stream.close()

    def is_grpc_error_unavailable(e):
        is_grpc_error = isinstance(e, grpc.RpcError)
        if is_grpc_error and (e.code() == grpc.StatusCode.UNAVAILABLE):
            logging.error('grpc unavailable error: %s', e)
            return True
        return False

    @retry(reraise=True, stop=stop_after_attempt(3),
           retry=retry_if_exception(is_grpc_error_unavailable))
    def assist(self):
        """Send a voice request to the Assistant and play back the response.

        Returns: a (user_request, continue_conversation) tuple, where
        user_request is the recognized transcript of the user query, and
        continue_conversation is True if the Assistant expects a follow-up.
        """
        user_request = None
        continue_conversation = False
        device_actions_futures = []

        self.conversation_stream.start_recording()
        logging.info('Recording audio request.')

        def iter_assist_requests():
            for c in self.gen_assist_requests():
                assistant_helpers.log_assist_request_without_audio(c)
                yield c
            self.conversation_stream.start_playback()

        # This generator yields AssistResponse proto messages
        # received from the gRPC Google Assistant API.
        for resp in self.assistant.Assist(iter_assist_requests(),
                                          self.deadline):
            assistant_helpers.log_assist_response_without_audio(resp)

            if resp.event_type == self.END_OF_UTTERANCE:
                logging.info('End of audio request detected')
                self.conversation_stream.stop_recording()

            if resp.speech_results:
                user_request = ' '.join(
                    r.transcript for r in resp.speech_results)

                logging.info('Transcript of user request: "%s".',
                             user_request)
                logging.info('Playing assistant response.')

            if len(resp.audio_out.audio_data) > 0:
                self.conversation_stream.write(resp.audio_out.audio_data)

            if resp.dialog_state_out.conversation_state:
                conversation_state = resp.dialog_state_out.conversation_state
                logging.debug('Updating conversation state.')
                self.conversation_state = conversation_state

            if resp.dialog_state_out.volume_percentage != 0:
                volume_percentage = resp.dialog_state_out.volume_percentage
                logging.info('Setting volume to %s%%', volume_percentage)
                self.conversation_stream.volume_percentage = volume_percentage

            if resp.dialog_state_out.microphone_mode == self.DIALOG_FOLLOW_ON:
                continue_conversation = True
                logging.info('Expecting follow-on query from user.')
            elif resp.dialog_state_out.microphone_mode == self.CLOSE_MICROPHONE:
                continue_conversation = False

            if resp.device_action.device_request_json:
                device_request = json.loads(
                    resp.device_action.device_request_json
                )
                fs = self.device_handler(device_request)
                if fs:
                    device_actions_futures.extend(fs)

        if len(device_actions_futures):
            logging.info('Waiting for device executions to complete.')
            concurrent.futures.wait(device_actions_futures)

        logging.info('Finished playing assistant response.')
        self.conversation_stream.stop_playback()
        return (user_request, continue_conversation)
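    # For illustration, a minimal sketch of driving this class directly,
    # outside of the backend above. The setup names are assumptions:
    # `channel` would be an authorized gRPC channel and `stream` a
    # ConversationStream, both built as in the backend's __init__:
    #
    #   handler = device_helpers.DeviceRequestHandler('my-device-id')
    #   with SampleAssistant('en-US', 'my-model-id', 'my-device-id',
    #                        stream, channel, 60 * 3 + 5,
    #                        handler) as assistant:
    #       keep_going = True
    #       while keep_going:
    #           text, keep_going = assistant.assist()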
    def gen_assist_requests(self):
        """Yields: AssistRequest messages to send to the API."""

        dialog_state_in = embedded_assistant_pb2.DialogStateIn(
            language_code=self.language_code,
            conversation_state=b''
        )

        if self.conversation_state:
            logging.debug('Sending conversation state.')
            dialog_state_in.conversation_state = self.conversation_state

        config = embedded_assistant_pb2.AssistConfig(
            audio_in_config=embedded_assistant_pb2.AudioInConfig(
                encoding='LINEAR16',
                sample_rate_hertz=self.conversation_stream.sample_rate,
            ),
            audio_out_config=embedded_assistant_pb2.AudioOutConfig(
                encoding='LINEAR16',
                sample_rate_hertz=self.conversation_stream.sample_rate,
                volume_percentage=self.conversation_stream.volume_percentage,
            ),
            dialog_state_in=dialog_state_in,
            device_config=embedded_assistant_pb2.DeviceConfig(
                device_id=self.device_id,
                device_model_id=self.device_model_id,
            )
        )

        # The first AssistRequest must contain the AssistConfig
        # and no audio data.
        yield embedded_assistant_pb2.AssistRequest(config=config)

        for data in self.conversation_stream:
            # Subsequent requests need audio data, but not config.
            yield embedded_assistant_pb2.AssistRequest(audio_in=data)


# vim:sw=4:ts=4:et: