If a request on a RunnablePlugin throws an exception then we should also restart the plugin upon reload

Plus some Black/LINT chores
This commit is contained in:
Fabio Manganiello 2022-07-25 00:41:08 +02:00
parent c32142c8b5
commit 55671f4aff
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -12,17 +12,30 @@ from platypush.config import Config
from platypush.context import get_plugin
from platypush.message import Message
from platypush.message.response import Response
from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \
is_functional_procedure
from platypush.utils import (
get_hash,
get_module_and_method_from_action,
get_redis_queue_name_by_message,
is_functional_procedure,
)
logger = logging.getLogger('platypush')
class Request(Message):
""" Request message class """
"""Request message class"""
def __init__(self, target, action, origin=None, id=None, backend=None,
args=None, token=None, timestamp=None):
def __init__(
self,
target,
action,
origin=None,
id=None,
backend=None,
args=None,
token=None,
timestamp=None,
):
"""
Params:
target -- Target node [Str]
@ -48,9 +61,13 @@ class Request(Message):
@classmethod
def build(cls, msg):
msg = super().parse(msg)
args = {'target': msg.get('target', Config.get('device_id')), 'action': msg['action'],
'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(),
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()}
args = {
'target': msg.get('target', Config.get('device_id')),
'action': msg['action'],
'args': msg.get('args', {}),
'id': msg['id'] if 'id' in msg else cls._generate_id(),
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(),
}
if 'origin' in msg:
args['origin'] = msg['origin']
@ -61,7 +78,7 @@ class Request(Message):
@staticmethod
def _generate_id():
_id = ''
for i in range(0, 16):
for _ in range(0, 16):
_id += '%.2x' % random.randint(0, 255)
return _id
@ -84,9 +101,14 @@ class Request(Message):
return proc_config(*args, **kwargs)
proc = Procedure.build(name=proc_name, requests=proc_config['actions'],
_async=proc_config['_async'], args=self.args,
backend=self.backend, id=self.id)
proc = Procedure.build(
name=proc_name,
requests=proc_config['actions'],
_async=proc_config['_async'],
args=self.args,
backend=self.backend,
id=self.id,
)
return proc.execute(*args, **kwargs)
@ -112,7 +134,7 @@ class Request(Message):
if isinstance(value, str):
value = self.expand_value_from_context(value, **context)
elif isinstance(value, dict) or isinstance(value, list):
elif isinstance(value, (dict, list)):
self._expand_context(event_args=value, **context)
event_args[key] = value
@ -132,7 +154,11 @@ class Request(Message):
try:
exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v)))
except Exception as e:
logger.debug('Could not set context variable {}={}: {}'.format(k, v, str(e)))
logger.debug(
'Could not set context variable {}={}: {}'.format(
k, v, str(e)
)
)
logger.debug('Context: {}'.format(context))
parsed_value = ''
@ -152,7 +178,7 @@ class Request(Message):
if callable(context_value):
context_value = context_value()
if isinstance(context_value, range) or isinstance(context_value, tuple):
if isinstance(context_value, (range, tuple)):
context_value = [*context_value]
if isinstance(context_value, datetime.date):
context_value = context_value.isoformat()
@ -162,7 +188,7 @@ class Request(Message):
parsed_value += prefix + (
json.dumps(context_value)
if isinstance(context_value, list) or isinstance(context_value, dict)
if isinstance(context_value, (list, dict))
else str(context_value)
)
else:
@ -205,6 +231,9 @@ class Request(Message):
"""
def _thread_func(_n_tries, errors=None):
from platypush.context import get_bus
from platypush.plugins import RunnablePlugin
response = None
try:
@ -221,11 +250,15 @@ class Request(Message):
return response
else:
action = self.expand_value_from_context(self.action, **context)
(module_name, method_name) = get_module_and_method_from_action(action)
(module_name, method_name) = get_module_and_method_from_action(
action
)
plugin = get_plugin(module_name)
except Exception as e:
logger.exception(e)
msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(self.action, str(e))
msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(
self.action, str(e)
)
logger.warning(msg)
response = Response(output=None, errors=[msg])
self._send_response(response)
@ -243,24 +276,37 @@ class Request(Message):
response = plugin.run(method_name, args)
if not response:
logger.warning('Received null response from action {}'.format(action))
logger.warning(
'Received null response from action {}'.format(action)
)
else:
if response.is_error():
logger.warning(('Response processed with errors from ' +
'action {}: {}').format(
action, str(response)))
logger.warning(
(
'Response processed with errors from ' + 'action {}: {}'
).format(action, str(response))
)
elif not response.disable_logging:
logger.info('Processed response from action {}: {}'.
format(action, str(response)))
logger.info(
'Processed response from action {}: {}'.format(
action, str(response)
)
)
except (AssertionError, TimeoutError) as e:
plugin.logger.exception(e)
logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e)))
logger.warning(
'{} from action [{}]: {}'.format(type(e), action, str(e))
)
response = Response(output=None, errors=[str(e)])
except Exception as e:
# Retry mechanism
plugin.logger.exception(e)
logger.warning(('Uncaught exception while processing response ' +
'from action [{}]: {}').format(action, str(e)))
logger.warning(
(
'Uncaught exception while processing response '
+ 'from action [{}]: {}'
).format(action, str(e))
)
errors = errors or []
if str(e) not in errors:
@ -269,17 +315,21 @@ class Request(Message):
response = Response(output=None, errors=errors)
if _n_tries - 1 > 0:
logger.info('Reloading plugin {} and retrying'.format(module_name))
get_plugin(module_name, reload=True)
response = _thread_func(_n_tries=_n_tries-1, errors=errors)
plugin = get_plugin(module_name, reload=True)
if isinstance(plugin, RunnablePlugin):
plugin.bus = get_bus()
plugin.start()
response = _thread_func(_n_tries=_n_tries - 1, errors=errors)
finally:
self._send_response(response)
return response
token_hash = Config.get('token_hash')
return response
if token_hash:
if self.token is None or get_hash(self.token) != token_hash:
raise PermissionError()
stored_token_hash = Config.get('token_hash')
token = getattr(self, 'token', '')
if stored_token_hash and get_hash(token) != stored_token_hash:
raise PermissionError()
if _async:
Thread(target=_thread_func, args=(n_tries,)).start()
@ -292,15 +342,18 @@ class Request(Message):
the message into a UTF-8 JSON string
"""
return json.dumps({
'type': 'request',
'target': self.target,
'action': self.action,
'args': self.args,
'origin': self.origin if hasattr(self, 'origin') else None,
'id': self.id if hasattr(self, 'id') else None,
'token': self.token if hasattr(self, 'token') else None,
'_timestamp': self.timestamp,
})
return json.dumps(
{
'type': 'request',
'target': self.target,
'action': self.action,
'args': self.args,
'origin': self.origin if hasattr(self, 'origin') else None,
'id': self.id if hasattr(self, 'id') else None,
'token': self.token if hasattr(self, 'token') else None,
'_timestamp': self.timestamp,
}
)
# vim:sw=4:ts=4:et: