Changed design for run_request - it should be a method of Request, not a member of Daemon

This commit is contained in:
Fabio Manganiello 2017-12-22 18:04:18 +01:00
parent 1ea8badd59
commit ac3dad5fd9
2 changed files with 57 additions and 53 deletions

View file

@ -72,60 +72,18 @@ class Daemon(object):
if isinstance(msg, Request): if isinstance(msg, Request):
logging.info('Processing request: {}'.format(msg)) logging.info('Processing request: {}'.format(msg))
Thread(target=self.run_request(), args=(msg,)).start() msg.execute()
self.processed_requests += 1
if self.requests_to_process \
and self.processed_requests >= self.requests_to_process:
self.stop_app()
elif isinstance(msg, Response): elif isinstance(msg, Response):
logging.info('Received response: {}'.format(msg)) logging.info('Received response: {}'.format(msg))
return _f return _f
def run_request(self):
""" Runs a request and returns the response """
def _thread_func(request, n_tries=self.n_tries):
""" Thread closure method
Params:
request - platypush.message.request.Request object """
(module_name, method_name) = get_module_and_name_from_action(request.action)
try:
plugin = get_or_load_plugin(module_name)
except RuntimeError as e: # Module/class not found
logging.exception(e)
return
try:
# Run the action
response = plugin.run(method=method_name, **request.args)
if response and response.is_error():
raise RuntimeError('Response processed with errors: {}'.format(response))
logging.info('Processed response from plugin {}: {}'.
format(plugin, response))
except Exception as e:
# Retry mechanism
response = Response(output=None, errors=[str(e), traceback.format_exc()])
logging.exception(e)
if n_tries:
logging.info('Reloading plugin {} and retrying'.format(module_name))
get_or_load_plugin(module_name, reload=True)
_thread_func(request, n_tries=n_tries-1)
finally:
# Send the response on the backend
if request.backend and request.origin:
request.backend.send_response(response=response, request=request)
else:
logging.info('Dropping response whose request has no ' +
'origin attached: {}'.format(request))
self.processed_requests += 1
if self.requests_to_process \
and self.processed_requests >= self.requests_to_process:
self.stop_app()
return _thread_func
def stop_app(self): def stop_app(self):
for backend in self.backends.values(): for backend in self.backends.values():
backend.stop() backend.stop()

View file

@ -1,7 +1,12 @@
import json import json
import logging
import random import random
from threading import Thread
from platypush.message import Message from platypush.message import Message
from platypush.message.response import Response
from platypush.utils import get_or_load_plugin, get_module_and_name_from_action
class Request(Message): class Request(Message):
""" Request message class """ """ Request message class """
@ -16,11 +21,12 @@ class Request(Message):
args -- Additional arguments for the action [Dict] args -- Additional arguments for the action [Dict]
""" """
self.id = id if id else self._generate_id() self.id = id if id else self._generate_id()
self.target = target self.target = target
self.action = action self.action = action
self.origin = origin self.origin = origin
self.args = args self.args = args
self.backend = None
@classmethod @classmethod
def build(cls, msg): def build(cls, msg):
@ -42,6 +48,46 @@ class Request(Message):
id += '%.2x' % random.randint(0, 255) id += '%.2x' % random.randint(0, 255)
return id return id
def execute(self, n_tries=1):
"""
Execute this request and returns a Response object
Params:
n_tries -- Number of tries in case of failure before raising a RuntimeError
"""
def _thread_func():
(module_name, method_name) = get_module_and_name_from_action(self.action)
plugin = get_or_load_plugin(module_name)
try:
# Run the action
response = plugin.run(method=method_name, **self.args)
print(response)
if response and response.is_error():
raise RuntimeError('Response processed with errors: {}'.format(response))
logging.info('Processed response from plugin {}: {}'.
format(plugin, response))
except Exception as e:
# Retry mechanism
response = Response(output=None, errors=[str(e), traceback.format_exc()])
logging.exception(e)
if n_tries:
logging.info('Reloading plugin {} and retrying'.format(module_name))
get_or_load_plugin(module_name, reload=True)
n_tries -= 1
_thread_func()
finally:
# Send the response on the backend
if self.backend and self.origin:
self.backend.send_response(response=response, request=self)
else:
logging.info('Dropping response whose request has no ' +
'origin attached: {}'.format(self))
Thread(target=_thread_func).start()
def __str__(self): def __str__(self):
""" """
Overrides the str() operator and converts Overrides the str() operator and converts