From d5c31d938b715cd1da858938b2df8ae16a2adab7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 23 Feb 2019 23:58:43 +0100 Subject: [PATCH] Run each message handling logic on the bus on a separate thread to make sure that messages don't queue up too much --- platypush/backend/http/app/__init__.py | 4 ++++ platypush/bus/__init__.py | 17 +++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/platypush/backend/http/app/__init__.py b/platypush/backend/http/app/__init__.py index 350622e476..4e28d8c116 100644 --- a/platypush/backend/http/app/__init__.py +++ b/platypush/backend/http/app/__init__.py @@ -21,4 +21,8 @@ for route in get_routes(): app.register_blueprint(route) +if __name__ == '__main__': + app.run() + + # vim:sw=4:ts=4:et: diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 501e1d55f4..b053d56740 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -37,6 +37,16 @@ class Bus(object): self.post(evt) + def _msg_executor(self, msg): + def executor(): + try: + self.on_message(msg) + except Exception as e: + logger.error('Error on processing message {}'.format(msg)) + logger.exception(e) + + return executor + def poll(self): """ Reads messages from the bus until either stop event message or KeyboardInterrupt @@ -54,11 +64,7 @@ class Bus(object): .format(int(time.time()-msg.timestamp), msg)) continue - try: - self.on_message(msg) - except Exception as e: - logger.error('Error on processing message {}'.format(msg)) - logger.exception(e) + threading.Thread(target=self._msg_executor(msg)).start() if isinstance(msg, StopEvent) and msg.targets_me(): logger.info('Received STOP event on the bus') @@ -66,4 +72,3 @@ class Bus(object): # vim:sw=4:ts=4:et: -