platypush/platypush/backend/http/__init__.py

153 lines
5.0 KiB
Python
Raw Normal View History

2018-01-29 13:47:21 +01:00
import asyncio
import inspect
2018-01-04 02:45:23 +01:00
import logging
import json
2018-01-29 13:47:21 +01:00
import os
2018-01-04 17:20:35 +01:00
import time
2018-01-29 13:47:21 +01:00
import websockets
2018-01-04 02:45:23 +01:00
2018-01-29 13:47:21 +01:00
from threading import Thread
2018-01-04 02:45:23 +01:00
from multiprocessing import Process
2018-01-29 13:47:21 +01:00
from flask import Flask, abort, jsonify, request, render_template, send_from_directory
2018-01-04 02:45:23 +01:00
2018-01-04 17:20:35 +01:00
from platypush.config import Config
2018-01-04 02:45:23 +01:00
from platypush.message import Message
2018-01-29 13:47:21 +01:00
from platypush.message.event import Event
2018-01-04 02:45:23 +01:00
from platypush.message.request import Request
from .. import Backend
class HttpBackend(Backend):
    r"""Backend that accepts messages over HTTP and pushes events to web clients.

    A Flask application (served from a child process) exposes:

    * ``POST /execute`` - builds a message from the JSON body and either
      executes it (requests) or posts it on the bus (events);
    * ``GET /`` - renders ``index.html`` with the plugins that ship a template;
    * ``GET /static/<path>`` - serves files from the ``static`` directory.

    A websocket server (run from a thread) keeps track of the connected
    clients so that ``notify_web_clients`` can forward events to them.

    Example interaction with the HTTP backend to make requests::

        $ curl -XPOST -H 'Content-Type: application/json' -H "X-Token: your_token" \
            -d '{"type":"request","target":"nodename","action":"tts.say","args": {"phrase":"This is a test"}}' \
            http://localhost:8008/execute
    """

    def __init__(self, port=8008, websocket_port=8009, token=None, **kwargs):
        """
        Params:
            port           -- Port the HTTP server listens on (default: 8008)
            websocket_port -- Port the websocket server listens on (default: 8009)
            token          -- Value expected in the ``X-Token`` header of
                              ``/execute`` requests (default: None)
            **kwargs       -- Forwarded unchanged to the parent ``Backend``
        """
        super().__init__(**kwargs)
        self.port = port
        self.websocket_port = websocket_port
        self.token = token
        self.server_proc = None
        self.websocket_thread = None
        # Event loop driving the websocket server; set by websocket() so that
        # stop() can shut it down from another thread.
        self.websocket_loop = None
        self.active_websockets = set()

    def send_message(self, msg):
        """This backend cannot send messages; it only logs a warning."""
        logging.warning('Use cURL or any HTTP client to query the HTTP backend')

    def stop(self):
        """Terminate the web server process and stop the websocket loop."""
        logging.info('Received STOP event on HttpBackend')

        if self.server_proc:
            self.server_proc.terminate()
            self.server_proc.join()

        # Without this the websocket thread sits in run_forever() and run()
        # never returns from websocket_thread.join().
        loop = self.websocket_loop
        if loop is not None and loop.is_running():
            loop.call_soon_threadsafe(loop.stop)

    def notify_web_clients(self, event):
        """Send ``str(event)`` to every connected websocket client.

        Clients whose connection turns out to be closed are dropped from
        ``self.active_websockets``.
        """
        async def send_event(websocket):
            await websocket.send(str(event))

        loop = asyncio.new_event_loop()
        try:
            # Iterate over a snapshot: the websocket thread adds/removes
            # clients on the same set while we are sending.
            for websocket in list(self.active_websockets):
                try:
                    loop.run_until_complete(send_event(websocket))
                except websockets.exceptions.ConnectionClosed:
                    logging.info('Client connection lost')
                    # Remove in place (instead of rebinding the attribute) so
                    # that clients registered meanwhile are not lost.
                    self.active_websockets.discard(websocket)
        finally:
            # The loop is only needed for this call; release its resources.
            loop.close()

    def webserver(self):
        """Build and return the Flask application served by this backend."""
        basedir = os.path.dirname(inspect.getfile(self.__class__))
        template_dir = os.path.join(basedir, 'templates')
        static_dir = os.path.join(basedir, 'static')
        app = Flask(__name__, template_folder=template_dir)

        @app.route('/execute', methods=['POST'])
        def execute():
            # Authenticate before touching the request body.
            token = request.headers.get('X-Token')
            if token != self.token:
                abort(401)

            try:
                args = json.loads(request.data.decode('utf-8'))
            except ValueError:
                # Covers both malformed JSON and undecodable bytes.
                abort(400)

            msg = Message.build(args)
            logging.info('Received message on the HTTP backend: {}'.format(msg))

            if isinstance(msg, Request):
                # `async` is a reserved word since Python 3.7, so it cannot be
                # written as a literal keyword argument any more; passing it
                # through a dict keeps the callee's parameter name unchanged.
                response = msg.execute(**{'async': False})
                logging.info('Processing response on the HTTP backend: {}'.format(response))
                return str(response)
            elif isinstance(msg, Event):
                self.bus.post(msg)

            return jsonify({ 'status': 'ok' })

        @app.route('/')
        def index():
            configured_plugins = Config.get_plugins()
            enabled_plugins = {}

            # Only plugins that ship a `plugins/<name>.html` template are
            # passed to the index page.
            for plugin, conf in configured_plugins.items():
                template_file = os.path.join('plugins', plugin + '.html')
                if os.path.isfile(os.path.join(template_dir, template_file)):
                    enabled_plugins[plugin] = conf

            return render_template('index.html', plugins=enabled_plugins,
                                   token=self.token, websocket_port=self.websocket_port)

        @app.route('/static/<path>')
        def static_path(path):
            return send_from_directory(static_dir, path)

        return app

    def websocket(self):
        """Run the websocket server; blocks until its event loop is stopped."""
        async def register_websocket(websocket, path):
            logging.info('New websocket connection from {}'.format(websocket.remote_address[0]))
            self.active_websockets.add(websocket)

            # Keep pinging the client; a missing pong within 5 seconds or a
            # closed connection ends the handler.
            while True:
                try:
                    waiter = await websocket.ping()
                    await asyncio.wait_for(waiter, timeout=5)
                except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed) as e:
                    logging.info('Client {} closed connection'.format(websocket.remote_address[0]))
                    # discard(): notify_web_clients may already have dropped
                    # this client, in which case remove() would raise KeyError.
                    self.active_websockets.discard(websocket)
                    break

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.websocket_loop = loop
        loop.run_until_complete(
            websockets.serve(register_websocket, '0.0.0.0', self.websocket_port))
        loop.run_forever()

    def run(self):
        """Start the web server process and the websocket thread, then wait for both."""
        super().run()
        logging.info('Initialized HTTP backend on port {}'.format(self.port))

        webserver = self.webserver()
        # NOTE(review): debug=True on a server bound to 0.0.0.0 exposes the
        # Werkzeug debugger to the network - confirm this is intended.
        self.server_proc = Process(target=webserver.run, kwargs={
            'debug':True, 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False
        })

        self.websocket_thread = Thread(target=self.websocket)

        # NOTE(review): purpose of this delay is not visible here - presumably
        # it lets the rest of the application finish starting; confirm.
        time.sleep(1)

        self.server_proc.start()
        self.websocket_thread.start()
        self.server_proc.join()
        self.websocket_thread.join()
2018-01-04 02:45:23 +01:00
# vim:sw=4:ts=4:et: