From 7b97a5b229b2bef19e180cdc84e888f492a885e7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 24 Dec 2017 01:03:26 +0100 Subject: [PATCH] - #30 Implemented Google Assistant backend - #21 Implemented events management --- platypush/__init__.py | 12 +- platypush/backend/__init__.py | 21 ++- .../backend/assistant/google/__init__.py | 46 +++--- platypush/backend/kafka/__init__.py | 19 ++- platypush/config/__init__.py | 30 ++-- platypush/event/__init__.py | 0 platypush/event/hook.py | 136 ++++++++++++++++++ platypush/event/processor.py | 43 ++++++ platypush/event/processor/__init__.py | 32 +++++ platypush/message/event/__init__.py | 62 ++++---- platypush/message/event/assistant/__init__.py | 56 ++++++++ platypush/message/request/__init__.py | 15 +- platypush/utils/__init__.py | 6 + 13 files changed, 395 insertions(+), 83 deletions(-) create mode 100644 platypush/event/__init__.py create mode 100644 platypush/event/hook.py create mode 100644 platypush/event/processor.py create mode 100644 platypush/event/processor/__init__.py create mode 100644 platypush/message/event/assistant/__init__.py diff --git a/platypush/__init__.py b/platypush/__init__.py index 5950b6b01..c3ca5a873 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -7,11 +7,13 @@ from threading import Thread from .bus import Bus from .config import Config +from .event.processor import EventProcessor from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action +from .message.event import Event, StopEvent from .message.request import Request from .message.response import Response -__author__ = 'Fabio Manganiello ' +__author__ = 'Fabio Manganiello ' __version__ = '0.5' #-----------# @@ -43,6 +45,7 @@ class Daemon(object): """ self.config_file = config_file + self.event_processor = EventProcessor() self.requests_to_process = requests_to_process self.processed_requests = 0 @@ -80,11 +83,18 @@ class Daemon(object): self.stop_app() elif isinstance(msg, Response): logging.info('Received response: {}'.format(msg)) + elif isinstance(msg, StopEvent) and msg.targets_me(): + logging.info('Received STOP event: {}'.format(msg)) + self.stop_app() + elif isinstance(msg, Event): + logging.info('Received event: {}'.format(msg)) + self.event_processor.process_event(msg) return _f def stop_app(self): + """ Stops the backends and the bus """ for backend in self.backends.values(): backend.stop() self.bus.stop() diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 28dd565b6..2fb0a66a4 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -57,8 +57,8 @@ class Backend(Thread): dictionary, a platypush.message.Message object, or a string/byte UTF-8 encoded string """ - msg = Message.build(msg) + if not getattr(msg, 'target') or msg.target != self.device_id: return # Not for me @@ -115,6 +115,24 @@ class Backend(Thread): resp_backend.start() + def send_event(self, event, **kwargs): + """ + Send an event message on the backend + Params: + event -- The request, either a dict, a string/bytes UTF-8 JSON, + or a platypush.message.event.Event object. + """ + + event = Event.build(event) + assert isinstance(event, Event) + + event.origin = self.device_id + if not hasattr(event, 'target'): + event.target = self.device_id + + self.send_message(event, **kwargs) + + def send_request(self, request, on_response=None, response_timeout=_default_response_timeout, **kwargs): """ @@ -189,6 +207,7 @@ class Backend(Thread): def _async_stop(): evt = StopEvent(target=self.device_id, origin=self.device_id, thread_id=self.thread_id) + self.send_message(evt) self.on_stop() diff --git a/platypush/backend/assistant/google/__init__.py b/platypush/backend/assistant/google/__init__.py index 95d183b7b..fcb2cd40d 100644 --- a/platypush/backend/assistant/google/__init__.py +++ b/platypush/backend/assistant/google/__init__.py @@ -11,6 +11,8 @@ from google.assistant.library.event import EventType from google.assistant.library.file_helpers import existing_file from platypush.backend import Backend +from platypush.message.event.assistant import \ + ConversationStartEvent, ConversationEndEvent, SpeechRecognizedEvent class AssistantGoogleBackend(Backend): """ Class for the Google Assistant backend. It creates and event source @@ -18,59 +20,45 @@ class AssistantGoogleBackend(Backend): def __init__(self, credentials_file=os.path.join( os.path.expanduser('~/.config'), - 'google-oauthlib-tool', 'credentials.json'), - on_conversation_start=None, on_conversation_end=None, **kwargs): + 'google-oauthlib-tool', 'credentials.json'), **kwargs): """ Params: credentials_file -- Path to the Google OAuth credentials file (default: ~/.config/google-oauthlib-tool/credentials.json) - on_conversation_start: Custom shell command to execute when a - conversation starts (default: none) - on_conversation_end: Custom shell command to execute when a - conversation ends (default: none) """ super().__init__(**kwargs) self.credentials_file = credentials_file - self.on_conversation_start = on_conversation_start - self.on_conversation_end = on_conversation_end + self.assistant = None with open(self.credentials_file, 'r') as f: self.credentials = google.oauth2.credentials.Credentials(token=None, **json.load(f)) - self.assistant = None - def _process_event(self, event): - logging.debug('Received assistant event: {}'.format(event)) + logging.info('Received assistant event: {}'.format(event)) - if event.type == EventType.ON_CONVERSATION_TURN_STARTED and self.on_conversation_start: - subprocess.check_output(self.on_conversation_start, - stderr=subprocess.STDOUT, shell=True) - elif event.type == EventType.ON_CONVERSATION_TURN_FINISHED and self.on_conversation_end: - subprocess.check_output(self.on_conversation_end, - stderr=subprocess.STDOUT, shell=True) + if event.type == EventType.ON_CONVERSATION_TURN_STARTED: + self.bus.post(ConversationStartEvent()) + elif event.type == EventType.ON_CONVERSATION_TURN_FINISHED: + self.bus.post(ConversationEndEvent()) elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED: phrase = event.args['text'].lower().strip() logging.info('Speech recognized: {}'.format(phrase)) - # self.on_message(event) + self.bus.post(SpeechRecognizedEvent(phrase=phrase)) + def send_message(self, msg): - # Cant' send a message on an event source, ignoring + # Can't send a message on an event source, ignoring + # TODO Make a class for event sources like these. Event sources + # would be a subset of the backends which can fire events on the bus + # but not receive requests or process responses. pass - def on_stop(self): - if self.producer: - self.producer.flush() - self.producer.close() - - if self.consumer: - self.consumer.close() - def run(self): super().run() - with Assistant(self.credentials) as self.assistant: - for event in self.assistant.start(): + with Assistant(self.credentials) as assistant: + for event in assistant.start(): self._process_event(event) diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index 42dd1fed0..e2118d6bd 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -46,12 +46,16 @@ class KafkaBackend(Backend): self.producer.flush() def on_stop(self): - if self.producer: - self.producer.flush() - self.producer.close() + try: + if self.producer: + self.producer.flush() + self.producer.close() - if self.consumer: - self.consumer.close() + if self.consumer: + self.consumer.close() + except Exception as e: + logging.warning('Exception occurred while closing Kafka connection') + logging.exception(e) def run(self): super().run() @@ -64,9 +68,10 @@ class KafkaBackend(Backend): for msg in self.consumer: self._on_record(msg) if self.should_stop(): break - except ConnectionError: - logging.warning('Kafka connection error, retrying in {} seconds'. + except Exception as e: + logging.warning('Kafka connection error, reconnecting in {} seconds'. format(self._conn_retry_secs)) + logging.exception(e) time.sleep(self._conn_retry_secs) # vim:sw=4:ts=4:et: diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 59d6e3ed4..66d3ae6a4 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -62,21 +62,23 @@ class Config(object): if 'device_id' not in self._config: self._config['device_id'] = socket.gethostname() - self._init_backends() - self._init_plugins() + self._init_components() - def _init_backends(self): + + def _init_components(self): self.backends = {} - for key in self._config.keys(): - if not key.startswith('backend.'): continue - backend_name = '.'.join(key.split('.')[1:]) - self.backends[backend_name] = self._config[key] - - def _init_plugins(self): self.plugins = {} + self.event_hooks = {} + for key in self._config.keys(): - if key.startswith('backend.'): continue - self.plugins[key] = self._config[key] + if key.startswith('backend.'): + backend_name = '.'.join(key.split('.')[1:]) + self.backends[backend_name] = self._config[key] + elif key.startswith('event.hook.'): + hook_name = '.'.join(key.split('.')[2:]) + self.event_hooks[hook_name] = self._config[key] + else: + self.plugins[key] = self._config[key] @staticmethod def get_backends(): @@ -90,6 +92,12 @@ class Config(object): if _default_config_instance is None: _default_config_instance = Config() return _default_config_instance.plugins + @staticmethod + def get_event_hooks(): + global _default_config_instance + if _default_config_instance is None: _default_config_instance = Config() + return _default_config_instance.event_hooks + @staticmethod def get_default_pusher_backend(): """ diff --git a/platypush/event/__init__.py b/platypush/event/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/platypush/event/hook.py b/platypush/event/hook.py new file mode 100644 index 000000000..486e64cc9 --- /dev/null +++ b/platypush/event/hook.py @@ -0,0 +1,136 @@ +import json +import logging + +from platypush.config import Config +from platypush.message.event import Event +from platypush.message.request import Request +from platypush.utils import get_event_class_by_type + + +def parse(msg): + """ Builds a dict given another dictionary or + a JSON UTF-8 encoded string/bytearray """ + + if isinstance(msg, bytes) or isinstance(msg, bytearray): + msg = msg.decode('utf-8') + if isinstance(msg, str): + try: + msg = json.loads(msg.strip()) + except: + logging.warning('Invalid JSON message: {}'.format(msg)) + return None + + return msg + + +class EventCondition(object): + """ Event hook condition class """ + + def __init__(self, type=Event.__class__, **kwargs): + """ + Rule constructor. + Params: + type -- Class of the event to be built + kwargs -- Fields rules as a key-value (e.g. source_button=btn_id + or recognized_phrase='Your phrase') + """ + + self.type = type + self.args = {} + self.parsed_args = {} + + for (key, value) in kwargs.items(): + # TODO So far we only allow simple value match. If value is a dict + # instead, we should allow more a sophisticated attribute matching, + # e.g. or conditions, in, and other operators. + self.args[key] = value + + @classmethod + def build(cls, rule): + """ Builds a rule given either another EventRule, a dictionary or + a JSON UTF-8 encoded string/bytearray """ + + if isinstance(rule, cls): return rule + else: rule = parse(rule) + assert isinstance(rule, dict) + + type = get_event_class_by_type( + rule.pop('type') if 'type' in rule else 'Event') + + args = {} + for (key, value) in rule.items(): + args[key] = value + + return cls(type=type, **args) + + +class EventAction(Request): + """ Event hook action class. It is a special type of runnable request + whose fields can be configured later depending on the event context """ + + def __init__(self, target=Config.get('device_id'), action=None, **args): + super().__init__(target=target, action=action, **args) + + + def execute(self, **context): + for (key, value) in context.items(): + self.args[key] = value + + super().execute() + + @classmethod + def build(cls, action): + action = super().parse(action) + action['origin'] = Config.get('device_id') + + if 'target' not in action: + action['target'] = action['origin'] + return super().build(action) + +class EventHook(object): + """ Event hook class. It consists of one conditionss and + one or multiple actions to be executed """ + + def __init__(self, name, condition=None, actions=[]): + """ Construtor. Takes a name, a EventCondition object and a list of + EventAction objects as input """ + + self.name = name + self.condition = EventCondition.build(condition or {}) + self.actions = actions + + + @classmethod + def build(cls, name, hook): + """ Builds a rule given either another EventRule, a dictionary or + a JSON UTF-8 encoded string/bytearray """ + + if isinstance(hook, cls): return hook + else: hook = parse(hook) + assert isinstance(hook, dict) + + condition = EventCondition.build(hook['if']) if 'if' in hook else None + actions = [] + if 'then' in hook: + if isinstance(hook['then'], list): + actions = [EventAction.build(action) for action in hook['then']] + else: + actions = [EventAction.build(hook['then'])] + + return cls(name=name, condition=condition, actions=actions) + + + def run(self, event): + """ Checks the condition of the hook against a particular event and + runs the hook actions if the condition is met """ + + result = event.matches_condition(self.condition) + if result[0]: # is match + logging.info('Running hook {} triggered by an event'.format(self.name)) + + for action in self.actions: + action.execute(**result[1]) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/event/processor.py b/platypush/event/processor.py new file mode 100644 index 000000000..32689835c --- /dev/null +++ b/platypush/event/processor.py @@ -0,0 +1,43 @@ +import logging + +from .rule import EventRule + +from platypush.config import Config +from platypush.message import Message +from platypush.message.request import Request + +class EventProcessor(object): + """ Event processor class. Checks an event against the configured + rules and executes any matching event hooks """ + + def __init__(self, hooks=Config.get_event_hooks(), **kwargs): + """ + Params: + hooks -- List of event hooks (default: any entry in the config + named as event.hook. """ + + self.hooks = {} + for (name, hook) in hooks.items(): + self.hooks[name] = { + 'if': EventRule.build(hook['if'] if 'if' in hook else {}), + 'then': hook['then'], + } + + def process_event(self, event): + """ Processes an event and runs any matched hooks """ + + matching_hooks = { name: hook['then'] for (name, hook) in self.hooks.items() + if event.matches_rule(hook['if']) } + + for (name, hook) in matching_hooks.items(): + logging.info('Running command {} triggered by matching event' + .format(name)) + + # TODO Extend the request with the parameters coming from the event. + # A hook should support a syntax like "playlist_id: $EVENT[playlist_id]" + request = Request.build(hook) + request.execute() + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/event/processor/__init__.py b/platypush/event/processor/__init__.py new file mode 100644 index 000000000..72e3d9176 --- /dev/null +++ b/platypush/event/processor/__init__.py @@ -0,0 +1,32 @@ +import logging + +from ..hook import EventHook + +from platypush.config import Config + + +class EventProcessor(object): + """ Event processor class. Checks an event against the configured + rules and executes any matching event hooks """ + + def __init__(self, hooks=Config.get_event_hooks(), **kwargs): + """ + Params: + hooks -- List of event hooks (default: any entry in the config + named as event.hook. """ + + self.hooks = [] + for (name, hook) in hooks.items(): + h = EventHook.build(name=name, hook=hook) + self.hooks.append(h) + + + def process_event(self, event): + """ Processes an event and runs any matched hooks """ + + for hook in self.hooks: + hook.run(event) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 973bc5a85..5e7889c8d 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -2,27 +2,27 @@ import json import random import threading -from enum import Enum - +from platypush.config import Config from platypush.message import Message +from platypush.utils import get_event_class_by_type class Event(Message): """ Event message class """ - def __init__(self, target, type, origin, id=None, **kwargs): + def __init__(self, target=None, origin=None, id=None, **kwargs): """ Params: target -- Target node [String] - type -- Event type [EventType] origin -- Origin node (default: current node) [String] - id -- Event ID (default: auto-generated) + id -- Event ID (default: auto-generated) kwargs -- Additional arguments for the event [kwDict] """ self.id = id if id else self._generate_id() - self.target = target - self.origin = origin - self.type = type + self.target = target if target else Config.get('device_id') + self.origin = origin if origin else Config.get('device_id') + self.type = '{}.{}'.format(self.__class__.__module__, + self.__class__.__name__) self.args = kwargs @classmethod @@ -32,7 +32,7 @@ class Event(Message): msg = super().parse(msg) event_type = msg['args'].pop('type') - event_class = getattr(EventType, event_type).cls + event_class = get_event_class_by_type(event_type) args = { 'target' : msg['target'], @@ -43,6 +43,29 @@ class Event(Message): args['id'] = msg['id'] if 'id' in msg else cls._generate_id() return event_class(**args) + def matches_condition(self, condition): + """ + If the event matches an event condition, it will return True and a + dictionary containing any parsed arguments, otherwise False and {} + Params: + -- condition -- The platypush.event.hook.EventCondition object + """ + + parsed_args = {} + if not isinstance(self, condition.type): return [False, parsed_args] + + for (attr, value) in condition.args.items(): + # TODO Be more sophisticated, not only simple match options! + if not hasattr(self.args, attr): + return [False, parsed_args] + if isinstance(self.args[attr], str) and not value in self.args[attr]: + return [False, parsed_args] + elif self.args[attr] != value: + return [False, parsed_args] + + return [True, parsed_args] + + @staticmethod def _generate_id(): """ Generate a unique event ID """ @@ -63,12 +86,13 @@ class Event(Message): 'origin' : self.origin if hasattr(self, 'origin') else None, 'id' : self.id if hasattr(self, 'id') else None, 'args' : { - 'type' : self.type.name, + 'type' : self.type, **self.args, }, }) +# XXX Should be a stop Request, not an Event class StopEvent(Event): """ StopEvent message. When received on a Bus, it will terminate the listening thread having the specified ID. Useful to keep listeners in @@ -85,28 +109,12 @@ class StopEvent(Event): """ super().__init__(target=target, origin=origin, id=id, - type=EventType.STOP, thread_id=thread_id, **kwargs) + thread_id=thread_id, **kwargs) def targets_me(self): """ Returns true if the stop event is for the current thread """ return self.args['thread_id'] == threading.get_ident() -class EventType(Enum): - """ Event types enum """ - - def __new__(cls, *args, **kwds): - value = len(cls.__members__) + 1 - obj = object.__new__(cls) - obj._value_ = value - return obj - - def __init__(self, label, cls): - self.label = label - self.cls = cls - - STOP = 'STOP', StopEvent - - # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/assistant/__init__.py b/platypush/message/event/assistant/__init__.py new file mode 100644 index 000000000..c9613929b --- /dev/null +++ b/platypush/message/event/assistant/__init__.py @@ -0,0 +1,56 @@ +import re + +from platypush.message.event import Event + +class AssistantEvent(Event): + """ Base class for assistant events """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class ConversationStartEvent(AssistantEvent): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class ConversationEndEvent(AssistantEvent): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class SpeechRecognizedEvent(AssistantEvent): + def __init__(self, phrase, *args, **kwargs): + super().__init__(phrase=phrase, *args, **kwargs) + self.recognized_phrase = phrase.strip().lower() + + def matches_condition(self, condition): + if not isinstance(self, condition.type): return [False, {}] + + recognized_tokens = re.split('\s+', self.recognized_phrase.strip().lower()) + condition_tokens = re.split('\s+', condition.args['phrase'].strip().lower()) + parsed_args = {} + + while recognized_tokens and condition_tokens: + rec_token = recognized_tokens[0] + cond_token = condition_tokens[0] + + if rec_token == cond_token: + recognized_tokens.pop(0) + condition_tokens.pop(0) + elif re.search(cond_token, rec_token): + condition_tokens.pop(0) + else: + m = re.match('^\$([\w\d_])', cond_token) + if m: + parsed_args[cond_token[1:]] = rec_token + recognized_tokens.pop(0) + condition_tokens.pop(0) + else: + recognized_tokens.pop(0) + + return [len(condition_tokens) == 0, parsed_args] + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 07850aa40..cb92b0d00 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,6 +1,9 @@ import json import logging import random +import traceback + +from threading import Thread from threading import Thread @@ -54,7 +57,7 @@ class Request(Message): Params: n_tries -- Number of tries in case of failure before raising a RuntimeError """ - def _thread_func(): + def _thread_func(n_tries): (module_name, method_name) = get_module_and_name_from_action(self.action) plugin = get_or_load_plugin(module_name) @@ -62,7 +65,6 @@ class Request(Message): try: # Run the action response = plugin.run(method=method_name, **self.args) - print(response) if response and response.is_error(): raise RuntimeError('Response processed with errors: {}'.format(response)) @@ -75,18 +77,17 @@ class Request(Message): if n_tries: logging.info('Reloading plugin {} and retrying'.format(module_name)) get_or_load_plugin(module_name, reload=True) - n_tries -= 1 - _thread_func() + _thread_func(n_tries-1) return finally: # Send the response on the backend if self.backend and self.origin: self.backend.send_response(response=response, request=self) else: - logging.info('Dropping response whose request has no ' + - 'origin attached: {}'.format(self)) + logging.info('Response whose request has no ' + + 'origin attached: {}'.format(response)) - Thread(target=_thread_func).start() + Thread(target=_thread_func, args=(n_tries,)).start() def __str__(self): diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index f14bef1e9..b2f253f48 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -105,6 +105,12 @@ def get_message_class_by_type(msgtype): return msgclass +def get_event_class_by_type(type): + """ Gets an event class by type name """ + event_module = importlib.import_module('.'.join(type.split('.')[:-1])) + return getattr(event_module, type.split('.')[-1]) + + def set_timeout(seconds, on_timeout): """ Set a function to be called if timeout expires without being cleared.