From bf49fb8af39a203d12a6576e9c86adc560bf8016 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 13 Dec 2017 01:07:46 +0100 Subject: [PATCH] #11 Kafka topics are now in the format topic.device_id so messages aren't broadcast to all the listeners of the same topic --- platypush/__init__.py | 2 +- platypush/backend/__init__.py | 4 +++- platypush/backend/kafka/__init__.py | 12 +++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index e9fa0b12c6..e2eb2dbc34 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -163,7 +163,7 @@ def get_default_pusher_backend(config): def get_device_id(): global config - return config['device_id'] + return config['device_id'] if 'device_id' in config else None def main(): diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 47f34b6d59..bdf564e7c4 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -1,4 +1,5 @@ import logging +import socket import platypush from threading import Thread @@ -29,6 +30,7 @@ class Backend(Thread): self.on_init = on_init self.on_close = on_close self.on_error = on_error + self.device_id = platypush.get_device_id() or socket.gethostname() Thread.__init__(self) logging.basicConfig(level=logging.INFO @@ -48,7 +50,7 @@ class Backend(Thread): return # No target target = msg.pop('target') - if target != platypush.get_device_id() and not self.is_local(): + if target != self.device_id and not self.is_local(): return # Not for me if 'action' not in msg: diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index 9ce04be082..c494351e5f 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -8,7 +8,8 @@ from .. import Backend class KafkaBackend(Backend): def _init(self, server, topic): self.server = server - self.topic = topic + self.topic_prefix = topic + self.topic = topic + '.' + self.device_id self.producer = None def _on_record(self, record): @@ -27,16 +28,17 @@ class KafkaBackend(Backend): self.producer = KafkaProducer(bootstrap_servers=self.server) def send_msg(self, msg): - if isinstance(msg, dict): - msg = json.dumps(msg) if isinstance(msg, str): - msg = msg.encode('utf-8') + msg = json.loads(msg) + if isinstance(msg, dict): + target = msg['target'] + msg = json.dumps(msg).encode('utf-8') if not isinstance(msg, bytes): msg = json.dumps(msg) raise RuntimeError('Invalid non-JSON message') self._init_producer() - self.producer.send(self.topic, msg) + self.producer.send(self.topic_prefix + '.' + target, msg) self.producer.flush() def run(self):