From 661563d1f15d6bcc6b8684f9e29043d52f92da75 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 20 Sep 2018 09:41:19 +0200 Subject: [PATCH] Added Redis bus --- platypush/__init__.py | 1 + platypush/backend/http/__init__.py | 3 +++ platypush/bus/__init__.py | 6 +---- platypush/bus/redis.py | 41 ++++++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 platypush/bus/redis.py diff --git a/platypush/__init__.py b/platypush/__init__.py index 3d88d4ee1..5bc8d3461 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -13,6 +13,7 @@ import traceback from threading import Thread from .bus import Bus +# from .bus.redis import RedisBus from .config import Config from .context import register_backends from .cron.scheduler import CronScheduler diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 9706c0635..d026d74f6 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -237,6 +237,9 @@ class HttpBackend(Backend): if Config.get('token'): msg.token = Config.get('token') + # TODO planning change to bus message + # self.bus.post(msg) + if isinstance(msg, Request): try: response = msg.execute(_async=False) diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 13f5c28fa..47e1e5ded 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -1,10 +1,6 @@ -import os -import sys -import signal import logging import threading -from enum import Enum from queue import Queue from platypush.config import Config @@ -35,7 +31,7 @@ class Bus(object): origin=Config.get('device_id'), thread_id=self.thread_id) - self.bus.put(evt) + self.post(evt) def poll(self): """ diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py new file mode 100644 index 000000000..f95fbc51c --- /dev/null +++ b/platypush/bus/redis.py @@ -0,0 +1,41 @@ +import json +import logging +import threading + +from redis import Redis + +from platypush.bus import Bus +from platypush.message import Message + +logger = logging.getLogger(__name__) + + +class RedisBus(Bus): + """ Overrides the in-process in-memory local bus with a Redis bus """ + _DEFAULT_REDIS_QUEUE = 'platypush/bus' + + def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE, + *args, **kwargs): + super().__init__(on_message=on_message) + self.redis = Redis(*args, **kwargs) + self.redis_queue = redis_queue + self.on_message = on_message + self.thread_id = threading.get_ident() + + def get(self): + """ Reads one message from the Redis queue """ + try: + msg = self.redis.blpop(self.redis_queue) + msg = Message.build(json.loads(msg[1].decode('utf-8'))) + except Exception as e: + logger.exception(e) + + return msg + + def post(self, msg): + """ Sends a message to the Redis queue """ + return self.redis.rpush(self.redis_queue, msg) + + +# vim:sw=4:ts=4:et: +