From 6449504e266850bd5118430df0dd7d98b48837ad Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 20 Sep 2018 10:49:57 +0000 Subject: [PATCH] Switched the bus to a Redis bus --- platypush/__init__.py | 4 ++-- platypush/backend/__init__.py | 7 +------ platypush/backend/http/__init__.py | 30 +++++++++++++-------------- platypush/bus/redis.py | 5 ++++- platypush/message/request/__init__.py | 15 +++++++++++--- platypush/plugins/redis.py | 25 ++++++++++++++++++++++ platypush/plugins/variable.py | 11 +++++++++- platypush/utils/__init__.py | 9 ++++++++ 8 files changed, 78 insertions(+), 28 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index 5bc8d3461..c6750c3a2 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -13,7 +13,7 @@ import traceback from threading import Thread from .bus import Bus -# from .bus.redis import RedisBus +from .bus.redis import RedisBus from .config import Config from .context import register_backends from .cron.scheduler import CronScheduler @@ -120,7 +120,7 @@ class Daemon: def start(self): """ Start the daemon """ - self.bus = Bus(on_message=self.on_message()) + self.bus = RedisBus(on_message=self.on_message()) # Initialize the backends and link them to the bus self.backends = register_backends(bus=self.bus, global_scope=True) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index e714c8008..9999d0290 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -12,7 +12,7 @@ from threading import Thread from platypush.bus import Bus from platypush.config import Config -from platypush.context import get_backend +from platypush.context import get_backend, get_plugin from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout from platypush.message import Message from platypush.message.event import Event, StopEvent @@ -181,14 +181,9 @@ class Backend(Thread): :param request: Associated request, used to set the response parameters that will link them """ - response = Response.build(response) assert isinstance(response, Response) assert isinstance(request, Request) - response.id = request.id - response.target = request.origin - response.origin = self.device_id - self.send_message(response, **kwargs) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index d026d74f6..fc3179d03 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -20,6 +20,7 @@ from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.request import Request +from platypush.utils import get_redis_queue_name_by_message from .. import Backend @@ -237,24 +238,23 @@ class HttpBackend(Backend): if Config.get('token'): msg.token = Config.get('token') - # TODO planning change to bus message - # self.bus.post(msg) + if isinstance(msg, Request): + msg.backend = self + msg.origin = 'http' + + redis = self._get_redis() + self.bus.post(msg) if isinstance(msg, Request): - try: - response = msg.execute(_async=False) - except PermissionError: - abort(401) - - self.logger.info('Processing response on the HTTP backend: {}'.format(msg)) - return str(response) - elif isinstance(msg, Event): - redis = self._get_redis() - if redis: - redis.rpush(self.redis_queue, msg) - - return jsonify({ 'status': 'ok' }) + response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=10) + if response and response[1]: + response = Message.build(json.loads(response[1].decode('utf-8'))) + else: + response = None + self.logger.info('Processing response on the HTTP backend: {}'.format(response)) + if response: + return str(response) @app.route('/') def index(): diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index f95fbc51c..ccc5fd00c 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -26,7 +26,10 @@ class RedisBus(Bus): """ Reads one message from the Redis queue """ try: msg = self.redis.blpop(self.redis_queue) - msg = Message.build(json.loads(msg[1].decode('utf-8'))) + if msg and msg[1]: + msg = Message.build(json.loads(msg[1].decode('utf-8'))) + else: + msg = None except Exception as e: logger.exception(e) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 713f59943..2914bd2da 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -12,7 +12,7 @@ from platypush.config import Config from platypush.context import get_plugin from platypush.message import Message from platypush.message.response import Response -from platypush.utils import get_hash, get_module_and_method_from_action +from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message logger = logging.getLogger(__name__) @@ -152,11 +152,19 @@ class Request(Message): def _send_response(self, response): + response = Response.build(response) + response.id = self.id + response.target = self.origin + response.origin = Config.get('device_id') + if self.backend and self.origin: self.backend.send_response(response=response, request=self) else: - logger.info('Response whose request has no ' + - 'origin attached: {}'.format(response)) + redis = get_plugin('redis') + if redis: + queue_name = get_redis_queue_name_by_message(self) + redis.send_message(queue_name, response) + redis.expire(queue_name, 60) def execute(self, n_tries=1, _async=True, **context): @@ -231,6 +239,7 @@ class Request(Message): 'args' : self.args, 'origin' : self.origin if hasattr(self, 'origin') else None, 'id' : self.id if hasattr(self, 'id') else None, + 'token' : self.token if hasattr(self, 'token') else None, }) diff --git a/platypush/plugins/redis.py b/platypush/plugins/redis.py index 635b4c50f..e976f47ef 100644 --- a/platypush/plugins/redis.py +++ b/platypush/plugins/redis.py @@ -60,5 +60,30 @@ class RedisPlugin(Plugin): return self._get_redis().mset(*args, **kwargs) + @action + def expire(self, key, expiration): + """ + Set an expiration time in seconds for the specified key + + :param key: Key to set to expire + :type key: str + + :param expiration: Expiration timeout (in seconds) + :type expiration: int + """ + + return self._get_redis().expire(key, expiration) + + @action + def delete(self, *args): + """ + Delete one or multiple keys + + :param args: Keys to delete + """ + + return self._get_redis().delete(*args) + + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/variable.py b/platypush/plugins/variable.py index 1df71e143..f16bdaf05 100644 --- a/platypush/plugins/variable.py +++ b/platypush/plugins/variable.py @@ -141,7 +141,16 @@ class VariablePlugin(Plugin): :type name: str """ - return self.redis_plugin.mset(**{name: None}) + return self.redis_plugin.delete(name) + + + @action + def expire(self, key, expiration): + """ + Set a variable expiration on Redis + """ + + return self.redis_plugin.expire(key, expiration) # vim:sw=4:ts=4:et: diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 01644a921..32cd68f0c 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -111,5 +111,14 @@ def get_decorators(cls, climb_class_hierarchy=False): return decorators +def get_redis_queue_name_by_message(msg): + from platypush.message import Message + + if not isinstance(msg, Message): + logger.warning('Not a valid message (type: {}): {}'.format(type(msg), msg)) + + return 'platypush/responses/{}'.format(msg.id) if msg.id else None + + # vim:sw=4:ts=4:et: