Added websocket backend and plugin
This commit is contained in:
parent
8635ed8268
commit
92a3759721
4 changed files with 181 additions and 4 deletions
|
@ -15,7 +15,7 @@ from flask import Flask, Response, abort, jsonify, request as http_request, \
|
||||||
from redis import Redis
|
from redis import Redis
|
||||||
|
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
from platypush.context import get_backend
|
from platypush.context import get_backend, get_or_create_event_loop
|
||||||
from platypush.message import Message
|
from platypush.message import Message
|
||||||
from platypush.message.event import Event, StopEvent
|
from platypush.message.event import Event, StopEvent
|
||||||
from platypush.message.event.web.widget import WidgetUpdateEvent
|
from platypush.message.event.web.widget import WidgetUpdateEvent
|
||||||
|
@ -146,7 +146,7 @@ class HttpBackend(Backend):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning('Error on websocket send_event: {}'.format(e))
|
self.logger.warning('Error on websocket send_event: {}'.format(e))
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = get_or_create_event_loop()
|
||||||
|
|
||||||
for websocket in self.active_websockets:
|
for websocket in self.active_websockets:
|
||||||
try:
|
try:
|
||||||
|
@ -370,8 +370,7 @@ class HttpBackend(Backend):
|
||||||
self.logger.info('Websocket client {} closed connection'.format(websocket.remote_address[0]))
|
self.logger.info('Websocket client {} closed connection'.format(websocket.remote_address[0]))
|
||||||
self.active_websockets.remove(websocket)
|
self.active_websockets.remove(websocket)
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = get_or_create_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
websockets.serve(register_websocket, '0.0.0.0', self.websocket_port))
|
websockets.serve(register_websocket, '0.0.0.0', self.websocket_port))
|
||||||
loop.run_forever()
|
loop.run_forever()
|
||||||
|
|
105
platypush/backend/websocket.py
Normal file
105
platypush/backend/websocket.py
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import ssl
|
||||||
|
import threading
|
||||||
|
import websockets
|
||||||
|
|
||||||
|
from platypush.backend import Backend
|
||||||
|
from platypush.context import get_plugin, get_or_create_event_loop
|
||||||
|
from platypush.message import Message
|
||||||
|
from platypush.message.request import Request
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketBackend(Backend):
|
||||||
|
"""
|
||||||
|
Backend to communicate messages over a websocket medium.
|
||||||
|
|
||||||
|
Requires:
|
||||||
|
|
||||||
|
* **websockets** (``pip install websockets``)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cert=None, **kwargs):
|
||||||
|
"""
|
||||||
|
:param port: Listen port for the websocket server (default: 8765)
|
||||||
|
:type port: int
|
||||||
|
|
||||||
|
:param bind_address: Bind address for the websocket server (default: 0.0.0.0, listen for any IP connection)
|
||||||
|
:type websocket_port: str
|
||||||
|
|
||||||
|
:param ssl_cert: Path to the PEM certificate file if you want to enable SSL (default: None)
|
||||||
|
:type ssl_cert: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
self.port = port
|
||||||
|
self.bind_address = bind_address
|
||||||
|
self.ssl_context = None
|
||||||
|
|
||||||
|
if ssl_cert:
|
||||||
|
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||||
|
self.ssl_context.load_cert_chain(os.path.abspath(
|
||||||
|
os.path.expanduser(ssl_cert)))
|
||||||
|
|
||||||
|
|
||||||
|
def send_message(self, msg):
|
||||||
|
websocket = get_plugin('websocket')
|
||||||
|
websocket_args = {}
|
||||||
|
|
||||||
|
if self.ssl_context:
|
||||||
|
url = 'wss://localhost:{}'.format(self.port)
|
||||||
|
websocket_args['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
websocket_args['ssl'].load_cert_chain(os.path.abspath(
|
||||||
|
os.path.expanduser(ssl_cert)))
|
||||||
|
else:
|
||||||
|
url = 'ws://localhost:{}'.format(self.port)
|
||||||
|
|
||||||
|
websocket.send(url=url, msg=msg, **websocket_args)
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
super().run()
|
||||||
|
|
||||||
|
async def serve_client(websocket, path):
|
||||||
|
self.logger.info('New websocket connection from {}'.
|
||||||
|
format(websocket.remote_address[0]))
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg = await websocket.recv()
|
||||||
|
msg = Message.build(msg)
|
||||||
|
self.logger.info('Received message from {}: {}'.
|
||||||
|
format(websocket.remote_address[0], msg))
|
||||||
|
|
||||||
|
self.on_message(msg)
|
||||||
|
|
||||||
|
if isinstance(msg, Request):
|
||||||
|
response = self.get_message_response(msg)
|
||||||
|
|
||||||
|
if response:
|
||||||
|
self.logger.info('Processing response on the websocket backend: {}'.
|
||||||
|
format(response))
|
||||||
|
|
||||||
|
await websocket.send(str(response))
|
||||||
|
except Exception as e:
|
||||||
|
if isinstance(e, websockets.exceptions.ConnectionClosed):
|
||||||
|
self.logger.info('Websocket client {} closed connection'.
|
||||||
|
format(websocket.remote_address[0]))
|
||||||
|
else:
|
||||||
|
self.logger.exception(e)
|
||||||
|
|
||||||
|
self.logger.info('Initialized websocket backend on port {}, bind address: {}'.
|
||||||
|
format(self.port, self.bind_address))
|
||||||
|
|
||||||
|
websocket_args = {}
|
||||||
|
if self.ssl_context:
|
||||||
|
websocket_args['ssl'] = self.ssl_context
|
||||||
|
|
||||||
|
loop = get_or_create_event_loop()
|
||||||
|
server = websockets.serve(serve_client, self.bind_address, self.port, **websocket_args)
|
||||||
|
loop.run_until_complete(server)
|
||||||
|
loop.run_forever()
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -113,5 +114,15 @@ def register_plugin(name, plugin, **kwargs):
|
||||||
""" Registers a plugin instance by name """
|
""" Registers a plugin instance by name """
|
||||||
global plugins
|
global plugins
|
||||||
|
|
||||||
|
def get_or_create_event_loop():
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
return loop
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
||||||
|
|
62
platypush/plugins/websocket.py
Normal file
62
platypush/plugins/websocket.py
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import websockets
|
||||||
|
|
||||||
|
from platypush.context import get_or_create_event_loop
|
||||||
|
from platypush.message import Message
|
||||||
|
from platypush.plugins import Plugin, action
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketPlugin(Plugin):
|
||||||
|
"""
|
||||||
|
Plugin to send messages over a websocket connection
|
||||||
|
|
||||||
|
Requires:
|
||||||
|
|
||||||
|
* **websockets** (``pip install websockets``)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def send(self, url, msg, ssl_cert=None, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Sends a message to a websocket.
|
||||||
|
|
||||||
|
:param url: Websocket URL, e.g. ws://localhost:8765 or wss://localhost:8765
|
||||||
|
:type topic: str
|
||||||
|
|
||||||
|
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
||||||
|
|
||||||
|
:param ssl_cert: Path to the SSL certificate to be used, if the SSL connection requires client authentication as well (default: None)
|
||||||
|
:type ssl_cert: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def send():
|
||||||
|
websocket_args = {}
|
||||||
|
if ssl_cert:
|
||||||
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||||
|
ssl_context.load_cert_chain(os.path.abspath(
|
||||||
|
os.path.expanduser(ssl_cert)))
|
||||||
|
|
||||||
|
async with websockets.connect(url, **websocket_args) as websocket:
|
||||||
|
try:
|
||||||
|
await websocket.send(str(msg))
|
||||||
|
except websockets.exceptions.ConnectionClosed:
|
||||||
|
self.logger.warning('Error on websocket {}: {}'.
|
||||||
|
format(url, e))
|
||||||
|
|
||||||
|
try: msg = json.dumps(msg)
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
try: msg = Message.build(json.loads(msg))
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
loop = get_or_create_event_loop()
|
||||||
|
loop.run_until_complete(send())
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
Loading…
Reference in a new issue