diff --git a/platypush/message/event/websocket.py b/platypush/message/event/websocket.py new file mode 100644 index 000000000..4656c1907 --- /dev/null +++ b/platypush/message/event/websocket.py @@ -0,0 +1,16 @@ +from typing import Any + +from platypush.message.event import Event + + +class WebsocketMessageEvent(Event): + """ + Event triggered when a message is receive on a subscribed websocket URL. + """ + + def __init__(self, *args, url: str, message: Any, **kwargs): + """ + :param url: Websocket URL. + :param message: The received message. + """ + super().__init__(*args, url=url, message=message, **kwargs) diff --git a/platypush/plugins/websocket/__init__.py b/platypush/plugins/websocket/__init__.py index 1cfa7c0e5..86716394a 100644 --- a/platypush/plugins/websocket/__init__.py +++ b/platypush/plugins/websocket/__init__.py @@ -1,13 +1,12 @@ +import asyncio import json +import time -try: - from websockets.exceptions import ConnectionClosed - from websockets import connect as websocket_connect -except ImportError: - from websockets import ConnectionClosed, connect as websocket_connect +from websockets import connect as websocket_connect +from websockets.exceptions import ConnectionClosed -from platypush.context import get_or_create_event_loop -from platypush.message import Message +from platypush.context import get_or_create_event_loop, get_bus +from platypush.message.event.websocket import WebsocketMessageEvent from platypush.plugins import Plugin, action from platypush.utils import get_ssl_client_context @@ -15,61 +14,180 @@ from platypush.utils import get_ssl_client_context class WebsocketPlugin(Plugin): """ Plugin to send messages over a websocket connection. + + Triggers: + + * :class:`platypush.message.event.websocket.WebsocketMessageEvent` when + a message is received on a subscribed websocket. + """ - def __init__(self, **kwargs): - super().__init__(**kwargs) - @action - def send(self, url, msg, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None): + def send( + self, + url: str, + msg, + ssl_cert=None, + ssl_key=None, + ssl_cafile=None, + ssl_capath=None, + wait_response=False, + ): """ Sends a message to a websocket. :param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765 - :type url: str - :param msg: Message to be sent. It can be a list, a dict, or a Message object - - :param ssl_cert: Path to the SSL certificate to be used, if the SSL connection requires client authentication - as well (default: None) :type ssl_cert: str - - :param ssl_key: Path to the SSL key to be used, if the SSL connection requires client authentication as well - (default: None) :type ssl_key: str - - :param ssl_cafile: Path to the certificate authority file if required by the SSL configuration (default: None) - :type ssl_cafile: str - - :param ssl_capath: Path to the certificate authority directory if required by the SSL configuration - (default: None) - :type ssl_capath: str + :param ssl_cert: Path to the SSL certificate to be used, if the SSL + connection requires client authentication as well (default: None) + :param ssl_key: Path to the SSL key to be used, if the SSL connection + requires client authentication as well (default: None) + :param ssl_cafile: Path to the certificate authority file if required + by the SSL configuration (default: None) + :param ssl_capath: Path to the certificate authority directory if + required by the SSL configuration (default: None) + :param wait_response: Set to True if you expect a response to the + delivered message. + :return: The received response if ``wait_response`` is set to True, + otherwise nothing. """ async def send(): - websocket_args = {} - if ssl_cert: - websocket_args['ssl'] = get_ssl_client_context(ssl_cert=ssl_cert, - ssl_key=ssl_key, - ssl_cafile=ssl_cafile, - ssl_capath=ssl_capath) + websocket_args = { + 'ssl': self._get_ssl_context( + url, + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + } - async with websocket_connect(url, **websocket_args) as websocket: + async with websocket_connect(url, **websocket_args) as ws: try: - await websocket.send(str(msg)) + await ws.send(str(msg)) except ConnectionClosed as err: - self.logger.warning('Error on websocket {}: {}'. - format(url, err)) + self.logger.warning('Error on websocket %s: %s', url, err) - try: - msg = json.dumps(msg) - except Exception as e: - self.logger.debug(e) + if wait_response: + messages = await self._ws_recv(ws, num_messages=1) + if messages: + return self._parse_msg(messages[0]) - try: - msg = Message.build(json.loads(msg)) - except Exception as e: - self.logger.debug(e) + msg = self._parse_msg(msg) + loop = get_or_create_event_loop() + return loop.run_until_complete(send()) + + @action + def recv( + self, + url: str, + ssl_cert=None, + ssl_key=None, + ssl_cafile=None, + ssl_capath=None, + num_messages=0, + timeout=0, + ): + """ + Receive one or more messages from a websocket. + + A :class:`platypush.message.event.websocket.WebsocketMessageEvent` + event will be triggered whenever a new message is received. + + :param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765 + :param ssl_cert: Path to the SSL certificate to be used, if the SSL + connection requires client authentication as well (default: None) + :param ssl_key: Path to the SSL key to be used, if the SSL connection + requires client authentication as well (default: None) + :param ssl_cafile: Path to the certificate authority file if required + by the SSL configuration (default: None) + :param ssl_capath: Path to the certificate authority directory if + required by the SSL configuration (default: None) + :param num_messages: Exit after receiving this number of messages. + Default: 0, receive forever. + :param timeout: Message receive timeout in seconds. Default: 0 - no timeout. + :return: A list with the messages that have been received, unless + ``num_messages`` is set to 0 or ``None``. + """ + + async def recv(): + websocket_args = { + 'ssl': self._get_ssl_context( + url, + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + } + + async with websocket_connect(url, **websocket_args) as ws: + return await self._ws_recv( + ws, timeout=timeout, num_messages=num_messages + ) loop = get_or_create_event_loop() - loop.run_until_complete(send()) + return loop.run_until_complete(recv()) + + async def _ws_recv(self, ws, timeout=0, num_messages=0): + messages = [] + time_start = time.time() + time_end = time_start + timeout if timeout else 0 + url = 'ws{secure}://{host}:{port}{path}'.format( + secure='s' if ws.secure else '', + host=ws.host, + port=ws.port, + path=ws.path, + ) + + while (not num_messages) or (len(messages) < num_messages): + msg = None + err = None + remaining_timeout = time_end - time.time() if time_end else None + + try: + msg = await asyncio.wait_for(ws.recv(), remaining_timeout) + except (ConnectionClosed, asyncio.exceptions.TimeoutError) as e: + err = e + self.logger.warning('Error on websocket %s: %s', url, e) + + if isinstance(err, ConnectionClosed) or ( + time_end and time.time() > time_end + ): + break + + if msg is None: + continue + + msg = self._parse_msg(msg) + messages.append(msg) + get_bus().post(WebsocketMessageEvent(url=url, message=msg)) + + return messages + + @staticmethod + def _parse_msg(msg): + try: + msg = json.dumps(msg) + except Exception: + pass + + return msg + + @staticmethod + def _get_ssl_context( + url: str, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None + ): + if url.startswith('wss://'): + return get_ssl_client_context( + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + + return None + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/websocket/manifest.yaml b/platypush/plugins/websocket/manifest.yaml index 8c95c60ed..973fde3f9 100644 --- a/platypush/plugins/websocket/manifest.yaml +++ b/platypush/plugins/websocket/manifest.yaml @@ -1,5 +1,7 @@ manifest: - events: {} + events: + platypush.message.event.websocket.WebsocketMessageEvent: when a message is + received on a subscribed websocket. install: pip: [] package: platypush.plugins.websocket