diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 7de6dc1b3..c209095c6 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -14,7 +14,6 @@ Backends platypush/backend/google.pubsub.rst platypush/backend/gps.rst platypush/backend/http.rst - platypush/backend/kafka.rst platypush/backend/mail.rst platypush/backend/midi.rst platypush/backend/music.mopidy.rst diff --git a/docs/source/platypush/backend/kafka.rst b/docs/source/platypush/backend/kafka.rst deleted file mode 100644 index 7de4c3ee4..000000000 --- a/docs/source/platypush/backend/kafka.rst +++ /dev/null @@ -1,6 +0,0 @@ -``kafka`` -=========================== - -.. automodule:: platypush.backend.kafka - :members: - diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py deleted file mode 100644 index a86b715cf..000000000 --- a/platypush/backend/kafka/__init__.py +++ /dev/null @@ -1,109 +0,0 @@ -import logging -import time - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message import Message -from platypush.message.event.kafka import KafkaMessageEvent - - -class KafkaBackend(Backend): - """ - Backend to interact with an Apache Kafka (https://kafka.apache.org/) - streaming platform, send and receive messages. - """ - - _conn_retry_secs = 5 - - def __init__(self, server='localhost:9092', topic='platypush', **kwargs): - """ - :param server: Kafka server name or address + port (default: ``localhost:9092``) - :type server: str - - :param topic: (Prefix) topic to listen to (default: platypush). The - Platypush device_id (by default the hostname) will be appended to - the topic (the real topic name will e.g. be "platypush.my_rpi") - :type topic: str - """ - - super().__init__(**kwargs) - - self.server = server - self.topic_prefix = topic - self.topic = self._topic_by_device_id(self.device_id) - self.producer = None - self.consumer = None - - # Kafka can be veryyyy noisy - logging.getLogger('kafka').setLevel(logging.ERROR) - - def _on_record(self, record): - if record.topic != self.topic: - return - msg = record.value.decode('utf-8') - is_platypush_message = False - - try: - msg = Message.build(msg) - is_platypush_message = True - except Exception as e: - self.logger.debug(str(e)) - - self.logger.info('Received message on Kafka backend: {}'.format(msg)) - - if is_platypush_message: - self.on_message(msg) - else: - self.on_message(KafkaMessageEvent(msg=msg)) - - def _topic_by_device_id(self, device_id): - return '{}.{}'.format(self.topic_prefix, device_id) - - def send_message(self, msg, **_): - target = msg.target - kafka_plugin = get_plugin('kafka') - kafka_plugin.send_message( - msg=msg, topic=self._topic_by_device_id(target), server=self.server - ) - - def on_stop(self): - super().on_stop() - try: - if self.producer: - self.producer.flush() - self.producer.close() - - if self.consumer: - self.consumer.close() - except Exception as e: - self.logger.warning('Exception occurred while closing Kafka connection') - self.logger.exception(e) - - def run(self): - from kafka import KafkaConsumer - - super().run() - - self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) - self.logger.info( - 'Initialized kafka backend - server: {}, topic: {}'.format( - self.server, self.topic - ) - ) - - try: - for msg in self.consumer: - self._on_record(msg) - if self.should_stop(): - break - except Exception as e: - self.logger.warning( - 'Kafka connection error, reconnecting in {} seconds'.format( - self._conn_retry_secs - ) - ) - self.logger.exception(e) - time.sleep(self._conn_retry_secs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/kafka/manifest.yaml b/platypush/backend/kafka/manifest.yaml deleted file mode 100644 index 91cf8860b..000000000 --- a/platypush/backend/kafka/manifest.yaml +++ /dev/null @@ -1,11 +0,0 @@ -manifest: - events: {} - install: - apt: - - python3-kafka - dnf: - - python-kafka - pip: - - kafka - package: platypush.backend.kafka - type: backend diff --git a/platypush/message/event/kafka.py b/platypush/message/event/kafka.py index 79a1938f5..e4d7e4886 100644 --- a/platypush/message/event/kafka.py +++ b/platypush/message/event/kafka.py @@ -1,3 +1,4 @@ +from typing import Union from platypush.message.event import Event @@ -7,14 +8,24 @@ class KafkaMessageEvent(Event): a new event. """ - def __init__(self, msg, *args, **kwargs): + def __init__( + self, + *args, + msg: Union[str, list, dict], + topic: str, + host: str, + port: int, + **kwargs + ): """ - :param msg: Received message - :type msg: str or bytes stream + :param msg: Received message. If the message is a JSON string, it will + be returned as a dict or list. If it's a binary blob, it will be + returned as a base64-encoded string. + :param topic: Topic where the message was received. + :param host: Host where the message was received. + :param port: Port where the message was received. """ - - super().__init__(msg=msg, *args, **kwargs) + super().__init__(*args, msg=msg, topic=topic, host=host, port=port, **kwargs) # vim:sw=4:ts=4:et: - diff --git a/platypush/plugins/kafka/__init__.py b/platypush/plugins/kafka/__init__.py index 376a6b9f9..7d42777ff 100644 --- a/platypush/plugins/kafka/__init__.py +++ b/platypush/plugins/kafka/__init__.py @@ -1,71 +1,243 @@ +import base64 import json import logging +from collections import defaultdict +from threading import RLock, Thread +from typing import Dict, Iterable, Optional, Union -from platypush.context import get_backend -from platypush.plugins import Plugin, action +from kafka import KafkaConsumer, KafkaProducer + +from platypush.message.event.kafka import KafkaMessageEvent +from platypush.plugins import RunnablePlugin, action -class KafkaPlugin(Plugin): +class KafkaPlugin(RunnablePlugin): """ Plugin to send messages to an Apache Kafka instance (https://kafka.apache.org/) """ - def __init__(self, server=None, port=9092, **kwargs): + def __init__( + self, + host: Optional[str] = None, + port: int = 9092, + listeners: Optional[Iterable[dict]] = None, + connection_retry_secs: float = 5.0, + **kwargs, + ): """ - :param server: Default Kafka server name or address. If None (default), then it has to be specified upon - message sending. - :type server: str - + :param host: Default Kafka server name or address. If None (default), + then it has to be specified when calling the ``send_message`` action. :param port: Default Kafka server port (default: 9092). - :type port: int + :param connection_retry_secs: Seconds to wait before retrying to + connect to the Kafka server after a connection error (default: 5). + :param listeners: If specified, the Kafka plugin will listen for + messages on these topics. Use this parameter if you also want to + listen on other Kafka brokers other than the primary one. This + parameter supports a list of maps, where each item supports the + same arguments passed to the main configuration (host, port, topic, + password etc.). If host/port are omitted, then the host/port value + from the plugin configuration will be used. If any of the other + fields are omitted, then their default value will be used (usually + null). Example: + + .. code-block:: yaml + + listeners: + # This listener use the default configured host/port + - topics: + - topic1 + - topic2 + - topic3 + + # This will use a custom MQTT broker host + - host: sensors + port: 19200 + username: myuser + password: secret + topics: + - topic4 + - topic5 + """ super().__init__(**kwargs) - self.server = ( - '{server}:{port}'.format(server=server, port=port) if server else None - ) + self.host = host + self.port = port + self._conn_retry_secs = connection_retry_secs + self._listeners = listeners or [] - self.producer = None + # `server:port` -> KafkaProducer mapping + self.producers: Dict[str, KafkaProducer] = {} + # `server:port` -> KafkaConsumer mapping + self.consumers: Dict[str, KafkaConsumer] = {} - # Kafka can be veryyyy noisy + # Synchronization locks for the producers/consumers maps, + # since python-kafka is not thread-safe + self._producers_locks = defaultdict(RLock) + self._consumers_locks = defaultdict(RLock) + + # Kafka can be very noisy logging.getLogger('kafka').setLevel(logging.ERROR) - @action - def send_message(self, msg, topic, server=None): - """ - :param msg: Message to send - as a string, bytes stream, JSON, Platypush message, dictionary, or anything - that implements ``__str__`` + def _get_srv_str( + self, host: Optional[str] = None, port: Optional[int] = None + ) -> str: + if not host: + host = self.host - :param topic: Topic to send the message to. - :type topic: str + assert host, 'No Kafka server specified' + if not port: + port = self.port - :param server: Kafka server name or address + port (format: ``host:port``). If None, then the default server - will be used - :type server: str - """ + return f'{host}:{port}' - from kafka import KafkaProducer + def _get_producer( + self, host: Optional[str] = None, port: Optional[int] = None, **kwargs + ): + srv_str = self._get_srv_str(host, port) + with self._producers_locks[srv_str]: + if srv_str not in self.producers: + self.producers[srv_str] = KafkaProducer( + bootstrap_servers=srv_str, **kwargs + ) + return self.producers[srv_str] - if not server: - if not self.server: - try: - kafka_backend = get_backend('kafka') - server = kafka_backend.server - except Exception as e: - raise RuntimeError( - f'No Kafka server nor default server specified: {str(e)}' + def _get_consumer( + self, host: Optional[str] = None, port: Optional[int] = None, **kwargs + ): + srv_str = self._get_srv_str(host, port) + with self._consumers_locks[srv_str]: + if srv_str not in self.consumers: + self.consumers[srv_str] = KafkaConsumer( + bootstrap_servers=srv_str, **kwargs + ) + return self.consumers[srv_str] + + def _on_msg(self, record, host: str, port: int): + try: + msg = record.value.decode() + except UnicodeDecodeError: + msg = base64.b64encode(record.value).decode() + + try: + msg = json.loads(msg) + except (TypeError, ValueError): + pass + + self._bus.post( + KafkaMessageEvent(msg=msg, topic=record.topic, host=host, port=port) + ) + + def _consumer_monitor(self, consumer: KafkaConsumer, host: str, port: int): + while not self.should_stop(): + try: + for msg in consumer: + self._on_msg(msg, host=host, port=port) + if self.should_stop(): + break + except Exception as e: + if not self.should_stop(): + self.logger.exception(e) + self.logger.warning( + 'Kafka connection error to %s:%d, reconnecting in %f seconds', + host, + port, + self._conn_retry_secs, ) - else: - server = self.server + self.wait_stop(self._conn_retry_secs) + + @action + def publish(self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs): + """ + :param msg: Message to send. + :param topic: Topic to send the message to. + :param kwargs: Additional arguments to pass to the KafkaConsumer, + including ``host`` and ``port``. + """ + if isinstance(msg, tuple): + msg = list(msg) if isinstance(msg, (dict, list)): msg = json.dumps(msg) - msg = str(msg).encode() + if not isinstance(msg, bytes): + msg = str(msg).encode() - producer = KafkaProducer(bootstrap_servers=server) + producer = self._get_producer(**kwargs) producer.send(topic, msg) producer.flush() + @action + def send_message( + self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs + ): + """ + Alias for :meth:`.publish`. + """ + return self.send_message(msg=msg, topic=topic, **kwargs) + + @action + def subscribe(self, topic: str, **kwargs): + """ + Subscribe to a topic. + + :param topic: Topic to subscribe to. + :param kwargs: Additional arguments to pass to the KafkaConsumer, + including ``host`` and ``port``. + """ + consumer = self._get_consumer(**kwargs) + consumer.subscribe([topic]) + + @action + def unsubscribe(self, **kwargs): + """ + Unsubscribe from all the topics on a consumer. + + :param kwargs: Additional arguments to pass to the KafkaConsumer, + including ``host`` and ``port``. + """ + consumer = self._get_consumer(**kwargs) + consumer.unsubscribe() + + def main(self): + for listener in self._listeners: + host = listener.get('host', self.host) + port = listener.get('port', self.port) + topics = listener.get('topics') + if not topics: + continue + + consumer = self._get_consumer( + host=host, + port=port, + group_id='platypush', + auto_offset_reset='earliest', + ) + + consumer.subscribe(topics) + Thread( + target=self._consumer_monitor, + args=(consumer,), + kwargs={'host': host, 'port': port}, + daemon=True, + ).start() + + self.wait_stop() + + def stop(self): + super().stop() + for srv, producer in self.producers.items(): + try: + producer.flush() + producer.close() + except Exception as e: + self.logger.warning('Error while closing Kafka producer %s: %s', srv, e) + + for srv, consumer in self.consumers.items(): + try: + consumer.close() + except Exception as e: + self.logger.warning('Error while closing Kafka consumer %s: %s', srv, e) + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/kafka/manifest.yaml b/platypush/plugins/kafka/manifest.yaml index 6804752e2..2fac76954 100644 --- a/platypush/plugins/kafka/manifest.yaml +++ b/platypush/plugins/kafka/manifest.yaml @@ -1,7 +1,6 @@ manifest: events: - platypush.message.event.kafka.KafkaMessageEvent: when a new message is received - on the consumer topic. + - platypush.message.event.kafka.KafkaMessageEvent install: apt: - python-kafka