From 3dfcf0ec97c3ce0c25f75153a4b2d1f5dbfef0a6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 22 Dec 2017 02:11:56 +0100 Subject: [PATCH] Added tests --- platypush/__init__.py | 32 +++++++------ platypush/backend/__init__.py | 6 +-- platypush/backend/local/__init__.py | 14 ++---- platypush/bus/__init__.py | 13 +++++- run_test.sh | 11 +++++ tests/context.py | 20 +++++++++ tests/test_local.py | 69 +++++++++++++++++++++++++++++ 7 files changed, 138 insertions(+), 27 deletions(-) create mode 100755 run_test.sh create mode 100644 tests/context.py create mode 100644 tests/test_local.py diff --git a/platypush/__init__.py b/platypush/__init__.py index 3212eca0c..f830cec27 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -34,17 +34,18 @@ class Daemon(object): """ number of executions retries before a request fails """ n_tries = 2 - def __init__(self, config_file=None, message_handler=None): + def __init__(self, config_file=None, requests_to_process=None): """ Constructor Params: config_file -- Configuration file override (default: None) - message_handler -- Another function that will receive the messages. - If set, Platypush will just act as a message proxy. Useful to - embed into other projects, for tests, or for delegating events. + requests_to_process -- Exit after processing the specified number + of requests (default: None, loop forever) """ self.config_file = config_file - self.message_handler = message_handler + self.requests_to_process = requests_to_process + self.processed_requests = 0 + Config.init(self.config_file) logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) @@ -69,11 +70,6 @@ class Daemon(object): Params: msg -- platypush.message.Message instance """ - if self.message_handler: - # Proxy the message - self.message_handler(msg) - return - if isinstance(msg, Request): logging.info('Processing request: {}'.format(msg)) Thread(target=self.run_request(), args=(msg,)).start() @@ -122,8 +118,19 @@ class Daemon(object): logging.info('Dropping response whose request has no ' + 'origin attached: {}'.format(request)) + self.processed_requests += 1 + if self.processed_requests >= self.requests_to_process: + self.stop_app() + return _thread_func + + def stop_app(self): + for backend in self.backends.values(): + backend.stop() + self.bus.stop() + + def start(self): """ Start the daemon """ self.bus = Bus(on_message=self.on_message()) @@ -140,9 +147,8 @@ class Daemon(object): self.bus.poll() except KeyboardInterrupt as e: logging.info('SIGINT received, terminating application') - - for backend in self.backends.values(): - backend.stop() + finally: + self.stop_app() def main(args=sys.argv[1:]): diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index cf37a1332..28dd565b6 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -109,8 +109,8 @@ class Backend(Thread): **self._get_backend_config(), **self._kwargs) # Set the response timeout - set_timeout(seconds=self._default_response_timeout, - on_timeout=_timeout_hndl) + if response_timeout: + set_timeout(seconds=response_timeout, on_timeout=_timeout_hndl) resp_backend.start() @@ -136,7 +136,7 @@ class Backend(Thread): request.origin = self.device_id - if on_response and response_timeout: + if on_response and response_timeout != 0: self._setup_response_handler(request, on_response, response_timeout) self.send_message(request, **kwargs) diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 7429de470..b77105e99 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -2,10 +2,12 @@ import logging import json import os import time +import threading from .. import Backend from platypush.message import Message +from platypush.message.event import StopEvent from platypush.message.request import Request from platypush.message.response import Response @@ -32,6 +34,7 @@ class LocalBackend(Backend): else self.request_fifo msg = '{}\n'.format(str(msg)).encode('utf-8') + with open(fifo, 'wb') as f: f.write(msg) @@ -44,14 +47,6 @@ class LocalBackend(Backend): return Message.build(msg) if len(msg) else None - def on_stop(self): - try: os.remove(self.request_fifo) - except: pass - - try: os.remove(self.response_fifo) - except: pass - - def run(self): super().run() logging.info('Initialized local backend on {} and {}'. @@ -66,8 +61,7 @@ class LocalBackend(Backend): time.sleep(0.2) continue - # logging.debug('Received message on the local backend: {}'.format(msg)) - logging.info('Received message on the local backend: {}'.format(msg)) + logging.debug('Received message on the local backend: {}, thread_id: '.format(msg, self.thread_id)) if self.should_stop(): break self.on_message(msg) diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index c2c53ce57..f5bf113b0 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -2,10 +2,12 @@ import os import sys import signal import logging +import threading from enum import Enum from queue import Queue +from platypush.config import Config from platypush.message.event import Event, StopEvent class Bus(object): @@ -14,6 +16,7 @@ class Bus(object): def __init__(self, on_message=None): self.bus = Queue() self.on_message = on_message + self.thread_id = threading.get_ident() def post(self, msg): """ Sends a message to the bus """ @@ -23,6 +26,14 @@ class Bus(object): """ Reads one message from the bus """ return self.bus.get() + def stop(self): + """ Stops the bus by sending a STOP event """ + evt = StopEvent(target=Config.get('device_id'), + origin=Config.get('device_id'), + thread_id=self.thread_id) + + self.bus.put(evt) + def poll(self): """ Reads messages from the bus until either stop event message or KeyboardInterrupt @@ -38,7 +49,7 @@ class Bus(object): self.on_message(msg) if isinstance(msg, StopEvent) and msg.targets_me(): - logging.info('Received STOP event') + logging.info('Received STOP event on the bus') stop=True diff --git a/run_test.sh b/run_test.sh new file mode 100755 index 000000000..1b3b1bf49 --- /dev/null +++ b/run_test.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +PYTHON=python + +for testcase in tests/test_*.py +do + $PYTHON -m unittest $testcase +done + +# vim:sw=4:ts=4:et: + diff --git a/tests/context.py b/tests/context.py new file mode 100644 index 000000000..313cb438f --- /dev/null +++ b/tests/context.py @@ -0,0 +1,20 @@ +import os +import sys + +testdir = os.path.dirname(__file__) +sys.path.insert(0, os.path.abspath(os.path.join(testdir, '..'))) +config_file = os.path.join(testdir, 'etc', 'config.yaml') + +from platypush.config import Config +Config.init(config_file) + +import platypush + + +class TestTimeoutException(RuntimeError): + def __init__(self, msg): + self.msg = msg + + +# vim:sw=4:ts=4:et: + diff --git a/tests/test_local.py b/tests/test_local.py new file mode 100644 index 000000000..d4947c801 --- /dev/null +++ b/tests/test_local.py @@ -0,0 +1,69 @@ +from .context import platypush, config_file, TestTimeoutException + +import os +import sys +import logging +import unittest +import threading + +from threading import Thread + +from platypush import Daemon +from platypush.config import Config +from platypush.pusher import Pusher +from platypush.utils import set_timeout, clear_timeout + +class TestLocal(unittest.TestCase): + timeout = 5 + + def setUp(self): + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + backends = Config.get_backends() + self.assertTrue('local' in backends) + + + def test_local_shell_exec_flow(self): + self.start_sender() + self.start_receiver() + + def on_response(self): + def _f(response): + logging.info("Received response: {}".format(response)) + self.assertEqual(response.output.strip(), 'ping') + # os._exit(0) + return _f + + def on_timeout(self, msg): + def _f(): raise TestTimeoutException(msg) + return _f + + def start_sender(self): + def _run_sender(): + pusher = Pusher(config_file=config_file, backend='local', + on_response=self.on_response()) + + logging.info('Sending request') + pusher.push(target=Config.get('device_id'), action='shell.exec', + cmd='echo ping', timeout=None) + + + # Start the sender thread and wait for a response + set_timeout(seconds=self.timeout, + on_timeout=self.on_timeout('Receiver response timed out')) + + self.sender = Thread(target=_run_sender) + self.sender.start() + + def start_receiver(self): + set_timeout(seconds=self.timeout, + on_timeout=self.on_timeout('Sender request timed out')) + + self.receiver = Daemon(config_file=config_file, requests_to_process=1) + self.receiver.start() + + +if __name__ == '__main__': + unittest.main() + +# vim:sw=4:ts=4:et: +