Added logic to propagate events to the websockets backend

This commit is contained in:
Fabio Manganiello 2018-11-02 10:14:06 +00:00
parent cd3aea5cd6
commit c05fc9ee3f
3 changed files with 29 additions and 2 deletions

View file

@ -172,6 +172,7 @@ class HttpBackend(Backend):
loop.run_until_complete(send_event(websocket))
except websockets.exceptions.ConnectionClosed:
self.logger.info('Client connection lost')
self.active_websockets.remove(websocket)
def redis_poll(self):

View file

@ -21,7 +21,7 @@ class WebsocketBackend(Backend):
"""
# Websocket client message recv timeout in seconds
_websocket_client_timeout = 60
_websocket_client_timeout = 0
def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cafile=None,
ssl_capath=None, ssl_cert=None, ssl_key=None,
@ -45,7 +45,7 @@ class WebsocketBackend(Backend):
:param ssl_capath: Path to the certificate authority directory if required by the SSL configuration (default: None)
:type ssl_capath: str
:param client_timeout: Timeout without any messages being received before closing a client connection. A zero timeout keeps the websocket open until an error occurs (default: 60 seconds)
:param client_timeout: Timeout without any messages being received before closing a client connection. A zero timeout keeps the websocket open until an error occurs (default: 0, no timeout)
:type ping_timeout: int
"""
@ -54,6 +54,7 @@ class WebsocketBackend(Backend):
self.port = port
self.bind_address = bind_address
self.client_timeout = client_timeout
self.active_websockets = set()
self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert,
ssl_key=ssl_key,
@ -74,10 +75,29 @@ class WebsocketBackend(Backend):
websocket.send(url=url, msg=msg, **websocket_args)
def notify_web_clients(self, event):
""" Notify all the connected web clients (over websocket) of a new event """
async def send_event(websocket):
try:
await websocket.send(str(event))
except Exception as e:
self.logger.warning('Error on websocket send_event: {}'.format(e))
loop = get_or_create_event_loop()
for websocket in self.active_websockets:
try:
loop.run_until_complete(send_event(websocket))
except websockets.exceptions.ConnectionClosed:
self.logger.info('Client connection lost')
self.active_websockets.remove(websocket)
def run(self):
super().run()
async def serve_client(websocket, path):
self.active_websockets.add(websocket)
self.logger.debug('New websocket connection from {}'.
format(websocket.remote_address[0]))
@ -107,9 +127,11 @@ class WebsocketBackend(Backend):
except Exception as e:
if isinstance(e, websockets.exceptions.ConnectionClosed):
self.active_websockets.remove(websocket)
self.logger.debug('Websocket client {} closed connection'.
format(websocket.remote_address[0]))
elif isinstance(e, asyncio.TimeoutError):
self.active_websockets.remove(websocket)
self.logger.debug('Websocket connection to {} timed out'.
format(websocket.remote_address[0]))
else:

View file

@ -33,6 +33,10 @@ class EventProcessor(object):
if backend:
backend.notify_web_clients(event)
backend = get_backend('websocket')
if backend:
backend.notify_web_clients(event)
def process_event(self, event):
""" Processes an event and runs the matched hooks with the highest score """