From 18a5902ac491e3a50f9c4038f8cc95dcf9c56c3f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 18 Dec 2017 03:09:38 +0100 Subject: [PATCH] Locking requests and responses with ids --- platypush/__init__.py | 5 +++-- platypush/backend/__init__.py | 6 +++--- platypush/backend/kafka/__init__.py | 1 - platypush/bus/__init__.py | 4 +++- platypush/message/request/__init__.py | 13 ++++++++++++- platypush/message/response/__init__.py | 6 +++++- platypush/pusher/__init__.py | 20 ++++++++++++-------- 7 files changed, 38 insertions(+), 17 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index 2b1b69b60..5e4f3c626 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -31,8 +31,6 @@ def _execute_request(request, retry=True): response = plugin.run(method=method_name, **request.args) if response and response.is_error(): logging.warn('Response processed with errors: {}'.format(response)) - else: - logging.info('Processed response: {}'.format(response)) except Exception as e: response = Response(output=None, errors=[str(e), traceback.format_exc()]) logging.exception(e) @@ -43,7 +41,10 @@ def _execute_request(request, retry=True): finally: # Send the response on the backend that received the request if request.backend and request.origin: + if request.id: response.id = request.id response.target = request.origin + + logging.info('Processing response: {}'.format(response)) request.backend.send_response(response) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 8852f11d5..24e2cac85 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -2,9 +2,9 @@ import importlib import logging import sys -from queue import Queue from threading import Thread +from platypush.bus import Bus from platypush.config import Config from platypush.message import Message from platypush.message.request import Request @@ -17,13 +17,13 @@ class Backend(Thread): """ Params: bus -- Reference to the Platypush bus where the requests and the - responses will be posted [Queue] + responses will be posted [Bus] kwargs -- key-value configuration for this backend [Dict] """ # If no bus is specified, create an internal queue where # the received messages will be pushed - self.bus = bus if bus else Queue() + self.bus = bus if bus else Bus() self.device_id = Config.get('device_id') self.msgtypes = {} diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index 612f783ee..bb8d83419 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -13,7 +13,6 @@ class KafkaBackend(Backend): self.topic_prefix = topic self.topic = self._topic_by_device_id(self.device_id) self.producer = None - self._init_producer() def _on_record(self, record): if record.topic != self.topic: return diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 1ee474586..46086b3a2 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -3,7 +3,7 @@ from queue import Queue class Bus(object): """ Main local bus where the daemon will listen for new messages """ - def __init__(self, on_msg): + def __init__(self, on_msg=None): self.bus = Queue() self.on_msg = on_msg @@ -13,6 +13,8 @@ class Bus(object): def loop_forever(self): """ Reads messages from the bus """ + if not self.on_msg: return + while True: try: self.on_msg(self.bus.get()) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 6b03e368f..e1f42a66e 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,19 +1,22 @@ import json +import random from platypush.message import Message class Request(Message): """ Request message class """ - def __init__(self, target, action, origin=None, args={}): + def __init__(self, target, action, origin=None, id=None, args={}): """ Params: target -- Target node [String] action -- Action to be executed (e.g. music.mpd.play) [String] origin -- Origin node [String] + id -- Message ID, or None to get it auto-generated args -- Additional arguments for the action [Dict] """ + self.id = id if id else self._generate_id() self.target = target self.action = action self.origin = origin @@ -31,6 +34,13 @@ class Request(Message): if 'origin' in msg: args['origin'] = msg['origin'] return Request(**args) + @staticmethod + def _generate_id(): + id = '' + for i in range(0,16): + id += '%.2x' % random.randint(0, 255) + return id + def __str__(self): """ Overrides the str() operator and converts @@ -43,6 +53,7 @@ class Request(Message): 'action' : self.action, 'args' : self.args, 'origin' : self.origin if hasattr(self, 'origin') else None, + 'id' : self.id if hasattr(self, 'id') else None, }) diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index bf3be681a..a7156eff6 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -5,19 +5,21 @@ from platypush.message import Message class Response(Message): """ Response message class """ - def __init__(self, target=None, origin=None, output=None, errors=[]): + def __init__(self, target=None, origin=None, id=None, output=None, errors=[]): """ Params: target -- Target [String] origin -- Origin [String] output -- Output [String] errors -- Errors [List of strings or exceptions] + id -- Message ID this response refers to """ self.target = target self.output = output self.errors = errors self.origin = origin + self.id = id def is_error(self): """ Returns True if the respopnse has errors """ @@ -32,6 +34,7 @@ class Response(Message): 'errors' : msg['response']['errors'], } + if 'id' in msg: args['id'] = msg['id'] if 'origin' in msg: args['origin'] = msg['origin'] return Response(**args) @@ -43,6 +46,7 @@ class Response(Message): """ return json.dumps({ + 'id' : self.id, 'type' : 'response', 'target' : self.target if hasattr(self, 'target') else None, 'origin' : self.origin if hasattr(self, 'origin') else None, diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py index 767b86fd1..0e0c31d7b 100755 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -1,10 +1,13 @@ import argparse +import os import re +import signal import sys from platypush.config import Config -from platypush.utils import init_backends from platypush.message.request import Request +from platypush.message.response import Response +from platypush.utils import init_backends def print_usage(): print ('''Usage: {} [-h|--help] <-t|--target > <-a|--action > payload @@ -25,18 +28,19 @@ def pusher(target, action, backend=None, config=None, **kwargs): elif not backend: backend = Config.get_default_pusher_backend() - # TODO Initialize a local bus and wait for the response - backends = init_backends() - if backend not in backends: - raise RuntimeError('No such backend configured: {}'.format(backend)) - - b = backends[backend] - b.send_request({ + req = Request.build({ 'target' : target, 'action' : action, 'args' : kwargs, }) + backends = init_backends() + if backend not in backends: + raise RuntimeError('No such backend configured: {}'.format(backend)) + + b = backends[backend] + b.start() + b.send_request(req) def main(): parser = argparse.ArgumentParser()