[#23] Request/Response ID chaining fixes

[#17] Response wait and response timeout implementation on pusher side
This commit is contained in:
Fabio Manganiello 2017-12-18 22:40:56 +01:00
parent 08f7fce028
commit 21381e7c0f
4 changed files with 69 additions and 9 deletions

View file

@ -133,5 +133,10 @@ class Backend(Thread):
""" Starts the backend thread. To be implemented in the derived classes """ """ Starts the backend thread. To be implemented in the derived classes """
raise NotImplementedError("run should be implemented in a derived class") raise NotImplementedError("run should be implemented in a derived class")
def stop(self):
""" Stops the backend thread (default: do nothing) """
pass
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,8 +1,17 @@
import os
import sys
import signal
import logging
from queue import Queue from queue import Queue
class Bus(object): class Bus(object):
""" Main local bus where the daemon will listen for new messages """ """ Main local bus where the daemon will listen for new messages """
""" Number of seconds to wait for any pending threads
before the process returns to the OS """
_kill_sec_timeout = 5
def __init__(self, on_msg=None): def __init__(self, on_msg=None):
self.bus = Queue() self.bus = Queue()
self.on_msg = on_msg self.on_msg = on_msg
@ -11,15 +20,29 @@ class Bus(object):
""" Sends a message to the bus """ """ Sends a message to the bus """
self.bus.put(msg) self.bus.put(msg)
def get(self):
""" Reads one message from the bus """
return self.bus.get()
def loop_forever(self): def loop_forever(self):
""" Reads messages from the bus """ """ Reads messages from the bus until KeyboardInterrupt """
def _on_stop_timeout(signum, frame):
logging.warn('Stopping all the active threads after waiting for ' +
'{} seconds'.format(self._kill_sec_timeout))
os._exit(1)
if not self.on_msg: return if not self.on_msg: return
while True: while True:
try: try:
self.on_msg(self.bus.get()) self.on_msg(self.get())
except KeyboardInterrupt: except KeyboardInterrupt:
return logging.info('Received keyboard interrupt ' +
'- terminating application')
signal.signal(signal.SIGALRM, _on_stop_timeout)
signal.alarm(self._kill_sec_timeout)
sys.exit(0)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -31,6 +31,7 @@ class Request(Message):
'args' : msg['args'] if 'args' in msg else {}, 'args' : msg['args'] if 'args' in msg else {},
} }
args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
if 'origin' in msg: args['origin'] = msg['origin'] if 'origin' in msg: args['origin'] = msg['origin']
return Request(**args) return Request(**args)

View file

@ -4,6 +4,7 @@ import re
import signal import signal
import sys import sys
from platypush.bus import Bus
from platypush.config import Config from platypush.config import Config
from platypush.message.request import Request from platypush.message.request import Request
from platypush.message.response import Response from platypush.message.response import Response
@ -15,12 +16,20 @@ def print_usage():
-c, --config:\tPath to the platypush config.yaml (default: ~/.config/platypush/config.yaml or /etc/platypush/config.yaml) -c, --config:\tPath to the platypush config.yaml (default: ~/.config/platypush/config.yaml or /etc/platypush/config.yaml)
-b, --backend:\tBackend to deliver the message [pushbullet|kafka] (default: whatever specified in your config with pusher=True) -b, --backend:\tBackend to deliver the message [pushbullet|kafka] (default: whatever specified in your config with pusher=True)
-t, --target:\tName of the target device/host -t, --target:\tName of the target device/host
-T, --timeout:\tThe application will wait for a response for this number of seconds (default: 5 seconds. A zero value means that the application will exit without waiting for a response)
-a, --action\tAction to run, it includes both the package name and the method (e.g. shell.exec or music.mpd.play) -a, --action\tAction to run, it includes both the package name and the method (e.g. shell.exec or music.mpd.play)
payload:\t\tArguments to the action payload:\t\tArguments to the action
'''.format(sys.argv[0])) '''.format(sys.argv[0]))
def pusher(target, action, backend=None, config=None, **kwargs): _DEFAULT_TIMEOUT_SEC=5
def pusher(target, action, backend=None, config=None,
timeout=_DEFAULT_TIMEOUT_SEC, **kwargs):
def on_timeout(signum, frame):
raise RuntimeError('Response timed out after {} seconds'.format(
timeout))
Config.init(config) Config.init(config)
if target == 'localhost': if target == 'localhost':
@ -34,19 +43,35 @@ def pusher(target, action, backend=None, config=None, **kwargs):
'args' : kwargs, 'args' : kwargs,
}) })
backends = init_backends() bus = Bus()
backends = init_backends(bus=bus)
if backend not in backends: if backend not in backends:
raise RuntimeError('No such backend configured: {}'.format(backend)) raise RuntimeError('No such backend configured: {}'.format(backend))
b = backends[backend] b = backends[backend]
b.start() b.start()
b.send_request(req) b.send_request(req)
if timeout:
signal.signal(signal.SIGALRM, on_timeout)
signal.alarm(timeout)
response_received = False
while not response_received:
msg = bus.get()
response_received = isinstance(msg, Response) and (
hasattr(msg, 'id') and msg.id == req.id)
signal.alarm(0)
print('Response received!')
os._exit(0) os._exit(0)
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--config', '-c', dest='config', required=False, parser.add_argument('--config', '-c', dest='config', required=False,
help="Configuration file path (default: " + default=None, help="Configuration file path (default: " +
"~/.config/platypush/config.yaml or " + "~/.config/platypush/config.yaml or " +
"/etc/platypush/config.yaml") "/etc/platypush/config.yaml")
@ -57,10 +82,17 @@ def main():
help="Action to execute, as package.method") help="Action to execute, as package.method")
parser.add_argument('--backend', '-b', dest='backend', required=False, parser.add_argument('--backend', '-b', dest='backend', required=False,
help="Backend to deliver the message " + default=None, help="Backend to deliver the message " +
"[pushbullet|kafka|local] (default: whatever " + "[pushbullet|kafka|local] (default: whatever " +
"specified in your config with pusher=True)") "specified in your config with pusher=True)")
parser.add_argument('--timeout', '-T', dest='timeout', required=False,
default=_DEFAULT_TIMEOUT_SEC, help="The application " +
"will wait for a response for this number of seconds " +
"(default: " + str(_DEFAULT_TIMEOUT_SEC) + " seconds. "
"A zero value means that the application " +
" will exit without waiting for a response)")
opts, args = parser.parse_known_args(sys.argv[1:]) opts, args = parser.parse_known_args(sys.argv[1:])
if len(args) % 2 != 0: if len(args) % 2 != 0:
@ -72,8 +104,7 @@ def main():
payload[re.sub('^-+', '', args[i])] = args[i+1] payload[re.sub('^-+', '', args[i])] = args[i+1]
pusher(target=opts.target, action=opts.action, pusher(target=opts.target, action=opts.action,
backend=opts.backend if 'backend' in opts else None, backend=opts.backend, config=opts.config, timeout=opts.timeout,
config=opts.config if 'config' in opts else None,
**payload) **payload)