diff --git a/runbullet/backend/kafka/__init__.py b/runbullet/backend/kafka/__init__.py index bf2ba45ba4..0cfd797e03 100644 --- a/runbullet/backend/kafka/__init__.py +++ b/runbullet/backend/kafka/__init__.py @@ -9,6 +9,7 @@ class KafkaBackend(Backend): def _init(self, server, topic): self.server = server self.topic = topic + self.producer = None def _on_record(self, record): if record.topic != self.topic: return @@ -26,18 +27,18 @@ class KafkaBackend(Backend): self.producer = KafkaProducer(bootstrap_servers=self.server) def send_msg(self, msg): - if isinstance(msg, bytes): - msg = msg.encode('utf-8') - if isinstance(msg, str): + if isinstance(msg, dict): + msg = json.dumps(msg) + if isinstance(msg, str): + msg = msg.encode('utf-8') + if not isinstance(msg, bytes): msg = json.dumps(msg) - if not isinstance(msg, dict): raise RuntimeError('Invalid non-JSON message') self._init_producer() self.producer.send(self.topic, msg) def run(self): - self.producer = None self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) for msg in self.consumer: self._on_record(msg)