diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 24e2cac85c..a8e3361bd7 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -133,5 +133,10 @@ class Backend(Thread): """ Starts the backend thread. To be implemented in the derived classes """ raise NotImplementedError("run should be implemented in a derived class") + def stop(self): + """ Stops the backend thread (default: do nothing) """ + pass + + # vim:sw=4:ts=4:et: diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 46086b3a2e..52010c17fd 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -1,8 +1,17 @@ +import os +import sys +import signal +import logging + from queue import Queue class Bus(object): """ Main local bus where the daemon will listen for new messages """ + """ Number of seconds to wait for any pending threads + before the process returns to the OS """ + _kill_sec_timeout = 5 + def __init__(self, on_msg=None): self.bus = Queue() self.on_msg = on_msg @@ -11,15 +20,29 @@ class Bus(object): """ Sends a message to the bus """ self.bus.put(msg) + def get(self): + """ Reads one message from the bus """ + return self.bus.get() + def loop_forever(self): - """ Reads messages from the bus """ + """ Reads messages from the bus until KeyboardInterrupt """ + def _on_stop_timeout(signum, frame): + logging.warn('Stopping all the active threads after waiting for ' + + '{} seconds'.format(self._kill_sec_timeout)) + os._exit(1) + if not self.on_msg: return while True: try: - self.on_msg(self.bus.get()) + self.on_msg(self.get()) except KeyboardInterrupt: - return + logging.info('Received keyboard interrupt ' + + '- terminating application') + + signal.signal(signal.SIGALRM, _on_stop_timeout) + signal.alarm(self._kill_sec_timeout) + sys.exit(0) # vim:sw=4:ts=4:et: diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index e1f42a66e9..f0083c3241 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -31,6 +31,7 @@ class Request(Message): 'args' : msg['args'] if 'args' in msg else {}, } + args['id'] = msg['id'] if 'id' in msg else cls._generate_id() if 'origin' in msg: args['origin'] = msg['origin'] return Request(**args) diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py index 6b3664067d..722cd1984e 100755 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -4,6 +4,7 @@ import re import signal import sys +from platypush.bus import Bus from platypush.config import Config from platypush.message.request import Request from platypush.message.response import Response @@ -15,12 +16,20 @@ def print_usage(): -c, --config:\tPath to the platypush config.yaml (default: ~/.config/platypush/config.yaml or /etc/platypush/config.yaml) -b, --backend:\tBackend to deliver the message [pushbullet|kafka] (default: whatever specified in your config with pusher=True) -t, --target:\tName of the target device/host + -T, --timeout:\tThe application will wait for a response for this number of seconds (default: 5 seconds. A zero value means that the application will exit without waiting for a response) -a, --action\tAction to run, it includes both the package name and the method (e.g. shell.exec or music.mpd.play) payload:\t\tArguments to the action '''.format(sys.argv[0])) -def pusher(target, action, backend=None, config=None, **kwargs): +_DEFAULT_TIMEOUT_SEC=5 + +def pusher(target, action, backend=None, config=None, + timeout=_DEFAULT_TIMEOUT_SEC, **kwargs): + def on_timeout(signum, frame): + raise RuntimeError('Response timed out after {} seconds'.format( + timeout)) + Config.init(config) if target == 'localhost': @@ -34,19 +43,35 @@ def pusher(target, action, backend=None, config=None, **kwargs): 'args' : kwargs, }) - backends = init_backends() + bus = Bus() + backends = init_backends(bus=bus) if backend not in backends: raise RuntimeError('No such backend configured: {}'.format(backend)) b = backends[backend] b.start() b.send_request(req) + + if timeout: + signal.signal(signal.SIGALRM, on_timeout) + signal.alarm(timeout) + + response_received = False + while not response_received: + msg = bus.get() + response_received = isinstance(msg, Response) and ( + hasattr(msg, 'id') and msg.id == req.id) + + signal.alarm(0) + print('Response received!') + os._exit(0) + def main(): parser = argparse.ArgumentParser() parser.add_argument('--config', '-c', dest='config', required=False, - help="Configuration file path (default: " + + default=None, help="Configuration file path (default: " + "~/.config/platypush/config.yaml or " + "/etc/platypush/config.yaml") @@ -57,10 +82,17 @@ def main(): help="Action to execute, as package.method") parser.add_argument('--backend', '-b', dest='backend', required=False, - help="Backend to deliver the message " + + default=None, help="Backend to deliver the message " + "[pushbullet|kafka|local] (default: whatever " + "specified in your config with pusher=True)") + parser.add_argument('--timeout', '-T', dest='timeout', required=False, + default=_DEFAULT_TIMEOUT_SEC, help="The application " + + "will wait for a response for this number of seconds " + + "(default: " + str(_DEFAULT_TIMEOUT_SEC) + " seconds. " + "A zero value means that the application " + + " will exit without waiting for a response)") + opts, args = parser.parse_known_args(sys.argv[1:]) if len(args) % 2 != 0: @@ -72,8 +104,7 @@ def main(): payload[re.sub('^-+', '', args[i])] = args[i+1] pusher(target=opts.target, action=opts.action, - backend=opts.backend if 'backend' in opts else None, - config=opts.config if 'config' in opts else None, + backend=opts.backend, config=opts.config, timeout=opts.timeout, **payload)