Better way to handle plugins->backends communication through Redis

This commit is contained in:
Fabio Manganiello 2018-06-29 10:21:16 +02:00
parent f3bdeaf418
commit 3872276234
5 changed files with 41 additions and 28 deletions

View file

@ -187,7 +187,7 @@ class Backend(Thread):
self.send_message(response, **kwargs)
def send_message(self, msg, **kwargs):
def send_message(self, msg, queue_name=None, **kwargs):
"""
Sends a platypush.message.Message to a node.
To be implemented in the derived classes. By default, if the Redis
@ -195,7 +195,9 @@ class Backend(Thread):
other consumers through the configured Redis main queue.
:param msg: The message to send
:param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis backend)
"""
try:
redis = get_backend('redis')
if not redis:
@ -205,7 +207,7 @@ class Backend(Thread):
"and the fallback Redis backend isn't configured")
return
redis.send_message(msg)
redis.send_message(msg, queue_name=queue_name)
def run(self):

View file

@ -3,16 +3,17 @@ import socket
import time
from enum import Enum
from redis import Redis
from threading import Thread
from platypush.backend import Backend
from platypush.context import get_backend
class CameraPiBackend(Backend):
"""
Backend to interact with a Raspberry Pi camera. It can start and stop
recordings and take pictures. It can be programmatically controlled through
the :class:`platypush.plugins.camera.pi` plugin.
the :class:`platypush.plugins.camera.pi` plugin. Note that the Redis backend
must be configured and running to enable camera control.
Requires:
@ -29,7 +30,7 @@ class CameraPiBackend(Backend):
return self.value == other
def __init__(self, listen_port, x_resolution=640, y_resolution=480,
redis_queue='platypush_mq_camera',
redis_queue='platypush/camera/pi',
start_recording_on_startup=True,
framerate=24, hflip=False, vflip=False,
sharpness=0, contrast=0, brightness=50,
@ -72,7 +73,7 @@ class CameraPiBackend(Backend):
self.camera.rotation = rotation
self.camera.crop = crop
self.start_recording_on_startup = start_recording_on_startup
self.redis = Redis()
self.redis = get_backend('redis')
self.redis_queue = redis_queue
self._recording_thread = None
@ -87,7 +88,7 @@ class CameraPiBackend(Backend):
**kwargs
}
self.redis.rpush(self.redis_queue, json.dumps(action))
self.redis.send_message(msg=json.dumps(action), queue_name=self.redis_queue)
def take_picture(self, image_file):
"""
@ -152,14 +153,15 @@ class CameraPiBackend(Backend):
try:
self.camera.stop_recording()
except:
self.logger.info('No recording currently in progress')
except Exception as e:
self.logger.info('Failed to stop recording')
self.logger.exception(e)
def run(self):
super().run()
while not self.should_stop():
msg = json.loads(self.redis.blpop(self.redis_queue)[1].decode())
msg = self.redis.get_message(self.redis_queue)
if msg.get('action') == self.CameraAction.START_RECORDING:
self.start_recording()

View file

@ -52,7 +52,7 @@ class HttpBackend(Backend):
}
def __init__(self, port=8008, websocket_port=8009, disable_websocket=False,
redis_queue='platypush_flask_mq', token=None, dashboard={},
redis_queue='platypush/http', token=None, dashboard={},
maps={}, **kwargs):
"""
:param port: Listen port for the web server (default: 8008)
@ -64,7 +64,7 @@ class HttpBackend(Backend):
:param disable_websocket: Disable the websocket interface (default: False)
:type disable_websocket: bool
:param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush_flask_mq``)
:param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush/http``)
:type redis_queue: str
:param token: If set (recommended) any interaction with the web server needs to bear an ``X-Token: <token>`` header, or it will fail with a 403: Forbidden

View file

@ -34,8 +34,24 @@ class RedisBackend(Backend):
self.redis = Redis(**self.redis_args)
def send_message(self, msg):
self.redis.rpush(self.queue, msg)
def send_message(self, msg, queue_name=None):
if queue_name:
self.redis.rpush(queue_name, msg)
else:
self.redis.rpush(self.queue, msg)
def get_message(self, queue_name=None):
queue = queue_name or self.queue
msg = self.redis.blpop(queue)[1].decode('utf-8')
try:
msg = Message.build(json.loads(msg))
except:
import ast
msg = Message.build(ast.literal_eval(msg))
return msg
def run(self):
@ -45,19 +61,9 @@ class RedisBackend(Backend):
format(self.queue, self.redis_args))
while not self.should_stop():
try:
msg = self.redis.blpop(self.queue)[1].decode('utf-8')
try:
msg = Message.build(json.loads(msg))
except:
import ast
msg = Message.build(ast.literal_eval(msg))
self.logger.info('Received message on the Redis backend: {}'.format(msg))
self.bus.post(msg)
except Exception as e:
self.logger.exception(e)
msg = self.get_message()
self.logger.info('Received message on the Redis backend: {}'.format(msg))
self.bus.post(msg)
# vim:sw=4:ts=4:et:

View file

@ -7,6 +7,7 @@ from redis import Redis
from redis.exceptions import TimeoutError as QueueTimeoutError
from phue import Bridge
from platypush.context import get_backend
from platypush.message.response import Response
from .. import LightPlugin
@ -513,7 +514,9 @@ class LightHuePlugin(LightPlugin):
self.animation_thread = None
self.redis = None
self.redis = Redis(socket_timeout=transition_seconds)
redis_args = get_backend('redis').redis_args
redis_args['socket_timeout'] = transition_seconds
self.redis = Redis(**redis_args)
if groups:
groups = [g for g in self.bridge.groups if g.name in groups]