From 4b819d5460eff33b4f3faeadc6e56adc00514d59 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 22 Dec 2017 00:49:03 +0100 Subject: [PATCH] Another major refactoring. Among the other things, reintroduced local backend, made requests and responses working in every case, and properly handling stop events --- platypush/__init__.py | 36 +++---- platypush/backend/__init__.py | 114 ++++++++++++++++++----- platypush/backend/kafka/__init__.py | 19 ++-- platypush/backend/local/__init__.py | 77 +++++++++++++++ platypush/backend/pushbullet/__init__.py | 1 + platypush/config.example.yaml | 4 + platypush/message/__init__.py | 19 +++- platypush/pusher/__init__.py | 46 +++------ platypush/utils/__init__.py | 13 ++- 9 files changed, 231 insertions(+), 98 deletions(-) create mode 100644 platypush/backend/local/__init__.py mode change 100755 => 100644 platypush/pusher/__init__.py diff --git a/platypush/__init__.py b/platypush/__init__.py index d6a5f4e867..3212eca0cd 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -4,12 +4,10 @@ import sys import traceback from threading import Thread -from getopt import getopt from .bus import Bus from .config import Config from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action -from .message.event import Event, StopEvent from .message.request import Request from .message.response import Response @@ -46,6 +44,7 @@ class Daemon(object): """ self.config_file = config_file + self.message_handler = message_handler Config.init(self.config_file) logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) @@ -83,21 +82,6 @@ class Daemon(object): return _f - def send_response(self, request, response): - """ Sends a response back. - Params: - request -- The platypush.message.request.Request object - response -- The platypush.message.response.Response object """ - - if request.backend and request.origin: - if request.id: response.id = request.id - response.target = request.origin - - logging.info('Processing response: {}'.format(response)) - request.backend.send_response(response) - else: - logging.info('Ignoring response as the request has no backend: ' - .format(request)) def run_request(self): """ Runs a request and returns the response """ @@ -118,8 +102,12 @@ class Daemon(object): # Run the action response = plugin.run(method=method_name, **request.args) if response and response.is_error(): - logging.warning('Response processed with errors: {}'.format(response)) - except Exception as e: # Retry mechanism + raise RuntimeError('Response processed with errors: {}'.format(response)) + + logging.info('Processed response from plugin {}: {}'. + format(plugin, response)) + except Exception as e: + # Retry mechanism response = Response(output=None, errors=[str(e), traceback.format_exc()]) logging.exception(e) if n_tries: @@ -127,8 +115,12 @@ class Daemon(object): get_or_load_plugin(module_name, reload=True) _thread_func(request, n_tries=n_tries-1) finally: - # Send the response on the backend that received the request - self.send_response(request, response) + # Send the response on the backend + if request.backend and request.origin: + request.backend.send_response(response=response, request=request) + else: + logging.info('Dropping response whose request has no ' + + 'origin attached: {}'.format(request)) return _thread_func @@ -137,7 +129,7 @@ class Daemon(object): self.bus = Bus(on_message=self.on_message()) # Initialize the backends and link them to the bus - self.backends = init_backends(self.bus) + self.backends = init_backends(bus=self.bus) # Start the backend threads for backend in self.backends.values(): diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 3d4dac4273..cf37a13323 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -7,7 +7,7 @@ from threading import Thread from platypush.bus import Bus from platypush.config import Config -from platypush.utils import get_message_class_by_type +from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.request import Request @@ -16,6 +16,8 @@ from platypush.message.response import Response class Backend(Thread): """ Parent class for backends """ + _default_response_timeout = 5 + def __init__(self, bus=None, **kwargs): """ Params: @@ -26,10 +28,16 @@ class Backend(Thread): # If no bus is specified, create an internal queue where # the received messages will be pushed - self.bus = bus if bus else Bus() + self.bus = bus or Bus() self.device_id = Config.get('device_id') self.thread_id = None self._stop = False + self._kwargs = kwargs + + # Internal-only, we set the request context on a backend if that + # backend is intended to react for a response to a specific request + self._request_context = kwargs['_req_ctx'] if '_req_ctx' in kwargs \ + else None Thread.__init__(self) logging.basicConfig(stream=sys.stdout, level=Config.get('logging') @@ -50,59 +58,112 @@ class Backend(Thread): object, or a string/byte UTF-8 encoded string """ - msg = Message.parse(msg) - if 'type' not in msg: - logging.warning('Ignoring message with no type: {}'.format(msg)) - return - - msgtype = get_message_class_by_type(msg['type']) - msg = msgtype.build(msg) - + msg = Message.build(msg) if not getattr(msg, 'target') or msg.target != self.device_id: return # Not for me - logging.info('Message received on the {} backend: {}'.format( + logging.debug('Message received on the {} backend: {}'.format( self.__class__.__name__, msg)) - msg.backend = self # Augment message + if self._is_expected_response(msg): + # Expected response, trigger the response handler + clear_timeout() + self._request_context['on_response'](msg) + self.stop() + return if isinstance(msg, StopEvent) and msg.targets_me(): - logging.info('Received STOP event on the {} backend: {}'.format( - self.__class__.__name__, msg)) + logging.info('Received STOP event on {}'.format(self.__class__.__name__)) self._stop = True else: + msg.backend = self # Augment message to be able to process responses self.bus.post(msg) - def send_request(self, request): + + def _is_expected_response(self, msg): + """ Internal only - returns true if we are expecting for a response + and msg is that response """ + + return self._request_context \ + and isinstance(msg, Response) \ + and msg.id == self._request_context['request'].id + + + def _get_backend_config(self): + config_name = 'backend.' + self.__class__.__name__.split('Backend')[0].lower() + return Config.get(config_name) + + + def _setup_response_handler(self, request, on_response, response_timeout): + def _timeout_hndl(): + raise RuntimeError('Timed out while waiting for a response from {}'. + format(request.target)) + + req_ctx = { + 'request': request, + 'on_response': on_response, + 'response_timeout': response_timeout, + } + + resp_backend = self.__class__(bus=self.bus, _req_ctx=req_ctx, + **self._get_backend_config(), **self._kwargs) + + # Set the response timeout + set_timeout(seconds=self._default_response_timeout, + on_timeout=_timeout_hndl) + + resp_backend.start() + + + def send_request(self, request, on_response=None, + response_timeout=_default_response_timeout, **kwargs): """ Send a request message on the backend Params: - request -- The request, either a dict, a string/bytes UTF-8 JSON, - or a platypush.message.request.Request object + request -- The request, either a dict, a string/bytes UTF-8 JSON, + or a platypush.message.request.Request object. + + on_response -- Response handler, takes a platypush.message.response.Response + as argument. If set, the method will wait for a + response before exiting (default: None) + response_timeout -- If on_response is set, the backend will raise + an exception if the response isn't received + within this number of seconds (default: 5) """ request = Request.build(request) assert isinstance(request, Request) request.origin = self.device_id - self.send_message(request) - def send_response(self, response): + if on_response and response_timeout: + self._setup_response_handler(request, on_response, response_timeout) + + self.send_message(request, **kwargs) + + + def send_response(self, response, request, **kwargs): """ Send a response message on the backend Params: response -- The response, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.response.Response object + request -- Associated request, used to set the response parameters + that will link them """ response = Response.build(response) assert isinstance(response, Response) + assert isinstance(request, Request) + response.id = request.id + response.target = request.origin response.origin = self.device_id - self.send_message(response) + + self.send_message(response, **kwargs) - def send_message(self, msg): + def send_message(self, msg, **kwargs): """ Sends a platypush.message.Message to a node. To be implemented in the derived classes. @@ -114,6 +175,7 @@ class Backend(Thread): raise NotImplementedError("send_message should be implemented in a derived class") + def run(self): """ Starts the backend thread. To be implemented in the derived classes """ self.thread_id = threading.get_ident() @@ -124,11 +186,13 @@ class Backend(Thread): def stop(self): """ Stops the backend thread by sending a STOP event on its bus """ - evt = StopEvent(target=self.device_id, origin=self.device_id, - thread_id=self.thread_id) + def _async_stop(): + evt = StopEvent(target=self.device_id, origin=self.device_id, + thread_id=self.thread_id) + self.send_message(evt) + self.on_stop() - self.send_message(evt) - self.on_stop() + Thread(target=_async_stop).start() def should_stop(self): return self._stop diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index 91acb836ad..42dd1fed0c 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -17,6 +17,8 @@ class KafkaBackend(Backend): self.topic = self._topic_by_device_id(self.device_id) self.producer = None + logging.getLogger('kafka').setLevel(logging.ERROR) + def _on_record(self, record): if record.topic != self.topic: return @@ -25,7 +27,7 @@ class KafkaBackend(Backend): except Exception as e: logging.exception(e) - logging.debug('Received message: {}'.format(msg)) + logging.debug('Received message on Kafka backend: {}'.format(msg)) self.on_message(msg) def _init_producer(self): @@ -44,14 +46,12 @@ class KafkaBackend(Backend): self.producer.flush() def on_stop(self): - try: - if self.producer: - self.producer.flush() - self.producer.close() + if self.producer: + self.producer.flush() + self.producer.close() - if self.consumer: - self.consumer.close() - except: pass + if self.consumer: + self.consumer.close() def run(self): super().run() @@ -63,8 +63,7 @@ class KafkaBackend(Backend): try: for msg in self.consumer: self._on_record(msg) - if self.should_stop(): - break + if self.should_stop(): break except ConnectionError: logging.warning('Kafka connection error, retrying in {} seconds'. format(self._conn_retry_secs)) diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py new file mode 100644 index 0000000000..7429de470a --- /dev/null +++ b/platypush/backend/local/__init__.py @@ -0,0 +1,77 @@ +import logging +import json +import os +import time + +from .. import Backend + +from platypush.message import Message +from platypush.message.request import Request +from platypush.message.response import Response + +class LocalBackend(Backend): + """ Sends and receive messages on two distinct local FIFOs, one for + the requests and one for the responses """ + + def __init__(self, request_fifo, response_fifo, **kwargs): + super().__init__(**kwargs) + + self.request_fifo = request_fifo + self.response_fifo = response_fifo + + try: os.mkfifo(self.request_fifo) + except FileExistsError as e: pass + + try: os.mkfifo(self.response_fifo) + except FileExistsError as e: pass + + + def send_message(self, msg): + fifo = self.response_fifo \ + if isinstance(msg, Response) or self._request_context \ + else self.request_fifo + + msg = '{}\n'.format(str(msg)).encode('utf-8') + with open(fifo, 'wb') as f: + f.write(msg) + + + def _get_next_message(self): + fifo = self.response_fifo if self._request_context else self.request_fifo + with open(fifo, 'rb', 0) as f: + msg = f.readline() + + return Message.build(msg) if len(msg) else None + + + def on_stop(self): + try: os.remove(self.request_fifo) + except: pass + + try: os.remove(self.response_fifo) + except: pass + + + def run(self): + super().run() + logging.info('Initialized local backend on {} and {}'. + format(self.request_fifo, self.response_fifo)) + + while not self.should_stop(): + try: + msg = self._get_next_message() + if not msg: continue + except Exception as e: + logging.exception(e) + time.sleep(0.2) + continue + + # logging.debug('Received message on the local backend: {}'.format(msg)) + logging.info('Received message on the local backend: {}'.format(msg)) + + if self.should_stop(): break + self.on_message(msg) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 9ec630d9e9..ccba3958f0 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -80,6 +80,7 @@ class PushbulletBackend(Backend): push = data['push'] else: return # Not a push notification + if 'body' not in push: return logging.debug('Received push: {}'.format(push)) body = push['body'] diff --git a/platypush/config.example.yaml b/platypush/config.example.yaml index 706ca35aaf..49e2eade18 100644 --- a/platypush/config.example.yaml +++ b/platypush/config.example.yaml @@ -11,6 +11,10 @@ backend.pushbullet: token: your_pushbullet_token_here device: your_pushbullet_virtual_device_name +backend.local: + request_fifo: /tmp/platypush-requests.fifo + response_fifo: /tmp/platypush-responses.fifo + # device_id: (default: current hostname) # debug: True (default: False) diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 1427aa5320..47a72f3fd0 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -1,6 +1,8 @@ +import logging import inspect import json + class Message(object): """ Message generic class """ @@ -15,7 +17,7 @@ class Message(object): for attr in self.__dir__() if not attr.startswith('_') and not inspect.ismethod(getattr(self, attr)) - }) + }).replace('\n', ' ') def __bytes__(self): """ @@ -38,7 +40,10 @@ class Message(object): if isinstance(msg, bytes) or isinstance(msg, bytearray): msg = msg.decode('utf-8') if isinstance(msg, str): - msg = json.loads(msg.strip()) + try: + msg = json.loads(msg.strip()) + except: + logging.warning('Invalid JSON message: {}'.format(msg)) assert isinstance(msg, dict) return msg @@ -47,11 +52,15 @@ class Message(object): def build(cls, msg): """ Builds a Message object from a dictionary. - To be implemented in the derived classes. Params: - msg -- The message as a key-value dictionary + msg -- The message as a key-value dictionary, Message object or JSON string """ - raise RuntimeError('build should be implemented in a derived class') + from platypush.utils import get_message_class_by_type + + + msg = cls.parse(msg) + msgtype = get_message_class_by_type(msg['type']) + if msgtype != cls: return msgtype.build(msg) # vim:sw=4:ts=4:et: diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py old mode 100755 new mode 100644 index fca8dde353..f3cdbf0395 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -1,13 +1,13 @@ import argparse -import os +import logging import re import sys from platypush.bus import Bus from platypush.config import Config from platypush.message.request import Request -from platypush.message.response import Response -from platypush.utils import init_backends, set_timeout, clear_timeout +from platypush.utils import init_backends + class Pusher(object): """ @@ -49,6 +49,7 @@ class Pusher(object): # Initialize the configuration self.config_file = config_file Config.init(config_file) + logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) self.on_response = on_response or self.default_on_response() self.backend = backend or Config.get_default_pusher_backend() @@ -95,40 +96,19 @@ class Pusher(object): def get_backend(self, name): # Lazy init - if not self.backends: self.backends = init_backends(bus=self.bus) + if not self.backends: + self.backends = init_backends(bus=self.bus) + if name not in self.backends: raise RuntimeError('No such backend configured: {}'.format(name)) return self.backends[name] - def on_timeout(self): - """ Default response timeout handle: raise RuntimeError """ - def _f(): - raise RuntimeError('Response timed out') - return _f - def default_on_response(self): def _f(response): - print('Received response: {}'.format(response)) - os._exit(0) + logging.info('Received response: {}'.format(response)) + # self.backend_instance.stop() return _f - def response_wait(self, request, timeout): - # Install the timeout handler - set_timeout(seconds=timeout, on_timeout=self.on_timeout()) - - # Loop on the bus until you get a response for your request ID - response_received = False - while not response_received: - msg = self.bus.get() - response_received = ( - isinstance(msg, Response) and - hasattr(msg, 'id') and - msg.id == request.id) - - if timeout: clear_timeout() - self.on_response(msg) - - def push(self, target, action, backend=None, config_file=None, timeout=default_response_wait_timeout, **kwargs): """ @@ -161,11 +141,9 @@ class Pusher(object): 'args' : kwargs, }) - b = self.get_backend(backend) - b.start() - b.send_request(req) - - if timeout: self.response_wait(request=req, timeout=timeout) + self.backend_instance = self.get_backend(backend) + self.backend_instance.send_request(req, on_response=self.on_response, + response_timeout=timeout) def main(args=sys.argv[1:]): diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index c3248e34b0..f14bef1e9c 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -42,7 +42,16 @@ def get_or_load_plugin(plugin_name, reload=False): return plugin -def init_backends(bus=None): +def init_backends(bus=None, **kwargs): + """ Initialize the backend objects based on the configuration and returns + a name -> backend_instance map. + Params: + bus -- If specific (it usually should), the messages processed by the + backends will be posted on this bus. + + kwargs -- Any additional key-value parameters required to initialize the backends + """ + backends = {} for k in Config.get_backends().keys(): @@ -56,7 +65,7 @@ def init_backends(bus=None): ) + 'Backend' try: - b = getattr(module, cls_name)(bus=bus, **cfg) + b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs) backends[k] = b except AttributeError as e: logging.warning('No such class in {}: {}'.format(