diff --git a/README.md b/README.md index 9f9f5a05c..8ce79eb61 100644 --- a/README.md +++ b/README.md @@ -188,8 +188,6 @@ backend](https://docs.platypush.tech/en/latest/platypush/backend/http.html), an instance](https://docs.platypush.tech/en/latest/platypush/backend/mqtt.html), a [Kafka instance](https://docs.platypush.tech/en/latest/platypush/backend/kafka.html), -a [Websocket -service](https://docs.platypush.tech/en/latest/platypush/backend/websocket.html), [Pushbullet](https://docs.platypush.tech/en/latest/platypush/backend/pushbullet.html) etc.) or monitor a device or a service for events (like a [sensor](https://docs.platypush.tech/en/latest/platypush/backend/sensor.html), diff --git a/docs/source/backends.rst b/docs/source/backends.rst index d017cd03e..1ab13b225 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -55,7 +55,6 @@ Backends platypush/backend/weather.buienradar.rst platypush/backend/weather.darksky.rst platypush/backend/weather.openweathermap.rst - platypush/backend/websocket.rst platypush/backend/wiimote.rst platypush/backend/zwave.rst platypush/backend/zwave.mqtt.rst diff --git a/docs/source/platypush/backend/websocket.rst b/docs/source/platypush/backend/websocket.rst deleted file mode 100644 index 80fa634c3..000000000 --- a/docs/source/platypush/backend/websocket.rst +++ /dev/null @@ -1,6 +0,0 @@ -``websocket`` -=============================== - -.. automodule:: platypush.backend.websocket - :members: - diff --git a/platypush/backend/websocket/__init__.py b/platypush/backend/websocket/__init__.py deleted file mode 100644 index 1fd8a6b43..000000000 --- a/platypush/backend/websocket/__init__.py +++ /dev/null @@ -1,164 +0,0 @@ -from platypush.backend import Backend -try: - from websockets.exceptions import ConnectionClosed - from websockets import serve as websocket_serve -except ImportError: - from websockets import ConnectionClosed, serve as websocket_serve - -from platypush.context import get_plugin, get_or_create_event_loop -from platypush.message import Message -from platypush.message.request import Request -from platypush.message.response import Response -from platypush.utils import get_ssl_server_context - - -class WebsocketBackend(Backend): - """ - Backend to communicate messages over a websocket medium. - """ - - # Websocket client message recv timeout in seconds - _websocket_client_timeout = 0 - - # Default websocket service port - _default_websocket_port = 8765 - - def __init__(self, port=_default_websocket_port, bind_address='0.0.0.0', - ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None, - client_timeout=_websocket_client_timeout, **kwargs): - """ - :param port: Listen port for the websocket server (default: 8765) - :type port: int - - :param bind_address: Bind address for the websocket server (default: 0.0.0.0, listen for any IP connection) - :type websocket_port: str - - :param ssl_cert: Path to the certificate file if you want to enable SSL (default: None) - :type ssl_cert: str - - :param ssl_key: Path to the key file if you want to enable SSL (default: None) - :type ssl_key: str - - :param ssl_cafile: Path to the certificate authority file if required by the SSL configuration (default: None) - :type ssl_cafile: str - - :param ssl_capath: Path to the certificate authority directory if required by the SSL configuration - (default: None) - :type ssl_capath: str - - :param client_timeout: Timeout without any messages being received before closing a client connection. - A zero timeout keeps the websocket open until an error occurs (default: 0, no timeout) - :type ping_timeout: int - """ - - super().__init__(**kwargs) - - self.port = port - self.bind_address = bind_address - self.client_timeout = client_timeout - self.active_websockets = set() - self._loop = None - - self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert, - ssl_key=ssl_key, - ssl_cafile=ssl_cafile, - ssl_capath=ssl_capath) \ - if ssl_cert else None - - def send_message(self, msg, **kwargs): - websocket = get_plugin('websocket') - websocket_args = {} - - if self.ssl_context: - url = 'wss://localhost:{}'.format(self.port) - websocket_args['ssl'] = self.ssl_context - else: - url = 'ws://localhost:{}'.format(self.port) - - websocket.send(url=url, msg=msg, **websocket_args) - - def notify_web_clients(self, event): - """ Notify all the connected web clients (over websocket) of a new event """ - - async def send_event(websocket): - try: - await websocket.send(str(event)) - except Exception as e: - self.logger.warning('Error on websocket send_event: {}'.format(e)) - - loop = get_or_create_event_loop() - active_websockets = self.active_websockets.copy() - - for ws in active_websockets: - try: - loop.run_until_complete(send_event(ws)) - except ConnectionClosed: - self.logger.info('Client connection lost') - self.active_websockets.remove(ws) - - def run(self): - import asyncio - - super().run() - self.register_service(port=self.port, name='ws') - - # noinspection PyUnusedLocal - async def serve_client(websocket, path): - self.active_websockets.add(websocket) - self.logger.debug('New websocket connection from {}'. - format(websocket.remote_address[0])) - - try: - while not self.should_stop(): - if self.client_timeout: - msg = await asyncio.wait_for(websocket.recv(), - timeout=self.client_timeout) - else: - msg = await websocket.recv() - - msg = Message.build(msg) - self.logger.info('Received message from {}: {}'. - format(websocket.remote_address[0], msg)) - - self.on_message(msg) - - if isinstance(msg, Request): - response = self.get_message_response(msg) or Response() - self.logger.info('Processing response on the websocket backend: {}'. - format(response)) - - await websocket.send(str(response)) - - except ConnectionClosed as e: - self.active_websockets.remove(websocket) - self.logger.debug('Websocket client {} closed connection'. - format(websocket.remote_address[0])) - except asyncio.TimeoutError as e: - self.active_websockets.remove(websocket) - self.logger.debug('Websocket connection to {} timed out'. - format(websocket.remote_address[0])) - except Exception as e: - self.logger.exception(e) - - self.logger.info('Initialized websocket backend on port {}, bind address: {}'. - format(self.port, self.bind_address)) - - websocket_args = {} - if self.ssl_context: - websocket_args['ssl'] = self.ssl_context - - self._loop = get_or_create_event_loop() - server = websocket_serve(serve_client, self.bind_address, self.port, **websocket_args) - - self._loop.run_until_complete(server) - self._loop.run_forever() - - def on_stop(self): - self.logger.info('Received STOP event on the websocket backend') - if self._loop: - self._loop.stop() - - self.logger.info('Websocket backend terminated') - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/websocket/manifest.yaml b/platypush/backend/websocket/manifest.yaml deleted file mode 100644 index dc60f459d..000000000 --- a/platypush/backend/websocket/manifest.yaml +++ /dev/null @@ -1,6 +0,0 @@ -manifest: - events: {} - install: - pip: [] - package: platypush.backend.websocket - type: backend