forked from platypush/platypush
[#260] Removed legacy backend.websocket
.
It has now been replaced by the `/ws/events` and `/ws/requests` websocket routes under `backend.http`.
This commit is contained in:
parent
a069d23bb7
commit
ab2425ebd0
5 changed files with 0 additions and 179 deletions
|
@ -188,8 +188,6 @@ backend](https://docs.platypush.tech/en/latest/platypush/backend/http.html), an
|
||||||
instance](https://docs.platypush.tech/en/latest/platypush/backend/mqtt.html), a
|
instance](https://docs.platypush.tech/en/latest/platypush/backend/mqtt.html), a
|
||||||
[Kafka
|
[Kafka
|
||||||
instance](https://docs.platypush.tech/en/latest/platypush/backend/kafka.html),
|
instance](https://docs.platypush.tech/en/latest/platypush/backend/kafka.html),
|
||||||
a [Websocket
|
|
||||||
service](https://docs.platypush.tech/en/latest/platypush/backend/websocket.html),
|
|
||||||
[Pushbullet](https://docs.platypush.tech/en/latest/platypush/backend/pushbullet.html)
|
[Pushbullet](https://docs.platypush.tech/en/latest/platypush/backend/pushbullet.html)
|
||||||
etc.) or monitor a device or a service for events (like a
|
etc.) or monitor a device or a service for events (like a
|
||||||
[sensor](https://docs.platypush.tech/en/latest/platypush/backend/sensor.html),
|
[sensor](https://docs.platypush.tech/en/latest/platypush/backend/sensor.html),
|
||||||
|
|
|
@ -55,7 +55,6 @@ Backends
|
||||||
platypush/backend/weather.buienradar.rst
|
platypush/backend/weather.buienradar.rst
|
||||||
platypush/backend/weather.darksky.rst
|
platypush/backend/weather.darksky.rst
|
||||||
platypush/backend/weather.openweathermap.rst
|
platypush/backend/weather.openweathermap.rst
|
||||||
platypush/backend/websocket.rst
|
|
||||||
platypush/backend/wiimote.rst
|
platypush/backend/wiimote.rst
|
||||||
platypush/backend/zwave.rst
|
platypush/backend/zwave.rst
|
||||||
platypush/backend/zwave.mqtt.rst
|
platypush/backend/zwave.mqtt.rst
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
``websocket``
|
|
||||||
===============================
|
|
||||||
|
|
||||||
.. automodule:: platypush.backend.websocket
|
|
||||||
:members:
|
|
||||||
|
|
|
@ -1,164 +0,0 @@
|
||||||
from platypush.backend import Backend
|
|
||||||
try:
|
|
||||||
from websockets.exceptions import ConnectionClosed
|
|
||||||
from websockets import serve as websocket_serve
|
|
||||||
except ImportError:
|
|
||||||
from websockets import ConnectionClosed, serve as websocket_serve
|
|
||||||
|
|
||||||
from platypush.context import get_plugin, get_or_create_event_loop
|
|
||||||
from platypush.message import Message
|
|
||||||
from platypush.message.request import Request
|
|
||||||
from platypush.message.response import Response
|
|
||||||
from platypush.utils import get_ssl_server_context
|
|
||||||
|
|
||||||
|
|
||||||
class WebsocketBackend(Backend):
|
|
||||||
"""
|
|
||||||
Backend to communicate messages over a websocket medium.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Websocket client message recv timeout in seconds
|
|
||||||
_websocket_client_timeout = 0
|
|
||||||
|
|
||||||
# Default websocket service port
|
|
||||||
_default_websocket_port = 8765
|
|
||||||
|
|
||||||
def __init__(self, port=_default_websocket_port, bind_address='0.0.0.0',
|
|
||||||
ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None,
|
|
||||||
client_timeout=_websocket_client_timeout, **kwargs):
|
|
||||||
"""
|
|
||||||
:param port: Listen port for the websocket server (default: 8765)
|
|
||||||
:type port: int
|
|
||||||
|
|
||||||
:param bind_address: Bind address for the websocket server (default: 0.0.0.0, listen for any IP connection)
|
|
||||||
:type websocket_port: str
|
|
||||||
|
|
||||||
:param ssl_cert: Path to the certificate file if you want to enable SSL (default: None)
|
|
||||||
:type ssl_cert: str
|
|
||||||
|
|
||||||
:param ssl_key: Path to the key file if you want to enable SSL (default: None)
|
|
||||||
:type ssl_key: str
|
|
||||||
|
|
||||||
:param ssl_cafile: Path to the certificate authority file if required by the SSL configuration (default: None)
|
|
||||||
:type ssl_cafile: str
|
|
||||||
|
|
||||||
:param ssl_capath: Path to the certificate authority directory if required by the SSL configuration
|
|
||||||
(default: None)
|
|
||||||
:type ssl_capath: str
|
|
||||||
|
|
||||||
:param client_timeout: Timeout without any messages being received before closing a client connection.
|
|
||||||
A zero timeout keeps the websocket open until an error occurs (default: 0, no timeout)
|
|
||||||
:type ping_timeout: int
|
|
||||||
"""
|
|
||||||
|
|
||||||
super().__init__(**kwargs)
|
|
||||||
|
|
||||||
self.port = port
|
|
||||||
self.bind_address = bind_address
|
|
||||||
self.client_timeout = client_timeout
|
|
||||||
self.active_websockets = set()
|
|
||||||
self._loop = None
|
|
||||||
|
|
||||||
self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert,
|
|
||||||
ssl_key=ssl_key,
|
|
||||||
ssl_cafile=ssl_cafile,
|
|
||||||
ssl_capath=ssl_capath) \
|
|
||||||
if ssl_cert else None
|
|
||||||
|
|
||||||
def send_message(self, msg, **kwargs):
|
|
||||||
websocket = get_plugin('websocket')
|
|
||||||
websocket_args = {}
|
|
||||||
|
|
||||||
if self.ssl_context:
|
|
||||||
url = 'wss://localhost:{}'.format(self.port)
|
|
||||||
websocket_args['ssl'] = self.ssl_context
|
|
||||||
else:
|
|
||||||
url = 'ws://localhost:{}'.format(self.port)
|
|
||||||
|
|
||||||
websocket.send(url=url, msg=msg, **websocket_args)
|
|
||||||
|
|
||||||
def notify_web_clients(self, event):
|
|
||||||
""" Notify all the connected web clients (over websocket) of a new event """
|
|
||||||
|
|
||||||
async def send_event(websocket):
|
|
||||||
try:
|
|
||||||
await websocket.send(str(event))
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning('Error on websocket send_event: {}'.format(e))
|
|
||||||
|
|
||||||
loop = get_or_create_event_loop()
|
|
||||||
active_websockets = self.active_websockets.copy()
|
|
||||||
|
|
||||||
for ws in active_websockets:
|
|
||||||
try:
|
|
||||||
loop.run_until_complete(send_event(ws))
|
|
||||||
except ConnectionClosed:
|
|
||||||
self.logger.info('Client connection lost')
|
|
||||||
self.active_websockets.remove(ws)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
super().run()
|
|
||||||
self.register_service(port=self.port, name='ws')
|
|
||||||
|
|
||||||
# noinspection PyUnusedLocal
|
|
||||||
async def serve_client(websocket, path):
|
|
||||||
self.active_websockets.add(websocket)
|
|
||||||
self.logger.debug('New websocket connection from {}'.
|
|
||||||
format(websocket.remote_address[0]))
|
|
||||||
|
|
||||||
try:
|
|
||||||
while not self.should_stop():
|
|
||||||
if self.client_timeout:
|
|
||||||
msg = await asyncio.wait_for(websocket.recv(),
|
|
||||||
timeout=self.client_timeout)
|
|
||||||
else:
|
|
||||||
msg = await websocket.recv()
|
|
||||||
|
|
||||||
msg = Message.build(msg)
|
|
||||||
self.logger.info('Received message from {}: {}'.
|
|
||||||
format(websocket.remote_address[0], msg))
|
|
||||||
|
|
||||||
self.on_message(msg)
|
|
||||||
|
|
||||||
if isinstance(msg, Request):
|
|
||||||
response = self.get_message_response(msg) or Response()
|
|
||||||
self.logger.info('Processing response on the websocket backend: {}'.
|
|
||||||
format(response))
|
|
||||||
|
|
||||||
await websocket.send(str(response))
|
|
||||||
|
|
||||||
except ConnectionClosed as e:
|
|
||||||
self.active_websockets.remove(websocket)
|
|
||||||
self.logger.debug('Websocket client {} closed connection'.
|
|
||||||
format(websocket.remote_address[0]))
|
|
||||||
except asyncio.TimeoutError as e:
|
|
||||||
self.active_websockets.remove(websocket)
|
|
||||||
self.logger.debug('Websocket connection to {} timed out'.
|
|
||||||
format(websocket.remote_address[0]))
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.exception(e)
|
|
||||||
|
|
||||||
self.logger.info('Initialized websocket backend on port {}, bind address: {}'.
|
|
||||||
format(self.port, self.bind_address))
|
|
||||||
|
|
||||||
websocket_args = {}
|
|
||||||
if self.ssl_context:
|
|
||||||
websocket_args['ssl'] = self.ssl_context
|
|
||||||
|
|
||||||
self._loop = get_or_create_event_loop()
|
|
||||||
server = websocket_serve(serve_client, self.bind_address, self.port, **websocket_args)
|
|
||||||
|
|
||||||
self._loop.run_until_complete(server)
|
|
||||||
self._loop.run_forever()
|
|
||||||
|
|
||||||
def on_stop(self):
|
|
||||||
self.logger.info('Received STOP event on the websocket backend')
|
|
||||||
if self._loop:
|
|
||||||
self._loop.stop()
|
|
||||||
|
|
||||||
self.logger.info('Websocket backend terminated')
|
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
|
|
@ -1,6 +0,0 @@
|
||||||
manifest:
|
|
||||||
events: {}
|
|
||||||
install:
|
|
||||||
pip: []
|
|
||||||
package: platypush.backend.websocket
|
|
||||||
type: backend
|
|
Loading…
Reference in a new issue