diff --git a/docs/source/events.rst b/docs/source/events.rst index 9edfc3cd..2c062598 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -10,6 +10,7 @@ Events platypush/events/geo.rst platypush/events/http.rst platypush/events/http.rss.rst + platypush/events/kafka.rst platypush/events/midi.rst platypush/events/music.rst platypush/events/path.rst diff --git a/docs/source/platypush/events/kafka.rst b/docs/source/platypush/events/kafka.rst new file mode 100644 index 00000000..69a704a6 --- /dev/null +++ b/docs/source/platypush/events/kafka.rst @@ -0,0 +1,6 @@ +``platypush.message.event.kafka`` +================================= + +.. automodule:: platypush.message.event.kafka + :members: + diff --git a/docs/source/platypush/plugins/kafka.rst b/docs/source/platypush/plugins/kafka.rst new file mode 100644 index 00000000..49e13fd0 --- /dev/null +++ b/docs/source/platypush/plugins/kafka.rst @@ -0,0 +1,6 @@ +``platypush.plugins.kafka`` +=========================== + +.. automodule:: platypush.plugins.kafka + :members: + diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 6f0d24e3..db7f58a4 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -24,6 +24,7 @@ Plugins platypush/plugins/gpio.zeroborg.rst platypush/plugins/http.request.rst platypush/plugins/ifttt.rst + platypush/plugins/kafka.rst platypush/plugins/light.rst platypush/plugins/light.hue.rst platypush/plugins/midi.rst diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index 70487f2e..bce8b052 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -4,7 +4,10 @@ import time from kafka import KafkaConsumer, KafkaProducer -from .. import Backend +from platypush.backend import Backend +from platypush.context import get_plugin +from platypush.message import Message +from platypush.message.event.kafka import KafkaMessageEvent class KafkaBackend(Backend): @@ -19,9 +22,9 @@ class KafkaBackend(Backend): _conn_retry_secs = 5 - def __init__(self, server, topic='platypush', **kwargs): + def __init__(self, server='localhost:9092', topic='platypush', **kwargs): """ - :param server: Kafka server + :param server: Kafka server name or address + port (default: ``localhost:9092``) :type server: str :param topic: (Prefix) topic to listen to (default: platypush). The Platypush device_id (by default the hostname) will be appended to the topic (the real topic name will e.g. be "platypush.my_rpi") @@ -40,29 +43,31 @@ class KafkaBackend(Backend): def _on_record(self, record): if record.topic != self.topic: return + msg = record.value.decode('utf-8') + is_platypush_message = False try: - msg = json.loads(record.value.decode('utf-8')) - except Exception as e: - self.logger.exception(e) + msg = Message.build(msg) + is_platypush_message = True + except: + pass - self.logger.debug('Received message on Kafka backend: {}'.format(msg)) - self.on_message(msg) + self.logger.info('Received message on Kafka backend: {}'.format(msg)) - def _init_producer(self): - if not self.producer: - self.producer = KafkaProducer(bootstrap_servers=self.server) + if is_platypush_message: + self.on_message(msg) + else: + self.on_message(KafkaMessageEvent(msg=msg)) def _topic_by_device_id(self, device_id): return '{}.{}'.format(self.topic_prefix, device_id) def send_message(self, msg): target = msg.target - msg = str(msg).encode('utf-8') - - self._init_producer() - self.producer.send(self._topic_by_device_id(target), msg) - self.producer.flush() + kafka_plugin = get_plugin('kafka') + kafka_plugin.send_message(msg=msg, + topic=self._topic_by_device_id(target), + server=self.server) def on_stop(self): try: diff --git a/platypush/message/event/kafka.py b/platypush/message/event/kafka.py new file mode 100644 index 00000000..79a1938f --- /dev/null +++ b/platypush/message/event/kafka.py @@ -0,0 +1,20 @@ +from platypush.message.event import Event + + +class KafkaMessageEvent(Event): + """ + Kafka message event object. Fired when :mod:`platypush.backend.kafka` receives + a new event. + """ + + def __init__(self, msg, *args, **kwargs): + """ + :param msg: Received message + :type msg: str or bytes stream + """ + + super().__init__(msg=msg, *args, **kwargs) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/plugins/kafka.py b/platypush/plugins/kafka.py new file mode 100644 index 00000000..d50bc03b --- /dev/null +++ b/platypush/plugins/kafka.py @@ -0,0 +1,70 @@ +import json +import logging +import time + +from kafka import KafkaProducer + +from platypush.context import get_backend +from platypush.plugins import Plugin, action + + +class KafkaPlugin(Plugin): + """ + Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/) + + Triggers: + + * :class:`platypush.message.event.kafka.KafkaMessageEvent` when a new message is received on the consumer topic. + + Requires: + + * **kafka** (``pip install kafka-python``) + """ + + def __init__(self, server=None, **kwargs): + """ + :param server: Default Kafka server name or address + port (format: ``host:port``) to dispatch the messages to. If None (default), then it has to be specified upon message sending. + :type server: str + """ + + super().__init__(**kwargs) + + self.server = '{server}:{port}'.format(server=server, port=port) \ + if server else None + + self.producer = None + + # Kafka can be veryyyy noisy + logging.getLogger('kafka').setLevel(logging.ERROR) + + + @action + def send_message(self, msg, topic, server=None, **kwargs): + """ + :param msg: Message to send - as a string, bytes stream, JSON, Platypush message, dictionary, or anything that implements ``__str__`` + + :param server: Kafka server name or address + port (format: ``host:port``). If None, then the default server will be used + :type server: str + """ + + if not server: + if not self.server: + try: + kafka_backend = get_backend('kafka') + server = kafka_backend.server + except: + raise RuntimeError('No Kafka server nor default server specified') + else: + server = self.server + + if isinstance(msg, dict) or isinstance(msg, list): + msg = json.dumps(msg) + msg = str(msg).encode('utf-8') + + producer = KafkaProducer(bootstrap_servers=server) + producer.send(topic, msg) + producer.flush() + + +# vim:sw=4:ts=4:et: +