Major refactoring.

Solves, among the others, #2, #18 and #22
This commit is contained in:
Fabio Manganiello 2017-12-17 16:15:44 +01:00
parent cb423dab03
commit 339e7b73a5
9 changed files with 385 additions and 154 deletions

View file

@ -13,6 +13,7 @@ from queue import Queue
from threading import Thread from threading import Thread
from getopt import getopt from getopt import getopt
from .message.request import Request
from .message.response import Response from .message.response import Response
__author__ = 'Fabio Manganiello <info@fabiomanganiello.com>' __author__ = 'Fabio Manganiello <info@fabiomanganiello.com>'
@ -57,11 +58,8 @@ def _init_plugin(plugin_name, reload=False):
return plugin return plugin
def _exec_func(args, retry=True): def _execute_request(request, retry=True):
backend = args.pop('backend') if 'backend' in args else None tokens = request.action.split('.')
origin = args.pop('origin') if 'origin' in args else None
action = args.pop('action')
tokens = action.split('.')
module_name = str.join('.', tokens[:-1]) module_name = str.join('.', tokens[:-1])
method_name = tokens[-1:][0] method_name = tokens[-1:][0]
@ -72,27 +70,31 @@ def _exec_func(args, retry=True):
return return
try: try:
response = plugin.run(method=method_name, **args) response = plugin.run(method=method_name, **request.args)
if response and response.is_error(): if response and response.is_error():
logging.warn('Response processed with errors: {}'.format(response)) logging.warn('Response processed with errors: {}'.format(response))
else: else:
logging.info('Processed response: {}'.format(response)) logging.info('Processed response: {}'.format(response))
except Exception as e: except Exception as e:
response = Response(output=None, errors=[e, traceback.format_exc()]) response = Response(output=None, errors=[str(e), traceback.format_exc()])
logging.exception(e) logging.exception(e)
if retry: if retry:
# Put the popped args back where they were before retrying
args['action'] = action; args['origin'] = origin; args['backend'] = backend
logging.info('Reloading plugin {} and retrying'.format(module_name)) logging.info('Reloading plugin {} and retrying'.format(module_name))
_init_plugin(module_name, reload=True) _init_plugin(module_name, reload=True)
_exec_func(args, retry=False) _execute_request(request, retry=False)
finally: finally:
if backend: backend.send_response(origin, response) # Send the response on the backend that received the request
if request.backend and request.origin:
response.target = request.origin
request.backend.send_response(response)
def on_msg(msg): def on_msg(msg):
Thread(target=_exec_func, args=(msg,)).start() if isinstance(msg, Request):
logging.info('Processing request: {}'.format(msg))
Thread(target=_execute_request, args=(msg,)).start()
elif isinstance(msg, Response):
logging.info('Received response: {}'.format(msg))
def parse_config_file(config_file=None): def parse_config_file(config_file=None):
@ -132,11 +134,12 @@ def parse_config_file(config_file=None):
return config return config
def get_backends(config): def init_backends(config, bus=None):
backends = {} backends = {}
for k in config.keys(): for k in config.keys():
if k.startswith('backend.'): if not k.startswith('backend.'): continue
module = importlib.import_module(__package__ + '.' + k) module = importlib.import_module(__package__ + '.' + k)
# e.g. backend.pushbullet main class: PushbulletBackend # e.g. backend.pushbullet main class: PushbulletBackend
@ -149,7 +152,7 @@ def get_backends(config):
if 'pusher' in config[k]: del config[k]['pusher'] if 'pusher' in config[k]: del config[k]['pusher']
try: try:
b = getattr(module, cls_name)(config[k]) b = getattr(module, cls_name)(bus=bus, **config[k])
name = '.'.join((k.split('.'))[1:]) name = '.'.join((k.split('.'))[1:])
backends[name] = b backends[name] = b
except AttributeError as e: except AttributeError as e:
@ -208,16 +211,15 @@ Usage: {} [-v] [-h] [-c <config_file>]
logging.basicConfig(level=get_logging_level(), stream=sys.stdout) logging.basicConfig(level=get_logging_level(), stream=sys.stdout)
logging.debug('Configuration dump: {}'.format(config)) logging.debug('Configuration dump: {}'.format(config))
mq = Queue() bus = Queue()
backends = get_backends(config) backends = init_backends(config, bus)
for backend in backends.values(): for backend in backends.values():
backend.mq = mq
backend.start() backend.start()
while True: while True:
try: try:
on_msg(mq.get()) on_msg(bus.get())
except KeyboardInterrupt: except KeyboardInterrupt:
return return

