forked from platypush/platypush
Merge branch 'master' into matrix-integration
This commit is contained in:
commit
dc7ba881f1
5 changed files with 223 additions and 87 deletions
74
platypush/backend/http/webapp/package-lock.json
generated
74
platypush/backend/http/webapp/package-lock.json
generated
|
@ -10,13 +10,13 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@fortawesome/fontawesome-free": "^5.15.4",
|
"@fortawesome/fontawesome-free": "^5.15.4",
|
||||||
"axios": "^0.21.4",
|
"axios": "^0.21.4",
|
||||||
"core-js": "^3.21.1",
|
"core-js": "^3.23.4",
|
||||||
"lato-font": "^3.0.0",
|
"lato-font": "^3.0.0",
|
||||||
"mitt": "^2.1.0",
|
"mitt": "^2.1.0",
|
||||||
"sass": "^1.49.9",
|
"sass": "^1.53.0",
|
||||||
"sass-loader": "^10.2.1",
|
"sass-loader": "^10.3.1",
|
||||||
"vue": "^3.2.13",
|
"vue": "^3.2.13",
|
||||||
"vue-router": "^4.0.14",
|
"vue-router": "^4.1.2",
|
||||||
"vue-skycons": "^4.2.0",
|
"vue-skycons": "^4.2.0",
|
||||||
"w3css": "^2.7.0"
|
"w3css": "^2.7.0"
|
||||||
},
|
},
|
||||||
|
@ -2768,9 +2768,9 @@
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"node_modules/@vue/devtools-api": {
|
"node_modules/@vue/devtools-api": {
|
||||||
"version": "6.1.3",
|
"version": "6.2.1",
|
||||||
"resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.1.3.tgz",
|
"resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.2.1.tgz",
|
||||||
"integrity": "sha512-79InfO2xHv+WHIrH1bHXQUiQD/wMls9qBk6WVwGCbdwP7/3zINtvqPNMtmSHXsIKjvUAHc8L0ouOj6ZQQRmcXg=="
|
"integrity": "sha512-OEgAMeQXvCoJ+1x8WyQuVZzFo0wcyCmUR3baRVLmKBo1LmYZWMlRiXlux5jd0fqVJu6PfDbOrZItVqUEzLobeQ=="
|
||||||
},
|
},
|
||||||
"node_modules/@vue/reactivity": {
|
"node_modules/@vue/reactivity": {
|
||||||
"version": "3.2.31",
|
"version": "3.2.31",
|
||||||
|
@ -4205,9 +4205,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/core-js": {
|
"node_modules/core-js": {
|
||||||
"version": "3.21.1",
|
"version": "3.23.4",
|
||||||
"resolved": "https://registry.npmjs.org/core-js/-/core-js-3.21.1.tgz",
|
"resolved": "https://registry.npmjs.org/core-js/-/core-js-3.23.4.tgz",
|
||||||
"integrity": "sha512-FRq5b/VMrWlrmCzwRrpDYNxyHP9BcAZC+xHJaqTgIE5091ZV1NTmyh0sGOg5XqpnHvR0svdy0sv1gWA1zmhxig==",
|
"integrity": "sha512-vjsKqRc1RyAJC3Ye2kYqgfdThb3zYnx9CrqoCcjMOENMtQPC7ZViBvlDxwYU/2z2NI/IPuiXw5mT4hWhddqjzQ==",
|
||||||
"hasInstallScript": true,
|
"hasInstallScript": true,
|
||||||
"funding": {
|
"funding": {
|
||||||
"type": "opencollective",
|
"type": "opencollective",
|
||||||
|
@ -9402,9 +9402,9 @@
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"node_modules/sass": {
|
"node_modules/sass": {
|
||||||
"version": "1.49.9",
|
"version": "1.53.0",
|
||||||
"resolved": "https://registry.npmjs.org/sass/-/sass-1.49.9.tgz",
|
"resolved": "https://registry.npmjs.org/sass/-/sass-1.53.0.tgz",
|
||||||
"integrity": "sha512-YlYWkkHP9fbwaFRZQRXgDi3mXZShslVmmo+FVK3kHLUELHHEYrCmL1x6IUjC7wLS6VuJSAFXRQS/DxdsC4xL1A==",
|
"integrity": "sha512-zb/oMirbKhUgRQ0/GFz8TSAwRq2IlR29vOUJZOx0l8sV+CkHUfHa4u5nqrG+1VceZp7Jfj59SVW9ogdhTvJDcQ==",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"chokidar": ">=3.0.0 <4.0.0",
|
"chokidar": ">=3.0.0 <4.0.0",
|
||||||
"immutable": "^4.0.0",
|
"immutable": "^4.0.0",
|
||||||
|
@ -9418,9 +9418,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/sass-loader": {
|
"node_modules/sass-loader": {
|
||||||
"version": "10.2.1",
|
"version": "10.3.1",
|
||||||
"resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.2.1.tgz",
|
"resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.3.1.tgz",
|
||||||
"integrity": "sha512-RRvWl+3K2LSMezIsd008ErK4rk6CulIMSwrcc2aZvjymUgKo/vjXGp1rSWmfTUX7bblEOz8tst4wBwWtCGBqKA==",
|
"integrity": "sha512-y2aBdtYkbqorVavkC3fcJIUDGIegzDWPn3/LAFhsf3G+MzPKTJx37sROf5pXtUeggSVbNbmfj8TgRaSLMelXRA==",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"klona": "^2.0.4",
|
"klona": "^2.0.4",
|
||||||
"loader-utils": "^2.0.0",
|
"loader-utils": "^2.0.0",
|
||||||
|
@ -9437,7 +9437,7 @@
|
||||||
},
|
},
|
||||||
"peerDependencies": {
|
"peerDependencies": {
|
||||||
"fibers": ">= 3.1.0",
|
"fibers": ">= 3.1.0",
|
||||||
"node-sass": "^4.0.0 || ^5.0.0 || ^6.0.0",
|
"node-sass": "^4.0.0 || ^5.0.0 || ^6.0.0 || ^7.0.0",
|
||||||
"sass": "^1.3.0",
|
"sass": "^1.3.0",
|
||||||
"webpack": "^4.36.0 || ^5.0.0"
|
"webpack": "^4.36.0 || ^5.0.0"
|
||||||
},
|
},
|
||||||
|
@ -10825,11 +10825,11 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/vue-router": {
|
"node_modules/vue-router": {
|
||||||
"version": "4.0.14",
|
"version": "4.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.0.14.tgz",
|
"resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.1.2.tgz",
|
||||||
"integrity": "sha512-wAO6zF9zxA3u+7AkMPqw9LjoUCjSxfFvINQj3E/DceTt6uEz1XZLraDhdg2EYmvVwTBSGlLYsUw8bDmx0754Mw==",
|
"integrity": "sha512-5BP1qXFncVRwgV/XnqzsKApdMjQPqWIpoUBdL1ynz8HyLxIX/UDAx7Ql2BjmA5CXT/p61JfZvkpiFWFpaqcfag==",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@vue/devtools-api": "^6.0.0"
|
"@vue/devtools-api": "^6.1.4"
|
||||||
},
|
},
|
||||||
"funding": {
|
"funding": {
|
||||||
"url": "https://github.com/sponsors/posva"
|
"url": "https://github.com/sponsors/posva"
|
||||||
|
@ -13770,9 +13770,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"@vue/devtools-api": {
|
"@vue/devtools-api": {
|
||||||
"version": "6.1.3",
|
"version": "6.2.1",
|
||||||
"resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.1.3.tgz",
|
"resolved": "https://registry.npmjs.org/@vue/devtools-api/-/devtools-api-6.2.1.tgz",
|
||||||
"integrity": "sha512-79InfO2xHv+WHIrH1bHXQUiQD/wMls9qBk6WVwGCbdwP7/3zINtvqPNMtmSHXsIKjvUAHc8L0ouOj6ZQQRmcXg=="
|
"integrity": "sha512-OEgAMeQXvCoJ+1x8WyQuVZzFo0wcyCmUR3baRVLmKBo1LmYZWMlRiXlux5jd0fqVJu6PfDbOrZItVqUEzLobeQ=="
|
||||||
},
|
},
|
||||||
"@vue/reactivity": {
|
"@vue/reactivity": {
|
||||||
"version": "3.2.31",
|
"version": "3.2.31",
|
||||||
|
@ -14868,9 +14868,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"core-js": {
|
"core-js": {
|
||||||
"version": "3.21.1",
|
"version": "3.23.4",
|
||||||
"resolved": "https://registry.npmjs.org/core-js/-/core-js-3.21.1.tgz",
|
"resolved": "https://registry.npmjs.org/core-js/-/core-js-3.23.4.tgz",
|
||||||
"integrity": "sha512-FRq5b/VMrWlrmCzwRrpDYNxyHP9BcAZC+xHJaqTgIE5091ZV1NTmyh0sGOg5XqpnHvR0svdy0sv1gWA1zmhxig=="
|
"integrity": "sha512-vjsKqRc1RyAJC3Ye2kYqgfdThb3zYnx9CrqoCcjMOENMtQPC7ZViBvlDxwYU/2z2NI/IPuiXw5mT4hWhddqjzQ=="
|
||||||
},
|
},
|
||||||
"core-js-compat": {
|
"core-js-compat": {
|
||||||
"version": "3.21.1",
|
"version": "3.21.1",
|
||||||
|
@ -18676,9 +18676,9 @@
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
"sass": {
|
"sass": {
|
||||||
"version": "1.49.9",
|
"version": "1.53.0",
|
||||||
"resolved": "https://registry.npmjs.org/sass/-/sass-1.49.9.tgz",
|
"resolved": "https://registry.npmjs.org/sass/-/sass-1.53.0.tgz",
|
||||||
"integrity": "sha512-YlYWkkHP9fbwaFRZQRXgDi3mXZShslVmmo+FVK3kHLUELHHEYrCmL1x6IUjC7wLS6VuJSAFXRQS/DxdsC4xL1A==",
|
"integrity": "sha512-zb/oMirbKhUgRQ0/GFz8TSAwRq2IlR29vOUJZOx0l8sV+CkHUfHa4u5nqrG+1VceZp7Jfj59SVW9ogdhTvJDcQ==",
|
||||||
"requires": {
|
"requires": {
|
||||||
"chokidar": ">=3.0.0 <4.0.0",
|
"chokidar": ">=3.0.0 <4.0.0",
|
||||||
"immutable": "^4.0.0",
|
"immutable": "^4.0.0",
|
||||||
|
@ -18686,9 +18686,9 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"sass-loader": {
|
"sass-loader": {
|
||||||
"version": "10.2.1",
|
"version": "10.3.1",
|
||||||
"resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.2.1.tgz",
|
"resolved": "https://registry.npmjs.org/sass-loader/-/sass-loader-10.3.1.tgz",
|
||||||
"integrity": "sha512-RRvWl+3K2LSMezIsd008ErK4rk6CulIMSwrcc2aZvjymUgKo/vjXGp1rSWmfTUX7bblEOz8tst4wBwWtCGBqKA==",
|
"integrity": "sha512-y2aBdtYkbqorVavkC3fcJIUDGIegzDWPn3/LAFhsf3G+MzPKTJx37sROf5pXtUeggSVbNbmfj8TgRaSLMelXRA==",
|
||||||
"requires": {
|
"requires": {
|
||||||
"klona": "^2.0.4",
|
"klona": "^2.0.4",
|
||||||
"loader-utils": "^2.0.0",
|
"loader-utils": "^2.0.0",
|
||||||
|
@ -19746,11 +19746,11 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"vue-router": {
|
"vue-router": {
|
||||||
"version": "4.0.14",
|
"version": "4.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.0.14.tgz",
|
"resolved": "https://registry.npmjs.org/vue-router/-/vue-router-4.1.2.tgz",
|
||||||
"integrity": "sha512-wAO6zF9zxA3u+7AkMPqw9LjoUCjSxfFvINQj3E/DceTt6uEz1XZLraDhdg2EYmvVwTBSGlLYsUw8bDmx0754Mw==",
|
"integrity": "sha512-5BP1qXFncVRwgV/XnqzsKApdMjQPqWIpoUBdL1ynz8HyLxIX/UDAx7Ql2BjmA5CXT/p61JfZvkpiFWFpaqcfag==",
|
||||||
"requires": {
|
"requires": {
|
||||||
"@vue/devtools-api": "^6.0.0"
|
"@vue/devtools-api": "^6.1.4"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"vue-skycons": {
|
"vue-skycons": {
|
||||||
|
|
|
@ -10,13 +10,13 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@fortawesome/fontawesome-free": "^5.15.4",
|
"@fortawesome/fontawesome-free": "^5.15.4",
|
||||||
"axios": "^0.21.4",
|
"axios": "^0.21.4",
|
||||||
"core-js": "^3.21.1",
|
"core-js": "^3.23.4",
|
||||||
"lato-font": "^3.0.0",
|
"lato-font": "^3.0.0",
|
||||||
"mitt": "^2.1.0",
|
"mitt": "^2.1.0",
|
||||||
"sass": "^1.49.9",
|
"sass": "^1.53.0",
|
||||||
"sass-loader": "^10.2.1",
|
"sass-loader": "^10.3.1",
|
||||||
"vue": "^3.2.13",
|
"vue": "^3.2.13",
|
||||||
"vue-router": "^4.0.14",
|
"vue-router": "^4.1.2",
|
||||||
"vue-skycons": "^4.2.0",
|
"vue-skycons": "^4.2.0",
|
||||||
"w3css": "^2.7.0"
|
"w3css": "^2.7.0"
|
||||||
},
|
},
|
||||||
|
|
16
platypush/message/event/websocket.py
Normal file
16
platypush/message/event/websocket.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from platypush.message.event import Event
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketMessageEvent(Event):
|
||||||
|
"""
|
||||||
|
Event triggered when a message is receive on a subscribed websocket URL.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, url: str, message: Any, **kwargs):
|
||||||
|
"""
|
||||||
|
:param url: Websocket URL.
|
||||||
|
:param message: The received message.
|
||||||
|
"""
|
||||||
|
super().__init__(*args, url=url, message=message, **kwargs)
|
|
@ -1,13 +1,12 @@
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
try:
|
|
||||||
from websockets.exceptions import ConnectionClosed
|
|
||||||
from websockets import connect as websocket_connect
|
from websockets import connect as websocket_connect
|
||||||
except ImportError:
|
from websockets.exceptions import ConnectionClosed
|
||||||
from websockets import ConnectionClosed, connect as websocket_connect
|
|
||||||
|
|
||||||
from platypush.context import get_or_create_event_loop
|
from platypush.context import get_or_create_event_loop, get_bus
|
||||||
from platypush.message import Message
|
from platypush.message.event.websocket import WebsocketMessageEvent
|
||||||
from platypush.plugins import Plugin, action
|
from platypush.plugins import Plugin, action
|
||||||
from platypush.utils import get_ssl_client_context
|
from platypush.utils import get_ssl_client_context
|
||||||
|
|
||||||
|
@ -15,61 +14,180 @@ from platypush.utils import get_ssl_client_context
|
||||||
class WebsocketPlugin(Plugin):
|
class WebsocketPlugin(Plugin):
|
||||||
"""
|
"""
|
||||||
Plugin to send messages over a websocket connection.
|
Plugin to send messages over a websocket connection.
|
||||||
|
|
||||||
|
Triggers:
|
||||||
|
|
||||||
|
* :class:`platypush.message.event.websocket.WebsocketMessageEvent` when
|
||||||
|
a message is received on a subscribed websocket.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super().__init__(**kwargs)
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def send(self, url, msg, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None):
|
def send(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
msg,
|
||||||
|
ssl_cert=None,
|
||||||
|
ssl_key=None,
|
||||||
|
ssl_cafile=None,
|
||||||
|
ssl_capath=None,
|
||||||
|
wait_response=False,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Sends a message to a websocket.
|
Sends a message to a websocket.
|
||||||
|
|
||||||
:param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765
|
:param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765
|
||||||
:type url: str
|
|
||||||
|
|
||||||
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
||||||
|
:param ssl_cert: Path to the SSL certificate to be used, if the SSL
|
||||||
:param ssl_cert: Path to the SSL certificate to be used, if the SSL connection requires client authentication
|
connection requires client authentication as well (default: None)
|
||||||
as well (default: None) :type ssl_cert: str
|
:param ssl_key: Path to the SSL key to be used, if the SSL connection
|
||||||
|
requires client authentication as well (default: None)
|
||||||
:param ssl_key: Path to the SSL key to be used, if the SSL connection requires client authentication as well
|
:param ssl_cafile: Path to the certificate authority file if required
|
||||||
(default: None) :type ssl_key: str
|
by the SSL configuration (default: None)
|
||||||
|
:param ssl_capath: Path to the certificate authority directory if
|
||||||
:param ssl_cafile: Path to the certificate authority file if required by the SSL configuration (default: None)
|
required by the SSL configuration (default: None)
|
||||||
:type ssl_cafile: str
|
:param wait_response: Set to True if you expect a response to the
|
||||||
|
delivered message.
|
||||||
:param ssl_capath: Path to the certificate authority directory if required by the SSL configuration
|
:return: The received response if ``wait_response`` is set to True,
|
||||||
(default: None)
|
otherwise nothing.
|
||||||
:type ssl_capath: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def send():
|
async def send():
|
||||||
websocket_args = {}
|
websocket_args = {
|
||||||
if ssl_cert:
|
'ssl': self._get_ssl_context(
|
||||||
websocket_args['ssl'] = get_ssl_client_context(ssl_cert=ssl_cert,
|
url,
|
||||||
|
ssl_cert=ssl_cert,
|
||||||
ssl_key=ssl_key,
|
ssl_key=ssl_key,
|
||||||
ssl_cafile=ssl_cafile,
|
ssl_cafile=ssl_cafile,
|
||||||
ssl_capath=ssl_capath)
|
ssl_capath=ssl_capath,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
async with websocket_connect(url, **websocket_args) as websocket:
|
async with websocket_connect(url, **websocket_args) as ws:
|
||||||
try:
|
try:
|
||||||
await websocket.send(str(msg))
|
await ws.send(str(msg))
|
||||||
except ConnectionClosed as err:
|
except ConnectionClosed as err:
|
||||||
self.logger.warning('Error on websocket {}: {}'.
|
self.logger.warning('Error on websocket %s: %s', url, err)
|
||||||
format(url, err))
|
|
||||||
|
|
||||||
try:
|
if wait_response:
|
||||||
msg = json.dumps(msg)
|
messages = await self._ws_recv(ws, num_messages=1)
|
||||||
except Exception as e:
|
if messages:
|
||||||
self.logger.debug(e)
|
return self._parse_msg(messages[0])
|
||||||
|
|
||||||
try:
|
msg = self._parse_msg(msg)
|
||||||
msg = Message.build(json.loads(msg))
|
loop = get_or_create_event_loop()
|
||||||
except Exception as e:
|
return loop.run_until_complete(send())
|
||||||
self.logger.debug(e)
|
|
||||||
|
@action
|
||||||
|
def recv(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
ssl_cert=None,
|
||||||
|
ssl_key=None,
|
||||||
|
ssl_cafile=None,
|
||||||
|
ssl_capath=None,
|
||||||
|
num_messages=0,
|
||||||
|
timeout=0,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Receive one or more messages from a websocket.
|
||||||
|
|
||||||
|
A :class:`platypush.message.event.websocket.WebsocketMessageEvent`
|
||||||
|
event will be triggered whenever a new message is received.
|
||||||
|
|
||||||
|
:param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765
|
||||||
|
:param ssl_cert: Path to the SSL certificate to be used, if the SSL
|
||||||
|
connection requires client authentication as well (default: None)
|
||||||
|
:param ssl_key: Path to the SSL key to be used, if the SSL connection
|
||||||
|
requires client authentication as well (default: None)
|
||||||
|
:param ssl_cafile: Path to the certificate authority file if required
|
||||||
|
by the SSL configuration (default: None)
|
||||||
|
:param ssl_capath: Path to the certificate authority directory if
|
||||||
|
required by the SSL configuration (default: None)
|
||||||
|
:param num_messages: Exit after receiving this number of messages.
|
||||||
|
Default: 0, receive forever.
|
||||||
|
:param timeout: Message receive timeout in seconds. Default: 0 - no timeout.
|
||||||
|
:return: A list with the messages that have been received, unless
|
||||||
|
``num_messages`` is set to 0 or ``None``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def recv():
|
||||||
|
websocket_args = {
|
||||||
|
'ssl': self._get_ssl_context(
|
||||||
|
url,
|
||||||
|
ssl_cert=ssl_cert,
|
||||||
|
ssl_key=ssl_key,
|
||||||
|
ssl_cafile=ssl_cafile,
|
||||||
|
ssl_capath=ssl_capath,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async with websocket_connect(url, **websocket_args) as ws:
|
||||||
|
return await self._ws_recv(
|
||||||
|
ws, timeout=timeout, num_messages=num_messages
|
||||||
|
)
|
||||||
|
|
||||||
loop = get_or_create_event_loop()
|
loop = get_or_create_event_loop()
|
||||||
loop.run_until_complete(send())
|
return loop.run_until_complete(recv())
|
||||||
|
|
||||||
|
async def _ws_recv(self, ws, timeout=0, num_messages=0):
|
||||||
|
messages = []
|
||||||
|
time_start = time.time()
|
||||||
|
time_end = time_start + timeout if timeout else 0
|
||||||
|
url = 'ws{secure}://{host}:{port}{path}'.format(
|
||||||
|
secure='s' if ws.secure else '',
|
||||||
|
host=ws.host,
|
||||||
|
port=ws.port,
|
||||||
|
path=ws.path,
|
||||||
|
)
|
||||||
|
|
||||||
|
while (not num_messages) or (len(messages) < num_messages):
|
||||||
|
msg = None
|
||||||
|
err = None
|
||||||
|
remaining_timeout = time_end - time.time() if time_end else None
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(ws.recv(), remaining_timeout)
|
||||||
|
except (ConnectionClosed, asyncio.exceptions.TimeoutError) as e:
|
||||||
|
err = e
|
||||||
|
self.logger.warning('Error on websocket %s: %s', url, e)
|
||||||
|
|
||||||
|
if isinstance(err, ConnectionClosed) or (
|
||||||
|
time_end and time.time() > time_end
|
||||||
|
):
|
||||||
|
break
|
||||||
|
|
||||||
|
if msg is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
msg = self._parse_msg(msg)
|
||||||
|
messages.append(msg)
|
||||||
|
get_bus().post(WebsocketMessageEvent(url=url, message=msg))
|
||||||
|
|
||||||
|
return messages
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_msg(msg):
|
||||||
|
try:
|
||||||
|
msg = json.dumps(msg)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_ssl_context(
|
||||||
|
url: str, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None
|
||||||
|
):
|
||||||
|
if url.startswith('wss://'):
|
||||||
|
return get_ssl_client_context(
|
||||||
|
ssl_cert=ssl_cert,
|
||||||
|
ssl_key=ssl_key,
|
||||||
|
ssl_cafile=ssl_cafile,
|
||||||
|
ssl_capath=ssl_capath,
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
manifest:
|
manifest:
|
||||||
events: {}
|
events:
|
||||||
|
platypush.message.event.websocket.WebsocketMessageEvent: when a message is
|
||||||
|
received on a subscribed websocket.
|
||||||
install:
|
install:
|
||||||
pip: []
|
pip: []
|
||||||
package: platypush.plugins.websocket
|
package: platypush.plugins.websocket
|
||||||
|
|
Loading…
Reference in a new issue