diff --git a/platypush/backend/http/webapp/package-lock.json b/platypush/backend/http/webapp/package-lock.json index c27e627e..bb2a6b70 100644 --- a/platypush/backend/http/webapp/package-lock.json +++ b/platypush/backend/http/webapp/package-lock.json @@ -10,13 +10,13 @@ "dependencies": { "@fortawesome/fontawesome-free": "^5.15.4", "axios": "^0.21.4", - "core-js": "^3.21.1", + "core-js": "^3.23.4", "lato-font": "^3.0.0", "mitt": "^2.1.0", - "sass": "^1.49.9", - "sass-loader": "^10.2.1", + "sass": "^1.53.0", + "sass-loader": "^10.3.1", "vue": "^3.2.13", - "vue-router": "^4.0.14", + "vue-router": "^4.1.2", "vue-skycons": "^4.2.0", "w3css": "^2.7.0" }, @@ -2768,9 +2768,9 @@ "dev": true }, "node_modules/@vue/devtools-api": { - "version": "6.1.3", - "resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.1.3.tgz", - "integrity": "sha512-79InfO2xHv+WHIrH1bHXQUiQD/wMls9qBk6WVwGCbdwP7/3zINtvqPNMtmSHXsIKjvUAHc8L0ouOj6ZQQRmcXg==" + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.2.1.tgz", + "integrity": "sha512-OEgAMeQXvCoJ+1x8WyQuVZzFo0wcyCmUR3baRVLmKBo1LmYZWMlRiXlux5jd0fqVJu6PfDbOrZItVqUEzLobeQ==" }, "node_modules/@vue/reactivity": { "version": "3.2.31", @@ -4205,9 +4205,9 @@ } }, "node_modules/core-js": { - "version": "3.21.1", - "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.21.1.tgz", - "integrity": "sha512-FRq5b/VMrWlrmCzwRrpDYNxyHP9BcAZC+xHJaqTgIE5091ZV1NTmyh0sGOg5XqpnHvR0svdy0sv1gWA1zmhxig==", + "version": "3.23.4", + "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.23.4.tgz", + "integrity": "sha512-vjsKqRc1RyAJC3Ye2kYqgfdThb3zYnx9CrqoCcjMOENMtQPC7ZViBvlDxwYU/2z2NI/IPuiXw5mT4hWhddqjzQ==", "hasInstallScript": true, "funding": { "type": "opencollective", @@ -9402,9 +9402,9 @@ "dev": true }, "node_modules/sass": { - "version": "1.49.9", - "resolved": "https://registry.npmjs.org/sass/-/sass-1.49.9.tgz", - "integrity": "sha512-YlYWkkHP9fbwaFRZQRXgDi3mXZShslVmmo+FVK3kHLUELHHEYrCmL1x6IUjC7wLS6VuJSAFXRQS/DxdsC4xL1A==", + "version": "1.53.0", + "resolved": "https://registry.npmjs.org/sass/-/sass-1.53.0.tgz", + "integrity": "sha512-zb/oMirbKhUgRQ0/GFz8TSAwRq2IlR29vOUJZOx0l8sV+CkHUfHa4u5nqrG+1VceZp7Jfj59SVW9ogdhTvJDcQ==", "dependencies": { "chokidar": ">=3.0.0 <4.0.0", "immutable": "^4.0.0", @@ -9418,9 +9418,9 @@ } }, "node_modules/sass-loader": { - "version": "10.2.1", - "resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.2.1.tgz", - "integrity": "sha512-RRvWl+3K2LSMezIsd008ErK4rk6CulIMSwrcc2aZvjymUgKo/vjXGp1rSWmfTUX7bblEOz8tst4wBwWtCGBqKA==", + "version": "10.3.1", + "resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.3.1.tgz", + "integrity": "sha512-y2aBdtYkbqorVavkC3fcJIUDGIegzDWPn3/LAFhsf3G+MzPKTJx37sROf5pXtUeggSVbNbmfj8TgRaSLMelXRA==", "dependencies": { "klona": "^2.0.4", "loader-utils": "^2.0.0", @@ -9437,7 +9437,7 @@ }, "peerDependencies": { "fibers": ">= 3.1.0", - "node-sass": "^4.0.0 || ^5.0.0 || ^6.0.0", + "node-sass": "^4.0.0 || ^5.0.0 || ^6.0.0 || ^7.0.0", "sass": "^1.3.0", "webpack": "^4.36.0 || ^5.0.0" }, @@ -10825,11 +10825,11 @@ } }, "node_modules/vue-router": { - "version": "4.0.14", - "resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.0.14.tgz", - "integrity": "sha512-wAO6zF9zxA3u+7AkMPqw9LjoUCjSxfFvINQj3E/DceTt6uEz1XZLraDhdg2EYmvVwTBSGlLYsUw8bDmx0754Mw==", + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.1.2.tgz", + "integrity": "sha512-5BP1qXFncVRwgV/XnqzsKApdMjQPqWIpoUBdL1ynz8HyLxIX/UDAx7Ql2BjmA5CXT/p61JfZvkpiFWFpaqcfag==", "dependencies": { - "@vue/devtools-api": "^6.0.0" + "@vue/devtools-api": "^6.1.4" }, "funding": { "url": "https://github.com/sponsors/posva" @@ -13770,9 +13770,9 @@ } }, "@vue/devtools-api": { - "version": "6.1.3", - "resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.1.3.tgz", - "integrity": "sha512-79InfO2xHv+WHIrH1bHXQUiQD/wMls9qBk6WVwGCbdwP7/3zINtvqPNMtmSHXsIKjvUAHc8L0ouOj6ZQQRmcXg==" + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.2.1.tgz", + "integrity": "sha512-OEgAMeQXvCoJ+1x8WyQuVZzFo0wcyCmUR3baRVLmKBo1LmYZWMlRiXlux5jd0fqVJu6PfDbOrZItVqUEzLobeQ==" }, "@vue/reactivity": { "version": "3.2.31", @@ -14868,9 +14868,9 @@ } }, "core-js": { - "version": "3.21.1", - "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.21.1.tgz", - "integrity": "sha512-FRq5b/VMrWlrmCzwRrpDYNxyHP9BcAZC+xHJaqTgIE5091ZV1NTmyh0sGOg5XqpnHvR0svdy0sv1gWA1zmhxig==" + "version": "3.23.4", + "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.23.4.tgz", + "integrity": "sha512-vjsKqRc1RyAJC3Ye2kYqgfdThb3zYnx9CrqoCcjMOENMtQPC7ZViBvlDxwYU/2z2NI/IPuiXw5mT4hWhddqjzQ==" }, "core-js-compat": { "version": "3.21.1", @@ -18676,9 +18676,9 @@ "dev": true }, "sass": { - "version": "1.49.9", - "resolved": "https://registry.npmjs.org/sass/-/sass-1.49.9.tgz", - "integrity": "sha512-YlYWkkHP9fbwaFRZQRXgDi3mXZShslVmmo+FVK3kHLUELHHEYrCmL1x6IUjC7wLS6VuJSAFXRQS/DxdsC4xL1A==", + "version": "1.53.0", + "resolved": "https://registry.npmjs.org/sass/-/sass-1.53.0.tgz", + "integrity": "sha512-zb/oMirbKhUgRQ0/GFz8TSAwRq2IlR29vOUJZOx0l8sV+CkHUfHa4u5nqrG+1VceZp7Jfj59SVW9ogdhTvJDcQ==", "requires": { "chokidar": ">=3.0.0 <4.0.0", "immutable": "^4.0.0", @@ -18686,9 +18686,9 @@ } }, "sass-loader": { - "version": "10.2.1", - "resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.2.1.tgz", - "integrity": "sha512-RRvWl+3K2LSMezIsd008ErK4rk6CulIMSwrcc2aZvjymUgKo/vjXGp1rSWmfTUX7bblEOz8tst4wBwWtCGBqKA==", + "version": "10.3.1", + "resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.3.1.tgz", + "integrity": "sha512-y2aBdtYkbqorVavkC3fcJIUDGIegzDWPn3/LAFhsf3G+MzPKTJx37sROf5pXtUeggSVbNbmfj8TgRaSLMelXRA==", "requires": { "klona": "^2.0.4", "loader-utils": "^2.0.0", @@ -19746,11 +19746,11 @@ } }, "vue-router": { - "version": "4.0.14", - "resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.0.14.tgz", - "integrity": "sha512-wAO6zF9zxA3u+7AkMPqw9LjoUCjSxfFvINQj3E/DceTt6uEz1XZLraDhdg2EYmvVwTBSGlLYsUw8bDmx0754Mw==", + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.1.2.tgz", + "integrity": "sha512-5BP1qXFncVRwgV/XnqzsKApdMjQPqWIpoUBdL1ynz8HyLxIX/UDAx7Ql2BjmA5CXT/p61JfZvkpiFWFpaqcfag==", "requires": { - "@vue/devtools-api": "^6.0.0" + "@vue/devtools-api": "^6.1.4" } }, "vue-skycons": { diff --git a/platypush/backend/http/webapp/package.json b/platypush/backend/http/webapp/package.json index 531f041d..7f76766d 100644 --- a/platypush/backend/http/webapp/package.json +++ b/platypush/backend/http/webapp/package.json @@ -10,13 +10,13 @@ "dependencies": { "@fortawesome/fontawesome-free": "^5.15.4", "axios": "^0.21.4", - "core-js": "^3.21.1", + "core-js": "^3.23.4", "lato-font": "^3.0.0", "mitt": "^2.1.0", - "sass": "^1.49.9", - "sass-loader": "^10.2.1", + "sass": "^1.53.0", + "sass-loader": "^10.3.1", "vue": "^3.2.13", - "vue-router": "^4.0.14", + "vue-router": "^4.1.2", "vue-skycons": "^4.2.0", "w3css": "^2.7.0" }, diff --git a/platypush/message/event/websocket.py b/platypush/message/event/websocket.py new file mode 100644 index 00000000..4656c190 --- /dev/null +++ b/platypush/message/event/websocket.py @@ -0,0 +1,16 @@ +from typing import Any + +from platypush.message.event import Event + + +class WebsocketMessageEvent(Event): + """ + Event triggered when a message is receive on a subscribed websocket URL. + """ + + def __init__(self, *args, url: str, message: Any, **kwargs): + """ + :param url: Websocket URL. + :param message: The received message. + """ + super().__init__(*args, url=url, message=message, **kwargs) diff --git a/platypush/plugins/websocket/__init__.py b/platypush/plugins/websocket/__init__.py index 1cfa7c0e..86716394 100644 --- a/platypush/plugins/websocket/__init__.py +++ b/platypush/plugins/websocket/__init__.py @@ -1,13 +1,12 @@ +import asyncio import json +import time -try: - from websockets.exceptions import ConnectionClosed - from websockets import connect as websocket_connect -except ImportError: - from websockets import ConnectionClosed, connect as websocket_connect +from websockets import connect as websocket_connect +from websockets.exceptions import ConnectionClosed -from platypush.context import get_or_create_event_loop -from platypush.message import Message +from platypush.context import get_or_create_event_loop, get_bus +from platypush.message.event.websocket import WebsocketMessageEvent from platypush.plugins import Plugin, action from platypush.utils import get_ssl_client_context @@ -15,61 +14,180 @@ from platypush.utils import get_ssl_client_context class WebsocketPlugin(Plugin): """ Plugin to send messages over a websocket connection. + + Triggers: + + * :class:`platypush.message.event.websocket.WebsocketMessageEvent` when + a message is received on a subscribed websocket. + """ - def __init__(self, **kwargs): - super().__init__(**kwargs) - @action - def send(self, url, msg, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None): + def send( + self, + url: str, + msg, + ssl_cert=None, + ssl_key=None, + ssl_cafile=None, + ssl_capath=None, + wait_response=False, + ): """ Sends a message to a websocket. :param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765 - :type url: str - :param msg: Message to be sent. It can be a list, a dict, or a Message object - - :param ssl_cert: Path to the SSL certificate to be used, if the SSL connection requires client authentication - as well (default: None) :type ssl_cert: str - - :param ssl_key: Path to the SSL key to be used, if the SSL connection requires client authentication as well - (default: None) :type ssl_key: str - - :param ssl_cafile: Path to the certificate authority file if required by the SSL configuration (default: None) - :type ssl_cafile: str - - :param ssl_capath: Path to the certificate authority directory if required by the SSL configuration - (default: None) - :type ssl_capath: str + :param ssl_cert: Path to the SSL certificate to be used, if the SSL + connection requires client authentication as well (default: None) + :param ssl_key: Path to the SSL key to be used, if the SSL connection + requires client authentication as well (default: None) + :param ssl_cafile: Path to the certificate authority file if required + by the SSL configuration (default: None) + :param ssl_capath: Path to the certificate authority directory if + required by the SSL configuration (default: None) + :param wait_response: Set to True if you expect a response to the + delivered message. + :return: The received response if ``wait_response`` is set to True, + otherwise nothing. """ async def send(): - websocket_args = {} - if ssl_cert: - websocket_args['ssl'] = get_ssl_client_context(ssl_cert=ssl_cert, - ssl_key=ssl_key, - ssl_cafile=ssl_cafile, - ssl_capath=ssl_capath) + websocket_args = { + 'ssl': self._get_ssl_context( + url, + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + } - async with websocket_connect(url, **websocket_args) as websocket: + async with websocket_connect(url, **websocket_args) as ws: try: - await websocket.send(str(msg)) + await ws.send(str(msg)) except ConnectionClosed as err: - self.logger.warning('Error on websocket {}: {}'. - format(url, err)) + self.logger.warning('Error on websocket %s: %s', url, err) - try: - msg = json.dumps(msg) - except Exception as e: - self.logger.debug(e) + if wait_response: + messages = await self._ws_recv(ws, num_messages=1) + if messages: + return self._parse_msg(messages[0]) - try: - msg = Message.build(json.loads(msg)) - except Exception as e: - self.logger.debug(e) + msg = self._parse_msg(msg) + loop = get_or_create_event_loop() + return loop.run_until_complete(send()) + + @action + def recv( + self, + url: str, + ssl_cert=None, + ssl_key=None, + ssl_cafile=None, + ssl_capath=None, + num_messages=0, + timeout=0, + ): + """ + Receive one or more messages from a websocket. + + A :class:`platypush.message.event.websocket.WebsocketMessageEvent` + event will be triggered whenever a new message is received. + + :param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765 + :param ssl_cert: Path to the SSL certificate to be used, if the SSL + connection requires client authentication as well (default: None) + :param ssl_key: Path to the SSL key to be used, if the SSL connection + requires client authentication as well (default: None) + :param ssl_cafile: Path to the certificate authority file if required + by the SSL configuration (default: None) + :param ssl_capath: Path to the certificate authority directory if + required by the SSL configuration (default: None) + :param num_messages: Exit after receiving this number of messages. + Default: 0, receive forever. + :param timeout: Message receive timeout in seconds. Default: 0 - no timeout. + :return: A list with the messages that have been received, unless + ``num_messages`` is set to 0 or ``None``. + """ + + async def recv(): + websocket_args = { + 'ssl': self._get_ssl_context( + url, + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + } + + async with websocket_connect(url, **websocket_args) as ws: + return await self._ws_recv( + ws, timeout=timeout, num_messages=num_messages + ) loop = get_or_create_event_loop() - loop.run_until_complete(send()) + return loop.run_until_complete(recv()) + + async def _ws_recv(self, ws, timeout=0, num_messages=0): + messages = [] + time_start = time.time() + time_end = time_start + timeout if timeout else 0 + url = 'ws{secure}://{host}:{port}{path}'.format( + secure='s' if ws.secure else '', + host=ws.host, + port=ws.port, + path=ws.path, + ) + + while (not num_messages) or (len(messages) < num_messages): + msg = None + err = None + remaining_timeout = time_end - time.time() if time_end else None + + try: + msg = await asyncio.wait_for(ws.recv(), remaining_timeout) + except (ConnectionClosed, asyncio.exceptions.TimeoutError) as e: + err = e + self.logger.warning('Error on websocket %s: %s', url, e) + + if isinstance(err, ConnectionClosed) or ( + time_end and time.time() > time_end + ): + break + + if msg is None: + continue + + msg = self._parse_msg(msg) + messages.append(msg) + get_bus().post(WebsocketMessageEvent(url=url, message=msg)) + + return messages + + @staticmethod + def _parse_msg(msg): + try: + msg = json.dumps(msg) + except Exception: + pass + + return msg + + @staticmethod + def _get_ssl_context( + url: str, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None + ): + if url.startswith('wss://'): + return get_ssl_client_context( + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + + return None + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/websocket/manifest.yaml b/platypush/plugins/websocket/manifest.yaml index 8c95c60e..973fde3f 100644 --- a/platypush/plugins/websocket/manifest.yaml +++ b/platypush/plugins/websocket/manifest.yaml @@ -1,5 +1,7 @@ manifest: - events: {} + events: + platypush.message.event.websocket.WebsocketMessageEvent: when a message is + received on a subscribed websocket. install: pip: [] package: platypush.plugins.websocket