diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index f7afa70b4..8e50c8c32 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -15,7 +15,7 @@ from flask import Flask, Response, abort, jsonify, request as http_request, \ from redis import Redis from platypush.config import Config -from platypush.context import get_backend +from platypush.context import get_backend, get_or_create_event_loop from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.event.web.widget import WidgetUpdateEvent @@ -146,7 +146,7 @@ class HttpBackend(Backend): except Exception as e: self.logger.warning('Error on websocket send_event: {}'.format(e)) - loop = asyncio.new_event_loop() + loop = get_or_create_event_loop() for websocket in self.active_websockets: try: @@ -370,8 +370,7 @@ class HttpBackend(Backend): self.logger.info('Websocket client {} closed connection'.format(websocket.remote_address[0])) self.active_websockets.remove(websocket) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = get_or_create_event_loop() loop.run_until_complete( websockets.serve(register_websocket, '0.0.0.0', self.websocket_port)) loop.run_forever() diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py new file mode 100644 index 000000000..99b3dbd13 --- /dev/null +++ b/platypush/backend/websocket.py @@ -0,0 +1,105 @@ +import asyncio +import os +import ssl +import threading +import websockets + +from platypush.backend import Backend +from platypush.context import get_plugin, get_or_create_event_loop +from platypush.message import Message +from platypush.message.request import Request + + +class WebsocketBackend(Backend): + """ + Backend to communicate messages over a websocket medium. + + Requires: + + * **websockets** (``pip install websockets``) + """ + + def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cert=None, **kwargs): + """ + :param port: Listen port for the websocket server (default: 8765) + :type port: int + + :param bind_address: Bind address for the websocket server (default: 0.0.0.0, listen for any IP connection) + :type websocket_port: str + + :param ssl_cert: Path to the PEM certificate file if you want to enable SSL (default: None) + :type ssl_cert: str + """ + + super().__init__(**kwargs) + + self.port = port + self.bind_address = bind_address + self.ssl_context = None + + if ssl_cert: + self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + self.ssl_context.load_cert_chain(os.path.abspath( + os.path.expanduser(ssl_cert))) + + + def send_message(self, msg): + websocket = get_plugin('websocket') + websocket_args = {} + + if self.ssl_context: + url = 'wss://localhost:{}'.format(self.port) + websocket_args['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + websocket_args['ssl'].load_cert_chain(os.path.abspath( + os.path.expanduser(ssl_cert))) + else: + url = 'ws://localhost:{}'.format(self.port) + + websocket.send(url=url, msg=msg, **websocket_args) + + + def run(self): + super().run() + + async def serve_client(websocket, path): + self.logger.info('New websocket connection from {}'. + format(websocket.remote_address[0])) + + try: + msg = await websocket.recv() + msg = Message.build(msg) + self.logger.info('Received message from {}: {}'. + format(websocket.remote_address[0], msg)) + + self.on_message(msg) + + if isinstance(msg, Request): + response = self.get_message_response(msg) + + if response: + self.logger.info('Processing response on the websocket backend: {}'. + format(response)) + + await websocket.send(str(response)) + except Exception as e: + if isinstance(e, websockets.exceptions.ConnectionClosed): + self.logger.info('Websocket client {} closed connection'. + format(websocket.remote_address[0])) + else: + self.logger.exception(e) + + self.logger.info('Initialized websocket backend on port {}, bind address: {}'. + format(self.port, self.bind_address)) + + websocket_args = {} + if self.ssl_context: + websocket_args['ssl'] = self.ssl_context + + loop = get_or_create_event_loop() + server = websockets.serve(serve_client, self.bind_address, self.port, **websocket_args) + loop.run_until_complete(server) + loop.run_forever() + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index 95945b589..6b46449fe 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -1,3 +1,4 @@ +import asyncio import importlib import logging @@ -113,5 +114,15 @@ def register_plugin(name, plugin, **kwargs): """ Registers a plugin instance by name """ global plugins +def get_or_create_event_loop(): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + return loop + + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/websocket.py b/platypush/plugins/websocket.py new file mode 100644 index 000000000..06dd07e5e --- /dev/null +++ b/platypush/plugins/websocket.py @@ -0,0 +1,62 @@ +import asyncio +import json +import os +import websockets + +from platypush.context import get_or_create_event_loop +from platypush.message import Message +from platypush.plugins import Plugin, action + + +class WebsocketPlugin(Plugin): + """ + Plugin to send messages over a websocket connection + + Requires: + + * **websockets** (``pip install websockets``) + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @action + def send(self, url, msg, ssl_cert=None, *args, **kwargs): + """ + Sends a message to a websocket. + + :param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765 + :type topic: str + + :param msg: Message to be sent. It can be a list, a dict, or a Message object + + :param ssl_cert: Path to the SSL certificate to be used, if the SSL connection requires client authentication as well (default: None) + :type ssl_cert: str + """ + + async def send(): + websocket_args = {} + if ssl_cert: + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.load_cert_chain(os.path.abspath( + os.path.expanduser(ssl_cert))) + + async with websockets.connect(url, **websocket_args) as websocket: + try: + await websocket.send(str(msg)) + except websockets.exceptions.ConnectionClosed: + self.logger.warning('Error on websocket {}: {}'. + format(url, e)) + + try: msg = json.dumps(msg) + except: pass + + try: msg = Message.build(json.loads(msg)) + except: pass + + loop = get_or_create_event_loop() + loop.run_until_complete(send()) + + +# vim:sw=4:ts=4:et: +