diff --git a/platypush/message/event/kafka.py b/platypush/message/event/kafka.py index e4d7e48866..14c872a5ba 100644 --- a/platypush/message/event/kafka.py +++ b/platypush/message/event/kafka.py @@ -1,4 +1,5 @@ -from typing import Union +from typing import Iterable, Optional, Union + from platypush.message.event import Event @@ -15,6 +16,11 @@ class KafkaMessageEvent(Event): topic: str, host: str, port: int, + partition: int, + offset: int, + timestamp: float, + key: Optional[str] = None, + headers: Optional[Iterable] = None, **kwargs ): """ @@ -24,8 +30,25 @@ class KafkaMessageEvent(Event): :param topic: Topic where the message was received. :param host: Host where the message was received. :param port: Port where the message was received. + :param partition: Partition where the message was received. + :param offset: Offset of the message. + :param timestamp: Timestamp of the message. + :param key: Optional message key. + :param headers: Optional message headers. """ - super().__init__(*args, msg=msg, topic=topic, host=host, port=port, **kwargs) + super().__init__( + *args, + msg=msg, + topic=topic, + host=host, + port=port, + partition=partition, + offset=offset, + timestamp=timestamp, + key=key, + headers=headers, + **kwargs, + ) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/kafka/__init__.py b/platypush/plugins/kafka/__init__.py index 7d42777fff..49f4099a84 100644 --- a/platypush/plugins/kafka/__init__.py +++ b/platypush/plugins/kafka/__init__.py @@ -22,6 +22,7 @@ class KafkaPlugin(RunnablePlugin): port: int = 9092, listeners: Optional[Iterable[dict]] = None, connection_retry_secs: float = 5.0, + group_id: str = 'platypush', **kwargs, ): """ @@ -30,6 +31,7 @@ class KafkaPlugin(RunnablePlugin): :param port: Default Kafka server port (default: 9092). :param connection_retry_secs: Seconds to wait before retrying to connect to the Kafka server after a connection error (default: 5). + :param group_id: Default Kafka consumer group ID (default: platypush). 
:param listeners: If specified, the Kafka plugin will listen for messages on these topics. Use this parameter if you also want to listen on other Kafka brokers other than the primary one. This @@ -54,18 +56,20 @@ class KafkaPlugin(RunnablePlugin): port: 19200 username: myuser password: secret + group_id: mygroup + compression_type: gzip topics: - topic4 - topic5 """ - super().__init__(**kwargs) self.host = host self.port = port + self.group_id = group_id self._conn_retry_secs = connection_retry_secs - self._listeners = listeners or [] + self._listeners = list(self._convert_listeners(listeners)) # `server:port` -> KafkaProducer mapping self.producers: Dict[str, KafkaProducer] = {} @@ -80,6 +84,13 @@ class KafkaPlugin(RunnablePlugin): # Kafka can be very noisy logging.getLogger('kafka').setLevel(logging.ERROR) + def _convert_listeners(self, listeners: Optional[Iterable[dict]]) -> Iterable[dict]: + for listener in listeners or []: + listener['host'] = listener.get('host', self.host) + listener['port'] = listener.get('port', self.port) + listener['topics'] = listener.get('topics', []) + yield listener + def _get_srv_str( self, host: Optional[str] = None, port: Optional[int] = None ) -> str: @@ -104,7 +115,10 @@ class KafkaPlugin(RunnablePlugin): return self.producers[srv_str] def _get_consumer( - self, host: Optional[str] = None, port: Optional[int] = None, **kwargs + self, + host: Optional[str] = None, + port: Optional[int] = None, + **kwargs, ): srv_str = self._get_srv_str(host, port) with self._consumers_locks[srv_str]: @@ -112,6 +126,7 @@ class KafkaPlugin(RunnablePlugin): self.consumers[srv_str] = KafkaConsumer( bootstrap_servers=srv_str, **kwargs ) + return self.consumers[srv_str] def _on_msg(self, record, host: str, port: int): @@ -125,8 +140,24 @@ class KafkaPlugin(RunnablePlugin): except (TypeError, ValueError): pass + try: + key = record.key.decode() + except Exception: + key = record.key + self._bus.post( - KafkaMessageEvent(msg=msg, topic=record.topic, 
host=host, port=port) + KafkaMessageEvent( + msg=msg, + topic=record.topic, + host=host, + port=port, + partition=record.partition, + offset=record.offset, + timestamp=float(record.timestamp / 1000) if record.timestamp else None, + key=key, + headers=record.headers, + size=record.serialized_value_size, + ) ) def _consumer_monitor(self, consumer: KafkaConsumer, host: str, port: int): @@ -149,10 +180,23 @@ class KafkaPlugin(RunnablePlugin): self.wait_stop(self._conn_retry_secs) @action - def publish(self, msg: Union[str, list, dict, tuple, bytes], topic: str, **kwargs): + def publish( + self, + msg: Union[str, list, dict, tuple, bytes], + topic: str, + key: Optional[str] = None, + timeout: Optional[float] = 60.0, + compression_type: Optional[str] = None, + **kwargs, + ): """ :param msg: Message to send. :param topic: Topic to send the message to. + :param key: For hash-based partitioning, the key to use to determine + the partition. + :param timeout: Timeout in seconds for the message to be sent. + :param compression_type: Compression type to use for the message (e.g. + ``gzip``). :param kwargs: Additional arguments to pass to the KafkaConsumer, including ``host`` and ``port``. """ @@ -163,9 +207,10 @@ class KafkaPlugin(RunnablePlugin): if not isinstance(msg, bytes): msg = str(msg).encode() - producer = self._get_producer(**kwargs) - producer.send(topic, msg) - producer.flush() + encoded_key = key.encode() if key else None + producer = self._get_producer(compression_type=compression_type, **kwargs) + future = producer.send(topic, key=encoded_key, value=msg) + future.get(timeout=timeout) @action def send_message( @@ -177,14 +222,17 @@ class KafkaPlugin(RunnablePlugin): return self.send_message(msg=msg, topic=topic, **kwargs) @action - def subscribe(self, topic: str, **kwargs): + def subscribe(self, topic: str, group_id: Optional[str] = None, **kwargs): """ Subscribe to a topic. :param topic: Topic to subscribe to. + :param group_id: Group ID to use for the consumer.
If None, then the + group ID from the plugin configuration will be used. :param kwargs: Additional arguments to pass to the KafkaConsumer, including ``host`` and ``port``. """ + kwargs['group_id'] = kwargs.get('group_id', group_id or self.group_id) consumer = self._get_consumer(**kwargs) consumer.subscribe([topic]) @@ -201,16 +249,16 @@ class KafkaPlugin(RunnablePlugin): def main(self): for listener in self._listeners: - host = listener.get('host', self.host) - port = listener.get('port', self.port) - topics = listener.get('topics') + host = listener['host'] + port = listener['port'] + topics = listener['topics'] if not topics: continue consumer = self._get_consumer( host=host, port=port, - group_id='platypush', + group_id=self.group_id, auto_offset_reset='earliest', )