Locking requests and responses with ids

This commit is contained in:
Fabio Manganiello 2017-12-18 03:09:38 +01:00
parent 97b6fab376
commit 18a5902ac4
7 changed files with 38 additions and 17 deletions

View File

@ -31,8 +31,6 @@ def _execute_request(request, retry=True):
response = plugin.run(method=method_name, **request.args)
if response and response.is_error():
logging.warn('Response processed with errors: {}'.format(response))
else:
logging.info('Processed response: {}'.format(response))
except Exception as e:
response = Response(output=None, errors=[str(e), traceback.format_exc()])
logging.exception(e)
@ -43,7 +41,10 @@ def _execute_request(request, retry=True):
finally:
# Send the response on the backend that received the request
if request.backend and request.origin:
if request.id: response.id = request.id
response.target = request.origin
logging.info('Processing response: {}'.format(response))
request.backend.send_response(response)

View File

@ -2,9 +2,9 @@ import importlib
import logging
import sys
from queue import Queue
from threading import Thread
from platypush.bus import Bus
from platypush.config import Config
from platypush.message import Message
from platypush.message.request import Request
@ -17,13 +17,13 @@ class Backend(Thread):
"""
Params:
bus -- Reference to the Platypush bus where the requests and the
responses will be posted [Queue]
responses will be posted [Bus]
kwargs -- key-value configuration for this backend [Dict]
"""
# If no bus is specified, create an internal queue where
# the received messages will be pushed
self.bus = bus if bus else Queue()
self.bus = bus if bus else Bus()
self.device_id = Config.get('device_id')
self.msgtypes = {}

View File

@ -13,7 +13,6 @@ class KafkaBackend(Backend):
self.topic_prefix = topic
self.topic = self._topic_by_device_id(self.device_id)
self.producer = None
self._init_producer()
def _on_record(self, record):
if record.topic != self.topic: return

View File

@ -3,7 +3,7 @@ from queue import Queue
class Bus(object):
""" Main local bus where the daemon will listen for new messages """
def __init__(self, on_msg):
def __init__(self, on_msg=None):
self.bus = Queue()
self.on_msg = on_msg
@ -13,6 +13,8 @@ class Bus(object):
def loop_forever(self):
""" Reads messages from the bus """
if not self.on_msg: return
while True:
try:
self.on_msg(self.bus.get())

View File

@ -1,19 +1,22 @@
import json
import random
from platypush.message import Message
class Request(Message):
""" Request message class """
def __init__(self, target, action, origin=None, args={}):
def __init__(self, target, action, origin=None, id=None, args={}):
"""
Params:
target -- Target node [String]
action -- Action to be executed (e.g. music.mpd.play) [String]
origin -- Origin node [String]
id -- Message ID, or None to get it auto-generated
args -- Additional arguments for the action [Dict]
"""
self.id = id if id else self._generate_id()
self.target = target
self.action = action
self.origin = origin
@ -31,6 +34,13 @@ class Request(Message):
if 'origin' in msg: args['origin'] = msg['origin']
return Request(**args)
@staticmethod
def _generate_id():
id = ''
for i in range(0,16):
id += '%.2x' % random.randint(0, 255)
return id
def __str__(self):
"""
Overrides the str() operator and converts
@ -43,6 +53,7 @@ class Request(Message):
'action' : self.action,
'args' : self.args,
'origin' : self.origin if hasattr(self, 'origin') else None,
'id' : self.id if hasattr(self, 'id') else None,
})

View File

@ -5,19 +5,21 @@ from platypush.message import Message
class Response(Message):
""" Response message class """
def __init__(self, target=None, origin=None, output=None, errors=[]):
def __init__(self, target=None, origin=None, id=None, output=None, errors=[]):
"""
Params:
target -- Target [String]
origin -- Origin [String]
output -- Output [String]
errors -- Errors [List of strings or exceptions]
id -- Message ID this response refers to
"""
self.target = target
self.output = output
self.errors = errors
self.origin = origin
self.id = id
def is_error(self):
""" Returns True if the respopnse has errors """
@ -32,6 +34,7 @@ class Response(Message):
'errors' : msg['response']['errors'],
}
if 'id' in msg: args['id'] = msg['id']
if 'origin' in msg: args['origin'] = msg['origin']
return Response(**args)
@ -43,6 +46,7 @@ class Response(Message):
"""
return json.dumps({
'id' : self.id,
'type' : 'response',
'target' : self.target if hasattr(self, 'target') else None,
'origin' : self.origin if hasattr(self, 'origin') else None,

View File

@ -1,10 +1,13 @@
import argparse
import os
import re
import signal
import sys
from platypush.config import Config
from platypush.utils import init_backends
from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import init_backends
def print_usage():
print ('''Usage: {} [-h|--help] <-t|--target <target name>> <-a|--action <action name>> payload
@ -25,18 +28,19 @@ def pusher(target, action, backend=None, config=None, **kwargs):
elif not backend:
backend = Config.get_default_pusher_backend()
# TODO Initialize a local bus and wait for the response
backends = init_backends()
if backend not in backends:
raise RuntimeError('No such backend configured: {}'.format(backend))
b = backends[backend]
b.send_request({
req = Request.build({
'target' : target,
'action' : action,
'args' : kwargs,
})
backends = init_backends()
if backend not in backends:
raise RuntimeError('No such backend configured: {}'.format(backend))
b = backends[backend]
b.start()
b.send_request(req)
def main():
parser = argparse.ArgumentParser()