diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 5423e4dd..5b93062a 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -138,6 +138,8 @@ class Request(Message): if callable(context_value): context_value = context_value() + if isinstance(context_value, range) or isinstance(context_value, tuple): + context_value = [*context_value] if isinstance(context_value, datetime.date): context_value = context_value.isoformat() except Exception as e: diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index f255b6bd..4e5db70e 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,3 +1,4 @@ +import enum import logging import re @@ -9,6 +10,12 @@ from ..message.response import Response logger = logging.getLogger(__name__) +class Statement(enum.Enum): + BREAK = 'break' + CONTINUE = 'continue' + RETURN = 'return' + + class Procedure(object): """ Procedure class. A procedure is a pre-configured list of requests """ @@ -18,7 +25,7 @@ class Procedure(object): name -- Procedure name _async -- Whether the actions in the procedure are supposed to be executed sequentially or in parallel (True or False) - requests -- List of platylist.message.request.Request objects + requests -- List of platypush.message.request.Request objects """ self.name = name @@ -26,6 +33,7 @@ class Procedure(object): self.requests = requests self.backend = backend self.args = args or {} + self._should_return = False for req in requests: req.backend = self.backend @@ -33,36 +41,18 @@ class Procedure(object): @classmethod def build(cls, name, _async, requests, args=None, backend=None, id=None, procedure_class=None, **kwargs): reqs = [] - loop_count = 0 + for_count = 0 + while_count = 0 if_count = 0 if_config = LifoQueue() procedure_class = procedure_class or cls key = None for request_config in requests: - # Check if this request is a for loop - if len(request_config.keys()) == 1: - key = list(request_config.keys())[0] - m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key) - - if m: - loop_count += 1 - loop_name = '{}__loop_{}'.format(name, loop_count) - - # A 'for' loop is synchronous. Declare a 'fork' loop if you - # want to process the elements in the iterable in parallel - _async = True if m.group(1) == 'fork' else False - iterator_name = m.group(2) - iterable = m.group(3) - - loop = LoopProcedure.build(name=loop_name, _async=_async, - requests=request_config[key], - backend=backend, id=id, - iterator_name=iterator_name, - iterable=iterable) - - reqs.append(loop) - continue + # Check if it's a break/continue/return statement + if isinstance(request_config, str): + reqs.append(Statement(request_config)) + continue # Check if this request is an if-else if len(request_config.keys()) >= 1: @@ -100,6 +90,48 @@ class Procedure(object): if key == 'else': continue + # Check if this request is a for loop + if len(request_config.keys()) == 1: + key = list(request_config.keys())[0] + m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key) + + if m: + for_count += 1 + loop_name = '{}__for_{}'.format(name, for_count) + + # A 'for' loop is synchronous. Declare a 'fork' loop if you + # want to process the elements in the iterable in parallel + _async = True if m.group(1) == 'fork' else False + iterator_name = m.group(2) + iterable = m.group(3) + + loop = ForProcedure.build(name=loop_name, _async=_async, + requests=request_config[key], + backend=backend, id=id, + iterator_name=iterator_name, + iterable=iterable) + + reqs.append(loop) + continue + + # Check if this request is a while loop + if len(request_config.keys()) == 1: + key = list(request_config.keys())[0] + m = re.match('\s*while\s+\${(.*)}\s*', key) + + if m: + while_count += 1 + loop_name = '{}__while_{}'.format(name, while_count) + condition = m.group(1).strip() + + loop = WhileProcedure.build(name=loop_name, _async=False, + requests=request_config[key], + condition=condition, + backend=backend, id=id) + + reqs.append(loop) + continue + request_config['origin'] = Config.get('device_id') request_config['id'] = id if 'target' not in request_config: @@ -114,12 +146,25 @@ class Procedure(object): return procedure_class(name=name, _async=_async, requests=reqs, args=args, backend=backend, **kwargs) - def execute(self, n_tries=1, **context): + @staticmethod + def _find_nearest_loop(stack): + for proc in stack[::-1]: + if isinstance(proc, LoopProcedure): + return proc + + raise AssertionError('break/continue statement found outside of a loop') + + def execute(self, n_tries=1, __stack__=None, **context): """ Execute the requests in the procedure Params: n_tries -- Number of tries in case of failure before raising a RuntimeError """ + if not __stack__: + __stack__ = [self] + else: + __stack__.append(self) + if self.args: args = self.args.copy() for k, v in args.items(): @@ -134,12 +179,35 @@ class Procedure(object): token = Config.get('token') for request in self.requests: + if isinstance(request, Statement): + if request == Statement.RETURN: + self._should_return = True + for proc in __stack__: + proc._should_return = True + break + + if request in [Statement.BREAK, Statement.CONTINUE]: + loop = self._find_nearest_loop(__stack__) + if request == Statement.BREAK: + loop._should_break = True + else: + loop._should_continue = True + break + + if isinstance(self, LoopProcedure): + if self._should_continue or self._should_break: + if self._should_continue: + # noinspection PyAttributeOutsideInit + self._should_continue = False + + break + if token: request.token = token context['_async'] = self._async context['n_tries'] = n_tries - response = request.execute(**context) + response = request.execute(__stack__=__stack__, **context) if not self._async and response: if isinstance(response.output, dict): @@ -149,60 +217,171 @@ class Procedure(object): context['output'] = response.output context['errors'] = response.errors + if self._should_return: + break + return response or Response() class LoopProcedure(Procedure): + """ + Base class while and for/fork loops. + """ + + def __init__(self, name, requests, _async=False, args=None, backend=None): + super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend) + self._should_break = False + self._should_continue = False + + +class ForProcedure(LoopProcedure): """ Models a loop procedure, which expresses a construct similar to a for loop in a programming language. The 'for' keyword implies a synchronous loop, i.e. the nested actions will be executed in sequence. Use 'fork' instead of 'for' if you want to run the actions in parallel. - Example: + Example:: procedure.sync.process_results: - - - action: http.get - args: - url: https://some-service/some/json/endpoint - # Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]} - - - for result in ${results}: - - - action: some.custom.action - args: - id: ${result['id']} - name: ${result['name']} - """ + - action: http.get + args: + url: https://some-service/some/json/endpoint + # Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]} + - for result in ${results}: + - action: some.custom.action + args: + id: ${result['id']} + name: ${result['name']} - context = {} + """ def __init__(self, name, iterator_name, iterable, requests, _async=False, args=None, backend=None): super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend) - self.iterator_name = iterator_name self.iterable = iterable - self.requests = requests def execute(self, _async=None, **context): # noinspection PyBroadException try: iterable = eval(self.iterable) - if not hasattr(iterable, '__iter__'): - raise RuntimeError + assert hasattr(iterable, '__iter__'), 'Object of type {} is not iterable: {}'.\ + format(type(iterable), iterable) except: iterable = Request.expand_value_from_context(self.iterable, **context) response = Response() + # noinspection DuplicatedCode for item in iterable: + if self._should_return: + logger.info('Returning from {}'.format(self.name)) + break + + if self._should_continue: + self._should_continue = False + logger.info('Continuing loop {}'.format(self.name)) + continue + + if self._should_break: + self._should_break = False + logger.info('Breaking loop {}'.format(self.name)) + break + context[self.iterator_name] = item response = super().execute(**context) return response +class WhileProcedure(LoopProcedure): + """ + Models a while loop procedure. + + Example:: + + procedure.process_results: + - action: http.get + args: + url: https://some-service/some/json/endpoint + # Example response: {"id":1, "name":"foo"}} + + - while ${output}: + - action: some.custom.action + args: + id: ${id} + name: ${name} + + - action: http.get + args: + url: https://some-service/some/json/endpoint + # Example response: {"id":1, "name":"foo"}} + + """ + + def __init__(self, name, condition, requests, _async=False, args=None, backend=None): + super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend) + self.condition = condition + + @staticmethod + def _get_context(**context): + for (k, v) in context.items(): + # noinspection PyBroadException + try: + context[k] = eval(v) + except: + if isinstance(v, str): + # noinspection PyBroadException + try: + context[k] = eval('"{}"'.format(re.sub('(^|[^\\\])"', '\1\\"', v))) + except: + pass + + return context + + # noinspection DuplicatedCode,PyBroadException + def execute(self, _async=None, **context): + response = Response() + context = self._get_context(**context) + for k, v in context.items(): + try: + exec('{}={}'.format(k, v)) + except: + pass + + while True: + condition_true = eval(self.condition) + if not condition_true: + break + + if self._should_return: + logger.info('Returning from {}'.format(self.name)) + break + + if self._should_continue: + self._should_continue = False + logger.info('Continuing loop {}'.format(self.name)) + continue + + if self._should_break: + self._should_break = False + logger.info('Breaking loop {}'.format(self.name)) + break + + response = super().execute(**context) + + if response.output: + if isinstance(response.output, dict): + new_context = self._get_context(**response.output) + for k, v in new_context.items(): + try: + exec('{}={}'.format(k, v)) + except: + pass + + return response + + # noinspection PyBroadException class IfProcedure(Procedure): """ @@ -229,8 +408,6 @@ class IfProcedure(Procedure): cmd: '/path/turn_off_heating.sh' """ - context = {} - def __init__(self, name, condition, requests, else_branch=None, args=None, backend=None, id=None, **kwargs): kwargs['_async'] = False self.condition = condition