forked from platypush/platypush
Implemented for loops support in procedures, #44
This commit is contained in:
parent
252f503e4d
commit
d4ef2bf59e
2 changed files with 88 additions and 10 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
import copy
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
@ -68,7 +69,7 @@ class Request(Message):
|
||||||
|
|
||||||
|
|
||||||
def _expand_context(self, event_args=None, **context):
|
def _expand_context(self, event_args=None, **context):
|
||||||
if event_args is None: event_args = self.args
|
if event_args is None: event_args = copy.deepcopy(self.args)
|
||||||
|
|
||||||
keys = []
|
keys = []
|
||||||
if isinstance(event_args, dict):
|
if isinstance(event_args, dict):
|
||||||
|
@ -78,8 +79,9 @@ class Request(Message):
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
value = event_args[key]
|
value = event_args[key]
|
||||||
|
|
||||||
if isinstance(value, str):
|
if isinstance(value, str):
|
||||||
value = self._expand_value_from_context(value, **context)
|
value = self.expand_value_from_context(value, **context)
|
||||||
elif isinstance(value, dict) or isinstance(value, list):
|
elif isinstance(value, dict) or isinstance(value, list):
|
||||||
self._expand_context(event_args=value, **context)
|
self._expand_context(event_args=value, **context)
|
||||||
|
|
||||||
|
@ -89,7 +91,7 @@ class Request(Message):
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _expand_value_from_context(cls, value, **context):
|
def expand_value_from_context(cls, value, **context):
|
||||||
parsed_value = ''
|
parsed_value = ''
|
||||||
while value:
|
while value:
|
||||||
m = re.match('([^\$]*)(\${\s*(.+?)\s*})(.*)', value)
|
m = re.match('([^\$]*)(\${\s*(.+?)\s*})(.*)', value)
|
||||||
|
@ -107,7 +109,12 @@ class Request(Message):
|
||||||
context_argname, path if path else ''))
|
context_argname, path if path else ''))
|
||||||
except: context_value = expr
|
except: context_value = expr
|
||||||
|
|
||||||
parsed_value += prefix + str(context_value)
|
parsed_value += prefix + (
|
||||||
|
json.dumps(context_value)
|
||||||
|
if isinstance(context_value, list)
|
||||||
|
or isinstance(context_value, dict)
|
||||||
|
else str(context_value))
|
||||||
|
|
||||||
else: parsed_value += prefix + expr
|
else: parsed_value += prefix + expr
|
||||||
else:
|
else:
|
||||||
parsed_value += value
|
parsed_value += value
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
from ..config import Config
|
from ..config import Config
|
||||||
from ..message.request import Request
|
from ..message.request import Request
|
||||||
|
@ -24,10 +25,37 @@ class Procedure(object):
|
||||||
for req in requests:
|
for req in requests:
|
||||||
req.backend = self.backend
|
req.backend = self.backend
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def build(cls, name, async, requests, backend=None, id=None, **kwargs):
|
def build(cls, name, async, requests, backend=None, id=None, **kwargs):
|
||||||
reqs = []
|
reqs = []
|
||||||
|
loop_count = 0
|
||||||
|
|
||||||
for request_config in requests:
|
for request_config in requests:
|
||||||
|
# Check if this request is a for loop
|
||||||
|
if len(request_config.keys()) == 1:
|
||||||
|
key = list(request_config.keys())[0]
|
||||||
|
m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key)
|
||||||
|
|
||||||
|
if m:
|
||||||
|
loop_count += 1
|
||||||
|
loop_name = '{}__loop_{}'.format(name, loop_count)
|
||||||
|
|
||||||
|
# A 'for' loop is synchronous. Declare a 'fork' loop if you
|
||||||
|
# want to process the elements in the iterable in parallel
|
||||||
|
async = True if m.group(1) == 'fork' else False
|
||||||
|
iterator_name = m.group(2)
|
||||||
|
iterable = m.group(3)
|
||||||
|
|
||||||
|
loop = LoopProcedure.build(name=loop_name, async=async,
|
||||||
|
requests=request_config[key],
|
||||||
|
backend=backend, id=id,
|
||||||
|
iterator_name=iterator_name,
|
||||||
|
iterable=iterable)
|
||||||
|
|
||||||
|
reqs.append(loop)
|
||||||
|
continue
|
||||||
|
|
||||||
request_config['origin'] = Config.get('device_id')
|
request_config['origin'] = Config.get('device_id')
|
||||||
request_config['id'] = id
|
request_config['id'] = id
|
||||||
if 'target' not in request_config:
|
if 'target' not in request_config:
|
||||||
|
@ -38,7 +66,8 @@ class Procedure(object):
|
||||||
|
|
||||||
return cls(name=name, async=async, requests=reqs, backend=backend, **kwargs)
|
return cls(name=name, async=async, requests=reqs, backend=backend, **kwargs)
|
||||||
|
|
||||||
def execute(self, n_tries=1):
|
|
||||||
|
def execute(self, n_tries=1, **context):
|
||||||
"""
|
"""
|
||||||
Execute the requests in the procedure
|
Execute the requests in the procedure
|
||||||
Params:
|
Params:
|
||||||
|
@ -46,15 +75,12 @@ class Procedure(object):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logging.info('Executing request {}'.format(self.name))
|
logging.info('Executing request {}'.format(self.name))
|
||||||
context = {}
|
|
||||||
response = Response()
|
response = Response()
|
||||||
|
|
||||||
for request in self.requests:
|
for request in self.requests:
|
||||||
if self.async:
|
response = request.execute(n_tries, async=self.async, **context)
|
||||||
request.execute(n_tries, async=True, **context)
|
|
||||||
else:
|
|
||||||
response = request.execute(n_tries, async=False, **context)
|
|
||||||
|
|
||||||
|
if not self.async:
|
||||||
if isinstance(response.output, dict):
|
if isinstance(response.output, dict):
|
||||||
for (k,v) in response.output.items():
|
for (k,v) in response.output.items():
|
||||||
context[k] = v
|
context[k] = v
|
||||||
|
@ -65,5 +91,50 @@ class Procedure(object):
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
class LoopProcedure(Procedure):
|
||||||
|
"""
|
||||||
|
Models a loop procedure, which expresses a construct similar to a
|
||||||
|
for loop in a programming language. The 'for' keyword implies a synchronous
|
||||||
|
loop, i.e. the nested actions will be executed in sequence. Use 'fork'
|
||||||
|
instead of 'for' if you want to run the actions in parallel.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
procedure.sync.process_results:
|
||||||
|
-
|
||||||
|
action: http.get
|
||||||
|
args:
|
||||||
|
url: https://some-service/some/json/endpoint
|
||||||
|
# Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]}
|
||||||
|
-
|
||||||
|
for result in ${results}:
|
||||||
|
-
|
||||||
|
action: some.custom.action
|
||||||
|
args:
|
||||||
|
id: ${result['id']}
|
||||||
|
name: ${result['name']}
|
||||||
|
"""
|
||||||
|
|
||||||
|
context = {}
|
||||||
|
|
||||||
|
def __init__(self, name, iterator_name, iterable, requests, async=False, backend=None, **kwargs):
|
||||||
|
super(). __init__(name=name, async=async, requests=requests, backend=None, **kwargs)
|
||||||
|
|
||||||
|
self.iterator_name = iterator_name
|
||||||
|
self.iterable = iterable
|
||||||
|
self.requests = requests
|
||||||
|
|
||||||
|
|
||||||
|
def execute(self, n_tries=1, async=None, **context):
|
||||||
|
iterable = Request.expand_value_from_context(self.iterable, **context)
|
||||||
|
response = Response()
|
||||||
|
|
||||||
|
for item in iterable:
|
||||||
|
context[self.iterator_name] = item
|
||||||
|
# print('**** context[{}]: {}, iterable type: {}'.format(self.iterator_name, item, type(iterable)))
|
||||||
|
response = super().execute(n_tries, **context)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue