diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index e49012b170..00e6a40d22 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -172,6 +172,7 @@ class HttpBackend(Backend): loop.run_until_complete(send_event(websocket)) except websockets.exceptions.ConnectionClosed: self.logger.info('Client connection lost') + self.active_websockets.remove(websocket) def redis_poll(self): diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py index 376747b42e..caddeda05e 100644 --- a/platypush/backend/websocket.py +++ b/platypush/backend/websocket.py @@ -21,7 +21,7 @@ class WebsocketBackend(Backend): """ # Websocket client message recv timeout in seconds - _websocket_client_timeout = 60 + _websocket_client_timeout = 0 def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None, @@ -45,7 +45,7 @@ class WebsocketBackend(Backend): :param ssl_capath: Path to the certificate authority directory if required by the SSL configuration (default: None) :type ssl_capath: str - :param client_timeout: Timeout without any messages being received before closing a client connection. A zero timeout keeps the websocket open until an error occurs (default: 60 seconds) + :param client_timeout: Timeout without any messages being received before closing a client connection. A zero timeout keeps the websocket open until an error occurs (default: 0, no timeout) :type ping_timeout: int """ @@ -54,6 +54,7 @@ class WebsocketBackend(Backend): self.port = port self.bind_address = bind_address self.client_timeout = client_timeout + self.active_websockets = set() self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert, ssl_key=ssl_key, @@ -74,10 +75,29 @@ class WebsocketBackend(Backend): websocket.send(url=url, msg=msg, **websocket_args) + def notify_web_clients(self, event): + """ Notify all the connected web clients (over websocket) of a new event """ + async def send_event(websocket): + try: + await websocket.send(str(event)) + except Exception as e: + self.logger.warning('Error on websocket send_event: {}'.format(e)) + + loop = get_or_create_event_loop() + + for websocket in self.active_websockets: + try: + loop.run_until_complete(send_event(websocket)) + except websockets.exceptions.ConnectionClosed: + self.logger.info('Client connection lost') + self.active_websockets.remove(websocket) + + def run(self): super().run() async def serve_client(websocket, path): + self.active_websockets.add(websocket) self.logger.debug('New websocket connection from {}'. format(websocket.remote_address[0])) @@ -107,9 +127,11 @@ class WebsocketBackend(Backend): except Exception as e: if isinstance(e, websockets.exceptions.ConnectionClosed): + self.active_websockets.remove(websocket) self.logger.debug('Websocket client {} closed connection'. format(websocket.remote_address[0])) elif isinstance(e, asyncio.TimeoutError): + self.active_websockets.remove(websocket) self.logger.debug('Websocket connection to {} timed out'. format(websocket.remote_address[0])) else: diff --git a/platypush/event/processor/__init__.py b/platypush/event/processor/__init__.py index 32096bdd47..7cf119fc2c 100644 --- a/platypush/event/processor/__init__.py +++ b/platypush/event/processor/__init__.py @@ -33,6 +33,10 @@ class EventProcessor(object): if backend: backend.notify_web_clients(event) + backend = get_backend('websocket') + if backend: + backend.notify_web_clients(event) + def process_event(self, event): """ Processes an event and runs the matched hooks with the highest score """