forked from platypush/platypush
[#260] Support for sending events via websocket over /ws/events
.
This commit is contained in:
parent
edb7197f71
commit
7716a416e9
3 changed files with 18 additions and 4 deletions
|
@ -1,8 +1,8 @@
|
||||||
from flask import current_app
|
|
||||||
from redis import Redis
|
from redis import Redis
|
||||||
|
|
||||||
from platypush.bus.redis import RedisBus
|
from platypush.bus.redis import RedisBus
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
|
from platypush.context import get_backend
|
||||||
from platypush.message import Message
|
from platypush.message import Message
|
||||||
from platypush.message.request import Request
|
from platypush.message.request import Request
|
||||||
from platypush.utils import get_redis_queue_name_by_message
|
from platypush.utils import get_redis_queue_name_by_message
|
||||||
|
@ -15,7 +15,8 @@ _bus = None
|
||||||
def bus():
|
def bus():
|
||||||
global _bus
|
global _bus
|
||||||
if _bus is None:
|
if _bus is None:
|
||||||
_bus = RedisBus(redis_queue=current_app.config.get('redis_queue'))
|
redis_queue = get_backend('http').bus.redis_queue # type: ignore
|
||||||
|
_bus = RedisBus(redis_queue=redis_queue)
|
||||||
return _bus
|
return _bus
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ class WSRoute(WebSocketHandler, Thread, ABC):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@override
|
@override
|
||||||
def on_message(self, *_, **__):
|
def on_message(self, message): # type: ignore
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractclassmethod
|
@abstractclassmethod
|
||||||
|
|
|
@ -3,6 +3,7 @@ from typing_extensions import override
|
||||||
from platypush.message.event import Event
|
from platypush.message.event import Event
|
||||||
|
|
||||||
from . import WSRoute, logger, pubsub_redis_topic
|
from . import WSRoute, logger, pubsub_redis_topic
|
||||||
|
from ..utils import send_message
|
||||||
|
|
||||||
events_redis_topic = pubsub_redis_topic('events')
|
events_redis_topic = pubsub_redis_topic('events')
|
||||||
|
|
||||||
|
@ -21,11 +22,23 @@ class WSEventProxy(WSRoute):
|
||||||
def app_name(cls) -> str:
|
def app_name(cls) -> str:
|
||||||
return 'events'
|
return 'events'
|
||||||
|
|
||||||
|
@override
|
||||||
|
def on_message(self, message):
|
||||||
|
try:
|
||||||
|
event = Event.build(message)
|
||||||
|
assert isinstance(event, Event), f'Expected {Event}, got {type(event)}'
|
||||||
|
except Exception as e:
|
||||||
|
logger.info('Could not build event from %s: %s', message, e)
|
||||||
|
logger.exception(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
send_message(event, wait_for_response=False)
|
||||||
|
|
||||||
@override
|
@override
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
for msg in self.listen():
|
for msg in self.listen():
|
||||||
try:
|
try:
|
||||||
evt = Event.build(msg.decode())
|
evt = Event.build(msg)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning('Error parsing event: %s: %s', msg, e)
|
logger.warning('Error parsing event: %s: %s', msg, e)
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in a new issue