View file

@ -1,91 +1,137 @@
import json import importlib
import logging import logging
import sys import sys
import platypush import platypush
from queue import Queue
from threading import Thread from threading import Thread
def _default_on_init(backend): from platypush.message import Message
logging.info('Backend {} initialized'.format(backend.__module__)) from platypush.message.request import Request
from platypush.message.response import Response
def _default_on_close(backend):
logging.info('Backend {} terminated'.format(backend.__module__))
def _default_on_msg(backend, msg):
logging.info('Received message: {}'.format(msg))
def _default_on_error(backend, error):
logging.error(error)
class Backend(Thread): class Backend(Thread):
def __init__(self, config, mq = None, """ Parent class for backends """
on_init = _default_on_init,
on_close = _default_on_close, def __init__(self, bus=None, **kwargs):
on_error = _default_on_error): """
self.config = config Params:
self.mq = mq bus -- Reference to the Platypush bus where the requests and the
self.on_init = on_init responses will be posted [Queue]
self.on_close = on_close kwargs -- key-value configuration for this backend [Dict]
self.on_error = on_error """
# If no bus is specified, create an internal queue where
# the received messages will be pushed
self.bus = bus if bus else Queue()
self.device_id = platypush.get_device_id() self.device_id = platypush.get_device_id()
self.msgtypes = {}
Thread.__init__(self) Thread.__init__(self)
logging.basicConfig(stream=sys.stdout, level=platypush.get_logging_level() logging.basicConfig(stream=sys.stdout, level=platypush.get_logging_level()
if 'logging' not in config if 'logging' not in kwargs
else getattr(logging, config.pop('logging'))) else getattr(logging, kwargs['logging']))
for cls in reversed(self.__class__.mro()):
if cls is not object and hasattr(cls, '_init'):
cls._init(self, **config)
def is_local(self): def is_local(self):
""" Returns true if this is a local backend """
from platypush.backend.local import LocalBackend from platypush.backend.local import LocalBackend
return isinstance(self, LocalBackend) return isinstance(self, LocalBackend)
def on_msg(self, msg): def _get_msgtype_class(self, msgtype):
if 'target' not in msg: return # No target """ Gets the class of a message type """
target = msg.pop('target')
if target != self.device_id and not self.is_local(): if msgtype in self.msgtypes: return self.msgtypes[msgtype]
try:
module = importlib.import_module('platypush.message.' + msgtype)
except ModuleNotFoundError as e:
logging.warn('Unsupported message type {}'.format(msgtype))
raise RuntimeError(e)
cls_name = msgtype[0].upper() + msgtype[1:]
try:
msgclass = getattr(module, cls_name)
self.msgtypes[msgtype] = msgclass
except AttributeError as e:
logging.warn('No such class in {}: {}'.format(
module.__name__, cls_name))
raise RuntimeError(e)
return msgclass
def on_msg(self, msg):
"""
Callback when a message is received on the backend.
It parses and posts the message on the main bus.
It should be called by the derived classes whenever
a new message should be processed.
Params:
msg -- The message. It can be either a key-value
dictionary, a platypush.message.Message
object, or a string/byte UTF-8 encoded string
"""
msg = Message.parse(msg)
if 'type' not in msg:
logging.warn('Ignoring message with no type: {}'.format(msg))
return
msgtype = self._get_msgtype_class(msg['type'])
msg = msgtype.build(msg)
if not getattr(msg, 'target') or (msg.target != self.device_id and not self.is_local()):
return # Not for me return # Not for me
if 'response' in msg: logging.debug('Message received on the backend: {}'.format(msg))
logging.info('Received response: {}'.format(msg)) msg.backend = self # Augment message
return self.bus.put(msg)
if 'action' not in msg: def send_request(self, request):
self.on_error('No action specified: {}'.format(msg)) """
return Send a request message on the backend
Params:
request -- The request, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.request.Request object
"""
msg['backend'] = self # Augment message request = Request.build(request)
self.mq.put(msg) assert isinstance(request, Request)
def send_msg(self, msg): request.origin = self.device_id
if isinstance(msg, str): self._send_msg(request)
msg = json.loads(msg)
if not isinstance(msg, dict):
raise RuntimeError('send_msg expects either a JSON string or ' +
'a dictionary but received {}'.format(type(msg)))
msg['origin'] = self.device_id # To get the response def send_response(self, response):
self._send_msg(msg) """
Send a response message on the backend
Params:
response -- The response, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.response.Response object
"""
def send_response(self, target, response): response = Response.build(response)
self.send_msg({ assert isinstance(response, Response)
'target' : target,
'response' : { response.origin = self.device_id
'output' : response.output, self._send_msg(response)
'errors' : response.errors,
} def _send_msg(self, msg):
}) """
Sends a platypush.message.Message to a node.
To be implemented in the derived classes.
Always call send_request or send_response instead of _send_msg directly
Param:
msg -- The message
"""
raise NotImplementedError("_send_msg should be implemented in a derived class")
def run(self): def run(self):
raise NotImplementedError() """ Starts the backend thread. To be implemented in the derived classes """
raise NotImplementedError("run should be implemented in a derived class")
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -6,11 +6,14 @@ from kafka import KafkaConsumer, KafkaProducer
from .. import Backend from .. import Backend
class KafkaBackend(Backend): class KafkaBackend(Backend):
def _init(self, server, topic): def __init__(self, server, topic, **kwargs):
super().__init__(**kwargs)
self.server = server self.server = server
self.topic_prefix = topic self.topic_prefix = topic
self.topic = self._topic_by_device_id(self.device_id) self.topic = self._topic_by_device_id(self.device_id)
self.producer = None self.producer = None
self._init_producer()
def _on_record(self, record): def _on_record(self, record):
if record.topic != self.topic: return if record.topic != self.topic: return
@ -31,8 +34,8 @@ class KafkaBackend(Backend):
return '{}.{}'.format(self.topic_prefix, device_id) return '{}.{}'.format(self.topic_prefix, device_id)
def _send_msg(self, msg): def _send_msg(self, msg):
target = msg['target'] target = msg.target
msg = json.dumps(msg).encode('utf-8') msg = str(msg).encode('utf-8')
self._init_producer() self._init_producer()
self.producer.send(self._topic_by_device_id(target), msg) self.producer.send(self._topic_by_device_id(target), msg)

