From 83cbbb26ac63e81e04f673825bb5ebd512c1d390 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 29 Nov 2017 03:12:50 +0100 Subject: [PATCH] Doing proper message encoding on kafka producer --- runbullet/backend/kafka/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/runbullet/backend/kafka/__init__.py b/runbullet/backend/kafka/__init__.py index bf2ba45b..0cfd797e 100644 --- a/runbullet/backend/kafka/__init__.py +++ b/runbullet/backend/kafka/__init__.py @@ -9,6 +9,7 @@ class KafkaBackend(Backend): def _init(self, server, topic): self.server = server self.topic = topic + self.producer = None def _on_record(self, record): if record.topic != self.topic: return @@ -26,18 +27,18 @@ class KafkaBackend(Backend): self.producer = KafkaProducer(bootstrap_servers=self.server) def send_msg(self, msg): - if isinstance(msg, bytes): - msg = msg.encode('utf-8') - if isinstance(msg, str): + if isinstance(msg, dict): + msg = json.dumps(msg) + if isinstance(msg, str): + msg = msg.encode('utf-8') + if not isinstance(msg, bytes): msg = json.dumps(msg) - if not isinstance(msg, dict): raise RuntimeError('Invalid non-JSON message') self._init_producer() self.producer.send(self.topic, msg) def run(self): - self.producer = None self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) for msg in self.consumer: self._on_record(msg)