platypush/platypush/procedure/__init__.py
Fabio Manganiello 9c3da7a2a9
Several improvements for request/procedure execution.
- Fixed regression introduced by incorrect format string in `exec`.

- LINT for the `procedure` module.

- Apply `Message.Encoder` when dumping values from the context.
2024-01-04 13:13:16 +01:00

592 lines
18 KiB
Python

import enum
import logging
import re
from functools import wraps
from queue import LifoQueue
from ..common import exec_wrapper
from ..config import Config
from ..message.request import Request
from ..message.response import Response
logger = logging.getLogger('platypush')
class Statement(enum.Enum):
"""
Enumerates the possible statements in a procedure.
"""
BREAK = 'break'
CONTINUE = 'continue'
RETURN = 'return'
class Procedure:
"""Procedure class. A procedure is a pre-configured list of requests"""
def __init__(self, name, _async, requests, args=None, backend=None):
"""
Params:
name -- Procedure name
_async -- Whether the actions in the procedure are supposed to
be executed sequentially or in parallel (True or False)
requests -- List of platypush.message.request.Request objects
"""
self.name = name
self._async = _async
self.requests = requests
self.backend = backend
self.args = args or {}
self._should_return = False
for req in requests:
req.backend = self.backend
@classmethod
# pylint: disable=too-many-branches,too-many-statements
def build(
cls,
name,
_async,
requests,
args=None,
backend=None,
id=None, # pylint: disable=redefined-builtin
procedure_class=None,
**kwargs,
):
reqs = []
for_count = 0
while_count = 0
if_count = 0
if_config = LifoQueue()
procedure_class = procedure_class or cls
key = None
for request_config in requests:
# Check if it's a break/continue/return statement
if isinstance(request_config, str):
reqs.append(Statement(request_config))
continue
# Check if this request is an if-else
if len(request_config.keys()) >= 1:
key = list(request_config.keys())[0]
m = re.match(r'\s*(if)\s+\${(.*)}\s*', key)
if m:
if_count += 1
if_name = f'{name}__if_{if_count}'
condition = m.group(2)
if_config.put(
{
'name': if_name,
'_async': False,
'requests': request_config[key],
'condition': condition,
'else_branch': [],
'backend': backend,
'id': id,
}
)
continue
if key == 'else':
if if_config.empty():
raise RuntimeError(
f'else statement with no associated if in {name}'
)
conf = if_config.get()
conf['else_branch'] = request_config[key]
if_config.put(conf)
if not if_config.empty():
reqs.append(IfProcedure.build(**(if_config.get())))
if key == 'else':
continue
# Check if this request is a for loop
if len(request_config.keys()) == 1:
key = list(request_config.keys())[0]
m = re.match(r'\s*(fork?)\s+([\w\d_]+)\s+in\s+(.*)\s*', key)
if m:
for_count += 1
loop_name = f'{name}__for_{for_count}'
# A 'for' loop is synchronous. Declare a 'fork' loop if you
# want to process the elements in the iterable in parallel
_async = m.group(1) == 'fork'
iterator_name = m.group(2)
iterable = m.group(3)
loop = ForProcedure.build(
name=loop_name,
_async=_async,
requests=request_config[key],
backend=backend,
id=id,
iterator_name=iterator_name,
iterable=iterable,
)
reqs.append(loop)
continue
# Check if this request is a while loop
if len(request_config.keys()) == 1:
key = list(request_config.keys())[0]
m = re.match(r'\s*while\s+\${(.*)}\s*', key)
if m:
while_count += 1
loop_name = f'{name}__while_{while_count}'
condition = m.group(1).strip()
loop = WhileProcedure.build(
name=loop_name,
_async=False,
requests=request_config[key],
condition=condition,
backend=backend,
id=id,
)
reqs.append(loop)
continue
request_config['origin'] = Config.get('device_id')
request_config['id'] = id
if 'target' not in request_config:
request_config['target'] = request_config['origin']
request = Request.build(request_config)
reqs.append(request)
while not if_config.empty():
pending_if = if_config.get()
reqs.append(IfProcedure.build(**pending_if))
return procedure_class(
name=name,
_async=_async,
requests=reqs,
args=args,
backend=backend,
**kwargs,
)
@staticmethod
def _find_nearest_loop(stack):
for proc in stack[::-1]:
if isinstance(proc, LoopProcedure):
return proc
raise AssertionError('break/continue statement found outside of a loop')
# pylint: disable=too-many-branches,too-many-statements
def execute(self, n_tries=1, __stack__=None, **context):
"""
Execute the requests in the procedure.
:param n_tries: Number of tries in case of failure before raising a RuntimeError.
"""
if not __stack__:
__stack__ = [self]
else:
__stack__.append(self)
if self.args:
args = self.args.copy()
for k, v in args.items():
v = Request.expand_value_from_context(v, **context)
args[k] = v
context[k] = v
logger.info('Executing procedure %s with arguments %s', self.name, args)
else:
logger.info('Executing procedure %s', self.name)
response = Response()
token = Config.get('token')
for request in self.requests:
if callable(request):
response = request(**context)
continue
if isinstance(request, Statement):
if request == Statement.RETURN:
self._should_return = True
for proc in __stack__:
proc._should_return = True # pylint: disable=protected-access
break
if request in [Statement.BREAK, Statement.CONTINUE]:
loop = self._find_nearest_loop(__stack__)
if request == Statement.BREAK:
loop._should_break = True # pylint: disable=protected-access
else:
loop._should_continue = True # pylint: disable=protected-access
break
should_continue = getattr(self, '_should_continue', False)
should_break = getattr(self, '_should_break', False)
if isinstance(self, LoopProcedure) and (should_continue or should_break):
if should_continue:
self._should_continue = ( # pylint: disable=attribute-defined-outside-init
False
)
break
if token and not isinstance(request, Statement):
request.token = token
context['_async'] = self._async
context['n_tries'] = n_tries
exec_ = getattr(request, 'execute', None)
if callable(exec_):
response = exec_(__stack__=__stack__, **context)
if not self._async and response:
if isinstance(response.output, dict):
for k, v in response.output.items():
context[k] = v
context['output'] = response.output
context['errors'] = response.errors
if self._should_return:
break
return response or Response()
def to_dict(self):
return {
'name': self.name,
'requests': self.requests,
'args': self.args,
'_async': self._async,
}
class LoopProcedure(Procedure):
"""
Base class while and for/fork loops.
"""
def __init__(self, name, requests, _async=False, args=None, backend=None):
super().__init__(
name=name, _async=_async, requests=requests, args=args, backend=backend
)
self._should_break = False
self._should_continue = False
class ForProcedure(LoopProcedure):
"""
Models a loop procedure, which expresses a construct similar to a
for loop in a programming language. The 'for' keyword implies a synchronous
loop, i.e. the nested actions will be executed in sequence. Use 'fork'
instead of 'for' if you want to run the actions in parallel.
Example::
procedure.sync.process_results:
- action: http.get
args:
url: https://some-service/some/json/endpoint
# Example response: { "results": [ {"id":1, "name":"foo"}, {"id":2,"name":"bar"} ]}
- for result in ${results}:
- action: some.custom.action
args:
id: ${result['id']}
name: ${result['name']}
"""
def __init__(
self,
name,
iterator_name,
iterable,
requests,
_async=False,
args=None,
backend=None,
):
super().__init__(
name=name, _async=_async, requests=requests, args=args, backend=backend
)
self.iterator_name = iterator_name
self.iterable = iterable
# pylint: disable=eval-used
def execute(self, *_, **context):
try:
iterable = eval(self.iterable)
assert hasattr(
iterable, '__iter__'
), f'Object of type {type(iterable)} is not iterable: {iterable}'
except Exception as e:
logger.debug('Iterable %s expansion error: %s', self.iterable, e)
iterable = Request.expand_value_from_context(self.iterable, **context)
response = Response()
for item in iterable:
if self._should_return:
logger.info('Returning from %s', self.name)
break
if self._should_continue:
self._should_continue = False
logger.info('Continuing loop %s', self.name)
continue
if self._should_break:
self._should_break = False
logger.info('Breaking loop %s', self.name)
break
context[self.iterator_name] = item
response = super().execute(**context)
return response
class WhileProcedure(LoopProcedure):
"""
Models a while loop procedure.
Example::
procedure.process_results:
- action: http.get
args:
url: https://some-service/some/json/endpoint
# Example response: {"id":1, "name":"foo"}}
- while ${output}:
- action: some.custom.action
args:
id: ${id}
name: ${name}
- action: http.get
args:
url: https://some-service/some/json/endpoint
# Example response: {"id":1, "name":"foo"}}
"""
def __init__(
self, name, condition, requests, _async=False, args=None, backend=None
):
super().__init__(
name=name, _async=_async, requests=requests, args=args, backend=backend
)
self.condition = condition
@staticmethod
def _get_context(**context):
for k, v in context.items():
try:
context[k] = eval(v) # pylint: disable=eval-used
except Exception as e:
logger.debug('Evaluation error for %s=%s: %s', k, v, e)
if isinstance(v, str):
try:
context[k] = eval( # pylint: disable=eval-used
'"' + re.sub(r'(^|[^\\])"', '\1\\"', v) + '"'
)
except Exception as ee:
logger.warning(
'Could not parse value for context variable %s=%s: %s',
k,
v,
ee,
)
logger.warning('Context: %s', context)
logger.exception(e)
return context
def execute(self, *_, **context):
response = Response()
context = self._get_context(**context)
for k, v in context.items():
try:
exec(f'{k}={v}') # pylint: disable=exec-used
except Exception as e:
logger.debug('Evaluation error: %s=%s: %s', k, v, e)
while True:
condition_true = eval(self.condition) # pylint: disable=eval-used
if not condition_true:
break
if self._should_return:
logger.info('Returning from %s', self.name)
break
if self._should_continue:
self._should_continue = False
logger.info('Continuing loop %s', self.name)
continue
if self._should_break:
self._should_break = False
logger.info('Breaking loop %s', self.name)
break
response = super().execute(**context)
if response.output and isinstance(response.output, dict):
new_context = self._get_context(**response.output)
for k, v in new_context.items():
try:
exec(f'{k}={v}') # pylint: disable=exec-used
except Exception as e:
logger.debug('Evaluation error: %s=%s: %s', k, v, e)
return response
class IfProcedure(Procedure):
"""
Models an if-else construct.
Example::
procedure.sync.process_results:
- action: http.get
args:
url: https://some-service/some/json/endpoint
# Example response: { "sensors": [ {"temperature": 18 } ] }
- if ${sensors['temperature'] < 20}:
- action: shell.exec
args:
cmd: '/path/turn_on_heating.sh'
- else:
- action: shell.exec
args:
cmd: '/path/turn_off_heating.sh'
"""
def __init__(
self,
name,
condition,
requests,
else_branch=None,
args=None,
backend=None,
id=None, # pylint: disable=redefined-builtin
**kwargs,
):
kwargs['_async'] = False
self.condition = condition
self.else_branch = else_branch
reqs = []
for req in requests:
if isinstance(req, dict):
req['origin'] = Config.get('device_id')
req['id'] = id
if 'target' not in req:
req['target'] = req['origin']
req = Request.build(req)
reqs.append(req)
super().__init__(name=name, requests=reqs, args=args, backend=backend, **kwargs)
@classmethod
# pylint: disable=arguments-differ
def build(
cls,
name,
*_,
condition,
requests,
args=None,
backend=None,
id=None, # pylint: disable=redefined-builtin
else_branch=None,
**kwargs,
):
kwargs['_async'] = False
if else_branch:
else_branch = super().build(
name=name + '__else',
requests=else_branch,
args=args,
backend=backend,
id=id,
procedure_class=Procedure,
**kwargs,
)
return super().build(
name=name,
condition=condition,
requests=requests,
else_branch=else_branch,
args=args,
backend=backend,
id=id,
**kwargs,
)
def execute(self, *_, **context):
for k, v in context.items():
try:
exec(f'{k}={v}') # pylint: disable=exec-used
except Exception:
if isinstance(v, str):
try:
exec( # pylint: disable=exec-used
f'{k}="' + re.sub(r'(^|[^\\])"', '\1\\"', v) + '"'
)
except Exception as e:
logger.debug(
'Could not set context variable %s=%s: %s', k, v, e
)
logger.debug('Context: %s', context)
condition_true = eval(self.condition) # pylint: disable=eval-used
response = Response()
if condition_true:
response = super().execute(**context)
elif self.else_branch:
response = self.else_branch.execute(**context)
return response
def procedure(f):
"""
Public decorator to mark a function as a procedure.
"""
f.procedure = True
@wraps(f)
def _execute_procedure(*args, **kwargs):
return exec_wrapper(f, *args, **kwargs)
return _execute_procedure
# vim:sw=4:ts=4:et: