platypush/platypush/backend/redis.py

88 lines
2.4 KiB
Python
Raw Normal View History

2018-05-14 20:09:25 +02:00
import json
from redis import Redis
from platypush.backend import Backend
from platypush.context import get_plugin
2018-05-14 20:09:25 +02:00
from platypush.message import Message
class RedisBackend(Backend):
    """
    Backend that reads messages from a configured Redis queue (default:
    ``platypush_bus_mq``) and posts them to the application bus. Very
    useful when you have a plugin whose code is executed in another process
    and can't post events or requests to the application bus.

    Requires:

        * **redis** (``pip install redis``)
    """

    def __init__(self, queue='platypush_bus_mq', redis_args=None, *args, **kwargs):
        """
        :param queue: Queue name to listen on (default: ``platypush_bus_mq``)
        :type queue: str

        :param redis_args: Arguments that will be passed to the redis-py
            constructor (e.g. host, port, password), see
            http://redis-py.readthedocs.io/en/latest/. If empty or omitted,
            the ``kwargs`` of the ``redis`` plugin are used when available.
        :type redis_args: dict
        """

        super().__init__(*args, **kwargs)

        self.queue = queue
        # ``None`` replaces the former ``{}`` default so that instances never
        # share (and possibly mutate) the same default dictionary.
        self.redis_args = {} if redis_args is None else redis_args

        if not self.redis_args:
            try:
                redis_plugin = get_plugin('redis')
                if redis_plugin and redis_plugin.kwargs:
                    self.redis_args = redis_plugin.kwargs
            except Exception:
                # Best effort: if the redis plugin can't be loaded we fall
                # back to the redis-py defaults instead of failing here.
                # ``Exception`` (rather than a bare ``except``) lets
                # KeyboardInterrupt/SystemExit propagate.
                pass

        self.redis = Redis(**self.redis_args)

    def send_message(self, msg, queue_name=None):
        """
        Push the string representation of a message to a Redis queue.

        :param msg: Message to send; it is converted with ``str()``
        :param queue_name: Target queue (default: the queue this backend
            listens on)
        :type queue_name: str
        """

        self.redis.rpush(queue_name or self.queue, str(msg))

    def get_message(self, queue_name=None):
        """
        Block until a message is available on a Redis queue and parse it.

        The payload is decoded as UTF-8 and then parsed, in order, as:

            1. a JSON document passed to ``Message.build``
            2. a Python literal passed to ``Message.build``
            3. a plain JSON document

        If none of the attempts succeeds the error is logged and the raw
        string is returned.

        :param queue_name: Queue to read from (default: the queue this
            backend listens on)
        :type queue_name: str
        :returns: The parsed message, or the raw string if parsing failed
        """

        import ast

        queue = queue_name or self.queue
        # blpop returns a (queue, payload) pair; only the payload is needed
        raw = self.redis.blpop(queue)[1].decode('utf-8')

        try:
            return Message.build(json.loads(raw))
        except Exception:
            # Not a JSON-encoded platypush message: try the next format
            pass

        try:
            # literal_eval only evaluates Python literals, not arbitrary code
            return Message.build(ast.literal_eval(raw))
        except Exception:
            # Not a Python-literal platypush message: try the next format
            pass

        try:
            return json.loads(raw)
        except Exception as e:
            self.logger.exception(e)

        return raw

    def run(self):
        """
        Read messages from the queue and dispatch them through
        ``on_message`` until ``should_stop()`` returns true.
        """

        super().run()

        # redis_args may contain the Redis password: never log it in clear
        logged_args = {
            key: ('*****' if key == 'password' else value)
            for key, value in dict(self.redis_args).items()
        }

        self.logger.info('Initialized Redis backend on queue {} with arguments {}'.
                         format(self.queue, logged_args))

        while not self.should_stop():
            msg = self.get_message()
            self.logger.info('Received message on the Redis backend: {}'.format(msg))
            self.on_message(msg)
2018-05-14 20:09:25 +02:00
# vim:sw=4:ts=4:et: