diff --git a/platypush/__init__.py b/platypush/__init__.py index d03513ef..70a6cbdc 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -13,6 +13,7 @@ from queue import Queue from threading import Thread from getopt import getopt +from .message.request import Request from .message.response import Response __author__ = 'Fabio Manganiello ' @@ -57,11 +58,8 @@ def _init_plugin(plugin_name, reload=False): return plugin -def _exec_func(args, retry=True): - backend = args.pop('backend') if 'backend' in args else None - origin = args.pop('origin') if 'origin' in args else None - action = args.pop('action') - tokens = action.split('.') +def _execute_request(request, retry=True): + tokens = request.action.split('.') module_name = str.join('.', tokens[:-1]) method_name = tokens[-1:][0] @@ -72,27 +70,31 @@ def _exec_func(args, retry=True): return try: - response = plugin.run(method=method_name, **args) + response = plugin.run(method=method_name, **request.args) if response and response.is_error(): logging.warn('Response processed with errors: {}'.format(response)) else: logging.info('Processed response: {}'.format(response)) except Exception as e: - response = Response(output=None, errors=[e, traceback.format_exc()]) + response = Response(output=None, errors=[str(e), traceback.format_exc()]) logging.exception(e) if retry: - # Put the popped args back where they were before retrying - args['action'] = action; args['origin'] = origin; args['backend'] = backend - logging.info('Reloading plugin {} and retrying'.format(module_name)) _init_plugin(module_name, reload=True) - _exec_func(args, retry=False) + _execute_request(request, retry=False) finally: - if backend: backend.send_response(origin, response) + # Send the response on the backend that received the request + if request.backend and request.origin: + response.target = request.origin + request.backend.send_response(response) def on_msg(msg): - Thread(target=_exec_func, args=(msg,)).start() + if isinstance(msg, Request): + logging.info('Processing request: {}'.format(msg)) + Thread(target=_execute_request, args=(msg,)).start() + elif isinstance(msg, Response): + logging.info('Received response: {}'.format(msg)) def parse_config_file(config_file=None): @@ -132,30 +134,31 @@ def parse_config_file(config_file=None): return config -def get_backends(config): +def init_backends(config, bus=None): backends = {} for k in config.keys(): - if k.startswith('backend.'): - module = importlib.import_module(__package__ + '.' + k) + if not k.startswith('backend.'): continue - # e.g. backend.pushbullet main class: PushbulletBackend - cls_name = functools.reduce( - lambda a,b: a.title() + b.title(), - (module.__name__.title().split('.')[2:]) - ) + 'Backend' + module = importlib.import_module(__package__ + '.' + k) - # Ignore the pusher attribute here - if 'pusher' in config[k]: del config[k]['pusher'] + # e.g. backend.pushbullet main class: PushbulletBackend + cls_name = functools.reduce( + lambda a,b: a.title() + b.title(), + (module.__name__.title().split('.')[2:]) + ) + 'Backend' - try: - b = getattr(module, cls_name)(config[k]) - name = '.'.join((k.split('.'))[1:]) - backends[name] = b - except AttributeError as e: - logging.warn('No such class in {}: {}'.format( - module.__name__, cls_name)) - raise RuntimeError(e) + # Ignore the pusher attribute here + if 'pusher' in config[k]: del config[k]['pusher'] + + try: + b = getattr(module, cls_name)(bus=bus, **config[k]) + name = '.'.join((k.split('.'))[1:]) + backends[name] = b + except AttributeError as e: + logging.warn('No such class in {}: {}'.format( + module.__name__, cls_name)) + raise RuntimeError(e) return backends @@ -208,16 +211,15 @@ Usage: {} [-v] [-h] [-c ] logging.basicConfig(level=get_logging_level(), stream=sys.stdout) logging.debug('Configuration dump: {}'.format(config)) - mq = Queue() - backends = get_backends(config) + bus = Queue() + backends = init_backends(config, bus) for backend in backends.values(): - backend.mq = mq backend.start() while True: try: - on_msg(mq.get()) + on_msg(bus.get()) except KeyboardInterrupt: return diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 86195b9c..98faf8b7 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -1,91 +1,137 @@ -import json +import importlib import logging import sys import platypush +from queue import Queue from threading import Thread -def _default_on_init(backend): - logging.info('Backend {} initialized'.format(backend.__module__)) - - -def _default_on_close(backend): - logging.info('Backend {} terminated'.format(backend.__module__)) - - -def _default_on_msg(backend, msg): - logging.info('Received message: {}'.format(msg)) - - -def _default_on_error(backend, error): - logging.error(error) - +from platypush.message import Message +from platypush.message.request import Request +from platypush.message.response import Response class Backend(Thread): - def __init__(self, config, mq = None, - on_init = _default_on_init, - on_close = _default_on_close, - on_error = _default_on_error): - self.config = config - self.mq = mq - self.on_init = on_init - self.on_close = on_close - self.on_error = on_error + """ Parent class for backends """ + + def __init__(self, bus=None, **kwargs): + """ + Params: + bus -- Reference to the Platypush bus where the requests and the + responses will be posted [Queue] + kwargs -- key-value configuration for this backend [Dict] + """ + + # If no bus is specified, create an internal queue where + # the received messages will be pushed + self.bus = bus if bus else Queue() self.device_id = platypush.get_device_id() + self.msgtypes = {} Thread.__init__(self) - logging.basicConfig(stream=sys.stdout, level=platypush.get_logging_level() - if 'logging' not in config - else getattr(logging, config.pop('logging'))) - - for cls in reversed(self.__class__.mro()): - if cls is not object and hasattr(cls, '_init'): - cls._init(self, **config) + if 'logging' not in kwargs + else getattr(logging, kwargs['logging'])) def is_local(self): + """ Returns true if this is a local backend """ from platypush.backend.local import LocalBackend return isinstance(self, LocalBackend) - def on_msg(self, msg): - if 'target' not in msg: return # No target - target = msg.pop('target') + def _get_msgtype_class(self, msgtype): + """ Gets the class of a message type """ - if target != self.device_id and not self.is_local(): + if msgtype in self.msgtypes: return self.msgtypes[msgtype] + + try: + module = importlib.import_module('platypush.message.' + msgtype) + except ModuleNotFoundError as e: + logging.warn('Unsupported message type {}'.format(msgtype)) + raise RuntimeError(e) + + cls_name = msgtype[0].upper() + msgtype[1:] + + try: + msgclass = getattr(module, cls_name) + self.msgtypes[msgtype] = msgclass + except AttributeError as e: + logging.warn('No such class in {}: {}'.format( + module.__name__, cls_name)) + raise RuntimeError(e) + + return msgclass + + + def on_msg(self, msg): + """ + Callback when a message is received on the backend. + It parses and posts the message on the main bus. + It should be called by the derived classes whenever + a new message should be processed. + + Params: + msg -- The message. It can be either a key-value + dictionary, a platypush.message.Message + object, or a string/byte UTF-8 encoded string + """ + + msg = Message.parse(msg) + if 'type' not in msg: + logging.warn('Ignoring message with no type: {}'.format(msg)) + return + + msgtype = self._get_msgtype_class(msg['type']) + msg = msgtype.build(msg) + + if not getattr(msg, 'target') or (msg.target != self.device_id and not self.is_local()): return # Not for me - if 'response' in msg: - logging.info('Received response: {}'.format(msg)) - return + logging.debug('Message received on the backend: {}'.format(msg)) + msg.backend = self # Augment message + self.bus.put(msg) - if 'action' not in msg: - self.on_error('No action specified: {}'.format(msg)) - return + def send_request(self, request): + """ + Send a request message on the backend + Params: + request -- The request, either a dict, a string/bytes UTF-8 JSON, + or a platypush.message.request.Request object + """ - msg['backend'] = self # Augment message - self.mq.put(msg) + request = Request.build(request) + assert isinstance(request, Request) - def send_msg(self, msg): - if isinstance(msg, str): - msg = json.loads(msg) - if not isinstance(msg, dict): - raise RuntimeError('send_msg expects either a JSON string or ' + - 'a dictionary but received {}'.format(type(msg))) + request.origin = self.device_id + self._send_msg(request) - msg['origin'] = self.device_id # To get the response - self._send_msg(msg) + def send_response(self, response): + """ + Send a response message on the backend + Params: + response -- The response, either a dict, a string/bytes UTF-8 JSON, + or a platypush.message.response.Response object + """ - def send_response(self, target, response): - self.send_msg({ - 'target' : target, - 'response' : { - 'output' : response.output, - 'errors' : response.errors, - } - }) + response = Response.build(response) + assert isinstance(response, Response) + + response.origin = self.device_id + self._send_msg(response) + + def _send_msg(self, msg): + """ + Sends a platypush.message.Message to a node. + To be implemented in the derived classes. + Always call send_request or send_response instead of _send_msg directly + + Param: + msg -- The message + """ + + raise NotImplementedError("_send_msg should be implemented in a derived class") def run(self): - raise NotImplementedError() + """ Starts the backend thread. To be implemented in the derived classes """ + raise NotImplementedError("run should be implemented in a derived class") # vim:sw=4:ts=4:et: diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index bf4db057..612f783e 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -6,11 +6,14 @@ from kafka import KafkaConsumer, KafkaProducer from .. import Backend class KafkaBackend(Backend): - def _init(self, server, topic): + def __init__(self, server, topic, **kwargs): + super().__init__(**kwargs) + self.server = server self.topic_prefix = topic self.topic = self._topic_by_device_id(self.device_id) self.producer = None + self._init_producer() def _on_record(self, record): if record.topic != self.topic: return @@ -31,8 +34,8 @@ class KafkaBackend(Backend): return '{}.{}'.format(self.topic_prefix, device_id) def _send_msg(self, msg): - target = msg['target'] - msg = json.dumps(msg).encode('utf-8') + target = msg.target + msg = str(msg).encode('utf-8') self._init_producer() self.producer.send(self._topic_by_device_id(target), msg) diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 85a28187..6de045e5 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -6,13 +6,16 @@ import time from .. import Backend class LocalBackend(Backend): - def _init(self, fifo): + def __init__(self, fifo, **kwargs): + super().__init__(**kwargs) + self.fifo = fifo def _send_msg(self, msg): - msg = json.dumps(msg) - msglen = len(msg)+1 # Include \n - msg = bytearray((str(msglen) + '\n' + msg + '\n').encode('utf-8')) + msglen = len(str(msg))+1 # Include \n + + # Message format: b"\n\n" + msg = bytearray((str(msglen) + '\n' + str(msg) + '\n').encode('utf-8')) with open(self.fifo, 'wb') as f: f.write(msg) diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 6cd6d8f3..67464e55 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -4,24 +4,22 @@ import requests import time import websocket +from platypush.message import Message + from .. import Backend class PushbulletBackend(Backend): - def _init(self, token, device): + def __init__(self, token, device, **kwargs): + super().__init__(**kwargs) + self.token = token self.device_name = device self.pb_device_id = self.get_device_id() - @staticmethod - def _on_msg(ws, msg): - ws.backend._on_push(msg) - - @staticmethod - def _on_error(ws, e): - logging.exception(e) - backend = ws.backend - ws.close() - backend._init_socket() + self._last_received_msg = { + 'request': { 'body': None, 'time': None }, + 'response': { 'body': None, 'time': None }, + } def _get_latest_push(self): t = int(time.time()) - 2 @@ -46,37 +44,70 @@ class PushbulletBackend(Backend): else: return {} - def _on_push(self, data): - try: - data = json.loads(data) if isinstance(data, str) else push - except Exception as e: + def _should_skip_last_received_msg(self, msg): + is_duplicate=False + last_msg = self._last_received_msg[msg['type']] + + if last_msg: + msg = Message.parse(msg) + if str(msg) == str(last_msg['body']) \ + and time.time() - last_msg['time'] <= 2: + # Duplicate message sent on the Pushbullet socket within + # two seconds, ignore it + logging.debug('Ignoring duplicate message received on the socket') + is_duplicate = True + + self._last_received_msg[msg['type']] = { + 'body': msg, 'time': time.time() + } + + return is_duplicate + + @staticmethod + def _on_msg(backend): + def _f(ws, data): + try: + data = json.loads(data) if isinstance(data, str) else push + except Exception as e: + logging.exception(e) + return + + if data['type'] == 'tickle' and data['subtype'] == 'push': + push = backend._get_latest_push() + elif data['type'] == 'push': + push = data['push'] + else: + return # Not a push notification + + logging.debug('Received push: {}'.format(push)) + + if 'body' not in push: return + + body = push['body'] + try: body = json.loads(body) + except ValueError as e: return + + if not backend._should_skip_last_received_msg(body): + backend.on_msg(body) + + return _f + + @staticmethod + def _on_error(backend): + def _f(ws, e): logging.exception(e) - return + logging.info('Restarting PushBullet backend') + ws.close() + backend._init_socket() - if data['type'] == 'tickle' and data['subtype'] == 'push': - push = self._get_latest_push() - elif data['type'] == 'push': - push = data['push'] - else: - return # Not a push notification - - logging.debug('Received push: {}'.format(push)) - - if 'body' not in push: return - - body = push['body'] - try: body = json.loads(body) - except ValueError as e: return - - self.on_msg(body) + return _f def _init_socket(self): self.ws = websocket.WebSocketApp( 'wss://stream.pushbullet.com/websocket/' + self.token, - on_message = self._on_msg, - on_error = self._on_error) - - self.ws.backend = self + # on_message = self._on_msg, + on_message = self._on_msg(self), + on_error = self._on_error(self)) def get_device_id(self): response = requests.get( @@ -100,7 +131,7 @@ class PushbulletBackend(Backend): json = { 'type': 'note', 'device_iden': self.pb_device_id, - 'body': json.dumps(msg), + 'body': str(msg) } ).json() diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index e69de29b..0dc2bcf1 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -0,0 +1,57 @@ +import inspect +import json + +class Message(object): + """ Message generic class """ + + def __str__(self): + """ + Overrides the str() operator and converts + the message into a UTF-8 JSON string + """ + + return json.dumps({ + attr: getattr(self, attr) + for attr in self.__dir__() + if not attr.startswith('_') + and not inspect.ismethod(getattr(self, attr)) + }) + + def __bytes__(self): + """ + Overrides the bytes() operator, converts the message into + its JSON-serialized UTF-8-encoded representation + """ + return str(self).encode('utf-8') + + @classmethod + def parse(cls, msg): + """ + Parse a generic message into a key-value dictionary + Params: + msg -- Original message - can be a dictionary, a Message, + or a string/bytearray, as long as it's valid UTF-8 JSON + """ + + if isinstance(msg, cls): + msg = str(msg) + if isinstance(msg, bytes) or isinstance(msg, bytearray): + msg = msg.decode('utf-8') + if isinstance(msg, str): + msg = json.loads(msg) + + assert isinstance(msg, dict) + return msg + + @classmethod + def build(cls, msg): + """ + Builds a Message object from a dictionary. + To be implemented in the derived classes. + Params: + msg -- The message as a key-value dictionary + """ + raise RuntimeError('build should be implemented in a derived class') + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py new file mode 100644 index 00000000..22a1f74c --- /dev/null +++ b/platypush/message/request/__init__.py @@ -0,0 +1,50 @@ +import json + +from platypush.message import Message + +class Request(Message): + """ Request message class """ + + def __init__(self, target, action, origin=None, args={}): + """ + Params: + target -- Target node [String] + action -- Action to be executed (e.g. music.mpd.play) [String] + origin -- Origin node [String] + args -- Additional arguments for the action [Dict] + """ + + self.target = target + self.action = action + self.origin = origin + self.args = {} + + @classmethod + def build(cls, msg): + msg = super().parse(msg) + args = { + 'target' : msg['target'], + 'action' : msg['action'], + 'args' : msg['args'], + } + + if 'origin' in msg: args['origin'] = msg['origin'] + return Request(**args) + + def __str__(self): + """ + Overrides the str() operator and converts + the message into a UTF-8 JSON string + """ + + return json.dumps({ + 'type' : 'request', + 'target' : self.target, + 'action' : self.action, + 'args' : self.args, + 'origin' : self.origin if hasattr(self, 'origin') else None, + }) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index e845b24a..bf3be681 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -1,16 +1,56 @@ import json -class Response(object): - def __init__(self, output=None, errors=[]): +from platypush.message import Message + +class Response(Message): + """ Response message class """ + + def __init__(self, target=None, origin=None, output=None, errors=[]): + """ + Params: + target -- Target [String] + origin -- Origin [String] + output -- Output [String] + errors -- Errors [List of strings or exceptions] + """ + + self.target = target self.output = output self.errors = errors - - def __str__(self): - return json.dumps({ 'output': self.output, 'errors': self.errors }) + self.origin = origin def is_error(self): + """ Returns True if the respopnse has errors """ return len(self.errors) != 0 + @classmethod + def build(cls, msg): + msg = super().parse(msg) + args = { + 'target' : msg['target'], + 'output' : msg['response']['output'], + 'errors' : msg['response']['errors'], + } + + if 'origin' in msg: args['origin'] = msg['origin'] + return Response(**args) + + + def __str__(self): + """ + Overrides the str() operator and converts + the message into a UTF-8 JSON string + """ + + return json.dumps({ + 'type' : 'response', + 'target' : self.target if hasattr(self, 'target') else None, + 'origin' : self.origin if hasattr(self, 'origin') else None, + 'response' : { + 'output' : self.output, + 'errors' : self.errors, + }, + }) # vim:sw=4:ts=4:et: diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py index ca00b709..69c84d50 100755 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -2,7 +2,8 @@ import argparse import re import sys -from platypush import get_backends, get_default_pusher_backend, parse_config_file +from platypush import init_backends, get_default_pusher_backend, parse_config_file +from platypush.message.request import Request def print_usage(): print ('''Usage: {} [-h|--help] <-t|--target > <-a|--action > payload @@ -17,23 +18,21 @@ def print_usage(): def pusher(target, action, backend=None, **kwargs): config = parse_config_file() - msg = { - 'target': target, - 'action': action, - **kwargs, - } - if target == 'localhost': backend = 'local' elif not backend: backend = get_default_pusher_backend(config) - backends = get_backends(config) + backends = init_backends(config) if backend not in backends: raise RuntimeError('No such backend configured: {}'.format(backend)) b = backends[backend] - b.send_msg(msg) + b.send_request({ + 'target' : target, + 'action' : action, + 'args' : kwargs, + }) def main():