forked from platypush/platypush
Added Kafka plugin
This commit is contained in:
parent
d0fea0c9c6
commit
61e211ee07
7 changed files with 125 additions and 16 deletions
|
@ -10,6 +10,7 @@ Events
|
||||||
platypush/events/geo.rst
|
platypush/events/geo.rst
|
||||||
platypush/events/http.rst
|
platypush/events/http.rst
|
||||||
platypush/events/http.rss.rst
|
platypush/events/http.rss.rst
|
||||||
|
platypush/events/kafka.rst
|
||||||
platypush/events/midi.rst
|
platypush/events/midi.rst
|
||||||
platypush/events/music.rst
|
platypush/events/music.rst
|
||||||
platypush/events/path.rst
|
platypush/events/path.rst
|
||||||
|
|
6
docs/source/platypush/events/kafka.rst
Normal file
6
docs/source/platypush/events/kafka.rst
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
``platypush.message.event.kafka``
|
||||||
|
=================================
|
||||||
|
|
||||||
|
.. automodule:: platypush.message.event.kafka
|
||||||
|
:members:
|
||||||
|
|
6
docs/source/platypush/plugins/kafka.rst
Normal file
6
docs/source/platypush/plugins/kafka.rst
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
``platypush.plugins.kafka``
|
||||||
|
===========================
|
||||||
|
|
||||||
|
.. automodule:: platypush.plugins.kafka
|
||||||
|
:members:
|
||||||
|
|
|
@ -24,6 +24,7 @@ Plugins
|
||||||
platypush/plugins/gpio.zeroborg.rst
|
platypush/plugins/gpio.zeroborg.rst
|
||||||
platypush/plugins/http.request.rst
|
platypush/plugins/http.request.rst
|
||||||
platypush/plugins/ifttt.rst
|
platypush/plugins/ifttt.rst
|
||||||
|
platypush/plugins/kafka.rst
|
||||||
platypush/plugins/light.rst
|
platypush/plugins/light.rst
|
||||||
platypush/plugins/light.hue.rst
|
platypush/plugins/light.hue.rst
|
||||||
platypush/plugins/midi.rst
|
platypush/plugins/midi.rst
|
||||||
|
|
|
@ -4,7 +4,10 @@ import time
|
||||||
|
|
||||||
from kafka import KafkaConsumer, KafkaProducer
|
from kafka import KafkaConsumer, KafkaProducer
|
||||||
|
|
||||||
from .. import Backend
|
from platypush.backend import Backend
|
||||||
|
from platypush.context import get_plugin
|
||||||
|
from platypush.message import Message
|
||||||
|
from platypush.message.event.kafka import KafkaMessageEvent
|
||||||
|
|
||||||
|
|
||||||
class KafkaBackend(Backend):
|
class KafkaBackend(Backend):
|
||||||
|
@ -19,9 +22,9 @@ class KafkaBackend(Backend):
|
||||||
|
|
||||||
_conn_retry_secs = 5
|
_conn_retry_secs = 5
|
||||||
|
|
||||||
def __init__(self, server, topic='platypush', **kwargs):
|
def __init__(self, server='localhost:9092', topic='platypush', **kwargs):
|
||||||
"""
|
"""
|
||||||
:param server: Kafka server
|
:param server: Kafka server name or address + port (default: ``localhost:9092``)
|
||||||
:type server: str
|
:type server: str
|
||||||
|
|
||||||
:param topic: (Prefix) topic to listen to (default: platypush). The Platypush device_id (by default the hostname) will be appended to the topic (the real topic name will e.g. be "platypush.my_rpi")
|
:param topic: (Prefix) topic to listen to (default: platypush). The Platypush device_id (by default the hostname) will be appended to the topic (the real topic name will e.g. be "platypush.my_rpi")
|
||||||
|
@ -40,29 +43,31 @@ class KafkaBackend(Backend):
|
||||||
|
|
||||||
def _on_record(self, record):
|
def _on_record(self, record):
|
||||||
if record.topic != self.topic: return
|
if record.topic != self.topic: return
|
||||||
|
msg = record.value.decode('utf-8')
|
||||||
|
is_platypush_message = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
msg = json.loads(record.value.decode('utf-8'))
|
msg = Message.build(msg)
|
||||||
except Exception as e:
|
is_platypush_message = True
|
||||||
self.logger.exception(e)
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
self.logger.debug('Received message on Kafka backend: {}'.format(msg))
|
self.logger.info('Received message on Kafka backend: {}'.format(msg))
|
||||||
self.on_message(msg)
|
|
||||||
|
|
||||||
def _init_producer(self):
|
if is_platypush_message:
|
||||||
if not self.producer:
|
self.on_message(msg)
|
||||||
self.producer = KafkaProducer(bootstrap_servers=self.server)
|
else:
|
||||||
|
self.on_message(KafkaMessageEvent(msg=msg))
|
||||||
|
|
||||||
def _topic_by_device_id(self, device_id):
|
def _topic_by_device_id(self, device_id):
|
||||||
return '{}.{}'.format(self.topic_prefix, device_id)
|
return '{}.{}'.format(self.topic_prefix, device_id)
|
||||||
|
|
||||||
def send_message(self, msg):
|
def send_message(self, msg):
|
||||||
target = msg.target
|
target = msg.target
|
||||||
msg = str(msg).encode('utf-8')
|
kafka_plugin = get_plugin('kafka')
|
||||||
|
kafka_plugin.send_message(msg=msg,
|
||||||
self._init_producer()
|
topic=self._topic_by_device_id(target),
|
||||||
self.producer.send(self._topic_by_device_id(target), msg)
|
server=self.server)
|
||||||
self.producer.flush()
|
|
||||||
|
|
||||||
def on_stop(self):
|
def on_stop(self):
|
||||||
try:
|
try:
|
||||||
|
|
20
platypush/message/event/kafka.py
Normal file
20
platypush/message/event/kafka.py
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
from platypush.message.event import Event
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaMessageEvent(Event):
|
||||||
|
"""
|
||||||
|
Kafka message event object. Fired when :mod:`platypush.backend.kafka` receives
|
||||||
|
a new event.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, msg, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
:param msg: Received message
|
||||||
|
:type msg: str or bytes stream
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(msg=msg, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
70
platypush/plugins/kafka.py
Normal file
70
platypush/plugins/kafka.py
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
from kafka import KafkaProducer
|
||||||
|
|
||||||
|
from platypush.context import get_backend
|
||||||
|
from platypush.plugins import Plugin, action
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaPlugin(Plugin):
|
||||||
|
"""
|
||||||
|
Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
|
||||||
|
|
||||||
|
Triggers:
|
||||||
|
|
||||||
|
* :class:`platypush.message.event.kafka.KafkaMessageEvent` when a new message is received on the consumer topic.
|
||||||
|
|
||||||
|
Requires:
|
||||||
|
|
||||||
|
* **kafka** (``pip install kafka-python``)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, server=None, **kwargs):
|
||||||
|
"""
|
||||||
|
:param server: Default Kafka server name or address + port (format: ``host:port``) to dispatch the messages to. If None (default), then it has to be specified upon message sending.
|
||||||
|
:type server: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
self.server = '{server}:{port}'.format(server=server, port=port) \
|
||||||
|
if server else None
|
||||||
|
|
||||||
|
self.producer = None
|
||||||
|
|
||||||
|
# Kafka can be veryyyy noisy
|
||||||
|
logging.getLogger('kafka').setLevel(logging.ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
@action
|
||||||
|
def send_message(self, msg, topic, server=None, **kwargs):
|
||||||
|
"""
|
||||||
|
:param msg: Message to send - as a string, bytes stream, JSON, Platypush message, dictionary, or anything that implements ``__str__``
|
||||||
|
|
||||||
|
:param server: Kafka server name or address + port (format: ``host:port``). If None, then the default server will be used
|
||||||
|
:type server: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not server:
|
||||||
|
if not self.server:
|
||||||
|
try:
|
||||||
|
kafka_backend = get_backend('kafka')
|
||||||
|
server = kafka_backend.server
|
||||||
|
except:
|
||||||
|
raise RuntimeError('No Kafka server nor default server specified')
|
||||||
|
else:
|
||||||
|
server = self.server
|
||||||
|
|
||||||
|
if isinstance(msg, dict) or isinstance(msg, list):
|
||||||
|
msg = json.dumps(msg)
|
||||||
|
msg = str(msg).encode('utf-8')
|
||||||
|
|
||||||
|
producer = KafkaProducer(bootstrap_servers=server)
|
||||||
|
producer.send(topic, msg)
|
||||||
|
producer.flush()
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
Loading…
Reference in a new issue