From dc96b4995ca2aa3b795f15fc4a48bcb86a9ea309 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 16 Jul 2024 20:56:51 +0200 Subject: [PATCH] [core] Added `ApplicationStartedEvent` to Redis bus instead of application. The Redis bus now uses a pub/sub architecture rather than a simple queue. Earlier on, the application could post an event to the queue and then pick it up when it started listening. When doing a publish on a pub/sub channel, however, any messages sent before the client started listening will be lost. --- platypush/app/_app.py | 2 -- platypush/bus/redis.py | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/platypush/app/_app.py b/platypush/app/_app.py index 76204a145e..6d3d2f9240 100644 --- a/platypush/app/_app.py +++ b/platypush/app/_app.py @@ -19,7 +19,6 @@ from platypush.entities import init_entities_engine, EntitiesEngine from platypush.event.processor import EventProcessor from platypush.logger import Logger from platypush.message.event import Event -from platypush.message.event.application import ApplicationStartedEvent from platypush.message.request import Request from platypush.message.response import Response from platypush.utils import get_enabled_plugins, get_redis_conf @@ -462,7 +461,6 @@ class Application: self.cron_scheduler.start() assert self.bus, 'The bus is not running' - self.bus.post(ApplicationStartedEvent()) # Poll for messages on the bus try: diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index ab836da5fc..c23c74622b 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -37,8 +37,12 @@ class RedisBus(Bus): """ Polls the Redis queue for new messages """ + from platypush.message.event.application import ApplicationStartedEvent + with self.pubsub as pubsub: pubsub.subscribe(self.redis_queue) + self.post(ApplicationStartedEvent()) + try: for msg in pubsub.listen(): if msg.get('type') != 'message':