"""Based on Google pushtotalk.py sample.""" import concurrent.futures import json import logging import grpc from google.assistant.embedded.v1alpha2 import ( embedded_assistant_pb2, embedded_assistant_pb2_grpc ) from tenacity import retry, stop_after_attempt, retry_if_exception try: from googlesamples.assistant.grpc import ( assistant_helpers, audio_helpers, browser_helpers, device_helpers ) except (SystemError, ImportError): import assistant_helpers import audio_helpers import browser_helpers import device_helpers ASSISTANT_API_ENDPOINT = 'embeddedassistant.googleapis.com' END_OF_UTTERANCE = embedded_assistant_pb2.AssistResponse.END_OF_UTTERANCE DIALOG_FOLLOW_ON = embedded_assistant_pb2.DialogStateOut.DIALOG_FOLLOW_ON CLOSE_MICROPHONE = embedded_assistant_pb2.DialogStateOut.CLOSE_MICROPHONE PLAYING = embedded_assistant_pb2.ScreenOutConfig.PLAYING DEFAULT_GRPC_DEADLINE = 60 * 3 + 5 class SampleAssistant(object): """Sample Assistant that supports conversations and device actions. Args: device_model_id: identifier of the device model. device_id: identifier of the registered device instance. conversation_stream(ConversationStream): audio stream for recording query and playing back assistant answer. channel: authorized gRPC channel for connection to the Google Assistant API. deadline_sec: gRPC deadline in seconds for Google Assistant API call. device_handler: callback for device actions. """ def __init__(self, language_code, device_model_id, device_id, conversation_stream, display, channel, deadline_sec, device_handler, play_response=True, on_conversation_start=None, on_conversation_end=None, on_speech_recognized=None, on_volume_changed=None, on_response=None): self.language_code = language_code self.device_model_id = device_model_id self.device_id = device_id self.conversation_stream = conversation_stream self.display = display self.play_response = play_response # Opaque blob provided in AssistResponse that, # when provided in a follow-up AssistRequest, # gives the Assistant a context marker within the current state # of the multi-Assist()-RPC "conversation". # This value, along with MicrophoneMode, supports a more natural # "conversation" with the Assistant. self.conversation_state = None # Force reset of first conversation. self.is_new_conversation = True # Create Google Assistant API gRPC client. self.assistant = embedded_assistant_pb2_grpc.EmbeddedAssistantStub(channel) self.deadline = deadline_sec self.device_handler = device_handler self.detected_speech = None self.on_conversation_start = on_conversation_start self.on_conversation_end = on_conversation_end self.on_speech_recognized = on_speech_recognized self.on_volume_changed = on_volume_changed self.on_response = on_response def __enter__(self): return self def __exit__(self, etype, e, traceback): if e: return False self.conversation_stream.close() @staticmethod def is_grpc_error_unavailable(e): is_grpc_error = isinstance(e, grpc.RpcError) if is_grpc_error and (e.code() == grpc.StatusCode.UNAVAILABLE): logging.error('grpc unavailable error: %s', e) return True return False @retry(reraise=True, stop=stop_after_attempt(3), retry=retry_if_exception(is_grpc_error_unavailable)) def assist(self): """Send a voice request to the Assistant and playback the response. Returns: True if conversation should continue. 
""" continue_conversation = False device_actions_futures = [] self.conversation_stream.start_recording() if self.on_conversation_start: self.on_conversation_start() logging.info('Recording audio request.') def iter_log_assist_requests(): for c in self.gen_assist_requests(): assistant_helpers.log_assist_request_without_audio(c) yield c logging.debug('Reached end of AssistRequest iteration.') # This generator yields AssistResponse proto messages # received from the gRPC Google Assistant API. for resp in self.assistant.Assist(iter_log_assist_requests(), self.deadline): assistant_helpers.log_assist_response_without_audio(resp) if resp.event_type == END_OF_UTTERANCE: logging.info('End of audio request detected.') logging.info('Stopping recording.') self.conversation_stream.stop_recording() if self.detected_speech and self.on_speech_recognized: self.on_speech_recognized(self.detected_speech) if resp.speech_results: self.detected_speech = ' '.join( r.transcript.strip() for r in resp.speech_results if len(r.transcript.strip())).strip() logging.info('Transcript of user request: "%s".', self.detected_speech) if len(resp.audio_out.audio_data) > 0: if not self.conversation_stream.playing: self.conversation_stream.stop_recording() if self.play_response: self.conversation_stream.start_playback() logging.info('Playing assistant response.') if self.play_response and self.conversation_stream.playing: self.conversation_stream.write(resp.audio_out.audio_data) elif self.conversation_stream.playing: self.conversation_stream.stop_playback() if resp.dialog_state_out.conversation_state: conversation_state = resp.dialog_state_out.conversation_state logging.debug('Updating conversation state.') self.conversation_state = conversation_state if resp.dialog_state_out.volume_percentage != 0: volume_percentage = resp.dialog_state_out.volume_percentage logging.info('Setting volume to %s%%', volume_percentage) self.conversation_stream.volume_percentage = volume_percentage if self.on_volume_changed: self.on_volume_changed(volume_percentage) if resp.dialog_state_out.microphone_mode == DIALOG_FOLLOW_ON: continue_conversation = True logging.info('Expecting follow-on query from user.') elif resp.dialog_state_out.microphone_mode == CLOSE_MICROPHONE: continue_conversation = False if resp.device_action.device_request_json: device_request = json.loads( resp.device_action.device_request_json ) fs = self.device_handler(device_request) if fs: device_actions_futures.extend(fs) if self.display and resp.screen_out.data: system_browser = browser_helpers.system_browser system_browser.display(resp.screen_out.data) if resp.dialog_state_out.supplemental_display_text and self.on_response: self.on_response(resp.dialog_state_out.supplemental_display_text) if len(device_actions_futures): logging.info('Waiting for device executions to complete.') concurrent.futures.wait(device_actions_futures) logging.info('Finished playing assistant response.') self.conversation_stream.stop_playback() if self.on_conversation_end: self.on_conversation_end(continue_conversation) return continue_conversation def gen_assist_requests(self): """Yields: AssistRequest messages to send to the API.""" config = embedded_assistant_pb2.AssistConfig( audio_in_config=embedded_assistant_pb2.AudioInConfig( encoding='LINEAR16', sample_rate_hertz=self.conversation_stream.sample_rate, ), audio_out_config=embedded_assistant_pb2.AudioOutConfig( encoding='LINEAR16', sample_rate_hertz=self.conversation_stream.sample_rate, volume_percentage=self.conversation_stream.volume_percentage, ), 
    def gen_assist_requests(self):
        """Yields: AssistRequest messages to send to the API."""
        config = embedded_assistant_pb2.AssistConfig(
            audio_in_config=embedded_assistant_pb2.AudioInConfig(
                encoding='LINEAR16',
                sample_rate_hertz=self.conversation_stream.sample_rate,
            ),
            audio_out_config=embedded_assistant_pb2.AudioOutConfig(
                encoding='LINEAR16',
                sample_rate_hertz=self.conversation_stream.sample_rate,
                volume_percentage=self.conversation_stream.volume_percentage,
            ),
            dialog_state_in=embedded_assistant_pb2.DialogStateIn(
                language_code=self.language_code,
                conversation_state=self.conversation_state,
                is_new_conversation=self.is_new_conversation,
            ),
            device_config=embedded_assistant_pb2.DeviceConfig(
                device_id=self.device_id,
                device_model_id=self.device_model_id,
            )
        )
        if self.display:
            config.screen_out_config.screen_mode = PLAYING
        # Continue current conversation with later requests.
        self.is_new_conversation = False
        # The first AssistRequest must contain the AssistConfig
        # and no audio data.
        yield embedded_assistant_pb2.AssistRequest(config=config)
        for data in self.conversation_stream:
            # Subsequent requests need audio data, but not config.
            yield embedded_assistant_pb2.AssistRequest(audio_in=data)
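
# A minimal wiring sketch, adapted from the upstream pushtotalk.py sample
# this module is based on. It is illustrative rather than part of the
# original module: the credentials path and device identifiers below are
# placeholders you must supply, and it assumes google-auth and the
# googlesamples audio/device helpers imported above are installed.
if __name__ == '__main__':
    import google.auth.transport.grpc
    import google.auth.transport.requests
    import google.oauth2.credentials

    logging.basicConfig(level=logging.INFO)

    # Load OAuth2 credentials previously generated with google-oauthlib-tool.
    with open('/path/to/credentials.json', 'r') as f:
        credentials = google.oauth2.credentials.Credentials(
            token=None, **json.load(f))
    http_request = google.auth.transport.requests.Request()
    credentials.refresh(http_request)

    # Open an authorized gRPC channel to the Assistant API endpoint.
    grpc_channel = google.auth.transport.grpc.secure_authorized_channel(
        credentials, http_request, ASSISTANT_API_ENDPOINT)

    # Use the default sound device for both recording and playback.
    audio_device = audio_helpers.SoundDeviceStream(
        sample_rate=audio_helpers.DEFAULT_AUDIO_SAMPLE_RATE,
        sample_width=audio_helpers.DEFAULT_AUDIO_SAMPLE_WIDTH,
        block_size=audio_helpers.DEFAULT_AUDIO_DEVICE_BLOCK_SIZE,
        flush_size=audio_helpers.DEFAULT_AUDIO_DEVICE_FLUSH_SIZE)
    conversation_stream = audio_helpers.ConversationStream(
        source=audio_device,
        sink=audio_device,
        iter_size=audio_helpers.DEFAULT_AUDIO_ITER_SIZE,
        sample_width=audio_helpers.DEFAULT_AUDIO_SAMPLE_WIDTH)

    # Placeholder device identity; register a real model and device with the
    # Google Assistant API before use.
    device_handler = device_helpers.DeviceRequestHandler('my-device-id')

    with SampleAssistant('en-US', 'my-device-model-id', 'my-device-id',
                         conversation_stream, display=False,
                         channel=grpc_channel,
                         deadline_sec=DEFAULT_GRPC_DEADLINE,
                         device_handler=device_handler) as assistant:
        # Run one voice turn, continuing while the Assistant expects a
        # follow-on query from the user.
        keep_going = True
        while keep_going:
            keep_going = assistant.assist()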