platypush/platypush/backend/kafka/__init__.py
Fabio Manganiello c3337ccc6c
Docs deps autogen sphinx plugin.
Added an `add_dependencies` plugin to the Sphinx build process that
parses the manifest files of the scanned backends and plugins and
automatically generates the documentation for the required dependencies
and triggered events.

This means that those dependencies are no longer required to be listed
in the docstring of the class itself.

Also in this commit:

- Black/LINT for some integrations that hadn't been touched in a long
  time.

- Deleted some leftovers from previous refactors (deprecated
  `backend.mqtt`, `backend.zwave.mqtt`, `backend.http.request.rss`).

- Deleted deprecated `inotify` backend - replaced by `file.monitor`.
2023-09-24 17:00:08 +02:00

109 lines
3.2 KiB
Python

import logging
import time
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message import Message
from platypush.message.event.kafka import KafkaMessageEvent
class KafkaBackend(Backend):
    """
    Backend to interact with an Apache Kafka (https://kafka.apache.org/)
    streaming platform, send and receive messages.

    Records received on the device topic are parsed as Platypush messages
    when possible; any other payload is forwarded wrapped into a
    :class:`platypush.message.event.kafka.KafkaMessageEvent`.
    """

    # Seconds to wait before re-creating the consumer after a failure.
    _conn_retry_secs = 5

    def __init__(
        self, server: str = 'localhost:9092', topic: str = 'platypush', **kwargs
    ):
        """
        :param server: Kafka server name or address + port (default: ``localhost:9092``)
        :type server: str
        :param topic: (Prefix) topic to listen to (default: platypush). The
            Platypush device_id (by default the hostname) will be appended to
            the topic (the real topic name will e.g. be "platypush.my_rpi")
        :type topic: str
        """
        super().__init__(**kwargs)

        self.server = server
        self.topic_prefix = topic
        self.topic = self._topic_by_device_id(self.device_id)
        self.producer = None
        self.consumer = None

        # Kafka can be veryyyy noisy
        logging.getLogger('kafka').setLevel(logging.ERROR)

    def _on_record(self, record):
        """
        Process a single record read from the consumer.

        Records from topics other than ``self.topic`` are ignored. The payload
        is decoded as UTF-8 and passed to ``Message.build``; if that fails the
        decoded string is dispatched as a ``KafkaMessageEvent`` instead.

        :param record: Consumer record exposing ``topic`` and ``value``.
        """
        if record.topic != self.topic:
            return

        try:
            msg = record.value.decode('utf-8')
        except (AttributeError, UnicodeDecodeError) as e:
            # A record whose value is None or isn't valid UTF-8 is a payload
            # problem, not a connection problem: skip it instead of letting
            # the error bubble up and interrupt the consumer loop.
            self.logger.warning(
                'Could not decode Kafka record on topic %s: %s', record.topic, e
            )
            return

        try:
            msg = Message.build(msg)
            is_platypush_message = True
        except Exception as e:
            # Not a Platypush message: keep the decoded string as it is.
            self.logger.debug(str(e))
            is_platypush_message = False

        self.logger.info('Received message on Kafka backend: %s', msg)

        if is_platypush_message:
            self.on_message(msg)
        else:
            self.on_message(KafkaMessageEvent(msg=msg))

    def _topic_by_device_id(self, device_id):
        """
        :param device_id: Identifier of a Platypush device.
        :return: The topic name for that device, ``<topic_prefix>.<device_id>``.
        """
        return f'{self.topic_prefix}.{device_id}'

    def send_message(self, msg, **_):
        """
        Send a message to the topic of its target device through the ``kafka``
        plugin.

        :param msg: Message to send; its ``target`` attribute selects the
            destination topic.
        """
        target = msg.target
        kafka_plugin = get_plugin('kafka')
        kafka_plugin.send_message(
            msg=msg, topic=self._topic_by_device_id(target), server=self.server
        )

    def _close_consumer(self):
        """
        Close the current consumer, if any, and reset ``self.consumer``.
        Errors raised while closing are logged and not propagated.
        """
        consumer, self.consumer = self.consumer, None
        if consumer is None:
            return

        try:
            consumer.close()
        except Exception as e:
            self.logger.warning('Exception occurred while closing Kafka connection')
            self.logger.exception(e)

    def on_stop(self):
        """
        Flush and close the producer and close the consumer. The two are
        released independently, so a failure on the producer doesn't prevent
        the consumer from being closed.
        """
        super().on_stop()

        producer, self.producer = self.producer, None
        if producer is not None:
            try:
                producer.flush()
                producer.close()
            except Exception as e:
                self.logger.warning(
                    'Exception occurred while closing Kafka connection'
                )
                self.logger.exception(e)

        self._close_consumer()

    def run(self):
        """
        Consume records from ``self.topic`` until the backend is stopped.

        If the consumer can't be created, or the iteration over it fails, the
        consumer is closed and re-created after ``_conn_retry_secs`` seconds.
        """
        # Imported here so that the module can be loaded without the kafka
        # dependency installed.
        from kafka import KafkaConsumer

        super().run()

        while not self.should_stop():
            failed = False

            try:
                self.consumer = KafkaConsumer(
                    self.topic, bootstrap_servers=self.server
                )
                self.logger.info(
                    'Initialized kafka backend - server: %s, topic: %s',
                    self.server,
                    self.topic,
                )

                for record in self.consumer:
                    self._on_record(record)
                    if self.should_stop():
                        break
            except Exception as e:
                # Errors raised while the backend is being stopped (e.g. the
                # consumer was closed by on_stop) are not worth a retry.
                failed = not self.should_stop()
                if failed:
                    self.logger.warning(
                        'Kafka connection error, reconnecting in %s seconds',
                        self._conn_retry_secs,
                    )
                    self.logger.exception(e)
            finally:
                # Never carry a consumer over to the next iteration.
                self._close_consumer()

            if failed:
                time.sleep(self._conn_retry_secs)
# vim:sw=4:ts=4:et: