diff --git a/platypush/__init__.py b/platypush/__init__.py index 094f1871..1b97dda1 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -72,60 +72,18 @@ class Daemon(object): if isinstance(msg, Request): logging.info('Processing request: {}'.format(msg)) - Thread(target=self.run_request(), args=(msg,)).start() + msg.execute() + + self.processed_requests += 1 + if self.requests_to_process \ + and self.processed_requests >= self.requests_to_process: + self.stop_app() elif isinstance(msg, Response): logging.info('Received response: {}'.format(msg)) return _f - def run_request(self): - """ Runs a request and returns the response """ - def _thread_func(request, n_tries=self.n_tries): - """ Thread closure method - Params: - request - platypush.message.request.Request object """ - - (module_name, method_name) = get_module_and_name_from_action(request.action) - - try: - plugin = get_or_load_plugin(module_name) - except RuntimeError as e: # Module/class not found - logging.exception(e) - return - - try: - # Run the action - response = plugin.run(method=method_name, **request.args) - if response and response.is_error(): - raise RuntimeError('Response processed with errors: {}'.format(response)) - - logging.info('Processed response from plugin {}: {}'. - format(plugin, response)) - except Exception as e: - # Retry mechanism - response = Response(output=None, errors=[str(e), traceback.format_exc()]) - logging.exception(e) - if n_tries: - logging.info('Reloading plugin {} and retrying'.format(module_name)) - get_or_load_plugin(module_name, reload=True) - _thread_func(request, n_tries=n_tries-1) - finally: - # Send the response on the backend - if request.backend and request.origin: - request.backend.send_response(response=response, request=request) - else: - logging.info('Dropping response whose request has no ' + - 'origin attached: {}'.format(request)) - - self.processed_requests += 1 - if self.requests_to_process \ - and self.processed_requests >= self.requests_to_process: - self.stop_app() - - return _thread_func - - def stop_app(self): for backend in self.backends.values(): backend.stop() diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 464f1277..514f2775 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,7 +1,12 @@ import json +import logging import random +from threading import Thread + from platypush.message import Message +from platypush.message.response import Response +from platypush.utils import get_or_load_plugin, get_module_and_name_from_action class Request(Message): """ Request message class """ @@ -16,11 +21,12 @@ class Request(Message): args -- Additional arguments for the action [Dict] """ - self.id = id if id else self._generate_id() - self.target = target - self.action = action - self.origin = origin - self.args = args + self.id = id if id else self._generate_id() + self.target = target + self.action = action + self.origin = origin + self.args = args + self.backend = None @classmethod def build(cls, msg): @@ -42,6 +48,46 @@ class Request(Message): id += '%.2x' % random.randint(0, 255) return id + def execute(self, n_tries=1): + """ + Execute this request and returns a Response object + Params: + n_tries -- Number of tries in case of failure before raising a RuntimeError + """ + def _thread_func(): + (module_name, method_name) = get_module_and_name_from_action(self.action) + + plugin = get_or_load_plugin(module_name) + + try: + # Run the action + response = plugin.run(method=method_name, **self.args) + print(response) + if response and response.is_error(): + raise RuntimeError('Response processed with errors: {}'.format(response)) + + logging.info('Processed response from plugin {}: {}'. + format(plugin, response)) + except Exception as e: + # Retry mechanism + response = Response(output=None, errors=[str(e), traceback.format_exc()]) + logging.exception(e) + if n_tries: + logging.info('Reloading plugin {} and retrying'.format(module_name)) + get_or_load_plugin(module_name, reload=True) + n_tries -= 1 + _thread_func() + finally: + # Send the response on the backend + if self.backend and self.origin: + self.backend.send_response(response=response, request=self) + else: + logging.info('Dropping response whose request has no ' + + 'origin attached: {}'.format(self)) + + Thread(target=_thread_func).start() + + def __str__(self): """ Overrides the str() operator and converts