48 lines
1.3 KiB
Python
48 lines
1.3 KiB
Python
import logging
|
|
import json
|
|
|
|
from kafka import KafkaConsumer, KafkaProducer
|
|
|
|
from .. import Backend
|
|
|
|
class KafkaBackend(Backend):
|
|
def _init(self, server, topic):
|
|
self.server = server
|
|
self.topic = topic
|
|
self.producer = None
|
|
|
|
def _on_record(self, record):
|
|
if record.topic != self.topic: return
|
|
|
|
try:
|
|
msg = json.loads(record.value.decode('utf-8'))
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
|
|
logging.debug('Received message: {}'.format(msg))
|
|
self.on_msg(msg)
|
|
|
|
def _init_producer(self):
|
|
if not self.producer:
|
|
self.producer = KafkaProducer(bootstrap_servers=self.server)
|
|
|
|
def send_msg(self, msg):
|
|
if isinstance(msg, dict):
|
|
msg = json.dumps(msg)
|
|
if isinstance(msg, str):
|
|
msg = msg.encode('utf-8')
|
|
if not isinstance(msg, bytes):
|
|
msg = json.dumps(msg)
|
|
raise RuntimeError('Invalid non-JSON message')
|
|
|
|
self._init_producer()
|
|
self.producer.send(self.topic, msg)
|
|
self.producer.flush()
|
|
|
|
def run(self):
|
|
self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
|
|
for msg in self.consumer:
|
|
self._on_record(msg)
|
|
|
|
# vim:sw=4:ts=4:et:
|
|
|