Added Redis bus

This commit is contained in:
Fabio Manganiello 2018-09-20 09:41:19 +02:00
parent 6bbae19d39
commit 661563d1f1
4 changed files with 46 additions and 5 deletions

View file

@ -13,6 +13,7 @@ import traceback
from threading import Thread
from .bus import Bus
# from .bus.redis import RedisBus
from .config import Config
from .context import register_backends
from .cron.scheduler import CronScheduler

View file

@ -237,6 +237,9 @@ class HttpBackend(Backend):
if Config.get('token'):
msg.token = Config.get('token')
# TODO planning change to bus message
# self.bus.post(msg)
if isinstance(msg, Request):
try:
response = msg.execute(_async=False)

View file

@ -1,10 +1,6 @@
import os
import sys
import signal
import logging
import threading
from enum import Enum
from queue import Queue
from platypush.config import Config
@ -35,7 +31,7 @@ class Bus(object):
origin=Config.get('device_id'),
thread_id=self.thread_id)
self.bus.put(evt)
self.post(evt)
def poll(self):
"""

41
platypush/bus/redis.py Normal file
View file

@ -0,0 +1,41 @@
import json
import logging
import threading
from redis import Redis
from platypush.bus import Bus
from platypush.message import Message
logger = logging.getLogger(__name__)
class RedisBus(Bus):
""" Overrides the in-process in-memory local bus with a Redis bus """
_DEFAULT_REDIS_QUEUE = 'platypush/bus'
def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE,
*args, **kwargs):
super().__init__(on_message=on_message)
self.redis = Redis(*args, **kwargs)
self.redis_queue = redis_queue
self.on_message = on_message
self.thread_id = threading.get_ident()
def get(self):
""" Reads one message from the Redis queue """
try:
msg = self.redis.blpop(self.redis_queue)
msg = Message.build(json.loads(msg[1].decode('utf-8')))
except Exception as e:
logger.exception(e)
return msg
def post(self, msg):
""" Sends a message to the Redis queue """
return self.redis.rpush(self.redis_queue, msg)
# vim:sw=4:ts=4:et: