Support for while/break/continue/return statemnts - closes #107
This commit is contained in:
parent
35cefcc9f5
commit
40e65d882f
2 changed files with 227 additions and 48 deletions
|
@ -138,6 +138,8 @@ class Request(Message):
|
||||||
|
|
||||||
if callable(context_value):
|
if callable(context_value):
|
||||||
context_value = context_value()
|
context_value = context_value()
|
||||||
|
if isinstance(context_value, range) or isinstance(context_value, tuple):
|
||||||
|
context_value = [*context_value]
|
||||||
if isinstance(context_value, datetime.date):
|
if isinstance(context_value, datetime.date):
|
||||||
context_value = context_value.isoformat()
|
context_value = context_value.isoformat()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import enum
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
@ -9,6 +10,12 @@ from ..message.response import Response
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Statement(enum.Enum):
|
||||||
|
BREAK = 'break'
|
||||||
|
CONTINUE = 'continue'
|
||||||
|
RETURN = 'return'
|
||||||
|
|
||||||
|
|
||||||
class Procedure(object):
|
class Procedure(object):
|
||||||
""" Procedure class. A procedure is a pre-configured list of requests """
|
""" Procedure class. A procedure is a pre-configured list of requests """
|
||||||
|
|
||||||
|
@ -18,7 +25,7 @@ class Procedure(object):
|
||||||
name -- Procedure name
|
name -- Procedure name
|
||||||
_async -- Whether the actions in the procedure are supposed to
|
_async -- Whether the actions in the procedure are supposed to
|
||||||
be executed sequentially or in parallel (True or False)
|
be executed sequentially or in parallel (True or False)
|
||||||
requests -- List of platylist.message.request.Request objects
|
requests -- List of platypush.message.request.Request objects
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -26,6 +33,7 @@ class Procedure(object):
|
||||||
self.requests = requests
|
self.requests = requests
|
||||||
self.backend = backend
|
self.backend = backend
|
||||||
self.args = args or {}
|
self.args = args or {}
|
||||||
|
self._should_return = False
|
||||||
|
|
||||||
for req in requests:
|
for req in requests:
|
||||||
req.backend = self.backend
|
req.backend = self.backend
|
||||||
|
@ -33,36 +41,18 @@ class Procedure(object):
|
||||||
@classmethod
|
@classmethod
|
||||||
def build(cls, name, _async, requests, args=None, backend=None, id=None, procedure_class=None, **kwargs):
|
def build(cls, name, _async, requests, args=None, backend=None, id=None, procedure_class=None, **kwargs):
|
||||||
reqs = []
|
reqs = []
|
||||||
loop_count = 0
|
for_count = 0
|
||||||
|
while_count = 0
|
||||||
if_count = 0
|
if_count = 0
|
||||||
if_config = LifoQueue()
|
if_config = LifoQueue()
|
||||||
procedure_class = procedure_class or cls
|
procedure_class = procedure_class or cls
|
||||||
key = None
|
key = None
|
||||||
|
|
||||||
for request_config in requests:
|
for request_config in requests:
|
||||||
# Check if this request is a for loop
|
# Check if it's a break/continue/return statement
|
||||||
if len(request_config.keys()) == 1:
|
if isinstance(request_config, str):
|
||||||
key = list(request_config.keys())[0]
|
reqs.append(Statement(request_config))
|
||||||
m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key)
|
continue
|
||||||
|
|
||||||
if m:
|
|
||||||
loop_count += 1
|
|
||||||
loop_name = '{}__loop_{}'.format(name, loop_count)
|
|
||||||
|
|
||||||
# A 'for' loop is synchronous. Declare a 'fork' loop if you
|
|
||||||
# want to process the elements in the iterable in parallel
|
|
||||||
_async = True if m.group(1) == 'fork' else False
|
|
||||||
iterator_name = m.group(2)
|
|
||||||
iterable = m.group(3)
|
|
||||||
|
|
||||||
loop = LoopProcedure.build(name=loop_name, _async=_async,
|
|
||||||
requests=request_config[key],
|
|
||||||
backend=backend, id=id,
|
|
||||||
iterator_name=iterator_name,
|
|
||||||
iterable=iterable)
|
|
||||||
|
|
||||||
reqs.append(loop)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Check if this request is an if-else
|
# Check if this request is an if-else
|
||||||
if len(request_config.keys()) >= 1:
|
if len(request_config.keys()) >= 1:
|
||||||
|
@ -100,6 +90,48 @@ class Procedure(object):
|
||||||
if key == 'else':
|
if key == 'else':
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Check if this request is a for loop
|
||||||
|
if len(request_config.keys()) == 1:
|
||||||
|
key = list(request_config.keys())[0]
|
||||||
|
m = re.match('\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key)
|
||||||
|
|
||||||
|
if m:
|
||||||
|
for_count += 1
|
||||||
|
loop_name = '{}__for_{}'.format(name, for_count)
|
||||||
|
|
||||||
|
# A 'for' loop is synchronous. Declare a 'fork' loop if you
|
||||||
|
# want to process the elements in the iterable in parallel
|
||||||
|
_async = True if m.group(1) == 'fork' else False
|
||||||
|
iterator_name = m.group(2)
|
||||||
|
iterable = m.group(3)
|
||||||
|
|
||||||
|
loop = ForProcedure.build(name=loop_name, _async=_async,
|
||||||
|
requests=request_config[key],
|
||||||
|
backend=backend, id=id,
|
||||||
|
iterator_name=iterator_name,
|
||||||
|
iterable=iterable)
|
||||||
|
|
||||||
|
reqs.append(loop)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check if this request is a while loop
|
||||||
|
if len(request_config.keys()) == 1:
|
||||||
|
key = list(request_config.keys())[0]
|
||||||
|
m = re.match('\s*while\s+\${(.*)}\s*', key)
|
||||||
|
|
||||||
|
if m:
|
||||||
|
while_count += 1
|
||||||
|
loop_name = '{}__while_{}'.format(name, while_count)
|
||||||
|
condition = m.group(1).strip()
|
||||||
|
|
||||||
|
loop = WhileProcedure.build(name=loop_name, _async=False,
|
||||||
|
requests=request_config[key],
|
||||||
|
condition=condition,
|
||||||
|
backend=backend, id=id)
|
||||||
|
|
||||||
|
reqs.append(loop)
|
||||||
|
continue
|
||||||
|
|
||||||
request_config['origin'] = Config.get('device_id')
|
request_config['origin'] = Config.get('device_id')
|
||||||
request_config['id'] = id
|
request_config['id'] = id
|
||||||
if 'target' not in request_config:
|
if 'target' not in request_config:
|
||||||
|
@ -114,12 +146,25 @@ class Procedure(object):
|
||||||
|
|
||||||
return procedure_class(name=name, _async=_async, requests=reqs, args=args, backend=backend, **kwargs)
|
return procedure_class(name=name, _async=_async, requests=reqs, args=args, backend=backend, **kwargs)
|
||||||
|
|
||||||
def execute(self, n_tries=1, **context):
|
@staticmethod
|
||||||
|
def _find_nearest_loop(stack):
|
||||||
|
for proc in stack[::-1]:
|
||||||
|
if isinstance(proc, LoopProcedure):
|
||||||
|
return proc
|
||||||
|
|
||||||
|
raise AssertionError('break/continue statement found outside of a loop')
|
||||||
|
|
||||||
|
def execute(self, n_tries=1, __stack__=None, **context):
|
||||||
"""
|
"""
|
||||||
Execute the requests in the procedure
|
Execute the requests in the procedure
|
||||||
Params:
|
Params:
|
||||||
n_tries -- Number of tries in case of failure before raising a RuntimeError
|
n_tries -- Number of tries in case of failure before raising a RuntimeError
|
||||||
"""
|
"""
|
||||||
|
if not __stack__:
|
||||||
|
__stack__ = [self]
|
||||||
|
else:
|
||||||
|
__stack__.append(self)
|
||||||
|
|
||||||
if self.args:
|
if self.args:
|
||||||
args = self.args.copy()
|
args = self.args.copy()
|
||||||
for k, v in args.items():
|
for k, v in args.items():
|
||||||
|
@ -134,12 +179,35 @@ class Procedure(object):
|
||||||
token = Config.get('token')
|
token = Config.get('token')
|
||||||
|
|
||||||
for request in self.requests:
|
for request in self.requests:
|
||||||
|
if isinstance(request, Statement):
|
||||||
|
if request == Statement.RETURN:
|
||||||
|
self._should_return = True
|
||||||
|
for proc in __stack__:
|
||||||
|
proc._should_return = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if request in [Statement.BREAK, Statement.CONTINUE]:
|
||||||
|
loop = self._find_nearest_loop(__stack__)
|
||||||
|
if request == Statement.BREAK:
|
||||||
|
loop._should_break = True
|
||||||
|
else:
|
||||||
|
loop._should_continue = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if isinstance(self, LoopProcedure):
|
||||||
|
if self._should_continue or self._should_break:
|
||||||
|
if self._should_continue:
|
||||||
|
# noinspection PyAttributeOutsideInit
|
||||||
|
self._should_continue = False
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
if token:
|
if token:
|
||||||
request.token = token
|
request.token = token
|
||||||
|
|
||||||
context['_async'] = self._async
|
context['_async'] = self._async
|
||||||
context['n_tries'] = n_tries
|
context['n_tries'] = n_tries
|
||||||
response = request.execute(**context)
|
response = request.execute(__stack__=__stack__, **context)
|
||||||
|
|
||||||
if not self._async and response:
|
if not self._async and response:
|
||||||
if isinstance(response.output, dict):
|
if isinstance(response.output, dict):
|
||||||
|
@ -149,60 +217,171 @@ class Procedure(object):
|
||||||
context['output'] = response.output
|
context['output'] = response.output
|
||||||
context['errors'] = response.errors
|
context['errors'] = response.errors
|
||||||
|
|
||||||
|
if self._should_return:
|
||||||
|
break
|
||||||
|
|
||||||
return response or Response()
|
return response or Response()
|
||||||
|
|
||||||
|
|
||||||
class LoopProcedure(Procedure):
|
class LoopProcedure(Procedure):
|
||||||
|
"""
|
||||||
|
Base class while and for/fork loops.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name, requests, _async=False, args=None, backend=None):
|
||||||
|
super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend)
|
||||||
|
self._should_break = False
|
||||||
|
self._should_continue = False
|
||||||
|
|
||||||
|
|
||||||
|
class ForProcedure(LoopProcedure):
|
||||||
"""
|
"""
|
||||||
Models a loop procedure, which expresses a construct similar to a
|
Models a loop procedure, which expresses a construct similar to a
|
||||||
for loop in a programming language. The 'for' keyword implies a synchronous
|
for loop in a programming language. The 'for' keyword implies a synchronous
|
||||||
loop, i.e. the nested actions will be executed in sequence. Use 'fork'
|
loop, i.e. the nested actions will be executed in sequence. Use 'fork'
|
||||||
instead of 'for' if you want to run the actions in parallel.
|
instead of 'for' if you want to run the actions in parallel.
|
||||||
|
|
||||||
Example:
|
Example::
|
||||||
|
|
||||||
procedure.sync.process_results:
|
procedure.sync.process_results:
|
||||||
-
|
- action: http.get
|
||||||
action: http.get
|
args:
|
||||||
args:
|
url: https://some-service/some/json/endpoint
|
||||||
url: https://some-service/some/json/endpoint
|
# Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]}
|
||||||
# Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]}
|
- for result in ${results}:
|
||||||
-
|
- action: some.custom.action
|
||||||
for result in ${results}:
|
args:
|
||||||
-
|
id: ${result['id']}
|
||||||
action: some.custom.action
|
name: ${result['name']}
|
||||||
args:
|
|
||||||
id: ${result['id']}
|
|
||||||
name: ${result['name']}
|
|
||||||
"""
|
|
||||||
|
|
||||||
context = {}
|
"""
|
||||||
|
|
||||||
def __init__(self, name, iterator_name, iterable, requests, _async=False, args=None, backend=None):
|
def __init__(self, name, iterator_name, iterable, requests, _async=False, args=None, backend=None):
|
||||||
super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend)
|
super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend)
|
||||||
|
|
||||||
self.iterator_name = iterator_name
|
self.iterator_name = iterator_name
|
||||||
self.iterable = iterable
|
self.iterable = iterable
|
||||||
self.requests = requests
|
|
||||||
|
|
||||||
def execute(self, _async=None, **context):
|
def execute(self, _async=None, **context):
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
iterable = eval(self.iterable)
|
iterable = eval(self.iterable)
|
||||||
if not hasattr(iterable, '__iter__'):
|
assert hasattr(iterable, '__iter__'), 'Object of type {} is not iterable: {}'.\
|
||||||
raise RuntimeError
|
format(type(iterable), iterable)
|
||||||
except:
|
except:
|
||||||
iterable = Request.expand_value_from_context(self.iterable, **context)
|
iterable = Request.expand_value_from_context(self.iterable, **context)
|
||||||
|
|
||||||
response = Response()
|
response = Response()
|
||||||
|
|
||||||
|
# noinspection DuplicatedCode
|
||||||
for item in iterable:
|
for item in iterable:
|
||||||
|
if self._should_return:
|
||||||
|
logger.info('Returning from {}'.format(self.name))
|
||||||
|
break
|
||||||
|
|
||||||
|
if self._should_continue:
|
||||||
|
self._should_continue = False
|
||||||
|
logger.info('Continuing loop {}'.format(self.name))
|
||||||
|
continue
|
||||||
|
|
||||||
|
if self._should_break:
|
||||||
|
self._should_break = False
|
||||||
|
logger.info('Breaking loop {}'.format(self.name))
|
||||||
|
break
|
||||||
|
|
||||||
context[self.iterator_name] = item
|
context[self.iterator_name] = item
|
||||||
response = super().execute(**context)
|
response = super().execute(**context)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
class WhileProcedure(LoopProcedure):
|
||||||
|
"""
|
||||||
|
Models a while loop procedure.
|
||||||
|
|
||||||
|
Example::
|
||||||
|
|
||||||
|
procedure.process_results:
|
||||||
|
- action: http.get
|
||||||
|
args:
|
||||||
|
url: https://some-service/some/json/endpoint
|
||||||
|
# Example response: {"id":1, "name":"foo"}}
|
||||||
|
|
||||||
|
- while ${output}:
|
||||||
|
- action: some.custom.action
|
||||||
|
args:
|
||||||
|
id: ${id}
|
||||||
|
name: ${name}
|
||||||
|
|
||||||
|
- action: http.get
|
||||||
|
args:
|
||||||
|
url: https://some-service/some/json/endpoint
|
||||||
|
# Example response: {"id":1, "name":"foo"}}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name, condition, requests, _async=False, args=None, backend=None):
|
||||||
|
super(). __init__(name=name, _async=_async, requests=requests, args=args, backend=backend)
|
||||||
|
self.condition = condition
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_context(**context):
|
||||||
|
for (k, v) in context.items():
|
||||||
|
# noinspection PyBroadException
|
||||||
|
try:
|
||||||
|
context[k] = eval(v)
|
||||||
|
except:
|
||||||
|
if isinstance(v, str):
|
||||||
|
# noinspection PyBroadException
|
||||||
|
try:
|
||||||
|
context[k] = eval('"{}"'.format(re.sub('(^|[^\\\])"', '\1\\"', v)))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return context
|
||||||
|
|
||||||
|
# noinspection DuplicatedCode,PyBroadException
|
||||||
|
def execute(self, _async=None, **context):
|
||||||
|
response = Response()
|
||||||
|
context = self._get_context(**context)
|
||||||
|
for k, v in context.items():
|
||||||
|
try:
|
||||||
|
exec('{}={}'.format(k, v))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
while True:
|
||||||
|
condition_true = eval(self.condition)
|
||||||
|
if not condition_true:
|
||||||
|
break
|
||||||
|
|
||||||
|
if self._should_return:
|
||||||
|
logger.info('Returning from {}'.format(self.name))
|
||||||
|
break
|
||||||
|
|
||||||
|
if self._should_continue:
|
||||||
|
self._should_continue = False
|
||||||
|
logger.info('Continuing loop {}'.format(self.name))
|
||||||
|
continue
|
||||||
|
|
||||||
|
if self._should_break:
|
||||||
|
self._should_break = False
|
||||||
|
logger.info('Breaking loop {}'.format(self.name))
|
||||||
|
break
|
||||||
|
|
||||||
|
response = super().execute(**context)
|
||||||
|
|
||||||
|
if response.output:
|
||||||
|
if isinstance(response.output, dict):
|
||||||
|
new_context = self._get_context(**response.output)
|
||||||
|
for k, v in new_context.items():
|
||||||
|
try:
|
||||||
|
exec('{}={}'.format(k, v))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
class IfProcedure(Procedure):
|
class IfProcedure(Procedure):
|
||||||
"""
|
"""
|
||||||
|
@ -229,8 +408,6 @@ class IfProcedure(Procedure):
|
||||||
cmd: '/path/turn_off_heating.sh'
|
cmd: '/path/turn_off_heating.sh'
|
||||||
"""
|
"""
|
||||||
|
|
||||||
context = {}
|
|
||||||
|
|
||||||
def __init__(self, name, condition, requests, else_branch=None, args=None, backend=None, id=None, **kwargs):
|
def __init__(self, name, condition, requests, else_branch=None, args=None, backend=None, id=None, **kwargs):
|
||||||
kwargs['_async'] = False
|
kwargs['_async'] = False
|
||||||
self.condition = condition
|
self.condition = condition
|
||||||
|
|
Loading…
Reference in a new issue