Switched the bus to a Redis bus

This commit is contained in:
Fabio Manganiello 2018-09-20 10:49:57 +00:00
parent 661563d1f1
commit 6449504e26
8 changed files with 78 additions and 28 deletions

View file

@ -13,7 +13,7 @@ import traceback
from threading import Thread from threading import Thread
from .bus import Bus from .bus import Bus
# from .bus.redis import RedisBus from .bus.redis import RedisBus
from .config import Config from .config import Config
from .context import register_backends from .context import register_backends
from .cron.scheduler import CronScheduler from .cron.scheduler import CronScheduler
@ -120,7 +120,7 @@ class Daemon:
def start(self): def start(self):
""" Start the daemon """ """ Start the daemon """
self.bus = Bus(on_message=self.on_message()) self.bus = RedisBus(on_message=self.on_message())
# Initialize the backends and link them to the bus # Initialize the backends and link them to the bus
self.backends = register_backends(bus=self.bus, global_scope=True) self.backends = register_backends(bus=self.bus, global_scope=True)

View file

@ -12,7 +12,7 @@ from threading import Thread
from platypush.bus import Bus from platypush.bus import Bus
from platypush.config import Config from platypush.config import Config
from platypush.context import get_backend from platypush.context import get_backend, get_plugin
from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout
from platypush.message import Message from platypush.message import Message
from platypush.message.event import Event, StopEvent from platypush.message.event import Event, StopEvent
@ -181,14 +181,9 @@ class Backend(Thread):
:param request: Associated request, used to set the response parameters that will link them :param request: Associated request, used to set the response parameters that will link them
""" """
response = Response.build(response)
assert isinstance(response, Response) assert isinstance(response, Response)
assert isinstance(request, Request) assert isinstance(request, Request)
response.id = request.id
response.target = request.origin
response.origin = self.device_id
self.send_message(response, **kwargs) self.send_message(response, **kwargs)

View file

@ -20,6 +20,7 @@ from platypush.message import Message
from platypush.message.event import Event, StopEvent from platypush.message.event import Event, StopEvent
from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.event.web.widget import WidgetUpdateEvent
from platypush.message.request import Request from platypush.message.request import Request
from platypush.utils import get_redis_queue_name_by_message
from .. import Backend from .. import Backend
@ -237,24 +238,23 @@ class HttpBackend(Backend):
if Config.get('token'): if Config.get('token'):
msg.token = Config.get('token') msg.token = Config.get('token')
# TODO planning change to bus message if isinstance(msg, Request):
# self.bus.post(msg) msg.backend = self
msg.origin = 'http'
redis = self._get_redis()
self.bus.post(msg)
if isinstance(msg, Request): if isinstance(msg, Request):
try: response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=10)
response = msg.execute(_async=False) if response and response[1]:
except PermissionError: response = Message.build(json.loads(response[1].decode('utf-8')))
abort(401) else:
response = None
self.logger.info('Processing response on the HTTP backend: {}'.format(msg)) self.logger.info('Processing response on the HTTP backend: {}'.format(response))
if response:
return str(response) return str(response)
elif isinstance(msg, Event):
redis = self._get_redis()
if redis:
redis.rpush(self.redis_queue, msg)
return jsonify({ 'status': 'ok' })
@app.route('/') @app.route('/')
def index(): def index():

View file

@ -26,7 +26,10 @@ class RedisBus(Bus):
""" Reads one message from the Redis queue """ """ Reads one message from the Redis queue """
try: try:
msg = self.redis.blpop(self.redis_queue) msg = self.redis.blpop(self.redis_queue)
if msg and msg[1]:
msg = Message.build(json.loads(msg[1].decode('utf-8'))) msg = Message.build(json.loads(msg[1].decode('utf-8')))
else:
msg = None
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)

View file

@ -12,7 +12,7 @@ from platypush.config import Config
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.message import Message from platypush.message import Message
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import get_hash, get_module_and_method_from_action from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -152,11 +152,19 @@ class Request(Message):
def _send_response(self, response): def _send_response(self, response):
response = Response.build(response)
response.id = self.id
response.target = self.origin
response.origin = Config.get('device_id')
if self.backend and self.origin: if self.backend and self.origin:
self.backend.send_response(response=response, request=self) self.backend.send_response(response=response, request=self)
else: else:
logger.info('Response whose request has no ' + redis = get_plugin('redis')
'origin attached: {}'.format(response)) if redis:
queue_name = get_redis_queue_name_by_message(self)
redis.send_message(queue_name, response)
redis.expire(queue_name, 60)
def execute(self, n_tries=1, _async=True, **context): def execute(self, n_tries=1, _async=True, **context):
@ -231,6 +239,7 @@ class Request(Message):
'args' : self.args, 'args' : self.args,
'origin' : self.origin if hasattr(self, 'origin') else None, 'origin' : self.origin if hasattr(self, 'origin') else None,
'id' : self.id if hasattr(self, 'id') else None, 'id' : self.id if hasattr(self, 'id') else None,
'token' : self.token if hasattr(self, 'token') else None,
}) })

View file

@ -60,5 +60,30 @@ class RedisPlugin(Plugin):
return self._get_redis().mset(*args, **kwargs) return self._get_redis().mset(*args, **kwargs)
@action
def expire(self, key, expiration):
"""
Set an expiration time in seconds for the specified key
:param key: Key to set to expire
:type key: str
:param expiration: Expiration timeout (in seconds)
:type expiration: int
"""
return self._get_redis().expire(key, expiration)
@action
def delete(self, *args):
"""
Delete one or multiple keys
:param args: Keys to delete
"""
return self._get_redis().delete(*args)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -141,7 +141,16 @@ class VariablePlugin(Plugin):
:type name: str :type name: str
""" """
return self.redis_plugin.mset(**{name: None}) return self.redis_plugin.delete(name)
@action
def expire(self, key, expiration):
"""
Set a variable expiration on Redis
"""
return self.redis_plugin.expire(key, expiration)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -111,5 +111,14 @@ def get_decorators(cls, climb_class_hierarchy=False):
return decorators return decorators
def get_redis_queue_name_by_message(msg):
from platypush.message import Message
if not isinstance(msg, Message):
logger.warning('Not a valid message (type: {}): {}'.format(type(msg), msg))
return 'platypush/responses/{}'.format(msg.id) if msg.id else None
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: