Support for set/clear timeout/interval utils actions and error handling refactor
- Added set/cler timeout/interval actions in the utils plugin to dynamically set or stop timed actions from your custom logic - Actions error handling refactoring. If a Response is processed with some errors, then just log the error and return it to the caller, without raising an exception. If instead the action execution raised an uncaught exception, then handle the retries properly and return errors as a list with the output from all the retries, without being too verbose with the returning and logging the whole stack trace multiple times.
This commit is contained in:
parent
2a52eb770b
commit
8ea0519954
3 changed files with 140 additions and 11 deletions
|
@ -190,7 +190,7 @@ class Request(Message):
|
||||||
- group: ${group_name} # will be expanded as "Kitchen lights")
|
- group: ${group_name} # will be expanded as "Kitchen lights")
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _thread_func(n_tries):
|
def _thread_func(n_tries, errors=None):
|
||||||
if self.action.startswith('procedure.'):
|
if self.action.startswith('procedure.'):
|
||||||
context['n_tries'] = n_tries
|
context['n_tries'] = n_tries
|
||||||
response = self._execute_procedure(**context)
|
response = self._execute_procedure(**context)
|
||||||
|
@ -207,18 +207,25 @@ class Request(Message):
|
||||||
response = plugin.run(method=method_name, **args)
|
response = plugin.run(method=method_name, **args)
|
||||||
|
|
||||||
if response and response.is_error():
|
if response and response.is_error():
|
||||||
raise RuntimeError('Response processed with errors: {}'.format(response))
|
logger.warning(('Response processed with errors from ' +
|
||||||
|
'action {}.{}: {}').format(
|
||||||
logger.info('Processed response from plugin {}: {}'.
|
plugin, self.action, str(response)))
|
||||||
format(plugin, str(response)))
|
else:
|
||||||
|
logger.info('Processed response from action {}.{}: {}'.
|
||||||
|
format(plugin, self.action, str(response)))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Retry mechanism
|
# Retry mechanism
|
||||||
response = Response(output=None, errors=[str(e), traceback.format_exc()])
|
logger.warning(('Uncaught exception while processing response ' +
|
||||||
logger.exception(e)
|
'from action {}.{}: {}').format(
|
||||||
if n_tries:
|
plugin, self.action, str(e)))
|
||||||
|
|
||||||
|
errors = errors or []
|
||||||
|
errors.append(str(e))
|
||||||
|
response = Response(output=None, errors=errors)
|
||||||
|
if n_tries-1 > 0:
|
||||||
logger.info('Reloading plugin {} and retrying'.format(module_name))
|
logger.info('Reloading plugin {} and retrying'.format(module_name))
|
||||||
get_plugin(module_name, reload=True)
|
get_plugin(module_name, reload=True)
|
||||||
response = _thread_func(n_tries-1)
|
response = _thread_func(n_tries=n_tries-1, errors=errors)
|
||||||
finally:
|
finally:
|
||||||
self._send_response(response)
|
self._send_response(response)
|
||||||
return response
|
return response
|
||||||
|
|
|
@ -29,7 +29,8 @@ def action(f):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if isinstance(args[0], Plugin):
|
if isinstance(args[0], Plugin):
|
||||||
args[0].logger.exception(e)
|
args[0].logger.exception(e)
|
||||||
errors.append(str(e) + '\n' + traceback.format_exc())
|
raise e
|
||||||
|
# errors.append(str(e) + '\n' + traceback.format_exc())
|
||||||
|
|
||||||
return Response(output=output, errors=errors)
|
return Response(output=output, errors=errors)
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from platypush.plugins import Plugin, action
|
from platypush.plugins import Plugin, action
|
||||||
|
from platypush.procedure import Procedure
|
||||||
|
|
||||||
|
|
||||||
class UtilsPlugin(Plugin):
|
class UtilsPlugin(Plugin):
|
||||||
|
@ -8,6 +10,17 @@ class UtilsPlugin(Plugin):
|
||||||
A plugin for general-purpose util methods
|
A plugin for general-purpose util methods
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_DEFAULT_TIMEOUT_PREFIX = '_PlatypushTimeout_'
|
||||||
|
_timeout_hndl_idx = 0
|
||||||
|
_timeout_hndl_idx_lock = threading.RLock()
|
||||||
|
|
||||||
|
_DEFAULT_INTERVAL_PREFIX = '_PlatypushInterval_'
|
||||||
|
_interval_hndl_idx = 0
|
||||||
|
_interval_hndl_idx_lock = threading.RLock()
|
||||||
|
|
||||||
|
_pending_intervals = {}
|
||||||
|
_pending_intervals_lock = threading.RLock()
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def sleep(self, seconds):
|
def sleep(self, seconds):
|
||||||
"""
|
"""
|
||||||
|
@ -19,6 +32,114 @@ class UtilsPlugin(Plugin):
|
||||||
|
|
||||||
time.sleep(seconds)
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def set_timeout(self, seconds, actions, name=None, **args):
|
||||||
|
"""
|
||||||
|
Define a set of actions to run after the specified amount of `seconds`.
|
||||||
|
|
||||||
|
:param seconds: Number of seconds before running the timeout procedure
|
||||||
|
:type seconds: float
|
||||||
|
|
||||||
|
:param actions: List of actions to be executed after the timeout expires
|
||||||
|
:type actions: list[dict]
|
||||||
|
|
||||||
|
:param name: Set an optional name for this timeout. It is advised to set
|
||||||
|
a name if you are planning to programmatically cancel the timeout in
|
||||||
|
your business logic.
|
||||||
|
:type name: str
|
||||||
|
|
||||||
|
:param args: Optional arguments/context to pass to the timeout function
|
||||||
|
"""
|
||||||
|
|
||||||
|
with self._timeout_hndl_idx_lock:
|
||||||
|
self._timeout_hndl_idx += 1
|
||||||
|
if not name:
|
||||||
|
name = self._DEFAULT_TIMEOUT_PREFIX + str(self._timeout_hndl_idx)
|
||||||
|
if name in self._pending_timeouts:
|
||||||
|
return (None,
|
||||||
|
"A timeout named '{}' is already awaiting".format(name))
|
||||||
|
|
||||||
|
|
||||||
|
procedure = Procedure.build(name=name, requests=actions, _async=False)
|
||||||
|
self._pending_timeouts[name] = procedure
|
||||||
|
|
||||||
|
def _proc_wrapper(**kwargs):
|
||||||
|
procedure.execute(**kwargs)
|
||||||
|
|
||||||
|
with self._pending_timeouts_lock:
|
||||||
|
self._pending_timeouts[name] = threading.Timer(seconds,
|
||||||
|
_proc_wrapper,
|
||||||
|
kwargs=args)
|
||||||
|
self._pending_timeouts[name].start()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def clear_timeout(self, name):
|
||||||
|
timer = None
|
||||||
|
|
||||||
|
with self._pending_timeouts_lock:
|
||||||
|
if name not in self._pending_timeouts:
|
||||||
|
return
|
||||||
|
timer = self._pending_timeouts.pop(name)
|
||||||
|
|
||||||
|
timer.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
@action
|
||||||
|
def set_interval(self, seconds, actions, name=None, **args):
|
||||||
|
"""
|
||||||
|
Define a set of actions to run each specified amount of `seconds`.
|
||||||
|
|
||||||
|
:param seconds: Number of seconds between two runs of the interval
|
||||||
|
procedure
|
||||||
|
:type seconds: float
|
||||||
|
|
||||||
|
:param actions: List of actions to be executed at each interval
|
||||||
|
:type actions: list[dict]
|
||||||
|
|
||||||
|
:param name: Set an optional name for this interval. It is advised to
|
||||||
|
set a name if you are planning to programmatically cancel the
|
||||||
|
interval in your business logic.
|
||||||
|
:type name: str
|
||||||
|
|
||||||
|
:param args: Optional arguments/context to pass to the interval function
|
||||||
|
"""
|
||||||
|
|
||||||
|
with self._interval_hndl_idx_lock:
|
||||||
|
self._interval_hndl_idx += 1
|
||||||
|
if not name:
|
||||||
|
name = self._DEFAULT_INTERVAL_PREFIX + \
|
||||||
|
str(self._interval_hndl_idx)
|
||||||
|
|
||||||
|
if name in self._pending_intervals:
|
||||||
|
return (None,
|
||||||
|
"An interval named '{}' is already running".format(name))
|
||||||
|
|
||||||
|
|
||||||
|
procedure = Procedure.build(name=name, requests=actions, _async=False)
|
||||||
|
self._pending_intervals[name] = procedure
|
||||||
|
|
||||||
|
def _proc_wrapper(**kwargs):
|
||||||
|
while True:
|
||||||
|
with self._pending_intervals_lock:
|
||||||
|
if name not in self._pending_intervals:
|
||||||
|
return
|
||||||
|
|
||||||
|
procedure.execute(**kwargs)
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
with self._pending_intervals_lock:
|
||||||
|
self._pending_intervals[name] = threading.Thread(
|
||||||
|
target=_proc_wrapper, kwargs=args)
|
||||||
|
self._pending_intervals[name].start()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def clear_interval(self, name):
|
||||||
|
interval = None
|
||||||
|
|
||||||
|
with self._pending_intervals_lock:
|
||||||
|
if name not in self._pending_intervals:
|
||||||
|
return
|
||||||
|
del self._pending_intervals[name]
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue