diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 50838a8f76..174a91fdc8 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -190,7 +190,7 @@ class Request(Message): - group: ${group_name} # will be expanded as "Kitchen lights") """ - def _thread_func(n_tries): + def _thread_func(n_tries, errors=None): if self.action.startswith('procedure.'): context['n_tries'] = n_tries response = self._execute_procedure(**context) @@ -207,18 +207,25 @@ class Request(Message): response = plugin.run(method=method_name, **args) if response and response.is_error(): - raise RuntimeError('Response processed with errors: {}'.format(response)) - - logger.info('Processed response from plugin {}: {}'. - format(plugin, str(response))) + logger.warning(('Response processed with errors from ' + + 'action {}.{}: {}').format( + plugin, self.action, str(response))) + else: + logger.info('Processed response from action {}.{}: {}'. + format(plugin, self.action, str(response))) except Exception as e: # Retry mechanism - response = Response(output=None, errors=[str(e), traceback.format_exc()]) - logger.exception(e) - if n_tries: + logger.warning(('Uncaught exception while processing response ' + + 'from action {}.{}: {}').format( + plugin, self.action, str(e))) + + errors = errors or [] + errors.append(str(e)) + response = Response(output=None, errors=errors) + if n_tries-1 > 0: logger.info('Reloading plugin {} and retrying'.format(module_name)) get_plugin(module_name, reload=True) - response = _thread_func(n_tries-1) + response = _thread_func(n_tries=n_tries-1, errors=errors) finally: self._send_response(response) return response diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index fa9b56b223..22c7f03cce 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -29,7 +29,8 @@ def action(f): except Exception as e: if isinstance(args[0], Plugin): args[0].logger.exception(e) - errors.append(str(e) + '\n' + traceback.format_exc()) + raise e + # errors.append(str(e) + '\n' + traceback.format_exc()) return Response(output=output, errors=errors) diff --git a/platypush/plugins/utils.py b/platypush/plugins/utils.py index f16277906b..90f0807b2f 100644 --- a/platypush/plugins/utils.py +++ b/platypush/plugins/utils.py @@ -1,6 +1,8 @@ +import threading import time from platypush.plugins import Plugin, action +from platypush.procedure import Procedure class UtilsPlugin(Plugin): @@ -8,6 +10,17 @@ class UtilsPlugin(Plugin): A plugin for general-purpose util methods """ + _DEFAULT_TIMEOUT_PREFIX = '_PlatypushTimeout_' + _timeout_hndl_idx = 0 + _timeout_hndl_idx_lock = threading.RLock() + + _DEFAULT_INTERVAL_PREFIX = '_PlatypushInterval_' + _interval_hndl_idx = 0 + _interval_hndl_idx_lock = threading.RLock() + + _pending_intervals = {} + _pending_intervals_lock = threading.RLock() + @action def sleep(self, seconds): """ @@ -19,6 +32,114 @@ class UtilsPlugin(Plugin): time.sleep(seconds) + @action + def set_timeout(self, seconds, actions, name=None, **args): + """ + Define a set of actions to run after the specified amount of `seconds`. + + :param seconds: Number of seconds before running the timeout procedure + :type seconds: float + + :param actions: List of actions to be executed after the timeout expires + :type actions: list[dict] + + :param name: Set an optional name for this timeout. It is advised to set + a name if you are planning to programmatically cancel the timeout in + your business logic. + :type name: str + + :param args: Optional arguments/context to pass to the timeout function + """ + + with self._timeout_hndl_idx_lock: + self._timeout_hndl_idx += 1 + if not name: + name = self._DEFAULT_TIMEOUT_PREFIX + str(self._timeout_hndl_idx) + if name in self._pending_timeouts: + return (None, + "A timeout named '{}' is already awaiting".format(name)) + + + procedure = Procedure.build(name=name, requests=actions, _async=False) + self._pending_timeouts[name] = procedure + + def _proc_wrapper(**kwargs): + procedure.execute(**kwargs) + + with self._pending_timeouts_lock: + self._pending_timeouts[name] = threading.Timer(seconds, + _proc_wrapper, + kwargs=args) + self._pending_timeouts[name].start() + + @action + def clear_timeout(self, name): + timer = None + + with self._pending_timeouts_lock: + if name not in self._pending_timeouts: + return + timer = self._pending_timeouts.pop(name) + + timer.cancel() + + + @action + def set_interval(self, seconds, actions, name=None, **args): + """ + Define a set of actions to run each specified amount of `seconds`. + + :param seconds: Number of seconds between two runs of the interval + procedure + :type seconds: float + + :param actions: List of actions to be executed at each interval + :type actions: list[dict] + + :param name: Set an optional name for this interval. It is advised to + set a name if you are planning to programmatically cancel the + interval in your business logic. + :type name: str + + :param args: Optional arguments/context to pass to the interval function + """ + + with self._interval_hndl_idx_lock: + self._interval_hndl_idx += 1 + if not name: + name = self._DEFAULT_INTERVAL_PREFIX + \ + str(self._interval_hndl_idx) + + if name in self._pending_intervals: + return (None, + "An interval named '{}' is already running".format(name)) + + + procedure = Procedure.build(name=name, requests=actions, _async=False) + self._pending_intervals[name] = procedure + + def _proc_wrapper(**kwargs): + while True: + with self._pending_intervals_lock: + if name not in self._pending_intervals: + return + + procedure.execute(**kwargs) + time.sleep(seconds) + + with self._pending_intervals_lock: + self._pending_intervals[name] = threading.Thread( + target=_proc_wrapper, kwargs=args) + self._pending_intervals[name].start() + + @action + def clear_interval(self, name): + interval = None + + with self._pending_intervals_lock: + if name not in self._pending_intervals: + return + del self._pending_intervals[name] + # vim:sw=4:ts=4:et: -