[#347] Merge kafka backend and plugin.

Closes: #347
Fabio Manganiello 2024-01-18 02:25:37 +01:00
parent a596ed80a2
commit 95c15f3f5f
7 changed files with 228 additions and 173 deletions
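Reading the diffs below together: the separate Kafka backend (consumer) and plugin (producer) collapse into a single runnable plugin. A hedged sketch of how a pre-merge configuration might map onto the merged plugin, based on the parameter docstrings in the diffs below; note that the removed backend listened on ``<topic>.<device_id>`` (e.g. ``platypush.my_rpi``), so the equivalent listener topic is spelled out explicitly here, and all values are illustrative:

    # Before this commit: separate backend (consumer) and plugin (producer)
    backend.kafka:
        server: localhost:9092
        topic: platypush

    # After this commit: one runnable plugin handles both directions
    kafka:
        host: localhost
        port: 9092
        listeners:
            - topics:
                  - platypush.my_rpi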

docs/source/backends.rst

@@ -14,7 +14,6 @@ Backends
     platypush/backend/google.pubsub.rst
     platypush/backend/gps.rst
     platypush/backend/http.rst
-    platypush/backend/kafka.rst
     platypush/backend/mail.rst
     platypush/backend/midi.rst
     platypush/backend/music.mopidy.rst

docs/source/platypush/backend/kafka.rst

@@ -1,6 +0,0 @@
-``kafka``
-===========================
-
-.. automodule:: platypush.backend.kafka
-    :members:

platypush/backend/kafka/__init__.py

@@ -1,109 +0,0 @@
-import logging
-import time
-
-from platypush.backend import Backend
-from platypush.context import get_plugin
-from platypush.message import Message
-from platypush.message.event.kafka import KafkaMessageEvent
-
-
-class KafkaBackend(Backend):
-    """
-    Backend to interact with an Apache Kafka (https://kafka.apache.org/)
-    streaming platform, send and receive messages.
-    """
-
-    _conn_retry_secs = 5
-
-    def __init__(self, server='localhost:9092', topic='platypush', **kwargs):
-        """
-        :param server: Kafka server name or address + port (default: ``localhost:9092``)
-        :type server: str
-
-        :param topic: (Prefix) topic to listen to (default: platypush). The
-            Platypush device_id (by default the hostname) will be appended to
-            the topic (the real topic name will e.g. be "platypush.my_rpi")
-        :type topic: str
-        """
-        super().__init__(**kwargs)
-
-        self.server = server
-        self.topic_prefix = topic
-        self.topic = self._topic_by_device_id(self.device_id)
-        self.producer = None
-        self.consumer = None
-
-        # Kafka can be veryyyy noisy
-        logging.getLogger('kafka').setLevel(logging.ERROR)
-
-    def _on_record(self, record):
-        if record.topic != self.topic:
-            return
-
-        msg = record.value.decode('utf-8')
-        is_platypush_message = False
-
-        try:
-            msg = Message.build(msg)
-            is_platypush_message = True
-        except Exception as e:
-            self.logger.debug(str(e))
-
-        self.logger.info('Received message on Kafka backend: {}'.format(msg))
-
-        if is_platypush_message:
-            self.on_message(msg)
-        else:
-            self.on_message(KafkaMessageEvent(msg=msg))
-
-    def _topic_by_device_id(self, device_id):
-        return '{}.{}'.format(self.topic_prefix, device_id)
-
-    def send_message(self, msg, **_):
-        target = msg.target
-        kafka_plugin = get_plugin('kafka')
-        kafka_plugin.send_message(
-            msg=msg, topic=self._topic_by_device_id(target), server=self.server
-        )
-
-    def on_stop(self):
-        super().on_stop()
-
-        try:
-            if self.producer:
-                self.producer.flush()
-                self.producer.close()
-
-            if self.consumer:
-                self.consumer.close()
-        except Exception as e:
-            self.logger.warning('Exception occurred while closing Kafka connection')
-            self.logger.exception(e)
-
-    def run(self):
-        from kafka import KafkaConsumer
-
-        super().run()
-
-        self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
-        self.logger.info(
-            'Initialized kafka backend - server: {}, topic: {}'.format(
-                self.server, self.topic
-            )
-        )
-
-        try:
-            for msg in self.consumer:
-                self._on_record(msg)
-                if self.should_stop():
-                    break
-        except Exception as e:
-            self.logger.warning(
-                'Kafka connection error, reconnecting in {} seconds'.format(
-                    self._conn_retry_secs
-                )
-            )
-            self.logger.exception(e)
-            time.sleep(self._conn_retry_secs)
-
-
-# vim:sw=4:ts=4:et:
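The removed backend consumed Platypush messages from a per-device topic (``<topic_prefix>.<device_id>``). As a reference point for the naming scheme it implemented, a minimal kafka-python producer sketch targeting such a topic (server address and device ID are illustrative):

    import json

    from kafka import KafkaProducer  # python-kafka, as listed in the manifest below

    # The removed backend subscribed to '<topic_prefix>.<device_id>', e.g.
    # 'platypush.my_rpi'; anything published there reached its _on_record().
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('platypush.my_rpi', json.dumps({'msg': 'hello'}).encode())
    producer.flush()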

platypush/backend/kafka/manifest.yaml

@@ -1,11 +0,0 @@
-manifest:
-  events: {}
-  install:
-    apt:
-      - python3-kafka
-    dnf:
-      - python-kafka
-    pip:
-      - kafka
-  package: platypush.backend.kafka
-  type: backend

platypush/message/event/kafka.py

@@ -1,3 +1,4 @@
+from typing import Union
+
 from platypush.message.event import Event
 
@@ -7,14 +8,24 @@ class KafkaMessageEvent(Event):
     a new event.
     """
 
-    def __init__(self, msg, *args, **kwargs):
+    def __init__(
+        self,
+        *args,
+        msg: Union[str, list, dict],
+        topic: str,
+        host: str,
+        port: int,
+        **kwargs
+    ):
         """
-        :param msg: Received message
-        :type msg: str or bytes stream
+        :param msg: Received message. If the message is a JSON string, it will
+            be returned as a dict or list. If it's a binary blob, it will be
+            returned as a base64-encoded string.
+        :param topic: Topic where the message was received.
+        :param host: Host where the message was received.
+        :param port: Port where the message was received.
         """
-        super().__init__(msg=msg, *args, **kwargs)
+        super().__init__(*args, msg=msg, topic=topic, host=host, port=port, **kwargs)
 
 
 # vim:sw=4:ts=4:et:
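Since the event now carries ``topic``, ``host`` and ``port``, event hooks can filter on the source topic. A minimal hook sketch in the standard YAML hook format (the hook name, topic and log message are illustrative):

    event.hook.on_kafka_message:
        if:
            type: platypush.message.event.kafka.KafkaMessageEvent
            topic: sensors
        then:
            - action: logger.info
              args:
                  msg: 'Received a Kafka message on the sensors topic'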

platypush/plugins/kafka/__init__.py

@@ -1,71 +1,243 @@
+import base64
 import json
 import logging
+
+from collections import defaultdict
+from threading import RLock, Thread
+from typing import Dict, Iterable, Optional, Union
 
-from platypush.context import get_backend
-from platypush.plugins import Plugin, action
+from kafka import KafkaConsumer, KafkaProducer
+
+from platypush.message.event.kafka import KafkaMessageEvent
+from platypush.plugins import RunnablePlugin, action
 
 
-class KafkaPlugin(Plugin):
+class KafkaPlugin(RunnablePlugin):
     """
     Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/)
     """
 
-    def __init__(self, server=None, port=9092, **kwargs):
+    def __init__(
+        self,
+        host: Optional[str] = None,
+        port: int = 9092,
+        listeners: Optional[Iterable[dict]] = None,
+        connection_retry_secs: float = 5.0,
+        **kwargs,
+    ):
         """
-        :param server: Default Kafka server name or address. If None (default),
-            then it has to be specified upon message sending.
-        :type server: str
-
+        :param host: Default Kafka server name or address. If None (default),
+            then it has to be specified when calling the ``send_message`` action.
         :param port: Default Kafka server port (default: 9092).
-        :type port: int
+        :param connection_retry_secs: Seconds to wait before retrying to
+            connect to the Kafka server after a connection error (default: 5).
+        :param listeners: If specified, the Kafka plugin will listen for
+            messages on these topics. Use this parameter if you also want to
+            listen on Kafka brokers other than the primary one. This
+            parameter supports a list of maps, where each item supports the
+            same arguments passed to the main configuration (host, port, topic,
+            password etc.). If host/port are omitted, then the host/port value
+            from the plugin configuration will be used. If any of the other
+            fields are omitted, then their default value will be used (usually
+            null). Example:
+
+            .. code-block:: yaml
+
+                listeners:
+                    # This listener uses the default configured host/port
+                    - topics:
+                          - topic1
+                          - topic2
+                          - topic3
+
+                    # This listener uses a custom Kafka broker host
+                    - host: sensors
+                      port: 19200
+                      username: myuser
+                      password: secret
+                      topics:
+                          - topic4
+                          - topic5
+
         """
         super().__init__(**kwargs)
-        self.server = (
-            '{server}:{port}'.format(server=server, port=port) if server else None
-        )
-
-        self.producer = None
+        self.host = host
+        self.port = port
+        self._conn_retry_secs = connection_retry_secs
+        self._listeners = listeners or []
+
+        # `server:port` -> KafkaProducer mapping
+        self.producers: Dict[str, KafkaProducer] = {}
+        # `server:port` -> KafkaConsumer mapping
+        self.consumers: Dict[str, KafkaConsumer] = {}
+
+        # Synchronization locks for the producers/consumers maps,
+        # since python-kafka is not thread-safe
+        self._producers_locks = defaultdict(RLock)
+        self._consumers_locks = defaultdict(RLock)
 
-        # Kafka can be veryyyy noisy
+        # Kafka can be very noisy
         logging.getLogger('kafka').setLevel(logging.ERROR)
 
-    @action
-    def send_message(self, msg, topic, server=None):
-        """
-        :param msg: Message to send - as a string, bytes stream, JSON,
-            Platypush message, dictionary, or anything that implements
-            ``__str__``
-
-        :param topic: Topic to send the message to.
-        :type topic: str
-
-        :param server: Kafka server name or address + port (format:
-            ``host:port``). If None, then the default server will be used
-        :type server: str
-        """
-        from kafka import KafkaProducer
-
-        if not server:
-            if not self.server:
-                try:
-                    kafka_backend = get_backend('kafka')
-                    server = kafka_backend.server
-                except Exception as e:
-                    raise RuntimeError(
-                        f'No Kafka server nor default server specified: {str(e)}'
-                    )
-            else:
-                server = self.server
-
+    def _get_srv_str(
+        self, host: Optional[str] = None, port: Optional[int] = None
+    ) -> str:
+        if not host:
+            host = self.host
+
+        assert host, 'No Kafka server specified'
+        if not port:
+            port = self.port
+
+        return f'{host}:{port}'
+
+    def _get_producer(
+        self, host: Optional[str] = None, port: Optional[int] = None, **kwargs
+    ):
+        srv_str = self._get_srv_str(host, port)
+        with self._producers_locks[srv_str]:
+            if srv_str not in self.producers:
+                self.producers[srv_str] = KafkaProducer(
+                    bootstrap_servers=srv_str, **kwargs
+                )
+
+            return self.producers[srv_str]
+
+    def _get_consumer(
+        self, host: Optional[str] = None, port: Optional[int] = None, **kwargs
+    ):
+        srv_str = self._get_srv_str(host, port)
+        with self._consumers_locks[srv_str]:
+            if srv_str not in self.consumers:
+                self.consumers[srv_str] = KafkaConsumer(
+                    bootstrap_servers=srv_str, **kwargs
+                )
+
+            return self.consumers[srv_str]
+
+    def _on_msg(self, record, host: str, port: int):
+        try:
+            msg = record.value.decode()
+        except UnicodeDecodeError:
+            msg = base64.b64encode(record.value).decode()
+
+        try:
+            msg = json.loads(msg)
+        except (TypeError, ValueError):
+            pass
+
+        self._bus.post(
+            KafkaMessageEvent(msg=msg, topic=record.topic, host=host, port=port)
+        )
+
+    def _consumer_monitor(self, consumer: KafkaConsumer, host: str, port: int):
+        while not self.should_stop():
+            try:
+                for msg in consumer:
+                    self._on_msg(msg, host=host, port=port)
+                    if self.should_stop():
+                        break
+            except Exception as e:
+                if not self.should_stop():
+                    self.logger.exception(e)
+                    self.logger.warning(
+                        'Kafka connection error to %s:%d, reconnecting in %f seconds',
+                        host,
+                        port,
+                        self._conn_retry_secs,
+                    )
+
+                self.wait_stop(self._conn_retry_secs)
+
+    @action
+    def publish(self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs):
+        """
+        :param msg: Message to send.
+        :param topic: Topic to send the message to.
+        :param kwargs: Additional arguments to pass to the KafkaProducer,
+            including ``host`` and ``port``.
+        """
+        if isinstance(msg, tuple):
+            msg = list(msg)
         if isinstance(msg, (dict, list)):
             msg = json.dumps(msg)
-        msg = str(msg).encode()
+        if not isinstance(msg, bytes):
+            msg = str(msg).encode()
 
-        producer = KafkaProducer(bootstrap_servers=server)
+        producer = self._get_producer(**kwargs)
         producer.send(topic, msg)
         producer.flush()
 
+    @action
+    def send_message(
+        self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs
+    ):
+        """
+        Alias for :meth:`.publish`.
+        """
+        return self.publish(msg=msg, topic=topic, **kwargs)
+
+    @action
+    def subscribe(self, topic: str, **kwargs):
+        """
+        Subscribe to a topic.
+
+        :param topic: Topic to subscribe to.
+        :param kwargs: Additional arguments to pass to the KafkaConsumer,
+            including ``host`` and ``port``.
+        """
+        consumer = self._get_consumer(**kwargs)
+        consumer.subscribe([topic])
+
+    @action
+    def unsubscribe(self, **kwargs):
+        """
+        Unsubscribe from all the topics on a consumer.
+
+        :param kwargs: Additional arguments to pass to the KafkaConsumer,
+            including ``host`` and ``port``.
+        """
+        consumer = self._get_consumer(**kwargs)
+        consumer.unsubscribe()
+
+    def main(self):
+        for listener in self._listeners:
+            host = listener.get('host', self.host)
+            port = listener.get('port', self.port)
+            topics = listener.get('topics')
+            if not topics:
+                continue
+
+            consumer = self._get_consumer(
+                host=host,
+                port=port,
+                group_id='platypush',
+                auto_offset_reset='earliest',
+            )
+
+            consumer.subscribe(topics)
+            Thread(
+                target=self._consumer_monitor,
+                args=(consumer,),
+                kwargs={'host': host, 'port': port},
+                daemon=True,
+            ).start()
+
+        self.wait_stop()
+
+    def stop(self):
+        super().stop()
+
+        for srv, producer in self.producers.items():
+            try:
+                producer.flush()
+                producer.close()
+            except Exception as e:
+                self.logger.warning('Error while closing Kafka producer %s: %s', srv, e)
+
+        for srv, consumer in self.consumers.items():
+            try:
+                consumer.close()
+            except Exception as e:
+                self.logger.warning('Error while closing Kafka consumer %s: %s', srv, e)
+
 
 # vim:sw=4:ts=4:et:
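Publishing and subscribing are now plain plugin actions, so they can be invoked from procedures or scripts. A short usage sketch via ``get_plugin`` (the same helper the removed backend used; host defaults and topic names are illustrative):

    from platypush.context import get_plugin

    kafka = get_plugin('kafka')

    # dict/list payloads are JSON-dumped, strings are encoded as-is
    kafka.publish(msg={'temperature': 21.5}, topic='sensors')

    # Runtime subscription on the default broker; received records are
    # posted to the bus as KafkaMessageEvents
    kafka.subscribe(topic='alerts')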

platypush/plugins/kafka/manifest.yaml

@@ -1,7 +1,6 @@
 manifest:
   events:
-    platypush.message.event.kafka.KafkaMessageEvent: when a new message is received
-      on the consumer topic.
+    - platypush.message.event.kafka.KafkaMessageEvent
   install:
     apt:
       - python-kafka