From b98fe01352c91a75efcfda76d6b6af0ea7c90031 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 5 Jan 2018 23:20:39 +0100 Subject: [PATCH] Implemented sequential execution of tasks in procedures, response context parsing and procedure response returned on the bus as well, #37 --- platypush/message/request/__init__.py | 70 ++++++++++++++++++++------ platypush/message/response/__init__.py | 15 +++++- platypush/procedure/__init__.py | 13 ++++- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 15fb5b4c4e..da7b7fb9d3 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,6 +1,7 @@ import json import logging import random +import re import traceback from threading import Thread @@ -60,10 +61,44 @@ class Request(Message): proc_name = self.action.split('.')[-1] proc_config = Config.get_procedures()[proc_name] proc = Procedure.build(name=proc_name, requests=proc_config, backend=self.backend, id=self.id) - proc.execute(*args, **kwargs) + return proc.execute(*args, **kwargs) - def execute(self, n_tries=1, async=True): + def _expand_context(self, **context): + args = {} + for (name, value) in self.args.items(): + if isinstance(value, str): + parsed_value = '' + while value: + m = re.match('([^\\\]*)\$([\w\d_-]+)(.*)', value) + if m: + context_name = m.group(2) + value = m.group(3) + if context_name in context: + parsed_value += m.group(1) + context[context_name] + else: + parsed_value += m.group(1) + '$' + m.group(2) + else: + parsed_value += value + value = '' + + value = parsed_value + + args[name] = value + + return args + + + + def _send_response(self, response): + if self.backend and self.origin: + self.backend.send_response(response=response, request=self) + else: + logging.info('Response whose request has no ' + + 'origin attached: {}'.format(response)) + + + def execute(self, n_tries=1, async=True, **context): """ Execute this request and returns a Response object Params: @@ -72,18 +107,28 @@ class Request(Message): response posted on the bus when available (default), otherwise the current thread will wait for the response to be returned synchronously. + context -- Key-valued context. Example: + context = (group_name='Kitchen lights') + request.args: + - group: $group_name # will be expanded as "Kitchen lights") """ def _thread_func(n_tries): if self.action.startswith('procedure.'): - return self._execute_procedure(n_tries=n_tries) - - (module_name, method_name) = get_module_and_method_from_action(self.action) - plugin = get_plugin(module_name) + try: + response = self._execute_procedure(n_tries=n_tries) + finally: + self._send_response(response) + return response + else: + (module_name, method_name) = get_module_and_method_from_action(self.action) + plugin = get_plugin(module_name) try: # Run the action - response = plugin.run(method=method_name, **self.args) + args = self._expand_context(**context) + response = plugin.run(method=method_name, **args) + if response and response.is_error(): raise RuntimeError('Response processed with errors: {}'.format(response)) @@ -99,15 +144,8 @@ class Request(Message): _thread_func(n_tries-1) return finally: - if async: - # Send the response on the backend - if self.backend and self.origin: - self.backend.send_response(response=response, request=self) - else: - logging.info('Response whose request has no ' + - 'origin attached: {}'.format(response)) - else: - return response + self._send_response(response) + return response if async: Thread(target=_thread_func, args=(n_tries,)).start() diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index 3f865bb262..ed499b1884 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -16,8 +16,8 @@ class Response(Message): """ self.target = target - self.output = output - self.errors = errors + self.output = self._parse_msg(output) + self.errors = self._parse_msg(errors) self.origin = origin self.id = id @@ -25,6 +25,17 @@ class Response(Message): """ Returns True if the respopnse has errors """ return len(self.errors) != 0 + @classmethod + def _parse_msg(cls, msg): + if isinstance(msg, bytes) or isinstance(msg, bytearray): + msg = msg.decode('utf-8') + if isinstance(msg, str): + try: msg = json.loads(msg.strip()) + except ValueError as e: pass + + return msg + + @classmethod def build(cls, msg): msg = super().parse(msg) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 9e2a6880bb..a7781fde39 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -2,6 +2,7 @@ import logging from ..config import Config from ..message.request import Request +from ..message.response import Response class Procedure(object): """ Procedure class. A procedure is a pre-configured list of requests """ @@ -42,8 +43,18 @@ class Procedure(object): """ logging.info('Executing request {}'.format(self.name)) + context = {} + response = Response() + for request in self.requests: - request.execute(n_tries) + response = request.execute(n_tries, async=False, **context) + context = { k:v for (k,v) in response.output.items() } \ + if isinstance(response.output, dict) else {} + + context['output'] = response.output + context['errors'] = response.errors + + return response # vim:sw=4:ts=4:et: