forked from platypush/platypush
Implemented sequential execution of tasks in procedures, response context parsing and procedure response returned on the bus as well, #37
This commit is contained in:
parent
b7854cb5dd
commit
b98fe01352
3 changed files with 79 additions and 19 deletions
|
@ -1,6 +1,7 @@
|
|||
import json
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
import traceback
|
||||
|
||||
from threading import Thread
|
||||
|
@ -60,10 +61,44 @@ class Request(Message):
|
|||
proc_name = self.action.split('.')[-1]
|
||||
proc_config = Config.get_procedures()[proc_name]
|
||||
proc = Procedure.build(name=proc_name, requests=proc_config, backend=self.backend, id=self.id)
|
||||
proc.execute(*args, **kwargs)
|
||||
return proc.execute(*args, **kwargs)
|
||||
|
||||
|
||||
def execute(self, n_tries=1, async=True):
|
||||
def _expand_context(self, **context):
|
||||
args = {}
|
||||
for (name, value) in self.args.items():
|
||||
if isinstance(value, str):
|
||||
parsed_value = ''
|
||||
while value:
|
||||
m = re.match('([^\\\]*)\$([\w\d_-]+)(.*)', value)
|
||||
if m:
|
||||
context_name = m.group(2)
|
||||
value = m.group(3)
|
||||
if context_name in context:
|
||||
parsed_value += m.group(1) + context[context_name]
|
||||
else:
|
||||
parsed_value += m.group(1) + '$' + m.group(2)
|
||||
else:
|
||||
parsed_value += value
|
||||
value = ''
|
||||
|
||||
value = parsed_value
|
||||
|
||||
args[name] = value
|
||||
|
||||
return args
|
||||
|
||||
|
||||
|
||||
def _send_response(self, response):
|
||||
if self.backend and self.origin:
|
||||
self.backend.send_response(response=response, request=self)
|
||||
else:
|
||||
logging.info('Response whose request has no ' +
|
||||
'origin attached: {}'.format(response))
|
||||
|
||||
|
||||
def execute(self, n_tries=1, async=True, **context):
|
||||
"""
|
||||
Execute this request and returns a Response object
|
||||
Params:
|
||||
|
@ -72,18 +107,28 @@ class Request(Message):
|
|||
response posted on the bus when available (default),
|
||||
otherwise the current thread will wait for the response
|
||||
to be returned synchronously.
|
||||
context -- Key-valued context. Example:
|
||||
context = (group_name='Kitchen lights')
|
||||
request.args:
|
||||
- group: $group_name # will be expanded as "Kitchen lights")
|
||||
"""
|
||||
|
||||
def _thread_func(n_tries):
|
||||
if self.action.startswith('procedure.'):
|
||||
return self._execute_procedure(n_tries=n_tries)
|
||||
|
||||
try:
|
||||
response = self._execute_procedure(n_tries=n_tries)
|
||||
finally:
|
||||
self._send_response(response)
|
||||
return response
|
||||
else:
|
||||
(module_name, method_name) = get_module_and_method_from_action(self.action)
|
||||
plugin = get_plugin(module_name)
|
||||
|
||||
try:
|
||||
# Run the action
|
||||
response = plugin.run(method=method_name, **self.args)
|
||||
args = self._expand_context(**context)
|
||||
response = plugin.run(method=method_name, **args)
|
||||
|
||||
if response and response.is_error():
|
||||
raise RuntimeError('Response processed with errors: {}'.format(response))
|
||||
|
||||
|
@ -99,14 +144,7 @@ class Request(Message):
|
|||
_thread_func(n_tries-1)
|
||||
return
|
||||
finally:
|
||||
if async:
|
||||
# Send the response on the backend
|
||||
if self.backend and self.origin:
|
||||
self.backend.send_response(response=response, request=self)
|
||||
else:
|
||||
logging.info('Response whose request has no ' +
|
||||
'origin attached: {}'.format(response))
|
||||
else:
|
||||
self._send_response(response)
|
||||
return response
|
||||
|
||||
if async:
|
||||
|
|
|
@ -16,8 +16,8 @@ class Response(Message):
|
|||
"""
|
||||
|
||||
self.target = target
|
||||
self.output = output
|
||||
self.errors = errors
|
||||
self.output = self._parse_msg(output)
|
||||
self.errors = self._parse_msg(errors)
|
||||
self.origin = origin
|
||||
self.id = id
|
||||
|
||||
|
@ -25,6 +25,17 @@ class Response(Message):
|
|||
""" Returns True if the respopnse has errors """
|
||||
return len(self.errors) != 0
|
||||
|
||||
@classmethod
|
||||
def _parse_msg(cls, msg):
|
||||
if isinstance(msg, bytes) or isinstance(msg, bytearray):
|
||||
msg = msg.decode('utf-8')
|
||||
if isinstance(msg, str):
|
||||
try: msg = json.loads(msg.strip())
|
||||
except ValueError as e: pass
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
@classmethod
|
||||
def build(cls, msg):
|
||||
msg = super().parse(msg)
|
||||
|
|
|
@ -2,6 +2,7 @@ import logging
|
|||
|
||||
from ..config import Config
|
||||
from ..message.request import Request
|
||||
from ..message.response import Response
|
||||
|
||||
class Procedure(object):
|
||||
""" Procedure class. A procedure is a pre-configured list of requests """
|
||||
|
@ -42,8 +43,18 @@ class Procedure(object):
|
|||
"""
|
||||
|
||||
logging.info('Executing request {}'.format(self.name))
|
||||
context = {}
|
||||
response = Response()
|
||||
|
||||
for request in self.requests:
|
||||
request.execute(n_tries)
|
||||
response = request.execute(n_tries, async=False, **context)
|
||||
context = { k:v for (k,v) in response.output.items() } \
|
||||
if isinstance(response.output, dict) else {}
|
||||
|
||||
context['output'] = response.output
|
||||
context['errors'] = response.errors
|
||||
|
||||
return response
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
Loading…
Reference in a new issue