From 7716a416e970afcfdb163ed53d5b514d7401832f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 9 May 2023 02:18:58 +0200 Subject: [PATCH] [#260] Support for sending events via websocket over `/ws/events`. --- platypush/backend/http/app/utils/bus.py | 5 +++-- platypush/backend/http/app/ws/_base.py | 2 +- platypush/backend/http/app/ws/events.py | 15 ++++++++++++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/platypush/backend/http/app/utils/bus.py b/platypush/backend/http/app/utils/bus.py index 5834a46d8..54556cede 100644 --- a/platypush/backend/http/app/utils/bus.py +++ b/platypush/backend/http/app/utils/bus.py @@ -1,8 +1,8 @@ -from flask import current_app from redis import Redis from platypush.bus.redis import RedisBus from platypush.config import Config +from platypush.context import get_backend from platypush.message import Message from platypush.message.request import Request from platypush.utils import get_redis_queue_name_by_message @@ -15,7 +15,8 @@ _bus = None def bus(): global _bus if _bus is None: - _bus = RedisBus(redis_queue=current_app.config.get('redis_queue')) + redis_queue = get_backend('http').bus.redis_queue # type: ignore + _bus = RedisBus(redis_queue=redis_queue) return _bus diff --git a/platypush/backend/http/app/ws/_base.py b/platypush/backend/http/app/ws/_base.py index d1596940e..47e449885 100644 --- a/platypush/backend/http/app/ws/_base.py +++ b/platypush/backend/http/app/ws/_base.py @@ -51,7 +51,7 @@ class WSRoute(WebSocketHandler, Thread, ABC): pass @override - def on_message(self, *_, **__): + def on_message(self, message): # type: ignore pass @abstractclassmethod diff --git a/platypush/backend/http/app/ws/events.py b/platypush/backend/http/app/ws/events.py index 42cd75b08..e96833aad 100644 --- a/platypush/backend/http/app/ws/events.py +++ b/platypush/backend/http/app/ws/events.py @@ -3,6 +3,7 @@ from typing_extensions import override from platypush.message.event import Event from . import WSRoute, logger, pubsub_redis_topic +from ..utils import send_message events_redis_topic = pubsub_redis_topic('events') @@ -21,11 +22,23 @@ class WSEventProxy(WSRoute): def app_name(cls) -> str: return 'events' + @override + def on_message(self, message): + try: + event = Event.build(message) + assert isinstance(event, Event), f'Expected {Event}, got {type(event)}' + except Exception as e: + logger.info('Could not build event from %s: %s', message, e) + logger.exception(e) + return + + send_message(event, wait_for_response=False) + @override def run(self) -> None: for msg in self.listen(): try: - evt = Event.build(msg.decode()) + evt = Event.build(msg) except Exception as e: logger.warning('Error parsing event: %s: %s', msg, e) continue