View file

@ -6,13 +6,16 @@ import time
from .. import Backend from .. import Backend
class LocalBackend(Backend): class LocalBackend(Backend):
def _init(self, fifo): def __init__(self, fifo, **kwargs):
super().__init__(**kwargs)
self.fifo = fifo self.fifo = fifo
def _send_msg(self, msg): def _send_msg(self, msg):
msg = json.dumps(msg) msglen = len(str(msg))+1 # Include \n
msglen = len(msg)+1 # Include \n
msg = bytearray((str(msglen) + '\n' + msg + '\n').encode('utf-8')) # Message format: b"<length>\n<message>\n"
msg = bytearray((str(msglen) + '\n' + str(msg) + '\n').encode('utf-8'))
with open(self.fifo, 'wb') as f: with open(self.fifo, 'wb') as f:
f.write(msg) f.write(msg)

View file

@ -4,24 +4,22 @@ import requests
import time import time
import websocket import websocket
from platypush.message import Message
from .. import Backend from .. import Backend
class PushbulletBackend(Backend): class PushbulletBackend(Backend):
def _init(self, token, device): def __init__(self, token, device, **kwargs):
super().__init__(**kwargs)
self.token = token self.token = token
self.device_name = device self.device_name = device
self.pb_device_id = self.get_device_id() self.pb_device_id = self.get_device_id()
@staticmethod self._last_received_msg = {
def _on_msg(ws, msg): 'request': { 'body': None, 'time': None },
ws.backend._on_push(msg) 'response': { 'body': None, 'time': None },
}
@staticmethod
def _on_error(ws, e):
logging.exception(e)
backend = ws.backend
ws.close()
backend._init_socket()
def _get_latest_push(self): def _get_latest_push(self):
t = int(time.time()) - 2 t = int(time.time()) - 2
@ -46,7 +44,28 @@ class PushbulletBackend(Backend):
else: else:
return {} return {}
def _on_push(self, data): def _should_skip_last_received_msg(self, msg):
is_duplicate=False
last_msg = self._last_received_msg[msg['type']]
if last_msg:
msg = Message.parse(msg)
if str(msg) == str(last_msg['body']) \
and time.time() - last_msg['time'] <= 2:
# Duplicate message sent on the Pushbullet socket within
# two seconds, ignore it
logging.debug('Ignoring duplicate message received on the socket')
is_duplicate = True
self._last_received_msg[msg['type']] = {
'body': msg, 'time': time.time()
}
return is_duplicate
@staticmethod
def _on_msg(backend):
def _f(ws, data):
try: try:
data = json.loads(data) if isinstance(data, str) else push data = json.loads(data) if isinstance(data, str) else push
except Exception as e: except Exception as e:
@ -54,7 +73,7 @@ class PushbulletBackend(Backend):
return return
if data['type'] == 'tickle' and data['subtype'] == 'push': if data['type'] == 'tickle' and data['subtype'] == 'push':
push = self._get_latest_push() push = backend._get_latest_push()
elif data['type'] == 'push': elif data['type'] == 'push':
push = data['push'] push = data['push']
else: else:
@ -68,15 +87,27 @@ class PushbulletBackend(Backend):
try: body = json.loads(body) try: body = json.loads(body)
except ValueError as e: return except ValueError as e: return
self.on_msg(body) if not backend._should_skip_last_received_msg(body):
backend.on_msg(body)
return _f
@staticmethod
def _on_error(backend):
def _f(ws, e):
logging.exception(e)
logging.info('Restarting PushBullet backend')
ws.close()
backend._init_socket()
return _f
def _init_socket(self): def _init_socket(self):
self.ws = websocket.WebSocketApp( self.ws = websocket.WebSocketApp(
'wss://stream.pushbullet.com/websocket/' + self.token, 'wss://stream.pushbullet.com/websocket/' + self.token,
on_message = self._on_msg, # on_message = self._on_msg,
on_error = self._on_error) on_message = self._on_msg(self),
on_error = self._on_error(self))
self.ws.backend = self
def get_device_id(self): def get_device_id(self):
response = requests.get( response = requests.get(
@ -100,7 +131,7 @@ class PushbulletBackend(Backend):
json = { json = {
'type': 'note', 'type': 'note',
'device_iden': self.pb_device_id, 'device_iden': self.pb_device_id,
'body': json.dumps(msg), 'body': str(msg)
} }
).json() ).json()

View file

@ -0,0 +1,57 @@
import inspect
import json
class Message(object):
""" Message generic class """
def __str__(self):
"""
Overrides the str() operator and converts
the message into a UTF-8 JSON string
"""
return json.dumps({
attr: getattr(self, attr)
for attr in self.__dir__()
if not attr.startswith('_')
and not inspect.ismethod(getattr(self, attr))
})
def __bytes__(self):
"""
Overrides the bytes() operator, converts the message into
its JSON-serialized UTF-8-encoded representation
"""
return str(self).encode('utf-8')
@classmethod
def parse(cls, msg):
"""
Parse a generic message into a key-value dictionary
Params:
msg -- Original message - can be a dictionary, a Message,
or a string/bytearray, as long as it's valid UTF-8 JSON
"""
if isinstance(msg, cls):
msg = str(msg)
if isinstance(msg, bytes) or isinstance(msg, bytearray):
msg = msg.decode('utf-8')
if isinstance(msg, str):
msg = json.loads(msg)
assert isinstance(msg, dict)
return msg
@classmethod
def build(cls, msg):
"""
Builds a Message object from a dictionary.
To be implemented in the derived classes.
Params:
msg -- The message as a key-value dictionary
"""
raise RuntimeError('build should be implemented in a derived class')
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,50 @@
import json
from platypush.message import Message
class Request(Message):
""" Request message class """
def __init__(self, target, action, origin=None, args={}):
"""
Params:
target -- Target node [String]
action -- Action to be executed (e.g. music.mpd.play) [String]
origin -- Origin node [String]
args -- Additional arguments for the action [Dict]
"""
self.target = target
self.action = action
self.origin = origin
self.args = {}
@classmethod
def build(cls, msg):
msg = super().parse(msg)
args = {
'target' : msg['target'],
'action' : msg['action'],
'args' : msg['args'],
}
if 'origin' in msg: args['origin'] = msg['origin']
return Request(**args)
def __str__(self):
"""
Overrides the str() operator and converts
the message into a UTF-8 JSON string
"""
return json.dumps({
'type' : 'request',
'target' : self.target,
'action' : self.action,
'args' : self.args,
'origin' : self.origin if hasattr(self, 'origin') else None,
})
# vim:sw=4:ts=4:et:

