diff --git a/runbullet/backend/__init__.py b/runbullet/backend/__init__.py index 9f7e9b10..bdb858ab 100644 --- a/runbullet/backend/__init__.py +++ b/runbullet/backend/__init__.py @@ -53,6 +53,9 @@ class Backend(Thread): self.mq.put(msg) + def send_msg(self, msg): + raise NotImplementedError() + def run(self): raise NotImplementedError() diff --git a/runbullet/backend/kafka/__init__.py b/runbullet/backend/kafka/__init__.py new file mode 100644 index 00000000..bf2ba45b --- /dev/null +++ b/runbullet/backend/kafka/__init__.py @@ -0,0 +1,46 @@ +import logging +import json + +from kafka import KafkaConsumer, KafkaProducer + +from .. import Backend + +class KafkaBackend(Backend): + def _init(self, server, topic): + self.server = server + self.topic = topic + + def _on_record(self, record): + if record.topic != self.topic: return + + try: + msg = json.loads(record.value.decode('utf-8')) + except Exception as e: + logging.exception(e) + + logging.debug('Received message: {}'.format(msg)) + self.on_msg(msg) + + def _init_producer(self): + if not self.producer: + self.producer = KafkaProducer(bootstrap_servers=self.server) + + def send_msg(self, msg): + if isinstance(msg, bytes): + msg = msg.encode('utf-8') + if isinstance(msg, str): + msg = json.dumps(msg) + if not isinstance(msg, dict): + raise RuntimeError('Invalid non-JSON message') + + self._init_producer() + self.producer.send(self.topic, msg) + + def run(self): + self.producer = None + self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) + for msg in self.consumer: + self._on_record(msg) + +# vim:sw=4:ts=4:et: + diff --git a/runbullet/backend/pushbullet/__init__.py b/runbullet/backend/pushbullet/__init__.py index 6019a7af..5ebef040 100644 --- a/runbullet/backend/pushbullet/__init__.py +++ b/runbullet/backend/pushbullet/__init__.py @@ -5,7 +5,7 @@ import websocket from .. import Backend class PushbulletBackend(Backend): - def _init(self, token, device=None): + def _init(self, token, device): self.token = token self.device = device @@ -28,7 +28,11 @@ class PushbulletBackend(Backend): self._init_socket() def _on_push(self, data): - data = json.loads(data) if isinstance(data, str) else push + try: + data = json.loads(data) if isinstance(data, str) else push + except Exception as e: + logging.exception(e) + return if data['type'] != 'push': return # Not a push notification diff --git a/runbullet/bin/pusher b/runbullet/bin/pusher index 0a66e827..65db3c24 100755 --- a/runbullet/bin/pusher +++ b/runbullet/bin/pusher @@ -9,6 +9,7 @@ import yaml from pushbullet import Pushbullet from runbullet import parse_config_file +from runbullet.backend.kafka import KafkaBackend def print_usage(): @@ -19,22 +20,42 @@ def print_usage(): payload:\t\tArguments to the action '''.format(sys.argv[0])) -def main(): - config = parse_config_file() - API_KEY = config['backend.pushbullet']['token'] - pb = Pushbullet(API_KEY) - +def get_pb_device_by_name(pb, name): devices = [ - _ for _ in pb.devices if _.nickname == config['backend.pushbullet']['device'] + _ for _ in pb.devices if _.nickname == name ] - if len(devices) > 0: - device = devices[0] - else: + return devices[0] if devices else None + +def send_pb_message(pb, device_name, msg): + device = get_pb_device_by_name(pb, name=device_name) + if not device: print('Device {} not found - please create a virtual device on ' + 'your PushBullet account'.format(config['backend.pushbullet']['device'])) return + pb.push_note('', json.dumps(msg), device) + +def send_kafka_message(backend, msg): + backend.send_msg(msg) + +def get_backend(config): + # TODO Refactor this as something better and reuse the same + # backend classes from the runbullet consumer module + if 'backend.pushbullet' in config \ + and 'pusher' in config['backend.pushbullet'] \ + and config['backend.pushbullet']['pusher']: + API_KEY = config['backend.pushbullet']['token'] + return Pushbullet(API_KEY) + elif 'backend.kafka' in config \ + and 'pusher' in config['backend.kafka'] \ + and config['backend.kafka']['pusher']: + c = config['backend.kafka'] + return KafkaBackend(server=c['server'], topic=c['topic']) + +def main(): + config = parse_config_file() + parser = argparse.ArgumentParser() parser.add_argument('--target', '-t', dest='target', required=True, help="Destination of the command") @@ -59,7 +80,12 @@ def main(): } print('msg: {}'.format(msg)) - pb.push_note('', json.dumps(msg), device) + + backend = get_backend(config) + if isinstance(backend, Pushbullet): + send_pb_message(backend, config['backend.pushbullet']['device'], msg) + elif isinstance(backend, Pushbullet): + send_kafka_message(backend, msg) if __name__ == '__main__': diff --git a/runbullet/config.example.yaml b/runbullet/config.example.yaml index 5a845b85..0a7eb808 100644 --- a/runbullet/config.example.yaml +++ b/runbullet/config.example.yaml @@ -1,5 +1,12 @@ -backend.pushbullet: +backend.kafka: disabled: False + pusher: True # The pusher executable will use this backend by default + logging: DEBUG + server: your_server:9092 + topic: runbullet + +backend.pushbullet: + disabled: True logging: DEBUG token: your_pushbullet_token_here device: turing