forked from platypush/platypush
Doing proper message encoding on kafka producer
This commit is contained in:
parent
9a868fc875
commit
83cbbb26ac
1 changed files with 6 additions and 5 deletions
|
@ -9,6 +9,7 @@ class KafkaBackend(Backend):
|
||||||
def _init(self, server, topic):
|
def _init(self, server, topic):
|
||||||
self.server = server
|
self.server = server
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
|
self.producer = None
|
||||||
|
|
||||||
def _on_record(self, record):
|
def _on_record(self, record):
|
||||||
if record.topic != self.topic: return
|
if record.topic != self.topic: return
|
||||||
|
@ -26,18 +27,18 @@ class KafkaBackend(Backend):
|
||||||
self.producer = KafkaProducer(bootstrap_servers=self.server)
|
self.producer = KafkaProducer(bootstrap_servers=self.server)
|
||||||
|
|
||||||
def send_msg(self, msg):
|
def send_msg(self, msg):
|
||||||
if isinstance(msg, bytes):
|
if isinstance(msg, dict):
|
||||||
msg = msg.encode('utf-8')
|
msg = json.dumps(msg)
|
||||||
if isinstance(msg, str):
|
if isinstance(msg, str):
|
||||||
|
msg = msg.encode('utf-8')
|
||||||
|
if not isinstance(msg, bytes):
|
||||||
msg = json.dumps(msg)
|
msg = json.dumps(msg)
|
||||||
if not isinstance(msg, dict):
|
|
||||||
raise RuntimeError('Invalid non-JSON message')
|
raise RuntimeError('Invalid non-JSON message')
|
||||||
|
|
||||||
self._init_producer()
|
self._init_producer()
|
||||||
self.producer.send(self.topic, msg)
|
self.producer.send(self.topic, msg)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.producer = None
|
|
||||||
self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
|
self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
|
||||||
for msg in self.consumer:
|
for msg in self.consumer:
|
||||||
self._on_record(msg)
|
self._on_record(msg)
|
||||||
|
|
Loading…
Reference in a new issue