From d215410a6a8aa2a19efd5f50eddd7058168eb052 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 24 Sep 2018 23:13:44 +0200 Subject: [PATCH] More consistent flow for messages received by a backend --- platypush/__init__.py | 2 +- platypush/backend/http/__init__.py | 17 +++++------------ tests/test_http.py | 4 ++-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index fb4de4d2..727d14e2 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -114,7 +114,6 @@ class Daemon: def stop_app(self): """ Stops the backends and the bus """ - self.bus.post(ApplicationStoppedEvent()) for backend in self.backends.values(): backend.stop() self.bus.stop() @@ -143,6 +142,7 @@ class Daemon: except KeyboardInterrupt: LOGGER.info('SIGINT received, terminating application') finally: + self.bus.post(ApplicationStoppedEvent()) self.stop_app() diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 3d8916c4..16b46b9b 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -169,19 +169,14 @@ class HttpBackend(Backend): def redis_poll(self): """ Polls for new messages on the internal Redis queue """ - while True: + while not self.should_stop(): redis = self._get_redis() if not redis: continue msg = redis.blpop(self.redis_queue) msg = Message.build(json.loads(msg[1].decode('utf-8'))) - - if isinstance(msg, StopEvent) and \ - msg.args.get('thread_id') == get_ident(): - break - - self.bus.post(msg) + self.on_message(msg) @classmethod @@ -243,7 +238,7 @@ class HttpBackend(Backend): msg.origin = 'http' redis = self._get_redis() - self.bus.post(msg) + self.on_message(msg) if isinstance(msg, Request): response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) @@ -402,16 +397,14 @@ class HttpBackend(Backend): def run(self): super().run() os.putenv('FLASK_APP', 'platypush') - os.putenv('FLASK_ENV', 'development') + os.putenv('FLASK_ENV', 'production') self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) webserver = self.webserver() self.server_proc = Process(target=webserver.run, kwargs={ - 'debug':True, 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False + 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False }) - time.sleep(1) - self.server_proc.start() if not self.disable_websocket: diff --git a/tests/test_http.py b/tests/test_http.py index 62fb986b..07e28330 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -28,12 +28,12 @@ class TestHttp(unittest.TestCase): def test_request_exec_flow(self): self.start_daemon() - time.sleep(2) + time.sleep(1) self.send_request() def start_daemon(self): def _f(): - self.receiver = Daemon(config_file=config_file, requests_to_process=1) + self.receiver = Daemon(config_file=config_file) self.receiver.start() Thread(target=_f).start()