diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index c28eb9c5..2f97b5be 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,3 +1,4 @@ +import copy import json import logging import random @@ -68,7 +69,7 @@ class Request(Message): def _expand_context(self, event_args=None, **context): - if event_args is None: event_args = self.args + if event_args is None: event_args = copy.deepcopy(self.args) keys = [] if isinstance(event_args, dict): @@ -78,8 +79,9 @@ class Request(Message): for key in keys: value = event_args[key] + if isinstance(value, str): - value = self._expand_value_from_context(value, **context) + value = self.expand_value_from_context(value, **context) elif isinstance(value, dict) or isinstance(value, list): self._expand_context(event_args=value, **context) @@ -89,7 +91,7 @@ class Request(Message): @classmethod - def _expand_value_from_context(cls, value, **context): + def expand_value_from_context(cls, value, **context): parsed_value = '' while value: m = re.match('([^\$]*)(\${\s*(.+?)\s*})(.*)', value) @@ -107,7 +109,12 @@ class Request(Message): context_argname, path if path else '')) except: context_value = expr - parsed_value += prefix + str(context_value) + parsed_value += prefix + ( + json.dumps(context_value) + if isinstance(context_value, list) + or isinstance(context_value, dict) + else str(context_value)) + else: parsed_value += prefix + expr else: parsed_value += value diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index ef2322ca..d843550a 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,4 +1,5 @@ import logging +import re from ..config import Config from ..message.request import Request @@ -24,10 +25,37 @@ class Procedure(object): for req in requests: req.backend = self.backend + @classmethod def build(cls, name, async, requests, backend=None, id=None, **kwargs): reqs = [] + loop_count = 0 + for request_config in requests: + # Check if this request is a for loop + if len(request_config.keys()) == 1: + key = list(request_config.keys())[0] + m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key) + + if m: + loop_count += 1 + loop_name = '{}__loop_{}'.format(name, loop_count) + + # A 'for' loop is synchronous. Declare a 'fork' loop if you + # want to process the elements in the iterable in parallel + async = True if m.group(1) == 'fork' else False + iterator_name = m.group(2) + iterable = m.group(3) + + loop = LoopProcedure.build(name=loop_name, async=async, + requests=request_config[key], + backend=backend, id=id, + iterator_name=iterator_name, + iterable=iterable) + + reqs.append(loop) + continue + request_config['origin'] = Config.get('device_id') request_config['id'] = id if 'target' not in request_config: @@ -38,7 +66,8 @@ class Procedure(object): return cls(name=name, async=async, requests=reqs, backend=backend, **kwargs) - def execute(self, n_tries=1): + + def execute(self, n_tries=1, **context): """ Execute the requests in the procedure Params: @@ -46,15 +75,12 @@ class Procedure(object): """ logging.info('Executing request {}'.format(self.name)) - context = {} response = Response() for request in self.requests: - if self.async: - request.execute(n_tries, async=True, **context) - else: - response = request.execute(n_tries, async=False, **context) + response = request.execute(n_tries, async=self.async, **context) + if not self.async: if isinstance(response.output, dict): for (k,v) in response.output.items(): context[k] = v @@ -65,5 +91,50 @@ class Procedure(object): return response +class LoopProcedure(Procedure): + """ + Models a loop procedure, which expresses a construct similar to a + for loop in a programming language. The 'for' keyword implies a synchronous + loop, i.e. the nested actions will be executed in sequence. Use 'fork' + instead of 'for' if you want to run the actions in parallel. + + Example: + + procedure.sync.process_results: + - + action: http.get + args: + url: https://some-service/some/json/endpoint + # Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]} + - + for result in ${results}: + - + action: some.custom.action + args: + id: ${result['id']} + name: ${result['name']} + """ + + context = {} + + def __init__(self, name, iterator_name, iterable, requests, async=False, backend=None, **kwargs): + super(). __init__(name=name, async=async, requests=requests, backend=None, **kwargs) + + self.iterator_name = iterator_name + self.iterable = iterable + self.requests = requests + + + def execute(self, n_tries=1, async=None, **context): + iterable = Request.expand_value_from_context(self.iterable, **context) + response = Response() + + for item in iterable: + context[self.iterator_name] = item + # print('**** context[{}]: {}, iterable type: {}'.format(self.iterator_name, item, type(iterable))) + response = super().execute(n_tries, **context) + + return response + # vim:sw=4:ts=4:et: