- #30 Implemented Google Assistant backend

- #21 Implemented events management
This commit is contained in:
Fabio Manganiello 2017-12-24 01:03:26 +01:00
parent 16dabd7575
commit 7b97a5b229
13 changed files with 395 additions and 83 deletions

View file

@ -7,11 +7,13 @@ from threading import Thread
from .bus import Bus from .bus import Bus
from .config import Config from .config import Config
from .event.processor import EventProcessor
from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action
from .message.event import Event, StopEvent
from .message.request import Request from .message.request import Request
from .message.response import Response from .message.response import Response
__author__ = 'Fabio Manganiello <info@fabiomanganiello.com>' __author__ = 'Fabio Manganiello <blacklight86@gmail.com>'
__version__ = '0.5' __version__ = '0.5'
#-----------# #-----------#
@ -43,6 +45,7 @@ class Daemon(object):
""" """
self.config_file = config_file self.config_file = config_file
self.event_processor = EventProcessor()
self.requests_to_process = requests_to_process self.requests_to_process = requests_to_process
self.processed_requests = 0 self.processed_requests = 0
@ -80,11 +83,18 @@ class Daemon(object):
self.stop_app() self.stop_app()
elif isinstance(msg, Response): elif isinstance(msg, Response):
logging.info('Received response: {}'.format(msg)) logging.info('Received response: {}'.format(msg))
elif isinstance(msg, StopEvent) and msg.targets_me():
logging.info('Received STOP event: {}'.format(msg))
self.stop_app()
elif isinstance(msg, Event):
logging.info('Received event: {}'.format(msg))
self.event_processor.process_event(msg)
return _f return _f
def stop_app(self): def stop_app(self):
""" Stops the backends and the bus """
for backend in self.backends.values(): for backend in self.backends.values():
backend.stop() backend.stop()
self.bus.stop() self.bus.stop()

View file

@ -57,8 +57,8 @@ class Backend(Thread):
dictionary, a platypush.message.Message dictionary, a platypush.message.Message
object, or a string/byte UTF-8 encoded string object, or a string/byte UTF-8 encoded string
""" """
msg = Message.build(msg) msg = Message.build(msg)
if not getattr(msg, 'target') or msg.target != self.device_id: if not getattr(msg, 'target') or msg.target != self.device_id:
return # Not for me return # Not for me
@ -115,6 +115,24 @@ class Backend(Thread):
resp_backend.start() resp_backend.start()
def send_event(self, event, **kwargs):
"""
Send an event message on the backend
Params:
event -- The request, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.event.Event object.
"""
event = Event.build(event)
assert isinstance(event, Event)
event.origin = self.device_id
if not hasattr(event, 'target'):
event.target = self.device_id
self.send_message(event, **kwargs)
def send_request(self, request, on_response=None, def send_request(self, request, on_response=None,
response_timeout=_default_response_timeout, **kwargs): response_timeout=_default_response_timeout, **kwargs):
""" """
@ -189,6 +207,7 @@ class Backend(Thread):
def _async_stop(): def _async_stop():
evt = StopEvent(target=self.device_id, origin=self.device_id, evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.thread_id) thread_id=self.thread_id)
self.send_message(evt) self.send_message(evt)
self.on_stop() self.on_stop()

View file

@ -11,6 +11,8 @@ from google.assistant.library.event import EventType
from google.assistant.library.file_helpers import existing_file from google.assistant.library.file_helpers import existing_file
from platypush.backend import Backend from platypush.backend import Backend
from platypush.message.event.assistant import \
ConversationStartEvent, ConversationEndEvent, SpeechRecognizedEvent
class AssistantGoogleBackend(Backend): class AssistantGoogleBackend(Backend):
""" Class for the Google Assistant backend. It creates and event source """ Class for the Google Assistant backend. It creates and event source
@ -18,59 +20,45 @@ class AssistantGoogleBackend(Backend):
def __init__(self, credentials_file=os.path.join( def __init__(self, credentials_file=os.path.join(
os.path.expanduser('~/.config'), os.path.expanduser('~/.config'),
'google-oauthlib-tool', 'credentials.json'), 'google-oauthlib-tool', 'credentials.json'), **kwargs):
on_conversation_start=None, on_conversation_end=None, **kwargs):
""" Params: """ Params:
credentials_file -- Path to the Google OAuth credentials file credentials_file -- Path to the Google OAuth credentials file
(default: ~/.config/google-oauthlib-tool/credentials.json) (default: ~/.config/google-oauthlib-tool/credentials.json)
on_conversation_start: Custom shell command to execute when a
conversation starts (default: none)
on_conversation_end: Custom shell command to execute when a
conversation ends (default: none)
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.credentials_file = credentials_file self.credentials_file = credentials_file
self.on_conversation_start = on_conversation_start self.assistant = None
self.on_conversation_end = on_conversation_end
with open(self.credentials_file, 'r') as f: with open(self.credentials_file, 'r') as f:
self.credentials = google.oauth2.credentials.Credentials(token=None, self.credentials = google.oauth2.credentials.Credentials(token=None,
**json.load(f)) **json.load(f))
self.assistant = None
def _process_event(self, event): def _process_event(self, event):
logging.debug('Received assistant event: {}'.format(event)) logging.info('Received assistant event: {}'.format(event))
if event.type == EventType.ON_CONVERSATION_TURN_STARTED and self.on_conversation_start: if event.type == EventType.ON_CONVERSATION_TURN_STARTED:
subprocess.check_output(self.on_conversation_start, self.bus.post(ConversationStartEvent())
stderr=subprocess.STDOUT, shell=True) elif event.type == EventType.ON_CONVERSATION_TURN_FINISHED:
elif event.type == EventType.ON_CONVERSATION_TURN_FINISHED and self.on_conversation_end: self.bus.post(ConversationEndEvent())
subprocess.check_output(self.on_conversation_end,
stderr=subprocess.STDOUT, shell=True)
elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED: elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED:
phrase = event.args['text'].lower().strip() phrase = event.args['text'].lower().strip()
logging.info('Speech recognized: {}'.format(phrase)) logging.info('Speech recognized: {}'.format(phrase))
# self.on_message(event) self.bus.post(SpeechRecognizedEvent(phrase=phrase))
def send_message(self, msg): def send_message(self, msg):
# Cant' send a message on an event source, ignoring # Can't send a message on an event source, ignoring
# TODO Make a class for event sources like these. Event sources
# would be a subset of the backends which can fire events on the bus
# but not receive requests or process responses.
pass pass
def on_stop(self):
if self.producer:
self.producer.flush()
self.producer.close()
if self.consumer:
self.consumer.close()
def run(self): def run(self):
super().run() super().run()
with Assistant(self.credentials) as self.assistant: with Assistant(self.credentials) as assistant:
for event in self.assistant.start(): for event in assistant.start():
self._process_event(event) self._process_event(event)

View file

@ -46,12 +46,16 @@ class KafkaBackend(Backend):
self.producer.flush() self.producer.flush()
def on_stop(self): def on_stop(self):
if self.producer: try:
self.producer.flush() if self.producer:
self.producer.close() self.producer.flush()
self.producer.close()
if self.consumer: if self.consumer:
self.consumer.close() self.consumer.close()
except Exception as e:
logging.warning('Exception occurred while closing Kafka connection')
logging.exception(e)
def run(self): def run(self):
super().run() super().run()
@ -64,9 +68,10 @@ class KafkaBackend(Backend):
for msg in self.consumer: for msg in self.consumer:
self._on_record(msg) self._on_record(msg)
if self.should_stop(): break if self.should_stop(): break
except ConnectionError: except Exception as e:
logging.warning('Kafka connection error, retrying in {} seconds'. logging.warning('Kafka connection error, reconnecting in {} seconds'.
format(self._conn_retry_secs)) format(self._conn_retry_secs))
logging.exception(e)
time.sleep(self._conn_retry_secs) time.sleep(self._conn_retry_secs)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -62,21 +62,23 @@ class Config(object):
if 'device_id' not in self._config: if 'device_id' not in self._config:
self._config['device_id'] = socket.gethostname() self._config['device_id'] = socket.gethostname()
self._init_backends() self._init_components()
self._init_plugins()
def _init_backends(self):
def _init_components(self):
self.backends = {} self.backends = {}
for key in self._config.keys():
if not key.startswith('backend.'): continue
backend_name = '.'.join(key.split('.')[1:])
self.backends[backend_name] = self._config[key]
def _init_plugins(self):
self.plugins = {} self.plugins = {}
self.event_hooks = {}
for key in self._config.keys(): for key in self._config.keys():
if key.startswith('backend.'): continue if key.startswith('backend.'):
self.plugins[key] = self._config[key] backend_name = '.'.join(key.split('.')[1:])
self.backends[backend_name] = self._config[key]
elif key.startswith('event.hook.'):
hook_name = '.'.join(key.split('.')[2:])
self.event_hooks[hook_name] = self._config[key]
else:
self.plugins[key] = self._config[key]
@staticmethod @staticmethod
def get_backends(): def get_backends():
@ -90,6 +92,12 @@ class Config(object):
if _default_config_instance is None: _default_config_instance = Config() if _default_config_instance is None: _default_config_instance = Config()
return _default_config_instance.plugins return _default_config_instance.plugins
@staticmethod
def get_event_hooks():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
return _default_config_instance.event_hooks
@staticmethod @staticmethod
def get_default_pusher_backend(): def get_default_pusher_backend():
""" """

View file

136
platypush/event/hook.py Normal file
View file

@ -0,0 +1,136 @@
import json
import logging
from platypush.config import Config
from platypush.message.event import Event
from platypush.message.request import Request
from platypush.utils import get_event_class_by_type
def parse(msg):
""" Builds a dict given another dictionary or
a JSON UTF-8 encoded string/bytearray """
if isinstance(msg, bytes) or isinstance(msg, bytearray):
msg = msg.decode('utf-8')
if isinstance(msg, str):
try:
msg = json.loads(msg.strip())
except:
logging.warning('Invalid JSON message: {}'.format(msg))
return None
return msg
class EventCondition(object):
""" Event hook condition class """
def __init__(self, type=Event.__class__, **kwargs):
"""
Rule constructor.
Params:
type -- Class of the event to be built
kwargs -- Fields rules as a key-value (e.g. source_button=btn_id
or recognized_phrase='Your phrase')
"""
self.type = type
self.args = {}
self.parsed_args = {}
for (key, value) in kwargs.items():
# TODO So far we only allow simple value match. If value is a dict
# instead, we should allow more a sophisticated attribute matching,
# e.g. or conditions, in, and other operators.
self.args[key] = value
@classmethod
def build(cls, rule):
""" Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray """
if isinstance(rule, cls): return rule
else: rule = parse(rule)
assert isinstance(rule, dict)
type = get_event_class_by_type(
rule.pop('type') if 'type' in rule else 'Event')
args = {}
for (key, value) in rule.items():
args[key] = value
return cls(type=type, **args)
class EventAction(Request):
""" Event hook action class. It is a special type of runnable request
whose fields can be configured later depending on the event context """
def __init__(self, target=Config.get('device_id'), action=None, **args):
super().__init__(target=target, action=action, **args)
def execute(self, **context):
for (key, value) in context.items():
self.args[key] = value
super().execute()
@classmethod
def build(cls, action):
action = super().parse(action)
action['origin'] = Config.get('device_id')
if 'target' not in action:
action['target'] = action['origin']
return super().build(action)
class EventHook(object):
""" Event hook class. It consists of one conditionss and
one or multiple actions to be executed """
def __init__(self, name, condition=None, actions=[]):
""" Construtor. Takes a name, a EventCondition object and a list of
EventAction objects as input """
self.name = name
self.condition = EventCondition.build(condition or {})
self.actions = actions
@classmethod
def build(cls, name, hook):
""" Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray """
if isinstance(hook, cls): return hook
else: hook = parse(hook)
assert isinstance(hook, dict)
condition = EventCondition.build(hook['if']) if 'if' in hook else None
actions = []
if 'then' in hook:
if isinstance(hook['then'], list):
actions = [EventAction.build(action) for action in hook['then']]
else:
actions = [EventAction.build(hook['then'])]
return cls(name=name, condition=condition, actions=actions)
def run(self, event):
""" Checks the condition of the hook against a particular event and
runs the hook actions if the condition is met """
result = event.matches_condition(self.condition)
if result[0]: # is match
logging.info('Running hook {} triggered by an event'.format(self.name))
for action in self.actions:
action.execute(**result[1])
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,43 @@
import logging
from .rule import EventRule
from platypush.config import Config
from platypush.message import Message
from platypush.message.request import Request
class EventProcessor(object):
""" Event processor class. Checks an event against the configured
rules and executes any matching event hooks """
def __init__(self, hooks=Config.get_event_hooks(), **kwargs):
"""
Params:
hooks -- List of event hooks (default: any entry in the config
named as event.hook.<hook_name> """
self.hooks = {}
for (name, hook) in hooks.items():
self.hooks[name] = {
'if': EventRule.build(hook['if'] if 'if' in hook else {}),
'then': hook['then'],
}
def process_event(self, event):
""" Processes an event and runs any matched hooks """
matching_hooks = { name: hook['then'] for (name, hook) in self.hooks.items()
if event.matches_rule(hook['if']) }
for (name, hook) in matching_hooks.items():
logging.info('Running command {} triggered by matching event'
.format(name))
# TODO Extend the request with the parameters coming from the event.
# A hook should support a syntax like "playlist_id: $EVENT[playlist_id]"
request = Request.build(hook)
request.execute()
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,32 @@
import logging
from ..hook import EventHook
from platypush.config import Config
class EventProcessor(object):
""" Event processor class. Checks an event against the configured
rules and executes any matching event hooks """
def __init__(self, hooks=Config.get_event_hooks(), **kwargs):
"""
Params:
hooks -- List of event hooks (default: any entry in the config
named as event.hook.<hook_name> """
self.hooks = []
for (name, hook) in hooks.items():
h = EventHook.build(name=name, hook=hook)
self.hooks.append(h)
def process_event(self, event):
""" Processes an event and runs any matched hooks """
for hook in self.hooks:
hook.run(event)
# vim:sw=4:ts=4:et:

View file

@ -2,27 +2,27 @@ import json
import random import random
import threading import threading
from enum import Enum from platypush.config import Config
from platypush.message import Message from platypush.message import Message
from platypush.utils import get_event_class_by_type
class Event(Message): class Event(Message):
""" Event message class """ """ Event message class """
def __init__(self, target, type, origin, id=None, **kwargs): def __init__(self, target=None, origin=None, id=None, **kwargs):
""" """
Params: Params:
target -- Target node [String] target -- Target node [String]
type -- Event type [EventType]
origin -- Origin node (default: current node) [String] origin -- Origin node (default: current node) [String]
id -- Event ID (default: auto-generated) id -- Event ID (default: auto-generated)
kwargs -- Additional arguments for the event [kwDict] kwargs -- Additional arguments for the event [kwDict]
""" """
self.id = id if id else self._generate_id() self.id = id if id else self._generate_id()
self.target = target self.target = target if target else Config.get('device_id')
self.origin = origin self.origin = origin if origin else Config.get('device_id')
self.type = type self.type = '{}.{}'.format(self.__class__.__module__,
self.__class__.__name__)
self.args = kwargs self.args = kwargs
@classmethod @classmethod
@ -32,7 +32,7 @@ class Event(Message):
msg = super().parse(msg) msg = super().parse(msg)
event_type = msg['args'].pop('type') event_type = msg['args'].pop('type')
event_class = getattr(EventType, event_type).cls event_class = get_event_class_by_type(event_type)
args = { args = {
'target' : msg['target'], 'target' : msg['target'],
@ -43,6 +43,29 @@ class Event(Message):
args['id'] = msg['id'] if 'id' in msg else cls._generate_id() args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
return event_class(**args) return event_class(**args)
def matches_condition(self, condition):
"""
If the event matches an event condition, it will return True and a
dictionary containing any parsed arguments, otherwise False and {}
Params:
-- condition -- The platypush.event.hook.EventCondition object
"""
parsed_args = {}
if not isinstance(self, condition.type): return [False, parsed_args]
for (attr, value) in condition.args.items():
# TODO Be more sophisticated, not only simple match options!
if not hasattr(self.args, attr):
return [False, parsed_args]
if isinstance(self.args[attr], str) and not value in self.args[attr]:
return [False, parsed_args]
elif self.args[attr] != value:
return [False, parsed_args]
return [True, parsed_args]
@staticmethod @staticmethod
def _generate_id(): def _generate_id():
""" Generate a unique event ID """ """ Generate a unique event ID """
@ -63,12 +86,13 @@ class Event(Message):
'origin' : self.origin if hasattr(self, 'origin') else None, 'origin' : self.origin if hasattr(self, 'origin') else None,
'id' : self.id if hasattr(self, 'id') else None, 'id' : self.id if hasattr(self, 'id') else None,
'args' : { 'args' : {
'type' : self.type.name, 'type' : self.type,
**self.args, **self.args,
}, },
}) })
# XXX Should be a stop Request, not an Event
class StopEvent(Event): class StopEvent(Event):
""" StopEvent message. When received on a Bus, it will terminate the """ StopEvent message. When received on a Bus, it will terminate the
listening thread having the specified ID. Useful to keep listeners in listening thread having the specified ID. Useful to keep listeners in
@ -85,28 +109,12 @@ class StopEvent(Event):
""" """
super().__init__(target=target, origin=origin, id=id, super().__init__(target=target, origin=origin, id=id,
type=EventType.STOP, thread_id=thread_id, **kwargs) thread_id=thread_id, **kwargs)
def targets_me(self): def targets_me(self):
""" Returns true if the stop event is for the current thread """ """ Returns true if the stop event is for the current thread """
return self.args['thread_id'] == threading.get_ident() return self.args['thread_id'] == threading.get_ident()
class EventType(Enum):
""" Event types enum """
def __new__(cls, *args, **kwds):
value = len(cls.__members__) + 1
obj = object.__new__(cls)
obj._value_ = value
return obj
def __init__(self, label, cls):
self.label = label
self.cls = cls
STOP = 'STOP', StopEvent
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,56 @@
import re
from platypush.message.event import Event
class AssistantEvent(Event):
""" Base class for assistant events """
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class ConversationStartEvent(AssistantEvent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class ConversationEndEvent(AssistantEvent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class SpeechRecognizedEvent(AssistantEvent):
def __init__(self, phrase, *args, **kwargs):
super().__init__(phrase=phrase, *args, **kwargs)
self.recognized_phrase = phrase.strip().lower()
def matches_condition(self, condition):
if not isinstance(self, condition.type): return [False, {}]
recognized_tokens = re.split('\s+', self.recognized_phrase.strip().lower())
condition_tokens = re.split('\s+', condition.args['phrase'].strip().lower())
parsed_args = {}
while recognized_tokens and condition_tokens:
rec_token = recognized_tokens[0]
cond_token = condition_tokens[0]
if rec_token == cond_token:
recognized_tokens.pop(0)
condition_tokens.pop(0)
elif re.search(cond_token, rec_token):
condition_tokens.pop(0)
else:
m = re.match('^\$([\w\d_])', cond_token)
if m:
parsed_args[cond_token[1:]] = rec_token
recognized_tokens.pop(0)
condition_tokens.pop(0)
else:
recognized_tokens.pop(0)
return [len(condition_tokens) == 0, parsed_args]
# vim:sw=4:ts=4:et:

View file

@ -1,6 +1,9 @@
import json import json
import logging import logging
import random import random
import traceback
from threading import Thread
from threading import Thread from threading import Thread
@ -54,7 +57,7 @@ class Request(Message):
Params: Params:
n_tries -- Number of tries in case of failure before raising a RuntimeError n_tries -- Number of tries in case of failure before raising a RuntimeError
""" """
def _thread_func(): def _thread_func(n_tries):
(module_name, method_name) = get_module_and_name_from_action(self.action) (module_name, method_name) = get_module_and_name_from_action(self.action)
plugin = get_or_load_plugin(module_name) plugin = get_or_load_plugin(module_name)
@ -62,7 +65,6 @@ class Request(Message):
try: try:
# Run the action # Run the action
response = plugin.run(method=method_name, **self.args) response = plugin.run(method=method_name, **self.args)
print(response)
if response and response.is_error(): if response and response.is_error():
raise RuntimeError('Response processed with errors: {}'.format(response)) raise RuntimeError('Response processed with errors: {}'.format(response))
@ -75,18 +77,17 @@ class Request(Message):
if n_tries: if n_tries:
logging.info('Reloading plugin {} and retrying'.format(module_name)) logging.info('Reloading plugin {} and retrying'.format(module_name))
get_or_load_plugin(module_name, reload=True) get_or_load_plugin(module_name, reload=True)
n_tries -= 1 _thread_func(n_tries-1)
_thread_func()
return return
finally: finally:
# Send the response on the backend # Send the response on the backend
if self.backend and self.origin: if self.backend and self.origin:
self.backend.send_response(response=response, request=self) self.backend.send_response(response=response, request=self)
else: else:
logging.info('Dropping response whose request has no ' + logging.info('Response whose request has no ' +
'origin attached: {}'.format(self)) 'origin attached: {}'.format(response))
Thread(target=_thread_func).start() Thread(target=_thread_func, args=(n_tries,)).start()
def __str__(self): def __str__(self):

View file

@ -105,6 +105,12 @@ def get_message_class_by_type(msgtype):
return msgclass return msgclass
def get_event_class_by_type(type):
""" Gets an event class by type name """
event_module = importlib.import_module('.'.join(type.split('.')[:-1]))
return getattr(event_module, type.split('.')[-1])
def set_timeout(seconds, on_timeout): def set_timeout(seconds, on_timeout):
""" """
Set a function to be called if timeout expires without being cleared. Set a function to be called if timeout expires without being cleared.