- Implemented Kafka backend, as a variant of #6

- pusher changed to use either Pushbullet or Kafka as a backend
(whatever is specified with pusher=True in the main config file). Still
a lot of dirty code to refactor there tho.
This commit is contained in:
Fabio Manganiello 2017-11-29 02:42:36 +01:00
parent fcc136ae18
commit 57519ed114
5 changed files with 99 additions and 13 deletions

View file

@ -53,6 +53,9 @@ class Backend(Thread):
self.mq.put(msg) self.mq.put(msg)
def send_msg(self, msg):
raise NotImplementedError()
def run(self): def run(self):
raise NotImplementedError() raise NotImplementedError()

View file

@ -0,0 +1,46 @@
import logging
import json
from kafka import KafkaConsumer, KafkaProducer
from .. import Backend
class KafkaBackend(Backend):
def _init(self, server, topic):
self.server = server
self.topic = topic
def _on_record(self, record):
if record.topic != self.topic: return
try:
msg = json.loads(record.value.decode('utf-8'))
except Exception as e:
logging.exception(e)
logging.debug('Received message: {}'.format(msg))
self.on_msg(msg)
def _init_producer(self):
if not self.producer:
self.producer = KafkaProducer(bootstrap_servers=self.server)
def send_msg(self, msg):
if isinstance(msg, bytes):
msg = msg.encode('utf-8')
if isinstance(msg, str):
msg = json.dumps(msg)
if not isinstance(msg, dict):
raise RuntimeError('Invalid non-JSON message')
self._init_producer()
self.producer.send(self.topic, msg)
def run(self):
self.producer = None
self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
for msg in self.consumer:
self._on_record(msg)
# vim:sw=4:ts=4:et:

View file

@ -5,7 +5,7 @@ import websocket
from .. import Backend from .. import Backend
class PushbulletBackend(Backend): class PushbulletBackend(Backend):
def _init(self, token, device=None): def _init(self, token, device):
self.token = token self.token = token
self.device = device self.device = device
@ -28,7 +28,11 @@ class PushbulletBackend(Backend):
self._init_socket() self._init_socket()
def _on_push(self, data): def _on_push(self, data):
try:
data = json.loads(data) if isinstance(data, str) else push data = json.loads(data) if isinstance(data, str) else push
except Exception as e:
logging.exception(e)
return
if data['type'] != 'push': if data['type'] != 'push':
return # Not a push notification return # Not a push notification

View file

@ -9,6 +9,7 @@ import yaml
from pushbullet import Pushbullet from pushbullet import Pushbullet
from runbullet import parse_config_file from runbullet import parse_config_file
from runbullet.backend.kafka import KafkaBackend
def print_usage(): def print_usage():
@ -19,22 +20,42 @@ def print_usage():
payload:\t\tArguments to the action payload:\t\tArguments to the action
'''.format(sys.argv[0])) '''.format(sys.argv[0]))
def main(): def get_pb_device_by_name(pb, name):
config = parse_config_file()
API_KEY = config['backend.pushbullet']['token']
pb = Pushbullet(API_KEY)
devices = [ devices = [
_ for _ in pb.devices if _.nickname == config['backend.pushbullet']['device'] _ for _ in pb.devices if _.nickname == name
] ]
if len(devices) > 0: return devices[0] if devices else None
device = devices[0]
else: def send_pb_message(pb, device_name, msg):
device = get_pb_device_by_name(pb, name=device_name)
if not device:
print('Device {} not found - please create a virtual device on ' + print('Device {} not found - please create a virtual device on ' +
'your PushBullet account'.format(config['backend.pushbullet']['device'])) 'your PushBullet account'.format(config['backend.pushbullet']['device']))
return return
pb.push_note('', json.dumps(msg), device)
def send_kafka_message(backend, msg):
backend.send_msg(msg)
def get_backend(config):
# TODO Refactor this as something better and reuse the same
# backend classes from the runbullet consumer module
if 'backend.pushbullet' in config \
and 'pusher' in config['backend.pushbullet'] \
and config['backend.pushbullet']['pusher']:
API_KEY = config['backend.pushbullet']['token']
return Pushbullet(API_KEY)
elif 'backend.kafka' in config \
and 'pusher' in config['backend.kafka'] \
and config['backend.kafka']['pusher']:
c = config['backend.kafka']
return KafkaBackend(server=c['server'], topic=c['topic'])
def main():
config = parse_config_file()
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--target', '-t', dest='target', required=True, parser.add_argument('--target', '-t', dest='target', required=True,
help="Destination of the command") help="Destination of the command")
@ -59,7 +80,12 @@ def main():
} }
print('msg: {}'.format(msg)) print('msg: {}'.format(msg))
pb.push_note('', json.dumps(msg), device)
backend = get_backend(config)
if isinstance(backend, Pushbullet):
send_pb_message(backend, config['backend.pushbullet']['device'], msg)
elif isinstance(backend, Pushbullet):
send_kafka_message(backend, msg)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -1,5 +1,12 @@
backend.pushbullet: backend.kafka:
disabled: False disabled: False
pusher: True # The pusher executable will use this backend by default
logging: DEBUG
server: your_server:9092
topic: runbullet
backend.pushbullet:
disabled: True
logging: DEBUG logging: DEBUG
token: your_pushbullet_token_here token: your_pushbullet_token_here
device: turing device: turing