Another major refactoring. Among the other things, reintroduced local backend, made requests and responses working in every case, and properly handling stop events

This commit is contained in:
Fabio Manganiello 2017-12-22 00:49:03 +01:00
parent 7e79fa0418
commit 4b819d5460
9 changed files with 231 additions and 98 deletions

View file

@ -4,12 +4,10 @@ import sys
import traceback
from threading import Thread
from getopt import getopt
from .bus import Bus
from .config import Config
from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action
from .message.event import Event, StopEvent
from .message.request import Request
from .message.response import Response
@ -46,6 +44,7 @@ class Daemon(object):
"""
self.config_file = config_file
self.message_handler = message_handler
Config.init(self.config_file)
logging.basicConfig(level=Config.get('logging'), stream=sys.stdout)
@ -83,21 +82,6 @@ class Daemon(object):
return _f
def send_response(self, request, response):
""" Sends a response back.
Params:
request -- The platypush.message.request.Request object
response -- The platypush.message.response.Response object """
if request.backend and request.origin:
if request.id: response.id = request.id
response.target = request.origin
logging.info('Processing response: {}'.format(response))
request.backend.send_response(response)
else:
logging.info('Ignoring response as the request has no backend: '
.format(request))
def run_request(self):
""" Runs a request and returns the response """
@ -118,8 +102,12 @@ class Daemon(object):
# Run the action
response = plugin.run(method=method_name, **request.args)
if response and response.is_error():
logging.warning('Response processed with errors: {}'.format(response))
except Exception as e: # Retry mechanism
raise RuntimeError('Response processed with errors: {}'.format(response))
logging.info('Processed response from plugin {}: {}'.
format(plugin, response))
except Exception as e:
# Retry mechanism
response = Response(output=None, errors=[str(e), traceback.format_exc()])
logging.exception(e)
if n_tries:
@ -127,8 +115,12 @@ class Daemon(object):
get_or_load_plugin(module_name, reload=True)
_thread_func(request, n_tries=n_tries-1)
finally:
# Send the response on the backend that received the request
self.send_response(request, response)
# Send the response on the backend
if request.backend and request.origin:
request.backend.send_response(response=response, request=request)
else:
logging.info('Dropping response whose request has no ' +
'origin attached: {}'.format(request))
return _thread_func
@ -137,7 +129,7 @@ class Daemon(object):
self.bus = Bus(on_message=self.on_message())
# Initialize the backends and link them to the bus
self.backends = init_backends(self.bus)
self.backends = init_backends(bus=self.bus)
# Start the backend threads
for backend in self.backends.values():

View file

@ -7,7 +7,7 @@ from threading import Thread
from platypush.bus import Bus
from platypush.config import Config
from platypush.utils import get_message_class_by_type
from platypush.utils import get_message_class_by_type, set_timeout, clear_timeout
from platypush.message import Message
from platypush.message.event import Event, StopEvent
from platypush.message.request import Request
@ -16,6 +16,8 @@ from platypush.message.response import Response
class Backend(Thread):
""" Parent class for backends """
_default_response_timeout = 5
def __init__(self, bus=None, **kwargs):
"""
Params:
@ -26,10 +28,16 @@ class Backend(Thread):
# If no bus is specified, create an internal queue where
# the received messages will be pushed
self.bus = bus if bus else Bus()
self.bus = bus or Bus()
self.device_id = Config.get('device_id')
self.thread_id = None
self._stop = False
self._kwargs = kwargs
# Internal-only, we set the request context on a backend if that
# backend is intended to react for a response to a specific request
self._request_context = kwargs['_req_ctx'] if '_req_ctx' in kwargs \
else None
Thread.__init__(self)
logging.basicConfig(stream=sys.stdout, level=Config.get('logging')
@ -50,59 +58,112 @@ class Backend(Thread):
object, or a string/byte UTF-8 encoded string
"""
msg = Message.parse(msg)
if 'type' not in msg:
logging.warning('Ignoring message with no type: {}'.format(msg))
return
msgtype = get_message_class_by_type(msg['type'])
msg = msgtype.build(msg)
msg = Message.build(msg)
if not getattr(msg, 'target') or msg.target != self.device_id:
return # Not for me
logging.info('Message received on the {} backend: {}'.format(
logging.debug('Message received on the {} backend: {}'.format(
self.__class__.__name__, msg))
msg.backend = self # Augment message
if self._is_expected_response(msg):
# Expected response, trigger the response handler
clear_timeout()
self._request_context['on_response'](msg)
self.stop()
return
if isinstance(msg, StopEvent) and msg.targets_me():
logging.info('Received STOP event on the {} backend: {}'.format(
self.__class__.__name__, msg))
logging.info('Received STOP event on {}'.format(self.__class__.__name__))
self._stop = True
else:
msg.backend = self # Augment message to be able to process responses
self.bus.post(msg)
def send_request(self, request):
def _is_expected_response(self, msg):
""" Internal only - returns true if we are expecting for a response
and msg is that response """
return self._request_context \
and isinstance(msg, Response) \
and msg.id == self._request_context['request'].id
def _get_backend_config(self):
config_name = 'backend.' + self.__class__.__name__.split('Backend')[0].lower()
return Config.get(config_name)
def _setup_response_handler(self, request, on_response, response_timeout):
def _timeout_hndl():
raise RuntimeError('Timed out while waiting for a response from {}'.
format(request.target))
req_ctx = {
'request': request,
'on_response': on_response,
'response_timeout': response_timeout,
}
resp_backend = self.__class__(bus=self.bus, _req_ctx=req_ctx,
**self._get_backend_config(), **self._kwargs)
# Set the response timeout
set_timeout(seconds=self._default_response_timeout,
on_timeout=_timeout_hndl)
resp_backend.start()
def send_request(self, request, on_response=None,
response_timeout=_default_response_timeout, **kwargs):
"""
Send a request message on the backend
Params:
request -- The request, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.request.Request object
request -- The request, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.request.Request object.
on_response -- Response handler, takes a platypush.message.response.Response
as argument. If set, the method will wait for a
response before exiting (default: None)
response_timeout -- If on_response is set, the backend will raise
an exception if the response isn't received
within this number of seconds (default: 5)
"""
request = Request.build(request)
assert isinstance(request, Request)
request.origin = self.device_id
self.send_message(request)
def send_response(self, response):
if on_response and response_timeout:
self._setup_response_handler(request, on_response, response_timeout)
self.send_message(request, **kwargs)
def send_response(self, response, request, **kwargs):
"""
Send a response message on the backend
Params:
response -- The response, either a dict, a string/bytes UTF-8 JSON,
or a platypush.message.response.Response object
request -- Associated request, used to set the response parameters
that will link them
"""
response = Response.build(response)
assert isinstance(response, Response)
assert isinstance(request, Request)
response.id = request.id
response.target = request.origin
response.origin = self.device_id
self.send_message(response)
self.send_message(response, **kwargs)
def send_message(self, msg):
def send_message(self, msg, **kwargs):
"""
Sends a platypush.message.Message to a node.
To be implemented in the derived classes.
@ -114,6 +175,7 @@ class Backend(Thread):
raise NotImplementedError("send_message should be implemented in a derived class")
def run(self):
""" Starts the backend thread. To be implemented in the derived classes """
self.thread_id = threading.get_ident()
@ -124,11 +186,13 @@ class Backend(Thread):
def stop(self):
""" Stops the backend thread by sending a STOP event on its bus """
evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.thread_id)
def _async_stop():
evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.thread_id)
self.send_message(evt)
self.on_stop()
self.send_message(evt)
self.on_stop()
Thread(target=_async_stop).start()
def should_stop(self):
return self._stop

View file

@ -17,6 +17,8 @@ class KafkaBackend(Backend):
self.topic = self._topic_by_device_id(self.device_id)
self.producer = None
logging.getLogger('kafka').setLevel(logging.ERROR)
def _on_record(self, record):
if record.topic != self.topic: return
@ -25,7 +27,7 @@ class KafkaBackend(Backend):
except Exception as e:
logging.exception(e)
logging.debug('Received message: {}'.format(msg))
logging.debug('Received message on Kafka backend: {}'.format(msg))
self.on_message(msg)
def _init_producer(self):
@ -44,14 +46,12 @@ class KafkaBackend(Backend):
self.producer.flush()
def on_stop(self):
try:
if self.producer:
self.producer.flush()
self.producer.close()
if self.producer:
self.producer.flush()
self.producer.close()
if self.consumer:
self.consumer.close()
except: pass
if self.consumer:
self.consumer.close()
def run(self):
super().run()
@ -63,8 +63,7 @@ class KafkaBackend(Backend):
try:
for msg in self.consumer:
self._on_record(msg)
if self.should_stop():
break
if self.should_stop(): break
except ConnectionError:
logging.warning('Kafka connection error, retrying in {} seconds'.
format(self._conn_retry_secs))

View file

@ -0,0 +1,77 @@
import logging
import json
import os
import time
from .. import Backend
from platypush.message import Message
from platypush.message.request import Request
from platypush.message.response import Response
class LocalBackend(Backend):
""" Sends and receive messages on two distinct local FIFOs, one for
the requests and one for the responses """
def __init__(self, request_fifo, response_fifo, **kwargs):
super().__init__(**kwargs)
self.request_fifo = request_fifo
self.response_fifo = response_fifo
try: os.mkfifo(self.request_fifo)
except FileExistsError as e: pass
try: os.mkfifo(self.response_fifo)
except FileExistsError as e: pass
def send_message(self, msg):
fifo = self.response_fifo \
if isinstance(msg, Response) or self._request_context \
else self.request_fifo
msg = '{}\n'.format(str(msg)).encode('utf-8')
with open(fifo, 'wb') as f:
f.write(msg)
def _get_next_message(self):
fifo = self.response_fifo if self._request_context else self.request_fifo
with open(fifo, 'rb', 0) as f:
msg = f.readline()
return Message.build(msg) if len(msg) else None
def on_stop(self):
try: os.remove(self.request_fifo)
except: pass
try: os.remove(self.response_fifo)
except: pass
def run(self):
super().run()
logging.info('Initialized local backend on {} and {}'.
format(self.request_fifo, self.response_fifo))
while not self.should_stop():
try:
msg = self._get_next_message()
if not msg: continue
except Exception as e:
logging.exception(e)
time.sleep(0.2)
continue
# logging.debug('Received message on the local backend: {}'.format(msg))
logging.info('Received message on the local backend: {}'.format(msg))
if self.should_stop(): break
self.on_message(msg)
# vim:sw=4:ts=4:et:

View file

@ -80,6 +80,7 @@ class PushbulletBackend(Backend):
push = data['push']
else: return # Not a push notification
if 'body' not in push: return
logging.debug('Received push: {}'.format(push))
body = push['body']

View file

@ -11,6 +11,10 @@ backend.pushbullet:
token: your_pushbullet_token_here
device: your_pushbullet_virtual_device_name
backend.local:
request_fifo: /tmp/platypush-requests.fifo
response_fifo: /tmp/platypush-responses.fifo
# device_id: <your_device_id> (default: current hostname)
# debug: True (default: False)

View file

@ -1,6 +1,8 @@
import logging
import inspect
import json
class Message(object):
""" Message generic class """
@ -15,7 +17,7 @@ class Message(object):
for attr in self.__dir__()
if not attr.startswith('_')
and not inspect.ismethod(getattr(self, attr))
})
}).replace('\n', ' ')
def __bytes__(self):
"""
@ -38,7 +40,10 @@ class Message(object):
if isinstance(msg, bytes) or isinstance(msg, bytearray):
msg = msg.decode('utf-8')
if isinstance(msg, str):
msg = json.loads(msg.strip())
try:
msg = json.loads(msg.strip())
except:
logging.warning('Invalid JSON message: {}'.format(msg))
assert isinstance(msg, dict)
return msg
@ -47,11 +52,15 @@ class Message(object):
def build(cls, msg):
"""
Builds a Message object from a dictionary.
To be implemented in the derived classes.
Params:
msg -- The message as a key-value dictionary
msg -- The message as a key-value dictionary, Message object or JSON string
"""
raise RuntimeError('build should be implemented in a derived class')
from platypush.utils import get_message_class_by_type
msg = cls.parse(msg)
msgtype = get_message_class_by_type(msg['type'])
if msgtype != cls: return msgtype.build(msg)
# vim:sw=4:ts=4:et:

46
platypush/pusher/__init__.py Executable file → Normal file
View file

@ -1,13 +1,13 @@
import argparse
import os
import logging
import re
import sys
from platypush.bus import Bus
from platypush.config import Config
from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import init_backends, set_timeout, clear_timeout
from platypush.utils import init_backends
class Pusher(object):
"""
@ -49,6 +49,7 @@ class Pusher(object):
# Initialize the configuration
self.config_file = config_file
Config.init(config_file)
logging.basicConfig(level=Config.get('logging'), stream=sys.stdout)
self.on_response = on_response or self.default_on_response()
self.backend = backend or Config.get_default_pusher_backend()
@ -95,40 +96,19 @@ class Pusher(object):
def get_backend(self, name):
# Lazy init
if not self.backends: self.backends = init_backends(bus=self.bus)
if not self.backends:
self.backends = init_backends(bus=self.bus)
if name not in self.backends:
raise RuntimeError('No such backend configured: {}'.format(name))
return self.backends[name]
def on_timeout(self):
""" Default response timeout handle: raise RuntimeError """
def _f():
raise RuntimeError('Response timed out')
return _f
def default_on_response(self):
def _f(response):
print('Received response: {}'.format(response))
os._exit(0)
logging.info('Received response: {}'.format(response))
# self.backend_instance.stop()
return _f
def response_wait(self, request, timeout):
# Install the timeout handler
set_timeout(seconds=timeout, on_timeout=self.on_timeout())
# Loop on the bus until you get a response for your request ID
response_received = False
while not response_received:
msg = self.bus.get()
response_received = (
isinstance(msg, Response) and
hasattr(msg, 'id') and
msg.id == request.id)
if timeout: clear_timeout()
self.on_response(msg)
def push(self, target, action, backend=None, config_file=None,
timeout=default_response_wait_timeout, **kwargs):
"""
@ -161,11 +141,9 @@ class Pusher(object):
'args' : kwargs,
})
b = self.get_backend(backend)
b.start()
b.send_request(req)
if timeout: self.response_wait(request=req, timeout=timeout)
self.backend_instance = self.get_backend(backend)
self.backend_instance.send_request(req, on_response=self.on_response,
response_timeout=timeout)
def main(args=sys.argv[1:]):

View file

@ -42,7 +42,16 @@ def get_or_load_plugin(plugin_name, reload=False):
return plugin
def init_backends(bus=None):
def init_backends(bus=None, **kwargs):
""" Initialize the backend objects based on the configuration and returns
a name -> backend_instance map.
Params:
bus -- If specific (it usually should), the messages processed by the
backends will be posted on this bus.
kwargs -- Any additional key-value parameters required to initialize the backends
"""
backends = {}
for k in Config.get_backends().keys():
@ -56,7 +65,7 @@ def init_backends(bus=None):
) + 'Backend'
try:
b = getattr(module, cls_name)(bus=bus, **cfg)
b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs)
backends[k] = b
except AttributeError as e:
logging.warning('No such class in {}: {}'.format(