#6: Made a more general purpose backend infrastructure

This commit is contained in:
Fabio Manganiello 2017-11-09 05:04:48 +01:00
parent 4d636386bf
commit 661ff9a0b8
6 changed files with 178 additions and 79 deletions

View file

@ -1,5 +1,3 @@
#!/usr/bin/env python
import functools import functools
import importlib import importlib
import os import os
@ -12,6 +10,7 @@ import time
import websocket import websocket
import yaml import yaml
from queue import Queue
from threading import Thread from threading import Thread
from getopt import getopt from getopt import getopt
@ -24,18 +23,6 @@ modules = {}
wrkdir = os.path.dirname(os.path.realpath(__file__)) wrkdir = os.path.dirname(os.path.realpath(__file__))
def on_open(ws):
logging.info('Connection opened')
def on_close(ws):
logging.info('Connection closed')
def on_error(ws, error):
logging.error(error)
def _init_plugin(plugin_name, reload=False): def _init_plugin(plugin_name, reload=False):
global modules global modules
global config global config
@ -44,7 +31,6 @@ def _init_plugin(plugin_name, reload=False):
return modules[plugin_name] return modules[plugin_name]
try: try:
logging.warn(__package__ + '.plugins.' + plugin_name)
module = importlib.import_module(__package__ + '.plugins.' + plugin_name) module = importlib.import_module(__package__ + '.plugins.' + plugin_name)
except ModuleNotFoundError as e: except ModuleNotFoundError as e:
logging.warn('No such plugin: {}'.format(plugin_name)) logging.warn('No such plugin: {}'.format(plugin_name))
@ -70,17 +56,6 @@ def _init_plugin(plugin_name, reload=False):
def _exec_func(args, retry=True): def _exec_func(args, retry=True):
args = json.loads(args) \
if isinstance(args, str) \
else args
if 'action' not in args:
logging.warn('No action specified')
return
if 'target' in args:
args.pop('target')
action = args.pop('action') action = args.pop('action')
tokens = action.split('.') tokens = action.split('.')
module_name = str.join('.', tokens[:-1]) module_name = str.join('.', tokens[:-1])
@ -89,6 +64,7 @@ def _exec_func(args, retry=True):
try: try:
plugin = _init_plugin(module_name) plugin = _init_plugin(module_name)
except RuntimeError as e: # Module/class not found except RuntimeError as e: # Module/class not found
logging.exception(e)
return return
try: try:
@ -118,42 +94,8 @@ def _exec_func(args, retry=True):
_exec_func(args, retry=False) _exec_func(args, retry=False)
def _on_push(ws, data): def on_msg(msg):
global config Thread(target=_exec_func, args=(msg,)).start()
data = json.loads(data)
if data['type'] == 'tickle' and data['subtype'] == 'push':
logging.debug('Received push tickle')
return
if data['type'] != 'push':
return # Not a push notification
push = data['push']
logging.debug('Received push: {}'.format(push))
if 'body' not in push:
return
body = push['body']
try:
body = json.loads(body)
except ValueError as e:
return
if 'target' not in body or body['target'] != config['device_id']:
return # Not for me
logging.info('Received push addressed to me: {}'.format(body))
thread = Thread(target=_exec_func, args=(body,))
thread.start()
def on_push(ws, data):
try:
_on_push(ws, data)
except Exception as e:
on_error(ws, e)
def parse_config_file(config_file=None): def parse_config_file(config_file=None):
@ -181,6 +123,35 @@ def parse_config_file(config_file=None):
return config return config
def get_backends(config):
backends = []
for k in config.keys():
if k.startswith('backend.') and (
'disabled' not in config[k] or not config[k]['disabled']):
module = importlib.import_module(__package__ + '.' + k)
# e.g. backend.pushbullet main class: PushbulletBackend
cls_name = functools.reduce(
lambda a,b: a.title() + b.title(),
(module.__name__.title().split('.')[2:])
) + 'Backend'
try:
b = getattr(module, cls_name)(config[k])
backends.append(b)
except AttributeError as e:
logging.warn('No such class in {}: {}'.format(
module.__name__, cls_name))
raise RuntimeError(e)
return backends
def get_device_id():
global config
return config['device_id']
def main(): def main():
DEBUG = False DEBUG = False
config_file = None config_file = None
@ -204,6 +175,7 @@ Usage: {} [-v] [-h] [-c <config_file>]
return return
config = parse_config_file(config_file) config = parse_config_file(config_file)
logging.info('Configuration dump: {}'.format(config))
if 'device_id' not in config: if 'device_id' not in config:
config['device_id'] = socket.gethostname() config['device_id'] = socket.gethostname()
@ -217,18 +189,21 @@ Usage: {} [-v] [-h] [-c <config_file>]
else: else:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
ws = websocket.WebSocketApp('wss://stream.pushbullet.com/websocket/' + mq = Queue()
config['pushbullet']['token'], backends = get_backends(config)
on_message = on_push,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
for backend in backends:
backend.mq = mq
backend.start()
while True:
try:
on_msg(mq.get())
except KeyboardInterrupt:
return
if __name__ == '__main__': if __name__ == '__main__':
main() main()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,60 @@
import logging
import runbullet
from threading import Thread
def _default_on_init(backend):
logging.info('Backend {} initialized'.format(backend.__module__))
def _default_on_close(backend):
logging.info('Backend {} terminated'.format(backend.__module__))
def _default_on_msg(backend, msg):
logging.info('Received message: {}'.format(msg))
def _default_on_error(backend, error):
logging.error(error)
class Backend(Thread):
def __init__(self, config, mq = None,
on_init = _default_on_init,
on_close = _default_on_close,
on_error = _default_on_error):
self.config = config
self.mq = mq
self.on_init = on_init
self.on_close = on_close
self.on_error = on_error
Thread.__init__(self)
logging.basicConfig(level=logging.INFO
if 'logging' not in config
else getattr(logging, config.pop('logging')))
for cls in reversed(self.__class__.mro()):
if cls is not object and hasattr(cls, '_init'):
cls._init(self, **config)
def on_msg(self, msg):
if 'target' not in msg:
return # No target
target = msg.pop('target')
if target != runbullet.get_device_id():
return # Not for me
if 'action' not in msg:
self.on_error('No action specified: {}'.format(msg))
return
self.mq.put(msg)
def run(self):
raise NotImplementedError()
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,59 @@
import logging
import json
import websocket
from .. import Backend
class PushbulletBackend(Backend):
def _init(self, token):
self.token = token
@staticmethod
def _on_init(ws):
logging.info('Connection opened')
@staticmethod
def _on_close(ws):
logging.info('Connection closed')
@staticmethod
def _on_msg(ws, msg):
ws.backend._on_push(msg)
@staticmethod
def _on_error(ws, e):
logging.exception(e)
def _on_push(self, data):
data = json.loads(data) if isinstance(data, str) else push
if data['type'] != 'push':
return # Not a push notification
push = data['push']
logging.debug('Received push: {}'.format(push))
if 'body' not in push:
return
body = push['body']
try:
body = json.loads(body)
except ValueError as e:
return
self.on_msg(body)
def run(self):
self.ws = websocket.WebSocketApp(
'wss://stream.pushbullet.com/websocket/' + self.token,
on_open = self._on_init,
on_message = self._on_msg,
on_error = self._on_error,
on_close = self._on_close)
self.ws.backend = self
self.ws.run_forever()
# vim:sw=4:ts=4:et:

View file

@ -21,18 +21,18 @@ def print_usage():
def main(): def main():
config = parse_config_file() config = parse_config_file()
API_KEY = config['pushbullet']['token'] API_KEY = config['backend.pushbullet']['token']
pb = Pushbullet(API_KEY) pb = Pushbullet(API_KEY)
devices = [ devices = [
_ for _ in pb.devices if _.nickname == config['pushbullet']['device'] _ for _ in pb.devices if _.nickname == config['backend.pushbullet']['device']
] ]
if len(devices) > 0: if len(devices) > 0:
device = devices[0] device = devices[0]
else: else:
print('Device {} not found - please create a virtual device on ' + print('Device {} not found - please create a virtual device on ' +
'your PushBullet account'.format(config['pushbullet']['device'])) 'your PushBullet account'.format(config['backend.pushbullet']['device']))
return return
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

View file

@ -1,6 +1,8 @@
pushbullet: backend.pushbullet:
disabled: False
logging: DEBUG
token: your_pushbullet_token_here token: your_pushbullet_token_here
device: your_pushbullet_device_here # Virtual PushBullet device linked to runbullet device: turing
# device_id: <your_device_id> (default: current hostname) # device_id: <your_device_id> (default: current hostname)
# debug: True (default: False) # debug: True (default: False)

View file

@ -2,7 +2,7 @@
import errno import errno
import os import os
from setuptools import setup from setuptools import setup, find_packages
def read(fname): def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
@ -27,6 +27,7 @@ def create_etc_dir():
raise raise
plugins = pkg_files('runbullet/plugins') plugins = pkg_files('runbullet/plugins')
backend = pkg_files('runbullet/backend')
create_etc_dir() create_etc_dir()
setup( setup(
@ -39,8 +40,9 @@ setup(
python_requires = '>= 3', python_requires = '>= 3',
keywords = "pushbullet notifications automation", keywords = "pushbullet notifications automation",
url = "https://github.com/BlackLight/runbullet", url = "https://github.com/BlackLight/runbullet",
packages = ['runbullet'], # packages = ['runbullet'],
package_data = {'': plugins}, packages = find_packages(),
# package_data = {'': plugins},
scripts = ['runbullet/bin/pusher'], scripts = ['runbullet/bin/pusher'],
entry_points = { entry_points = {
'console_scripts': [ 'console_scripts': [
@ -57,7 +59,8 @@ setup(
"Development Status :: 3 - Alpha", "Development Status :: 3 - Alpha",
], ],
install_requires = [ install_requires = [
'pyyaml' 'pyyaml',
'websocket-client',
], ],
) )