66 lines
1.8 KiB
Python
66 lines
1.8 KiB
Python
|
from logging import getLogger
|
||
|
|
||
|
from flask import Blueprint, request
|
||
|
from simple_websocket import ConnectionClosed, Server
|
||
|
|
||
|
from platypush.backend.http.app import template_folder
|
||
|
from platypush.backend.http.app.utils import authenticate
|
||
|
from platypush.backend.http.ws import events_redis_topic
|
||
|
from platypush.message.event import Event
|
||
|
from platypush.utils import get_redis
|
||
|
|
||
|
|
||
|
# Flask blueprint named 'ws' that hosts the websocket route(s) defined in this
# module; it uses the HTTP app's shared template folder.
ws = Blueprint('ws', __name__, template_folder=template_folder)
|
||
|
|
||
|
# Blueprints exported by this module.
# NOTE(review): presumably collected by the HTTP app to register the
# blueprints - confirm against the code that imports this module.
__routes__ = [ws]
|
||
|
|
||
|
# Module-level logger, named after this module.
logger = getLogger(__name__)
|
||
|
|
||
|
|
||
|
@ws.route('/ws/events', websocket=True)
@authenticate(json=True)
def ws_events_route():
    """
    A websocket endpoint to asynchronously receive events generated from the
    application.

    This endpoint is mainly used by web clients to listen for the events
    generated by the application.

    The handler subscribes to ``events_redis_topic`` on Redis and forwards
    every event published on it to the websocket client, serialized through
    ``str(event)``. Payloads that cannot be parsed into an
    :class:`platypush.message.event.Event` are logged and skipped.

    :return: An empty string, once the websocket connection has been closed.
    """

    sock = Server(request.environ, ping_interval=25)
    # (address, port) of the remote peer, only used to identify the
    # connection in the log messages.
    ws_key = (sock.environ['REMOTE_ADDR'], int(sock.environ['REMOTE_PORT']))
    sub = get_redis().pubsub()
    sub.subscribe(events_redis_topic)
    logger.info('Started websocket connection with %s', ws_key)

    try:
        for msg in sub.listen():
            # Skip anything that is not an actual message published on the
            # events topic. The two checks must be OR-ed: pub/sub control
            # messages (e.g. the subscribe confirmation) carry the same
            # channel name but a non-bytes payload, so an AND would let them
            # through and trigger a spurious parsing warning below.
            if (
                msg.get('type') != 'message'
                or msg.get('channel').decode() != events_redis_topic
            ):
                continue

            try:
                evt = Event.build(msg.get('data').decode())
            except Exception as e:
                # Best-effort: a malformed payload must not tear down the
                # connection, so log it and move on to the next message.
                logger.warning('Error parsing event: %s: %s', msg.get('data'), e)
                continue

            sock.send(str(evt))
    except ConnectionClosed as e:
        logger.info(
            'Websocket connection to %s closed, reason=%s, message=%s',
            ws_key,
            e.reason,
            e.message,
        )
    finally:
        try:
            sub.unsubscribe(events_redis_topic)
        finally:
            # Explicitly release the pub/sub connection instead of relying
            # on garbage collection.
            sub.close()

    return ''
|
||
|
|
||
|
|
||
|
# vim:sw=4:ts=4:et:
|