diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index dca6575d8..d41fd27d5 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -187,7 +187,7 @@ class Backend(Thread): self.send_message(response, **kwargs) - def send_message(self, msg, **kwargs): + def send_message(self, msg, queue_name=None, **kwargs): """ Sends a platypush.message.Message to a node. To be implemented in the derived classes. By default, if the Redis @@ -195,7 +195,9 @@ class Backend(Thread): other consumers through the configured Redis main queue. :param msg: The message to send + :param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis backend) """ + try: redis = get_backend('redis') if not redis: @@ -205,7 +207,7 @@ class Backend(Thread): "and the fallback Redis backend isn't configured") return - redis.send_message(msg) + redis.send_message(msg, queue_name=queue_name) def run(self): diff --git a/platypush/backend/camera/pi.py b/platypush/backend/camera/pi.py index 3943af077..25f234753 100644 --- a/platypush/backend/camera/pi.py +++ b/platypush/backend/camera/pi.py @@ -3,16 +3,17 @@ import socket import time from enum import Enum -from redis import Redis from threading import Thread from platypush.backend import Backend +from platypush.context import get_backend class CameraPiBackend(Backend): """ Backend to interact with a Raspberry Pi camera. It can start and stop recordings and take pictures. It can be programmatically controlled through - the :class:`platypush.plugins.camera.pi` plugin. + the :class:`platypush.plugins.camera.pi` plugin. Note that the Redis backend + must be configured and running to enable camera control. Requires: @@ -29,7 +30,7 @@ class CameraPiBackend(Backend): return self.value == other def __init__(self, listen_port, x_resolution=640, y_resolution=480, - redis_queue='platypush_mq_camera', + redis_queue='platypush/camera/pi', start_recording_on_startup=True, framerate=24, hflip=False, vflip=False, sharpness=0, contrast=0, brightness=50, @@ -72,7 +73,7 @@ class CameraPiBackend(Backend): self.camera.rotation = rotation self.camera.crop = crop self.start_recording_on_startup = start_recording_on_startup - self.redis = Redis() + self.redis = get_backend('redis') self.redis_queue = redis_queue self._recording_thread = None @@ -87,7 +88,7 @@ class CameraPiBackend(Backend): **kwargs } - self.redis.rpush(self.redis_queue, json.dumps(action)) + self.redis.send_message(msg=json.dumps(action), queue_name=self.redis_queue) def take_picture(self, image_file): """ @@ -152,14 +153,15 @@ class CameraPiBackend(Backend): try: self.camera.stop_recording() - except: - self.logger.info('No recording currently in progress') + except Exception as e: + self.logger.info('Failed to stop recording') + self.logger.exception(e) def run(self): super().run() while not self.should_stop(): - msg = json.loads(self.redis.blpop(self.redis_queue)[1].decode()) + msg = self.redis.get_message(self.redis_queue) if msg.get('action') == self.CameraAction.START_RECORDING: self.start_recording() diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 10a0e0ea3..14e869711 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -52,7 +52,7 @@ class HttpBackend(Backend): } def __init__(self, port=8008, websocket_port=8009, disable_websocket=False, - redis_queue='platypush_flask_mq', token=None, dashboard={}, + redis_queue='platypush/http', token=None, dashboard={}, maps={}, **kwargs): """ :param port: Listen port for the web server (default: 8008) @@ -64,7 +64,7 @@ class HttpBackend(Backend): :param disable_websocket: Disable the websocket interface (default: False) :type disable_websocket: bool - :param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush_flask_mq``) + :param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush/http``) :type redis_queue: str :param token: If set (recommended) any interaction with the web server needs to bear an ``X-Token: `` header, or it will fail with a 403: Forbidden diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index a297a8d97..6e9db3497 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -34,8 +34,24 @@ class RedisBackend(Backend): self.redis = Redis(**self.redis_args) - def send_message(self, msg): - self.redis.rpush(self.queue, msg) + def send_message(self, msg, queue_name=None): + if queue_name: + self.redis.rpush(queue_name, msg) + else: + self.redis.rpush(self.queue, msg) + + + def get_message(self, queue_name=None): + queue = queue_name or self.queue + msg = self.redis.blpop(queue)[1].decode('utf-8') + + try: + msg = Message.build(json.loads(msg)) + except: + import ast + msg = Message.build(ast.literal_eval(msg)) + + return msg def run(self): @@ -45,19 +61,9 @@ class RedisBackend(Backend): format(self.queue, self.redis_args)) while not self.should_stop(): - try: - msg = self.redis.blpop(self.queue)[1].decode('utf-8') - - try: - msg = Message.build(json.loads(msg)) - except: - import ast - msg = Message.build(ast.literal_eval(msg)) - - self.logger.info('Received message on the Redis backend: {}'.format(msg)) - self.bus.post(msg) - except Exception as e: - self.logger.exception(e) + msg = self.get_message() + self.logger.info('Received message on the Redis backend: {}'.format(msg)) + self.bus.post(msg) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/light/hue/__init__.py b/platypush/plugins/light/hue/__init__.py index 7d111d50d..71677f38a 100644 --- a/platypush/plugins/light/hue/__init__.py +++ b/platypush/plugins/light/hue/__init__.py @@ -7,6 +7,7 @@ from redis import Redis from redis.exceptions import TimeoutError as QueueTimeoutError from phue import Bridge +from platypush.context import get_backend from platypush.message.response import Response from .. import LightPlugin @@ -513,7 +514,9 @@ class LightHuePlugin(LightPlugin): self.animation_thread = None self.redis = None - self.redis = Redis(socket_timeout=transition_seconds) + redis_args = get_backend('redis').redis_args + redis_args['socket_timeout'] = transition_seconds + self.redis = Redis(**redis_args) if groups: groups = [g for g in self.bridge.groups if g.name in groups]