forked from platypush/platypush
[kafka] Extended features.
- Added `partition`, `key`, `offset`, `timestamp` and `headers` to `KafkaMessageEvent`.
- Added `group_id` to the consumers' configuration.
- Added `key`, `timeout` and `compression_type` to `kafka.publish`.
This commit is contained in:
parent
f712537673
commit
f3aa245c0e
2 changed files with 86 additions and 15 deletions
|
@ -1,4 +1,5 @@
|
|||
from typing import Union
|
||||
from typing import Iterable, Optional, Union
|
||||
|
||||
from platypush.message.event import Event
|
||||
|
||||
|
||||
|
@ -15,6 +16,11 @@ class KafkaMessageEvent(Event):
|
|||
topic: str,
|
||||
host: str,
|
||||
port: int,
|
||||
partition: int,
|
||||
offset: int,
|
||||
timestamp: float,
|
||||
key: Optional[str] = None,
|
||||
headers: Optional[Iterable] = None,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
|
@ -24,8 +30,25 @@ class KafkaMessageEvent(Event):
|
|||
:param topic: Topic where the message was received.
|
||||
:param host: Host where the message was received.
|
||||
:param port: Port where the message was received.
|
||||
:param partition: Partition where the message was received.
|
||||
:param offset: Offset of the message.
|
||||
:param timestamp: Timestamp of the message.
|
||||
:param key: Optional message key.
|
||||
:param headers: Optional message headers.
|
||||
"""
|
||||
super().__init__(*args, msg=msg, topic=topic, host=host, port=port, **kwargs)
|
||||
super().__init__(
|
||||
*args,
|
||||
msg=msg,
|
||||
topic=topic,
|
||||
host=host,
|
||||
port=port,
|
||||
partition=partition,
|
||||
offset=offset,
|
||||
timestamp=timestamp,
|
||||
key=key,
|
||||
headers=headers,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
# vim:sw=4:ts=4:et:
|
||||
|
|
|
@ -22,6 +22,7 @@ class KafkaPlugin(RunnablePlugin):
|
|||
port: int = 9092,
|
||||
listeners: Optional[Iterable[dict]] = None,
|
||||
connection_retry_secs: float = 5.0,
|
||||
group_id: str = 'platypush',
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
|
@ -30,6 +31,7 @@ class KafkaPlugin(RunnablePlugin):
|
|||
:param port: Default Kafka server port (default: 9092).
|
||||
:param connection_retry_secs: Seconds to wait before retrying to
|
||||
connect to the Kafka server after a connection error (default: 5).
|
||||
:param group_id: Default Kafka consumer group ID (default: platypush).
|
||||
:param listeners: If specified, the Kafka plugin will listen for
|
||||
messages on these topics. Use this parameter if you also want to
|
||||
listen on other Kafka brokers other than the primary one. This
|
||||
|
@ -54,18 +56,20 @@ class KafkaPlugin(RunnablePlugin):
|
|||
port: 19200
|
||||
username: myuser
|
||||
password: secret
|
||||
group_id: mygroup
|
||||
compression_type: gzip
|
||||
topics:
|
||||
- topic4
|
||||
- topic5
|
||||
|
||||
"""
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.group_id = group_id
|
||||
self._conn_retry_secs = connection_retry_secs
|
||||
self._listeners = listeners or []
|
||||
self._listeners = list(self._convert_listeners(listeners))
|
||||
|
||||
# `server:port` -> KafkaProducer mapping
|
||||
self.producers: Dict[str, KafkaProducer] = {}
|
||||
|
@ -80,6 +84,13 @@ class KafkaPlugin(RunnablePlugin):
|
|||
# Kafka can be very noisy
|
||||
logging.getLogger('kafka').setLevel(logging.ERROR)
|
||||
|
||||
def _convert_listeners(self, listeners: Optional[Iterable[dict]]) -> Iterable[dict]:
|
||||
for listener in listeners or []:
|
||||
listener['host'] = listener.get('host', self.host)
|
||||
listener['port'] = listener.get('port', self.port)
|
||||
listener['topics'] = listener.get('topics', [])
|
||||
yield listener
|
||||
|
||||
def _get_srv_str(
|
||||
self, host: Optional[str] = None, port: Optional[int] = None
|
||||
) -> str:
|
||||
|
@ -104,7 +115,10 @@ class KafkaPlugin(RunnablePlugin):
|
|||
return self.producers[srv_str]
|
||||
|
||||
def _get_consumer(
|
||||
self, host: Optional[str] = None, port: Optional[int] = None, **kwargs
|
||||
self,
|
||||
host: Optional[str] = None,
|
||||
port: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
srv_str = self._get_srv_str(host, port)
|
||||
with self._consumers_locks[srv_str]:
|
||||
|
@ -112,6 +126,7 @@ class KafkaPlugin(RunnablePlugin):
|
|||
self.consumers[srv_str] = KafkaConsumer(
|
||||
bootstrap_servers=srv_str, **kwargs
|
||||
)
|
||||
|
||||
return self.consumers[srv_str]
|
||||
|
||||
def _on_msg(self, record, host: str, port: int):
|
||||
|
@ -125,8 +140,24 @@ class KafkaPlugin(RunnablePlugin):
|
|||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
try:
|
||||
key = record.key.decode()
|
||||
except Exception:
|
||||
key = record.key
|
||||
|
||||
self._bus.post(
|
||||
KafkaMessageEvent(msg=msg, topic=record.topic, host=host, port=port)
|
||||
KafkaMessageEvent(
|
||||
msg=msg,
|
||||
topic=record.topic,
|
||||
host=host,
|
||||
port=port,
|
||||
partition=record.partition,
|
||||
offset=record.offset,
|
||||
timestamp=float(record.timestamp / 1000) if record.timestamp else None,
|
||||
key=key,
|
||||
headers=record.headers,
|
||||
size=record.serialized_value_size,
|
||||
)
|
||||
)
|
||||
|
||||
def _consumer_monitor(self, consumer: KafkaConsumer, host: str, port: int):
|
||||
|
@ -149,10 +180,23 @@ class KafkaPlugin(RunnablePlugin):
|
|||
self.wait_stop(self._conn_retry_secs)
|
||||
|
||||
@action
|
||||
def publish(self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs):
|
||||
def publish(
|
||||
self,
|
||||
msg: Union[str, list, dict, tuple, bytes],
|
||||
topic: str,
|
||||
key: Optional[str] = None,
|
||||
timeout: Optional[float] = 60.0,
|
||||
compression_type: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
:param msg: Message to send.
|
||||
:param topic: Topic to send the message to.
|
||||
:param key: For hashed-based partitioning, the key to use to determine
|
||||
the partition.
|
||||
:param timeout: Timeout in seconds for the message to be sent.
|
||||
:param compression_type: Compression type to use for the message (e.g.
|
||||
``gzip``).
|
||||
:param kwargs: Additional arguments to pass to the KafkaConsumer,
|
||||
including ``host`` and ``port``.
|
||||
"""
|
||||
|
@ -163,9 +207,10 @@ class KafkaPlugin(RunnablePlugin):
|
|||
if not isinstance(msg, bytes):
|
||||
msg = str(msg).encode()
|
||||
|
||||
producer = self._get_producer(**kwargs)
|
||||
producer.send(topic, msg)
|
||||
producer.flush()
|
||||
encoded_key = key.encode() if key else None
|
||||
producer = self._get_producer(compression_type=compression_type, **kwargs)
|
||||
future = producer.send(topic, key=encoded_key, value=msg)
|
||||
future.get(timeout=timeout)
|
||||
|
||||
@action
|
||||
def send_message(
|
||||
|
@ -177,14 +222,17 @@ class KafkaPlugin(RunnablePlugin):
|
|||
return self.send_message(msg=msg, topic=topic, **kwargs)
|
||||
|
||||
@action
def subscribe(self, topic: str, group_id: Optional[str] = None, **kwargs):
    """
    Subscribe to a topic.

    :param topic: Topic to subscribe to.
    :param group_id: Group ID to use for the consumer. If None, then the
        group ID from the plugin configuration will be used.
    :param kwargs: Additional arguments to pass to the KafkaConsumer,
        including ``host`` and ``port``.
    """
    # An explicitly passed group_id wins; otherwise fall back to the
    # plugin-wide default configured on this instance.
    kwargs.setdefault('group_id', group_id or self.group_id)
    consumer = self._get_consumer(**kwargs)
    consumer.subscribe([topic])
|
||||
|
||||
|
@ -201,16 +249,16 @@ class KafkaPlugin(RunnablePlugin):
|
|||
|
||||
def main(self):
|
||||
for listener in self._listeners:
|
||||
host = listener.get('host', self.host)
|
||||
port = listener.get('port', self.port)
|
||||
topics = listener.get('topics')
|
||||
host = listener['host']
|
||||
port = listener['port']
|
||||
topics = listener['topics']
|
||||
if not topics:
|
||||
continue
|
||||
|
||||
consumer = self._get_consumer(
|
||||
host=host,
|
||||
port=port,
|
||||
group_id='platypush',
|
||||
group_id=self.group_id,
|
||||
auto_offset_reset='earliest',
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue