From 339e7b73a5cc63514517a36aed445931526e2e1c Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <blacklight86@gmail.com>
Date: Sun, 17 Dec 2017 16:15:44 +0100
Subject: [PATCH] Major refactoring. Solves, among the others, #2, #18 and #22

---
 platypush/__init__.py                    |  72 +++++-----
 platypush/backend/__init__.py            | 168 +++++++++++++++--------
 platypush/backend/kafka/__init__.py      |   9 +-
 platypush/backend/local/__init__.py      |  11 +-
 platypush/backend/pushbullet/__init__.py | 105 +++++++++-----
 platypush/message/__init__.py            |  57 ++++++++
 platypush/message/request/__init__.py    |  50 +++++++
 platypush/message/response/__init__.py   |  50 ++++++-
 platypush/pusher/__init__.py             |  17 ++-
 9 files changed, 385 insertions(+), 154 deletions(-)
 create mode 100644 platypush/message/request/__init__.py

diff --git a/platypush/__init__.py b/platypush/__init__.py
index d03513ef..70a6cbdc 100644
--- a/platypush/__init__.py
+++ b/platypush/__init__.py
@@ -13,6 +13,7 @@ from queue import Queue
 from threading import Thread
 from getopt import getopt
 
+from .message.request import Request
 from .message.response import Response
 
 __author__ = 'Fabio Manganiello <info@fabiomanganiello.com>'
@@ -57,11 +58,8 @@ def _init_plugin(plugin_name, reload=False):
     return plugin
 
 
-def _exec_func(args, retry=True):
-    backend = args.pop('backend') if 'backend' in args else None
-    origin = args.pop('origin') if 'origin' in args else None
-    action = args.pop('action')
-    tokens = action.split('.')
+def _execute_request(request, retry=True):
+    tokens = request.action.split('.')
     module_name = str.join('.', tokens[:-1])
     method_name = tokens[-1:][0]
 
@@ -72,27 +70,31 @@ def _exec_func(args, retry=True):
         return
 
     try:
-        response = plugin.run(method=method_name, **args)
+        response = plugin.run(method=method_name, **request.args)
         if response and response.is_error():
             logging.warn('Response processed with errors: {}'.format(response))
         else:
             logging.info('Processed response: {}'.format(response))
     except Exception as e:
-        response = Response(output=None, errors=[e, traceback.format_exc()])
+        response = Response(output=None, errors=[str(e), traceback.format_exc()])
         logging.exception(e)
         if retry:
-            # Put the popped args back where they were before retrying
-            args['action'] = action; args['origin'] = origin; args['backend'] = backend
-
             logging.info('Reloading plugin {} and retrying'.format(module_name))
             _init_plugin(module_name, reload=True)
-            _exec_func(args, retry=False)
+            _execute_request(request, retry=False)
     finally:
-        if backend: backend.send_response(origin, response)
+        # Send the response on the backend that received the request
+        if request.backend and request.origin:
+            response.target = request.origin
+            request.backend.send_response(response)
 
 
 def on_msg(msg):
-    Thread(target=_exec_func, args=(msg,)).start()
+    if isinstance(msg, Request):
+        logging.info('Processing request: {}'.format(msg))
+        Thread(target=_execute_request, args=(msg,)).start()
+    elif isinstance(msg, Response):
+        logging.info('Received response: {}'.format(msg))
 
 
 def parse_config_file(config_file=None):
@@ -132,30 +134,31 @@ def parse_config_file(config_file=None):
     return config
 
 
-def get_backends(config):
+def init_backends(config, bus=None):
     backends = {}
 
     for k in config.keys():
-        if k.startswith('backend.'):
-            module = importlib.import_module(__package__ + '.' + k)
+        if not k.startswith('backend.'): continue
 
-            # e.g. backend.pushbullet main class: PushbulletBackend
-            cls_name = functools.reduce(
-                lambda a,b: a.title() + b.title(),
-                (module.__name__.title().split('.')[2:])
-            ) + 'Backend'
+        module = importlib.import_module(__package__ + '.' + k)
 
-            # Ignore the pusher attribute here
-            if 'pusher' in config[k]: del config[k]['pusher']
+        # e.g. backend.pushbullet main class: PushbulletBackend
+        cls_name = functools.reduce(
+            lambda a,b: a.title() + b.title(),
+            (module.__name__.title().split('.')[2:])
+        ) + 'Backend'
 
-            try:
-                b = getattr(module, cls_name)(config[k])
-                name = '.'.join((k.split('.'))[1:])
-                backends[name] = b
-            except AttributeError as e:
-                logging.warn('No such class in {}: {}'.format(
-                    module.__name__, cls_name))
-                raise RuntimeError(e)
+        # Ignore the pusher attribute here
+        if 'pusher' in config[k]: del config[k]['pusher']
+
+        try:
+            b = getattr(module, cls_name)(bus=bus, **config[k])
+            name = '.'.join((k.split('.'))[1:])
+            backends[name] = b
+        except AttributeError as e:
+            logging.warn('No such class in {}: {}'.format(
+                module.__name__, cls_name))
+            raise RuntimeError(e)
 
     return backends
 
@@ -208,16 +211,15 @@ Usage: {} [-v] [-h] [-c <config_file>]
     logging.basicConfig(level=get_logging_level(), stream=sys.stdout)
     logging.debug('Configuration dump: {}'.format(config))
 
-    mq = Queue()
-    backends = get_backends(config)
+    bus = Queue()
+    backends = init_backends(config, bus)
 
     for backend in backends.values():
-        backend.mq = mq
         backend.start()
 
     while True:
         try:
-            on_msg(mq.get())
+            on_msg(bus.get())
         except KeyboardInterrupt:
             return
 
diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py
index 86195b9c..98faf8b7 100644
--- a/platypush/backend/__init__.py
+++ b/platypush/backend/__init__.py
@@ -1,91 +1,137 @@
-import json
+import importlib
 import logging
 import sys
 import platypush
 
+from queue import Queue
 from threading import Thread
 
-def _default_on_init(backend):
-    logging.info('Backend {} initialized'.format(backend.__module__))
-
-
-def _default_on_close(backend):
-    logging.info('Backend {} terminated'.format(backend.__module__))
-
-
-def _default_on_msg(backend, msg):
-    logging.info('Received message: {}'.format(msg))
-
-
-def _default_on_error(backend, error):
-    logging.error(error)
-
+from platypush.message import Message
+from platypush.message.request import Request
+from platypush.message.response import Response
 
 class Backend(Thread):
-    def __init__(self, config, mq = None,
-                 on_init  = _default_on_init,
-                 on_close = _default_on_close,
-                 on_error = _default_on_error):
-        self.config = config
-        self.mq = mq
-        self.on_init = on_init
-        self.on_close = on_close
-        self.on_error = on_error
+    """ Parent class for backends """
+
+    def __init__(self, bus=None, **kwargs):
+        """
+        Params:
+            bus    -- Reference to the Platypush bus where the requests and the
+                      responses will be posted [Queue]
+            kwargs -- key-value configuration for this backend [Dict]
+        """
+
+        # If no bus is specified, create an internal queue where
+        # the received messages will be pushed
+        self.bus = bus if bus else Queue()
         self.device_id = platypush.get_device_id()
+        self.msgtypes = {}
 
         Thread.__init__(self)
-
         logging.basicConfig(stream=sys.stdout, level=platypush.get_logging_level()
-                            if 'logging' not in config
-                            else getattr(logging, config.pop('logging')))
-
-        for cls in reversed(self.__class__.mro()):
-            if cls is not object and hasattr(cls, '_init'):
-                cls._init(self, **config)
+                            if 'logging' not in kwargs
+                            else getattr(logging, kwargs['logging']))
 
     def is_local(self):
+        """ Returns true if this is a local backend """
         from platypush.backend.local import LocalBackend
         return isinstance(self, LocalBackend)
 
-    def on_msg(self, msg):
-        if 'target' not in msg: return  # No target
-        target = msg.pop('target')
+    def _get_msgtype_class(self, msgtype):
+        """ Gets the class of a message type """
 
-        if target != self.device_id and not self.is_local():
+        if msgtype in self.msgtypes: return self.msgtypes[msgtype]
+
+        try:
+            module = importlib.import_module('platypush.message.' + msgtype)
+        except ModuleNotFoundError as e:
+            logging.warn('Unsupported message type {}'.format(msgtype))
+            raise RuntimeError(e)
+
+        cls_name = msgtype[0].upper() + msgtype[1:]
+
+        try:
+            msgclass = getattr(module, cls_name)
+            self.msgtypes[msgtype] = msgclass
+        except AttributeError as e:
+            logging.warn('No such class in {}: {}'.format(
+                module.__name__, cls_name))
+            raise RuntimeError(e)
+
+        return msgclass
+
+
+    def on_msg(self, msg):
+        """
+        Callback when a message is received on the backend.
+        It parses and posts the message on the main bus.
+        It should be called by the derived classes whenever
+        a new message should be processed.
+
+        Params:
+            msg -- The message. It can be either a key-value
+                   dictionary, a platypush.message.Message
+                   object, or a string/byte UTF-8 encoded string
+        """
+
+        msg = Message.parse(msg)
+        if 'type' not in msg:
+            logging.warn('Ignoring message with no type: {}'.format(msg))
+            return
+
+        msgtype = self._get_msgtype_class(msg['type'])
+        msg = msgtype.build(msg)
+
+        if not getattr(msg, 'target') or (msg.target != self.device_id and not self.is_local()):
             return  # Not for me
 
-        if 'response' in msg:
-            logging.info('Received response: {}'.format(msg))
-            return
+        logging.debug('Message received on the backend: {}'.format(msg))
+        msg.backend = self   # Augment message
+        self.bus.put(msg)
 
-        if 'action' not in msg:
-            self.on_error('No action specified: {}'.format(msg))
-            return
+    def send_request(self, request):
+        """
+        Send a request message on the backend
+        Params:
+            request -- The request, either a dict, a string/bytes UTF-8 JSON,
+                       or a platypush.message.request.Request object
+        """
 
-        msg['backend'] = self   # Augment message
-        self.mq.put(msg)
+        request = Request.build(request)
+        assert isinstance(request, Request)
 
-    def send_msg(self, msg):
-        if isinstance(msg, str):
-            msg = json.loads(msg)
-        if not isinstance(msg, dict):
-            raise RuntimeError('send_msg expects either a JSON string or ' +
-                               'a dictionary but received {}'.format(type(msg)))
+        request.origin = self.device_id
+        self._send_msg(request)
 
-        msg['origin'] = self.device_id  # To get the response
-        self._send_msg(msg)
+    def send_response(self, response):
+        """
+        Send a response message on the backend
+        Params:
+            response -- The response, either a dict, a string/bytes UTF-8 JSON,
+                        or a platypush.message.response.Response object
+        """
 
-    def send_response(self, target, response):
-        self.send_msg({
-            'target'     : target,
-            'response'   : {
-                'output' : response.output,
-                'errors' : response.errors,
-            }
-        })
+        response = Response.build(response)
+        assert isinstance(response, Response)
+
+        response.origin = self.device_id
+        self._send_msg(response)
+
+    def _send_msg(self, msg):
+        """
+        Sends a platypush.message.Message to a node.
+        To be implemented in the derived classes.
+        Always call send_request or send_response instead of _send_msg directly
+
+        Param:
+            msg -- The message
+        """
+
+        raise NotImplementedError("_send_msg should be implemented in a derived class")
 
     def run(self):
-        raise NotImplementedError()
+        """ Starts the backend thread. To be implemented in the derived classes """
+        raise NotImplementedError("run should be implemented in a derived class")
 
 # vim:sw=4:ts=4:et:
 
diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py
index bf4db057..612f783e 100644
--- a/platypush/backend/kafka/__init__.py
+++ b/platypush/backend/kafka/__init__.py
@@ -6,11 +6,14 @@ from kafka import KafkaConsumer, KafkaProducer
 from .. import Backend
 
 class KafkaBackend(Backend):
-    def _init(self, server, topic):
+    def __init__(self, server, topic, **kwargs):
+        super().__init__(**kwargs)
+
         self.server = server
         self.topic_prefix = topic
         self.topic = self._topic_by_device_id(self.device_id)
         self.producer = None
+        self._init_producer()
 
     def _on_record(self, record):
         if record.topic != self.topic: return
@@ -31,8 +34,8 @@ class KafkaBackend(Backend):
         return '{}.{}'.format(self.topic_prefix, device_id)
 
     def _send_msg(self, msg):
-        target = msg['target']
-        msg = json.dumps(msg).encode('utf-8')
+        target = msg.target
+        msg = str(msg).encode('utf-8')
 
         self._init_producer()
         self.producer.send(self._topic_by_device_id(target), msg)
diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py
index 85a28187..6de045e5 100644
--- a/platypush/backend/local/__init__.py
+++ b/platypush/backend/local/__init__.py
@@ -6,13 +6,16 @@ import time
 from .. import Backend
 
 class LocalBackend(Backend):
-    def _init(self, fifo):
+    def __init__(self, fifo, **kwargs):
+        super().__init__(**kwargs)
+
         self.fifo = fifo
 
     def _send_msg(self, msg):
-        msg = json.dumps(msg)
-        msglen = len(msg)+1  # Include \n
-        msg = bytearray((str(msglen) + '\n' + msg + '\n').encode('utf-8'))
+        msglen = len(str(msg))+1  # Include \n
+
+        # Message format: b"<length>\n<message>\n"
+        msg = bytearray((str(msglen) + '\n' + str(msg) + '\n').encode('utf-8'))
         with open(self.fifo, 'wb') as f:
             f.write(msg)
 
diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py
index 6cd6d8f3..67464e55 100644
--- a/platypush/backend/pushbullet/__init__.py
+++ b/platypush/backend/pushbullet/__init__.py
@@ -4,24 +4,22 @@ import requests
 import time
 import websocket
 
+from platypush.message import Message
+
 from .. import Backend
 
 class PushbulletBackend(Backend):
-    def _init(self, token, device):
+    def __init__(self, token, device, **kwargs):
+        super().__init__(**kwargs)
+
         self.token = token
         self.device_name = device
         self.pb_device_id = self.get_device_id()
 
-    @staticmethod
-    def _on_msg(ws, msg):
-        ws.backend._on_push(msg)
-
-    @staticmethod
-    def _on_error(ws, e):
-        logging.exception(e)
-        backend = ws.backend
-        ws.close()
-        backend._init_socket()
+        self._last_received_msg = {
+            'request': { 'body': None, 'time': None },
+            'response': { 'body': None, 'time': None },
+        }
 
     def _get_latest_push(self):
         t = int(time.time()) - 2
@@ -46,37 +44,70 @@ class PushbulletBackend(Backend):
         else:
             return {}
 
-    def _on_push(self, data):
-        try:
-            data = json.loads(data) if isinstance(data, str) else push
-        except Exception as e:
+    def _should_skip_last_received_msg(self, msg):
+        is_duplicate=False
+        last_msg = self._last_received_msg[msg['type']]
+
+        if last_msg:
+            msg = Message.parse(msg)
+            if str(msg) == str(last_msg['body']) \
+                    and time.time() - last_msg['time'] <= 2:
+                # Duplicate message sent on the Pushbullet socket within
+                # two seconds, ignore it
+                logging.debug('Ignoring duplicate message received on the socket')
+                is_duplicate = True
+
+        self._last_received_msg[msg['type']] = {
+            'body': msg, 'time': time.time()
+        }
+
+        return is_duplicate
+
+    @staticmethod
+    def _on_msg(backend):
+        def _f(ws, data):
+            try:
+                data = json.loads(data) if isinstance(data, str) else push
+            except Exception as e:
+                logging.exception(e)
+                return
+
+            if data['type'] == 'tickle' and data['subtype'] == 'push':
+                push = backend._get_latest_push()
+            elif data['type'] == 'push':
+                push = data['push']
+            else:
+                return  # Not a push notification
+
+            logging.debug('Received push: {}'.format(push))
+
+            if 'body' not in push: return
+
+            body = push['body']
+            try: body = json.loads(body)
+            except ValueError as e: return
+
+            if not backend._should_skip_last_received_msg(body):
+                backend.on_msg(body)
+
+        return _f
+
+    @staticmethod
+    def _on_error(backend):
+        def _f(ws, e):
             logging.exception(e)
-            return
+            logging.info('Restarting PushBullet backend')
+            ws.close()
+            backend._init_socket()
 
-        if data['type'] == 'tickle' and data['subtype'] == 'push':
-            push = self._get_latest_push()
-        elif data['type'] == 'push':
-            push = data['push']
-        else:
-            return  # Not a push notification
-
-        logging.debug('Received push: {}'.format(push))
-
-        if 'body' not in push: return
-
-        body = push['body']
-        try: body = json.loads(body)
-        except ValueError as e: return
-
-        self.on_msg(body)
+        return _f
 
     def _init_socket(self):
         self.ws = websocket.WebSocketApp(
             'wss://stream.pushbullet.com/websocket/' + self.token,
-            on_message = self._on_msg,
-            on_error = self._on_error)
-
-        self.ws.backend = self
+            # on_message = self._on_msg,
+            on_message = self._on_msg(self),
+            on_error = self._on_error(self))
 
     def get_device_id(self):
         response = requests.get(
@@ -100,7 +131,7 @@ class PushbulletBackend(Backend):
             json = {
                 'type': 'note',
                 'device_iden': self.pb_device_id,
-                'body': json.dumps(msg),
+                'body': str(msg)
             }
         ).json()
 
diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py
index e69de29b..0dc2bcf1 100644
--- a/platypush/message/__init__.py
+++ b/platypush/message/__init__.py
@@ -0,0 +1,57 @@
+import inspect
+import json
+
+class Message(object):
+    """ Message generic class """
+
+    def __str__(self):
+        """
+        Overrides the str() operator and converts
+        the message into a UTF-8 JSON string
+        """
+
+        return json.dumps({
+            attr: getattr(self, attr)
+            for attr in self.__dir__()
+            if not attr.startswith('_')
+            and not inspect.ismethod(getattr(self, attr))
+        })
+
+    def __bytes__(self):
+        """
+        Overrides the bytes() operator, converts the message into
+        its JSON-serialized UTF-8-encoded representation
+        """
+        return str(self).encode('utf-8')
+
+    @classmethod
+    def parse(cls, msg):
+        """
+        Parse a generic message into a key-value dictionary
+        Params:
+            msg -- Original message - can be a dictionary, a Message,
+                   or a string/bytearray, as long as it's valid UTF-8 JSON
+        """
+
+        if isinstance(msg, cls):
+            msg = str(msg)
+        if isinstance(msg, bytes) or isinstance(msg, bytearray):
+            msg = msg.decode('utf-8')
+        if isinstance(msg, str):
+            msg = json.loads(msg)
+
+        assert isinstance(msg, dict)
+        return msg
+
+    @classmethod
+    def build(cls, msg):
+        """
+        Builds a Message object from a dictionary.
+        To be implemented in the derived classes.
+        Params:
+            msg -- The message as a key-value dictionary
+        """
+        raise RuntimeError('build should be implemented in a derived class')
+
+# vim:sw=4:ts=4:et:
+
diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py
new file mode 100644
index 00000000..22a1f74c
--- /dev/null
+++ b/platypush/message/request/__init__.py
@@ -0,0 +1,50 @@
+import json
+
+from platypush.message import Message
+
+class Request(Message):
+    """ Request message class """
+
+    def __init__(self, target, action, origin=None, args={}):
+        """
+        Params:
+            target -- Target node [String]
+            action -- Action to be executed (e.g. music.mpd.play) [String]
+            origin -- Origin node [String]
+              args -- Additional arguments for the action [Dict]
+        """
+
+        self.target = target
+        self.action = action
+        self.origin = origin
+        self.args   = {}
+
+    @classmethod
+    def build(cls, msg):
+        msg = super().parse(msg)
+        args = {
+            'target' : msg['target'],
+            'action' : msg['action'],
+            'args'   : msg['args'],
+        }
+
+        if 'origin' in msg: args['origin'] = msg['origin']
+        return Request(**args)
+
+    def __str__(self):
+        """
+        Overrides the str() operator and converts
+        the message into a UTF-8 JSON string
+        """
+
+        return json.dumps({
+            'type'   : 'request',
+            'target' : self.target,
+            'action' : self.action,
+            'args'   : self.args,
+            'origin' : self.origin if hasattr(self, 'origin') else None,
+        })
+
+
+# vim:sw=4:ts=4:et:
+
diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py
index e845b24a..bf3be681 100644
--- a/platypush/message/response/__init__.py
+++ b/platypush/message/response/__init__.py
@@ -1,16 +1,56 @@
 import json
 
-class Response(object):
-    def __init__(self, output=None, errors=[]):
+from platypush.message import Message
+
+class Response(Message):
+    """ Response message class """
+
+    def __init__(self, target=None, origin=None, output=None, errors=[]):
+        """
+        Params:
+            target -- Target [String]
+            origin -- Origin [String]
+            output -- Output [String]
+            errors -- Errors [List of strings or exceptions]
+        """
+
+        self.target = target
         self.output = output
         self.errors = errors
-
-    def __str__(self):
-        return json.dumps({ 'output': self.output, 'errors': self.errors })
+        self.origin = origin
 
     def is_error(self):
+        """ Returns True if the respopnse has errors """
         return len(self.errors) != 0
 
+    @classmethod
+    def build(cls, msg):
+        msg = super().parse(msg)
+        args = {
+            'target' : msg['target'],
+            'output' : msg['response']['output'],
+            'errors' : msg['response']['errors'],
+        }
+
+        if 'origin' in msg: args['origin'] = msg['origin']
+        return Response(**args)
+
+
+    def __str__(self):
+        """
+        Overrides the str() operator and converts
+        the message into a UTF-8 JSON string
+        """
+
+        return json.dumps({
+            'type'       : 'response',
+            'target'     : self.target if hasattr(self, 'target') else None,
+            'origin'     : self.origin if hasattr(self, 'origin') else None,
+            'response'   : {
+                'output' : self.output,
+                'errors' : self.errors,
+            },
+        })
 
 # vim:sw=4:ts=4:et:
 
diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py
index ca00b709..69c84d50 100755
--- a/platypush/pusher/__init__.py
+++ b/platypush/pusher/__init__.py
@@ -2,7 +2,8 @@ import argparse
 import re
 import sys
 
-from platypush import get_backends, get_default_pusher_backend, parse_config_file
+from platypush import init_backends, get_default_pusher_backend, parse_config_file
+from platypush.message.request import Request
 
 def print_usage():
     print ('''Usage: {} [-h|--help] <-t|--target <target name>> <-a|--action <action name>> payload
@@ -17,23 +18,21 @@ def print_usage():
 def pusher(target, action, backend=None, **kwargs):
     config = parse_config_file()
 
-    msg = {
-        'target': target,
-        'action': action,
-        **kwargs,
-    }
-
     if target == 'localhost':
         backend = 'local'
     elif not backend:
         backend = get_default_pusher_backend(config)
 
-    backends = get_backends(config)
+    backends = init_backends(config)
     if backend not in backends:
         raise RuntimeError('No such backend configured: {}'.format(backend))
 
     b = backends[backend]
-    b.send_msg(msg)
+    b.send_request({
+        'target' : target,
+        'action' : action,
+        'args'   : kwargs,
+    })
 
 
 def main():