forked from platypush/platypush
#11 Kafka topics are now in the format topic.device_id so messages aren't broadcast to all the listeners of the same topic
This commit is contained in:
parent
c62a1a2bd3
commit
bf49fb8af3
3 changed files with 11 additions and 7 deletions
|
@ -163,7 +163,7 @@ def get_default_pusher_backend(config):
|
|||
|
||||
def get_device_id():
|
||||
global config
|
||||
return config['device_id']
|
||||
return config['device_id'] if 'device_id' in config else None
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import logging
|
||||
import socket
|
||||
import platypush
|
||||
|
||||
from threading import Thread
|
||||
|
@ -29,6 +30,7 @@ class Backend(Thread):
|
|||
self.on_init = on_init
|
||||
self.on_close = on_close
|
||||
self.on_error = on_error
|
||||
self.device_id = platypush.get_device_id() or socket.gethostname()
|
||||
|
||||
Thread.__init__(self)
|
||||
logging.basicConfig(level=logging.INFO
|
||||
|
@ -48,7 +50,7 @@ class Backend(Thread):
|
|||
return # No target
|
||||
|
||||
target = msg.pop('target')
|
||||
if target != platypush.get_device_id() and not self.is_local():
|
||||
if target != self.device_id and not self.is_local():
|
||||
return # Not for me
|
||||
|
||||
if 'action' not in msg:
|
||||
|
|
|
@ -8,7 +8,8 @@ from .. import Backend
|
|||
class KafkaBackend(Backend):
|
||||
def _init(self, server, topic):
|
||||
self.server = server
|
||||
self.topic = topic
|
||||
self.topic_prefix = topic
|
||||
self.topic = topic + '.' + self.device_id
|
||||
self.producer = None
|
||||
|
||||
def _on_record(self, record):
|
||||
|
@ -27,16 +28,17 @@ class KafkaBackend(Backend):
|
|||
self.producer = KafkaProducer(bootstrap_servers=self.server)
|
||||
|
||||
def send_msg(self, msg):
|
||||
if isinstance(msg, dict):
|
||||
msg = json.dumps(msg)
|
||||
if isinstance(msg, str):
|
||||
msg = msg.encode('utf-8')
|
||||
msg = json.loads(msg)
|
||||
if isinstance(msg, dict):
|
||||
target = msg['target']
|
||||
msg = json.dumps(msg).encode('utf-8')
|
||||
if not isinstance(msg, bytes):
|
||||
msg = json.dumps(msg)
|
||||
raise RuntimeError('Invalid non-JSON message')
|
||||
|
||||
self._init_producer()
|
||||
self.producer.send(self.topic, msg)
|
||||
self.producer.send(self.topic_prefix + '.' + target, msg)
|
||||
self.producer.flush()
|
||||
|
||||
def run(self):
|
||||
|
|
Loading…
Reference in a new issue