From 661ff9a0b83756c651b9a7519e637a0cf6841729 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 9 Nov 2017 05:04:48 +0100 Subject: [PATCH] #6: Made a more general purpose backend infrastructure --- runbullet/__init__.py | 115 +++++++++-------------- runbullet/backend/__init__.py | 60 ++++++++++++ runbullet/backend/pushbullet/__init__.py | 59 ++++++++++++ runbullet/bin/pusher | 6 +- runbullet/config.example.yaml | 6 +- setup.py | 11 ++- 6 files changed, 178 insertions(+), 79 deletions(-) create mode 100644 runbullet/backend/__init__.py create mode 100644 runbullet/backend/pushbullet/__init__.py diff --git a/runbullet/__init__.py b/runbullet/__init__.py index f17a5df314..4e6a89c1ad 100644 --- a/runbullet/__init__.py +++ b/runbullet/__init__.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - import functools import importlib import os @@ -12,6 +10,7 @@ import time import websocket import yaml +from queue import Queue from threading import Thread from getopt import getopt @@ -24,18 +23,6 @@ modules = {} wrkdir = os.path.dirname(os.path.realpath(__file__)) -def on_open(ws): - logging.info('Connection opened') - - -def on_close(ws): - logging.info('Connection closed') - - -def on_error(ws, error): - logging.error(error) - - def _init_plugin(plugin_name, reload=False): global modules global config @@ -44,7 +31,6 @@ def _init_plugin(plugin_name, reload=False): return modules[plugin_name] try: - logging.warn(__package__ + '.plugins.' + plugin_name) module = importlib.import_module(__package__ + '.plugins.' + plugin_name) except ModuleNotFoundError as e: logging.warn('No such plugin: {}'.format(plugin_name)) @@ -70,17 +56,6 @@ def _init_plugin(plugin_name, reload=False): def _exec_func(args, retry=True): - args = json.loads(args) \ - if isinstance(args, str) \ - else args - - if 'action' not in args: - logging.warn('No action specified') - return - - if 'target' in args: - args.pop('target') - action = args.pop('action') tokens = action.split('.') module_name = str.join('.', tokens[:-1]) @@ -89,6 +64,7 @@ def _exec_func(args, retry=True): try: plugin = _init_plugin(module_name) except RuntimeError as e: # Module/class not found + logging.exception(e) return try: @@ -118,42 +94,8 @@ def _exec_func(args, retry=True): _exec_func(args, retry=False) -def _on_push(ws, data): - global config - - data = json.loads(data) - if data['type'] == 'tickle' and data['subtype'] == 'push': - logging.debug('Received push tickle') - return - - if data['type'] != 'push': - return # Not a push notification - - push = data['push'] - logging.debug('Received push: {}'.format(push)) - - if 'body' not in push: - return - - body = push['body'] - try: - body = json.loads(body) - except ValueError as e: - return - - if 'target' not in body or body['target'] != config['device_id']: - return # Not for me - - logging.info('Received push addressed to me: {}'.format(body)) - - thread = Thread(target=_exec_func, args=(body,)) - thread.start() - -def on_push(ws, data): - try: - _on_push(ws, data) - except Exception as e: - on_error(ws, e) +def on_msg(msg): + Thread(target=_exec_func, args=(msg,)).start() def parse_config_file(config_file=None): @@ -181,6 +123,35 @@ def parse_config_file(config_file=None): return config +def get_backends(config): + backends = [] + + for k in config.keys(): + if k.startswith('backend.') and ( + 'disabled' not in config[k] or not config[k]['disabled']): + module = importlib.import_module(__package__ + '.' + k) + + # e.g. backend.pushbullet main class: PushbulletBackend + cls_name = functools.reduce( + lambda a,b: a.title() + b.title(), + (module.__name__.title().split('.')[2:]) + ) + 'Backend' + + try: + b = getattr(module, cls_name)(config[k]) + backends.append(b) + except AttributeError as e: + logging.warn('No such class in {}: {}'.format( + module.__name__, cls_name)) + raise RuntimeError(e) + + return backends + +def get_device_id(): + global config + return config['device_id'] + + def main(): DEBUG = False config_file = None @@ -204,6 +175,7 @@ Usage: {} [-v] [-h] [-c ] return config = parse_config_file(config_file) + logging.info('Configuration dump: {}'.format(config)) if 'device_id' not in config: config['device_id'] = socket.gethostname() @@ -217,18 +189,21 @@ Usage: {} [-v] [-h] [-c ] else: logging.basicConfig(level=logging.INFO) - ws = websocket.WebSocketApp('wss://stream.pushbullet.com/websocket/' + - config['pushbullet']['token'], - on_message = on_push, - on_error = on_error, - on_close = on_close) - ws.on_open = on_open - ws.run_forever() + mq = Queue() + backends = get_backends(config) + for backend in backends: + backend.mq = mq + backend.start() + + while True: + try: + on_msg(mq.get()) + except KeyboardInterrupt: + return if __name__ == '__main__': main() - # vim:sw=4:ts=4:et: diff --git a/runbullet/backend/__init__.py b/runbullet/backend/__init__.py new file mode 100644 index 0000000000..9f7e9b1078 --- /dev/null +++ b/runbullet/backend/__init__.py @@ -0,0 +1,60 @@ +import logging +import runbullet + +from threading import Thread + +def _default_on_init(backend): + logging.info('Backend {} initialized'.format(backend.__module__)) + + +def _default_on_close(backend): + logging.info('Backend {} terminated'.format(backend.__module__)) + + +def _default_on_msg(backend, msg): + logging.info('Received message: {}'.format(msg)) + + +def _default_on_error(backend, error): + logging.error(error) + + +class Backend(Thread): + def __init__(self, config, mq = None, + on_init = _default_on_init, + on_close = _default_on_close, + on_error = _default_on_error): + self.config = config + self.mq = mq + self.on_init = on_init + self.on_close = on_close + self.on_error = on_error + + Thread.__init__(self) + logging.basicConfig(level=logging.INFO + if 'logging' not in config + else getattr(logging, config.pop('logging'))) + + for cls in reversed(self.__class__.mro()): + if cls is not object and hasattr(cls, '_init'): + cls._init(self, **config) + + def on_msg(self, msg): + if 'target' not in msg: + return # No target + + target = msg.pop('target') + if target != runbullet.get_device_id(): + return # Not for me + + if 'action' not in msg: + self.on_error('No action specified: {}'.format(msg)) + return + + self.mq.put(msg) + + def run(self): + raise NotImplementedError() + +# vim:sw=4:ts=4:et: + diff --git a/runbullet/backend/pushbullet/__init__.py b/runbullet/backend/pushbullet/__init__.py new file mode 100644 index 0000000000..a55eb4a3d6 --- /dev/null +++ b/runbullet/backend/pushbullet/__init__.py @@ -0,0 +1,59 @@ +import logging +import json +import websocket + +from .. import Backend + +class PushbulletBackend(Backend): + def _init(self, token): + self.token = token + + @staticmethod + def _on_init(ws): + logging.info('Connection opened') + + @staticmethod + def _on_close(ws): + logging.info('Connection closed') + + @staticmethod + def _on_msg(ws, msg): + ws.backend._on_push(msg) + + @staticmethod + def _on_error(ws, e): + logging.exception(e) + + def _on_push(self, data): + data = json.loads(data) if isinstance(data, str) else push + + if data['type'] != 'push': + return # Not a push notification + + push = data['push'] + logging.debug('Received push: {}'.format(push)) + + if 'body' not in push: + return + + body = push['body'] + try: + body = json.loads(body) + except ValueError as e: + return + + self.on_msg(body) + + def run(self): + self.ws = websocket.WebSocketApp( + 'wss://stream.pushbullet.com/websocket/' + self.token, + on_open = self._on_init, + on_message = self._on_msg, + on_error = self._on_error, + on_close = self._on_close) + + self.ws.backend = self + self.ws.run_forever() + +# vim:sw=4:ts=4:et: + diff --git a/runbullet/bin/pusher b/runbullet/bin/pusher index 2f281b9ae8..0a66e82745 100755 --- a/runbullet/bin/pusher +++ b/runbullet/bin/pusher @@ -21,18 +21,18 @@ def print_usage(): def main(): config = parse_config_file() - API_KEY = config['pushbullet']['token'] + API_KEY = config['backend.pushbullet']['token'] pb = Pushbullet(API_KEY) devices = [ - _ for _ in pb.devices if _.nickname == config['pushbullet']['device'] + _ for _ in pb.devices if _.nickname == config['backend.pushbullet']['device'] ] if len(devices) > 0: device = devices[0] else: print('Device {} not found - please create a virtual device on ' + - 'your PushBullet account'.format(config['pushbullet']['device'])) + 'your PushBullet account'.format(config['backend.pushbullet']['device'])) return parser = argparse.ArgumentParser() diff --git a/runbullet/config.example.yaml b/runbullet/config.example.yaml index e0501da348..5a845b8527 100644 --- a/runbullet/config.example.yaml +++ b/runbullet/config.example.yaml @@ -1,6 +1,8 @@ -pushbullet: +backend.pushbullet: + disabled: False + logging: DEBUG token: your_pushbullet_token_here - device: your_pushbullet_device_here # Virtual PushBullet device linked to runbullet + device: turing # device_id: (default: current hostname) # debug: True (default: False) diff --git a/setup.py b/setup.py index 8983167e22..353d6b46ec 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ import errno import os -from setuptools import setup +from setuptools import setup, find_packages def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() @@ -27,6 +27,7 @@ def create_etc_dir(): raise plugins = pkg_files('runbullet/plugins') +backend = pkg_files('runbullet/backend') create_etc_dir() setup( @@ -39,8 +40,9 @@ setup( python_requires = '>= 3', keywords = "pushbullet notifications automation", url = "https://github.com/BlackLight/runbullet", - packages = ['runbullet'], - package_data = {'': plugins}, + # packages = ['runbullet'], + packages = find_packages(), + # package_data = {'': plugins}, scripts = ['runbullet/bin/pusher'], entry_points = { 'console_scripts': [ @@ -57,7 +59,8 @@ setup( "Development Status :: 3 - Alpha", ], install_requires = [ - 'pyyaml' + 'pyyaml', + 'websocket-client', ], )