View file

@ -1,16 +1,56 @@
import json import json
class Response(object): from platypush.message import Message
def __init__(self, output=None, errors=[]):
class Response(Message):
""" Response message class """
def __init__(self, target=None, origin=None, output=None, errors=[]):
"""
Params:
target -- Target [String]
origin -- Origin [String]
output -- Output [String]
errors -- Errors [List of strings or exceptions]
"""
self.target = target
self.output = output self.output = output
self.errors = errors self.errors = errors
self.origin = origin
def __str__(self):
return json.dumps({ 'output': self.output, 'errors': self.errors })
def is_error(self): def is_error(self):
""" Returns True if the respopnse has errors """
return len(self.errors) != 0 return len(self.errors) != 0
@classmethod
def build(cls, msg):
msg = super().parse(msg)
args = {
'target' : msg['target'],
'output' : msg['response']['output'],
'errors' : msg['response']['errors'],
}
if 'origin' in msg: args['origin'] = msg['origin']
return Response(**args)
def __str__(self):
"""
Overrides the str() operator and converts
the message into a UTF-8 JSON string
"""
return json.dumps({
'type' : 'response',
'target' : self.target if hasattr(self, 'target') else None,
'origin' : self.origin if hasattr(self, 'origin') else None,
'response' : {
'output' : self.output,
'errors' : self.errors,
},
})
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -2,7 +2,8 @@ import argparse
import re import re
import sys import sys
from platypush import get_backends, get_default_pusher_backend, parse_config_file from platypush import init_backends, get_default_pusher_backend, parse_config_file
from platypush.message.request import Request
def print_usage(): def print_usage():
print ('''Usage: {} [-h|--help] <-t|--target <target name>> <-a|--action <action name>> payload print ('''Usage: {} [-h|--help] <-t|--target <target name>> <-a|--action <action name>> payload
@ -17,23 +18,21 @@ def print_usage():
def pusher(target, action, backend=None, **kwargs): def pusher(target, action, backend=None, **kwargs):
config = parse_config_file() config = parse_config_file()
msg = {
'target': target,
'action': action,
**kwargs,
}
if target == 'localhost': if target == 'localhost':
backend = 'local' backend = 'local'
elif not backend: elif not backend:
backend = get_default_pusher_backend(config) backend = get_default_pusher_backend(config)
backends = get_backends(config) backends = init_backends(config)
if backend not in backends: if backend not in backends:
raise RuntimeError('No such backend configured: {}'.format(backend)) raise RuntimeError('No such backend configured: {}'.format(backend))
b = backends[backend] b = backends[backend]
b.send_msg(msg) b.send_request({
'target' : target,
'action' : action,
'args' : kwargs,
})
def main(): def main():